mlink/syntaloslink.cpp

mlink/syntaloslink.cpp

mlink/syntaloslink.cpp

Namespaces

Name
Syntalos

Source code

/*
 * Copyright (C) 2020-2026 Matthias Klumpp <matthias@tenstral.net>
 *
 * Licensed under the GNU Lesser General Public License Version 3
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the license, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "syntaloslink.h"

#include <QDebug>
#include <QCoreApplication>
#include <csignal>
#include <sys/prctl.h>
#include <iox2/iceoryx2.hpp>

#include "mlink/ipc-types-private.h"
#include "mlink/ipc-iox-private.h"
#include "utils/misc.h"
#include "utils/rtkit.h"
#include "utils/cpuaffinity.h"

using namespace Syntalos;
using namespace Syntalos::ipc;

namespace Syntalos
{

/**
 * Receive the next pending item from a port without aborting on failure.
 *
 * @param sub Port object whose receive() yields an expected-like result.
 * @return The value handed out by receive(), or a value-initialized result
 *         if receive() reported an error (the error is logged to stderr).
 */
template<typename Sub>
static auto safeReceive(Sub &sub) -> std::remove_cvref_t<decltype(sub.receive().value())>
{
    auto result = sub.receive();
    if (!result.has_value()) {
        // Separate the message from the error text and terminate the line, so
        // consecutive failures do not run together in the log.
        std::cerr << "Client IPC receive failed: " << iox2::bb::into<const char *>(result.error()) << '\n';
        return {};
    }
    return std::move(result).value();
}

/**
 * Create the SyntalosLink for a module process that was launched by Syntalos.
 *
 * Reads the module ID from the SYNTALOS_MODULE_ID environment variable,
 * registers the stream meta types, configures the iceoryx2 log level from
 * SY_VERBOSE and asks the kernel to send SIGTERM when the parent dies.
 *
 * @return The newly created link.
 * @throws std::runtime_error if SYNTALOS_MODULE_ID is unset or shorter than 2 bytes.
 */
std::unique_ptr<SyntalosLink> initSyntalosModuleLink()
{
    // a length check also covers the unset/empty case
    const auto syModuleId = qgetenv("SYNTALOS_MODULE_ID");
    if (syModuleId.length() < 2)
        throw std::runtime_error("This module was not run by Syntalos, can not continue!");

    // set up stream data type mapping, if it hasn't been initialized yet
    registerStreamMetaTypes();

    // set IOX log level
    const auto verboseLevel = qgetenv("SY_VERBOSE");
    if (verboseLevel == "1")
        iox2::set_log_level(iox2::LogLevel::Debug);
    else
        iox2::set_log_level(iox2::LogLevel::Info);

    // ensure we (try to) die if Syntalos, our parent, dies.
    // This is best-effort: a failure is reported, but does not prevent startup.
    if (prctl(PR_SET_PDEATHSIG, SIGTERM) != 0)
        std::cerr << "Unable to request SIGTERM on parent death, this module may outlive Syntalos." << std::endl;

    return std::unique_ptr<SyntalosLink>(new SyntalosLink(syModuleId));
}

/**
 * Private state of an InputPortInfo.
 *
 * Identity fields (ID, title, data type, metadata) are copied from the
 * InputPortChange the port is created from; everything else starts out
 * in a disconnected default state.
 */
class InputPortInfo::Private
{
public:
    explicit Private(const InputPortChange &pc)
        : id(pc.id),
          title(pc.title),
          dataTypeId(pc.dataTypeId),
          metadata(pc.metadata)
    {
    }

    int index = 0;
    bool connected = false;

    // NOTE: the guard is declared after the subscriber, so it is destroyed first.
    std::optional<SySubscriber> ioxSub;
    std::optional<IoxWaitSetGuard> ioxGuard;

    QString id;
    QString title;
    int dataTypeId;
    QVariantHash metadata;

    // invoked with the raw payload of every received item, if set
    NewDataRawFn newDataCb;
    uint throttleItemsPerSec = 0;
};

/**
 * Create an input port description from a port-change record.
 *
 * @param pc Port change whose ID, title, data type and metadata are copied.
 */
InputPortInfo::InputPortInfo(const InputPortChange &pc)
    : d(new InputPortInfo::Private(pc))
{
}

/// @return The ID this input port was created with.
QString InputPortInfo::id() const
{
    return d->id;
}

/// @return The numeric data type ID this input port was created with.
int InputPortInfo::dataTypeId() const
{
    return d->dataTypeId;
}

/// @return The title this input port was created with.
QString InputPortInfo::title() const
{
    return d->title;
}

/**
 * Set the callback that receives the raw payload of incoming data.
 *
 * @param callback Function invoked with a data pointer and the number of
 *                 bytes for every item received on this port.
 */
void InputPortInfo::setNewDataRawCallback(NewDataRawFn callback)
{
    d->newDataCb = std::move(callback);
}

/**
 * Store the requested throttle rate for this port.
 *
 * NOTE(review): the value is only stored here; where it takes effect is not
 * visible in this part of the file.
 *
 * @param itemsPerSec Requested maximum number of items per second.
 */
void InputPortInfo::setThrottleItemsPerSec(uint itemsPerSec)
{
    d->throttleItemsPerSec = itemsPerSec;
}

/// @return A copy of the metadata currently stored for this input port.
QVariantHash InputPortInfo::metadata() const
{
    return d->metadata;
}

/**
 * Private state of an OutputPortInfo.
 *
 * Identity fields (ID, title, data type, metadata) are copied from the
 * OutputPortChange the port is created from.
 */
class OutputPortInfo::Private
{
public:
    explicit Private(const OutputPortChange &pc)
        : id(pc.id),
          title(pc.title),
          dataTypeId(pc.dataTypeId),
          metadata(pc.metadata)
    {
    }

    int index = 0;
    bool connected = false;

    // NOTE: the guard is declared after the publisher, so it is destroyed first.
    std::optional<SyPublisher> ioxPub;
    std::optional<IoxWaitSetGuard> ioxGuard;

    QString id;
    QString title;
    int dataTypeId;
    QVariantHash metadata;

    // utilized to reuse allocated memory when sending data, to prevent fragmentation
    ByteVector outBuffer;

