fabric/mlinkmodule.cpp

fabric/mlinkmodule.cpp

fabric/mlinkmodule.cpp

Namespaces

Name
Syntalos

Source code

/*
 * Copyright (C) 2016-2026 Matthias Klumpp <matthias@tenstral.net>
 *
 * Licensed under the GNU Lesser General Public License Version 3
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the license, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "mlinkmodule.h"

#include "config.h"
#include <QProcess>
#include <QTimer>
#include <QElapsedTimer>
#include <QCoreApplication>
#include <iox2/iceoryx2.hpp>

#include "mlink/ipc-types-private.h"
#include "mlink/ipc-iox-private.h"
#include "globalconfig.h"
#include "utils/misc.h"

namespace Syntalos
{
// Logging category ("mlink-master") used by all categorized log output in this file.
Q_LOGGING_CATEGORY(logMLinkMod, "mlink-master")
}

// Bring the IPC message types and helper functions into scope for the code below.
using namespace Syntalos::ipc;

/**
 * Private state of MLinkModule.
 *
 * Holds the worker process handle, script/settings data, the port ID maps
 * and all IPC endpoints (subscribers, listener, notifier) that connect the
 * master to the external module process. It also provides the helpers used
 * to issue request/response calls to the module process.
 */
class MLinkModule::Private
{
public:
    Private() {}

    ~Private() {}

    QProcess *proc = nullptr;
    bool outputCaptured = false;
    QString pyVenvDir;
    QString scriptWDir;
    QString scriptContent;
    QString scriptFname;
    QDateTime scriptLastModified;
    // input port metadata that was already sent to the module process, keyed by port ID
    QHash<QString, QVariantHash> sentMetadata;

    QByteArray settingsData;

    bool portChangesAllowed = true;
    QHash<QString, std::shared_ptr<VarStreamInputPort>> inPortIdMap;
    QHash<QString, std::shared_ptr<VariantDataStream>> outPortIdMap;

    std::string clientId;
    std::optional<iox2::Node<iox2::ServiceType::Ipc>> node;

    // Subscribers to receive information from module processes
    std::optional<IoxSubscriber<ErrorEvent>> subError;
    std::optional<IoxSubscriber<StateChangeEvent>> subStateChange;
    std::optional<IoxSliceSubscriber> subInPortChange;
    std::optional<IoxSliceSubscriber> subOutPortChange;
    std::optional<IoxSliceSubscriber> subSettingsChange;

    // Output port forwarders
    struct OutPortSub {
        std::optional<SySubscriber> sub;
        std::shared_ptr<StreamOutputPort> oport;
        std::optional<IoxWaitSetGuard> guard;
    };
    std::vector<OutPortSub> outPortSubs;

    // Listener to react to worker control events, notifier
    // to notify the worker if we send control events.
    std::optional<IoxListener> workerCtlEventListener;
    std::optional<IoxNotifier> ctlEventNotifier;
    QTimer *ctlEventTimer = nullptr;

    /**
     * Build the service name for @p channel of the current client.
     * The client ID must have been set before this is called.
     */
    [[nodiscard]] std::string svcName(const std::string &channel) const
    {
        assert(!clientId.empty());
        return makeModuleServiceName(clientId, channel);
    }

    /**
     * Trigger the control-event notifier so the module process learns that
     * we sent it something. Failures are logged, never raised.
     */
    void notifyClient() const
    {
        if (!ctlEventNotifier.has_value()) [[unlikely]] {
            qCCritical(logMLinkMod) << "notifyClient: Notifier was not initialized, can not notify client!";
            return;
        }

        auto r = ctlEventNotifier->notify();
        if (!r.has_value())
            qCWarning(logMLinkMod) << "Failed to notify worker of control event:"
                                   << iox2::bb::into<const char *>(r.error());
    }

    /**
     * Receive all pending error events from the module process and raise
     * each of them as an error on @p self.
     */
    void checkClientError(MLinkModule *self)
    {
        if (!subError.has_value())
            return;

        while (true) {
            auto sample = subError->receive().value();
            if (!sample.has_value())
                break;
            const auto &ev = sample->payload();
            const auto title = QString::fromUtf8(ev.title.unchecked_access().c_str());
            const auto msg = QString::fromUtf8(ev.message.unchecked_access().c_str());
            if (title.isEmpty())
                self->raiseError(msg);
            else
                self->raiseError(QStringLiteral("<html><b>%1</b><br/>%2").arg(title, msg));
        }
    }

