fabric/streams/stream.h
fabric/streams/stream.h
Classes
Name | |
---|---|
class | VariantStreamSubscription |
class | VariantDataStream |
class | StreamSubscription |
class | DataStream |
Types
Name | |
---|---|
enum class | CommonMetadataKey { SrcModType, SrcModName, SrcModPortTitle, DataNameProposal} |
typedef QHash< CommonMetadataKey, QString > | CommonMetadataKeyMap |
using std::function< void(BaseDataType &)> | ProcessVarFn A function that can be used to process a variant value. |
Functions
Name | |
---|---|
uint | qHash(CommonMetadataKey key, uint seed) |
Types Documentation
enum CommonMetadataKey
Enumerator | Value | Description |
---|---|---|
SrcModType | ||
SrcModName | ||
SrcModPortTitle | ||
DataNameProposal |
typedef CommonMetadataKeyMap
typedef QHash<CommonMetadataKey, QString> CommonMetadataKeyMap;
using ProcessVarFn
using ProcessVarFn = std::function<void(BaseDataType &)>;
A function that can be used to process a variant value.
Functions Documentation
function qHash
inline uint qHash(
CommonMetadataKey key,
uint seed
)
Source code
/*
* Copyright (C) 2019-2024 Matthias Klumpp <matthias@tenstral.net>
*
* Licensed under the GNU Lesser General Public License Version 3
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the license, or
* (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this software. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <QDebug>
#include <QVariant>
#include <algorithm>
#include <functional>
#include <atomic>
#include <cmath>
#include <mutex>
#include <optional>
#include <sys/eventfd.h>
#include <thread>
#include <unistd.h>
#include "datactl/datatypes.h"
#include "readerwriterqueue.h"
#include "datactl/syclock.h"
using namespace moodycamel;
using namespace Syntalos;
enum class CommonMetadataKey {
SrcModType,
SrcModName,
SrcModPortTitle,
DataNameProposal
};
inline uint qHash(CommonMetadataKey key, uint seed)
{
return ::qHash(static_cast<uint>(key), seed);
}
// clang-format off
typedef QHash<CommonMetadataKey, QString> CommonMetadataKeyMap;
Q_GLOBAL_STATIC_WITH_ARGS(CommonMetadataKeyMap,
_commonMetadataKeyMap,
({
{CommonMetadataKey::SrcModType, QLatin1String("src_mod_type")},
{CommonMetadataKey::SrcModName, QLatin1String("src_mod_name")},
{CommonMetadataKey::SrcModPortTitle, QLatin1String("src_mod_port_title")},
{CommonMetadataKey::DataNameProposal, QLatin1String("data_name_proposal")},
})
);
// clang-format on
using ProcessVarFn = std::function<void(BaseDataType &)>;
class VariantStreamSubscription
{
public:
virtual ~VariantStreamSubscription();
virtual int dataTypeId() const = 0;
virtual QString dataTypeName() const = 0;
virtual bool callIfNextVar(const ProcessVarFn &fn) = 0;
virtual bool unsubscribe() = 0;
virtual bool active() const = 0;
virtual bool hasPending() const = 0;
virtual size_t approxPendingCount() const = 0;
virtual int enableNotify() = 0;
virtual void disableNotify() = 0;
virtual void setThrottleItemsPerSec(uint itemsPerSec, bool allowMore = true) = 0;
virtual void suspend() = 0;
virtual void resume() = 0;
virtual void clearPending() = 0;
virtual QHash<QString, QVariant> metadata() const = 0;
virtual QVariant metadataValue(const QString &key, const QVariant &defaultValue = QVariant()) const = 0;
virtual QVariant metadataValue(CommonMetadataKey key, const QVariant &defaultValue = QVariant()) const = 0;
// used internally by Syntalos
virtual void forcePushNullopt() = 0;
};
class VariantDataStream
{
public:
virtual ~VariantDataStream();
virtual QString dataTypeName() const = 0;
virtual int dataTypeId() const = 0;
virtual std::shared_ptr<VariantStreamSubscription> subscribeVar() = 0;
virtual void start() = 0;
virtual void stop() = 0;
virtual bool active() const = 0;
virtual bool hasSubscribers() const = 0;
virtual void pushRawData(int typeId, const void *data, size_t size) = 0;
virtual QHash<QString, QVariant> metadata() = 0;
virtual void setMetadata(const QHash<QString, QVariant> &metadata) = 0;
virtual void setMetadataValue(const QString &key, const QVariant &value) = 0;
virtual void setCommonMetadata(const QString &srcModType, const QString &srcModName, const QString &portTitle) = 0;
};
template<typename T>
class DataStream;
template<typename T>
class StreamSubscription : public VariantStreamSubscription
{
friend DataStream<T>;
public:
explicit StreamSubscription(DataStream<T> *stream)
: m_stream(stream),
m_queue(BlockingReaderWriterQueue<std::optional<T>>(256)),
m_eventfd(-1),
m_notify(false),
m_active(true),
m_suspended(false),
m_throttle(0),
m_skippedElements(0)
{
m_lastItemTime = currentTimePoint();
m_eventfd = eventfd(0, EFD_NONBLOCK);
if (m_eventfd < 0) {
qFatal("Unable to obtain eventfd for new stream subscription: %s", std::strerror(errno));
assert(0);
}
}
~StreamSubscription() override
{
m_active = false;
unsubscribe();
m_notify = false;
close(m_eventfd);
}
std::optional<T> next()
{
if (!m_active && m_queue.peek() == nullptr)
return std::nullopt;
std::optional<T> data;
m_queue.wait_dequeue(data);
return data;
}
std::optional<T> peekNext()
{
if (!m_active && m_queue.peek() == nullptr)
return std::nullopt;
std::optional<T> data;
if (!m_queue.try_dequeue(data))
return std::nullopt;
return data;
}
bool callIfNextVar(const ProcessVarFn &fn) override
{
auto res = peekNext();
if (res.has_value()) {
fn(res.value());
return true;
}
return false;
}
int dataTypeId() const override
{
return syDataTypeId<T>();
}
QString dataTypeName() const override
{
return BaseDataType::typeIdToString(syDataTypeId<T>());
}
QHash<QString, QVariant> metadata() const override
{
return m_metadata;
}
QVariant metadataValue(const QString &key, const QVariant &defaultValue = QVariant()) const override
{
return m_metadata.value(key, defaultValue);
}
QVariant metadataValue(CommonMetadataKey key, const QVariant &defaultValue = QVariant()) const override
{
return m_metadata.value(_commonMetadataKeyMap->value(key), defaultValue);
}
bool unsubscribe() override
{
// check if we are already unsubscribed
if (m_stream == nullptr)
return true;
if (m_stream->unsubscribe(this)) {
m_stream = nullptr;
return true;
}
return false;
}
bool active() const override
{
return m_active;
}
int enableNotify() override
{
m_notify = true;
return m_eventfd;
}
void disableNotify() override
{
m_notify = false;
}
void suspend() override
{
// suspend receiving new data
m_suspended = true;
// drop currently pending data
while (m_queue.pop()) {
}
}
void resume() override
{
m_suspended = false;
}
void clearPending() override
{
m_suspended = true;
while (m_queue.pop()) {
}
m_suspended = false;
}
size_t approxPendingCount() const override
{
return m_queue.size_approx();
}
bool hasPending() const override
{
return m_queue.size_approx() > 0;
}
uint throttleValue() const
{
return m_throttle;
}
uint retrieveApproxSkippedElements()
{
const uint si = m_skippedElements;
m_skippedElements = 0;
return si;
}
void setThrottleItemsPerSec(uint itemsPerSec, bool allowMore = true) override
{
uint newThrottle;
if (itemsPerSec == 0)
newThrottle = 0;
else
newThrottle = allowMore ? std::floor((1000.0 / itemsPerSec) * 1000)
: std::ceil((1000.0 / itemsPerSec) * 1000);
// clear current queue contents quickly in case we throttle down the subscription
// (this prevents clients from skipping elements too much if they are overeager
// when adjusting the throttle value)
if (newThrottle > m_throttle) {
// suspending and immediately resuming efficiently clears the current buffer
suspend();
resume();
}
// apply
m_throttle = newThrottle;
m_skippedElements = 0;
}
void forcePushNullopt() override
{
std::optional<T> v;
m_queue.enqueue(v);
}
private:
DataStream<T> *m_stream;
BlockingReaderWriterQueue<std::optional<T>> m_queue;
int m_eventfd;
std::atomic_bool m_notify;
std::atomic_bool m_active;
std::atomic_bool m_suspended;
std::atomic_uint m_throttle;
std::atomic_uint m_skippedElements;
// NOTE: These two variables are intentionally *not* threadsafe and are
// only ever manipulated by the stream (in case of the time) or only
// touched once when a stream is started (in case of the metadata).
QHash<QString, QVariant> m_metadata;
symaster_timepoint m_lastItemTime;
void setMetadata(const QHash<QString, QVariant> &metadata)
{
m_metadata = metadata;
}
void push(const T &data)
{
// don't accept any new data if we are suspended
if (m_suspended)
return;
// check if we can throttle the enqueueing speed of data
if (m_throttle != 0) {
const auto timeNow = currentTimePoint();
const auto durUsec = timeDiffUsec(timeNow, m_lastItemTime);
if (durUsec.count() < m_throttle) {
m_skippedElements++;
return;
}
m_lastItemTime = timeNow;
}
// actually send the data to the subscriber
m_queue.enqueue(std::optional<T>(data));
// ping the eventfd, in case anyone is listening for messages
if (m_notify) {
const uint64_t buffer = 1;
if (write(m_eventfd, &buffer, sizeof(buffer)) == -1)
qWarning().noquote() << "Unable to write to eventfd in" << dataTypeName()
<< "data subscription. FD:" << m_eventfd << "Error:" << std::strerror(errno);
}
}
void stop()
{
m_active = false;
m_queue.enqueue(std::nullopt);
}
void reset()
{
m_suspended = false;
m_active = true;
m_throttle = 0;
m_lastItemTime = currentTimePoint();
while (m_queue.pop()) {
} // ensure the queue is empty
}
};
template<typename T>
class DataStream : public VariantDataStream
{
public:
DataStream()
: m_active(false)
{
m_ownerId = std::this_thread::get_id();
}
~DataStream() override
{
terminate();
}
int dataTypeId() const override
{
return syDataTypeId<T>();
}
QString dataTypeName() const override
{
return BaseDataType::typeIdToString(syDataTypeId<T>());
}
QHash<QString, QVariant> metadata() override
{
return m_metadata;
}
void setMetadata(const QHash<QString, QVariant> &metadata) override
{
m_metadata = metadata;
}
void setMetadataValue(const QString &key, const QVariant &value) override
{
m_metadata[key] = value;
}
void setSuggestedDataName(const QString &value)
{
m_metadata[_commonMetadataKeyMap->value(CommonMetadataKey::DataNameProposal)] = value;
}
void removeMetadata(const QString &key)
{
m_metadata.remove(key);
}
void setCommonMetadata(const QString &srcModType, const QString &srcModName, const QString &portTitle) override
{
setMetadataValue(_commonMetadataKeyMap->value(CommonMetadataKey::SrcModType), srcModType);
setMetadataValue(_commonMetadataKeyMap->value(CommonMetadataKey::SrcModName), srcModName);
if (!portTitle.isEmpty())
setMetadataValue(_commonMetadataKeyMap->value(CommonMetadataKey::SrcModPortTitle), portTitle);
}
std::shared_ptr<StreamSubscription<T>> subscribe()
{
// we don't permit subscriptions to an active stream
assert(!m_active);
if (m_active)
return nullptr;
std::lock_guard<std::mutex> lock(m_mutex);
std::shared_ptr<StreamSubscription<T>> sub(new StreamSubscription<T>(this));
sub->setMetadata(m_metadata);
m_subs.push_back(sub);
return sub;
}
std::shared_ptr<VariantStreamSubscription> subscribeVar() override
{
return subscribe();
}
bool unsubscribe(StreamSubscription<T> *sub)
{
// we don't permit unsubscribing from an active stream
assert(!m_active);
if (m_active)
return false;
std::lock_guard<std::mutex> lock(m_mutex);
for (uint i = 0; i < m_subs.size(); i++) {
if (m_subs.at(i).get() == sub) {
m_subs.erase(m_subs.begin() + i);
return true;
}
}
return false;
}
void start() override
{
m_ownerId = std::this_thread::get_id();
for (auto const &sub : m_subs) {
sub->reset();
sub->setMetadata(m_metadata);
}
m_active = true;
}
void stop() override
{
for (auto const &sub : m_subs)
sub->stop();
m_active = false;
}
void push(const T &data)
{
if (!m_active)
return;
for (auto &sub : m_subs)
sub->push(data);
}
void pushRawData(int typeId, const void *data, size_t size) override
{
if (!m_active)
return;
if (typeId != syDataTypeId<T>()) {
qCritical().noquote() << "Invalid data type ID" << typeId << "for stream of type" << dataTypeName();
return;
}
const T &entity = T::fromMemory(data, size);
for (auto &sub : m_subs)
sub->push(entity);
}
void terminate()
{
stop();
// "unsubscribe" use forcefully from any active subscription,
// as this stream is terminated.
for (auto const &sub : m_subs)
sub->m_stream = nullptr;
m_subs.clear();
}
bool active() const override
{
return m_active;
}
bool hasSubscribers() const override
{
return !m_subs.empty();
}
private:
std::thread::id m_ownerId;
std::atomic_bool m_active;
std::mutex m_mutex;
std::vector<std::shared_ptr<StreamSubscription<T>>> m_subs;
QHash<QString, QVariant> m_metadata;
};
Updated on 2024-11-06 at 17:10:29 +0000