StreamSubscriptionAdapter

A view over a StreamSubscription that yields values of type To.

#include <stream.h>

Inherits from StreamSubscription< To >, VariantStreamSubscription

Public Functions

Name
StreamSubscriptionAdapter(std::shared_ptr< StreamSubscription< From > > inner)
virtual std::optional< To > next() override
Obtain the next element from the stream; blocks if there is no new element.
virtual std::optional< To > peekNext() override
Obtain the next stream element if there is one, otherwise return std::nullopt. This function behaves the same as next(), but returns immediately without blocking. To see whether the stream has ended, check isActive() on this subscription.
virtual bool callIfNextVar(const ProcessVarFn & fn) override
Call a function on the next element, if there is one.
virtual bool unsubscribe() override
virtual bool isActive() const override
virtual bool hasPending() const override
virtual size_t approxPendingCount() const override
virtual int enableNotify() override
Enable notifications on this stream subscription.
virtual void disableNotify() override
Disable notifications via eventfd.
virtual void setThrottleItemsPerSec(uint itemsPerSec, bool allowMore = true) override
Set a throttle on the output frequency of this subscription. A positive value effectively limits the output of this subscription to that many items per second, which means some values will be discarded. A value of 0 disables the throttle, so all output is passed through. Internally, the throttle is stored as the minimum time in microseconds between elements, so a throttle above 1000000 items/sec cannot be represented.
virtual void suspend() override
Stop receiving data, but do not unsubscribe from the stream.
virtual void resume() override
Resume data transmission; reverses suspend().
virtual void clearPending() override
Clear all pending data from the subscription.
virtual MetaStringMap metadata() const override
virtual MetaValue metadataValue(const QString & key, const MetaValue & defaultValue = nullptr) const override
virtual MetaValue metadataValue(CommonMetadataKey key, const MetaValue & defaultValue = nullptr) const override
virtual void forcePushNullopt() override

Additional inherited members

Public Functions inherited from StreamSubscription< To >

Name
StreamSubscription(DataStream< T > * stream)
~StreamSubscription() override
virtual int dataTypeId() const override
virtual QString dataTypeName() const override
bool isSuspended() const
uint throttleValue() const
uint retrieveApproxSkippedElements()

Public Functions inherited from VariantStreamSubscription

Name
virtual ~VariantStreamSubscription()
virtual int dataTypeId() const = 0
virtual QString dataTypeName() const = 0

Detailed Description

template <typename From,
typename To>
class StreamSubscriptionAdapter;

A view over a StreamSubscription that yields values of type To.

Used to make incompatible-typed ports connectable when To has an explicit To(const From &) constructor. The adapter is-a StreamSubscription so the existing dynamic_pointer_cast in StreamInputPort::subscription() succeeds; next()/peekNext() pull a From from the inner subscription and construct a To on the fly.

All other operations delegate to the inner subscription, so throttling, suspension, metadata, eventfd notifications etc. behave as if the consumer had subscribed directly.
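
The adapter idea described above can be sketched in isolation. The following is a minimal illustration only, not the actual implementation: MiniSubscription, QueueSubscription, MiniAdapter, Celsius and Kelvin are hypothetical names invented for this example, and the interface is reduced to a non-blocking peekNext() plus hasPending(). It shows the two halves of the pattern: the conversion through an explicit To(const From &) constructor, and plain delegation for everything else.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <optional>
#include <utility>

// Hypothetical, heavily simplified stand-in for StreamSubscription<T>.
// Only the members needed for this illustration are modelled.
template <typename T>
class MiniSubscription
{
public:
    virtual ~MiniSubscription() = default;
    virtual std::optional<T> peekNext() = 0; // non-blocking, consumes the element
    virtual bool hasPending() const = 0;
};

// Hypothetical in-memory source used to feed the adapter.
template <typename T>
class QueueSubscription : public MiniSubscription<T>
{
public:
    void push(T value) { m_queue.push_back(std::move(value)); }

    std::optional<T> peekNext() override
    {
        if (m_queue.empty())
            return std::nullopt;
        T value = std::move(m_queue.front());
        m_queue.pop_front();
        return value;
    }

    bool hasPending() const override { return !m_queue.empty(); }

private:
    std::deque<T> m_queue;
};

// The adapter is-a MiniSubscription<To> and wraps a MiniSubscription<From>.
template <typename From, typename To>
class MiniAdapter : public MiniSubscription<To>
{
public:
    explicit MiniAdapter(std::shared_ptr<MiniSubscription<From>> inner)
        : m_inner(std::move(inner))
    {
        assert(m_inner != nullptr);
    }

    // Pull a From from the inner subscription and construct a To on the fly.
    std::optional<To> peekNext() override
    {
        auto value = m_inner->peekNext();
        if (!value)
            return std::nullopt;
        return To(*value); // uses the explicit To(const From &) constructor
    }

    // Everything that is not type-dependent simply delegates.
    bool hasPending() const override { return m_inner->hasPending(); }

private:
    std::shared_ptr<MiniSubscription<From>> m_inner;
};

// Example payload types: Kelvin is explicitly constructible from Celsius.
struct Celsius {
    double value;
};

struct Kelvin {
    double value;
    explicit Kelvin(const Celsius &c) : value(c.value + 273.15) {}
};
```

A consumer that expects a subscription of Kelvin can be handed a MiniAdapter< Celsius, Kelvin > wrapping a Celsius source and never sees the original element type.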

Public Functions Documentation

function StreamSubscriptionAdapter

inline explicit StreamSubscriptionAdapter(
    std::shared_ptr< StreamSubscription< From > > inner
)

function next

inline virtual std::optional< To > next() override

Obtain the next element from the stream; blocks if there is no new element.

Return: The obtained value, or std::nullopt if the stream has ended.

Reimplements: StreamSubscription::next

function peekNext

inline virtual std::optional< To > peekNext() override

Obtain the next stream element if there is one, otherwise return std::nullopt. This function behaves the same as next(), but returns immediately without blocking. To see whether the stream has ended, check isActive() on this subscription.

Reimplements: StreamSubscription::peekNext

function callIfNextVar

inline virtual bool callIfNextVar(
    const ProcessVarFn & fn
) override

Call a function on the next element, if there is one.

Parameters:

  • fn The function to call with the next element.

Return: True if an element was processed, false if there was no element.

Reimplements: StreamSubscription::callIfNextVar

function unsubscribe

inline virtual bool unsubscribe() override

Reimplements: StreamSubscription::unsubscribe

function isActive

inline virtual bool isActive() const override

Reimplements: StreamSubscription::isActive

function hasPending

inline virtual bool hasPending() const override

Reimplements: StreamSubscription::hasPending

function approxPendingCount

inline virtual size_t approxPendingCount() const override

Reimplements: StreamSubscription::approxPendingCount

function enableNotify

inline virtual int enableNotify() override

Enable notifications on this stream subscription.

Return: An eventfd file descriptor that is written to when new data is received.

Reimplements: StreamSubscription::enableNotify
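
A returned eventfd descriptor is typically waited on with poll(). The sketch below is an illustration under assumptions, not part of this API: waitForNotification and signalNewData are hypothetical helpers, and a locally created eventfd would stand in for the descriptor returned by enableNotify(). Whether a consumer is expected to read the descriptor to reset its counter is not stated in this reference, so the read shown here is an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

// Hypothetical helper: wait up to timeoutMs for the eventfd to become
// readable, then read it to reset the counter.
// Returns true if a notification was received.
bool waitForNotification(int fd, int timeoutMs)
{
    assert(fd >= 0);

    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;

    if (poll(&pfd, 1, timeoutMs) <= 0)
        return false; // timeout or error
    if ((pfd.revents & POLLIN) == 0)
        return false;

    // Reading an eventfd yields its 8-byte counter and resets it to zero.
    uint64_t counter = 0;
    return read(fd, &counter, sizeof(counter)) == static_cast<ssize_t>(sizeof(counter));
}

// Hypothetical stand-in for the producer side: adds 1 to the eventfd
// counter, which makes the descriptor readable.
bool signalNewData(int fd)
{
    assert(fd >= 0);

    const uint64_t one = 1;
    return write(fd, &one, sizeof(one)) == static_cast<ssize_t>(sizeof(one));
}
```

After waitForNotification() returns true, a consumer would usually drain the subscription with peekNext() until it yields std::nullopt, since several elements can arrive before the descriptor is read.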

function disableNotify

inline virtual void disableNotify() override

Disable notifications via eventfd.

Reimplements: StreamSubscription::disableNotify

Do not disable notifications unless you know for sure that nothing is listening on this subscription anymore.

function setThrottleItemsPerSec

inline virtual void setThrottleItemsPerSec(
    uint itemsPerSec,
    bool allowMore = true
) override

Set a throttle on the output frequency of this subscription. A positive value effectively limits the output of this subscription to that many items per second, which means some values will be discarded. A value of 0 disables the throttle, so all output is passed through. Internally, the throttle is stored as the minimum time in microseconds between elements, so a throttle above 1000000 items/sec cannot be represented.

Reimplements: StreamSubscription::setThrottleItemsPerSec
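
The relationship between an items-per-second value and a microsecond interval can be worked through with a small sketch. This is an illustration only: throttleIntervalUsec and passesThrottle are hypothetical helpers, the integer-division rounding is an assumption, and the effect of allowMore is not modelled because it is not described here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: minimum spacing between elements in whole
// microseconds. A result of 0 means "no throttle".
constexpr uint32_t throttleIntervalUsec(uint32_t itemsPerSec)
{
    if (itemsPerSec == 0)
        return 0;
    // Integer division (assumed): rates above 1000000 items/sec round to 0.
    return 1000000u / itemsPerSec;
}

// Hypothetical check: would an element arriving at nowUsec be passed
// through, given when the previous element was emitted?
inline bool passesThrottle(uint64_t lastEmittedUsec, uint64_t nowUsec, uint32_t intervalUsec)
{
    assert(nowUsec >= lastEmittedUsec);
    return intervalUsec == 0 || (nowUsec - lastEmittedUsec) >= intervalUsec;
}
```

Under these assumptions, 200 items/sec corresponds to a 5000 microsecond interval, 1000000 items/sec corresponds to the smallest nonzero interval of 1 microsecond, and anything faster collapses to 0, which is indistinguishable from no throttle at all.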

function suspend

inline virtual void suspend() override

Stop receiving data, but do not unsubscribe from the stream.

Reimplements: StreamSubscription::suspend

function resume

inline virtual void resume() override

Resume data transmission; reverses suspend().

Reimplements: StreamSubscription::resume

function clearPending

inline virtual void clearPending() override

Clear all pending data from the subscription.

Reimplements: StreamSubscription::clearPending

function metadata

inline virtual MetaStringMap metadata() const override

Reimplements: StreamSubscription::metadata

function metadataValue

inline virtual MetaValue metadataValue(
    const QString & key,
    const MetaValue & defaultValue = nullptr
) const override

Reimplements: VariantStreamSubscription::metadataValue

function metadataValue

inline virtual MetaValue metadataValue(
    CommonMetadataKey key,
    const MetaValue & defaultValue = nullptr
) const override

Reimplements: VariantStreamSubscription::metadataValue

function forcePushNullopt

inline virtual void forcePushNullopt() override

Reimplements: StreamSubscription::forcePushNullopt


Updated on 2026-05-15 at 01:05:43 +0000