    /**
     * Receive all pending state-change events from the module process and
     * apply the permitted ones to @p self.
     */
    void checkClientStateChange(MLinkModule *self)
    {
        if (!subStateChange.has_value())
            return;

        while (true) {
            auto sample = subStateChange->receive().value();
            if (!sample.has_value())
                break;
            const auto newState = sample->payload().state;

            // the error state must only be set by raiseError(), never directly
            if (newState == ModuleState::ERROR)
                continue;

            // only some states are allowed to be set by the module
            if (newState == ModuleState::DORMANT || newState == ModuleState::READY
                || newState == ModuleState::INITIALIZING || newState == ModuleState::IDLE)
                self->setState(newState);
        }
    }

    /**
     * Wait for the "done" response of a request that was already sent.
     *
     * While waiting, client error events and the Qt event loop are processed.
     * The wait ends early if the module entered the error state, and raises
     * an error on @p self once @p timeoutSec seconds have elapsed.
     *
     * @param self       Module on which errors are raised and whose state is checked.
     * @param pending    Pending-response handle obtained from sending the request.
     * @param channel    Channel name, only used for the timeout message.
     * @param timeoutSec Maximum time to wait, in seconds.
     * @return The success flag of the response, or false on error/timeout.
     */
    template<typename Pending>
    bool awaitDoneResponse(MLinkModule *self, Pending &pending, const std::string &channel, int timeoutSec)
    {
        QElapsedTimer timer;
        timer.start();
        while (true) {
            checkClientError(self);
            qApp->processEvents();
            auto response = pending.receive().value();
            if (response.has_value())
                return response->payload().success;

            // quit immediately if an error was already emitted
            if (self->state() == ModuleState::ERROR)
                return false;

            if (timer.elapsed() > timeoutSec * 1000) {
                self->raiseError(QStringLiteral("Timeout while waiting for response on: %1").arg(qstr(channel)));
                return false;
            }

            std::this_thread::sleep_for(microseconds_t(25));
        }
    }

    /**
     * Send a fixed-size request of type @p Req on @p channel and wait for
     * the module process to acknowledge it.
     *
     * @param self       Module on which errors are raised.
     * @param channel    Channel (call ID) the request is sent on.
     * @param fillReqFn  Callable receiving a mutable reference to the request payload to fill in.
     * @param timeoutSec Maximum time to wait for the response, in seconds.
     * @return true if the module process reported success, false otherwise.
     */
    template<typename Req, typename Func>
    bool callClientSimple(MLinkModule *self, const std::string &channel, Func fillReqFn, int timeoutSec = 8)
    {
        if (!node.has_value()) {
            qCCritical(logMLinkMod).noquote()
                << "callClientSimple: IOX node not initialized, failing call on channel:" << channel;
            return false;
        }

        auto client = makeTypedClient<Req, DoneResponse>(*node, svcName(channel));

        auto maybeReq = client.loan_uninit();
        if (!maybeReq.has_value()) {
            self->raiseError(
                QStringLiteral("Failed to loan shared memory for request on channel '%1': %2")
                    .arg(qstr(channel), QString::fromUtf8(iox2::bb::into<const char *>(maybeReq.error()))));
            return false;
        }
        auto pendingReq = std::move(maybeReq).value();

        fillReqFn(pendingReq.payload_mut());

        // a failed send is reported as a module error instead of being unwrapped blindly
        auto sendResult = iox2::send(iox2::assume_init(std::move(pendingReq)));
        if (!sendResult.has_value()) {
            self->raiseError(QStringLiteral("Failed to send request on channel '%1'.").arg(qstr(channel)));
            return false;
        }
        auto pending = std::move(sendResult).value();
        notifyClient();

        return awaitDoneResponse(self, pending, channel, timeoutSec);
    }

    /**
     * Send a variable-size (serialized) request on @p channel and wait for
     * the module process to acknowledge it.
     *
     * @param self       Module on which errors are raised.
     * @param channel    Channel (call ID) the request is sent on.
     * @param reqEntity  Request object; its toBytes() result is copied into the loaned slice.
     * @param timeoutSec Maximum time to wait for the response, in seconds.
     * @return true if the module process reported success, false otherwise.
     */
    template<typename ReqData>
    bool callSliceClientSimple(
        MLinkModule *self,
        const std::string &channel,
        const ReqData &reqEntity,
        int timeoutSec = 8)
    {
        if (!node.has_value()) {
            qCCritical(logMLinkMod).noquote()
                << "callSliceClientSimple: IOX node not initialized, failing call on channel:" << channel;
            return false;
        }

        auto client = makeSliceClient(*node, svcName(channel));

        const auto bytes = reqEntity.toBytes();
        auto maybeSlice = client.loan_slice_uninit(static_cast<uint64_t>(bytes.size()));
        if (!maybeSlice.has_value()) {
            self->raiseError(
                QStringLiteral("Failed to loan shared memory for request on '%1': %2")
                    .arg(qstr(channel), QString::fromUtf8(iox2::bb::into<const char *>(maybeSlice.error()))));
            return false;
        }
        auto rawSlice = std::move(maybeSlice).value();
        std::memmove(rawSlice.payload_mut().data(), bytes.data(), bytes.size());

        // a failed send is reported as a module error instead of being unwrapped blindly
        auto sendResult = iox2::send(iox2::assume_init(std::move(rawSlice)));
        if (!sendResult.has_value()) {
            self->raiseError(QStringLiteral("Failed to send request on channel '%1'.").arg(qstr(channel)));
            return false;
        }
        auto pending = std::move(sendResult).value();
        notifyClient();

        return awaitDoneResponse(self, pending, channel, timeoutSec);
    }
};

