-
Another interesting use case: dropping all waiting messages from the event_queue when a special message is received. Let's suppose there is an agent that processes ... But there is also ...
-
Here is another open question, and it's related to the Event Queue Hook feature: should the hook be applied to the original event_queue received from a dispatcher, or to the custom event_queue returned by so_try_intercept_event_queue? It seems to me that the hook has to be applied to the custom event_queue, because the custom queue will receive all messages (while the original event_queue may get only a few messages). But there is another possible variant: both event_queues (the original and the custom) have to be passed to the event queue hook. But there is a problem: how to avoid invocation of the hook if the agent doesn't provide a custom queue at all? Suppose so_try_intercept_event_queue is declared like this:

struct use_original_queue_t {};
using queue_interception_result_t = std::variant<event_queue_t *, use_original_queue_t>;
[[nodiscard]]
virtual queue_interception_result_t
so_try_intercept_event_queue( event_queue_t * original_queue );

In that case the implementation of so_bind_to_dispatcher could look like this:

void
agent_t::so_bind_to_dispatcher(
event_queue_t & queue ) noexcept
{
// Since v.5.5.24 we should use event_queue_hook to get an
// actual event_queue.
auto * actual_queue = impl::internal_env_iface_t{ m_env }
.event_queue_on_bind( this, &queue );
// Since v.5.8.1 we allow the agent to intercept and customize the queue.
actual_queue = std::visit(
[&]( auto result )
{
using result_type = decltype(result);
if constexpr( std::is_same_v<event_queue_t *, result_type> )
{
// Custom event queue should be used.
// It has to be passed to event_queue_hook too.
return impl::internal_env_iface_t{ m_env }
.event_queue_on_bind( this, result );
}
else if constexpr( std::is_same_v<use_original_queue_t, result_type> )
{
// The original queue has to be used.
return actual_queue;
}
else {
... // Some kind of compile-time error should be raised here.
}
},
so_try_intercept_event_queue( actual_queue ) );
std::lock_guard< default_rw_spinlock_t > queue_lock{ m_event_queue_lock };
// Cooperation usage counter should be incremented.
// It will be decremented during final agent event execution.
impl::coop_private_iface_t::increment_usage_count( *m_agent_coop );
// A starting demand must be sent first.
actual_queue->push_evt_start(
execution_demand_t(
this,
message_limit::control_block_t::none(),
0,
typeid(void),
message_ref_t(),
&agent_t::demand_handler_on_start ) );
// Only then pointer to the queue could be stored.
m_event_queue = actual_queue;
}
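For illustration, an agent could use such an interception point like this (a purely hypothetical sketch: my_custom_queue_t, m_custom_queue and m_needs_custom_queue are illustrative names, and the variant-based signature from the snippet above is assumed):

class my_agent_t : public so_5::agent_t
{
public:
	using so_5::agent_t::agent_t;

private:
	// Illustrative user-defined queue (derived from so_5::event_queue_t)
	// that wraps the dispatcher's queue.
	std::unique_ptr< my_custom_queue_t > m_custom_queue;
	bool m_needs_custom_queue{ true };

	[[nodiscard]]
	queue_interception_result_t
	so_try_intercept_event_queue( event_queue_t * original_queue ) override
	{
		if( m_needs_custom_queue )
		{
			// The custom queue stores original demands and forwards
			// special demands into the dispatcher's original queue.
			m_custom_queue = std::make_unique< my_custom_queue_t >(
					*this, *original_queue );
			return m_custom_queue.get();
		}
		// Keep the dispatcher's queue untouched.
		return use_original_queue_t{};
	}
};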
-
A few notes related to the "double locking problem" described in the starting message.
-
A note related to this sentence:
-
Let's look at several cases that can't be handled properly by the current SObjectizer-5 architecture and working principles:
Handling messages with different priorities. For example, an agent processes msg_urgent_data and msg_ordinary_data messages. If there are several msg_ordinary_data messages in the queue and a new msg_urgent_data arrives, then the new msg_urgent_data has to be handled before the pending msg_ordinary_data messages.

As a workaround, the standard prio_one_thread::strictly_ordered dispatcher can be used with two agents bound to it: one with a higher priority for handling msg_urgent_data, another with a lower priority for handling msg_ordinary_data messages. Both agents have to share some common data, but this isn't a problem because only one of them will be working at any moment, so there won't be any data races (a sketch of this workaround is shown below).

This workaround has several significant drawbacks: sharing data between several agents makes the implementation more complex than needed. But, more importantly, this workaround isn't applicable if we have to bind such agents to a different type of dispatcher, for example, to a thread_pool dispatcher.
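A minimal sketch of that workaround (assuming the SObjectizer 5.6+ API; the message, agent and data names here are illustrative, and error handling is omitted):

#include <so_5/all.hpp>

struct msg_urgent_data final : public so_5::signal_t {};
struct msg_ordinary_data final : public so_5::signal_t {};

struct shared_data_t { /* data shared by both agents */ };

class a_urgent_t final : public so_5::agent_t
{
public:
	a_urgent_t( context_t ctx, so_5::mbox_t mbox, shared_data_t & data )
		// Higher priority: demands of this agent are handled first.
		: so_5::agent_t{ ctx + so_5::prio::p7 }
		, m_data{ data }
	{
		so_subscribe( mbox ).event( [this]( so_5::mhood_t<msg_urgent_data> ) {
			/* handle urgent data using m_data */
		} );
	}
private:
	shared_data_t & m_data;
};

class a_ordinary_t final : public so_5::agent_t
{
public:
	a_ordinary_t( context_t ctx, so_5::mbox_t mbox, shared_data_t & data )
		// Lower priority: handled only when no urgent demands are waiting.
		: so_5::agent_t{ ctx + so_5::prio::p0 }
		, m_data{ data }
	{
		so_subscribe( mbox ).event( [this]( so_5::mhood_t<msg_ordinary_data> ) {
			/* handle ordinary data using m_data */
		} );
	}
private:
	shared_data_t & m_data;
};

// Both agents are bound to the same strictly_ordered dispatcher, e.g.:
// env.introduce_coop(
//     so_5::disp::prio_one_thread::strictly_ordered::make_dispatcher( env ).binder(),
//     [&]( so_5::coop_t & coop ) {
//         coop.make_agent< a_urgent_t >( mbox, std::ref(data) );
//         coop.make_agent< a_ordinary_t >( mbox, std::ref(data) );
//     } );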
Deferred processing of a message like msg_outgoing_data: if data from the message can be written to the destination right now, then msg_outgoing_data has to be processed immediately. But if the destination isn't writable at the moment (temporarily unavailable, for example), then the msg_outgoing_data has to be queued inside the agent.

Sometimes the agent sends a msg_check_pending_data message to itself. But there is a trick: there is no need to have more than one instance of msg_check_pending_data in the agent's queue. Moreover, old instances of msg_check_pending_data have to be ignored if a new instance of msg_check_pending_data is sent. For example, let's say the agent's queue contains old msg_check_pending_data instances with timestamps 14:15:53 and 14:15:59. When a new msg_check_pending_data is being sent, those old instances have to be removed from the queue.

As a workaround, the agent can store the timestamp of the last msg_check_pending_data sent and then ignore all instances of msg_check_pending_data with different timestamps.

Of course, this workaround has its own drawbacks: the agent can store the last timestamp only if msg_check_pending_data is sent by the agent itself; it is impossible if msg_check_pending_data is sent by someone else. And event handlers for old msg_check_pending_data instances will still be called anyway, even if those messages have to be discarded.

Selective receive a la Erlang. For example, an agent has messages of types M1, M2 and M3 in the queue, but only handles messages of type M1 in the current state. Messages of all other types will be discarded by SObjectizer, because there are no handlers for them in the current state of the agent. But what should be done if the agent wants to keep them in the queue and return to processing them after switching to another state? See Akka's stash for another implementation of selective receive.

There is no workaround for such a case in SObjectizer-5.
The idea
What if an agent can set a custom event_queue for itself?
An agent creates a custom queue and owns it. When an mbox tries to store a demand for handling a message, this original demand is stored inside the agent's custom queue, and a new special demand is given to the dispatcher. So we'll have two demands: the original demand in the agent's custom queue and a special demand in the dispatcher's queue.

When the dispatcher extracts this special demand, it calls the special demand handler. This handler extracts the original demand from the custom queue and calls the original demand handler. The original demand handler initiates the search for an appropriate event handler and calls it if it's found.
The trick is the custom event_queue inside an agent. This queue can be implemented in different ways: with support for message priorities, with removal of old or new messages, with postponing of a message for a while, and so on.
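A rough sketch of that scheme (not actual SObjectizer code; custom_queue_t, m_stored and make_special_demand are hypothetical names, and only the push of ordinary demands is shown):

class custom_queue_t : public event_queue_t
{
public:
	custom_queue_t( agent_t & owner, event_queue_t & disp_queue )
		: m_owner{ owner }, m_disp_queue{ disp_queue }
	{}

	void push( execution_demand_t demand ) override
	{
		{
			// The original demand stays inside the agent's own queue.
			std::lock_guard< std::mutex > lock{ m_lock };
			m_stored.push_back( std::move(demand) );
		}
		// A special demand goes to the dispatcher's queue. When the
		// dispatcher extracts it, the special demand handler will take
		// the first stored demand from m_stored and process it.
		m_disp_queue.push( make_special_demand( m_owner ) );
	}

	... // push_evt_start/push_evt_finish and other details are omitted.

private:
	agent_t & m_owner;
	event_queue_t & m_disp_queue;
	std::mutex m_lock;
	std::deque< execution_demand_t > m_stored;
};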
There is the so_5::agent_t::so_bind_to_dispatcher method that is called by SObjectizer when an agent is being bound to a dispatcher. The reference to the event_queue_t object is the reference to the dispatcher's queue for this agent. The agent has to intercept this queue and install a custom queue; something like the sketch below.
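(A hypothetical sketch; the exact signature of this interception point is one of the open questions in this thread.)

// Inside so_5::agent_t:
// Returns the event_queue that has to be used for this agent.
// The default implementation doesn't intercept anything.
virtual event_queue_t &
so_try_intercept_event_queue( event_queue_t & original_queue )
{
	return original_queue;
}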
This new method so_try_intercept_event_queue will be called inside so_bind_to_dispatcher. The default implementation of so_try_intercept_event_queue will return a reference to the original queue.

A note: the difference between demand handlers and event handlers.
Dispatchers store demands in event_queue(s). Every demand is represented as an instance of the so_5::execution_demand_t type. Every execution_demand_t holds a pointer to an ordinary function called a demand handler (of type demand_handler_pfn_t).

When a dispatcher extracts a demand from a queue (some dispatchers have more than one event_queue), it calls the demand_handler specified inside the extracted demand.
There are several types of standard demand handlers. The last two of them search for an event handler for the message and call it if one is found; if an event handler is not found, the message is just discarded.

So, dispatchers know only about demand handlers, and a pointer to the appropriate demand handler is stored inside every demand. Only a couple of the standard demand handlers know how to find and call event handlers.
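To make the distinction concrete, here is a tiny self-contained model of the idea (this is not SObjectizer's real code, just an illustration of "the dispatcher only calls demand handlers; only a demand handler knows how to find an event handler"):

#include <deque>
#include <iostream>
#include <string>

// A toy demand: the dispatcher only knows about the handler pointer.
struct toy_demand_t;
using toy_demand_handler_t = void (*)( toy_demand_t & );

struct toy_demand_t
{
	std::string m_message_type;
	toy_demand_handler_t m_handler;
};

// A toy "message" demand handler: only it knows how to find an event handler.
void demand_handler_on_message( toy_demand_t & d )
{
	if( d.m_message_type == "msg_urgent_data" )
		std::cout << "event handler for msg_urgent_data called\n";
	else
		std::cout << "no event handler, demand discarded\n";
}

// A toy dispatcher loop: extracts demands and calls their demand handlers.
void serve_queue( std::deque< toy_demand_t > & queue )
{
	while( !queue.empty() )
	{
		toy_demand_t d = queue.front();
		queue.pop_front();
		d.m_handler( d ); // the dispatcher knows nothing about event handlers
	}
}

int main()
{
	std::deque< toy_demand_t > queue{
		{ "msg_urgent_data", &demand_handler_on_message },
		{ "msg_unknown", &demand_handler_on_message }
	};
	serve_queue( queue );
}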
Open question: How to avoid double locking?
An obvious approach is to have two separate locks: one for the custom event_queue in an agent (a private agent lock) and another for the dispatcher's event_queue (a private dispatcher lock). When an agent gets a message from an mbox, it acquires the private agent lock, stores the original demand in the custom event_queue, then releases the private agent lock. Then the agent passes a special demand to the dispatcher, and the dispatcher acquires/releases its own private dispatcher lock.
Such a scheme was used in the very first versions of SObjectizer-5 and proved to be very inefficient. So I want to avoid the double locking scheme.
The current idea is to try to use the dispatcher's lock to perform actions on the custom demand queue. This can be done by introducing a new method to the so_5::event_queue_t interface; a hypothetical sketch of such an interface is shown below.

The method event_queue_t::lock_then_apply() is expected to be implemented this way: the queue's own lock is acquired, then the consumer.apply() method is called; the consumer.apply() method modifies the custom demand queue and, if necessary, calls methods of already_locked_queue.
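A purely hypothetical sketch of such an addition (the consumer type and its name are illustrative):

class event_queue_t
{
public:
	... // existing push(), push_evt_start(), push_evt_finish(), etc.

	// A consumer that is invoked while the queue's own lock is held.
	class locked_consumer_t
	{
	public:
		virtual ~locked_consumer_t() = default;
		// already_locked_queue can be modified without additional locking.
		virtual void apply( event_queue_t & already_locked_queue ) = 0;
	};

	// Acquires the queue's own lock, then calls consumer.apply(),
	// passing the queue itself as already_locked_queue.
	virtual void lock_then_apply( locked_consumer_t & consumer ) = 0;
};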
Such an approach would allow doing something like this:
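For example, a lock_then_apply-based variant of the custom queue's push() (again a hypothetical sketch, reusing the illustrative custom_queue_t names from above; pusher_t is also an illustrative name):

void custom_queue_t::push( execution_demand_t demand )
{
	// The consumer does all the work under the dispatcher queue's lock.
	class pusher_t final : public event_queue_t::locked_consumer_t
	{
	public:
		pusher_t( std::deque< execution_demand_t > & stored,
			agent_t & owner,
			execution_demand_t demand )
			: m_stored{ stored }, m_owner{ owner }, m_demand{ std::move(demand) }
		{}

		void apply( event_queue_t & already_locked_queue ) override
		{
			// Both queues are modified under the single dispatcher lock,
			// so there is no separate agent lock and no double locking.
			m_stored.push_back( std::move(m_demand) );
			already_locked_queue.push( make_special_demand( m_owner ) );
		}
	private:
		std::deque< execution_demand_t > & m_stored;
		agent_t & m_owner;
		execution_demand_t m_demand;
	};

	pusher_t pusher{ m_stored, m_owner, std::move(demand) };
	m_disp_queue.lock_then_apply( pusher );
}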
Open question: How to handle stashing?
The main problem is that there is no way to push a demand back into the queue if there is no appropriate event-handler for the message in the current agent state. The demand is extracted from the queue, then an event-handler is looked up in the subscriptions. If there is no event-handler, the demand is simply discarded.
If a custom demand queue implements stashing, then a custom demand handler has to do the following: extract the next demand from the custom queue, look up an event handler for it in the agent's current state, call that event handler if it is found, and put the demand back into the stash otherwise.
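A hypothetical sketch of such a demand handler (custom_queue, find_event_handler, call_event_handler and m_stash are illustrative names only):

// Called by the dispatcher when it extracts the special demand.
void special_demand_handler( execution_demand_t & /*special_demand*/ )
{
	// Take the next original demand from the agent's custom queue.
	auto original = custom_queue.pop_front();

	// Look up an event handler for it in the agent's current state.
	// Note: this duplicates the lookup already done by the standard
	// demand handlers.
	if( auto * handler = find_event_handler( original ) )
		call_event_handler( *handler, original );
	else
		// No handler in the current state: keep the demand for later
		// instead of discarding it.
		custom_queue.m_stash.push_back( std::move(original) );
}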
One problem here is the repetition of the "look up an event handler" step. It's not good to ask a user to implement it by him/herself.
Another problem is the lack of synchronization between demands in the custom queue and in the dispatcher's queue. Without stashing, every demand in the dispatcher's queue corresponds to a demand in the custom queue. However, if one demand is stashed, the dispatcher's queue contains one demand fewer than the custom queue. One possible solution is: a custom event_queue pushes a special demand into the original event_queue only if the custom event_queue was empty. If the custom queue wasn't empty, then the special demand is already in the original event_queue, and we have to wait until the dispatcher extracts it and calls the special demand handler. This special demand handler processes the first appropriate demand from the custom event_queue and then checks the custom event_queue for emptiness: if it's not empty, then a new special demand is pushed into the original queue (but only one).
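A hypothetical sketch of that "only one special demand" scheme (again with the illustrative custom_queue_t names; locking details are simplified):

void custom_queue_t::push( execution_demand_t demand )
{
	std::lock_guard< std::mutex > lock{ m_lock };
	const bool was_empty = m_stored.empty();
	m_stored.push_back( std::move(demand) );
	// Only the first stored demand produces a special demand for the
	// dispatcher; otherwise one special demand is already waiting there.
	if( was_empty )
		m_disp_queue.push( make_special_demand( m_owner ) );
}

// Called at the end of the special demand handler, after one stored
// demand has been processed.
void custom_queue_t::reschedule_if_needed()
{
	std::lock_guard< std::mutex > lock{ m_lock };
	// If more demands are waiting, schedule exactly one new special demand.
	if( !m_stored.empty() )
		m_disp_queue.push( make_special_demand( m_owner ) );
}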