    /// Name of the IPC channel of this port: the port ID prefixed with "o/".
    [[nodiscard]] std::string ipcChannelId() const
    {
        std::string channelId("o/");
        channelId += id.toStdString();
        return channelId;
    }
};

/**
 * Create an output port description from a port-change record.
 *
 * @param pc Port change whose ID, title, data type and metadata are copied.
 */
OutputPortInfo::OutputPortInfo(const OutputPortChange &pc)
    : d(new OutputPortInfo::Private(pc))
{
}

/// @return The ID this output port was created with.
QString OutputPortInfo::id() const
{
    return d->id;
}

/// @return The numeric data type ID this output port was created with.
int OutputPortInfo::dataTypeId() const
{
    return d->dataTypeId;
}

/**
 * Set a single metadata entry on this output port.
 *
 * @param key   Metadata key; an existing entry with this key is overwritten.
 * @param value New value for the entry.
 */
void OutputPortInfo::setMetadataVar(const QString &key, const QVariant &value)
{
    // insert() replaces the value if the key is already present
    d->metadata.insert(key, value);
}

/**
 * Private state of a SyntalosLink.
 *
 * Owns the iceoryx2 node of this module together with all publishers,
 * request servers, event listener/notifier and the WaitSet that are used
 * to talk to the Syntalos master, as well as the port lists and the
 * user-supplied callbacks.
 */
class SyntalosLink::Private
{
public:
    /**
     * Create the IPC node for this module and open all control interfaces.
     *
     * @param instanceId Module instance ID; used as node name and as
     *                   prefix for every service name.
     */
    explicit Private(const QString &instanceId)
        : modId(instanceId.toStdString()),
          state(ModuleState::UNKNOWN),
          maxRTPriority(0),
          syTimer(nullptr),
          shutdownPending(false)
    {
        // make a new node for this module
        node.emplace(makeIoxNode(modId));

        // interfaces
        pubError.emplace(makeTypedPublisher<ErrorEvent>(*node, svcName(ERROR_CHANNEL_ID)));
        pubState.emplace(makeTypedPublisher<StateChangeEvent>(*node, svcName(STATE_CHANNEL_ID)));
        pubStatusMsg.emplace(makeTypedPublisher<StatusMessageEvent>(*node, svcName(STATUS_MESSAGE_CHANNEL_ID)));

        pubSettingsChange.emplace(makeSlicePublisher(*node, svcName(SETTINGS_CHANGE_CHANNEL_ID)));
        pubInPortChange.emplace(makeSlicePublisher(*node, svcName(IN_PORT_CHANGE_CHANNEL_ID)));
        pubOutPortChange.emplace(makeSlicePublisher(*node, svcName(OUT_PORT_CHANGE_CHANNEL_ID)));

        srvPingPong.emplace(makeTypedServer<PingRequest, DoneResponse>(*node, svcName(PING_CALL_ID)));
        srvSetNiceness.emplace(makeTypedServer<SetNicenessRequest, DoneResponse>(*node, svcName(SET_NICENESS_CALL_ID)));
        srvSetMaxRTPriority.emplace(
            makeTypedServer<SetMaxRealtimePriority, DoneResponse>(*node, svcName(SET_MAX_RT_PRIORITY_CALL_ID)));
        srvSetCPUAffinity.emplace(
            makeTypedServer<SetCPUAffinityRequest, DoneResponse>(*node, svcName(SET_CPU_AFFINITY_CALL_ID)));
        srvConnectIPort.emplace(
            makeTypedServer<ConnectInputRequest, DoneResponse>(*node, svcName(CONNECT_INPUT_CALL_ID)));
        srvStart.emplace(makeTypedServer<StartRequest, DoneResponse>(*node, svcName(START_CALL_ID)));
        srvStop.emplace(makeTypedServer<StopRequest, DoneResponse>(*node, svcName(STOP_CALL_ID)));
        srvShutdown.emplace(makeTypedServer<ShutdownRequest, DoneResponse>(*node, svcName(SHUTDOWN_CALL_ID)));
        srvShowDisplay.emplace(makeTypedServer<ShowDisplayRequest, DoneResponse>(*node, svcName(SHOW_DISPLAY_CALL_ID)));
        srvLoadScript.emplace(makeSliceServer(*node, svcName(LOAD_SCRIPT_CALL_ID)));
        srvSetPortsPreset.emplace(makeSliceServer(*node, svcName(SET_PORTS_PRESET_CALL_ID)));
        srvUpdateIPortMetadata.emplace(makeSliceServer(*node, svcName(IN_PORT_UPDATE_METADATA_ID)));
        srvPrepareStart.emplace(makeSliceServer(*node, svcName(PREPARE_START_CALL_ID)));
        srvShowSettings.emplace(makeSliceServer(*node, svcName(SHOW_SETTINGS_CALL_ID)));

        // control event notifications
        masterCtlEventListener.emplace(makeEventListener(*node, svcName(MASTER_CTL_EVENT_ID)));
        ctlEventNotifier.emplace(makeEventNotifier(*node, svcName(WORKER_CTL_EVENT_ID)));
    }

    ~Private() = default;

    // NOTE: member order matters, members are destroyed in reverse declaration
    // order, so the node outlives everything that was created from it.
    std::optional<iox2::Node<iox2::ServiceType::Ipc>> node;
    std::string modId;

    // Publishers: Module process -> Syntalos master
    std::optional<IoxPublisher<ErrorEvent>> pubError;
    std::optional<IoxPublisher<StateChangeEvent>> pubState;
    std::optional<IoxPublisher<StatusMessageEvent>> pubStatusMsg;
    std::optional<IoxSlicePublisher> pubSettingsChange;
    std::optional<IoxSlicePublisher> pubInPortChange;
    std::optional<IoxSlicePublisher> pubOutPortChange;