/**
 * Create the module: sets up the (not yet started) worker process object,
 * the IPC connection and the timer that polls for control messages.
 */
MLinkModule::MLinkModule(QObject *parent)
    : AbstractModule(parent),
      d(new MLinkModule::Private)
{
    d->proc = new QProcess(this);
    d->portChangesAllowed = true;
    resetConnection();

    // output capturing is off by default, so the worker's output channels are forwarded to ours
    setOutputCaptured(false);

    // relay worker output to listeners, but only while capturing is enabled
    auto relayOutput = [this]() {
        if (!d->outputCaptured)
            return;
        Q_EMIT processOutputReceived(readProcessOutput());
    };
    connect(d->proc, &QProcess::readyReadStandardOutput, this, relayOutput);

    // turn a crash of the worker process into a module error
    auto onProcessFinished = [this](int exitCode, QProcess::ExitStatus exitStatus) {
        if (exitStatus != QProcess::CrashExit)
            return;
        raiseError(
            QStringLiteral("Module process crashed with exit code %1! Check the log for details.").arg(exitCode));
    };
    connect(
        d->proc,
        static_cast<void (QProcess::*)(int, QProcess::ExitStatus)>(&QProcess::finished),
        this,
        onProcessFinished);

    // poll for control messages from the worker every 100 msec once the timer is started
    d->ctlEventTimer = new QTimer(this);
    d->ctlEventTimer->setInterval(100);
    connect(d->ctlEventTimer, &QTimer::timeout, this, [this]() {
        handleIncomingControl();
    });
}

// Stops the control-event polling timer, then shuts down the worker process if it is still running.
MLinkModule::~MLinkModule()
{
    d->ctlEventTimer->stop();
    terminateProcess();
}

void MLinkModule::handleIncomingControl()
{
    // Drain the control event listener to keep its socket buffer clear.
    // We *must* drain at the start to immediately consume the notification that triggered this call,
    // and to prevent race conditions with new events arriving while we process the previous one.
    if (d->workerCtlEventListener.has_value())
        drainListenerEvents(*d->workerCtlEventListener);

    // Error events
    d->checkClientError(this);

    // State changes
    d->checkClientStateChange(this);

    // Input port change events
    if (d->subInPortChange.has_value()) {
        while (true) {
            auto sample = d->subInPortChange->receive().value();
            if (!sample.has_value())
                break;

            // deserialize
            const auto pl = sample->payload();
            const auto ipc = InputPortChange::fromMemory(pl.data(), pl.number_of_bytes());
            const auto action = ipc.action;
            if (!d->portChangesAllowed) {
                qCDebug(logMLinkMod).noquote() << "Input port change request ignored: No changes are allowed.";
                continue;
            }
            if (action == PortAction::ADD) {
                // only register a new input port if we don't have one already
                auto iport = inPortById(ipc.id);
                if (iport && iport->dataTypeId() != ipc.dataTypeId) {
                    removeInPortById(ipc.id);
                    iport = nullptr;
                }
                if (!iport)
                    iport = registerInputPortByTypeId(ipc.dataTypeId, ipc.id, ipc.title);
                d->inPortIdMap.insert(ipc.id, iport);
            } else if (action == PortAction::REMOVE) {
                removeInPortById(ipc.id);
                d->inPortIdMap.remove(ipc.id);
            }
        }
    }

    // Output port change events
    if (d->subOutPortChange.has_value()) {
        while (true) {
            auto sample = d->subOutPortChange->receive().value();
            if (!sample.has_value())
                break;

            // deserialize
            const auto pl = sample->payload();
            const auto opc = OutputPortChange::fromMemory(pl.data(), pl.number_of_bytes());
            const auto action = opc.action;
            if (action == PortAction::ADD) {
                if (!d->portChangesAllowed) {
                    qCDebug(logMLinkMod).noquote() << "Output port addition ignored: No changes are allowed.";
                    continue;
                }

                // only register a new output port if we don't have one with that ID already
                auto oport = outPortById(opc.id);
                std::shared_ptr<VariantDataStream> ostream;
                if (oport) {
                    if (oport->dataTypeId() != opc.dataTypeId) {
                        removeOutPortById(opc.id);
                        oport = nullptr;
                    } else {
                        ostream = oport->streamVar();
                    }
                }
                if (!ostream)
                    ostream = registerOutputPortByTypeId(opc.dataTypeId, opc.id, opc.title);
                ostream->setMetadata(opc.metadata);
                d->outPortIdMap.insert(opc.id, ostream);
            } else if (action == PortAction::REMOVE) {
                if (!d->portChangesAllowed) {
                    qCDebug(logMLinkMod).noquote() << "Output port removal ignored: No changes are allowed.";
                    continue;
                }
                removeOutPortById(opc.id);
                d->outPortIdMap.remove(opc.id);
            } else if (action == PortAction::CHANGE) {
                std::shared_ptr<VariantDataStream> ostream;
                if (d->outPortIdMap.contains(opc.id))
                    ostream = d->outPortIdMap.value(opc.id);
                else if (auto oport = outPortById(opc.id))
                    ostream = oport->streamVar();
                if (ostream)
                    ostream->setMetadata(opc.metadata);
            }
        }
    }

    // Settings change events
    if (d->subSettingsChange.has_value()) {
        while (true) {
            auto sample = d->subSettingsChange->receive().value();
            if (!sample.has_value())
                break;
            const auto pl = sample->payload();
            const auto scev = SettingsChangeEvent::fromMemory(pl.data(), pl.number_of_bytes());
            setSettingsData(scev.settings);
        }
    }
}

