mlink/syntaloslink.cpp
mlink/syntaloslink.cpp
Namespaces
| Name |
|---|
| Syntalos |
Source code
/*
* Copyright (C) 2020-2026 Matthias Klumpp <matthias@tenstral.net>
*
* Licensed under the GNU Lesser General Public License Version 3
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the license, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syntaloslink.h"
#include "modconfig.h"
#include <glib.h>
#include <csignal>
#include <cstring>
#include <sys/prctl.h>
#include <iox2/iceoryx2.hpp>
#include "mlink/ipc-types-private.h"
#include "mlink/ipc-iox-private.h"
#include "datactl/priv/rtkit.h"
#include "datactl/priv/cpuaffinity.h"
#include "datactl/loginternal.h"
using namespace Syntalos;
using namespace Syntalos::ipc;
namespace
{
constexpr int MAIN_CONTEXT_MAX_ITER_PER_TICK = 32;
void iterateDefaultMainContextNonBlocking()
{
// keep timers and other GLib sources responsive without starving IPC handling
auto *const mainContext = g_main_context_default();
if (!mainContext)
return;
for (int i = 0; i < MAIN_CONTEXT_MAX_ITER_PER_TICK; ++i) {
if (!g_main_context_iteration(mainContext, FALSE))
break;
}
}
} // namespace
namespace Syntalos
{
template<typename Sub>
static auto safeReceive(Sub &sub) -> std::remove_cvref_t<decltype(sub.receive().value())>
{
auto result = sub.receive();
if (!result.has_value()) {
std::cerr << "Client IPC receive failed:" << iox2::bb::into<const char *>(result.error());
return {};
}
return std::move(result).value();
}
static std::string getenvSafe(const char *name)
{
if (const char *value = std::getenv(name))
return value;
return {};
}
std::unique_ptr<SyntalosLink> initSyntalosModuleLink(const ModuleInitOptions &optn)
{
// we should obtain the PID of Syntalos here
pid_t parentPid = getppid();
std::string syModuleId = getenvSafe("SYNTALOS_MODULE_ID");
if (syModuleId.empty() || syModuleId.length() < 2)
throw std::runtime_error("This module was not run by Syntalos, can not continue!");
// set the process name to the instance ID, to simplify identification in process trees
if (optn.renameThread) {
// PR_SET_NAME allows max 16 bytes including terminating NUL
const auto procName = syModuleId.substr(0, 15);
prctl(PR_SET_NAME, procName.c_str(), 0, 0, 0);
std::ofstream("/proc/self/comm") << procName;
}
// set up stream data type mapping, if it hasn't been initialized yet
registerStreamMetaTypes();
// set IOX log level
auto verboseLevel = getenvSafe("SY_VERBOSE");
if (verboseLevel == "1")
iox2::set_log_level(iox2::LogLevel::Debug);
else
iox2::set_log_level(iox2::LogLevel::Info);
// ensure we (try to) die if Syntalos, our parent, dies
prctl(PR_SET_PDEATHSIG, SIGTERM);
// race check: parent may have died before prctl().
if (getppid() != parentPid) {
raise(SIGTERM);
}
return std::unique_ptr<SyntalosLink>(new SyntalosLink(syModuleId));
}
class InputPortInfo::Private
{
public:
explicit Private(const InputPortChangeRequest &pc)
: index(0),
connected(false),
id(pc.id),
title(pc.title),
dataTypeId(pc.dataTypeId),
metadata(pc.metadata),
throttleItemsPerSec(0)
{
}
int index;
bool connected;
std::optional<SySubscriber> ioxSub;
std::optional<IoxWaitSetGuard> ioxGuard;
std::string id;
std::string title;
int dataTypeId;
MetaStringMap metadata;
NewDataRawFn newDataCb;
uint throttleItemsPerSec;
};
InputPortInfo::InputPortInfo(const InputPortChangeRequest &pc)
: d(new InputPortInfo::Private(pc))
{
}
std::string InputPortInfo::id() const
{
return d->id;
}
int InputPortInfo::dataTypeId() const
{
return d->dataTypeId;
}
std::string InputPortInfo::title() const
{
return d->title;
}
void InputPortInfo::setNewDataRawCallback(NewDataRawFn callback)
{
d->newDataCb = std::move(callback);
}
void InputPortInfo::setThrottleItemsPerSec(uint itemsPerSec)
{
d->throttleItemsPerSec = itemsPerSec;
}
std::optional<MetaValue> InputPortInfo::metadataValue(const std::string &key) const
{
return d->metadata.value(key);
}
MetaValue InputPortInfo::metadataValueOr(const std::string &key, const MetaValue &defaultVal) const
{
const auto val = d->metadata.value(key);
if (val.has_value())
return *val;
return defaultVal;
}
MetaStringMap InputPortInfo::metadata() const
{
return d->metadata;
}
class OutputPortInfo::Private
{
public:
explicit Private(const OutputPortChangeRequest &pc)
: index(0),
connected(false),
id(pc.id),
title(pc.title),
dataTypeId(pc.dataTypeId),
metadata(pc.metadata)
{
}
int index;
bool connected;
std::optional<SyPublisher> ioxPub;
std::optional<IoxWaitSetGuard> ioxGuard;
std::string id;
std::string title;
int dataTypeId;
MetaStringMap metadata;
// utilized to reuse allocated memory when sending data, to prevent fragmentation
ByteVector outBuffer;
[[nodiscard]] std::string ipcChannelId() const
{
return "o/" + id;
}
};
OutputPortInfo::OutputPortInfo(const OutputPortChangeRequest &pc)
: d(new OutputPortInfo::Private(pc))
{
}
std::string OutputPortInfo::id() const
{
return d->id;
}
int OutputPortInfo::dataTypeId() const
{
return d->dataTypeId;
}
void OutputPortInfo::setMetadataVar(const std::string &key, const MetaValue &value)
{
d->metadata[key] = value;
}
class SyntalosLink::Private
{
public:
Private(const std::string &instanceId)
: modId(instanceId),
state(ModuleState::UNKNOWN),
maxRTPriority(0),
syTimer(nullptr),
shutdownPending(false)
{
// make a new node for this module
node.emplace(makeIoxNode(modId));
// interfaces
pubError.emplace(makeTypedPublisher<ErrorEvent>(*node, svcName(ERROR_CHANNEL_ID)));
pubState.emplace(makeTypedPublisher<StateChangeEvent>(*node, svcName(STATE_CHANNEL_ID)));
pubStatusMsg.emplace(makeTypedPublisher<StatusMessageEvent>(*node, svcName(STATUS_MESSAGE_CHANNEL_ID)));
cltInPortChange.emplace(makeSliceClient(*node, svcName(IN_PORT_CHANGE_CHANNEL_ID)));
cltOutPortChange.emplace(makeSliceClient(*node, svcName(OUT_PORT_CHANGE_CHANNEL_ID)));
srvApiVersion.emplace(
makeTypedServer<ApiVersionRequest, ApiVersionResponse>(*node, svcName(API_VERSION_CALL_ID)));
srvSetNiceness.emplace(makeTypedServer<SetNicenessRequest, DoneResponse>(*node, svcName(SET_NICENESS_CALL_ID)));
srvSetMaxRTPriority.emplace(
makeTypedServer<SetMaxRealtimePriority, DoneResponse>(*node, svcName(SET_MAX_RT_PRIORITY_CALL_ID)));
srvSetCPUAffinity.emplace(
makeTypedServer<SetCPUAffinityRequest, DoneResponse>(*node, svcName(SET_CPU_AFFINITY_CALL_ID)));
srvConnectIPort.emplace(
makeTypedServer<ConnectInputRequest, DoneResponse>(*node, svcName(CONNECT_INPUT_CALL_ID)));
srvStart.emplace(makeTypedServer<StartRequest, DoneResponse>(*node, svcName(START_CALL_ID)));
srvStop.emplace(makeTypedServer<StopRequest, DoneResponse>(*node, svcName(STOP_CALL_ID)));
srvShutdown.emplace(makeTypedServer<ShutdownRequest, DoneResponse>(*node, svcName(SHUTDOWN_CALL_ID)));
srvShowDisplay.emplace(makeTypedServer<ShowDisplayRequest, DoneResponse>(*node, svcName(SHOW_DISPLAY_CALL_ID)));
srvShowSettings.emplace(
makeTypedServer<ShowSettingsRequest, DoneResponse>(*node, svcName(SHOW_SETTINGS_CALL_ID)));
srvLoadScript.emplace(makeSliceServer(*node, svcName(LOAD_SCRIPT_CALL_ID)));
srvSetPortsPreset.emplace(makeSliceServer(*node, svcName(SET_PORTS_PRESET_CALL_ID)));
srvUpdateIPortMetadata.emplace(makeSliceServer(*node, svcName(IN_PORT_UPDATE_METADATA_ID)));
srvSaveSettings.emplace(makeSliceServer<IoxByteSlice>(*node, svcName(SAVE_SETTINGS_CALL_ID)));
srvLoadSettings.emplace(makeSliceServer(*node, svcName(LOAD_SETTINGS_CALL_ID)));
srvPrepareRun.emplace(makeSliceServer(*node, svcName(PREPARE_RUN_CALL_ID)));
// control event notifications
masterCtlEventListener.emplace(makeEventListener(*node, svcName(MASTER_CTL_EVENT_ID)));
ctlEventNotifier.emplace(makeEventNotifier(*node, svcName(WORKER_CTL_EVENT_ID)));
}
~Private() = default;
std::optional<iox2::Node<iox2::ServiceType::Ipc>> node;
std::string modId;
// Publishers: Module process -> Syntalos master
std::optional<IoxPublisher<ErrorEvent>> pubError;
std::optional<IoxPublisher<StateChangeEvent>> pubState;
std::optional<IoxPublisher<StatusMessageEvent>> pubStatusMsg;
// Clients: Module -> Syntalos master
std::optional<IoxUntypedClient> cltInPortChange;
std::optional<IoxUntypedClient> cltOutPortChange;
// Servers: Syntalos master -> Module process commands
std::optional<IoxServer<ApiVersionRequest, ApiVersionResponse>> srvApiVersion;
std::optional<IoxServer<SetNicenessRequest, DoneResponse>> srvSetNiceness;
std::optional<IoxServer<SetMaxRealtimePriority, DoneResponse>> srvSetMaxRTPriority;
std::optional<IoxServer<SetCPUAffinityRequest, DoneResponse>> srvSetCPUAffinity;
std::optional<IoxServer<ConnectInputRequest, DoneResponse>> srvConnectIPort;
std::optional<IoxServer<StartRequest, DoneResponse>> srvStart;
std::optional<IoxServer<StopRequest, DoneResponse>> srvStop;
std::optional<IoxServer<ShutdownRequest, DoneResponse>> srvShutdown;
std::optional<IoxUntypedReqServer> srvLoadScript;
std::optional<IoxUntypedReqServer> srvSetPortsPreset;
std::optional<IoxUntypedReqServer> srvUpdateIPortMetadata;
std::optional<IoxUntypedReqResServer> srvSaveSettings;
std::optional<IoxUntypedReqServer> srvLoadSettings;
std::optional<IoxUntypedReqServer> srvPrepareRun;
std::optional<IoxServer<ShowDisplayRequest, DoneResponse>> srvShowDisplay;
std::optional<IoxServer<ShowSettingsRequest, DoneResponse>> srvShowSettings;
// Listens for messages from the server
std::optional<IoxListener> masterCtlEventListener;
// Used by us to ping master if we have a message
std::optional<IoxNotifier> ctlEventNotifier;
// WaitSet to efficiently wait for messages from master
std::optional<IoxWaitSet> waitSet;
std::optional<IoxWaitSetGuard> waitSetCtrlGuard;
bool waitSetDirty = true;
// Set to true when a Stop command has been processed but input-port subscribers have
// not yet been dropped. The actual drop is deferred until the current WaitSet iteration
// finishes so that data events that arrived in the same iteration as Stop are not lost.
bool inputPortResetPending = false;
ModuleState state;
int maxRTPriority;
std::vector<std::shared_ptr<InputPortInfo>> inPortInfo;
std::vector<std::shared_ptr<OutputPortInfo>> outPortInfo;
SyncTimer *syTimer;
TestSubjectInfo testSubject;
bool allowAsyncStart = true;
LoadScriptFn loadScriptCb;
SaveSettingsFn saveSettingsCb;
LoadSettingsFn loadSettingsCb;
PrepareRunFn prepareRunCb;
StartFn startCb;
StopFn stopCb;
ShutdownFn shutdownCb;
ShowSettingsFn showSettingsCb;
ShowDisplayFn showDisplayCb;
bool shutdownPending;
[[nodiscard]] std::string svcName(const std::string &channel) const
{
assert(!modId.empty());
return makeModuleServiceName(modId, channel);
}
void notifyMaster() const
{
if (!ctlEventNotifier.has_value()) [[unlikely]] {
std::cerr << "notifyMaster: Notifier was not initialized, can not notify master!" << std::endl;
return;
}
auto r = ctlEventNotifier->notify();
if (!r.has_value())
std::cerr << "Failed to notify master of control event:" << iox2::bb::into<const char *>(r.error())
<< std::endl;
}
void sendPortChangeData(IoxUntypedClient &clt, const ByteVector &data)
{
auto maybeSlice = clt.loan_slice_uninit(static_cast<uint64_t>(data.size()));
if (!maybeSlice.has_value()) {
std::cerr << "Failed to loan memory for port change request: "
<< iox2::bb::into<const char *>(maybeSlice.error()) << '\n';
return;
}
auto rawSlice = std::move(maybeSlice).value();
std::memmove(rawSlice.payload_mut().data(), data.data(), data.size());
auto pending = iox2::send(iox2::assume_init(std::move(rawSlice))).value();
notifyMaster();
const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(60);
while (true) {
auto response = pending.receive().value();
if (response.has_value())
return;
if (std::chrono::steady_clock::now() >= deadline) {
std::cerr << "Port change acknowledgment from master timed out after 60s - aborting worker."
<< std::endl;
std::abort();
}
std::this_thread::sleep_for(std::chrono::microseconds(25));
}
}
template<typename Req>
static void replyDone(iox2::ActiveRequest<iox2::ServiceType::Ipc, Req, void, DoneResponse, void> &req, bool success)
{
auto maybeResponse = req.loan_uninit();
if (!maybeResponse.has_value()) {
std::cerr << "Failed to loan response for 'done' reply: "
<< iox2::bb::into<const char *>(maybeResponse.error()) << '\n';
return;
}
iox2::send(std::move(maybeResponse).value().write_payload(DoneResponse{success})).value();
}
static void replyDoneSlice(SliceActiveRequest &req, bool success)
{
auto maybeResponse = req.loan_uninit();
if (!maybeResponse.has_value()) {
std::cerr << "Failed to loan response for 'done' reply: "
<< iox2::bb::into<const char *>(maybeResponse.error()) << '\n';
return;
}
iox2::send(std::move(maybeResponse).value().write_payload(DoneResponse{success})).value();
}
void rebuildWaitSet()
{
// MUST drop ALL WaitSet guards before destroying the WaitSet itself.
// iceoryx2 contract: "WaitSetGuard must live at most as long as the WaitSet."
// Dropping a guard after the WaitSet is destroyed is use-after-free on
// the Rust side and causes EBADF errors and corrupted event state on the
// next run.
waitSetCtrlGuard.reset();
for (auto &iport : inPortInfo)
iport->d->ioxGuard.reset();
for (auto &oport : outPortInfo)
oport->d->ioxGuard.reset();
// Now safe to destroy the old WaitSet
waitSet.reset();
// Build a fresh WaitSet and re-attach everything
waitSet.emplace(
iox2::WaitSetBuilder()
.signal_handling_mode(iox2::SignalHandlingMode::HandleTerminationRequests)
.create<iox2::ServiceType::Ipc>()
.value());
// Control attachment: wakes for requests from the master
if (masterCtlEventListener.has_value())
waitSetCtrlGuard.emplace(waitSet->attach_notification(*masterCtlEventListener).value());
// Per-input-port attachments
for (auto &iport : inPortInfo) {
if (!iport->d->connected || !iport->d->ioxSub.has_value())
continue;
iport->d->ioxGuard.emplace(waitSet->attach_notification(*iport->d->ioxSub).value());
}
// Per-output-port publisher attachments
for (auto &oport : outPortInfo) {
if (!oport->d->ioxPub.has_value())
continue;
// Proactively update connections for events that queued up while WaitSet was not active
oport->d->ioxPub->handleEvents();
oport->d->ioxGuard.emplace(waitSet->attach_notification(*oport->d->ioxPub).value());
}
waitSetDirty = false;
}
void processPendingIPortReset()
{
if (!inputPortResetPending)
return;
for (auto &iport : inPortInfo) {
iport->d->ioxGuard.reset();
iport->d->ioxSub.reset();
iport->d->connected = false;
}
inputPortResetPending = false;
waitSetDirty = true;
}
void processPendingData(const iox2::WaitSetAttachmentId<iox2::ServiceType::Ipc> &attachmentId)
{
for (auto &iport : inPortInfo) {
if (!iport->d->connected || !iport->d->ioxSub.has_value())
continue;
if (!iport->d->ioxGuard.has_value() || !attachmentId.has_event_from(*iport->d->ioxGuard))
continue;
if (iport->d->newDataCb) {
iport->d->ioxSub->handleEvents([&](const IoxImmutableByteSlice &pl) {
iport->d->newDataCb(pl.data(), pl.number_of_bytes());
});
} else {
// Still drain to prevent the queue filling up even if there's no callback.
iport->d->ioxSub->handleEvents([](const IoxImmutableByteSlice &) {});
}
}
// Handle output-port publisher events (SubscriberConnected / SubscriberDisconnected).
for (auto &oport : outPortInfo) {
if (!oport->d->ioxPub.has_value() || !oport->d->ioxGuard.has_value())
continue;
if (!attachmentId.has_event_from(*oport->d->ioxGuard))
continue;
oport->d->ioxPub->handleEvents();
}
}
};
SyntalosLink::SyntalosLink(const std::string &instanceId)
: d(new SyntalosLink::Private(instanceId))
{
d->syTimer = new SyncTimer;
// we us the fast, async start() by default
d->allowAsyncStart = true;
// Immediately upon creation, we send a message that we are initializing.
// A client using this interface has to set this to IDLE once it has set up the basics.
setState(ModuleState::INITIALIZING);
}
SyntalosLink::~SyntalosLink()
{
delete d->syTimer;
}
std::string SyntalosLink::instanceId() const
{
return d->modId;
}
void SyntalosLink::raiseError(const std::string &title, const std::string &message)
{
auto uninit = d->pubError->loan_uninit().value();
auto &ev = uninit.payload_mut();
ev.title = iox2::bb::StaticString<128>::from_utf8_null_terminated_unchecked_truncated(title.c_str(), title.size());
ev.message = iox2::bb::StaticString<2048>::from_utf8_null_terminated_unchecked_truncated(
message.c_str(), message.size());
iox2::send(iox2::assume_init(std::move(uninit))).value();
d->notifyMaster();
setState(ModuleState::ERROR);
}
void SyntalosLink::raiseError(const std::string &message)
{
auto uninit = d->pubError->loan_uninit().value();
auto &ev = uninit.payload_mut();
ev.title = iox2::bb::StaticString<128>();
ev.message = iox2::bb::StaticString<2048>::from_utf8_null_terminated_unchecked_truncated(
message.c_str(), message.size());
iox2::send(iox2::assume_init(std::move(uninit))).value();
d->notifyMaster();
setState(ModuleState::ERROR);
}
SY_DEFINE_LOG_CATEGORY(logIpc, "ipc");
static void ipcLogMessageDispatch(datactl::LogSeverity severity, const std::string &msg)
{
if (::Syntalos::datactl::shouldLog(logIpc, severity))
::Syntalos::datactl::dispatchLog(logIpc, severity, __FILE__, __LINE__, __func__, msg);
}
void SyntalosLink::processPendingControl()
{
// Drain the master control listener to keep its socket buffer clear.
// We *must* drain at the start to immediately consume the notification that
// triggered this call.
// Any new notifications arriving DURING processing are left intact, preventing
// the race where a notification for a freshly-queued request arrives just before
// an end-of-function drain and gets silently discarded, stranding the request.
drainListenerEvents(*d->masterCtlEventListener);
// ---- ApiVersion ----
while (true) {
auto req = safeReceive(*d->srvApiVersion);
if (!req.has_value())
break;
auto maybeResponse = req->loan_uninit();
if (!maybeResponse.has_value()) {
std::cerr << "Failed to loan response for API version request: "
<< iox2::bb::into<const char *>(maybeResponse.error()) << '\n';
continue;
}
ApiVersionResponse resp;
resp.apiVersion = iox2::bb::StaticString<64>::from_utf8_null_terminated_unchecked_truncated(
SY_MODULE_API_TAG, std::strlen(SY_MODULE_API_TAG));
iox2::send(std::move(maybeResponse).value().write_payload(std::move(resp))).value();
}
// ---- SetNiceness ----
while (true) {
auto req = safeReceive(*d->srvSetNiceness);
if (!req.has_value())
break;
// apply niceness request immediately to current thread
const bool ok = setCurrentThreadNiceness(req->payload().nice);
if (!ok)
// Rtkit may have hit its per-user concurrent-thread limit.
// The module will continue at default priority rather than failing to start entirely.
std::cerr << "Worker thread niceness could not be set to " << req->payload().nice
<< " - module will run at default priority." << std::endl;
Private::replyDone(*req, true);
}
// ---- SetMaxRealtimePriority ----
while (true) {
auto req = safeReceive(*d->srvSetMaxRTPriority);
if (!req.has_value())
break;
d->maxRTPriority = req->payload().priority;
Private::replyDone(*req, true);
}
// ---- SetCPUAffinity ----
while (true) {
auto req = safeReceive(*d->srvSetCPUAffinity);
if (!req.has_value())
break;
const auto &cores = req->payload().cores;
if (!cores.empty()) {
std::vector<uint> coreVec;
coreVec.reserve(cores.size());
for (uint64_t i = 0; i < cores.size(); ++i)
coreVec.push_back(cores.unchecked_access()[i]);
thread_set_affinity_from_vec(pthread_self(), coreVec);
}
Private::replyDone(*req, true);
}
// ---- LoadScript ----
// Execute the script callback BEFORE replying so that by the time the master's
// callSliceClientSimple() returns, all port ADD/REMOVE messages that the script
// published are already in the IPC queues. The master can then drain them with
// a single handleIncomingControl() call and be guaranteed to see all ports.
while (true) {
auto req = safeReceive(*d->srvLoadScript);
if (!req.has_value())
break;
const auto pl = req->payload();
const auto scriptReqData = LoadScriptRequest::fromMemory(pl.data(), pl.number_of_bytes());
// If the caller requested a port reset, clear all existing port state first so
// the script starts with a clean slate (needed when reloading persistent-mode scripts).
if (scriptReqData.resetPorts)
resetPorts();
// execute script first - any registerInput/OutputPort() calls happen here
if (d->loadScriptCb && !scriptReqData.script.empty())
d->loadScriptCb(scriptReqData.script, scriptReqData.workingDir);
// only then reply, so the master knows ports are settled
Private::replyDoneSlice(*req, true);
}
// ---- SetPortsPreset ----
// Add any ports not yet known; never remove existing ones (they may be in use).
while (true) {
auto req = safeReceive(*d->srvSetPortsPreset);
if (!req.has_value())
break;
const auto pl = req->payload();
const auto sppReq = SetPortsPresetRequest::fromMemory(pl.data(), pl.number_of_bytes());
// We must not remove existing ports, as they might be in use. So instead, we
// add & merge port data.
for (const auto &ipc : sppReq.inPorts) {
bool skip = false;
for (const auto &ip : d->inPortInfo) {
if (ip->id() == ipc.id)
skip = true;
}
if (skip)
continue;
d->inPortInfo.push_back(std::shared_ptr<InputPortInfo>(new InputPortInfo(ipc)));
// we will have to rebuild the waitset after ports changed
d->waitSetDirty = true;
}
for (const auto &opc : sppReq.outPorts) {
std::shared_ptr<OutputPortInfo> oport;
bool update = false;
for (const auto &op : d->outPortInfo) {
if (op->id() != opc.id)
continue;
oport = op;
update = true;
break;
}
if (!oport)
oport = std::shared_ptr<OutputPortInfo>(new OutputPortInfo(opc));
// Detach old publisher from WaitSet before replacement.
oport->d->ioxGuard.reset();
oport->d->ioxPub.reset(); // drop the old connection first, before trying to create a new one
oport->d->ioxPub.emplace(
SyPublisher::create(*d->node, d->modId, oport->d->ipcChannelId(), opc.topology, ipcLogMessageDispatch));
if (!update)
d->outPortInfo.push_back(oport);
// we will have to rebuild the waitset after ports changed
d->waitSetDirty = true;
}
Private::replyDoneSlice(*req, true);
}
// ---- ConnectInputPort ----
while (true) {
auto req = safeReceive(*d->srvConnectIPort);
if (!req.has_value())
break;
const auto &r = req->payload();
const std::string_view portId = r.portId.unchecked_access().c_str();
// find the port
auto it = std::find_if(d->inPortInfo.begin(), d->inPortInfo.end(), [&](const auto &ip) {
return ip->id() == portId;
});
if (it == d->inPortInfo.end()) {
// return error if the port was not registered
Private::replyDone(*req, false);
continue;
}
auto &iport = *it;
// connect the port
iport->d->connected = true;
// MUST reset the WaitSet guard BEFORE replacing the old subscriber
iport->d->ioxGuard.reset();
iport->d->ioxSub.reset(); // drop the old connection first, before trying to create a new one
iport->d->ioxSub.emplace(
SySubscriber::create(
*d->node,
std::string(r.instanceId.unchecked_access().c_str()),
std::string(r.channelId.unchecked_access().c_str()),
r.topology));
Private::replyDone(*req, true);
// we will have to rebuild the waitset after ports changed
d->waitSetDirty = true;
}
// ---- UpdateInputPortMetadata ----
while (true) {
auto req = safeReceive(*d->srvUpdateIPortMetadata);
if (!req.has_value())
break;
const auto pl = req->payload();
const auto reqUpdateMD = UpdateInputPortMetadataRequest::fromMemory(pl.data(), pl.number_of_bytes());
// update metadata
for (const auto &ip : d->inPortInfo) {
if (ip->id() == reqUpdateMD.id) {
ip->d->metadata = reqUpdateMD.metadata;
break;
}
}
Private::replyDoneSlice(*req, true);
}
// ---- SaveSettings ----
while (true) {
auto req = safeReceive(*d->srvSaveSettings);
if (!req.has_value())
break;
const auto pl = req->payload();
const auto ssReq = SaveSettingsRequest::fromMemory(pl.data(), pl.number_of_bytes());
SaveSettingsResponse ssResp;
ssResp.success = true;
if (d->saveSettingsCb)
ssResp.success = d->saveSettingsCb(ssResp.data, ssReq.baseDir);
auto ssRespData = ssResp.toBytes();
auto maybeResponse = req->loan_slice_uninit(ssRespData.size());
if (!maybeResponse.has_value()) {
std::cerr << "Failed to loan response (" << ssRespData.size()
<< " bytes) for reply to SaveSettings: " << iox2::bb::into<const char *>(maybeResponse.error())
<< '\n';
break;
}
auto rawResponse = std::move(maybeResponse).value();
std::memcpy(rawResponse.payload_mut().data(), ssRespData.data(), ssRespData.size());
iox2::send(iox2::assume_init(std::move(rawResponse))).value();
}
// ---- LoadSettings ----
while (true) {
auto req = safeReceive(*d->srvLoadSettings);
if (!req.has_value())
break;
const auto pl = req->payload();
const auto lsReq = LoadSettingsRequest::fromMemory(pl.data(), pl.number_of_bytes());
auto success = true;
if (d->loadSettingsCb)
success = d->loadSettingsCb(lsReq.data, lsReq.baseDir);
Private::replyDoneSlice(*req, success);
}
// ---- PrepareRun ----
while (true) {
auto req = safeReceive(*d->srvPrepareRun);
if (!req.has_value())
break;
const auto pl = req->payload();
const auto prepReq = PrepareRunRequest::fromMemory(pl.data(), pl.number_of_bytes());
// update test subject details
d->testSubject.id = prepReq.subjectId;
d->testSubject.group = prepReq.subjectGroup;
// prepare the run
auto success = true;
if (d->prepareRunCb)
success = d->prepareRunCb();
Private::replyDoneSlice(*req, success);
}
// ---- Start ----
while (true) {
auto req = safeReceive(*d->srvStart);
if (!req.has_value())
break;
const auto timePoint = symaster_timepoint(microseconds_t(req->payload().startTimestampUsec));
delete d->syTimer;
d->syTimer = new SyncTimer;
d->syTimer->startAt(timePoint);
// Ensure all output-port publishers know about subscribers that connected
// during the prepare phase.
for (auto &oport : d->outPortInfo) {
if (oport->d->ioxPub.has_value())
oport->d->ioxPub->handleEvents();
}
if (d->allowAsyncStart) {
// reply immediately, so the master can continue starting other modules
Private::replyDone(*req, true);
if (d->startCb)
d->startCb();
} else {
// Run the start callback BEFORE sending Done so that any port metadata
// updates (e.g. forwarding framerate to output ports) are acknowledged
// by the master while it is still in the callClientSimple(START) wait loop.
// This guarantees that startStream() on the master side sees the final metadata.
// Any errors are reported via the error channel.
if (d->startCb)
d->startCb();
Private::replyDone(*req, true);
}
}
// ---- Stop ----
while (true) {
auto req = safeReceive(*d->srvStop);
if (!req.has_value())
break;
// we wait for the stop callback to finish before responding
if (d->stopCb)
d->stopCb();
Private::replyDone(*req, true);
// Mark input ports for deferred dropping. We do NOT drop them here because
// this function may be called from inside a WaitSet onEvent callback
// (awaitData / awaitDataForever). If we drop the subscriber guards while the
// WaitSet is still iterating over triggered events, any data event that fired
// in the same cycle as this Stop would be silently discarded.
// By deferring the drop to the end of the WaitSet iteration, concurrent data
// events are still delivered before the ports are torn down.
if (!d->inPortInfo.empty())
d->inputPortResetPending = true;
}
// ---- Shutdown ----
while (true) {
auto req = safeReceive(*d->srvShutdown);
if (!req.has_value())
break;
// NOTE: We reply immediately here and defer processing of the call,
// because otherwise the master would never get a response if we
// tear down the process too quickly.
Private::replyDone(*req, true);
// execute shutdown action after replying to master
// if no callback is defined, we just exit()
if (d->shutdownCb)
d->shutdownCb();
d->shutdownPending = true;
break;
}
// ---- ShowDisplay ----
while (true) {
auto req = safeReceive(*d->srvShowDisplay);
if (!req.has_value())
break;
Private::replyDone(*req, true);
if (d->showDisplayCb)
d->showDisplayCb();
}
// ---- ShowSettings ----
while (true) {
auto req = safeReceive(*d->srvShowSettings);
if (!req.has_value())
break;
Private::replyDone(*req, true);
if (d->showSettingsCb)
d->showSettingsCb();
}
}
void SyntalosLink::awaitData(int timeoutUsec, const std::function<void()> &eventFn)
{
// If a shutdown request was processed, we should die ASAP and this function
// should no longer run - otherwise we may block forever and get killed
if (d->shutdownPending)
return;
// Complete any deferred input-port subscriber drop from a previous Stop command,
// the rebuild the WaitSet if needed.
d->processPendingIPortReset();
if (d->waitSetDirty)
d->rebuildWaitSet();
auto onEvent =
[this](const iox2::WaitSetAttachmentId<iox2::ServiceType::Ipc> &attachmentId) -> iox2::CallbackProgression {
// handle control messages
if (attachmentId.has_event_from(*d->waitSetCtrlGuard)) {
processPendingControl();
if (d->waitSetDirty)
return iox2::CallbackProgression::Stop;
} else {
// handle incoming data
d->processPendingData(attachmentId);
}
return d->shutdownPending ? iox2::CallbackProgression::Stop : iox2::CallbackProgression::Continue;
};
// Helper: inspect the WaitSet run-result and trigger a clean shutdown when
// IOX reports that SIGTERM/SIGINT was received
auto handleRunResult = [this](const iox2::bb::Expected<iox2::WaitSetRunResult, iox2::WaitSetRunError> &res) {
if (!res.has_value())
return;
const auto r = res.value();
if (r == iox2::WaitSetRunResult::Interrupt || r == iox2::WaitSetRunResult::TerminationRequest)
d->shutdownPending = true;
};
if (timeoutUsec < 0) {
do {
// Complete deferred input-port subscriber drop, then rebuild WaitSet if needed.
d->processPendingIPortReset();
if (d->waitSetDirty)
d->rebuildWaitSet();
handleRunResult(
d->waitSet->wait_and_process_once_with_timeout(onEvent, iox2::bb::Duration::from_millis(250)));
if (eventFn)
eventFn();
// Keep the default GLib context alive so timer/idle sources are dispatched.
iterateDefaultMainContextNonBlocking();
// exit if we are supposed to shutdown
if (d->shutdownPending)
break;
} while (d->state == ModuleState::RUNNING);
} else {
handleRunResult(
d->waitSet->wait_and_process_once_with_timeout(onEvent, iox2::bb::Duration::from_micros(timeoutUsec)));
if (eventFn)
eventFn();
}
}
void SyntalosLink::awaitDataForever(const std::function<void()> &eventFn, int intervalUsec)
{
if (d->waitSetDirty)
d->rebuildWaitSet();
auto onEvent =
[this](const iox2::WaitSetAttachmentId<iox2::ServiceType::Ipc> &attachmentId) -> iox2::CallbackProgression {
if (attachmentId.has_event_from(*d->waitSetCtrlGuard)) {
processPendingControl();
if (d->waitSetDirty)
return iox2::CallbackProgression::Stop;
} else {
d->processPendingData(attachmentId);
}
return d->shutdownPending ? iox2::CallbackProgression::Stop : iox2::CallbackProgression::Continue;
};
iterateDefaultMainContextNonBlocking();
while (true) {
// Complete deferred input-port subscriber drop, then rebuild WaitSet if ports were connected/disconnected
// since the last iteration (processPendingControl() sets waitSetDirty when that happens).
d->processPendingIPortReset();
if (d->waitSetDirty)
d->rebuildWaitSet();
const auto res = d->waitSet->wait_and_process_once_with_timeout(
onEvent, iox2::bb::Duration::from_micros(intervalUsec));
if (!res.has_value()) {
g_warning("Event loop terminated unexpectedly: %s", iox2::bb::into<const char *>(res.error()));
return;
}
// Treat SIGTERM/SIGINT as a shutdown request
const auto r = res.value();
if (r == iox2::WaitSetRunResult::Interrupt || r == iox2::WaitSetRunResult::TerminationRequest)
d->shutdownPending = true;
// call external event function
if (eventFn)
eventFn();
// Dispatch GLib events periodically so external sources can run on the default context.
iterateDefaultMainContextNonBlocking();
// exit if we are about to shutdown
if (d->shutdownPending)
break;
}
}
ModuleState SyntalosLink::state() const
{
return d->state;
}
bool SyntalosLink::isShutdownPending() const
{
return d->shutdownPending;
}
void SyntalosLink::setState(ModuleState state)
{
auto uninit = d->pubState->loan_uninit().value();
uninit.payload_mut().state = state;
iox2::send(iox2::assume_init(std::move(uninit))).value();
d->notifyMaster();
d->state = state;
}
void SyntalosLink::setStatusMessage(const std::string &message)
{
auto uninit = d->pubStatusMsg->loan_uninit().value();
uninit.payload_mut().text = iox2::bb::StaticString<512>::from_utf8_null_terminated_unchecked_truncated(
message.c_str(), message.size());
iox2::send(iox2::assume_init(std::move(uninit))).value();
d->notifyMaster();
}
int SyntalosLink::maxRealtimePriority() const
{
return d->maxRTPriority;
}
void SyntalosLink::setLoadScriptCallback(LoadScriptFn callback)
{
d->loadScriptCb = std::move(callback);
}
void SyntalosLink::setSaveSettingsCallback(SaveSettingsFn callback)
{
d->saveSettingsCb = std::move(callback);
}
void SyntalosLink::setLoadSettingsCallback(LoadSettingsFn callback)
{
d->loadSettingsCb = std::move(callback);
}
void SyntalosLink::setPrepareRunCallback(PrepareRunFn callback)
{
d->prepareRunCb = std::move(callback);
}
void SyntalosLink::setStartCallback(StartFn callback)
{
d->startCb = std::move(callback);
}
void SyntalosLink::setStopCallback(StopFn callback)
{
d->stopCb = std::move(callback);
}
void SyntalosLink::setShutdownCallback(ShutdownFn callback)
{
d->shutdownCb = std::move(callback);
}
SyncTimer *SyntalosLink::timer() const
{
return d->syTimer;
}
const TestSubjectInfo &SyntalosLink::testSubject() const
{
return d->testSubject;
}
bool SyntalosLink::allowAsyncStart() const
{
return d->allowAsyncStart;
}
void SyntalosLink::setAllowAsyncStart(bool allow)
{
d->allowAsyncStart = allow;
}
void SyntalosLink::setShowSettingsCallback(ShowSettingsFn callback)
{
d->showSettingsCb = std::move(callback);
}
void SyntalosLink::setShowDisplayCallback(ShowDisplayFn callback)
{
d->showDisplayCb = std::move(callback);
}
std::vector<std::shared_ptr<InputPortInfo>> SyntalosLink::inputPorts() const
{
return d->inPortInfo;
}
std::vector<std::shared_ptr<OutputPortInfo>> SyntalosLink::outputPorts() const
{
return d->outPortInfo;
}
auto SyntalosLink::registerInputPort(const std::string &id, const std::string &title, BaseDataType::TypeId typeId)
-> std::expected<std::shared_ptr<InputPortInfo>, std::string>
{
// passing an invalid data type is a hard error
if (!BaseDataType::typeIdIsValid(typeId)) {
return std::unexpected(
std::format("Can not register input port. Data type with ID '{}' is unknown.", static_cast<int>(typeId)));
}
// check for duplicates
for (const auto &ip : d->inPortInfo) {
if (ip->id() == id) {
return std::unexpected(std::format("Can not register input port. A port with ID '{}' already exists.", id));
}
}
// construct our reference for this port
InputPortChangeRequest ipc(PortAction::ADD);
ipc.id = id;
ipc.title = title;
ipc.dataTypeId = typeId;
// announce the new port to master
const auto iportData = ipc.toBytes();
d->sendPortChangeData(*d->cltInPortChange, iportData);
// we need to rebuild the waitset
d->waitSetDirty = true;
// construct proxy
auto iport = std::shared_ptr<InputPortInfo>(new InputPortInfo(ipc));
d->inPortInfo.push_back(iport);
return iport;
}
auto SyntalosLink::registerOutputPort(
const std::string &id,
const std::string &title,
BaseDataType::TypeId typeId,
const MetaStringMap &metadata) -> std::expected<std::shared_ptr<OutputPortInfo>, std::string>
{
// passing an invalid data type is a hard error
if (!BaseDataType::typeIdIsValid(typeId)) {
return std::unexpected(
std::format("Can not register output port. Data type with ID '{}' is unknown.", static_cast<int>(typeId)));
}
// check for duplicates
for (const auto &op : d->outPortInfo) {
if (op->id() == id) {
return std::unexpected(
std::format("Can not register output port. A port with ID '{}' already exists.", id));
}
}
// construct our reference for this port
OutputPortChangeRequest opc(PortAction::ADD);
opc.id = id;
opc.title = title;
opc.dataTypeId = typeId;
opc.metadata = metadata;
// announce the new port to master
const auto oportData = opc.toBytes();
d->sendPortChangeData(*d->cltOutPortChange, oportData);
// we need to rebuild the waitset
d->waitSetDirty = true;
// construct proxy
auto oport = std::shared_ptr<OutputPortInfo>(new OutputPortInfo(opc));
oport->d->ioxPub.reset();
oport->d->ioxPub.emplace(
SyPublisher::create(*d->node, d->modId, oport->d->ipcChannelId(), opc.topology, ipcLogMessageDispatch));
d->outPortInfo.push_back(oport);
return oport;
}
void SyntalosLink::updateInputPort(const std::shared_ptr<InputPortInfo> &iport)
{
InputPortChangeRequest ipc(PortAction::CHANGE);
ipc.id = iport->id();
ipc.title = iport->d->title;
ipc.dataTypeId = iport->d->dataTypeId;
ipc.metadata = iport->d->metadata;
ipc.throttleItemsPerSec = iport->d->throttleItemsPerSec;
const auto iportData = ipc.toBytes();
d->sendPortChangeData(*d->cltInPortChange, iportData);
}
void SyntalosLink::updateOutputPort(const std::shared_ptr<OutputPortInfo> &oport)
{
OutputPortChangeRequest opc(PortAction::CHANGE);
opc.id = oport->id();
opc.title = oport->d->title;
opc.dataTypeId = oport->dataTypeId();
opc.metadata = oport->d->metadata;
const auto oportData = opc.toBytes();
d->sendPortChangeData(*d->cltOutPortChange, oportData);
}
void SyntalosLink::removeInputPort(const std::shared_ptr<InputPortInfo> &iport)
{
InputPortChangeRequest ipc(PortAction::REMOVE);
ipc.id = iport->id();
// notify master
const auto iportData = ipc.toBytes();
d->sendPortChangeData(*d->cltInPortChange, iportData);
// reset our reference (it will not be usable afterwards)
iport->d->ioxGuard.reset();
iport->d->ioxSub.reset();
iport->d->connected = false;
d->waitSetDirty = true;
}
void SyntalosLink::removeOutputPort(const std::shared_ptr<OutputPortInfo> &oport)
{
OutputPortChangeRequest opc(PortAction::REMOVE);
opc.id = oport->id();
const auto oportData = opc.toBytes();
// notify master
d->sendPortChangeData(*d->cltOutPortChange, oportData);
// reset
oport->d->ioxGuard.reset();
oport->d->ioxPub.reset();
d->waitSetDirty = true;
}
void SyntalosLink::resetPorts()
{
// Tear down every IPC publisher and subscriber.
// WaitSet guards must be dropped before the publishers/subscribers they
// guard are destroyed (iceoryx2 contract: guard must not outlive the WaitSet).
for (auto &op : d->outPortInfo)
removeOutputPort(op);
for (auto &ip : d->inPortInfo)
removeInputPort(ip);
d->outPortInfo.clear();
d->inPortInfo.clear();
// request a WaitSet rebuild (already done by the port removal calls, this is just to be explicit)
d->waitSetDirty = true;
}
bool SyntalosLink::submitOutput(const std::shared_ptr<OutputPortInfo> &oport, const BaseDataType &data)
{
if (!oport->d->ioxPub.has_value()) {
raiseError(std::format("Failed to send data on output port '{}': Publisher was not initialized!", oport->id()));
return false;
}
auto &pub = *oport->d->ioxPub;
try {
const auto memSize = data.memorySize();
if (memSize < 0) {
// we do not know the required memory size in advance, so we need to
// perform a serialization and extra copy operation
data.toBytes(oport->d->outBuffer);
auto slice = pub.loanSlice(static_cast<size_t>(oport->d->outBuffer.size()));
std::memcpy(
slice.payload_mut().data(),
oport->d->outBuffer.data(),
static_cast<size_t>(oport->d->outBuffer.size()));
pub.sendSlice(std::move(slice));
} else {
// Higher efficiency code-path since the size is known in advance
auto loan = pub.loanSlice(static_cast<size_t>(memSize));
if (!data.writeToMemory(loan.payload_mut().data(), static_cast<ssize_t>(memSize))) {
raiseError(std::format("Failed to serialize data for output port '{}'.", oport->id()));
return false;
}
pub.sendSlice(std::move(loan));
}
} catch (std::exception &e) {
raiseError(std::format("Failed to send data on output port '{}': {}", oport->id(), e.what()));
return false;
}
return true;
}
} // namespace Syntalos
Updated on 2026-04-24 at 23:36:58 +0000