Python Module API

Syntalos provides a Python API for building new modules. Python modules do not run inside the Syntalos process; they communicate with the main application through an interface provided by the syntalos_mlink Python module. The API can be used from the Python Script module or from standalone modules written entirely in Python.

The Python interface is documented below.

syntalos_mlink

Syntalos Python Module Interface

class ControlCommand:

A control command for a module.

ControlCommand(**kwds)

command: str

Custom command string (used when kind is CUSTOM).

duration: int

Optional duration associated with this command, in milliseconds.

kind: ControlCommandKind

The ControlCommandKind of this command.

class ControlCommandKind:

The type of a control command sent to controllable modules.

Members:

UNKNOWN

START : Start an operation.

PAUSE : Pause an operation; can be resumed with START.

STOP : Stop an operation.

STEP : Advance operation by one step.

CUSTOM : Custom command.

ControlCommandKind(value: SupportsInt)
CUSTOM: ClassVar[ControlCommandKind]
PAUSE: ClassVar[ControlCommandKind]
START: ClassVar[ControlCommandKind]
STEP: ClassVar[ControlCommandKind]
STOP: ClassVar[ControlCommandKind]
UNKNOWN: ClassVar[ControlCommandKind]
name: str
value: int
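
As an illustration of how a controllable module might react to the command kinds listed above, here is a minimal sketch. It runs outside Syntalos, so ControlCommandKind is replaced by a stand-in enum with the same member names (the numeric values are arbitrary), and the command objects are plain stand-ins carrying the documented kind, command and duration attributes. Treating a duration of 0 as "no duration" is an assumption, not documented behavior.

```python
from enum import Enum
from types import SimpleNamespace


class ControlCommandKind(Enum):
    """Stand-in with the documented member names; values are arbitrary."""
    UNKNOWN = 0
    START = 1
    PAUSE = 2
    STOP = 3
    STEP = 4
    CUSTOM = 5


def describe_command(cmd):
    """Return a short human-readable description of a control command."""
    if cmd.kind == ControlCommandKind.CUSTOM:
        # The command string is only meaningful for CUSTOM commands.
        return f"custom: {cmd.command}"
    if cmd.kind == ControlCommandKind.START and cmd.duration > 0:
        # Assumption: a duration of 0 means that no duration was given.
        return f"start for {cmd.duration} ms"
    return cmd.kind.name.lower()


# Hypothetical command objects with the documented attributes.
start = SimpleNamespace(kind=ControlCommandKind.START, command="", duration=500)
custom = SimpleNamespace(kind=ControlCommandKind.CUSTOM, command="flush", duration=0)
print(describe_command(start))
print(describe_command(custom))
```

Inside a real module, the enum would come from syntalos_mlink instead of being defined locally.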
class DataType:

Identifies the data type of a port / stream.

Members:

ControlCommand : Module control command.

TableRow : A row of tabular data.

Frame : A video frame.

FirmataControl : Firmata device control message.

FirmataData : Data received from a Firmata device.

IntSignalBlock : A block of integer signal samples.

FloatSignalBlock : A block of float signal samples.

DataType(value: SupportsInt)
ControlCommand: ClassVar[DataType]
FirmataControl: ClassVar[DataType]
FirmataData: ClassVar[DataType]
FloatSignalBlock: ClassVar[DataType]
Frame: ClassVar[DataType]
IntSignalBlock: ClassVar[DataType]
TableRow: ClassVar[DataType]
name: str
value: int
class FirmataCommandKind:

Type of change to be made on a Firmata interface.

Members:

UNKNOWN

NEW_DIG_PIN : Register a new digital pin.

NEW_ANA_PIN : Register a new analog pin.

IO_MODE : Change a pin's I/O mode.

WRITE_ANALOG : Write an analog value to a pin.

WRITE_DIGITAL : Write a digital value to a pin.

WRITE_DIGITAL_PULSE : Emit a digital pulse on a pin.

SYSEX : Send a raw SysEx message.

FirmataCommandKind(value: SupportsInt)
IO_MODE: ClassVar[FirmataCommandKind]
NEW_ANA_PIN: ClassVar[FirmataCommandKind]
NEW_DIG_PIN: ClassVar[FirmataCommandKind]
SYSEX: ClassVar[FirmataCommandKind]
UNKNOWN: ClassVar[FirmataCommandKind]
WRITE_ANALOG: ClassVar[FirmataCommandKind]
WRITE_DIGITAL: ClassVar[FirmataCommandKind]
WRITE_DIGITAL_PULSE: ClassVar[FirmataCommandKind]
name: str
value: int
class FirmataControl:

Control command for a Firmata device.

FirmataControl(**kwds)

The FirmataCommandKind to execute.

is_output: bool

True if the pin is configured as output.

is_pullup: bool

True if the internal pull-up resistor is enabled.

pin_id: int

Numeric pin identifier.

pin_name: str

Registered name of the pin.

value: int

Value to write, or pulse duration in ms.

class FirmataData:

Data received from a Firmata device.

is_digital: bool

True if the value is digital, False if analog.

pin_id: int

Numeric pin identifier.

pin_name: str

Registered name of the pin.

time: datetime.timedelta

Time when the data was acquired, as a duration.

time_usec: int

Time when the data was acquired, as an integer in µs.

value: int

Received pin value.
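
The acquisition time is exposed twice, as a datetime.timedelta (time) and as an integer in µs (time_usec). The sketch below shows one way to convert between the two representations using only the standard library; the helper names are hypothetical and not part of syntalos_mlink.

```python
from datetime import timedelta


def timedelta_to_usec(t: timedelta) -> int:
    """Convert a duration to whole microseconds without float rounding."""
    return t // timedelta(microseconds=1)


def usec_to_timedelta(time_usec: int) -> timedelta:
    """Convert an integer microsecond count back to a duration."""
    return timedelta(microseconds=time_usec)


acquired = timedelta(seconds=1, microseconds=250)
print(timedelta_to_usec(acquired))
print(usec_to_timedelta(1_000_250) == acquired)
```

Integer division by a one-microsecond timedelta keeps the result exact, which matters for long recordings where a float conversion could lose precision.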

class FloatSignalBlock:

A block of timestamped float signal data.

cols: int

Number of columns (channels).

data: Annotated[numpy.ndarray[tuple[Any, ...], numpy.dtype[numpy.float64]], '[m, n]']

2-D data matrix: rows = samples, columns = channels.

length: int

Number of samples (rows) in this block.

rows: int

Number of rows (samples).

timestamps: Annotated[numpy.ndarray[tuple[Any, ...], numpy.dtype[numpy.uint64]], '[m, 1]']

1-D array of sample timestamps in µs.
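
Because every sample carries a timestamp in µs, the effective sampling rate of a block can be estimated from the timestamps alone. The following is a minimal sketch in plain Python; the function name is hypothetical, and it assumes the timestamps are passed as a flat sequence of integers.

```python
def estimate_sample_rate_hz(timestamps_usec):
    """Estimate the sampling rate in Hz from sample timestamps given in µs.

    Returns None if fewer than two samples are available or if the
    timestamps do not span a positive amount of time.
    """
    # Convert to Python ints first to avoid unsigned-integer arithmetic.
    ts = [int(t) for t in timestamps_usec]
    if len(ts) < 2:
        return None
    span_usec = ts[-1] - ts[0]
    if span_usec <= 0:
        return None
    # N samples span N - 1 sampling intervals.
    return (len(ts) - 1) * 1_000_000 / span_usec


print(estimate_sample_rate_hz([0, 1000, 2000, 3000]))
```

With a real block, the timestamps array can be flattened first, for example with block.timestamps.ravel(), before it is passed to the function.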

class Frame:

A video frame.

index: int

Number of the frame.

mat: numpy.ndarray

Frame image data as a NumPy array (OpenCV Mat).

time: datetime.timedelta

Time when the frame was recorded, as a duration.

time_usec: int

Time when the frame was recorded, as an integer in µs.

class InputPort:

A module input port.

Obtain an instance via get_input_port().

def set_throttle_items_per_sec(self, items_per_sec: SupportsInt) -> None:

Limit the number of items delivered to on_data per second.

Parameters
  • items_per_sec: Maximum items per second; 0 disables throttling.
metadata: dict[str, typing.Any]

Read-only dict[str, object] of metadata provided by the upstream module for this port.

Values are native Python types: int, float, str, bool, None, MetaSize, or list / dict for nested structures.

name: str

The unique port ID string.

on_data: Callable[[typing.Any], None]

Callback invoked with each incoming data item.

Assign a callable that accepts a single argument of the port's data type (e.g. Frame, TableRow). Set to None to remove the callback.

Type: Callable[[object], None] | None.
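
A typical use of an input port is to look it up, check for None, and assign an on_data callback. The sketch below illustrates that pattern under stated assumptions: the helper attach_counter and the port ID "frames-in" are hypothetical, and a small fake stands in for syntalos_mlink so the example runs outside Syntalos. In a real module, the imported syntalos_mlink module would be passed instead of the fake.

```python
from types import SimpleNamespace


def attach_counter(mlink, port_id, max_items_per_sec=0):
    """Count the items arriving on an input port; returns the counter dict."""
    port = mlink.get_input_port(port_id)
    if port is None:
        # get_input_port() returns None when no port has this ID.
        raise ValueError(f"no input port with ID {port_id!r}")

    stats = {"received": 0}

    def on_item(item):
        # Called once per incoming data item (e.g. a Frame or TableRow).
        stats["received"] += 1

    # 0 disables throttling, per the documentation above.
    port.set_throttle_items_per_sec(max_items_per_sec)
    port.on_data = on_item
    return stats


class FakeInputPort:
    """Stand-in exposing only the InputPort members used here."""

    def __init__(self, name):
        self.name = name
        self.on_data = None
        self.throttle = None

    def set_throttle_items_per_sec(self, items_per_sec):
        self.throttle = items_per_sec


fake_port = FakeInputPort("frames-in")
fake_mlink = SimpleNamespace(
    get_input_port=lambda pid: fake_port if pid == fake_port.name else None
)

stats = attach_counter(fake_mlink, "frames-in", max_items_per_sec=30)
fake_port.on_data(object())  # simulate two incoming items
fake_port.on_data(object())
print(stats["received"])
```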

class IntSignalBlock:

A block of timestamped integer signal data.

cols: int

Number of columns (channels).

data: Annotated[numpy.ndarray[tuple[Any, ...], numpy.dtype[numpy.int32]], '[m, n]']

2-D data matrix: rows = samples, columns = channels.

length: int

Number of samples (rows) in this block.

rows: int

Number of rows (samples).

timestamps: Annotated[numpy.ndarray[tuple[Any, ...], numpy.dtype[numpy.uint64]], '[m, 1]']

1-D array of sample timestamps in µs.

class MetaSize:

Two-dimensional size value used in stream metadata.

MetaSize(**kwds)

height: int

Height in pixels (or other integer units).

width: int

Width in pixels (or other integer units).

class OutputPort:

A module output port.

Obtain an instance via get_output_port().

def firmata_register_digital_pin(self, pin_id: SupportsInt, name: str, is_output: bool, is_pullup: bool = False) -> FirmataControl:

Register a named digital pin on the connected Firmata device and submit the command immediately.

The pin can subsequently be referenced by name in firmata_submit_digital_value() and firmata_submit_digital_pulse().

Parameters
  • pin_id: Numeric pin identifier on the device.
  • name: Human-readable name to register for this pin.
  • is_output: True to configure the pin as output, False for input.
  • is_pullup: True to enable the internal pull-up resistor (default: False).
Returns

The FirmataControl command that was submitted.

def firmata_submit_digital_pulse(self, name: str, duration_msec: SupportsInt = 50) -> FirmataControl:

Emit a digital pulse on a previously registered pin.

Parameters
  • name: Registered pin name.
  • duration_msec: Pulse duration in milliseconds (default: 50).
Returns

The FirmataControl command that was submitted.

def firmata_submit_digital_value(self, name: str, value: bool) -> FirmataControl:

Write a digital value to a previously registered pin.

Parameters
  • name: Registered pin name.
  • value: True / 1 for HIGH, False / 0 for LOW.
Returns

The FirmataControl command that was submitted.
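
The three Firmata helpers are meant to be used together: pins are registered by name once, and later writes and pulses refer to those names. The sketch below shows that sequence. The pin numbers and the names "led" and "button" are hypothetical, and RecordingPort is a stand-in that only records calls, so the example runs without a device or a Syntalos session.

```python
class RecordingPort:
    """Stand-in for an OutputPort that records the Firmata calls it receives."""

    def __init__(self):
        self.calls = []

    def firmata_register_digital_pin(self, pin_id, name, is_output, is_pullup=False):
        self.calls.append(("register", pin_id, name, is_output, is_pullup))

    def firmata_submit_digital_value(self, name, value):
        self.calls.append(("value", name, value))

    def firmata_submit_digital_pulse(self, name, duration_msec=50):
        self.calls.append(("pulse", name, duration_msec))


def setup_pins(port):
    """Register one output pin and one input pin with pull-up enabled."""
    port.firmata_register_digital_pin(8, "led", True)
    port.firmata_register_digital_pin(2, "button", False, True)


def flash_led(port, duration_msec=200):
    """Make sure the LED is off, then emit a single pulse on it."""
    port.firmata_submit_digital_value("led", False)
    port.firmata_submit_digital_pulse("led", duration_msec)


port = RecordingPort()
setup_pins(port)
flash_led(port)
for call in port.calls:
    print(call)
```

In a real module, the port would come from get_output_port() and be connected to a Firmata device module.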

def set_metadata_value(self, key: str, value: Any) -> None:

Set a metadata entry for this port.

Metadata must be set before the run starts (i.e. in prepare()); it is immutable once acquisition begins.

Parameters
  • key: Metadata key string.
  • value: Metadata value. Accepted types: int, float, str, MetaSize, or list of int/str values.
Raises
  • SyntalosPyError: If the value type is not supported.
def set_metadata_value_size(self, key: str, value: Any) -> None:

Set a 2-D size metadata entry for this port (e.g. 'size' for video streams).

Parameters
  • key: Metadata key string.
  • value: Either a MetaSize object or a sequence of exactly two integers [width, height].
Raises
  • SyntalosPyError: If value does not have exactly two elements.
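
As a sketch of how the two metadata setters might be combined for a video output port: the helper name and the "framerate" key are assumptions chosen for illustration, while the 'size' key is the one mentioned above. RecordingPort is a stand-in for a real OutputPort. Per the note on set_metadata_value(), such calls belong in prepare(), before the run starts.

```python
class RecordingPort:
    """Stand-in for an OutputPort that stores metadata in a dict."""

    def __init__(self):
        self.metadata = {}

    def set_metadata_value(self, key, value):
        self.metadata[key] = value

    def set_metadata_value_size(self, key, value):
        # Unpacking fails unless value has exactly two elements.
        width, height = value
        self.metadata[key] = (int(width), int(height))


def declare_video_metadata(port, width, height, framerate):
    """Announce frame dimensions and frame rate on a video output port."""
    # A sequence of exactly two integers is accepted: [width, height].
    port.set_metadata_value_size("size", [width, height])
    # "framerate" is a hypothetical key used only for this illustration.
    port.set_metadata_value("framerate", float(framerate))


port = RecordingPort()
declare_video_metadata(port, 640, 480, 30)
print(port.metadata)
```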
def submit(self, data: Any) -> None:

Send a data item to all modules connected to this port.

Parameters
  • data: Data item matching this port's type (e.g. Frame, TableRow).
Raises
  • SyntalosPyError: If the item type does not match the port's declared data type.
name: str

The unique port ID string.

class SyntalosPyError(builtins.Exception):

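Exception raised by syntalos_mlink functions on invalid input, for example an unsupported metadata value type or a data item that does not match the port's declared data type.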

def await_data(timeout_usec: SupportsInt = -1) -> None:

Wait for incoming data and dispatch it to registered on_data callbacks.

Also services the IPC channel to the Syntalos process. Call this regularly inside a run() loop to keep the module responsive.

Parameters
  • timeout_usec: Maximum time to block in microseconds. Pass -1 (default) to wait until the module is no longer in RUNNING state.
def get_input_port(id: str) -> InputPort | None:

Retrieve a reference to an input port by its ID.

Parameters
  • id: The unique ID string of the port.
Returns

An InputPort handle, or None if no port with that ID exists.

def get_output_port(id: str) -> OutputPort | None:

Retrieve a reference to an output port by its ID.

Parameters
  • id: The unique ID string of the port.
Returns

An OutputPort handle, or None if no port with that ID exists.

def is_running() -> bool:

Check whether the experiment is still active.

Returns

True while the run is in progress, False once a stop has been requested.
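
Together, is_running() and await_data() suggest a simple polling loop for a module's run() function. The sketch below illustrates the idea; run_loop and the 10 ms poll interval are assumptions, and FakeLink is a stand-in for syntalos_mlink that reports "running" for a fixed number of polls so the example terminates on its own.

```python
class FakeLink:
    """Stand-in for syntalos_mlink with a limited number of polls."""

    def __init__(self, polls):
        self.remaining = polls
        self.timeouts = []

    def is_running(self):
        return self.remaining > 0

    def await_data(self, timeout_usec=-1):
        # The real call dispatches incoming data to on_data callbacks.
        self.timeouts.append(timeout_usec)
        self.remaining -= 1


def run_loop(mlink, poll_usec=10_000):
    """Poll for data until a stop is requested; returns the iteration count."""
    iterations = 0
    while mlink.is_running():
        # Block for at most poll_usec so other work can happen between polls.
        mlink.await_data(poll_usec)
        iterations += 1
    return iterations


link = FakeLink(polls=3)
print(run_loop(link))
```

A finite timeout is used here so that periodic work could be added inside the loop; when no such work is needed, the default of -1 blocks until the module leaves the RUNNING state.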

def list_types_synonyms() -> list:
def new_firmatactl_with_id(kind: FirmataCommandKind, pin_id: SupportsInt) -> FirmataControl:

Create a FirmataControl command identified by numeric pin ID only.

Parameters
  • kind: The FirmataCommandKind to execute.
  • pin_id: Numeric pin identifier.
Returns

A new FirmataControl instance.

def new_firmatactl_with_id_name(kind: FirmataCommandKind, pin_id: SupportsInt, name: str) -> FirmataControl:

Create a FirmataControl command identified by both numeric ID and name.

Parameters
  • kind: The FirmataCommandKind to execute.
  • pin_id: Numeric pin identifier.
  • name: Registered pin name.
Returns

A new FirmataControl instance.

def new_firmatactl_with_name(kind: FirmataCommandKind, name: str) -> FirmataControl:

Create a FirmataControl command identified by a registered pin name.

The pin must have been previously registered with OutputPort.firmata_register_digital_pin().

Parameters
  • kind: The FirmataCommandKind to execute.
  • name: Registered pin name.
Returns

A new FirmataControl instance.

def println(text: str) -> None:

Print a line of text to stdout.

Parameters
  • text: The text to print.
def raise_error(message: str) -> None:

Raise a module error, immediately stopping the current run.

Parameters
  • message: Human-readable error description.
def schedule_delayed_call(delay_msec: SupportsInt, callable_fn: Callable[[], None]) -> None:

Schedule a callable to be invoked after a delay.

The call is executed on the module's event loop, so it is safe to interact with ports and other module state from the callback.

Signature: schedule_delayed_call(delay_msec: int, callable_fn: Callable[[], None]) -> None.

Parameters
  • delay_msec: Delay before the call is made, in milliseconds. Must be >= 0.
  • callable_fn: Zero-argument callable to invoke.
Raises
  • SyntalosPyError: If delay_msec is negative.
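
Since callable_fn takes no arguments, any values the callback needs must be bound when it is created. The sketch below schedules several calls in a loop and binds each delay as a lambda default argument; schedule_markers is a hypothetical helper, and FakeScheduler stands in for syntalos_mlink by running the pending calls in order of delay instead of waiting.

```python
class FakeScheduler:
    """Stand-in for syntalos_mlink that runs calls in order of their delay."""

    def __init__(self):
        self.pending = []

    def schedule_delayed_call(self, delay_msec, callable_fn):
        self.pending.append((delay_msec, callable_fn))

    def run_all(self):
        for _, fn in sorted(self.pending, key=lambda entry: entry[0]):
            fn()
        self.pending.clear()


def schedule_markers(mlink, delays_msec, log):
    """Schedule one zero-argument callback per delay."""
    for delay in delays_msec:
        # Bind the current delay as a default argument; a plain closure
        # would see only the last value of the loop variable.
        mlink.schedule_delayed_call(delay, lambda d=delay: log.append(d))


log = []
scheduler = FakeScheduler()
schedule_markers(scheduler, [500, 100, 250], log)
scheduler.run_all()
print(log)
```

functools.partial would work equally well for binding arguments; the default-argument form is used here only because it needs no import.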
def time_since_start_msec() -> int:

Return the time elapsed since the experiment started.

Returns

Elapsed time in milliseconds.

def time_since_start_usec() -> int:

Return the time elapsed since the experiment started.

Returns

Elapsed time in microseconds.

def wait(msec: SupportsInt) -> None:

Sleep for approximately the given number of milliseconds.

Unlike a plain time.sleep(), this keeps the communication channel to Syntalos alive so that control messages (e.g. stop requests) are still handled.

Parameters
  • msec: Duration to wait in milliseconds.
def wait_sec(sec: SupportsInt) -> None:

Sleep for approximately the given number of seconds.

Unlike a plain time.sleep(), this keeps the communication channel to Syntalos alive so that control messages (e.g. stop requests) are still handled.

Parameters
  • sec: Duration to wait in seconds.