/**
 * (Re)create the IPC node and all endpoints used to talk to the module
 * process. The client ID is rebuilt from the module's current id() and
 * index(), and all service names are derived from it.
 */
void MLinkModule::resetConnection()
{
    d->clientId = QStringLiteral("%1_%2").arg(id()).arg(index()).toStdString();

    // every connection gets its own, freshly created node
    const auto nodeName = "syntalos-master-" + d->clientId;
    d->node.emplace(
        iox2::NodeBuilder()
            .name(iox2::NodeName::create(nodeName.c_str()).value())
            .create<iox2::ServiceType::Ipc>()
            .value());
    auto &ipcNode = *d->node;

    // subscribers for the client -> master data channels
    d->subError.emplace(makeTypedSubscriber<ErrorEvent>(ipcNode, d->svcName(ERROR_CHANNEL_ID)));
    d->subStateChange.emplace(makeTypedSubscriber<StateChangeEvent>(ipcNode, d->svcName(STATE_CHANNEL_ID)));
    d->subInPortChange.emplace(makeSliceSubscriber(ipcNode, d->svcName(IN_PORT_CHANGE_CHANNEL_ID)));
    d->subOutPortChange.emplace(makeSliceSubscriber(ipcNode, d->svcName(OUT_PORT_CHANGE_CHANNEL_ID)));
    d->subSettingsChange.emplace(makeSliceSubscriber(ipcNode, d->svcName(SETTINGS_CHANGE_CHANNEL_ID)));

    // listener: signalled when the client publishes a control command
    d->workerCtlEventListener.emplace(ipc::makeEventListener(ipcNode, d->svcName(WORKER_CTL_EVENT_ID)));
    // notifier: wakes up the client after we made a request
    d->ctlEventNotifier.emplace(ipc::makeEventNotifier(ipcNode, d->svcName(MASTER_CTL_EVENT_ID)));
}

// Reports ModuleDriverKind::THREAD_DEDICATED as the driver kind of this module.
ModuleDriverKind MLinkModule::driver() const
{
    return ModuleDriverKind::THREAD_DEDICATED;
}

// Advertises the SHOW_DISPLAY and SHOW_SETTINGS features.
ModuleFeatures MLinkModule::features() const
{
    return ModuleFeature::SHOW_DISPLAY | ModuleFeature::SHOW_SETTINGS;
}

// Returns the program currently configured on the worker process object.
QString MLinkModule::moduleBinary() const
{
    return d->proc->program();
}

/**
 * Set the worker binary to launch. Any previously configured
 * arguments are discarded.
 */
void MLinkModule::setModuleBinary(const QString &binaryPath)
{
    // a new binary starts out with an empty argument list
    d->proc->setArguments({});
    d->proc->setProgram(binaryPath);
}

// Sets the arguments passed to the worker binary when it is started.
void MLinkModule::setModuleBinaryArgs(const QStringList &args)
{
    d->proc->setArguments(args);
}

/**
 * Environment of the worker process. Falls back to the system
 * environment if no process environment is set.
 */