    // Servers: Syntalos master -> Module process commands
    std::optional<IoxServer<PingRequest, DoneResponse>> srvPingPong;
    std::optional<IoxServer<SetNicenessRequest, DoneResponse>> srvSetNiceness;
    std::optional<IoxServer<SetMaxRealtimePriority, DoneResponse>> srvSetMaxRTPriority;
    std::optional<IoxServer<SetCPUAffinityRequest, DoneResponse>> srvSetCPUAffinity;
    std::optional<IoxServer<ConnectInputRequest, DoneResponse>> srvConnectIPort;
    std::optional<IoxServer<StartRequest, DoneResponse>> srvStart;
    std::optional<IoxServer<StopRequest, DoneResponse>> srvStop;
    std::optional<IoxServer<ShutdownRequest, DoneResponse>> srvShutdown;
    std::optional<IoxUntypedServer> srvLoadScript;
    std::optional<IoxUntypedServer> srvSetPortsPreset;
    std::optional<IoxUntypedServer> srvUpdateIPortMetadata;
    std::optional<IoxUntypedServer> srvPrepareStart;

    std::optional<IoxServer<ShowDisplayRequest, DoneResponse>> srvShowDisplay;
    std::optional<IoxUntypedServer> srvShowSettings;

    // Listens for messages from the server
    std::optional<IoxListener> masterCtlEventListener;

    // Used by us to ping master if we have a message
    std::optional<IoxNotifier> ctlEventNotifier;

    // WaitSet to efficiently wait for messages from master
    std::optional<IoxWaitSet> waitSet;
    std::optional<IoxWaitSetGuard> waitSetCtrlGuard;
    // set whenever attachments changed and the WaitSet has to be rebuilt before the next wait
    bool waitSetDirty = true;

    ModuleState state;
    int maxRTPriority;
    std::vector<std::shared_ptr<InputPortInfo>> inPortInfo;
    std::vector<std::shared_ptr<OutputPortInfo>> outPortInfo;
    SyncTimer *syTimer;

    LoadScriptFn loadScriptCb;
    PrepareStartFn prepareStartCb;
    StartFn startCb;
    StopFn stopCb;
    ShutdownFn shutdownCb;
    ShowSettingsFn showSettingsCb;
    ShowDisplayFn showDisplayCb;

    // set once a shutdown request or termination signal was seen; stops the await loops
    bool shutdownPending;

    /// Build the full service name of a channel of this module.
    [[nodiscard]] std::string svcName(const std::string &channel) const
    {
        assert(!modId.empty());
        return makeModuleServiceName(modId, channel);
    }

    /// Emit a control event so the master knows that we published something.
    void notifyMaster() const
    {
        if (!ctlEventNotifier.has_value()) [[unlikely]] {
            std::cerr << "notifyMaster: Notifier was not initialized, can not notify master!" << std::endl;
            return;
        }

        auto r = ctlEventNotifier->notify();
        if (!r.has_value())
            std::cerr << "Failed to notify master of control event:" << iox2::bb::into<const char *>(r.error())
                      << std::endl;
    }

    /**
     * Answer a request with a DoneResponse.
     *
     * @param req     The active request to respond to.
     * @param success Value carried by the DoneResponse.
     *
     * If no response sample can be loaned, the failure is logged and no
     * response is sent.
     */
    template<typename Req>
    static void replyDone(iox2::ActiveRequest<iox2::ServiceType::Ipc, Req, void, DoneResponse, void> &req, bool success)
    {
        auto maybeResponse = req.loan_uninit();
        if (!maybeResponse.has_value()) {
            std::cerr << "Failed to loan response for 'done' reply: "
                      << iox2::bb::into<const char *>(maybeResponse.error()) << '\n';
            return;
        }
        iox2::send(std::move(maybeResponse).value().write_payload(DoneResponse{success})).value();
    }

    using SliceActiveRequest =
        iox2::ActiveRequest<iox2::ServiceType::Ipc, iox2::bb::Slice<std::byte>, void, DoneResponse, void>;

    /// Answer a byte-slice request with a DoneResponse; same behavior as replyDone().
    static void replyDoneSlice(SliceActiveRequest &req, bool success)
    {
        // A slice request is just an ActiveRequest with a byte-slice payload,
        // so the generic implementation applies unchanged.
        replyDone(req, success);
    }

    /// Drop the current WaitSet (if any) and create a new one with all current attachments.
    void rebuildWaitSet()
    {
        // MUST drop ALL WaitSet guards before destroying the WaitSet itself.
        // iceoryx2 contract: "WaitSetGuard must live at most as long as the WaitSet."
        // Dropping a guard after the WaitSet is destroyed is use-after-free on
        // the Rust side and causes EBADF errors and corrupted event state on the
        // next run.
        waitSetCtrlGuard.reset();
        for (auto &iport : inPortInfo)
            iport->d->ioxGuard.reset();
        for (auto &oport : outPortInfo)
            oport->d->ioxGuard.reset();

        // Now safe to destroy the old WaitSet
        waitSet.reset();

        // Build a fresh WaitSet and re-attach everything
        waitSet.emplace(
            iox2::WaitSetBuilder()
                .signal_handling_mode(iox2::SignalHandlingMode::HandleTerminationRequests)
                .create<iox2::ServiceType::Ipc>()
                .value());

        // Control attachment: wakes for requests from the master
        if (masterCtlEventListener.has_value())
            waitSetCtrlGuard.emplace(waitSet->attach_notification(*masterCtlEventListener).value());

        // Per-input-port attachments
        for (auto &iport : inPortInfo) {
            if (!iport->d->connected || !iport->d->ioxSub.has_value())
                continue;
            iport->d->ioxGuard.emplace(waitSet->attach_notification(*iport->d->ioxSub).value());
        }

        // Per-output-port publisher attachments
        for (auto &oport : outPortInfo) {
            if (!oport->d->ioxPub.has_value())
                continue;
            // Proactively update connections for events that queued up while WaitSet was not active
            oport->d->ioxPub->handleEvents();
            oport->d->ioxGuard.emplace(waitSet->attach_notification(*oport->d->ioxPub).value());
        }

        waitSetDirty = false;
    }

