Using workqueue for continuous input block deletion
This documentation is the result of an investigation into the usage of global/local workqueue elements for continuous input data deletion, such that blocks already fully processed by the agents can be removed from disk without waiting for the completion and archival of the workflow. Please see GitHub pull request https://github.com/dmwm/WMCore/pull/11708 for further details.
It is important to note that there is a CouchDB bi-directional replication between Global WorkQueue and WMAgents. That means, whenever a document gets updated in any of those databases, CouchDB will automatically propagate those changes to the other endpoint of the replication as well (usually within a couple of seconds).
As replications are defined as bi-directional, the following 2 continuous replications are in place:

- From the global workqueue to the local workqueue_inbox database:

  ```json
  "source": "https://cmsweb-testbed.cern.ch/couchdb/workqueue",
  "target": "http://localhost:5984/workqueue_inbox",
  ```

- And from the local workqueue_inbox to the global workqueue database:

  ```json
  "source": "http://cmst1:SomeSenha@localhost:5984/workqueue_inbox",
  "target": "https://cmsweb-testbed.cern.ch/couchdb/workqueue",
  ```
Note that both replications use a filter function, named `WorkQueue/queueFilter`, such that only documents passing the filter actually get replicated. The common input parameters for the filter function in both directions are:

```json
"filter": "WorkQueue/queueFilter",
"query_params": {
    "childUrl": "http://vocms0192.cern.ch:5984",
    "parentUrl": "https://cmsweb-testbed.cern.ch/couchdb/workqueue"
}
```
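For illustration, the two replication definitions above could be assembled programmatically before being pushed to CouchDB's `_replicator` database. The sketch below only builds the replication document as a plain dictionary, using the standard CouchDB replicator fields (`source`, `target`, `continuous`, `filter`, `query_params`) and the example endpoints from this page; the actual push to CouchDB is not shown.

```python
import json

def build_replication_doc(source, target, child_url, parent_url):
    """Build a CouchDB _replicator document for a filtered, continuous
    replication between the global and local workqueue databases.
    Hypothetical helper, not part of WMCore."""
    return {
        "source": source,
        "target": target,
        "continuous": True,
        "filter": "WorkQueue/queueFilter",
        "query_params": {
            "childUrl": child_url,
            "parentUrl": parent_url,
        },
    }

# Global workqueue -> local workqueue_inbox direction
doc = build_replication_doc(
    "https://cmsweb-testbed.cern.ch/couchdb/workqueue",
    "http://localhost:5984/workqueue_inbox",
    "http://vocms0192.cern.ch:5984",
    "https://cmsweb-testbed.cern.ch/couchdb/workqueue",
)
print(json.dumps(doc, indent=2))
```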
The `WorkQueueElement` class, in DataStructs/WorkQueueElement.py, defines all the parameters supported in a WorkQueueElement object, as can be seen in its `setdefault` calls.
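The `setdefault` pattern mentioned above can be illustrated with a minimal stand-in class (a sketch only, covering just the subset of parameters discussed on this page; the real WMCore class defines many more, and the default values here are assumptions):

```python
class WorkQueueElementSketch(dict):
    """Illustrative stand-in for a workqueue element: a dict whose
    constructor fills in defaults via setdefault, mirroring the
    pattern described above. Not WMCore code."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Any parameter not passed by the caller gets its default
        self.setdefault("Inputs", {})
        self.setdefault("PercentComplete", 0)
        self.setdefault("PercentSuccess", 0)
        self.setdefault("Status", None)
        self.setdefault("SubscriptionId", None)

elem = WorkQueueElementSketch(Status="Available")
print(elem["PercentComplete"])  # 0, filled in by the default
```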
The parameters that are actually relevant for this activity are only:

- `Inputs`: a dictionary keyed by the block name; the value is a list of sites (PSNs) that hold the given input block;
- `PercentComplete`: an integer value defining the percentage of the input block that has been processed so far;
- `PercentSuccess`: an integer value defining the percentage of the input block that has been successfully processed so far;
- `Status`: a string with the workqueue status;
- `SubscriptionId`: an integer with the subscription ID, from the WMBS table in the agent (always None in these databases, though).
One example of this structure could be:

```python
'Inputs': {'/HLTPhysicsIsolatedBunch/Run2016H-v1/RAW#92049f6e-92f9-11e6': ["T1_US_FNAL", "T2_CH_CERN"]},
'PercentComplete': 100,
'PercentSuccess': 0,
'Status': 'Done',
'SubscriptionId': None,
```
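A minimal eligibility check over these fields could look like the following plain-dict sketch (hypothetical helper, not WMCore code):

```python
def eligible_for_deletion(element):
    """Return True if a workqueue element's input blocks have been fully
    and successfully processed, i.e. the blocks could be deleted from disk.
    `element` is a plain dict mirroring the fields shown above."""
    return (element.get("Status") == "Done"
            and element.get("PercentComplete") == 100
            and element.get("PercentSuccess") == 100)

# The example element above is Done and fully processed, but not
# fully *successfully* processed (PercentSuccess is 0)
elem = {
    "Inputs": {"/HLTPhysicsIsolatedBunch/Run2016H-v1/RAW#...": ["T1_US_FNAL", "T2_CH_CERN"]},
    "PercentComplete": 100,
    "PercentSuccess": 0,
    "Status": "Done",
    "SubscriptionId": None,
}
print(eligible_for_deletion(elem))  # False
```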
For completeness, the definition of the allowed workqueue elements can be found in the `WorkQueueElement` class (DataStructs/WorkQueueElement.py).
WMAgent updates the workqueue elements in the local database, which then relies on the database replication to propagate the updates all the way to the global workqueue element.
The source code logic for these updates is described as follows:
- The `WorkQueueManagerCleaner` thread is in charge of these updates. This thread belongs to the `WorkQueueManager` agent component.
- It calls the `performQueueCleanupActions` method, from WorkQueueManager/WorkQueueManagerCleaner.py#L50.
- Which then executes the `performSyncAndCancelAction` method, from WorkQueue/WorkQueue.py#L945.
- Which then executes the `status` method, from WorkQueue/WorkQueue.py#L705.
- Finally executing the `updateFromSubscription` method, from DataStructs/WorkQueueElement.py.
The default polling cycle for `WorkQueueManagerCleaner` is 10 minutes, so agents should be updating element statuses and completion at fairly short intervals.
A summary of how each database gets updated:

- Global workqueue: `SubscriptionId` is always None; while `PercentSuccess` and `PercentComplete` get updated throughout the lifetime of the workflow via database replication (the update is actually performed on the local document).
- Local workqueue: `SubscriptionId` is properly set to the id in WMBS; but `PercentSuccess` and `PercentComplete` are actually not updated (maybe only in the End policy?).
- Local workqueue_inbox: `SubscriptionId` is always None; while `PercentSuccess` and `PercentComplete` get updated throughout the lifetime of the element.
As mentioned above, the agent is responsible for calculating the workqueue element completion and success rate. This is performed by the `WorkQueueManagerCleaner` thread, which belongs to the `WorkQueueManager` component of the agent.

In short, each workqueue element is associated with a given subscription, which has one or more job groups and jobs in the `wmbs_job` table. The agent checks the outcome of every single job and then calculates the completion and success rate of the subscription (the same as for the workqueue element, in this case).
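The percentage calculation described above can be sketched as follows. WMCore computes this through the SubscriptionStatus DAO against the WMBS tables; the function below is only an in-memory approximation of that logic, with an assumed job-outcome representation:

```python
def subscription_completion(job_outcomes):
    """Compute (PercentComplete, PercentSuccess) for a subscription from
    the outcome of each of its jobs: 'success', 'failure', or None for
    jobs still pending/running. Hypothetical helper for illustration."""
    total = len(job_outcomes)
    if total == 0:
        return 0, 0
    finished = sum(1 for o in job_outcomes if o is not None)
    succeeded = sum(1 for o in job_outcomes if o == "success")
    return int(100 * finished / total), int(100 * succeeded / total)

# 4 jobs: 2 succeeded, 1 failed, 1 still running
print(subscription_completion(["success", "success", "failure", None]))
# -> (75, 50)
```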
The source code logic for this is as follows:

- The `performSyncAndCancelAction` method calls the `getWMBSSubscriptionStatus` method, in WorkQueue/WorkQueue.py#L737.
- Which then executes the `wmbsSubscriptionStatus` method, from WorkQueue/WMBSHelper.py#L36.
- Finally executing the Monitoring/SubscriptionStatus.py DAO, fetching all the subscriptions and calculating their completion.
There are multiple ways to design and implement continuous input data deletion in the Workload Management system. Two potentially good solutions would be:

- Based on the global workqueue: an external service that would query workqueue elements in Global WorkQueue whose `Status` is `Done`, then parse each of these documents and select only those with `PercentComplete=100` and `PercentSuccess=100`. This would likely be a central service deployed in CMSWEB.
- Based on the local workqueue_inbox: an extension of an existing component - or a new WMAgent component - that would query the local workqueue_inbox elements with `Status="Done"`, then parse each of these documents and select only those with `PercentComplete=100` and `PercentSuccess=100`.
An alternative for the WMAgent architecture would be to create a new component similar to `WorkQueueManager`, move the `WorkQueueManagerCleaner` thread to this new component, and add a new thread to perform the continuous input data deletion, followed by the local workqueue_inbox removal.
Note that there is no need to evaluate these elements as often as they are evaluated at the moment (every 10 minutes). Executing this logic once or twice a day would suffice.
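The second option above, a periodic agent thread, could be sketched as below. The three callables stand in for the CouchDB query for `Status="Done"` elements, the actual block deletion, and the local workqueue_inbox cleanup; none of them is spelled out here, and the names are assumptions for illustration only:

```python
POLL_INTERVAL = 12 * 3600  # seconds; once or twice a day is enough

def poll_once(fetch_done_elements, delete_block, delete_inbox_element):
    """One cycle of a hypothetical deletion thread: select fully and
    successfully processed elements, delete their input blocks, then
    remove the corresponding local workqueue_inbox document."""
    for elem in fetch_done_elements():
        if elem.get("PercentComplete") == 100 and elem.get("PercentSuccess") == 100:
            for block in elem.get("Inputs", {}):
                delete_block(block)
            delete_inbox_element(elem)

# Exercise one cycle with in-memory stand-ins
deleted, removed = [], []
elements = [
    {"Status": "Done", "PercentComplete": 100, "PercentSuccess": 100,
     "Inputs": {"/A/B/RAW#1": ["T1_US_FNAL"]}},
    {"Status": "Done", "PercentComplete": 100, "PercentSuccess": 0,
     "Inputs": {"/A/B/RAW#2": ["T2_CH_CERN"]}},
]
poll_once(lambda: elements, deleted.append, removed.append)
print(deleted)  # only the fully successful element's block: ['/A/B/RAW#1']
```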
One of the concerns with this mechanism is the check for uniqueness. In other words, given a fully and successfully processed element, should we check whether any other workflow is using the same input data that is now eligible for deletion? The answer is likely yes, and in this case we would have to query ReqMgr2 to see which other workflows are using the same input data. If it is an ACDC/Resubmission workflow, then we can probably disregard it and proceed with the input data deletion.
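That uniqueness check could look like the sketch below. The list of workflows stands in for the result of a ReqMgr2 query; the dict shape (`RequestName`, `RequestType`) and the helper name are assumptions for illustration:

```python
def blocking_workflows(workflows_using_block, current_workflow):
    """Return the names of workflows that should block deletion of an
    input block: any other workflow using the same input, unless it is
    an ACDC/Resubmission request, which is disregarded as suggested
    above. Hypothetical helper, not WMCore code."""
    return [wf["RequestName"] for wf in workflows_using_block
            if wf["RequestName"] != current_workflow
            and wf["RequestType"] != "Resubmission"]

others = blocking_workflows(
    [{"RequestName": "wf_original", "RequestType": "ReReco"},
     {"RequestName": "wf_acdc", "RequestType": "Resubmission"},
     {"RequestName": "wf_other", "RequestType": "ReReco"}],
    current_workflow="wf_original",
)
print(others)  # ['wf_other'] -> deletion should wait for this workflow
```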
A second concern with this mechanism is whether we can actually delete local workqueue_inbox elements for successfully deleted input data. Blocks already acquired by the Global WorkQueue are stored in the global workqueue_inbox database, and we (Alan) believe that this would cause no issues for the workflow processing and completion. If it does, alternatives that we can explore are (a) not deleting the local workqueue_inbox elements upon successful input data deletion; or (b) creating a new boolean WorkQueueElement field to keep track of this deletion.