StreamSubscriptionAdapter
StreamSubscriptionAdapter
A view over a StreamSubscription
#include <stream.h>
Inherits from StreamSubscription< To >, VariantStreamSubscription
Public Functions
| Name | |
|---|---|
| StreamSubscriptionAdapter(std::shared_ptr< StreamSubscription< From > > inner) | |
| virtual std::optional< To > | next() override Obtain next element from stream, block in case there is no new element. |
| virtual std::optional< To > | peekNext() override Obtain the next stream element if there is any, otherwise return std::nullopt This function behaves the same as next(), but does return immediately without blocking. To see if the stream as ended, check the active() property on this subscription. |
| virtual bool | callIfNextVar(const ProcessVarFn & fn) override Call function on the next element, if there is any. |
| virtual bool | unsubscribe() override |
| virtual bool | isActive() const override |
| virtual bool | hasPending() const override |
| virtual size_t | approxPendingCount() const override |
| virtual ssize_t | approxItemMemSize() const override Approximate in-memory size of a single queued item, in bytes. |
| virtual int | enableNotify() override Enable notifiucations on this stream subscription. |
| virtual void | disableNotify() override Disable notifications via eventFD. |
| virtual void | acknowledgeNotify() override Acknowledge an eventfd notification after the fd became readable. |
| virtual void | rearmNotifyIfPending() override Re-arm the eventfd if there is still undrained data. |
| virtual void | setThrottleItemsPerSec(uint itemsPerSec, bool allowMore =true) override Set a throttle on the output frequency of this subscription By setting a positive integer value, the output of this subscription is effectively limited to the given integer value per second. This will result in some values being thrown away. By setting a throttle value of 0, all output is passed through and no limits apply. Internally, the throttle value is represented as the minimum needed time in microseconds between elements. This effectively means you can not throttle a connection over 1000000 items/sec. |
| virtual void | suspend() override Stop receiving data, but do not unsubscribe from the stream. |
| virtual void | resume() override Resume data transmission, reverses suspend() |
| virtual void | clearPending() override Clear all pending data from the subscription. |
| virtual MetaStringMap | metadata() const override |
| virtual MetaValue | metadataValue(const QString & key, const MetaValue & defaultValue =nullptr) const override |
| virtual MetaValue | metadataValue(CommonMetadataKey key, const MetaValue & defaultValue =nullptr) const override |
| virtual void | forcePushNullopt() override |
| virtual std::shared_ptr< VariantStreamSubscription > | sourceSubscriptionVar() override If this subscription is a type-conversion view, return the underlying source-typed subscription it adapts; otherwise return nullptr. |
Additional inherited members
Public Functions inherited from StreamSubscription< To >
| Name | |
|---|---|
| StreamSubscription(DataStream< T > * stream) | |
| ~StreamSubscription() override | |
| virtual int | dataTypeId() const override |
| virtual QString | dataTypeName() const override |
| bool | isSuspended() const |
| uint | throttleValue() const |
| uint | retrieveApproxSkippedElements() |
Public Functions inherited from VariantStreamSubscription
| Name | |
|---|---|
| virtual | ~VariantStreamSubscription() |
| virtual int | dataTypeId() const =0 |
| virtual QString | dataTypeName() const =0 |
Detailed Description
template <typename From ,
typename To >
class StreamSubscriptionAdapter;A view over a StreamSubscription
Used to make incompatible-typed ports connectable when To has an explicit To(const From &) constructor. The adapter is-a StreamSubscription
All other operations delegate to the inner subscription, so throttling, suspension, metadata, eventfd notifications etc. behave as if the consumer had subscribed directly.
Public Functions Documentation
function StreamSubscriptionAdapter
inline explicit StreamSubscriptionAdapter(
std::shared_ptr< StreamSubscription< From > > inner
)function next
inline virtual std::optional< To > next() overrideObtain next element from stream, block in case there is no new element.
Return: The obtained value, or std::nullopt in case the stream ended.
Reimplements: StreamSubscription::next
function peekNext
inline virtual std::optional< To > peekNext() overrideObtain the next stream element if there is any, otherwise return std::nullopt This function behaves the same as next(), but does return immediately without blocking. To see if the stream as ended, check the active() property on this subscription.
Reimplements: StreamSubscription::peekNext
function callIfNextVar
inline virtual bool callIfNextVar(
const ProcessVarFn & fn
) overrideCall function on the next element, if there is any.
Parameters:
- fn The function to call with the next element.
Return: True if an element was processed, false if there was no element.
Reimplements: StreamSubscription::callIfNextVar
function unsubscribe
inline virtual bool unsubscribe() overrideReimplements: StreamSubscription::unsubscribe
function isActive
inline virtual bool isActive() const overrideReimplements: StreamSubscription::isActive
function hasPending
inline virtual bool hasPending() const overrideReimplements: StreamSubscription::hasPending
function approxPendingCount
inline virtual size_t approxPendingCount() const overrideReimplements: StreamSubscription::approxPendingCount
function approxItemMemSize
inline virtual ssize_t approxItemMemSize() const overrideApproximate in-memory size of a single queued item, in bytes.
Reimplements: StreamSubscription::approxItemMemSize
Sampled once from a real item as it passes through the stream, so the overload monitor can weight the pending count by actual memory pressure. Returns < 0 if no real size is known yet or the data type does not report one (see BaseDataType::memorySize()).
function enableNotify
inline virtual int enableNotify() overrideEnable notifiucations on this stream subscription.
Return: An eventfd file descriptor that is written to when new data is received.
Reimplements: StreamSubscription::enableNotify
function disableNotify
inline virtual void disableNotify() overrideDisable notifications via eventFD.
Reimplements: StreamSubscription::disableNotify
Do not disable notifications unless you know for sure that nothing is listening on this subscription anymore.
function acknowledgeNotify
inline virtual void acknowledgeNotify() overrideAcknowledge an eventfd notification after the fd became readable.
Reimplements: StreamSubscription::acknowledgeNotify
Consumers must call this when their watched eventfd wakes them. It drains the fd and clears the coalesced-wakeup flag, so the next push will write the eventfd again.
function rearmNotifyIfPending
inline virtual void rearmNotifyIfPending() overrideRe-arm the eventfd if there is still undrained data.
Reimplements: StreamSubscription::rearmNotifyIfPending
Consumers that process only a bounded batch of items per wakeup must call this after the batch, so the coalesced eventfd wakes them again until the queue is fully drained. A no-op if nothing is pending.
function setThrottleItemsPerSec
inline virtual void setThrottleItemsPerSec(
uint itemsPerSec,
bool allowMore =true
) overrideSet a throttle on the output frequency of this subscription By setting a positive integer value, the output of this subscription is effectively limited to the given integer value per second. This will result in some values being thrown away. By setting a throttle value of 0, all output is passed through and no limits apply. Internally, the throttle value is represented as the minimum needed time in microseconds between elements. This effectively means you can not throttle a connection over 1000000 items/sec.
Reimplements: StreamSubscription::setThrottleItemsPerSec
function suspend
inline virtual void suspend() overrideStop receiving data, but do not unsubscribe from the stream.
Reimplements: StreamSubscription::suspend
function resume
inline virtual void resume() overrideResume data transmission, reverses suspend()
Reimplements: StreamSubscription::resume
function clearPending
inline virtual void clearPending() overrideClear all pending data from the subscription.
Reimplements: StreamSubscription::clearPending
function metadata
inline virtual MetaStringMap metadata() const overrideReimplements: StreamSubscription::metadata
function metadataValue
inline virtual MetaValue metadataValue(
const QString & key,
const MetaValue & defaultValue =nullptr
) const overrideReimplements: VariantStreamSubscription::metadataValue
function metadataValue
inline virtual MetaValue metadataValue(
CommonMetadataKey key,
const MetaValue & defaultValue =nullptr
) const overrideReimplements: VariantStreamSubscription::metadataValue
function forcePushNullopt
inline virtual void forcePushNullopt() overrideReimplements: StreamSubscription::forcePushNullopt
function sourceSubscriptionVar
inline virtual std::shared_ptr< VariantStreamSubscription > sourceSubscriptionVar() overrideIf this subscription is a type-conversion view, return the underlying source-typed subscription it adapts; otherwise return nullptr.
Reimplements: VariantStreamSubscription::sourceSubscriptionVar
Used by the IPC stream exporter to serialize the source’s native type onto the wire (so cross-process consumers with different but compatible declared input types each get correctly-decoded samples).
Updated on 2026-06-22 at 03:54:46 +0000