    /**
     * Handle a WaitSet wake-up that did not come from the control listener.
     *
     * @param attachmentId Identifies the attachment that triggered the wake-up.
     */
    void processPendingData(const iox2::WaitSetAttachmentId<iox2::ServiceType::Ipc> &attachmentId)
    {
        for (auto &iport : inPortInfo) {
            if (!iport->d->connected || !iport->d->ioxSub.has_value())
                continue;
            if (!iport->d->ioxGuard.has_value() || !attachmentId.has_event_from(*iport->d->ioxGuard))
                continue;

            if (iport->d->newDataCb) {
                iport->d->ioxSub->handleEvents([&](const IoxByteSlice &pl) {
                    iport->d->newDataCb(pl.data(), pl.number_of_bytes());
                });
            } else {
                // Still drain to prevent the queue filling up even if there's no callback.
                iport->d->ioxSub->handleEvents([](const IoxByteSlice &) {});
            }
        }

        // Handle output-port publisher events (SubscriberConnected / SubscriberDisconnected).
        for (auto &oport : outPortInfo) {
            if (!oport->d->ioxPub.has_value() || !oport->d->ioxGuard.has_value())
                continue;
            if (!attachmentId.has_event_from(*oport->d->ioxGuard))
                continue;
            oport->d->ioxPub->handleEvents();
        }
    }
};

/**
 * Create a link to the Syntalos master for the given module instance.
 *
 * @param instanceId Module instance ID used to set up the IPC interfaces.
 * @param parent     Optional QObject parent.
 */
SyntalosLink::SyntalosLink(const QString &instanceId, QObject *parent)
    : QObject(parent),
      d(new SyntalosLink::Private(instanceId))
{
    // the timer is owned by this object and released in the destructor
    d->syTimer = new SyncTimer;

    // Immediately upon creation, we send a message that we are initializing.
    // A client using this interface has to set this to IDLE once it has set up the basics.
    setState(ModuleState::INITIALIZING);
}

/// Release the SyncTimer that was allocated for this link.
SyntalosLink::~SyntalosLink()
{
    delete d->syTimer;
}

/// @return The module instance ID this link was created with.
QString SyntalosLink::instanceId() const
{
    return QString::fromUtf8(d->modId.c_str());
}

/**
 * Report an error with a title to the master and switch to the ERROR state.
 *
 * @param title   Short error title; stored in a 128-byte static string
 *                (truncating variant of the conversion is used).
 * @param message Error description; stored in a 2048-byte static string
 *                (truncating variant of the conversion is used).
 */
void SyntalosLink::raiseError(const QString &title, const QString &message)
{
    // Encode each string exactly once, instead of once for the pointer
    // and once more for the size.
    const auto titleUtf8 = title.toUtf8();
    const auto messageUtf8 = message.toUtf8();

    auto uninit = d->pubError->loan_uninit().value();
    auto &ev = uninit.payload_mut();
    ev.title = iox2::bb::StaticString<128>::from_utf8_null_terminated_unchecked_truncated(
        titleUtf8.constData(), titleUtf8.size());
    ev.message = iox2::bb::StaticString<2048>::from_utf8_null_terminated_unchecked_truncated(
        messageUtf8.constData(), messageUtf8.size());
    iox2::send(iox2::assume_init(std::move(uninit))).value();
    d->notifyMaster();

    setState(ModuleState::ERROR);
}

/**
 * Report an error without a title to the master and switch to the ERROR state.
 *
 * @param message Error description; stored in a 2048-byte static string
 *                (truncating variant of the conversion is used).
 */
void SyntalosLink::raiseError(const QString &message)
{
    // Encode the string exactly once, instead of once for the pointer
    // and once more for the size.
    const auto messageUtf8 = message.toUtf8();

    auto uninit = d->pubError->loan_uninit().value();
    auto &ev = uninit.payload_mut();
    ev.title = iox2::bb::StaticString<128>();
    ev.message = iox2::bb::StaticString<2048>::from_utf8_null_terminated_unchecked_truncated(
        messageUtf8.constData(), messageUtf8.size());
    iox2::send(iox2::assume_init(std::move(uninit))).value();
    d->notifyMaster();

    setState(ModuleState::ERROR);
}

/**
 * Process all control requests from the master that are currently queued.
 *
 * Every request server is drained in a fixed order (ping, niceness, RT priority,
 * CPU affinity, script loading, port presets, input connections, metadata updates,
 * prepare, start, stop, shutdown, show display, show settings). Each request is
 * answered with a DoneResponse; depending on the request, the user callback runs
 * before or after the reply is sent (see the individual sections).
 *
 * Sets waitSetDirty whenever ports were added, connected or dropped, and
 * shutdownPending once a shutdown request was handled.
 */
