Complex Job
Like many modules on the NPM registry, rethinkdb-job-queue is a building block that you use within your application. Its primary function is to reliably process jobs in a distributed application framework, although it can also be used very successfully on a single server.
If you need to process complex jobs, rethinkdb-job-queue may do the job for you. To help you decide, this document describes some of the different ways you can employ the queue, covering jobs with a fixed number of tasks, jobs with a varying number of tasks, FIFO and FILO processing, and delayed or partly completed job processing.
By all means use rethinkdb-job-queue if it meets your requirements; however, please use the right tool for the right job. Consider other options before making a decision. See this issue for an example of when you should roll your own queue.
If you have a complex job with a fixed number of processing tasks, the best approach is to build the high level task logic into your application. If you break the high level task down into individual steps or jobs, you can then rely on rethinkdb-job-queue to process those lower level jobs for you.
Here is an example of the logic required to build an application that has a complex registration process. The registration process requires three sub-tasks to be performed to complete the workflow as follows:
- Send registration activation email.
- Register user with third party service provider.
- Send confirmation email.
With the above three tasks, we can use rethinkdb-job-queue to manage each task by creating three distinct queues: ActivationEmail, SPRegistration, and ConfirmationEmail.
For the management of the higher level task, you will need to build this logic into your application and use your application's database to save each state. You could add another queue for the higher level application logic; however, that approach is not described below.
The application logic would go something like this (a code sketch follows the list):
1. The user registers on your web application.
2. Your application saves the registration to your application database (not the job queue).
3. In the same code that you save the registration to your database, initiate a job within the ActivationEmail queue to send an account activation email. Use database transactions for reliability.
4. At the completion of the ActivationEmail queue job, update the registration state in your application database.
5. Wait for the user to click the activation link.
6. After the user activates the new account, update the registration state in your application database.
7. In the same code that you update the registration state, initiate a job within the SPRegistration queue to register the new account with the third party service. Again, use database transactions for reliability.
8. At the completion of the SPRegistration queue job, update the registration state in your application database.
9. In the same code that you update the registration state, initiate a job within the ConfirmationEmail queue to send a congratulations or confirmation email to the new user.
10. At the completion of the ConfirmationEmail queue job, update the registration state to completed in your application database.
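Here is a minimal sketch of steps 3 and 4 above. The saveRegistration, sendActivationEmail, and updateRegistrationState functions are hypothetical application helpers, and the connection and queue options are examples only.

```js
const Queue = require('rethinkdb-job-queue')

// One queue per sub-task (see the Queue Constructor document for the available options).
const activationEmailQ = new Queue({ db: 'WebApp' }, { name: 'ActivationEmail' })

// Step 3: after saving the registration, add an ActivationEmail job.
function onUserRegistered (registration) {
  return saveRegistration(registration) // hypothetical application database call
    .then(() => {
      const job = activationEmailQ.createJob()
      job.data = { userId: registration.id, email: registration.email }
      return activationEmailQ.addJob(job)
    })
}

// Step 4: process the job and update the registration state when it completes.
activationEmailQ.process((job, next) => {
  return sendActivationEmail(job.data.email) // hypothetical email sending module
    .then(() => updateRegistrationState(job.data.userId, 'activation email sent')) // hypothetical
    .then(() => next(null, 'Activation email sent'))
    .catch(err => next(err))
})
```

The SPRegistration and ConfirmationEmail queues would follow the same pattern for the remaining steps.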
Please see the Tutorial, Queue Constructor, Queue PubSub, Job Processing, and Queue.process documents for more detail.
If you have jobs that vary widely in the number of tasks required to process them, there are a couple of ways you could approach this.
One approach is to create a large number of queues to maintain the high level job. This in turn will create a large number of tables within the database. That is not a problem from a storage perspective because RethinkDB has no hard limit on the number of tables you can place in a database, although maintaining that many Queue objects may cause resource issues within Node.js.
This approach would only be useful if the high level job did not need the job tasks to be processed in any particular order.
Using three features of rethinkdb-job-queue, you could process each high level job as a single job within the job queue.
Thanks to the Job.name property, the Queue.findJobByName method, and the part processing feature, you could maintain the job state in a single queue job.
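As a rough sketch, assuming the high level job was given a name when it was created (the name 'order-12345' below is purely illustrative), you could later locate it with Queue.findJobByName to inspect its state:

```js
const Queue = require('rethinkdb-job-queue')
const q = new Queue()

// Give the high level job a name when it is created so it can be found later.
const job = q.createJob()
job.setName('order-12345') // illustrative name only
job.data = { tasks: ['task one', 'task two', 'task three'] }
q.addJob(job).catch(err => console.error(err))

// Later, locate the job by name to inspect its status.
q.findJobByName('order-12345')
  .then(jobs => console.log(jobs[0].status))
  .catch(err => console.error(err))
```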
Here is the general workflow for a job requiring x number of parts to be processed with delays between each part (a code sketch follows the list).
1. The high level job is received with x number of tasks.
2. The job is created and all the tasks get added to the job in an array or some other supported type.
3. The job is added to a single queue.
4. A worker node picks up the job for processing.
5. Within the Queue.process handler the next of the x tasks is determined.
6. The current task is processed and completed.
7. Update the job properties or array to indicate that the current job task is complete.
8. If there will be a delay before the next task can be processed, change the Job.dateEnable value.
9. Invoke the next callback returning the Job object. Eg: next(null, job).
10. When the job is available for processing again, repeat steps 4 to 10 until all tasks are complete.
11. Complete the queue job by calling next(null, 'Job Complete').
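Here is a minimal sketch of the Queue.process handler for steps 5 to 11. It assumes the tasks were stored on the job as a tasks array along with a nextTask index (both are illustrative property names), and that a hypothetical processTask function performs the actual work for one task.

```js
const Queue = require('rethinkdb-job-queue')
const q = new Queue()

q.process((job, next) => {
  // Step 5: determine the next task (tasks and nextTask are illustrative job properties).
  const task = job.tasks[job.nextTask]
  // Step 6: process the current task (processTask is a hypothetical handler).
  return processTask(task).then(() => {
    // Step 7: record that the current task is complete.
    job.nextTask += 1
    if (job.nextTask >= job.tasks.length) {
      // Step 11: all tasks are finished, so complete the queue job.
      return next(null, 'Job Complete')
    }
    // Step 8: delay the next part, here by one hour as an example.
    job.dateEnable = new Date(Date.now() + 60 * 60 * 1000)
    // Step 9: returning the Job object saves it back to the queue for later processing.
    return next(null, job)
  }).catch(err => next(err))
})
```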
This workflow is only meant to prompt ideas and help you come up with your own solution for tackling the job.
The default behavior of rethinkdb-job-queue is that of a First In First Out (FIFO) queue. Simply add jobs to the queue and they will be processed in the order they were created.
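As a minimal sketch, adding several jobs and processing them with the default options handles them in creation order:

```js
const Queue = require('rethinkdb-job-queue')
const q = new Queue()

// Create three jobs; with the default options they are processed in the order they were added.
const jobs = ['first', 'second', 'third'].map((label) => {
  const job = q.createJob()
  job.data = label
  return job
})

q.addJob(jobs).catch(err => console.error(err))

q.process((job, next) => {
  console.log(job.data) // logs 'first', then 'second', then 'third'
  next(null, job.data)
})
```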
I don't really think rethinkdb-job-queue is a good solution if you need First In Last Out (FILO) job processing. If you think of a good way to process FILO jobs with the queue, let me know.
For reference, Mnemonist has a Stack (FILO or LIFO) data structure; however, it is not backed by a persistent storage layer.
If you have jobs that may need to be placed back into the queue due to a known delay or transient failure, you can use the partly completed or delayed processing feature provided by the next() callback.
Here is an example using code:
const processingModule = <your processing module>
const Queue = require('rethinkdb-job-queue')
const q = new Queue()
const job = q.createJob()
// Here we attach the job's payload.
job.data = "Something Important"
q.addJob(job).catch(err => console.error(err))
// Queue.process can be placed before or after adding jobs
q.process((job, next, onCancel) => {
  onCancel(job, () => {
    // Cancel your job processing here
  })
  return processingModule.process(job.data).then((completed) => {
    if (completed === 'delayed') {
      job.dateEnable = <some future date>
      // The following next call will save the job back to the queue
      next(null, job)
    } else if (completed === 'success') {
      next(null, completed)
    } else {
      next(new Error('Something went wrong'))
    }
  }).catch(err => console.error(err))
})