MicroPython Workbench
The “MicroPython Workbench” module allows for relatively seamless integration of Syntalos with microcontrollers running the MicroPython firmware.
Usage
You need to flash a microcontroller with MicroPython to get started. A cheap and capable option is using a Raspberry Pi Pico.
Ports
- A custom number of input ports for
TableRow
tabular data can be registered. - Data can be retrieved either as tabular
TableRow
data,FloatSignalBlock
orIntSignalBlock
Input/output ports can be configured in the port editor.
Stream Metadata
- All stream metadata are set to default values.
Example Script
This is an example script that demonstrates how to use async code on the microcontroller and pass messages between the device and Syntalos.
If the microcontroller does not need to receive data from Syntalos, the async code can be replaced by sync code instead.
import machine
from machine import Pin
ledPin = Pin('LED', Pin.OUT)
testPin = Pin(10, Pin.OUT)
sy = SyntalosCommunicator()
async def blink_led():
oport_f = sy.get_output_port('float-out')
while True:
# send some numbers to the host, with the device timestamp
timestamp = sy.ticks_ms()
await oport_f.send_data([0.5, 1 if timestamp % 2 else 0], timestamp_ms=timestamp)
# toggle the LEDs
ledPin.high()
testPin.low()
await uasyncio.sleep(0.5)
ledPin.low()
testPin.high()
await uasyncio.sleep(0.5)
def on_table_row_received(data):
# just print any received table row to the console
print('Received row:', data)
async def main():
# Enable reading incoming data from the host
sy.enable_input()
# Blink a LED
uasyncio.create_task(blink_led())
# Receive tabular input
sy.register_on_input('table-in', on_table_row_received)
# Run this program indefinitely
while True:
await uasyncio.sleep(1)
# Run the main coroutine
uasyncio.run(main())
Syntalos Interface API
You can interface with the Syntalos communication API primarily through the SyntalosCommunicator
class,
which allows for an easy way to pass (text)data between Syntalos and the microcontroller.
upy-comms
1# Communication glue code to allow MicroPython to interface 2# with Syntalos ports to transmit tabular data between host and device 3 4import sys 5import json 6import time 7 8 9def dump_json_compact(obj): 10 """Dump a JSON object to a compact string""" 11 return json.dumps(obj, separators=(',', ':')) 12 13 14class SyntalosOutPort: 15 def __init__(self, writer, port_idx: int): 16 self._port_idx = port_idx 17 self._out_writer = writer 18 19 async def send_data(self, *args, **kwargs): 20 """Send tabular data to the host""" 21 if not args: 22 return 23 if len(args) == 1 and isinstance(args[0], (list, tuple)): 24 args = args[0] 25 26 timestamp = kwargs.get('timestamp_ms') 27 if timestamp: 28 self._out_writer.write( 29 dump_json_compact({'p': self._port_idx, 'd': args, 't': timestamp}) 30 ) 31 else: 32 self._out_writer.write(dump_json_compact({'p': self._port_idx, 'd': args})) 33 34 self._out_writer.write('\n') 35 await self._out_writer.drain() 36 37 def send_data_sync(self, *args, **kwargs): 38 """Synchronous function for sending data.""" 39 uasyncio.run(self.send_data(*args, **kwargs)) 40 41 42class SyntalosCommunicator: 43 def __init__(self): 44 self._out_writer = uasyncio.StreamWriter(sys.stdout) 45 self._in_reader = uasyncio.StreamReader(sys.stdin) 46 self._oport_count = 0 47 self._iport_map = {} 48 self._iport_pending = {} 49 self._ref_time_ms = time.ticks_ms() 50 self._elapsed_ms = 0 51 52 print(dump_json_compact({'dc': 'start-time', 't_ms': self.ticks_ms()})) 53 54 def _register_input_port_info(self, hdata): 55 if hdata['hc'] == 'in-port': 56 _, callback = self._iport_pending.pop(hdata['p'], (None, None)) 57 if callback is None: 58 self._iport_pending[hdata['p']] = (hdata['i'], None) 59 else: 60 self._iport_map[hdata['i']] = callback 61 62 async def _read_stdin(self): 63 buf = bytearray() 64 while True: 65 b = await self._in_reader.read(1) 66 if b == '\r' or b == '\n': 67 s = buf.decode() 68 if s.startswith('{'): 69 obj = json.loads(s) 70 if 'hc' in obj: 71 self._register_input_port_info(obj) 72 elif 'd' in obj: 73 idx = obj['p'] 74 callback = self._iport_map.get(idx, None) 75 if callback: 76 callback(obj['d']) 77 buf[:] = b'' 78 else: 79 buf.extend(b) 80 81 def enable_input(self): 82 """Enable host input handling""" 83 uasyncio.create_task(self._read_stdin()) 84 85 def ticks_ms(self): 86 """A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible.""" 87 cticks = time.ticks_ms() 88 tdiff = time.ticks_diff(cticks, self._ref_time_ms) 89 self._ref_time_ms = cticks 90 self._elapsed_ms += tdiff 91 return self._elapsed_ms 92 93 def get_output_port(self, port_id: str) -> SyntalosOutPort: 94 """Register a port to be used for communication to the host.""" 95 96 port_idx = self._oport_count 97 self._oport_count += 1 98 print(dump_json_compact({'dc': 'new-out-port', 'i': port_idx, 'n': port_id})) 99 100 return SyntalosOutPort(self._out_writer, port_idx) 101 102 def register_on_input(self, port_id: str, callback): 103 """Register a callback to run when data is received from the host.""" 104 105 idx, _ = self._iport_pending.pop(port_id, (None, None)) 106 if idx is None: 107 self._iport_pending[port_id] = (-1, callback) 108 else: 109 self._iport_map[idx] = callback
10def dump_json_compact(obj): 11 """Dump a JSON object to a compact string""" 12 return json.dumps(obj, separators=(',', ':'))
Dump a JSON object to a compact string
15class SyntalosOutPort: 16 def __init__(self, writer, port_idx: int): 17 self._port_idx = port_idx 18 self._out_writer = writer 19 20 async def send_data(self, *args, **kwargs): 21 """Send tabular data to the host""" 22 if not args: 23 return 24 if len(args) == 1 and isinstance(args[0], (list, tuple)): 25 args = args[0] 26 27 timestamp = kwargs.get('timestamp_ms') 28 if timestamp: 29 self._out_writer.write( 30 dump_json_compact({'p': self._port_idx, 'd': args, 't': timestamp}) 31 ) 32 else: 33 self._out_writer.write(dump_json_compact({'p': self._port_idx, 'd': args})) 34 35 self._out_writer.write('\n') 36 await self._out_writer.drain() 37 38 def send_data_sync(self, *args, **kwargs): 39 """Synchronous function for sending data.""" 40 uasyncio.run(self.send_data(*args, **kwargs))
20 async def send_data(self, *args, **kwargs): 21 """Send tabular data to the host""" 22 if not args: 23 return 24 if len(args) == 1 and isinstance(args[0], (list, tuple)): 25 args = args[0] 26 27 timestamp = kwargs.get('timestamp_ms') 28 if timestamp: 29 self._out_writer.write( 30 dump_json_compact({'p': self._port_idx, 'd': args, 't': timestamp}) 31 ) 32 else: 33 self._out_writer.write(dump_json_compact({'p': self._port_idx, 'd': args})) 34 35 self._out_writer.write('\n') 36 await self._out_writer.drain()
Send tabular data to the host
43class SyntalosCommunicator: 44 def __init__(self): 45 self._out_writer = uasyncio.StreamWriter(sys.stdout) 46 self._in_reader = uasyncio.StreamReader(sys.stdin) 47 self._oport_count = 0 48 self._iport_map = {} 49 self._iport_pending = {} 50 self._ref_time_ms = time.ticks_ms() 51 self._elapsed_ms = 0 52 53 print(dump_json_compact({'dc': 'start-time', 't_ms': self.ticks_ms()})) 54 55 def _register_input_port_info(self, hdata): 56 if hdata['hc'] == 'in-port': 57 _, callback = self._iport_pending.pop(hdata['p'], (None, None)) 58 if callback is None: 59 self._iport_pending[hdata['p']] = (hdata['i'], None) 60 else: 61 self._iport_map[hdata['i']] = callback 62 63 async def _read_stdin(self): 64 buf = bytearray() 65 while True: 66 b = await self._in_reader.read(1) 67 if b == '\r' or b == '\n': 68 s = buf.decode() 69 if s.startswith('{'): 70 obj = json.loads(s) 71 if 'hc' in obj: 72 self._register_input_port_info(obj) 73 elif 'd' in obj: 74 idx = obj['p'] 75 callback = self._iport_map.get(idx, None) 76 if callback: 77 callback(obj['d']) 78 buf[:] = b'' 79 else: 80 buf.extend(b) 81 82 def enable_input(self): 83 """Enable host input handling""" 84 uasyncio.create_task(self._read_stdin()) 85 86 def ticks_ms(self): 87 """A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible.""" 88 cticks = time.ticks_ms() 89 tdiff = time.ticks_diff(cticks, self._ref_time_ms) 90 self._ref_time_ms = cticks 91 self._elapsed_ms += tdiff 92 return self._elapsed_ms 93 94 def get_output_port(self, port_id: str) -> SyntalosOutPort: 95 """Register a port to be used for communication to the host.""" 96 97 port_idx = self._oport_count 98 self._oport_count += 1 99 print(dump_json_compact({'dc': 'new-out-port', 'i': port_idx, 'n': port_id})) 100 101 return SyntalosOutPort(self._out_writer, port_idx) 102 103 def register_on_input(self, port_id: str, callback): 104 """Register a callback to run when data is received from the host.""" 105 106 idx, _ = self._iport_pending.pop(port_id, (None, None)) 107 if idx is None: 108 self._iport_pending[port_id] = (-1, callback) 109 else: 110 self._iport_map[idx] = callback
82 def enable_input(self): 83 """Enable host input handling""" 84 uasyncio.create_task(self._read_stdin())
Enable host input handling
86 def ticks_ms(self): 87 """A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible.""" 88 cticks = time.ticks_ms() 89 tdiff = time.ticks_diff(cticks, self._ref_time_ms) 90 self._ref_time_ms = cticks 91 self._elapsed_ms += tdiff 92 return self._elapsed_ms
A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible.
94 def get_output_port(self, port_id: str) -> SyntalosOutPort: 95 """Register a port to be used for communication to the host.""" 96 97 port_idx = self._oport_count 98 self._oport_count += 1 99 print(dump_json_compact({'dc': 'new-out-port', 'i': port_idx, 'n': port_id})) 100 101 return SyntalosOutPort(self._out_writer, port_idx)
Register a port to be used for communication to the host.
103 def register_on_input(self, port_id: str, callback): 104 """Register a callback to run when data is received from the host.""" 105 106 idx, _ = self._iport_pending.pop(port_id, (None, None)) 107 if idx is None: 108 self._iport_pending[port_id] = (-1, callback) 109 else: 110 self._iport_map[idx] = callback
Register a callback to run when data is received from the host.