QProcessEnvironment MLinkModule::moduleBinaryEnv() const
{
    const auto procEnv = d->proc->processEnvironment();
    return procEnv.isEmpty() ? QProcessEnvironment::systemEnvironment() : procEnv;
}

// Sets the environment the worker process will be started with.
void MLinkModule::setModuleBinaryEnv(const QProcessEnvironment &env)
{
    d->proc->setProcessEnvironment(env);
}

// Returns whether output of the worker process is captured (see setOutputCaptured()).
bool MLinkModule::outputCaptured() const
{
    return d->outputCaptured;
}

/**
 * Select whether worker output is captured by us (merged channels)
 * or forwarded to our own output channels.
 */
void MLinkModule::setOutputCaptured(bool capture)
{
    d->outputCaptured = capture;

    const auto channelMode = capture ? QProcess::MergedChannels : QProcess::ForwardedChannels;
    d->proc->setProcessChannelMode(channelMode);
}

// Stores the Python virtual environment directory; runProcess() applies it to the
// worker's environment and loadCurrentScript() forwards it with the script.
void MLinkModule::setPythonVirtualEnv(const QString &venvDir)
{
    d->pyVenvDir = venvDir;
}

/**
 * Remember the script text and its working directory. Nothing is sent
 * to the module process here, that happens in loadCurrentScript().
 */
void MLinkModule::setScript(const QString &script, const QString &wdir)
{
    d->scriptContent = script;
    d->scriptWDir = wdir;
}

bool MLinkModule::setScriptFromFile(const QString &fname, const QString &wdir)
{
    QFile f(fname);
    if (!f.open(QFile::ReadOnly | QFile::Text))
        return false;

    QTextStream in(&f);
    setScript(in.readAll(), wdir);

    d->scriptFname = fname;
    QFileInfo fi(fname);
    d->scriptLastModified = fi.lastModified();

    return true;
}

/**
 * Check whether the script file's modification time differs from the one
 * recorded when it was loaded. Always false if no script file was loaded.
 */
bool MLinkModule::isScriptModified() const
{
    if (d->scriptFname.isEmpty())
        return false;

    const auto currentMTime = QFileInfo(d->scriptFname).lastModified();
    return d->scriptLastModified != currentMTime;
}

// Returns the stored settings blob that is passed to the module process.
QByteArray MLinkModule::settingsData() const
{
    return d->settingsData;
}

// Replaces the stored settings blob; also called when the module process reports a settings change.
void MLinkModule::setSettingsData(const QByteArray &data)
{
    d->settingsData = data;
}

/**
 * Ask the module process to show its display UI; a failure is only logged.
 */
void MLinkModule::showDisplayUi()
{
    const bool shown = d->callClientSimple<ShowDisplayRequest>(this, SHOW_DISPLAY_CALL_ID, [](auto &) {});
    if (shown)
        return;

    qCWarning(logMLinkMod).noquote() << "Show display request failed!";
}

void MLinkModule::showSettingsUi()
{
    ShowSettingsRequest ssReq;
    ssReq.settings = d->settingsData;

    if (!d->callSliceClientSimple(this, SHOW_SETTINGS_CALL_ID, ssReq))
        qCWarning(logMLinkMod).noquote() << "Request to show settings UI has failed!";
}

/**
 * Shut down the worker process if it is running.
 *
 * Escalates in three steps, waiting up to 5 seconds after each one:
 * a shutdown request via IPC, then QProcess::terminate(), then
 * QProcess::kill(). Afterwards, leftover control events are drained.
 */
void MLinkModule::terminateProcess()
{
    if (!isProcessRunning())
        return;

    // request the module process to terminate itself
    d->callClientSimple<ShutdownRequest>(this, SHUTDOWN_CALL_ID, [](auto &) {});

    // give the process some time to terminate
    d->proc->waitForFinished(5000);

    // ask nicely
    if (d->proc->state() == QProcess::Running) {
        qCDebug(logMLinkMod).noquote() << "Module process" << d->proc->program()
                                       << "did not terminate on request. Sending SIGTERM.";
        d->proc->terminate();
        d->proc->waitForFinished(5000);
    }

    // no response? kill it!
    if (d->proc->state() == QProcess::Running) {
        qCWarning(logMLinkMod).noquote() << "Module process" << d->proc->program() << "failed to quit. Killing it.";
        d->proc->kill();
        d->proc->waitForFinished(5000);
    }

    // drain any now-stale events - the listener is optional, so never dereference it unchecked
    if (d->workerCtlEventListener.has_value())
        drainListenerEvents(*d->workerCtlEventListener);
}

