StreamSubscription
Inherits from VariantStreamSubscription
Public Functions
| Return type | Name | Description |
|---|---|---|
| | StreamSubscription(DataStream< T > * stream) | |
| | ~StreamSubscription() override | |
| virtual std::optional< T > | next() | Obtain the next element from the stream; blocks if there is no new element. |
| virtual std::optional< T > | peekNext() | Obtain the next stream element if there is any, otherwise return std::nullopt. Behaves the same as next(), but returns immediately without blocking. To see if the stream has ended, check isActive() on this subscription. |
| virtual bool | callIfNextVar(const ProcessVarFn & fn) override | Call function on the next element, if there is any. |
| virtual int | dataTypeId() const override | |
| virtual QString | dataTypeName() const override | |
| virtual MetaStringMap | metadata() const override | |
| virtual MetaValue | metadataValue(const QString & key, const MetaValue & defaultValue =nullptr) const override | |
| virtual MetaValue | metadataValue(CommonMetadataKey key, const MetaValue & defaultValue =nullptr) const override | |
| template <typename MT > MT | metadataValue(const QString & key, MT fallback) const | |
| template <typename MT > MT | metadataValue(CommonMetadataKey key, MT fallback) const | |
| virtual bool | unsubscribe() override | |
| virtual bool | isActive() const override | |
| bool | isSuspended() const | |
| virtual int | enableNotify() override | Enable notifications on this stream subscription. |
| virtual void | disableNotify() override | Disable notifications via eventfd. |
| virtual void | acknowledgeNotify() override | Acknowledge an eventfd notification after the fd became readable. |
| virtual void | rearmNotifyIfPending() override | Re-arm the eventfd if there is still undrained data. |
| virtual void | suspend() override | Stop receiving data, but do not unsubscribe from the stream. |
| virtual void | resume() override | Resume data transmission; reverses suspend(). |
| virtual void | clearPending() override | Clear all pending data from the subscription. |
| virtual size_t | approxPendingCount() const override | |
| virtual ssize_t | approxItemMemSize() const override | Approximate in-memory size of a single queued item, in bytes. |
| virtual bool | hasPending() const override | |
| uint | throttleValue() const | |
| uint | retrieveApproxSkippedElements() | |
| virtual void | setThrottleItemsPerSec(uint itemsPerSec, bool allowMore =true) override | Set a throttle on the output frequency of this subscription. A positive integer value limits the output of this subscription to that many items per second, which will result in some values being thrown away. A throttle value of 0 passes all output through with no limit. Internally, the throttle is represented as the minimum time in microseconds between elements, so throttle values above 1000000 items/sec cannot be represented. |
| virtual void | forcePushNullopt() override | |
Additional inherited members
Public Functions inherited from VariantStreamSubscription
| Return type | Name | Description |
|---|---|---|
| virtual | ~VariantStreamSubscription() | |
| virtual std::shared_ptr< VariantStreamSubscription > | sourceSubscriptionVar() | If this subscription is a type-conversion view, return the underlying source-typed subscription it adapts; otherwise return nullptr. |
Detailed Description
template <typename T >
class StreamSubscription;
Public Functions Documentation
function StreamSubscription
inline explicit StreamSubscription(
DataStream< T > * stream
)
function ~StreamSubscription
inline ~StreamSubscription() override
function next
inline virtual std::optional< T > next()
Obtain the next element from the stream; blocks if there is no new element.
Return: The obtained value, or std::nullopt in case the stream ended.
Reimplemented by: StreamSubscriptionAdapter::next
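A blocking consumer loop built on next() might look like the sketch below. It is illustrative only: `FakeSubscription` is a hypothetical stand-in for `StreamSubscription<int>` that never blocks, and `consumeUntilEnd` and `demoSum` are names invented for this example. The loop relies only on the documented contract that next() yields std::nullopt once the stream has ended.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>

// Hypothetical stand-in for StreamSubscription<int>: next() hands out the
// queued values and then std::nullopt, as if the stream had ended.
struct FakeSubscription {
    std::deque<int> queue;

    std::optional<int> next()
    {
        if (queue.empty())
            return std::nullopt;
        const int value = queue.front();
        queue.pop_front();
        return value;
    }
};

// Calls handle() for every element until next() reports the end of the
// stream. Returns the number of elements handled.
template<typename Sub, typename Fn>
std::size_t consumeUntilEnd(Sub &sub, Fn &&handle)
{
    std::size_t handled = 0;
    while (auto item = sub.next()) {
        handle(*item);
        ++handled;
    }
    return handled;
}

// Usage with the stand-in: three items are handled before the "stream" ends.
inline int demoSum()
{
    FakeSubscription sub{{1, 2, 3}};
    int sum = 0;
    const std::size_t n = consumeUntilEnd(sub, [&](int v) { sum += v; });
    assert(n == 3);
    return sum;
}
```

Because the helper is a template, the same loop should work with any type that offers a compatible next().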
function peekNext
inline virtual std::optional< T > peekNext()
Obtain the next stream element if there is any, otherwise return std::nullopt. This function behaves the same as next(), but returns immediately without blocking. To see if the stream has ended, check isActive() on this subscription.
Reimplemented by: StreamSubscriptionAdapter::peekNext
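Since peekNext() returns std::nullopt both when no data has arrived yet and when the stream is over, a caller has to consult isActive() to tell the two cases apart. The sketch below shows one way to do that. `FakePollSub`, `PollResult`, `pollOnce` and `demoEmptyIsNotEnded` are hypothetical names, and the stand-in assumes that peekNext() consumes the element it returns, in line with "behaves the same as next()".

```cpp
#include <cassert>
#include <deque>
#include <optional>

enum class PollResult { Item, NoData, Ended };

// Hypothetical stand-in for a subscription, for illustration only.
struct FakePollSub {
    std::deque<int> queue;
    bool active = true;

    // Assumption: like next(), this removes the element it returns.
    std::optional<int> peekNext()
    {
        if (queue.empty())
            return std::nullopt;
        const int value = queue.front();
        queue.pop_front();
        return value;
    }

    bool isActive() const { return active; }
};

// Non-blocking poll: handles one element if available, otherwise reports
// whether the subscription is merely empty or has ended.
template<typename Sub, typename Fn>
PollResult pollOnce(Sub &sub, Fn &&handle)
{
    if (auto item = sub.peekNext()) {
        handle(*item);
        return PollResult::Item;
    }
    return sub.isActive() ? PollResult::NoData : PollResult::Ended;
}

// Usage with the stand-in: an empty but active subscription reports NoData.
inline bool demoEmptyIsNotEnded()
{
    FakePollSub sub;
    const PollResult r = pollOnce(sub, [](int) {});
    assert(r != PollResult::Item);
    return r == PollResult::NoData;
}
```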
function callIfNextVar
inline virtual bool callIfNextVar(
const ProcessVarFn & fn
) override
Call function on the next element, if there is any.
Parameters:
- fn The function to call with the next element.
Return: True if an element was processed, false if there was no element.
Reimplements: VariantStreamSubscription::callIfNextVar
Reimplemented by: StreamSubscriptionAdapter::callIfNextVar
function dataTypeId
inline virtual int dataTypeId() const override
Reimplements: VariantStreamSubscription::dataTypeId
function dataTypeName
inline virtual QString dataTypeName() const override
Reimplements: VariantStreamSubscription::dataTypeName
function metadata
inline virtual MetaStringMap metadata() const override
Reimplements: VariantStreamSubscription::metadata
Reimplemented by: StreamSubscriptionAdapter::metadata
function metadataValue
inline virtual MetaValue metadataValue(
const QString & key,
const MetaValue & defaultValue =nullptr
) const override
Reimplements: VariantStreamSubscription::metadataValue
function metadataValue
inline virtual MetaValue metadataValue(
CommonMetadataKey key,
const MetaValue & defaultValue =nullptr
) const override
Reimplements: VariantStreamSubscription::metadataValue
function metadataValue
template <typename MT >
inline MT metadataValue(
const QString & key,
MT fallback
) const
function metadataValue
template <typename MT >
inline MT metadataValue(
CommonMetadataKey key,
MT fallback
) const
function unsubscribe
inline virtual bool unsubscribe() override
Reimplements: VariantStreamSubscription::unsubscribe
Reimplemented by: StreamSubscriptionAdapter::unsubscribe
function isActive
inline virtual bool isActive() const override
Reimplements: VariantStreamSubscription::isActive
Reimplemented by: StreamSubscriptionAdapter::isActive
function isSuspended
inline bool isSuspended() const
function enableNotify
inline virtual int enableNotify() override
Enable notifications on this stream subscription.
Return: An eventfd file descriptor that is written to when new data is received.
Reimplements: VariantStreamSubscription::enableNotify
Reimplemented by: StreamSubscriptionAdapter::enableNotify
function disableNotify
inline virtual void disableNotify() override
Disable notifications via eventfd.
Reimplements: VariantStreamSubscription::disableNotify
Reimplemented by: StreamSubscriptionAdapter::disableNotify
Do not disable notifications unless you know for sure that nothing is listening on this subscription anymore.
function acknowledgeNotify
inline virtual void acknowledgeNotify() override
Acknowledge an eventfd notification after the fd became readable.
Reimplements: VariantStreamSubscription::acknowledgeNotify
Reimplemented by: StreamSubscriptionAdapter::acknowledgeNotify
Consumers must call this when their watched eventfd wakes them. It drains the fd and clears the coalesced-wakeup flag, so the next push will write the eventfd again.
function rearmNotifyIfPending
inline virtual void rearmNotifyIfPending() override
Re-arm the eventfd if there is still undrained data.
Reimplements: VariantStreamSubscription::rearmNotifyIfPending
Reimplemented by: StreamSubscriptionAdapter::rearmNotifyIfPending
Consumers that process only a bounded batch of items per wakeup must call this after the batch, so the coalesced eventfd wakes them again until the queue is fully drained. A no-op if nothing is pending.
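The notification protocol described for enableNotify(), acknowledgeNotify() and rearmNotifyIfPending() can be sketched as follows. This is a Linux-only illustration under stated assumptions: `FakeNotifySub` is a hypothetical stand-in that imitates the coalesced-wakeup behaviour with a real eventfd, and `handleWakeup` is an invented helper, not part of the documented API. The consumer waits on the fd with poll(), acknowledges the wakeup, handles a bounded batch, and re-arms so that leftover items trigger another wakeup.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <optional>
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

// Hypothetical stand-in imitating a subscription with coalesced eventfd wakeups.
class FakeNotifySub {
public:
    FakeNotifySub() : m_fd(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC)) {}
    ~FakeNotifySub() { if (m_fd >= 0) close(m_fd); }
    FakeNotifySub(const FakeNotifySub &) = delete;
    FakeNotifySub &operator=(const FakeNotifySub &) = delete;

    int enableNotify() const { return m_fd; }

    void push(int value) { m_queue.push_back(value); signalOnce(); }

    std::optional<int> peekNext()
    {
        if (m_queue.empty())
            return std::nullopt;
        const int value = m_queue.front();
        m_queue.pop_front();
        return value;
    }

    // Drain the fd and clear the coalescing flag.
    void acknowledgeNotify()
    {
        std::uint64_t counter = 0;
        const ssize_t r = read(m_fd, &counter, sizeof(counter));
        (void)r; // EAGAIN just means there was nothing to drain
        m_signaled = false;
    }

    // Write the fd again only if items are still queued.
    void rearmNotifyIfPending() { if (!m_queue.empty()) signalOnce(); }

private:
    void signalOnce()
    {
        if (m_signaled)
            return; // wakeup already pending, coalesce
        const std::uint64_t one = 1;
        if (write(m_fd, &one, sizeof(one)) == static_cast<ssize_t>(sizeof(one)))
            m_signaled = true;
    }

    int m_fd;
    std::deque<int> m_queue;
    bool m_signaled = false;
};

// Waits up to timeoutMs for a wakeup, then handles at most maxBatch items.
// Returns the number of items handled (0 on timeout).
template<typename Sub, typename Fn>
int handleWakeup(Sub &sub, int fd, int maxBatch, int timeoutMs, Fn &&handle)
{
    assert(maxBatch > 0);
    pollfd pfd{fd, POLLIN, 0};
    if (poll(&pfd, 1, timeoutMs) <= 0 || !(pfd.revents & POLLIN))
        return 0;

    sub.acknowledgeNotify(); // must happen on every wakeup
    int handled = 0;
    while (handled < maxBatch) {
        auto item = sub.peekNext();
        if (!item)
            break;
        handle(*item);
        ++handled;
    }
    sub.rearmNotifyIfPending(); // leftover items wake us again
    return handled;
}
```

Without the final rearmNotifyIfPending() call, items left over after a bounded batch would stay queued until the next push writes the eventfd again.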
function suspend
inline virtual void suspend() override
Stop receiving data, but do not unsubscribe from the stream.
Reimplements: VariantStreamSubscription::suspend
Reimplemented by: StreamSubscriptionAdapter::suspend
function resume
inline virtual void resume() override
Resume data transmission; reverses suspend().
Reimplements: VariantStreamSubscription::resume
Reimplemented by: StreamSubscriptionAdapter::resume
function clearPending
inline virtual void clearPending() override
Clear all pending data from the subscription.
Reimplements: VariantStreamSubscription::clearPending
Reimplemented by: StreamSubscriptionAdapter::clearPending
function approxPendingCount
inline virtual size_t approxPendingCount() const override
Reimplements: VariantStreamSubscription::approxPendingCount
Reimplemented by: StreamSubscriptionAdapter::approxPendingCount
function approxItemMemSize
inline virtual ssize_t approxItemMemSize() const override
Approximate in-memory size of a single queued item, in bytes.
Reimplements: VariantStreamSubscription::approxItemMemSize
Reimplemented by: StreamSubscriptionAdapter::approxItemMemSize
Sampled once from a real item as it passes through the stream, so the overload monitor can weight the pending count by actual memory pressure. Returns < 0 if no real size is known yet or the data type does not report one (see BaseDataType::memorySize()).
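To illustrate how a pending count could be weighted by item size, the sketch below multiplies approxPendingCount() by approxItemMemSize() and treats a negative size as "unknown". `FakeSizedSub`, `approxPendingBytes` and `demoPendingBytes` are hypothetical names. How the overload monitor actually combines the two values is not described here, so this is only one plausible reading.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <sys/types.h> // ssize_t

// Hypothetical stand-in exposing the two queries used below.
struct FakeSizedSub {
    std::size_t pending;
    ssize_t itemSize; // < 0 means no size is known

    std::size_t approxPendingCount() const { return pending; }
    ssize_t approxItemMemSize() const { return itemSize; }
};

// Estimated number of bytes held in the queue, or std::nullopt when the
// subscription cannot report a per-item size.
template<typename Sub>
std::optional<std::size_t> approxPendingBytes(const Sub &sub)
{
    const ssize_t itemSize = sub.approxItemMemSize();
    if (itemSize < 0)
        return std::nullopt;
    return sub.approxPendingCount() * static_cast<std::size_t>(itemSize);
}

// Usage with the stand-in: 10 queued items of 4096 bytes each.
inline void demoPendingBytes()
{
    const FakeSizedSub known{10, 4096};
    assert(approxPendingBytes(known).value() == 40960);
}
```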
function hasPending
inline virtual bool hasPending() const override
Reimplements: VariantStreamSubscription::hasPending
Reimplemented by: StreamSubscriptionAdapter::hasPending
function throttleValue
inline uint throttleValue() const
function retrieveApproxSkippedElements
inline uint retrieveApproxSkippedElements()
function setThrottleItemsPerSec
inline virtual void setThrottleItemsPerSec(
uint itemsPerSec,
bool allowMore =true
) override
Set a throttle on the output frequency of this subscription.
A positive integer value limits the output of this subscription to that many items per second, which will result in some values being thrown away. A throttle value of 0 passes all output through with no limit. Internally, the throttle is represented as the minimum time in microseconds between elements, so throttle values above 1000000 items/sec cannot be represented.
Reimplements: VariantStreamSubscription::setThrottleItemsPerSec
Reimplemented by: StreamSubscriptionAdapter::setThrottleItemsPerSec
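The microsecond representation can be made concrete with a small worked example. The helper `minIntervalUsec` below is hypothetical, and its use of truncating integer division is an assumption: the actual rounding, and the effect of `allowMore`, are not specified in this reference. Under that assumption, rates above 1000000 items/sec collapse to an interval of 0, which is indistinguishable from "no throttle".

```cpp
#include <cassert>

// Hypothetical helper, not part of the documented API: converts an
// items-per-second throttle into a minimum spacing in microseconds.
// A result of 0 means "no throttle".
constexpr unsigned int minIntervalUsec(unsigned int itemsPerSec)
{
    return itemsPerSec == 0 ? 0u : 1000000u / itemsPerSec; // truncating division (assumed)
}

static_assert(minIntervalUsec(200) == 5000, "200 items/sec gives 5 ms spacing");
static_assert(minIntervalUsec(1000000) == 1, "fastest representable throttle");
static_assert(minIntervalUsec(2000000) == 0, "above 1e6 items/sec the interval is 0");

// Usage check: spacing for a 30 items/sec throttle under the same assumption.
inline void checkThirtyPerSec()
{
    assert(minIntervalUsec(30) == 33333);
}
```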
function forcePushNullopt
inline virtual void forcePushNullopt() override
Reimplements: VariantStreamSubscription::forcePushNullopt
Reimplemented by: StreamSubscriptionAdapter::forcePushNullopt
Updated on 2026-06-12 at 19:41:28 +0000