mlink/syntaloslink.cpp

mlink/syntaloslink.cpp

mlink/syntaloslink.cpp

Namespaces

Name
Syntalos

Source code

/*
 * Copyright (C) 2020-2024 Matthias Klumpp <matthias@tenstral.net>
 *
 * Licensed under the GNU Lesser General Public License Version 3
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the license, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "syntaloslink.h"

#include <QDebug>
#include <QBuffer>
#include <QCoreApplication>
#include <signal.h>
#include <sys/prctl.h>
#include <iceoryx_posh/runtime/posh_runtime.hpp>
#include <iceoryx_posh/popo/server.hpp>
#include <iceoryx_posh/popo/untyped_server.hpp>
#include <iceoryx_posh/popo/publisher.hpp>
#include <iceoryx_posh/popo/untyped_publisher.hpp>
#include <iceoryx_posh/popo/untyped_subscriber.hpp>
#include <iceoryx_hoofs/posix_wrapper/signal_watcher.hpp>
#include <iceoryx_hoofs/log/logmanager.hpp>

#include "ipc-types-private.h"
#include "rtkit.h"
#include "cpuaffinity.h"

using namespace Syntalos;

namespace Syntalos
{

std::unique_ptr<SyntalosLink> initSyntalosModuleLink()
{
    auto syModuleId = qgetenv("SYNTALOS_MODULE_ID");
    if (syModuleId.isEmpty() || syModuleId.length() < 2)
        throw std::runtime_error("This module was not run by Syntalos, can not continue!");

    // set up stream data type mapping, if it hasn't been initialized yet
    registerStreamMetaTypes();

    char rtName[100];
    const auto rtNameStr = QString::fromUtf8(syModuleId.right(100));
    strncpy(rtName, qPrintable(rtNameStr), sizeof(rtName) - 1);
    rtName[sizeof(rtName) - 1] = '\0';

    // set IOX log level
    auto verboseLevel = qgetenv("SY_VERBOSE");
    if (verboseLevel == "1")
        iox::log::LogManager::GetLogManager().SetDefaultLogLevel(iox::log::LogLevel::kVerbose);
    else
        iox::log::LogManager::GetLogManager().SetDefaultLogLevel(iox::log::LogLevel::kInfo);

    // connect to RouDi
    iox::runtime::PoshRuntime::initRuntime(rtName);

    // ensure we (try to) die if Syntalos, our parent, dies
    prctl(PR_SET_PDEATHSIG, SIGTERM);

    return std::unique_ptr<SyntalosLink>(new SyntalosLink(rtNameStr));
}

class InputPortInfo::Private
{
public:
    explicit Private(const InputPortChange &pc)
    {
        connected = false;
        id = pc.id;
        title = pc.title;
        dataTypeId = pc.dataTypeId;
        metadata = pc.metadata;
    }

    int index;
    bool connected;
    std::unique_ptr<iox::popo::UntypedSubscriber> ioxSub;

    QString id;
    QString title;
    int dataTypeId;
    QVariantHash metadata;

    NewDataRawFn newDataCb;
    uint throttleItemsPerSec;
};

InputPortInfo::InputPortInfo(const InputPortChange &pc)
    : d(new InputPortInfo::Private(pc))
{
}

QString InputPortInfo::id() const
{
    return d->id;
}

int InputPortInfo::dataTypeId() const
{
    return d->dataTypeId;
}

QString InputPortInfo::title() const
{
    return d->title;
}

void InputPortInfo::setNewDataRawCallback(NewDataRawFn callback)
{
    d->newDataCb = std::move(callback);
}

void InputPortInfo::setThrottleItemsPerSec(uint itemsPerSec)
{
    d->throttleItemsPerSec = itemsPerSec;
}

QVariantHash InputPortInfo::metadata() const
{
    return d->metadata;
}

class OutputPortInfo::Private
{
public:
    explicit Private(const OutputPortChange &pc)
    {
        connected = false;
        id = pc.id;
        title = pc.title;
        dataTypeId = pc.dataTypeId;
        metadata = pc.metadata;
    }

    int index;
    bool connected;
    std::unique_ptr<iox::popo::UntypedPublisher> ioxPub;

    QString id;
    QString title;
    int dataTypeId;
    QVariantHash metadata;

    iox::capro::IdString_t publisherId() const
    {
        auto channelId = QStringLiteral("oport_%1").arg(id.mid(0, 80));
        return iox::capro::IdString_t(iox::cxx::TruncateToCapacity, channelId.toStdString());
    }
};

OutputPortInfo::OutputPortInfo(const OutputPortChange &pc)
    : d(new OutputPortInfo::Private(pc))
{
}

QString OutputPortInfo::id() const
{
    return d->id;
}

int OutputPortInfo::dataTypeId() const
{
    return d->dataTypeId;
}

void OutputPortInfo::setMetadataVar(const QString &key, const QVariant &value)
{
    d->metadata[key] = value;
}

class SyntalosLink::Private
{
public:
    Private(const QString &instanceId)
    {
        modId = iox::capro::IdString_t(iox::cxx::TruncateToCapacity, instanceId.toStdString());

        // interfaces
        pubError = makePublisher<ErrorEvent>(ERROR_CHANNEL_ID);
        pubState = makePublisher<StateChangeEvent>(STATE_CHANNEL_ID);
        pubStatusMessage = makePublisher<StatusMessageEvent>(STATUS_MESSAGE_CHANNEL_ID, false);
        pubSettingsChange = makeUntypedPublisher(SETTINGS_CHANGE_CHANNEL_ID);
        pubInPortChange = makeUntypedPublisher(IN_PORT_CHANGE_CHANNEL_ID);
        pubOutPortChange = makeUntypedPublisher(OUT_PORT_CHANGE_CHANNEL_ID);

        reqSetNiceness = makeServer<SetNicenessRequest, DoneResponse>(SET_NICENESS_CALL_ID);
        reqSetMaxRTPriority = makeServer<SetMaxRealtimePriority, DoneResponse>(SET_MAX_RT_PRIORITY_CALL_ID);
        reqSetCPUAffinity = makeServer<SetCPUAffinityRequest, DoneResponse>(SET_CPU_AFFINITY_CALL_ID);
        reqLoadScript = makeUntypedServer(LOAD_SCRIPT_CALL_ID);
        reqSetPortsPreset = makeUntypedServer(SET_PORTS_PRESET_CALL_ID);
        reqUpdateIPortMetadata = makeUntypedServer(IN_PORT_UPDATE_METADATA_ID);
        reqConnectIPort = makeServer<ConnectInputRequest, DoneResponse>(CONNECT_INPUT_CALL_ID);
        reqPrepareStart = makeUntypedServer(PREPARE_START_CALL_ID);
        reqStart = makeServer<StartRequest, DoneResponse>(START_CALL_ID);
        reqStop = makeServer<StopRequest, DoneResponse>(STOP_CALL_ID);
        reqShutdown = makeServer<ShutdownRequest, DoneResponse>(SHUTDOWN_CALL_ID);
        reqShowDisplay = makeServer<ShowDisplayRequest, DoneResponse>(SHOW_DISPLAY_CALL_ID);
        reqShowSettings = makeUntypedServer(SHOW_SETTINGS_CALL_ID);
    }

    ~Private() {}

    template<typename T>
    std::unique_ptr<iox::popo::Publisher<T>> makePublisher(
        const iox::capro::IdString_t &channelName,
        bool waitForConsumer = true)
    {
        iox::popo::PublisherOptions publisherOptn;

        publisherOptn.historyCapacity = SY_IOX_HISTORY_SIZE;
        if (waitForConsumer) {
            // allow the subscriber to block us, to ensure we don't lose data
            publisherOptn.subscriberTooSlowPolicy = iox::popo::ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
        }

        return std::make_unique<iox::popo::Publisher<T>>(
            iox::capro::ServiceDescription{"SyntalosModule", modId, channelName}, publisherOptn);
    }

    std::unique_ptr<iox::popo::UntypedPublisher> makeUntypedPublisher(
        const iox::capro::IdString_t &channelName,
        bool waitForConsumer = true)
    {
        iox::popo::PublisherOptions publisherOptn;

        publisherOptn.historyCapacity = SY_IOX_HISTORY_SIZE;
        if (waitForConsumer) {
            // allow the subscriber to block us, to ensure we don't lose data
            publisherOptn.subscriberTooSlowPolicy = iox::popo::ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
        }

        return std::make_unique<iox::popo::UntypedPublisher>(
            iox::capro::ServiceDescription{"SyntalosModule", modId, channelName}, publisherOptn);
    }

    template<typename Req, typename Res>
    std::unique_ptr<iox::popo::Server<Req, Res>> makeServer(const iox::cxx::string<100> &callName)
    {
        auto srv = std::make_unique<iox::popo::Server<Req, Res>>(
            iox::capro::ServiceDescription{"SyntalosModule", modId, callName});

        waitSet.attachState(*srv, iox::popo::ServerState::HAS_REQUEST).or_else([](auto) {
            std::cerr << "Failed to attach watcher for " << typeid(Req).name() << " request responder." << std::endl;
            std::exit(EXIT_FAILURE);
        });

        return srv;
    }

    std::unique_ptr<iox::popo::UntypedServer> makeUntypedServer(const iox::cxx::string<100> &callName)
    {
        auto srv = std::make_unique<iox::popo::UntypedServer>(
            iox::capro::ServiceDescription{"SyntalosModule", modId, callName});

        waitSet.attachState(*srv, iox::popo::ServerState::HAS_REQUEST).or_else([](auto) {
            std::cerr << "Failed to attach watcher for untyped request responder." << std::endl;
            std::exit(EXIT_FAILURE);
        });

        return srv;
    }

    std::unique_ptr<iox::popo::UntypedSubscriber> makeUntypedSubscriber(
        const iox::capro::IdString_t &instanceId,
        const iox::capro::IdString_t &channelId)
    {
        iox::popo::SubscriberOptions subOptn;

        // number of elements held for processing by default
        subOptn.queueCapacity = SY_IOX_QUEUE_CAPACITY;

        // number of samples to get if for whatever reason we connected too late
        subOptn.historyRequest = SY_IOX_HISTORY_SIZE;

        // make producer wait for us
        subOptn.queueFullPolicy = iox::popo::QueueFullPolicy::BLOCK_PRODUCER;

        auto subscr = std::make_unique<iox::popo::UntypedSubscriber>(
            iox::capro::ServiceDescription{"SyntalosModule", instanceId, channelId}, subOptn);

        waitSet.attachState(*subscr, iox::popo::SubscriberState::HAS_DATA).or_else([](auto) {
            std::cerr << "Failed to attach watcher for untyped subscriber." << std::endl;
            std::exit(EXIT_FAILURE);
        });

        return subscr;
    }

    iox::capro::IdString_t modId;

    std::unique_ptr<iox::popo::Publisher<ErrorEvent>> pubError;
    std::unique_ptr<iox::popo::Publisher<StateChangeEvent>> pubState;
    std::unique_ptr<iox::popo::Publisher<StatusMessageEvent>> pubStatusMessage;
    std::unique_ptr<iox::popo::UntypedPublisher> pubSettingsChange;
    std::unique_ptr<iox::popo::UntypedPublisher> pubInPortChange;
    std::unique_ptr<iox::popo::UntypedPublisher> pubOutPortChange;
    std::unique_ptr<iox::popo::Server<SetNicenessRequest, DoneResponse>> reqSetNiceness;
    std::unique_ptr<iox::popo::Server<SetMaxRealtimePriority, DoneResponse>> reqSetMaxRTPriority;
    std::unique_ptr<iox::popo::Server<SetCPUAffinityRequest, DoneResponse>> reqSetCPUAffinity;
    std::unique_ptr<iox::popo::UntypedServer> reqLoadScript;
    std::unique_ptr<iox::popo::UntypedServer> reqSetPortsPreset;
    std::unique_ptr<iox::popo::UntypedServer> reqUpdateIPortMetadata;
    std::unique_ptr<iox::popo::Server<ConnectInputRequest, DoneResponse>> reqConnectIPort;
    std::unique_ptr<iox::popo::UntypedServer> reqPrepareStart;
    std::unique_ptr<iox::popo::Server<StartRequest, DoneResponse>> reqStart;
    std::unique_ptr<iox::popo::Server<StopRequest, DoneResponse>> reqStop;
    std::unique_ptr<iox::popo::Server<ShutdownRequest, DoneResponse>> reqShutdown;

    std::unique_ptr<iox::popo::Server<ShowDisplayRequest, DoneResponse>> reqShowDisplay;
    std::unique_ptr<iox::popo::UntypedServer> reqShowSettings;

    iox::popo::WaitSet<> waitSet;

    ModuleState state;
    int maxRTPriority;
    std::vector<std::shared_ptr<InputPortInfo>> inPortInfo;
    std::vector<std::shared_ptr<OutputPortInfo>> outPortInfo;
    SyncTimer *syTimer;

    LoadScriptFn loadScriptCb;
    PrepareStartFn prepareStartCb;
    StartFn startCb;
    StopFn stopCb;
    ShutdownFn shutdownCb;
    ShowSettingsFn showSettingsCb;
    ShowDisplayFn showDisplayCb;
};

SyntalosLink::SyntalosLink(const QString &instanceId, QObject *parent)
    : QObject(parent),
      d(new SyntalosLink::Private(instanceId))
{
    d->syTimer = new SyncTimer;

    // Immediately upon creation, we send a message that we are initializing.
    // A client using this interface has to set this to IDLE once it has set up the basics.
    setState(ModuleState::INITIALIZING);
}

SyntalosLink::~SyntalosLink()
{
    delete d->syTimer;
}

QString SyntalosLink::instanceId() const
{
    return QString::fromUtf8(d->modId.c_str());
}

void SyntalosLink::raiseError(const QString &title, const QString &message)
{
    d->pubError->loan().and_then([&](auto &error) {
        error->title = iox::cxx::string<128>(iox::cxx::TruncateToCapacity, title.toStdString());
        error->message = iox::cxx::string<2048>(iox::cxx::TruncateToCapacity, message.toStdString());
        error.publish();
    });
    setState(ModuleState::ERROR);
}

void SyntalosLink::raiseError(const QString &message)
{
    d->pubError->loan().and_then([&](auto &error) {
        error->message = iox::cxx::string<2048>(iox::cxx::TruncateToCapacity, message.toStdString());
        error.publish();
    });
    setState(ModuleState::ERROR);
}

void SyntalosLink::awaitData(int timeoutUsec)
{
    if (timeoutUsec < 0) {
        while (true) {
            // we do not use wait() here as some functionality depends on the Qt/GLib event loop, and especially
            // for Python users it can be a bit jarring if that is not available. So we will occasionally
            // process events here.
            const auto qevTimeout = iox::units::Duration::fromMicroseconds(250 * 1000); // 250ms timeout
            auto notificationVector = d->waitSet.timedWait(qevTimeout);
            for (auto &notification : notificationVector)
                processNotification(notification);

            qApp->processEvents();
            if (!notificationVector.empty())
                break;
        }
    } else {
        auto notificationVector = d->waitSet.timedWait(iox::units::Duration::fromMicroseconds(timeoutUsec));
        for (auto &notification : notificationVector)
            processNotification(notification);
        qApp->processEvents();
    }
}

void SyntalosLink::awaitDataForever()
{
    while (!iox::posix::hasTerminationRequested()) {
        auto notificationVector = d->waitSet.wait();
        for (auto &notification : notificationVector) {
            processNotification(notification);
            qApp->processEvents();
        }
    }
}

void SyntalosLink::processNotification(const iox::popo::NotificationInfo *notification)
{
    // Input port data received?
    for (auto &iport : d->inPortInfo) {
        if (!iport->d->connected)
            continue;

        iport->d->ioxSub->take()
            .and_then([&](const void *payload) {
                const auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(payload);
                const auto size = chunkHeader->usedSizeOfChunk();

                // call raw data received callback
                if (iport->d->newDataCb)
                    iport->d->newDataCb(payload, size);

                // release memory chunk
                iport->d->ioxSub->release(payload);
            })
            .or_else([](auto &result) {
                if (result != iox::popo::ChunkReceiveResult::NO_CHUNK_AVAILABLE) {
                    qWarning().noquote() << "Failed to receive new input port info!";
                }
            });
    }

    // SetNiceness
    if (notification->doesOriginateFrom(d->reqSetNiceness.get())) {
        d->reqSetNiceness->take().and_then([&](const auto &request) {
            d->reqSetNiceness->loan(request)
                .and_then([&](auto &response) {
                    // apply niceness request immediately to current thread
                    const auto success = setCurrentThreadNiceness(request->nice);
                    response->success = success;

                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to SetNiceness! Error: " << error << std::endl;
                    });
                    if (!success)
                        raiseError("Could not set niceness to " + QString::number(request->nice));
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });

        return;
    }

    // SetMaxRealtimePriority
    if (notification->doesOriginateFrom(d->reqSetMaxRTPriority.get())) {
        d->reqSetMaxRTPriority->take().and_then([&](const auto &request) {
            d->reqSetMaxRTPriority->loan(request)
                .and_then([&](auto &response) {
                    d->maxRTPriority = request->priority;
                    response->success = true;
                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to SetMaxRealtimePriority! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });

        return;
    }

    // SetCPUAffinity
    if (notification->doesOriginateFrom(d->reqSetCPUAffinity.get())) {
        d->reqSetCPUAffinity->take().and_then([&](const auto &request) {
            d->reqSetCPUAffinity->loan(request)
                .and_then([&](auto &response) {
                    if (!request->cores.empty()) {
                        thread_set_affinity_from_vec(
                            pthread_self(), std::vector<uint>(request->cores.begin(), request->cores.end()));
                    }

                    response->success = true;
                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to SetCPUAffinity! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });

        return;
    }

    // Load script
    if (notification->doesOriginateFrom(d->reqLoadScript.get())) {
        LoadScriptRequest scriptReqData;
        d->reqLoadScript->take().and_then([&](auto &requestPayload) {
            const auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(requestPayload);
            const auto size = chunkHeader->usedSizeOfChunk();

            scriptReqData = LoadScriptRequest::fromMemory(requestPayload, size);

            auto requestHeader = iox::popo::RequestHeader::fromPayload(requestPayload);
            d->reqLoadScript->loan(requestHeader, sizeof(DoneResponse), alignof(DoneResponse))
                .and_then([&](auto &responsePayload) {
                    auto response = static_cast<DoneResponse *>(responsePayload);
                    response->success = true;
                    d->reqLoadScript->send(response).or_else([&](auto &error) {
                        std::cout << "Could not send LoadScript response! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cout << "Could not allocate LoadScript response! Error: " << error << std::endl;
                });

            d->reqLoadScript->releaseRequest(requestPayload);
        });

        // load script after sending a reply if we had a valid request
        if (d->loadScriptCb && !scriptReqData.script.isEmpty())
            d->loadScriptCb(scriptReqData.script, scriptReqData.workingDir);

        return;
    }

    // Have the master set all preset ports
    if (notification->doesOriginateFrom(d->reqSetPortsPreset.get())) {
        d->reqSetPortsPreset->take().and_then([&](auto &requestPayload) {
            const auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(requestPayload);
            const auto size = chunkHeader->usedSizeOfChunk();

            const auto sppReq = SetPortsPresetRequest::fromMemory(requestPayload, size);

            // We must not remove existing ports, as they might be in use. So instead, we
            // add & merge port data.
            for (const auto &ipc : sppReq.inPorts) {
                bool skip = false;
                for (const auto &ip : d->inPortInfo) {
                    if (ip->id() == ipc.id)
                        skip = true;
                }
                if (skip)
                    continue;
                d->inPortInfo.push_back(std::shared_ptr<InputPortInfo>(new InputPortInfo(ipc)));
            }

            for (const auto &opc : sppReq.outPorts) {
                bool skip = false;
                for (const auto &op : d->outPortInfo) {
                    if (op->id() == opc.id)
                        skip = true;
                }
                if (skip)
                    continue;

                auto oport = std::shared_ptr<OutputPortInfo>(new OutputPortInfo(opc));
                oport->d->ioxPub = d->makeUntypedPublisher(oport->d->publisherId());
                d->outPortInfo.push_back(oport);
            }

            auto requestHeader = iox::popo::RequestHeader::fromPayload(requestPayload);
            d->reqSetPortsPreset->loan(requestHeader, sizeof(DoneResponse), alignof(DoneResponse))
                .and_then([&](auto &responsePayload) {
                    auto response = static_cast<DoneResponse *>(responsePayload);
                    response->success = true;
                    d->reqSetPortsPreset->send(response).or_else([&](auto &error) {
                        std::cout << "Could not send SetPortsPreset response! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cout << "Could not allocate SetPortsPreset response! Error: " << error << std::endl;
                });

            d->reqSetPortsPreset->releaseRequest(requestPayload);
        });

        return;
    }

    // ConnectInputPort
    if (notification->doesOriginateFrom(d->reqConnectIPort.get())) {
        d->reqConnectIPort->take().and_then([&](const auto &request) {
            d->reqConnectIPort->loan(request)
                .and_then([&](auto &response) {
                    // find the port
                    const auto portId = QString::fromUtf8(request->portId.c_str());
                    std::shared_ptr<InputPortInfo> iport;
                    for (const auto &ip : d->inPortInfo) {
                        if (ip->id() == portId) {
                            iport = ip;
                            break;
                        }
                    }

                    // return error if the port was not registered
                    if (!iport) {
                        response->success = false;
                        response.send().or_else([&](auto &error) {
                            std::cerr << "Could not respond to ConnectInputPort! Error: " << error << std::endl;
                        });
                        return;
                    }

                    // connect the port
                    iport->d->connected = true;
                    iport->d->ioxSub = d->makeUntypedSubscriber(request->instanceId, request->channelId);

                    response->success = true;
                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to ConnectInputPort! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });

        return;
    }

    // Update metadata
    if (notification->doesOriginateFrom(d->reqUpdateIPortMetadata.get())) {
        d->reqUpdateIPortMetadata->take().and_then([&](auto &requestPayload) {
            const auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(requestPayload);
            const auto size = chunkHeader->usedSizeOfChunk();

            const auto reqUpdateMD = UpdateInputPortMetadataRequest::fromMemory(requestPayload, size);

            // update metadata
            for (const auto &ip : d->inPortInfo) {
                if (ip->id() == reqUpdateMD.id) {
                    ip->d->metadata = reqUpdateMD.metadata;
                    break;
                }
            }

            auto requestHeader = iox::popo::RequestHeader::fromPayload(requestPayload);
            d->reqUpdateIPortMetadata->loan(requestHeader, sizeof(DoneResponse), alignof(DoneResponse))
                .and_then([&](auto &responsePayload) {
                    auto response = static_cast<DoneResponse *>(responsePayload);
                    response->success = true;
                    d->reqUpdateIPortMetadata->send(response).or_else([&](auto &error) {
                        std::cout << "Could not send UpdateInputPortMetadata response! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cout << "Could not allocate UpdateInputPortMetadata response! Error: " << error << std::endl;
                });

            d->reqUpdateIPortMetadata->releaseRequest(requestPayload);
        });

        return;
    }

    // Prepare start
    if (notification->doesOriginateFrom(d->reqPrepareStart.get())) {
        bool runPrepareRequested = false;
        QByteArray prepareSettings;

        d->reqPrepareStart->take().and_then([&](auto &requestPayload) {
            const auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(requestPayload);
            const auto size = chunkHeader->usedSizeOfChunk();

            const auto req = PrepareStartRequest::fromMemory(requestPayload, size);
            runPrepareRequested = true;
            prepareSettings = req.settings;

            auto requestHeader = iox::popo::RequestHeader::fromPayload(requestPayload);
            d->reqPrepareStart->loan(requestHeader, sizeof(DoneResponse), alignof(DoneResponse))
                .and_then([&](auto &responsePayload) {
                    auto response = static_cast<DoneResponse *>(responsePayload);
                    response->success = true;
                    d->reqPrepareStart->send(response).or_else([&](auto &error) {
                        std::cout << "Could not send PrepareStart response! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cout << "Could not allocate PrepareStart response! Error: " << error << std::endl;
                });

            d->reqPrepareStart->releaseRequest(requestPayload);
        });
        if (runPrepareRequested) {
            // call our preparation delegate
            if (d->prepareStartCb)
                d->prepareStartCb(prepareSettings);
        }

        return;
    }

    // Handle start request
    if (notification->doesOriginateFrom(d->reqStart.get())) {
        bool runStartRequested = false;
        d->reqStart->take().and_then([&](const auto &request) {
            d->reqStart->loan(request)
                .and_then([&](auto &response) {
                    // NOTE: We reply immediately here and defer processing of the call,
                    // so the master will not wait for us. Errors are reported exclusively
                    // via the error channel.

                    const auto timePoint = symaster_timepoint(microseconds_t(request->startTimestampUsec));
                    delete d->syTimer;
                    d->syTimer = new SyncTimer;
                    d->syTimer->startAt(timePoint);
                    runStartRequested = true;

                    response->success = true;
                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to Start! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });
        if (runStartRequested) {
            // execute start action after replying to master
            if (d->startCb)
                d->startCb();
        }

        return;
    }

    // Handle stop request
    if (notification->doesOriginateFrom(d->reqStop.get())) {
        d->reqStop->take().and_then([&](const auto &request) {
            d->reqStop->loan(request)
                .and_then([&](auto &response) {
                    if (d->stopCb)
                        d->stopCb();

                    response->success = true;
                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to Stop! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });

        return;
    }

    // Handle shutdown
    if (notification->doesOriginateFrom(d->reqShutdown.get())) {
        bool shutdownRequested = false;
        d->reqShutdown->take().and_then([&](const auto &request) {
            d->reqShutdown->loan(request)
                .and_then([&](auto &response) {
                    // NOTE: We reply immediately here and defer processing of the call,
                    // because otherwise the master would never get a response if we
                    // tear down the process too quickly.

                    shutdownRequested = true;

                    response->success = true;
                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to Start! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });
        if (shutdownRequested) {
            // execute shutdown action after replying to master
            // if no callback is defined, we just exit()
            if (d->shutdownCb)
                d->shutdownCb();
            else
                qApp->quit();
        }

        return;
    }

    // Handle show display request
    if (notification->doesOriginateFrom(d->reqShowDisplay.get())) {
        bool runShowDisplay = false;
        d->reqShowDisplay->take().and_then([&](const auto &request) {
            d->reqShowDisplay->loan(request)
                .and_then([&](auto &response) {
                    runShowDisplay = true;

                    response->success = true;
                    response.send().or_else([&](auto &error) {
                        std::cerr << "Could not respond to ShowDisplay! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cerr << "Could not allocate response! Error: " << error << std::endl;
                });
        });

        if (runShowDisplay) {
            if (d->showDisplayCb)
                d->showDisplayCb();
        }

        return;
    }

    // Handle show settings request
    if (notification->doesOriginateFrom(d->reqShowSettings.get())) {
        QByteArray settingsData;
        bool runShowSettings = false;

        d->reqShowSettings->take().and_then([&](auto &requestPayload) {
            const auto chunkHeader = iox::mepoo::ChunkHeader::fromUserPayload(requestPayload);
            const auto size = chunkHeader->usedSizeOfChunk();

            const auto req = ShowSettingsRequest::fromMemory(requestPayload, size);
            settingsData = req.settings;
            runShowSettings = true;

            auto requestHeader = iox::popo::RequestHeader::fromPayload(requestPayload);
            d->reqShowSettings->loan(requestHeader, sizeof(DoneResponse), alignof(DoneResponse))
                .and_then([&](auto &responsePayload) {
                    auto response = static_cast<DoneResponse *>(responsePayload);
                    response->success = true;
                    d->reqShowSettings->send(response).or_else([&](auto &error) {
                        std::cout << "Could not send ShowSettings response! Error: " << error << std::endl;
                    });
                })
                .or_else([&](auto &error) {
                    std::cout << "Could not allocate ShowSettings response! Error: " << error << std::endl;
                });

            d->reqShowSettings->releaseRequest(requestPayload);
        });

        if (runShowSettings) {
            if (d->showSettingsCb)
                d->showSettingsCb(settingsData);
        }

        return;
    }
}

ModuleState SyntalosLink::state() const
{
    return d->state;
}

void SyntalosLink::setState(ModuleState state)
{
    d->pubState->loan().and_then([&](auto &sample) {
        sample->state = state;
        sample.publish();
    });

    d->state = state;
}

void SyntalosLink::setStatusMessage(const QString &message)
{
    d->pubStatusMessage->loan().and_then([&](auto &sample) {
        sample->text = iox::cxx::string<512>(iox::cxx::TruncateToCapacity, message.toStdString());
        sample.publish();
    });
}

int SyntalosLink::maxRealtimePriority() const
{
    return d->maxRTPriority;
}

void SyntalosLink::setLoadScriptCallback(LoadScriptFn callback)
{
    d->loadScriptCb = std::move(callback);
}

void SyntalosLink::setPrepareStartCallback(PrepareStartFn callback)
{
    d->prepareStartCb = std::move(callback);
}

void SyntalosLink::setStartCallback(StartFn callback)
{
    d->startCb = std::move(callback);
}

void SyntalosLink::setStopCallback(StopFn callback)
{
    d->stopCb = std::move(callback);
}

void SyntalosLink::setShutdownCallback(ShutdownFn callback)
{
    d->shutdownCb = std::move(callback);
}

SyncTimer *SyntalosLink::timer() const
{
    return d->syTimer;
}

void SyntalosLink::setSettingsData(const QByteArray &data)
{
    SettingsChangeEvent scEv(data);

    const auto scEvData = scEv.toBytes();
    d->pubSettingsChange->loan(scEvData.size())
        .and_then([&](auto &payload) {
            // we copy twice here - but this is a low-volume event, so it should be fine
            memcpy(payload, scEvData.data(), scEvData.size());
            d->pubSettingsChange->publish(payload);
        })
        .or_else([&](auto &error) {
            std::cerr << "Unable to loan sample to announce settings change. Error: " << error << std::endl;
        });
}

void SyntalosLink::setShowSettingsCallback(ShowSettingsFn callback)
{
    d->showSettingsCb = std::move(callback);
}

void SyntalosLink::setShowDisplayCallback(ShowDisplayFn callback)
{
    d->showDisplayCb = std::move(callback);
}

std::vector<std::shared_ptr<InputPortInfo>> SyntalosLink::inputPorts() const
{
    return d->inPortInfo;
}

std::vector<std::shared_ptr<OutputPortInfo>> SyntalosLink::outputPorts() const
{
    return d->outPortInfo;
}

std::shared_ptr<InputPortInfo> SyntalosLink::registerInputPort(
    const QString &id,
    const QString &title,
    const QString &dataTypeName)
{
    // construct our reference for this port
    InputPortChange ipc(PortAction::ADD);
    ipc.id = id;
    ipc.title = title;
    ipc.dataTypeId = BaseDataType::typeIdFromString(dataTypeName);

    // check for duplicates
    for (const auto &ip : d->inPortInfo) {
        if (ip->id() == ipc.id)
            return nullptr;
    }

    const auto iportData = ipc.toBytes();

    // announce the new port to master
    bool haveError = false;
    d->pubInPortChange->loan(iportData.size())
        .and_then([&](auto &payload) {
            // we copy twice here - but this is a low-volume event, so it should be fine
            memcpy(payload, iportData.data(), iportData.size());
            d->pubInPortChange->publish(payload);
        })
        .or_else([&](auto &error) {
            std::cerr << "Unable to loan sample. Error: " << error << std::endl;
            haveError = true;
        });

    if (haveError) {
        return nullptr;
    } else {
        auto iport = std::shared_ptr<InputPortInfo>(new InputPortInfo(ipc));
        d->inPortInfo.push_back(iport);
        return iport;
    }
}

std::shared_ptr<OutputPortInfo> SyntalosLink::registerOutputPort(
    const QString &id,
    const QString &title,
    const QString &dataTypeName,
    const QVariantHash &metadata)
{
    // construct our reference for this port
    OutputPortChange opc(PortAction::ADD);
    opc.id = id;
    opc.title = title;
    opc.dataTypeId = BaseDataType::typeIdFromString(dataTypeName);
    opc.metadata = metadata;

    // check for duplicates
    for (const auto &op : d->outPortInfo) {
        if (op->id() == opc.id)
            return nullptr;
    }

    const auto oportData = opc.toBytes();

    // announce the new port to master
    bool haveError = false;
    d->pubOutPortChange->loan(oportData.size())
        .and_then([&](auto &payload) {
            // we copy twice here - but this is a low-volume event, so it should be fine
            memcpy(payload, oportData.data(), oportData.size());
            d->pubOutPortChange->publish(payload);
        })
        .or_else([&](auto &error) {
            std::cerr << "Unable to loan sample. Error: " << error << std::endl;
            haveError = true;
        });

    if (haveError) {
        return nullptr;
    } else {
        auto oport = std::shared_ptr<OutputPortInfo>(new OutputPortInfo(opc));
        oport->d->ioxPub = d->makeUntypedPublisher(oport->d->publisherId());
        d->outPortInfo.push_back(oport);
        return oport;
    }
}

void SyntalosLink::updateOutputPort(const std::shared_ptr<OutputPortInfo> &oport)
{
    OutputPortChange opc(PortAction::CHANGE);
    opc.id = oport->id();
    opc.title = oport->d->title;
    opc.dataTypeId = oport->dataTypeId();
    opc.metadata = oport->d->metadata;

    const auto oportData = opc.toBytes();
    d->pubOutPortChange->loan(oportData.size())
        .and_then([&](auto &payload) {
            // we copy twice here - but this is a low-volume event, so it should be fine
            memcpy(payload, oportData.data(), oportData.size());
            d->pubOutPortChange->publish(payload);
        })
        .or_else([&](auto &error) {
            std::cerr << "Unable to loan sample. Error: " << error << std::endl;
        });
}

void SyntalosLink::updateInputPort(const std::shared_ptr<InputPortInfo> &iport)
{
    InputPortChange ipc(PortAction::CHANGE);
    ipc.id = iport->id();
    ipc.title = iport->d->title;
    ipc.dataTypeId = iport->d->dataTypeId;
    ipc.metadata = iport->d->metadata;
    ipc.throttleItemsPerSec = iport->d->throttleItemsPerSec;

    const auto iportData = ipc.toBytes();
    d->pubInPortChange->loan(iportData.size())
        .and_then([&](auto &payload) {
            // we copy twice here - but this is a low-volume event, so it should be fine
            memcpy(payload, iportData.data(), iportData.size());
            d->pubInPortChange->publish(payload);
        })
        .or_else([&](auto &error) {
            std::cerr << "Unable to loan sample. Error: " << error << std::endl;
        });
}

bool SyntalosLink::submitOutput(const std::shared_ptr<OutputPortInfo> &oport, const BaseDataType &data)
{
    auto memSize = data.memorySize();
    if (memSize < 0) {
        // we do not know the required memory size in advance, so we need to
        // perform a serialization and extra copy operation
        const auto bytes = data.toBytes();

        oport->d->ioxPub->loan(bytes.size())
            .and_then([&](auto &payload) {
                memcpy(payload, bytes.data(), bytes.size());
                oport->d->ioxPub->publish(payload);
            })
            .or_else([&](auto &error) {
                std::cerr << "Unable to loan sample. Error: " << error << std::endl;
            });
    } else {
        // Higher efficiency code-path since the size is known in advance
        oport->d->ioxPub->loan(memSize)
            .and_then([&](auto &payload) {
                if (!data.writeToMemory(payload, memSize))
                    std::cerr << "Failed to write data to shared memory!" << std::endl;
                oport->d->ioxPub->publish(payload);
            })
            .or_else([&](auto &error) {
                std::cerr << "Unable to loan sample. Error: " << error << std::endl;
            });
    }

    return true;
}

} // namespace Syntalos

Updated on 2024-11-06 at 17:10:29 +0000