/**
 * (Re)start the worker process and wait until it has initialized.
 *
 * Any running worker is terminated first, the IPC connection is reset and
 * the process is launched with an environment that carries the Syntalos
 * version, the module ID and (if set) the Python virtual environment.
 * The function then polls for state changes for up to 15 seconds.
 *
 * @return false if no binary is set, the process failed to start, or the
 *         worker did not show up / initialize in time; true otherwise.
 */
bool MLinkModule::runProcess()
{
    // ensure any existing process does not exist
    terminateProcess();

    if (d->proc->program().isEmpty()) {
        qCWarning(logMLinkMod).noquote() << "MLink module has not set a worker binary";
        return false;
    }

    // reset connection, just in case we changed our ID
    resetConnection();

    auto penv = moduleBinaryEnv();
    penv.insert("SYNTALOS_VERSION", syntalosVersionFull());
    penv.insert("SYNTALOS_MODULE_ID", d->clientId.c_str());
    if (!d->pyVenvDir.isEmpty()) {
        // make the virtual environment take precedence for the worker
        penv.remove("PYTHONHOME");
        penv.insert("VIRTUAL_ENV", d->pyVenvDir);
        penv.insert("PATH", QStringLiteral("%1/bin/:%2").arg(d->pyVenvDir, penv.value("PATH", "")));
    }

    // when launching the external process, we are back at initialization
    auto prevState = state();
    setState(ModuleState::INITIALIZING);

    d->proc->setProcessEnvironment(penv);
    d->proc->start(d->proc->program(), d->proc->arguments());
    if (!d->proc->waitForStarted()) {
        // a failed launch must not fail silently: tell the user why the module is unusable
        raiseError(QStringLiteral("Failed to start module process '%1': %2")
                       .arg(d->proc->program(), d->proc->errorString()));
        return false;
    }

    // wait for the service to show up & initialize
    bool workerFound = false;
    bool moduleInitDone = false;
    QElapsedTimer timer;
    timer.start();
    do {
        QCoreApplication::processEvents();
        handleIncomingControl();

        // any state different from the previous one counts as a sign of life
        if (!workerFound && state() != prevState)
            workerFound = true;
        if (state() != ModuleState::INITIALIZING && state() != prevState)
            moduleInitDone = true;

        if (!workerFound || !moduleInitDone)
            std::this_thread::sleep_for(microseconds_t(1500));

        // give up after 15 seconds
        if (timer.elapsed() > 15 * 1000)
            break;
    } while (!workerFound || !moduleInitDone);

    if (!workerFound) {
        raiseError(
            "Module communication interface did not show up in time! The module might have crashed or may not be "
            "configured correctly.");
        d->proc->kill();
        return false;
    }

    if (!moduleInitDone) {
        raiseError("Module initialization failed! The module might have failed or was taking too long to initialize.");
        d->proc->kill();
        return false;
    }

    // restore the state we had before launching, unless an error was raised meanwhile
    // NOTE(review): an ERROR state reached during startup still returns true here - confirm this is intended.
    if (state() != ModuleState::ERROR)
        setState(prevState);

    return true;
}

// True only while the worker process is in the QProcess::Running state.
bool MLinkModule::isProcessRunning() const
{
    return d->proc->state() == QProcess::Running;
}

bool MLinkModule::loadCurrentScript()
{
    if (d->scriptContent.isEmpty())
        return true;

    LoadScriptRequest req;
    req.workingDir = d->scriptWDir;
    req.venvDir = d->pyVenvDir;
    req.script = d->scriptContent;

    return d->callSliceClientSimple(this, LOAD_SCRIPT_CALL_ID, req);
}

bool MLinkModule::sendPortInformation()
{
    // set the ports that are selected on this module
    {
        SetPortsPresetRequest req;
        QList<InputPortChange> ipDef;
        QList<OutputPortChange> opDef;

        for (const auto &iport : inPorts()) {
            InputPortChange ipc(PortAction::CHANGE);
            ipc.id = iport->id();
            ipc.dataTypeId = iport->dataTypeId();
            ipc.title = iport->title();
            ipDef << ipc;
        }

        for (const auto &oport : outPorts()) {
            OutputPortChange opc(PortAction::CHANGE);
            opc.id = oport->id();
            opc.dataTypeId = oport->dataTypeId();
            opc.title = oport->title();
            opDef << opc;
        }

        req.inPorts = ipDef;
        req.outPorts = opDef;

        if (!d->callSliceClientSimple(this, SET_PORTS_PRESET_CALL_ID, req))
            return false;
    }

    // update input port metadata
    for (const auto &iport : inPorts()) {
        if (!iport->hasSubscription())
            continue;

        UpdateInputPortMetadataRequest req;
        req.id = iport->id();
        req.metadata = iport->subscriptionVar()->metadata();

        d->sentMetadata.insert(req.id, req.metadata);
        if (!d->callSliceClientSimple(this, IN_PORT_UPDATE_METADATA_ID, req))
            return false;
    }

    return true;
}