void SyntalosLink::processPendingControl()
{
    // Drain the master control listener to keep its socket buffer clear.
    // We *must* drain at the start to immediately consume the notification that
    // triggered this call.
    // Any new notifications arriving DURING processing are left intact, preventing
    // the race where a notification for a freshly-queued request arrives just before
    // an end-of-function drain and gets silently discarded, stranding the request.
    drainListenerEvents(*d->masterCtlEventListener);

    // ---- Ping ----
    while (true) {
        auto req = safeReceive(*d->srvPingPong);
        if (!req.has_value())
            break;

        // just respond as fast as we can
        Private::replyDone(*req, true);
    }

    // ---- SetNiceness ----
    while (true) {
        auto req = safeReceive(*d->srvSetNiceness);
        if (!req.has_value())
            break;

        // apply niceness request immediately to current thread
        const bool ok = setCurrentThreadNiceness(req->payload().nice);
        if (!ok)
            // Rtkit may have hit its per-user concurrent-thread limit.
            // The module will continue at default priority rather than failing to start entirely.
            std::cerr << "Worker thread niceness could not be set to " << req->payload().nice
                      << " - module will run at default priority." << std::endl;
        // success is reported to the master even if the niceness could not be applied
        Private::replyDone(*req, true);
    }

    // ---- SetMaxRealtimePriority ----
    // The value is only stored here and made available via maxRealtimePriority().
    while (true) {
        auto req = safeReceive(*d->srvSetMaxRTPriority);
        if (!req.has_value())
            break;
        d->maxRTPriority = req->payload().priority;
        Private::replyDone(*req, true);
    }

    // ---- SetCPUAffinity ----
    // Applied to the calling thread; an empty core list leaves the affinity untouched.
    while (true) {
        auto req = safeReceive(*d->srvSetCPUAffinity);
        if (!req.has_value())
            break;
        const auto &cores = req->payload().cores;
        if (!cores.empty()) {
            std::vector<uint> coreVec;
            coreVec.reserve(cores.size());
            for (uint64_t i = 0; i < cores.size(); ++i)
                coreVec.push_back(cores.unchecked_access()[i]);
            thread_set_affinity_from_vec(pthread_self(), coreVec);
        }
        Private::replyDone(*req, true);
    }

    // ---- LoadScript ----
    while (true) {
        auto req = safeReceive(*d->srvLoadScript);
        if (!req.has_value())
            break;
        const auto pl = req->payload();
        const auto scriptReqData = LoadScriptRequest::fromMemory(pl.data(), pl.number_of_bytes());
        // reply before invoking callback so master is not blocked longer than needed
        Private::replyDoneSlice(*req, true);
        // set script (requests with an empty script are acknowledged, but not forwarded)
        if (d->loadScriptCb && !scriptReqData.script.isEmpty())
            d->loadScriptCb(scriptReqData.script, scriptReqData.workingDir);
    }

    // ---- SetPortsPreset ----
    // Add any ports not yet known; never remove existing ones (they may be in use).
    while (true) {
        auto req = safeReceive(*d->srvSetPortsPreset);
        if (!req.has_value())
            break;
        const auto pl = req->payload();
        const auto sppReq = SetPortsPresetRequest::fromMemory(pl.data(), pl.number_of_bytes());

        // We must not remove existing ports, as they might be in use. So instead, we
        // add & merge port data.
        for (const auto &ipc : sppReq.inPorts) {
            // input ports that are already known (same ID) are left untouched
            bool skip = false;
            for (const auto &ip : d->inPortInfo) {
                if (ip->id() == ipc.id)
                    skip = true;
            }
            if (skip)
                continue;
            d->inPortInfo.push_back(std::shared_ptr<InputPortInfo>(new InputPortInfo(ipc)));

            // we will have to rebuild the waitset after ports changed
            d->waitSetDirty = true;
        }

        for (const auto &opc : sppReq.outPorts) {
            // reuse the port object if the ID is already known, otherwise create a new one;
            // in both cases the publisher is (re)created below
            std::shared_ptr<OutputPortInfo> oport;
            bool update = false;
            for (const auto &op : d->outPortInfo) {
                if (op->id() != opc.id)
                    continue;
                oport = op;
                update = true;
                break;
            }
            if (!oport)
                oport = std::shared_ptr<OutputPortInfo>(new OutputPortInfo(opc));

            // Detach old publisher from WaitSet before replacement.
            oport->d->ioxGuard.reset();
            oport->d->ioxPub.reset(); // drop the old connection first, before trying to create a new one
            oport->d->ioxPub.emplace(SyPublisher::create(*d->node, d->modId, oport->d->ipcChannelId(), opc.topology));
            if (!update)
                d->outPortInfo.push_back(oport);

            // we will have to rebuild the waitset after ports changed
            d->waitSetDirty = true;
        }

        Private::replyDoneSlice(*req, true);
    }

    // ---- ConnectInputPort ----
    while (true) {
        auto req = safeReceive(*d->srvConnectIPort);
        if (!req.has_value())
            break;
        const auto &r = req->payload();
        const auto portId = QString::fromUtf8(r.portId.unchecked_access().c_str());

        // find the port
        auto it = std::find_if(d->inPortInfo.begin(), d->inPortInfo.end(), [&](const auto &ip) {
            return ip->id() == portId;
        });
        if (it == d->inPortInfo.end()) {
            // return error if the port was not registered
            Private::replyDone(*req, false);
            continue;
        }
        auto &iport = *it;

        // connect the port
        iport->d->connected = true;

        // MUST reset the WaitSet guard BEFORE replacing the old subscriber
        iport->d->ioxGuard.reset();
        iport->d->ioxSub.reset(); // drop the old connection first, before trying to create a new one
        iport->d->ioxSub.emplace(
            SySubscriber::create(
                *d->node,
                std::string(r.instanceId.unchecked_access().c_str()),
                std::string(r.channelId.unchecked_access().c_str()),
                r.topology));

        Private::replyDone(*req, true);

        // we will have to rebuild the waitset after ports changed
        d->waitSetDirty = true;
    }

    // ---- UpdateInputPortMetadata ----
    while (true) {
        auto req = safeReceive(*d->srvUpdateIPortMetadata);
        if (!req.has_value())
            break;
        const auto pl = req->payload();
        const auto reqUpdateMD = UpdateInputPortMetadataRequest::fromMemory(pl.data(), pl.number_of_bytes());

        // update metadata (requests for unknown port IDs are acknowledged without effect)
        for (const auto &ip : d->inPortInfo) {
            if (ip->id() == reqUpdateMD.id) {
                ip->d->metadata = reqUpdateMD.metadata;
                break;
            }
        }

        Private::replyDoneSlice(*req, true);
    }

    // ---- PrepareStart ----
    while (true) {
        auto req = safeReceive(*d->srvPrepareStart);
        if (!req.has_value())
            break;
        const auto pl = req->payload();
        const auto prepReq = PrepareStartRequest::fromMemory(pl.data(), pl.number_of_bytes());
        const auto prepareSettings = prepReq.settings;
        Private::replyDoneSlice(*req, true); // reply before callback so master is not blocked
        if (d->prepareStartCb)
            d->prepareStartCb(prepareSettings);
    }

    // ---- Start ----
    while (true) {
        auto req = safeReceive(*d->srvStart);
        if (!req.has_value())
            break;
        // NOTE: We reply before running the start callback, so the master
        // will not wait for it. Errors are reported exclusively
        // via the error channel.

        // replace the timer with a fresh one that starts at the master-provided timestamp
        const auto timePoint = symaster_timepoint(microseconds_t(req->payload().startTimestampUsec));
        delete d->syTimer;
        d->syTimer = new SyncTimer;
        d->syTimer->startAt(timePoint);

        // Ensure all output-port publishers know about subscribers that connected
        // during the prepare phase.
        for (auto &oport : d->outPortInfo) {
            if (oport->d->ioxPub.has_value())
                oport->d->ioxPub->handleEvents();
        }

        Private::replyDone(*req, true);
        if (d->startCb)
            d->startCb();
    }

    // ---- Stop ----
    while (true) {
        auto req = safeReceive(*d->srvStop);
        if (!req.has_value())
            break;
        // we wait for the stop callback to finish before responding
        if (d->stopCb)
            d->stopCb();
        Private::replyDone(*req, true);

        // After a stop, drop all input-port subscribers so upstream publishers
        // immediately stop sending notifications to us. This prevents their event
        // socket buffers from filling up while we are in IDLE state in case for
        // whatever reason the master still tries to send something.
        for (auto &iport : d->inPortInfo) {
            iport->d->ioxGuard.reset();
            iport->d->ioxSub.reset();
            iport->d->connected = false;
        }
        if (!d->inPortInfo.empty())
            d->waitSetDirty = true;
    }

    // ---- Shutdown ----
    while (true) {
        auto req = safeReceive(*d->srvShutdown);
        if (!req.has_value())
            break;
        // NOTE: We reply immediately here and defer processing of the call,
        // because otherwise the master would never get a response if we
        // tear down the process too quickly.
        Private::replyDone(*req, true);

        // Execute the shutdown action after replying to master.
        // Whether or not a callback is defined, the shutdown is flagged as
        // pending, which makes the await functions stop waiting.
        if (d->shutdownCb)
            d->shutdownCb();
        d->shutdownPending = true;
        // only the first shutdown request is handled
        break;
    }

    // ---- ShowDisplay ----
    while (true) {
        auto req = safeReceive(*d->srvShowDisplay);
        if (!req.has_value())
            break;
        Private::replyDone(*req, true);
        if (d->showDisplayCb)
            d->showDisplayCb();
    }

    // ---- ShowSettings ----
    while (true) {
        auto req = safeReceive(*d->srvShowSettings);
        if (!req.has_value())
            break;
        const auto pl = req->payload();
        const auto showReq = ShowSettingsRequest::fromMemory(pl.data(), pl.number_of_bytes());
        // copy the settings out of the request, then reply before running the callback
        const QByteArray settingsData = showReq.settings;
        Private::replyDoneSlice(*req, true);
        if (d->showSettingsCb)
            d->showSettingsCb(settingsData);
    }
}

