|
template<class... Ts> |
| stream_distribution_tree (scheduled_actor *selfptr, Ts &&... xs) |
|
Policy & | policy () |
|
const Policy & | policy () const |
|
void | handle (inbound_path *path, downstream_msg::batch &x) override |
|
void | handle (inbound_path *path, downstream_msg::close &x) override |
|
void | handle (inbound_path *path, downstream_msg::forced_close &x) override |
|
bool | handle (stream_slots slots, upstream_msg::ack_open &x) override |
|
void | handle (stream_slots slots, upstream_msg::drop &x) override |
|
void | handle (stream_slots slots, upstream_msg::forced_drop &x) override |
|
bool | done () const override |
| Returns whether the manager has reached the end and can be discarded safely.
|
|
bool | idle () const noexcept override |
| Returns whether the manager cannot make any progress on its own at the moment.
|
|
downstream_manager_type & | out () override |
| Returns the manager for downstream communication.
|
|
| stream_manager (scheduled_actor *selfptr, stream_priority prio=stream_priority::normal) |
|
virtual void | handle (stream_slots slots, upstream_msg::ack_batch &x) |
|
virtual void | stop (error reason=none) |
| Closes all output and input paths and sends the final result to the client.
|
|
virtual void | shutdown () |
| Mark this stream as shutting down, only allowing flushing all related buffers of in- and outbound paths.
|
|
void | advance () |
| Tries to advance the stream by generating more credit or by sending batches.
|
|
virtual void | push () |
| Pushes new data to downstream actors by sending batches.
|
|
virtual bool | congested () const noexcept |
| Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own.
|
|
virtual void | deliver_handshake (response_promise &rp, stream_slot slot, message handshake) |
| Sends a handshake to dest .
|
|
virtual bool | generate_messages () |
| Tries to generate new messages for the stream.
|
|
const downstream_manager & | out () const |
| Returns the manager for downstream communication.
|
|
virtual void | cycle_timeout (size_t cycle_nr) |
| Advances time.
|
|
virtual void | register_input_path (inbound_path *x) |
| Informs the manager that a new input path opens.
|
|
virtual void | deregister_input_path (inbound_path *x) noexcept |
| Informs the manager that an input path closes.
|
|
virtual void | remove_input_path (stream_slot slot, error reason, bool silent) |
| Removes an input path.
|
|
bool | running () const noexcept |
| Returns whether this stream is neither shutting down nor has stopped.
|
|
bool | continuous () const noexcept |
| Returns whether this stream remains open even if no in- or outbound paths exist.
|
|
void | continuous (bool x) noexcept |
| Sets whether this stream remains open even if no in- or outbound paths exist.
|
|
const inbound_paths_list & | inbound_paths () const noexcept |
| Returns the list of inbound paths.
|
|
inbound_path * | get_inbound_path (stream_slot x) const noexcept |
| Returns the inbound paths at slot x .
|
|
bool | inbound_paths_idle () const noexcept |
| Queries whether all inbound paths are up-to-date and have non-zero credit.
|
|
scheduled_actor * | self () |
| Returns the parent actor.
|
|
virtual int32_t | acquire_credit (inbound_path *path, int32_t desired) |
| Acquires credit on an inbound path.
|
|
stream_slot | add_unchecked_inbound_path_impl (rtti_pair rtti) |
| Adds the current sender as an inbound path.
|
|
| ref_counted (const ref_counted &) |
|
ref_counted & | operator= (const ref_counted &) |
|
void | ref () const noexcept |
| Increases reference count by one.
|
|
void | deref () const noexcept |
| Decreases reference count by one and calls request_deletion when it drops to zero.
|
|
bool | unique () const noexcept |
| Queries whether there is exactly one reference.
|
|
size_t | get_reference_count () const noexcept |
|
virtual void | request_deletion (bool decremented_rc) const noexcept |
| Default implementations calls `delete this, but can be overriden in case deletion depends on some condition or the class doesn't use default new/delete.
|
|
template<
class Policy>
class caf::detail::stream_distribution_tree< Policy >
A stream distribution tree consist of peers forming an acyclic graph.
The user is responsible for making sure peers do not form a loop. Data is flooded along the tree. Each peer serves any number of subscribers. The policy of the tree enables subscriptions to different chunks of the whole stream (substreams).
The tree uses two CAF streams between each pair of peers for transmitting data. This automatically adds backpressure to the system, i.e., no peer can overwhelm others.
Policies need to provide the following member types and functions:
};
Identifies a statically typed actor.
Definition typed_actor.hpp:58