/**
 * Read everything currently available on the worker's standard output.
 * Returns an empty string when output is not being captured.
 */
QString MLinkModule::readProcessOutput()
{
    if (d->outputCaptured)
        return d->proc->readAllStandardOutput();

    return {};
}

/**
 * Publish the stream behind each input port through @p exporter and tell
 * the module process how to connect to the exported stream.
 *
 * Ports for which publishing fails raise an error; ports for which the
 * exporter returns no details are skipped. A failed connect request is
 * logged as a warning.
 */
void MLinkModule::markIncomingForExport(StreamExporter *exporter)
{
    // convert each string to UTF-8 exactly once before handing it to the fixed-size IPC string
    const auto toIoxName = [](const QString &str) {
        const auto utf8 = str.toUtf8();
        return IoxServiceNameString::from_utf8_null_terminated_unchecked_truncated(utf8.constData(), utf8.size());
    };

    for (auto &iport : inPorts()) {
        const auto res = exporter->publishStreamByPort(iport);
        if (!res.has_value()) {
            // there was an error!
            raiseError(res.error());
            continue;
        }
        const auto &details = res.value();
        if (!details.has_value())
            continue;

        ConnectInputRequest req;
        req.portId = toIoxName(iport->id());
        req.instanceId = toIoxName(details->instanceId);
        req.channelId = toIoxName(details->channelId);

        bool ret = d->callClientSimple<ConnectInputRequest>(this, CONNECT_INPUT_CALL_ID, [&req](auto &payload) {
            payload = req;
        });
        if (!ret)
            qCWarning(logMLinkMod).noquote() << "Failed to connect exported input port" << iport->title();
    }
}

void MLinkModule::registerOutPortForwarders()
{
    // ensure we are disconnected
    disconnectOutPortForwarders();

    // connect to external process streams
    for (auto &oport : outPorts()) {
        if (!oport->streamVar()->hasSubscribers())
            continue;

        Private::OutPortSub ps;
        // SySubscriber handles the event/listening actions internally, we only need to call it in runThread()
        ps.sub.emplace(SySubscriber::create(*d->node, d->clientId, "o/" + oport->id().toStdString()));
        ps.oport = oport;
        d->outPortSubs.push_back(std::move(ps));

        // NOTE: oport->startStream() is intentionally NOT called here.
        // The stream is started in MLinkModule::start(), after the worker's
        // start() callback has run and any OutputPortChange messages (e.g.
        // metadata set via set_metadata_value in Python start()) have been
        // processed by handleIncomingControl(). This ensures that
        // DataStream::start() snapshots the complete, final metadata into every
        // subscription.
    }
}

/**
 * Stop the stream of every forwarded output port, drain its subscriber
 * and drop all forwarders.
 */
void MLinkModule::disconnectOutPortForwarders()
{
    for (auto &forwarder : d->outPortSubs) {
        forwarder.oport->stopStream();
        forwarder.sub->drain();
    }

    d->outPortSubs.clear();
}

bool MLinkModule::prepare(const TestSubject &subject)
{
    GlobalConfig gconf;

    // ensure we are reading any messages from the module process
    d->ctlEventTimer->start();

    // at this point, ensure the module process is actually running
    if (!isProcessRunning()) {
        if (!runProcess())
            return false;
    }

    // set module process niceness
    if (!d->callClientSimple<SetNicenessRequest>(this, SET_NICENESS_CALL_ID, [&](auto &req) {
            req.nice = gconf.defaultThreadNice();
        }))
        return false;

    // set module process realtime priority
    if (!d->callClientSimple<SetMaxRealtimePriority>(this, SET_MAX_RT_PRIORITY_CALL_ID, [&](auto &req) {
            req.priority = gconf.defaultRTThreadPriority();
        }))
        return false;

    // send all port information to the module
    if (!sendPortInformation())
        return false;

    // set the script to be run, if any exists
    if (!loadCurrentScript())
        return false;

    // call the module's own startup preparations
    PrepareStartRequest prepReq;
    prepReq.settings = d->settingsData;
    if (!d->callSliceClientSimple(this, PREPARE_START_CALL_ID, prepReq))
        return false;

    QElapsedTimer timer;
    timer.start();
    while (state() != ModuleState::READY) {
        handleIncomingControl();
        qApp->processEvents();
        if (state() == ModuleState::ERROR)
            return false;

        // wait 10sec for the module to become ready
        if (timer.elapsed() > 10000) {
            raiseError("Timeout while waiting for module. Module did not transition to 'ready' state in time.");
            return false;
        }
    }

    // register output port forwarding from exported data streams to internal data transmission
    registerOutPortForwarders();
    if (state() == ModuleState::ERROR)
        return false;

    d->portChangesAllowed = false;
    return true;
}