/**
 * Wait for control requests and incoming data and dispatch them.
 *
 * @param timeoutUsec If negative, keep waiting in 250 ms slices for as long as the
 *                    module state is RUNNING. Otherwise wait once for at most this
 *                    many microseconds.
 * @param eventFn     Optional function that is called after every wait, e.g. to
 *                    process events of an external event loop.
 *
 * Returns immediately if a shutdown is already pending. A failing wait is logged;
 * in the looping mode it additionally ends the loop instead of retrying endlessly.
 */
void SyntalosLink::awaitData(int timeoutUsec, const std::function<void()> &eventFn)
{
    // If a shutdown request was processed, we should die ASAP and this function
    // should no longer run - otherwise we may block forever and get killed
    if (d->shutdownPending)
        return;

    if (d->waitSetDirty)
        d->rebuildWaitSet();

    auto onEvent =
        [this](const iox2::WaitSetAttachmentId<iox2::ServiceType::Ipc> &attachmentId) -> iox2::CallbackProgression {
        // handle control messages
        if (attachmentId.has_event_from(*d->waitSetCtrlGuard)) {
            processPendingControl();
            // attachments changed: stop, so the WaitSet gets rebuilt before the next wait
            if (d->waitSetDirty)
                return iox2::CallbackProgression::Stop;
        } else {
            // handle incoming data
            d->processPendingData(attachmentId);
        }

        return d->shutdownPending ? iox2::CallbackProgression::Stop : iox2::CallbackProgression::Continue;
    };

    // Helper: inspect the WaitSet run-result and trigger a clean shutdown when
    // IOX reports that SIGTERM/SIGINT was received.
    // Returns false if the wait itself failed (the error is logged).
    auto handleRunResult =
        [this](const iox2::bb::Expected<iox2::WaitSetRunResult, iox2::WaitSetRunError> &res) -> bool {
        if (!res.has_value()) {
            qWarning().noquote() << "Waiting for events failed:" << iox2::bb::into<const char *>(res.error());
            return false;
        }
        const auto r = res.value();
        if (r == iox2::WaitSetRunResult::Interrupt || r == iox2::WaitSetRunResult::TerminationRequest)
            d->shutdownPending = true;
        return true;
    };

    if (timeoutUsec < 0) {
        do {
            // Rebuild the WaitSet if ports were connected/disconnected since the last iteration
            if (d->waitSetDirty)
                d->rebuildWaitSet();

            // we do not use wait() here as some functionality depends on the Qt/GLib event loop, and especially
            // for Python users it can be a bit jarring if that is not available. So we will occasionally
            // process events here.
            const bool waitOk = handleRunResult(
                d->waitSet->wait_and_process_once_with_timeout(onEvent, iox2::bb::Duration::from_millis(250)));
            if (eventFn)
                eventFn();

            // exit if we are supposed to shutdown, or if waiting failed
            // (retrying a failing wait in a tight loop would only burn CPU)
            if (d->shutdownPending || !waitOk)
                break;
        } while (d->state == ModuleState::RUNNING);
    } else {
        handleRunResult(
            d->waitSet->wait_and_process_once_with_timeout(onEvent, iox2::bb::Duration::from_micros(timeoutUsec)));
        if (eventFn)
            eventFn();
    }
}

/**
 * Wait for control requests and incoming data until a shutdown is requested.
 *
 * @param eventFn      Optional function that is called after every wait, e.g. to
 *                     process events of an external event loop.
 * @param intervalUsec Maximum duration of a single wait, in microseconds.
 *
 * Returns when a shutdown request or termination signal was seen, or when
 * waiting fails. Returns immediately if a shutdown is already pending.
 */
