MicroPython Workbench

MicroPython Workbench

The “MicroPython Workbench” module allows for relatively seamless integration of Syntalos with microcontrollers running the MicroPython firmware.

Usage

You need to flash a microcontroller with MicroPython to get started. A cheap and capable option is using a Raspberry Pi Pico.

Ports

  • A custom number of input ports for TableRow tabular data can be registered.
  • Data can be retrieved as tabular TableRow data, FloatSignalBlock, or IntSignalBlock.

Input/output ports can be configured in the port editor.

Stream Metadata

  • All stream metadata are set to default values.

Example Script

This is an example script that demonstrates how to use async code on the microcontroller and pass messages between the device and Syntalos.

If the microcontroller does not need to receive data from Syntalos, the async code can be replaced by synchronous code.

import machine
from machine import Pin

# Two output pins that the example toggles in opposite phase:
# the pin named 'LED' and the pin with id 10.
ledPin = Pin('LED', Pin.OUT)
testPin = Pin(10, Pin.OUT)
# Helper object used for all message exchange with the Syntalos host
sy = SyntalosCommunicator()


async def blink_led():
    """Toggle the two pins in opposite phase forever and report to the host.

    Once per cycle a row ``[0.5, parity]`` is sent on the 'float-out' port,
    stamped with the device time, where ``parity`` is 1 for an odd
    millisecond timestamp and 0 otherwise. Each half of the cycle lasts
    0.5 seconds.
    """
    float_port = sy.get_output_port('float-out')
    while True:
        # report a sample together with the device timestamp
        now_ms = sy.ticks_ms()
        if now_ms % 2:
            parity = 1
        else:
            parity = 0
        await float_port.send_data([0.5, parity], timestamp_ms=now_ms)

        # first half-cycle: LED on / test pin off, second half-cycle: reversed
        for set_led, set_test in (
            (ledPin.high, testPin.low),
            (ledPin.low, testPin.high),
        ):
            set_led()
            set_test()
            await uasyncio.sleep(0.5)


def on_table_row_received(data):
    """Input callback: echo a table row received from the host to the console."""
    print('Received row:', data)


async def main():
    """Set up host communication and the blink task, then idle forever."""
    # Start processing data sent by the host
    sy.enable_input()

    # Run the LED blinking in the background
    uasyncio.create_task(blink_led())

    # Print every row arriving on the 'table-in' port
    sy.register_on_input('table-in', on_table_row_received)

    # Keep the coroutine alive so the background tasks continue to run
    idle_interval_s = 1
    while True:
        await uasyncio.sleep(idle_interval_s)


# Start the event loop with main(); main() loops forever, so this call
# does not return during normal operation.
uasyncio.run(main())

Syntalos Interface API

You can interface with the Syntalos communication API primarily through the SyntalosCommunicator class, which provides an easy way to pass (text) data between Syntalos and the microcontroller.