void MLinkModule::start()
{
    d->portChangesAllowed = false;

    // update input port metadata if the metadata has changed - this may happen in case of circular module connections
    for (auto &iport : inPorts()) {
        if (!iport->hasSubscription())
            continue;

        const auto mdata = iport->subscriptionVar()->metadata();
        if (d->sentMetadata.value(iport->id(), QVariantHash()) == mdata)
            continue;

        UpdateInputPortMetadataRequest req;
        req.id = iport->id();
        req.metadata = mdata;
        if (!d->callSliceClientSimple(this, IN_PORT_UPDATE_METADATA_ID, req))
            return;
    }
    d->sentMetadata.clear();

    // tell the module to launch!
    auto timestampUs =
        std::chrono::duration_cast<std::chrono::microseconds>(m_syTimer->currentTimePoint().time_since_epoch()).count();
    d->callClientSimple<StartRequest>(this, START_CALL_ID, [&](auto &req) {
        req.startTimestampUsec = timestampUs;
    });

    // stop reading control events in the GUI thread - the module thread will do that for us soon
    d->ctlEventTimer->stop();

    // The worker's start() callback has already run before the Done reply
    // arrived. Drain any OutputPortChange messages it published (e.g. metadata
    // updates from Python start()) before we start the output streams below.
    handleIncomingControl();

    // Start all output-port streams now that metadata is complete
    for (auto &ps : d->outPortSubs)
        ps.oport->startStream();

    // call generic
    AbstractModule::start();
}

// Thread body of the module: waits on the control listener and on all output port
// subscribers, forwarding incoming data to the matching output port stream until
// m_running becomes false.
void MLinkModule::runThread(OptionalWaitCondition *startWaitCondition)
{
    // create waitset and attach control guard
    auto waitSet = iox2::WaitSetBuilder().create<iox2::ServiceType::Ipc>().value();
    auto waitSetCtlGuard = waitSet.attach_notification(*d->workerCtlEventListener).value();

    // prepare guards for output port forwarding
    for (auto &ps : d->outPortSubs) {
        if (!ps.sub.has_value())
            continue;
        ps.guard.emplace(waitSet.attach_notification(*ps.sub).value());
    }

    // callback invoked by the waitset for each attachment that has an event
    auto onEvent =
        [this, &waitSetCtlGuard](
            const iox2::WaitSetAttachmentId<iox2::ServiceType::Ipc> &attachmentId) -> iox2::CallbackProgression {
        // handle control messages
        if (attachmentId.has_event_from(waitSetCtlGuard)) {
            handleIncomingControl();
        } else {
            for (auto &ps : d->outPortSubs) {
                if (!ps.guard.has_value())
                    continue;
                if (!attachmentId.has_event_from(*ps.guard))
                    continue;

                // We have incoming data! - handle it, then break because the event
                // is per single attachment ID.
                ps.sub->handleEvents([&ps](const IoxByteSlice &pl) {
                    ps.oport->streamVar()->pushRawData(ps.oport->dataTypeId(), pl.data(), pl.number_of_bytes());
                });
                break;
            }
        }

        return iox2::CallbackProgression::Continue;
    };

    // wait on the start condition before entering the processing loop
    startWaitCondition->wait(this);

    while (m_running) {
        // wait for data - we need to time out every once in a while to check if we are still running
        waitSet.wait_and_process_once_with_timeout(onEvent, iox2::bb::Duration::from_millis(50)).value();
    }

    // MUST reset output-port guards before the local WaitSet goes out of scope.
    // iceoryx2 contract: "WaitSetGuard must live at most as long as the WaitSet."
    for (auto &ps : d->outPortSubs)
        ps.guard.reset();

    // we finished - drain incoming control messages from the module process one more time
    handleIncomingControl();
}

/**
 * Stop the run: ask the module process to stop (if it is still running),
 * tear down output port forwarding, allow port changes again and resume
 * polling for control messages in the GUI thread.
 */
void MLinkModule::stop()
{
    const bool workerAlive = isProcessRunning();
    if (workerAlive)
        d->callClientSimple<StopRequest>(this, STOP_CALL_ID, [](auto &) {});

    disconnectOutPortForwarders();

    // reset per-run state
    d->portChangesAllowed = true;
    d->sentMetadata.clear();

    // the GUI thread takes over reading client responses again
    d->ctlEventTimer->start();

    AbstractModule::stop();
}

Updated on 2026-03-16 at 19:16:01 +0000