void SyntalosLink::awaitDataForever(const std::function<void()> &eventFn, int intervalUsec)
{
    // Same guard as in awaitData(): once a shutdown request was processed there
    // is nothing left to wait for.
    if (d->shutdownPending)
        return;

    if (d->waitSetDirty)
        d->rebuildWaitSet();

    auto onEvent =
        [this](const iox2::WaitSetAttachmentId<iox2::ServiceType::Ipc> &attachmentId) -> iox2::CallbackProgression {
        if (attachmentId.has_event_from(*d->waitSetCtrlGuard)) {
            processPendingControl();
            // attachments changed: stop, so the WaitSet gets rebuilt before the next wait
            if (d->waitSetDirty)
                return iox2::CallbackProgression::Stop;
        } else {
            d->processPendingData(attachmentId);
        }
        return d->shutdownPending ? iox2::CallbackProgression::Stop : iox2::CallbackProgression::Continue;
    };

    while (true) {
        // Rebuild the WaitSet if ports were connected/disconnected since the last iteration
        // (processPendingControl() sets waitSetDirty when that happens).
        if (d->waitSetDirty)
            d->rebuildWaitSet();

        const auto res = d->waitSet->wait_and_process_once_with_timeout(
            onEvent, iox2::bb::Duration::from_micros(intervalUsec));
        if (!res.has_value()) {
            qDebug().noquote() << "Event loop terminated unexpectedly:" << iox2::bb::into<const char *>(res.error());
            return;
        }
        // Treat SIGTERM/SIGINT as a shutdown request
        const auto r = res.value();
        if (r == iox2::WaitSetRunResult::Interrupt || r == iox2::WaitSetRunResult::TerminationRequest)
            d->shutdownPending = true;

        // call external event function
        if (eventFn)
            eventFn();

        // exit if we are about to shutdown
        if (d->shutdownPending)
            break;
    }
}

/// @return The module state that was last set via setState().
ModuleState SyntalosLink::state() const
{
    return d->state;
}

/// @return true once a shutdown request or a termination signal has been seen.
bool SyntalosLink::isShutdownPending() const
{
    return d->shutdownPending;
}

/**
 * Publish a new module state to the master and remember it locally.
 *
 * @param state The state to announce.
 */
void SyntalosLink::setState(ModuleState state)
{
    // fill a loaned sample with the new state and hand it to the master
    auto sample = d->pubState->loan_uninit().value();
    auto &event = sample.payload_mut();
    event.state = state;
    iox2::send(iox2::assume_init(std::move(sample))).value();
    d->notifyMaster();

    // the local state is only updated after the change was announced
    d->state = state;
}

/**
 * Publish a status message to the master.
 *
 * @param message Status text; stored in a 512-byte static string
 *                (truncating variant of the conversion is used).
 */
void SyntalosLink::setStatusMessage(const QString &message)
{
    // Encode the string exactly once, instead of once for the pointer
    // and once more for the size.
    const auto messageUtf8 = message.toUtf8();

    auto uninit = d->pubStatusMsg->loan_uninit().value();
    uninit.payload_mut().text = iox2::bb::StaticString<512>::from_utf8_null_terminated_unchecked_truncated(
        messageUtf8.constData(), messageUtf8.size());
    iox2::send(iox2::assume_init(std::move(uninit))).value();
    d->notifyMaster();
}

/// @return The maximum realtime priority last received from the master (0 until one was received).
int SyntalosLink::maxRealtimePriority() const
{
    return d->maxRTPriority;
}

/**
 * Set the function that is called when the master sends a script to load.
 *
 * @param callback Invoked with the script and its working directory;
 *                 not invoked for requests with an empty script.
 */
void SyntalosLink::setLoadScriptCallback(LoadScriptFn callback)
{
    d->loadScriptCb = std::move(callback);
}

/**
 * Set the function that is called when the master asks to prepare a run.
 *
 * @param callback Invoked with the settings contained in the request.
 */
void SyntalosLink::setPrepareStartCallback(PrepareStartFn callback)
{
    d->prepareStartCb = std::move(callback);
}

/**
 * Set the function that is called when the master starts a run.
 *
 * @param callback Invoked after the start request has been acknowledged.
 */
void SyntalosLink::setStartCallback(StartFn callback)
{
    d->startCb = std::move(callback);
}

/**
 * Set the function that is called when the master stops a run.
 *
 * @param callback Invoked before the stop request is acknowledged.
 */
void SyntalosLink::setStopCallback(StopFn callback)
{
    d->stopCb = std::move(callback);
}

/**
 * Store the given callable as the shutdown callback, replacing any callback
 * that was set before.
 *
 * @param callback The callable to store; it is moved into the link.
 */
void SyntalosLink::setShutdownCallback(ShutdownFn callback)
{
    auto &handler = d->shutdownCb;
    handler = std::move(callback);
}

/**
 * Return the SyncTimer pointer held by this link.
 *
 * The pointer is handed out as stored; no ownership is transferred here.
 */
SyncTimer *SyntalosLink::timer() const
{
    SyncTimer *const syncTimer = d->syTimer;
    return syncTimer;
}

void SyntalosLink::setSettingsData(const QByteArray &data)
{
    // we copy twice here - but this is a low-volume event, so it should be fine
    const auto scEvData = SettingsChangeEvent(data).toBytes();
    auto uninit = d->pubSettingsChange->loan_slice_uninit(static_cast<uint64_t>(scEvData.size())).value();
    auto initialized = uninit.write_from_fn([&](uint64_t i) {
        return static_cast<std::byte>(scEvData[static_cast<int>(i)]);
    });
    iox2::send(std::move(initialized)).value();
    d->notifyMaster();
}

/**
 * Store the given callable as the show-settings callback, replacing any
 * callback that was set before.
 *
 * @param callback The callable to store; it is moved into the link.
 */
void SyntalosLink::setShowSettingsCallback(ShowSettingsFn callback)
{
    auto &handler = d->showSettingsCb;
    handler = std::move(callback);
}

/**
 * Store the given callable as the show-display callback, replacing any
 * callback that was set before.
 *
 * @param callback The callable to store; it is moved into the link.
 */
void SyntalosLink::setShowDisplayCallback(ShowDisplayFn callback)
{
    auto &handler = d->showDisplayCb;
    handler = std::move(callback);
}

std::vector<std::shared_ptr<InputPortInfo>> SyntalosLink::inputPorts() const
{
    return d->inPortInfo;
}

std::vector<std::shared_ptr<OutputPortInfo>> SyntalosLink::outputPorts() const
{
    return d->outPortInfo;
}

