Distributed Priority Queue
An implementation of the Distributed Priority Queue ZK recipe.
- QueueBuilder
- DistributedPriorityQueue
public static QueueBuilder builder(CuratorFramework client,
QueueSerializer serializer,
java.lang.String queuePath)
Parameters:
client - the curator client
serializer - serializer to use for items
queuePath - path to store queue
QueueBuilder<MessageType> builder = QueueBuilder.builder(client, serializer, path);
// ... more builder method calls as needed ...
DistributedPriorityQueue<MessageType> queue = builder.buildPriorityQueue(minItemsBeforeRefresh);
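The serializer passed to the builder converts queue items to and from the form stored at queuePath. As a rough sketch of what such a serializer might look like for String messages, the following uses a hypothetical local interface, ItemSerializer, that only approximates the shape of the real QueueSerializer (assumed here to map items to and from byte arrays). It is an illustration, not Curator's own code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the serializer contract; NOT Curator's own interface.
// Assumption: items are converted to and from byte arrays.
interface ItemSerializer<T> {
    byte[] serialize(T item);
    T deserialize(byte[] bytes);
}

// Stores each String item as UTF-8 bytes.
final class StringItemSerializer implements ItemSerializer<String> {
    @Override
    public byte[] serialize(String item) {
        return item.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

final class SerializerSketch {
    public static void main(String[] args) {
        StringItemSerializer serializer = new StringItemSerializer();
        byte[] stored = serializer.serialize("order-42");
        // Round trip: the item read back should equal the item written.
        System.out.println(serializer.deserialize(stored));
    }
}
```

Whatever format is chosen, the round trip must be lossless, since every consumer rebuilds the item from the stored bytes alone.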
public DistributedPriorityQueue buildPriorityQueue(int minItemsBeforeRefresh)
Build a DistributedPriorityQueue from the current builder values.
When the priority queue detects an item addition or removal, it stops processing its current list of items and refreshes the list. minItemsBeforeRefresh modifies this behavior: it sets the minimum number of items from the active list that are processed before a refresh.
Due to a quirk in the way ZooKeeper notifies changes, the queue receives an item addition/removal notification after every item is processed. This can lead to poor performance. Set minItemsBeforeRefresh to the number of items your application can tolerate being out of sync.
For example, if the queue sees 10 items to process, it will end up making 10 calls to ZooKeeper to check status. You can control this by setting minItemsBeforeRefresh to 10 (or more), and the queue will then only refresh with ZooKeeper after 10 items are processed.
Parameters:
minItemsBeforeRefresh - minimum items to process before refreshing the item list
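The effect of minItemsBeforeRefresh can be illustrated with a small counting model. This is a simplified sketch, not the queue's actual implementation: it assumes a change notification arrives after every processed item (as described above) and that the queue refreshes only once at least minItemsBeforeRefresh items have been processed since the last refresh. The class and method names are hypothetical.

```java
final class RefreshSketch {
    // Counts list refreshes while processing itemCount items, assuming a change
    // notification arrives after every processed item.
    static int countRefreshes(int itemCount, int minItemsBeforeRefresh) {
        int refreshes = 0;
        int processedSinceRefresh = 0;
        for (int i = 0; i < itemCount; i++) {
            processedSinceRefresh++; // process one item; its removal triggers a notification
            if (processedSinceRefresh >= minItemsBeforeRefresh) {
                refreshes++;         // notification is honored: re-read the item list
                processedSinceRefresh = 0;
            }
        }
        return refreshes;
    }

    public static void main(String[] args) {
        System.out.println(countRefreshes(10, 1));  // prints 10
        System.out.println(countRefreshes(10, 10)); // prints 1
    }
}
```

Under this model, a larger value trades freshness for fewer round trips: with 10 items and a threshold of 10, the list is re-read once instead of 10 times, at the cost of processing up to 10 items from a possibly stale list.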
The queue must be started via the start() method. Call close() when you are done with the queue.
To add messages to the queue:
queue.put(aMessage, priority);
To retrieve messages from the queue:
MessageType message = queue.take();
// IMPORTANT, take() blocks so it's best to do this in a thread
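Because take() blocks, a common pattern is to run the consumer loop on its own thread. The sketch below shows that pattern using java.util.concurrent.PriorityBlockingQueue as a purely local stand-in for the distributed queue. The Entry class, the method names, and the rule that lower numeric priorities are taken first are assumptions of this sketch, not statements about the recipe's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

final class BlockingTakeSketch {
    // Local stand-in for a queued item: a message plus a numeric priority.
    static final class Entry implements Comparable<Entry> {
        final String message;
        final int priority;

        Entry(String message, int priority) {
            this.message = message;
            this.priority = priority;
        }

        @Override
        public int compareTo(Entry other) {
            // In this sketch, lower numbers are taken first.
            return Integer.compare(priority, other.priority);
        }
    }

    // Enqueues all entries, then drains them on a dedicated consumer thread.
    static List<String> consumeInThread(List<Entry> entries) {
        PriorityBlockingQueue<Entry> queue = new PriorityBlockingQueue<>();
        queue.addAll(entries);
        List<String> received = Collections.synchronizedList(new ArrayList<>());

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < entries.size(); i++) {
                    // take() would block here if the queue were empty.
                    received.add(queue.take().message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        });
        consumer.start();
        try {
            consumer.join(); // wait only so the sketch can return the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumer", e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<Entry> entries = new ArrayList<>();
        entries.add(new Entry("low", 5));
        entries.add(new Entry("high", 1));
        entries.add(new Entry("mid", 3));
        System.out.println(consumeInThread(entries)); // prints [high, mid, low]
    }
}
```

In a real application the consumer thread would typically loop until interrupted rather than for a fixed count, and the calling thread would not join on it.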