upy-comms

  1# Communication glue code to allow MicroPython to interface
  2# with Syntalos ports to transmit tabular data between host and device
  3
  4import sys
  5import json
  6import time
  7
  8
  9def dump_json_compact(obj):
 10    """Dump a JSON object to a compact string"""
 11    return json.dumps(obj, separators=(',', ':'))
 12
 13
 14class SyntalosOutPort:
 15    def __init__(self, writer, port_idx: int):
 16        self._port_idx = port_idx
 17        self._out_writer = writer
 18
 19    async def send_data(self, *args, **kwargs):
 20        """Send tabular data to the host"""
 21        if not args:
 22            return
 23        if len(args) == 1 and isinstance(args[0], (list, tuple)):
 24            args = args[0]
 25
 26        timestamp = kwargs.get('timestamp_ms')
 27        if timestamp:
 28            self._out_writer.write(
 29                dump_json_compact({'p': self._port_idx, 'd': args, 't': timestamp})
 30            )
 31        else:
 32            self._out_writer.write(dump_json_compact({'p': self._port_idx, 'd': args}))
 33
 34        self._out_writer.write('\n')
 35        await self._out_writer.drain()
 36
 37    def send_data_sync(self, *args, **kwargs):
 38        """Synchronous function for sending data."""
 39        uasyncio.run(self.send_data(*args, **kwargs))
 40
 41
 42class SyntalosCommunicator:
 43    def __init__(self):
 44        self._out_writer = uasyncio.StreamWriter(sys.stdout)
 45        self._in_reader = uasyncio.StreamReader(sys.stdin)
 46        self._oport_count = 0
 47        self._iport_map = {}
 48        self._iport_pending = {}
 49        self._ref_time_ms = time.ticks_ms()
 50        self._elapsed_ms = 0
 51
 52        print(dump_json_compact({'dc': 'start-time', 't_ms': self.ticks_ms()}))
 53
 54    def _register_input_port_info(self, hdata):
 55        if hdata['hc'] == 'in-port':
 56            _, callback = self._iport_pending.pop(hdata['p'], (None, None))
 57            if callback is None:
 58                self._iport_pending[hdata['p']] = (hdata['i'], None)
 59            else:
 60                self._iport_map[hdata['i']] = callback
 61
 62    async def _read_stdin(self):
 63        buf = bytearray()
 64        while True:
 65            b = await self._in_reader.read(1)
 66            if b == '\r' or b == '\n':
 67                s = buf.decode()
 68                if s.startswith('{'):
 69                    obj = json.loads(s)
 70                    if 'hc' in obj:
 71                        self._register_input_port_info(obj)
 72                    elif 'd' in obj:
 73                        idx = obj['p']
 74                        callback = self._iport_map.get(idx, None)
 75                        if callback:
 76                            callback(obj['d'])
 77                buf[:] = b''
 78            else:
 79                buf.extend(b)
 80
 81    def enable_input(self):
 82        """Enable host input handling"""
 83        uasyncio.create_task(self._read_stdin())
 84
 85    def ticks_ms(self):
 86        """A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible."""
 87        cticks = time.ticks_ms()
 88        tdiff = time.ticks_diff(cticks, self._ref_time_ms)
 89        self._ref_time_ms = cticks
 90        self._elapsed_ms += tdiff
 91        return self._elapsed_ms
 92
 93    def get_output_port(self, port_id: str) -> SyntalosOutPort:
 94        """Register a port to be used for communication to the host."""
 95
 96        port_idx = self._oport_count
 97        self._oport_count += 1
 98        print(dump_json_compact({'dc': 'new-out-port', 'i': port_idx, 'n': port_id}))
 99
100        return SyntalosOutPort(self._out_writer, port_idx)
101
102    def register_on_input(self, port_id: str, callback):
103        """Register a callback to run when data is received from the host."""
104
105        idx, _ = self._iport_pending.pop(port_id, (None, None))
106        if idx is None:
107            self._iport_pending[port_id] = (-1, callback)
108        else:
109            self._iport_map[idx] = callback
def dump_json_compact(obj):
10def dump_json_compact(obj):
11    """Dump a JSON object to a compact string"""
12    return json.dumps(obj, separators=(',', ':'))

Dump a JSON object to a compact string

class SyntalosOutPort:
15class SyntalosOutPort:
16    def __init__(self, writer, port_idx: int):
17        self._port_idx = port_idx
18        self._out_writer = writer
19
20    async def send_data(self, *args, **kwargs):
21        """Send tabular data to the host"""
22        if not args:
23            return
24        if len(args) == 1 and isinstance(args[0], (list, tuple)):
25            args = args[0]
26
27        timestamp = kwargs.get('timestamp_ms')
28        if timestamp:
29            self._out_writer.write(
30                dump_json_compact({'p': self._port_idx, 'd': args, 't': timestamp})
31            )
32        else:
33            self._out_writer.write(dump_json_compact({'p': self._port_idx, 'd': args}))
34
35        self._out_writer.write('\n')
36        await self._out_writer.drain()
37
38    def send_data_sync(self, *args, **kwargs):
39        """Synchronous function for sending data."""
40        uasyncio.run(self.send_data(*args, **kwargs))
SyntalosOutPort(writer, port_idx: int)
16    def __init__(self, writer, port_idx: int):
17        self._port_idx = port_idx
18        self._out_writer = writer
async def send_data(self, *args, **kwargs):
20    async def send_data(self, *args, **kwargs):
21        """Send tabular data to the host"""
22        if not args:
23            return
24        if len(args) == 1 and isinstance(args[0], (list, tuple)):
25            args = args[0]
26
27        timestamp = kwargs.get('timestamp_ms')
28        if timestamp:
29            self._out_writer.write(
30                dump_json_compact({'p': self._port_idx, 'd': args, 't': timestamp})
31            )
32        else:
33            self._out_writer.write(dump_json_compact({'p': self._port_idx, 'd': args}))
34
35        self._out_writer.write('\n')
36        await self._out_writer.drain()