std::shared_ptr<InputPortInfo> SyntalosLink::registerInputPort(
    const QString &id,
    const QString &title,
    const QString &dataTypeName)
{
    // construct our reference for this port
    InputPortChange ipc(PortAction::ADD);
    ipc.id = id;
    ipc.title = title;
    ipc.dataTypeId = BaseDataType::typeIdFromString(dataTypeName.toStdString());

    // check for duplicates
    for (const auto &ip : d->inPortInfo) {
        if (ip->id() == ipc.id)
            return nullptr;
    }

    const auto iportData = ipc.toBytes();

    // announce the new port to master
    auto uninit = d->pubInPortChange->loan_slice_uninit(static_cast<uint64_t>(iportData.size())).value();
    iox2::send(uninit.write_from_fn([&](uint64_t i) {
        return static_cast<std::byte>(iportData[static_cast<int>(i)]);
    })).value();
    d->notifyMaster();

    // we need to rebuild the waitset
    d->waitSetDirty = true;

    // construct proxy
    auto iport = std::shared_ptr<InputPortInfo>(new InputPortInfo(ipc));
    d->inPortInfo.push_back(iport);
    return iport;
}

std::shared_ptr<OutputPortInfo> SyntalosLink::registerOutputPort(
    const QString &id,
    const QString &title,
    const QString &dataTypeName,
    const QVariantHash &metadata)
{
    // construct our reference for this port
    OutputPortChange opc(PortAction::ADD);
    opc.id = id;
    opc.title = title;
    opc.dataTypeId = BaseDataType::typeIdFromString(dataTypeName.toStdString());
    opc.metadata = metadata;

    // check for duplicates
    for (const auto &op : d->outPortInfo) {
        if (op->id() == opc.id)
            return nullptr;
    }

    const auto oportData = opc.toBytes();

    // announce the new port to master
    auto uninit = d->pubOutPortChange->loan_slice_uninit(static_cast<uint64_t>(oportData.size())).value();
    iox2::send(uninit.write_from_fn([&](uint64_t i) {
        return static_cast<std::byte>(oportData[static_cast<int>(i)]);
    })).value();
    d->notifyMaster();

    // we need to rebuild the waitset
    d->waitSetDirty = true;

    // construct proxy
    auto oport = std::shared_ptr<OutputPortInfo>(new OutputPortInfo(opc));
    oport->d->ioxPub.reset();
    oport->d->ioxPub.emplace(SyPublisher::create(*d->node, d->modId, oport->d->ipcChannelId(), opc.topology));
    d->outPortInfo.push_back(oport);
    return oport;
}

void SyntalosLink::updateOutputPort(const std::shared_ptr<OutputPortInfo> &oport)
{
    OutputPortChange opc(PortAction::CHANGE);
    opc.id = oport->id();
    opc.title = oport->d->title;
    opc.dataTypeId = oport->dataTypeId();
    opc.metadata = oport->d->metadata;

    const auto oportData = opc.toBytes();
    auto uninit = d->pubOutPortChange->loan_slice_uninit(static_cast<uint64_t>(oportData.size())).value();
    // we copy twice here - but this is a low-volume event, so it should be fine
    iox2::send(uninit.write_from_fn([&](uint64_t i) {
        return static_cast<std::byte>(oportData[static_cast<int>(i)]);
    })).value();
    d->notifyMaster();
}

void SyntalosLink::updateInputPort(const std::shared_ptr<InputPortInfo> &iport)
{
    InputPortChange ipc(PortAction::CHANGE);
    ipc.id = iport->id();
    ipc.title = iport->d->title;
    ipc.dataTypeId = iport->d->dataTypeId;
    ipc.metadata = iport->d->metadata;
    ipc.throttleItemsPerSec = iport->d->throttleItemsPerSec;

    const auto iportData = ipc.toBytes();
    auto uninit = d->pubInPortChange->loan_slice_uninit(static_cast<uint64_t>(iportData.size())).value();
    // we copy twice here - but this is a low-volume event, so it should be fine
    iox2::send(uninit.write_from_fn([&](uint64_t i) {
        return static_cast<std::byte>(iportData[static_cast<int>(i)]);
    })).value();
    d->notifyMaster();
}

/**
 * Send a data item on the given output port.
 *
 * If the data type reports its serialized size in advance
 * (memorySize() >= 0), the data is written directly into a loaned slice.
 * Otherwise it is first serialized into the port's reusable output buffer
 * and then copied into a slice of matching size.
 *
 * Failures are reported through raiseError(); exceptions derived from
 * std::exception thrown while loaning, serializing or sending are caught
 * and reported the same way.
 *
 * @param oport The output port to publish on.
 * @param data  The data item to send.
 * @return true if the data was handed to the publisher, false if an error
 *         was raised instead.
 */
bool SyntalosLink::submitOutput(const std::shared_ptr<OutputPortInfo> &oport, const BaseDataType &data)
{
    // registerOutputPort() returns nullptr for duplicate port IDs, so a caller
    // forwarding that result must not make us dereference a null pointer
    if (!oport) {
        raiseError(QStringLiteral("Failed to send data: Output port is invalid (null)!"));
        return false;
    }

    if (!oport->d->ioxPub.has_value()) {
        raiseError(
            QStringLiteral("Failed to send data on output port '%1': Publisher was not initialized!").arg(oport->id()));
        return false;
    }
    auto &pub = *oport->d->ioxPub;

    try {
        const auto memSize = data.memorySize();
        if (memSize < 0) {
            // we do not know the required memory size in advance, so we need to
            // perform a serialization and extra copy operation
            auto &outBuffer = oport->d->outBuffer;
            data.toBytes(outBuffer);

            const auto byteCount = static_cast<size_t>(outBuffer.size());
            auto slice = pub.loanSlice(byteCount);
            // memcpy requires valid pointers even for a zero-length copy, and an
            // empty buffer may not provide one - so skip the copy in that case
            if (byteCount > 0)
                std::memcpy(slice.payload_mut().data(), outBuffer.data(), byteCount);
            pub.sendSlice(std::move(slice));
        } else {
            // Higher efficiency code-path since the size is known in advance
            auto loan = pub.loanSlice(static_cast<size_t>(memSize));
            if (!data.writeToMemory(loan.payload_mut().data(), static_cast<ssize_t>(memSize))) {
                raiseError(QStringLiteral("Failed to serialize data for output port '%1'.").arg(oport->id()));
                return false;
            }
            pub.sendSlice(std::move(loan));
        }
    } catch (const std::exception &e) {
        raiseError(QStringLiteral("Failed to send data on output port '%1': %2")
                       .arg(oport->id(), QString::fromUtf8(e.what())));
        return false;
    }

    return true;
}

} // namespace Syntalos

Updated on 2026-03-30 at 00:43:15 +0000