Send tabular data to the host

def send_data_sync(self, *args, **kwargs):
38    def send_data_sync(self, *args, **kwargs):
39        """Synchronous function for sending data."""
40        uasyncio.run(self.send_data(*args, **kwargs))

Synchronous function for sending data.

class SyntalosCommunicator:
 43class SyntalosCommunicator:
 44    def __init__(self):
 45        self._out_writer = uasyncio.StreamWriter(sys.stdout)
 46        self._in_reader = uasyncio.StreamReader(sys.stdin)
 47        self._oport_count = 0
 48        self._iport_map = {}
 49        self._iport_pending = {}
 50        self._ref_time_ms = time.ticks_ms()
 51        self._elapsed_ms = 0
 52
 53        print(dump_json_compact({'dc': 'start-time', 't_ms': self.ticks_ms()}))
 54
 55    def _register_input_port_info(self, hdata):
 56        if hdata['hc'] == 'in-port':
 57            _, callback = self._iport_pending.pop(hdata['p'], (None, None))
 58            if callback is None:
 59                self._iport_pending[hdata['p']] = (hdata['i'], None)
 60            else:
 61                self._iport_map[hdata['i']] = callback
 62
 63    async def _read_stdin(self):
 64        buf = bytearray()
 65        while True:
 66            b = await self._in_reader.read(1)
 67            if b == '\r' or b == '\n':
 68                s = buf.decode()
 69                if s.startswith('{'):
 70                    obj = json.loads(s)
 71                    if 'hc' in obj:
 72                        self._register_input_port_info(obj)
 73                    elif 'd' in obj:
 74                        idx = obj['p']
 75                        callback = self._iport_map.get(idx, None)
 76                        if callback:
 77                            callback(obj['d'])
 78                buf[:] = b''
 79            else:
 80                buf.extend(b)
 81
 82    def enable_input(self):
 83        """Enable host input handling"""
 84        uasyncio.create_task(self._read_stdin())
 85
 86    def ticks_ms(self):
 87        """A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible."""
 88        cticks = time.ticks_ms()
 89        tdiff = time.ticks_diff(cticks, self._ref_time_ms)
 90        self._ref_time_ms = cticks
 91        self._elapsed_ms += tdiff
 92        return self._elapsed_ms
 93
 94    def get_output_port(self, port_id: str) -> SyntalosOutPort:
 95        """Register a port to be used for communication to the host."""
 96
 97        port_idx = self._oport_count
 98        self._oport_count += 1
 99        print(dump_json_compact({'dc': 'new-out-port', 'i': port_idx, 'n': port_id}))
100
101        return SyntalosOutPort(self._out_writer, port_idx)
102
103    def register_on_input(self, port_id: str, callback):
104        """Register a callback to run when data is received from the host."""
105
106        idx, _ = self._iport_pending.pop(port_id, (None, None))
107        if idx is None:
108            self._iport_pending[port_id] = (-1, callback)
109        else:
110            self._iport_map[idx] = callback
def enable_input(self):
82    def enable_input(self):
83        """Enable host input handling"""
84        uasyncio.create_task(self._read_stdin())

Enable host input handling

def ticks_ms(self):
86    def ticks_ms(self):
87        """A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible."""
88        cticks = time.ticks_ms()
89        tdiff = time.ticks_diff(cticks, self._ref_time_ms)
90        self._ref_time_ms = cticks
91        self._elapsed_ms += tdiff
92        return self._elapsed_ms

A safer version of time.ticks_ms() that tries to mitigate the time.ticks_ms() overflow, if possible.

def get_output_port(self, port_id: str) -> upy-comms.SyntalosOutPort:
 94    def get_output_port(self, port_id: str) -> SyntalosOutPort:
 95        """Register a port to be used for communication to the host."""
 96
 97        port_idx = self._oport_count
 98        self._oport_count += 1
 99        print(dump_json_compact({'dc': 'new-out-port', 'i': port_idx, 'n': port_id}))
100
101        return SyntalosOutPort(self._out_writer, port_idx)

Register a port to be used for communication to the host.

def register_on_input(self, port_id: str, callback):
103    def register_on_input(self, port_id: str, callback):
104        """Register a callback to run when data is received from the host."""
105
106        idx, _ = self._iport_pending.pop(port_id, (None, None))
107        if idx is None:
108            self._iport_pending[port_id] = (-1, callback)
109        else:
110            self._iport_map[idx] = callback

Register a callback to run when data is received from the host.