From 6a69e80b045f2fc5f5f8f112b2587e11311d8e12 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?St=C3=A9phane=20Bidoul?=
Date: Tue, 18 Oct 2022 13:04:31 +0200
Subject: [PATCH 01/14] Update test with proper way to pass company to job

---
 test_queue_job/models/test_models.py |  2 +-
 test_queue_job/tests/test_job.py     | 41 ++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py
index ff9622106a..4c0dd6b2d3 100644
--- a/test_queue_job/models/test_models.py
+++ b/test_queue_job/models/test_models.py
@@ -38,7 +38,7 @@ class ModelTestQueueJob(models.Model):
     # to test the context is serialized/deserialized properly
     @api.model
     def _job_prepare_context_before_enqueue_keys(self):
-        return ("tz", "lang")
+        return ("tz", "lang", "allowed_company_ids")
 
     def testing_method(self, *args, **kwargs):
         """Method used for tests
diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py
index 35884cd2b3..c4ec5081b3 100644
--- a/test_queue_job/tests/test_job.py
+++ b/test_queue_job/tests/test_job.py
@@ -185,6 +185,47 @@ def test_postpone(self):
         self.assertEqual(job_a.result, "test")
         self.assertFalse(job_a.exc_info)
 
+    def test_company_simple(self):
+        company = self.env.ref("base.main_company")
+        eta = datetime.now() + timedelta(hours=5)
+        test_job = Job(
+            self.env["test.queue.job"].with_company(company).testing_method,
+            args=("o", "k"),
+            kwargs={"return_context": 1},
+            priority=15,
+            eta=eta,
+            description="My description",
+        )
+        test_job.worker_pid = 99999  # normally set on "set_start"
+        test_job.store()
+        job_read = Job.load(self.env, test_job.uuid)
+        self.assertEqual(test_job.func.__func__, job_read.func.__func__)
+        result_ctx = job_read.func(*tuple(test_job.args), **test_job.kwargs)
+        self.assertEqual(result_ctx.get("allowed_company_ids"), company.ids)
+
+    def test_company_complex(self):
+        company1 = self.env.ref("base.main_company")
+        company2 = company1.create({"name": "Queue job company"})
+        companies = company1 | company2
+        self.env.user.write({"company_ids": [(6, False, companies.ids)]})
+        # Ensure the main company is still the first one
+        self.assertEqual(self.env.user.company_id, company1)
+        eta = datetime.now() + timedelta(hours=5)
+        test_job = Job(
+            self.env["test.queue.job"].with_company(company2).testing_method,
+            args=("o", "k"),
+            kwargs={"return_context": 1},
+            priority=15,
+            eta=eta,
+            description="My description",
+        )
+        test_job.worker_pid = 99999  # normally set on "set_start"
+        test_job.store()
+        job_read = Job.load(self.env, test_job.uuid)
+        self.assertEqual(test_job.func.__func__, job_read.func.__func__)
+        result_ctx = job_read.func(*tuple(test_job.args), **test_job.kwargs)
+        self.assertEqual(result_ctx.get("allowed_company_ids"), company2.ids)
+
     def test_store(self):
         test_job = Job(self.method)
         test_job.store()

From 49135691bff8b48e510ae7e95eca455de1f537c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miquel=20Ra=C3=AFch?=
Date: Thu, 16 Nov 2023 17:48:11 +0100
Subject: [PATCH 02/14] [IMP] queue_job: track error in chatter

---
 queue_job/models/queue_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index 7607a2701f..2e4fcd8c5e 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -94,7 +94,7 @@ class QueueJob(models.Model):
     state = fields.Selection(STATES, readonly=True, required=True, index=True)
     priority = fields.Integer()
     exc_name = fields.Char(string="Exception", 
readonly=True)
-    exc_message = fields.Char(string="Exception Message", readonly=True)
+    exc_message = fields.Char(string="Exception Message", readonly=True, tracking=True)
     exc_info = fields.Text(string="Exception Info", readonly=True)
     result = fields.Text(readonly=True)

From c508eb16531aebf040e5d8707c9de473fef24d39 Mon Sep 17 00:00:00 2001
From: "Laurent Mignon (ACSONE)"
Date: Thu, 21 Dec 2023 13:41:47 +0100
Subject: [PATCH 03/14] [IMP] queue_job_cron: Avoid parallel run

By default, odoo never runs the same cron job in parallel. This commit
uses the identity key mechanism to enforce the same guarantee when a
cron job is run as a queue job.

This behaviour can be controlled by a new setting on the cron
definition, but it is activated by default to keep the original
behaviour.
---
 queue_job_cron/README.rst                   |  4 +-
 queue_job_cron/models/ir_cron.py            | 37 ++++++++++++++-----
 .../readme/newsfragments/.gitignore         |  0
 .../readme/newsfragments/612.feature        |  9 +++++
 queue_job_cron/tests/test_queue_job_cron.py | 19 ++++++++++
 queue_job_cron/views/ir_cron_view.xml       |  4 ++
 6 files changed, 61 insertions(+), 12 deletions(-)
 create mode 100644 queue_job_cron/readme/newsfragments/.gitignore
 create mode 100644 queue_job_cron/readme/newsfragments/612.feature

diff --git a/queue_job_cron/README.rst b/queue_job_cron/README.rst
index 7943963cd1..de39b65e14 100644
--- a/queue_job_cron/README.rst
+++ b/queue_job_cron/README.rst
@@ -79,8 +79,8 @@ Authors
 Contributors
 ------------
 
-- Cédric Pigeon
-- Nguyen Minh Chien
+- Cédric Pigeon
+- Nguyen Minh Chien
 
 Maintainers
 -----------
diff --git a/queue_job_cron/models/ir_cron.py b/queue_job_cron/models/ir_cron.py
index 440740f164..7e4f5b848d 100644
--- a/queue_job_cron/models/ir_cron.py
+++ b/queue_job_cron/models/ir_cron.py
@@ -4,12 +4,23 @@
 
 from odoo import api, fields, models
 
+from odoo.addons.queue_job.job import identity_exact
+
 _logger = logging.getLogger(__name__)
 
 
 class IrCron(models.Model):
     _inherit = "ir.cron"
 
+    no_parallel_queue_job_run = fields.Boolean(
+        help="Avoid parallel run. "
+        "If the cron job is already running, the new one will be skipped. "
+        "By default, odoo never runs the same cron job in parallel. 
This "
+        "option is therefore set to True by default when the job is run as a "
+        "queue job.",
+        default=True,
+    )
+
     run_as_queue_job = fields.Boolean(
         help="Specify if this cron should be ran as a queue job"
     )
@@ -39,23 +50,29 @@ def method_direct_trigger(self):
             _cron = cron.with_user(cron.user_id).with_context(
                 lastcall=cron.lastcall
             )
-            _cron.with_delay(
-                priority=_cron.priority,
-                description=_cron.name,
-                channel=_cron.channel_id.complete_name,
-            )._run_job_as_queue_job(server_action=_cron.ir_actions_server_id)
+            _cron._delay_run_job_as_queue_job(
+                server_action=_cron.ir_actions_server_id
+            )
         return True
 
     def _callback(self, cron_name, server_action_id, job_id):
         cron = self.env["ir.cron"].sudo().browse(job_id)
         if cron.run_as_queue_job:
             server_action = self.env["ir.actions.server"].browse(server_action_id)
-            return self.with_delay(
-                priority=cron.priority,
-                description=cron.name,
-                channel=cron.channel_id.complete_name,
-            )._run_job_as_queue_job(server_action=server_action)
+            return cron._delay_run_job_as_queue_job(server_action=server_action)
         else:
             return super()._callback(
                 cron_name=cron_name, server_action_id=server_action_id, job_id=job_id
            )
+
+    def _delay_run_job_as_queue_job(self, server_action):
+        self.ensure_one()
+        identity_key = None
+        if self.no_parallel_queue_job_run:
+            identity_key = identity_exact
+        return self.with_delay(
+            priority=self.priority,
+            description=self.name,
+            channel=self.channel_id.complete_name,
+            identity_key=identity_key,
+        )._run_job_as_queue_job(server_action=server_action)
diff --git a/queue_job_cron/readme/newsfragments/.gitignore b/queue_job_cron/readme/newsfragments/.gitignore
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/queue_job_cron/readme/newsfragments/612.feature b/queue_job_cron/readme/newsfragments/612.feature
new file mode 100644
index 0000000000..9c521620a2
--- /dev/null
+++ b/queue_job_cron/readme/newsfragments/612.feature
@@ -0,0 +1,9 @@
+By default, prevent parallel runs of the same cron job when run as a queue job.
+
+When a cron job is run by odoo, the odoo runner prevents parallel runs
+of the same cron job. Before this change, this was not the case when the
+cron job was run as a queue job. A new option on the cron definition now
+prevents parallel runs when it is run as a queue job. The option is set to
+True by default, so the behavior is the same as when the cron job is run
+by odoo, while keeping the possibility to disable the restriction when run
+as a queue job.
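
For readers of this series, a minimal usage sketch (not part of the patch; the cron name is a placeholder) of how the new flag combines with `run_as_queue_job`, mirroring the behaviour exercised by the test added below:

```python
# Sketch only: assumes an existing ir.cron record; "My nightly import"
# is a hypothetical name used for illustration (e.g. in an Odoo shell).
cron = env["ir.cron"].search([("name", "=", "My nightly import")], limit=1)
cron.write({"run_as_queue_job": True, "no_parallel_queue_job_run": True})
cron.method_direct_trigger()
cron.method_direct_trigger()  # identity_exact computes the same identity key
# The second trigger is deduplicated: only one pending queue.job exists.
assert env["queue.job"].search_count([("name", "=", cron.name)]) == 1
```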
diff --git a/queue_job_cron/tests/test_queue_job_cron.py b/queue_job_cron/tests/test_queue_job_cron.py
index 3eec55f7e9..d3cc18d636 100644
--- a/queue_job_cron/tests/test_queue_job_cron.py
+++ b/queue_job_cron/tests/test_queue_job_cron.py
@@ -39,3 +39,22 @@ def test_queue_job_cron_run(self):
         cron = self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs")
         IrCron = self.env["ir.cron"]
         IrCron._run_job_as_queue_job(server_action=cron.ir_actions_server_id)
+
+    def test_queue_job_no_parallelism(self):
+        cron = self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs")
+        default_channel = self.env.ref("queue_job_cron.channel_root_ir_cron")
+        cron.write(
+            {
+                "no_parallel_queue_job_run": True,
+                "run_as_queue_job": True,
+                "channel_id": default_channel.id,
+            }
+        )
+        cron.method_direct_trigger()
+        cron.method_direct_trigger()
+        nb_jobs = self.env["queue.job"].search_count([("name", "=", cron.name)])
+        self.assertEqual(nb_jobs, 1)
+        cron.no_parallel_queue_job_run = False
+        cron.method_direct_trigger()
+        nb_jobs = self.env["queue.job"].search_count([("name", "=", cron.name)])
+        self.assertEqual(nb_jobs, 2)
diff --git a/queue_job_cron/views/ir_cron_view.xml b/queue_job_cron/views/ir_cron_view.xml
index bd46c85289..33c0c85d33 100644
--- a/queue_job_cron/views/ir_cron_view.xml
+++ b/queue_job_cron/views/ir_cron_view.xml
@@ -7,6 +7,10 @@
+            <field name="no_parallel_queue_job_run" />

Date: Thu, 1 Feb 2024 10:07:01 +0100
Subject: [PATCH 04/14] queue_job: fix retry format with tuple values

Configuration of randomized retry intervals was not possible because the
format checks had not been updated to accept tuples. This fixes it.
---
 queue_job/models/queue_job_function.py    | 25 +++++++++++-----
 test_queue_job/tests/__init__.py          |  1 +
 test_queue_job/tests/test_job_function.py | 35 +++++++++++++++++++++++
 3 files changed, 54 insertions(+), 7 deletions(-)
 create mode 100644 test_queue_job/tests/test_job_function.py

diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py
index 10b19345b7..7cf73ea370 100644
--- a/queue_job/models/queue_job_function.py
+++ b/queue_job/models/queue_job_function.py
@@ -155,10 +155,12 @@ def _parse_retry_pattern(self):
         try:
             # as json can't have integers as keys and the field is stored
             # as json, convert back to int
-            retry_pattern = {
-                int(try_count): postpone_seconds
-                for try_count, postpone_seconds in self.retry_pattern.items()
-            }
+            retry_pattern = {}
+            for try_count, postpone_value in self.retry_pattern.items():
+                if isinstance(postpone_value, int):
+                    retry_pattern[int(try_count)] = postpone_value
+                else:
+                    retry_pattern[int(try_count)] = tuple(postpone_value)
         except ValueError:
             _logger.error(
                 "Invalid retry pattern for job function %s,"
@@ -187,8 +189,9 @@ def job_config(self, name):
     def _retry_pattern_format_error_message(self):
         return _(
             "Unexpected format of Retry Pattern for {}.\n"
-            "Example of valid format:\n"
-            "{{1: 300, 5: 600, 10: 1200, 15: 3000}}"
+            "Examples of valid formats:\n"
+            "{{1: 300, 5: 600, 10: 1200, 15: 3000}}\n"
+            "{{1: (1, 10), 5: (11, 20), 10: (21, 30), 15: (100, 300)}}"
         ).format(self.name)
 
     @api.constrains("retry_pattern")
@@ -201,12 +204,20 @@ def _check_retry_pattern(self):
             all_values = list(retry_pattern) + list(retry_pattern.values())
             for value in all_values:
                 try:
-                    int(value)
+                    self._retry_value_type_check(value)
                 except ValueError as ex:
                     raise exceptions.UserError(
                         record._retry_pattern_format_error_message()
                     ) from ex
 
+    def _retry_value_type_check(self, value):
+        if isinstance(value, (tuple | list)):
+            if len(value) != 2:
+                raise ValueError
+            
[self._retry_value_type_check(element) for element in value] + return + int(value) + def _related_action_format_error_message(self): return _( "Unexpected format of Related Action for {}.\n" diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index dc59429e71..0405022ce0 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -4,5 +4,6 @@ from . import test_job from . import test_job_auto_delay from . import test_job_channels +from . import test_job_function from . import test_related_actions from . import test_delay_mocks diff --git a/test_queue_job/tests/test_job_function.py b/test_queue_job/tests/test_job_function.py new file mode 100644 index 0000000000..17781ac475 --- /dev/null +++ b/test_queue_job/tests/test_job_function.py @@ -0,0 +1,35 @@ +import odoo.tests.common as common +from odoo import exceptions + + +class TestJobFunction(common.TransactionCase): + def setUp(self): + super(TestJobFunction, self).setUp() + self.test_function_model = self.env.ref( + "queue_job.job_function_queue_job__test_job" + ) + + def test_check_retry_pattern_randomized_case(self): + randomized_pattern = "{1: (10, 20), 2: (20, 40)}" + self.test_function_model.edit_retry_pattern = randomized_pattern + self.assertEqual( + self.test_function_model.edit_retry_pattern, randomized_pattern + ) + + def test_check_retry_pattern_fixed_case(self): + fixed_pattern = "{1: 10, 2: 20}" + self.test_function_model.edit_retry_pattern = fixed_pattern + self.assertEqual(self.test_function_model.edit_retry_pattern, fixed_pattern) + + def test_check_retry_pattern_invalid_cases(self): + invalid_time_value_pattern = "{1: a, 2: 20}" + with self.assertRaises(exceptions.UserError): + self.test_function_model.edit_retry_pattern = invalid_time_value_pattern + + invalid_retry_count_pattern = "{a: 10, 2: 20}" + with self.assertRaises(exceptions.UserError): + self.test_function_model.edit_retry_pattern = invalid_retry_count_pattern + + invalid_randomized_pattern = "{1: (1, 2, 3), 2: 20}" + with self.assertRaises(exceptions.UserError): + self.test_function_model.edit_retry_pattern = invalid_randomized_pattern From 950ec90364b7331adfff68f722306dcb554001c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Alix?= Date: Wed, 6 Mar 2024 12:03:03 +0100 Subject: [PATCH 05/14] queue_job: fix partial index to add 'wait_dependencies' state --- queue_job/__manifest__.py | 2 +- queue_job/migrations/17.0.1.1.2/pre-migration.py | 10 ++++++++++ queue_job/models/queue_job.py | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) create mode 100644 queue_job/migrations/17.0.1.1.2/pre-migration.py diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 93ae82789c..85ba89ecd2 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -2,7 +2,7 @@ { "name": "Job Queue", - "version": "17.0.1.1.1", + "version": "17.0.1.1.2", "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", "website": "https://github.com/OCA/queue", "license": "LGPL-3", diff --git a/queue_job/migrations/17.0.1.1.2/pre-migration.py b/queue_job/migrations/17.0.1.1.2/pre-migration.py new file mode 100644 index 0000000000..53d9690caa --- /dev/null +++ b/queue_job/migrations/17.0.1.1.2/pre-migration.py @@ -0,0 +1,10 @@ +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +from odoo.tools.sql import table_exists + + +def migrate(cr, version): + if table_exists(cr, "queue_job"): + # Drop index 'queue_job_identity_key_state_partial_index', + # it will be 
recreated during the update
        cr.execute("DROP INDEX IF EXISTS queue_job_identity_key_state_partial_index;")
diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index 2e4fcd8c5e..b6acf43dab 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -139,7 +139,7 @@ def init(self):
         self._cr.execute(
             "CREATE INDEX queue_job_identity_key_state_partial_index "
             "ON queue_job (identity_key) WHERE state in ('pending', "
-            "'enqueued') AND identity_key IS NOT NULL;"
+            "'enqueued', 'wait_dependencies') AND identity_key IS NOT NULL;"
         )
 
     @api.depends("records")

From 587dfd57db14fef84b7e0389578ec1bf6018408b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Alix?=
Date: Wed, 27 Mar 2024 12:35:26 +0100
Subject: [PATCH 06/14] queue_job: triggers stored computed fields before
 calling 'set_done()'

So the time required to compute such fields by the ORM is taken into
account when the 'date_done' and 'exec_time' values are set on the job.
---
 queue_job/controllers/main.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index f18401476f..22969ea152 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -34,6 +34,9 @@ def _try_perform_job(self, env, job):
         _logger.debug("%s started", job)
 
         job.perform()
+        # Triggers any stored computed fields before calling 'set_done'
+        # so that it will be part of the 'exec_time'
+        env["base"].flush()
         job.set_done()
         job.store()
         env.flush_all()

From c460d6905fb9bcac7a45f5ea1686f5b3b794ab93 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?S=C3=A9bastien=20Alix?=
Date: Mon, 8 Apr 2024 10:54:38 +0200
Subject: [PATCH 07/14] queue_job: fix warning when triggering stored computed
 fields

Starting from 16.0, we should call `env.flush_all()` instead of
`env["base"].flush()`, like it is done a few lines below.
---
 queue_job/controllers/main.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index 22969ea152..54bbba312f 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -36,7 +36,7 @@ def _try_perform_job(self, env, job):
         job.perform()
         # Triggers any stored computed fields before calling 'set_done'
         # so that it will be part of the 'exec_time'
-        env["base"].flush()
+        env.flush_all()
         job.set_done()
         job.store()
         env.flush_all()

From b001ab2ac82dc45c269e05957bd0a17d03ef2fab Mon Sep 17 00:00:00 2001
From: Pierre Verkest
Date: Wed, 10 Apr 2024 18:29:50 +0200
Subject: [PATCH 08/14] [FIX] queue_job_cron_jobrunner: use priority to select
 job

* use FIFO: the first created job is treated first
* if priorities differ, the job with the higher priority (lower value)
  takes precedence

Channel priority is not yet taken into account.
---
 queue_job_cron_jobrunner/models/queue_job.py  |  4 +--
 .../tests/test_queue_job.py                   | 30 ++++++++++++++++++-
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/queue_job_cron_jobrunner/models/queue_job.py b/queue_job_cron_jobrunner/models/queue_job.py
index 2e19556b95..55a4b8a310 100644
--- a/queue_job_cron_jobrunner/models/queue_job.py
+++ b/queue_job_cron_jobrunner/models/queue_job.py
@@ -40,7 +40,7 @@ def _acquire_one_job(self):
             FROM queue_job
             WHERE state = 'pending'
             AND (eta IS NULL OR eta <= (now() AT TIME ZONE 'UTC'))
-            ORDER BY date_created DESC
+            ORDER BY priority, date_created
             LIMIT 1 FOR NO KEY UPDATE SKIP LOCKED
             """
         )
@@ -59,7 +59,7 @@ def _process(self, commit=False):
         # while the job is processing. 
However, doing this will release the
         # lock on the db, so we need to find another way.
         # if commit:
-        #     self.flush()
+        #     self.env.flush_all()
         #     self.env.cr.commit()
 
         # Actual processing
diff --git a/queue_job_cron_jobrunner/tests/test_queue_job.py b/queue_job_cron_jobrunner/tests/test_queue_job.py
index 3f2e0ef637..54800b792c 100644
--- a/queue_job_cron_jobrunner/tests/test_queue_job.py
+++ b/queue_job_cron_jobrunner/tests/test_queue_job.py
@@ -67,5 +67,33 @@ def test_queue_job_cron_trigger_enqueue_dependencies(self):
         self.assertEqual(job_record.state, "done", "Processed OK")
 
         # if the state is "waiting_dependencies", it means the "enqueue_waiting()"
-        # step has not been doen when the parent job has been done
+        # step has not been done when the parent job has been done
         self.assertEqual(job_record_depends.state, "done", "Processed OK")
+
+    def test_acquire_one_job_use_priority(self):
+        with freeze_time("2024-01-01 10:01:01"):
+            self.env["res.partner"].with_delay(priority=3).create({"name": "test"})
+
+        with freeze_time("2024-01-01 10:02:01"):
+            job = (
+                self.env["res.partner"].with_delay(priority=1).create({"name": "test"})
+            )
+
+        with freeze_time("2024-01-01 10:03:01"):
+            self.env["res.partner"].with_delay(priority=2).create({"name": "test"})
+
+        self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())
+
+    def test_acquire_one_job_consume_the_oldest_first(self):
+        with freeze_time("2024-01-01 10:01:01"):
+            job = (
+                self.env["res.partner"].with_delay(priority=30).create({"name": "test"})
+            )
+
+        with freeze_time("2024-01-01 10:02:01"):
+            self.env["res.partner"].with_delay(priority=30).create({"name": "test"})
+
+        with freeze_time("2024-01-01 10:03:01"):
+            self.env["res.partner"].with_delay(priority=30).create({"name": "test"})
+
+        self.assertEqual(self.env["queue.job"]._acquire_one_job(), job.db_record())

From bbe16176f1c6f1407bf02fe893584514d66927af Mon Sep 17 00:00:00 2001
From: Florian Mounier
Date: Tue, 19 Nov 2024 09:21:11 +0100
Subject: [PATCH 09/14] [IMP] queue_job: Add split method

---
 queue_job/README.rst                    | 33 +++++++++
 queue_job/delay.py                      | 62 +++++++++++++---
 queue_job/readme/USAGE.md               | 32 +++++++++
 queue_job/tests/__init__.py             |  1 +
 queue_job/tests/test_delayable_split.py | 94 +++++++++++++++++++++++++
 5 files changed, 213 insertions(+), 9 deletions(-)
 create mode 100644 queue_job/tests/test_delayable_split.py

diff --git a/queue_job/README.rst b/queue_job/README.rst
index 50d20da798..8aeb546076 100644
--- a/queue_job/README.rst
+++ b/queue_job/README.rst
@@ -277,6 +277,39 @@ is at the top of the graph. In the example above, if it was called on
 ``group_a``, then ``group_b`` would never be delayed (but a warning
 would be shown).
 
+It is also possible to split a job into several jobs, each one
+processing a part of the work. This can be useful to avoid very long
+jobs, parallelize some tasks and get more specific errors. Usage is as
+follows:
+
+.. code:: python
+
+    def button_split_delayable(self):
+        (
+            self # Can be a big recordset, let's say 1000 records
+            .delayable()
+            .generate_thumbnail((50, 50))
+            .set(priority=30)
+            .set(description=_("generate xxx"))
+            .split(50) # Split the job into 20 jobs of 50 records each
+            .delay()
+        )
+
+The ``split()`` method takes a ``chain`` boolean keyword argument. If
+set to True, the jobs will be chained, meaning that the next job will
+only start when the previous one is done:
+
+.. 
code:: python
+
+    def button_increment_var(self):
+        (
+            self
+            .delayable()
+            .increment_counter()
+            .split(1, chain=True) # Will execute the jobs one after the other
+            .delay()
+        )
+
 Enqueing Job Options
 ~~~~~~~~~~~~~~~~~~~~
 
diff --git a/queue_job/delay.py b/queue_job/delay.py
index 9b596b1665..0ba54e48a9 100644
--- a/queue_job/delay.py
+++ b/queue_job/delay.py
@@ -232,7 +232,7 @@ def _ensure_same_graph_uuid(jobs):
     elif jobs_count == 1:
         if jobs[0].graph_uuid:
             raise ValueError(
-                f"Job {jobs[0]} is a single job, it should not" " have a graph uuid"
+                f"Job {jobs[0]} is a single job, it should not have a graph uuid"
             )
     else:
         graph_uuids = {job.graph_uuid for job in jobs if job.graph_uuid}
@@ -483,11 +483,10 @@ def _tail(self):
         return [self]
 
     def __repr__(self):
-        return "Delayable({}.{}({}, {}))".format(
-            self.recordset,
-            self._job_method.__name__ if self._job_method else "",
-            self._job_args,
-            self._job_kwargs,
+        return (
+            f"Delayable({self.recordset}."
+            f"{self._job_method.__name__ if self._job_method else ''}"
+            f"({self._job_args}, {self._job_kwargs}))"
         )
 
     def __del__(self):
@@ -525,6 +524,51 @@ def delay(self):
         """Delay the whole graph"""
         self._graph.delay()
 
+    def split(self, size, chain=False):
+        """Split the Delayable into batches of size `size`.
+
+        Return a `DelayableGroup`, or a `DelayableChain`
+        if `chain` is True, containing the batches.
+        """
+        if not self._job_method:
+            raise ValueError("No method set on the Delayable")
+
+        total_records = len(self.recordset)
+
+        delayables = []
+        for index in range(0, total_records, size):
+            recordset = self.recordset[index : index + size]
+            delayable = Delayable(
+                recordset,
+                priority=self.priority,
+                eta=self.eta,
+                max_retries=self.max_retries,
+                description=self.description,
+                channel=self.channel,
+                identity_key=self.identity_key,
+            )
+            # Rebind the method on the batch recordset (updates __self__)
+            delayable._job_method = getattr(recordset, self._job_method.__name__)
+            delayable._job_args = self._job_args
+            delayable._job_kwargs = self._job_kwargs
+
+            delayables.append(delayable)
+
+        description = self.description or (
+            self._job_method.__doc__.splitlines()[0].strip()
+            if self._job_method.__doc__
+            else f"{self.recordset._name}.{self._job_method.__name__}"
+        )
+        for index, delayable in enumerate(delayables):
+            delayable.set(
+                description=f"{description} (split {index + 1}/{len(delayables)})"
+            )
+
+        # Prevent warning on deletion
+        self._generated_job = True
+
+        return (DelayableChain if chain else DelayableGroup)(*delayables)
+
     def _build_job(self):
         if self._generated_job:
             return self._generated_job
@@ -611,9 +655,9 @@ def _delay_delayable(*args, **kwargs):
         return _delay_delayable
 
     def __str__(self):
-        return "DelayableRecordset({}{})".format(
-            self.delayable.recordset._name,
-            getattr(self.delayable.recordset, "_ids", ""),
+        return (
+            f"DelayableRecordset({self.delayable.recordset._name}"
+            f"{getattr(self.delayable.recordset, '_ids', '')})"
         )
 
     __repr__ = __str__
diff --git a/queue_job/readme/USAGE.md b/queue_job/readme/USAGE.md
index fb160bfa48..c08374b9fc 100644
--- a/queue_job/readme/USAGE.md
+++ b/queue_job/readme/USAGE.md
@@ -108,6 +108,38 @@ is at the top of the graph. In the example above, if it was called on
 `group_a`, then `group_b` would never be delayed (but a warning would be
 shown).
 
+It is also possible to split a job into several jobs, each one processing
+a part of the work. This can be useful to avoid very long jobs, parallelize
+some tasks and get more specific errors. 
Usage is as follows:
+
+``` python
+def button_split_delayable(self):
+    (
+        self # Can be a big recordset, let's say 1000 records
+        .delayable()
+        .generate_thumbnail((50, 50))
+        .set(priority=30)
+        .set(description=_("generate xxx"))
+        .split(50) # Split the job into 20 jobs of 50 records each
+        .delay()
+    )
+```
+
+The `split()` method takes a `chain` boolean keyword argument. If set to
+True, the jobs will be chained, meaning that the next job will only start
+when the previous one is done:
+
+``` python
+def button_increment_var(self):
+    (
+        self
+        .delayable()
+        .increment_counter()
+        .split(1, chain=True) # Will execute the jobs one after the other
+        .delay()
+    )
+```
+
 ### Enqueing Job Options
 
 - priority: default is 10, the closest it is to 0, the faster it will be
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py
index e0ff9576a5..db53ac3a60 100644
--- a/queue_job/tests/__init__.py
+++ b/queue_job/tests/__init__.py
@@ -1,6 +1,7 @@
 from . import test_runner_channels
 from . import test_runner_runner
 from . import test_delayable
+from . import test_delayable_split
 from . import test_json_field
 from . import test_model_job_channel
 from . import test_model_job_function
diff --git a/queue_job/tests/test_delayable_split.py b/queue_job/tests/test_delayable_split.py
new file mode 100644
index 0000000000..b761878b2e
--- /dev/null
+++ b/queue_job/tests/test_delayable_split.py
@@ -0,0 +1,94 @@
+# Copyright 2024 Akretion (http://www.akretion.com).
+# @author Florian Mounier
+# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
+
+from odoo.tests import common
+
+# pylint: disable=odoo-addons-relative-import
+from odoo.addons.queue_job.delay import Delayable
+
+
+class TestDelayableSplit(common.BaseCase):
+    def setUp(self):
+        super().setUp()
+
+        class FakeRecordSet(list):
+            def __init__(self, *args, **kwargs):
+                super().__init__(*args, **kwargs)
+                self._name = "recordset"
+
+            def __getitem__(self, key):
+                if isinstance(key, slice):
+                    return FakeRecordSet(super().__getitem__(key))
+                return super().__getitem__(key)
+
+            def method(self, arg, kwarg=None):
+                """Method to be called"""
+                return arg, kwarg
+
+        self.FakeRecordSet = FakeRecordSet
+
+    def test_delayable_split_no_method_call_beforehand(self):
+        dl = Delayable(self.FakeRecordSet(range(20)))
+        with self.assertRaises(ValueError):
+            dl.split(3)
+
+    def test_delayable_split_10_3(self):
+        dl = Delayable(self.FakeRecordSet(range(10)))
+        dl.method("arg", kwarg="kwarg")
+        group = dl.split(3)
+        self.assertEqual(len(group._delayables), 4)
+        delayables = sorted(list(group._delayables), key=lambda x: x.description)
+        self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2]))
+        self.assertEqual(delayables[1].recordset, self.FakeRecordSet([3, 4, 5]))
+        self.assertEqual(delayables[2].recordset, self.FakeRecordSet([6, 7, 8]))
+        self.assertEqual(delayables[3].recordset, self.FakeRecordSet([9]))
+        self.assertEqual(delayables[0].description, "Method to be called (split 1/4)")
+        self.assertEqual(delayables[1].description, "Method to be called (split 2/4)")
+        self.assertEqual(delayables[2].description, "Method to be called (split 3/4)")
+        self.assertEqual(delayables[3].description, "Method to be called (split 4/4)")
+        self.assertNotEqual(delayables[0]._job_method, dl._job_method)
+        self.assertNotEqual(delayables[1]._job_method, dl._job_method)
+        self.assertNotEqual(delayables[2]._job_method, dl._job_method)
+        self.assertNotEqual(delayables[3]._job_method, dl._job_method)
+        
self.assertEqual(delayables[0]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[1]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[2]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[3]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[0]._job_args, ("arg",)) + self.assertEqual(delayables[1]._job_args, ("arg",)) + self.assertEqual(delayables[2]._job_args, ("arg",)) + self.assertEqual(delayables[3]._job_args, ("arg",)) + self.assertEqual(delayables[0]._job_kwargs, {"kwarg": "kwarg"}) + self.assertEqual(delayables[1]._job_kwargs, {"kwarg": "kwarg"}) + self.assertEqual(delayables[2]._job_kwargs, {"kwarg": "kwarg"}) + self.assertEqual(delayables[3]._job_kwargs, {"kwarg": "kwarg"}) + + def test_delayable_split_10_5(self): + dl = Delayable(self.FakeRecordSet(range(10))) + dl.method("arg", kwarg="kwarg") + group = dl.split(5) + self.assertEqual(len(group._delayables), 2) + delayables = sorted(list(group._delayables), key=lambda x: x.description) + self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2, 3, 4])) + self.assertEqual(delayables[1].recordset, self.FakeRecordSet([5, 6, 7, 8, 9])) + self.assertEqual(delayables[0].description, "Method to be called (split 1/2)") + self.assertEqual(delayables[1].description, "Method to be called (split 2/2)") + + def test_delayable_split_10_10(self): + dl = Delayable(self.FakeRecordSet(range(10))) + dl.method("arg", kwarg="kwarg") + group = dl.split(10) + self.assertEqual(len(group._delayables), 1) + delayables = sorted(list(group._delayables), key=lambda x: x.description) + self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10))) + self.assertEqual(delayables[0].description, "Method to be called (split 1/1)") + + def test_delayable_split_10_20(self): + dl = Delayable(self.FakeRecordSet(range(10))) + dl.method("arg", kwarg="kwarg") + group = dl.split(20) + self.assertEqual(len(group._delayables), 1) + delayables = sorted(list(group._delayables), key=lambda x: x.description) + self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10))) + self.assertEqual(delayables[0].description, "Method to be called (split 1/1)") From 2073f87124ef20806743a59cbac083a1682c5171 Mon Sep 17 00:00:00 2001 From: Quoc Duong Date: Thu, 1 Aug 2024 11:45:38 +0700 Subject: [PATCH 10/14] [IMP] queue_job: Cancel child jobs when the parent is cancelled --- queue_job/job.py | 12 +++++++++-- queue_job/models/queue_job.py | 2 ++ test_queue_job/tests/test_job.py | 37 ++++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/queue_job/job.py b/queue_job/job.py index 9843c01f05..80294568a8 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -539,8 +539,8 @@ def perform(self): return self.result - def enqueue_waiting(self): - sql = """ + def _get_common_dependent_jobs_query(self): + return """ UPDATE queue_job SET state = %s FROM ( @@ -568,9 +568,17 @@ def enqueue_waiting(self): AND %s = ALL(jobs.parent_states) AND state = %s; """ + + def enqueue_waiting(self): + sql = self._get_common_dependent_jobs_query() self.env.cr.execute(sql, (PENDING, self.uuid, DONE, WAIT_DEPENDENCIES)) self.env["queue.job"].invalidate_model(["state"]) + def cancel_dependent_jobs(self): + sql = self._get_common_dependent_jobs_query() + self.env.cr.execute(sql, (CANCELLED, self.uuid, CANCELLED, WAIT_DEPENDENCIES)) + self.env["queue.job"].invalidate_model(["state"]) + def store(self): """Store the Job""" job_model = 
self.env["queue.job"] diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index b6acf43dab..1650a374bc 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -326,6 +326,8 @@ def _change_job_state(self, state, result=None): elif state == CANCELLED: job_.set_cancelled(result=result) job_.store() + record.env["queue.job"].flush_model() + job_.cancel_dependent_jobs() else: raise ValueError("State not supported: %s" % state) diff --git a/test_queue_job/tests/test_job.py b/test_queue_job/tests/test_job.py index c4ec5081b3..d7414ef7aa 100644 --- a/test_queue_job/tests/test_job.py +++ b/test_queue_job/tests/test_job.py @@ -15,6 +15,7 @@ RetryableJobError, ) from odoo.addons.queue_job.job import ( + CANCELLED, DONE, ENQUEUED, FAILED, @@ -530,6 +531,42 @@ def test_button_done(self): stored.result, "Manually set to done by %s" % self.env.user.name ) + def test_button_done_enqueue_waiting_dependencies(self): + job_root = Job(self.env["test.queue.job"].testing_method) + job_child = Job(self.env["test.queue.job"].testing_method) + job_child.add_depends({job_root}) + + DelayableGraph._ensure_same_graph_uuid([job_root, job_child]) + job_root.store() + job_child.store() + + self.assertEqual(job_child.state, WAIT_DEPENDENCIES) + record_root = job_root.db_record() + record_child = job_child.db_record() + # Trigger button done + record_root.button_done() + # Check the state + self.assertEqual(record_root.state, DONE) + self.assertEqual(record_child.state, PENDING) + + def test_button_cancel_dependencies(self): + job_root = Job(self.env["test.queue.job"].testing_method) + job_child = Job(self.env["test.queue.job"].testing_method) + job_child.add_depends({job_root}) + + DelayableGraph._ensure_same_graph_uuid([job_root, job_child]) + job_root.store() + job_child.store() + + self.assertEqual(job_child.state, WAIT_DEPENDENCIES) + record_root = job_root.db_record() + record_child = job_child.db_record() + # Trigger button cancelled + record_root.button_cancelled() + # Check the state + self.assertEqual(record_root.state, CANCELLED) + self.assertEqual(record_child.state, CANCELLED) + def test_requeue(self): stored = self._create_job() stored.write({"state": "failed"}) From 7becbb5c9e04bcdf4787b71ae78456bdd2bf3277 Mon Sep 17 00:00:00 2001 From: Florent Xicluna Date: Mon, 16 Sep 2024 13:43:47 +0200 Subject: [PATCH 11/14] [FIX] queue_job: typo --- queue_job/README.rst | 2 +- queue_job/readme/USAGE.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/queue_job/README.rst b/queue_job/README.rst index 8aeb546076..a0b7ae2b4e 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -464,7 +464,7 @@ running Odoo** When you are developing (ie: connector modules) you might want to bypass the queue job and run your code immediately. -To do so you can set QUEUE_JOB\__NO_DELAY=1 in your enviroment. +To do so you can set QUEUE_JOB\__NO_DELAY=1 in your environment. **Bypass jobs in tests** diff --git a/queue_job/readme/USAGE.md b/queue_job/readme/USAGE.md index c08374b9fc..deb6fe2aca 100644 --- a/queue_job/readme/USAGE.md +++ b/queue_job/readme/USAGE.md @@ -290,7 +290,7 @@ running Odoo** When you are developing (ie: connector modules) you might want to bypass the queue job and run your code immediately. -To do so you can set QUEUE_JOB\_\_NO_DELAY=1 in your enviroment. +To do so you can set QUEUE_JOB\_\_NO_DELAY=1 in your environment. 
**Bypass jobs in tests**

From 41e79a69479789fcadfe6677a43a104dfa09f418 Mon Sep 17 00:00:00 2001
From: Florent Xicluna
Date: Mon, 16 Sep 2024 13:44:12 +0200
Subject: [PATCH 12/14] [IMP] queue_job: add filter on Date Created

---
 queue_job/views/queue_job_views.xml | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml
index 3d7a368971..be12b4294b 100644
--- a/queue_job/views/queue_job_views.xml
+++ b/queue_job/views/queue_job_views.xml
@@ -246,6 +246,27 @@
                         string="Failed"
                         domain="[('state', '=', 'failed')]"
                     />
+                    <separator />
+                    <filter
+                        string="Date Created"
+                        name="filter_date_created"
+                        date="date_created"
+                    />

From f202072f6e5c23010cb5378878aa7d6b13ec285f Mon Sep 17 00:00:00 2001
From: Florent Xicluna
Date: Thu, 26 Dec 2024 17:28:17 +0100
Subject: [PATCH 13/14] [REF] remove explicit super() arguments

---
 test_queue_job/tests/test_job_function.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test_queue_job/tests/test_job_function.py b/test_queue_job/tests/test_job_function.py
index 17781ac475..320b4973c5 100644
--- a/test_queue_job/tests/test_job_function.py
+++ b/test_queue_job/tests/test_job_function.py
@@ -4,7 +4,7 @@
 
 class TestJobFunction(common.TransactionCase):
     def setUp(self):
-        super(TestJobFunction, self).setUp()
+        super().setUp()
         self.test_function_model = self.env.ref(
             "queue_job.job_function_queue_job__test_job"
         )

From 2b0c0337716a12860dec85a744c4ab76f6315b4d Mon Sep 17 00:00:00 2001
From: Lois Rilo
Date: Fri, 11 Jun 2021 16:11:14 +0200
Subject: [PATCH 14/14] [13.0][FIX] queue_job_cron: channel_id must be
 storable.

Otherwise, you cannot use any channel other than the default
(root.ir_cron).
---
 queue_job_cron/models/ir_cron.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/queue_job_cron/models/ir_cron.py b/queue_job_cron/models/ir_cron.py
index 7e4f5b848d..bb09ed075e 100644
--- a/queue_job_cron/models/ir_cron.py
+++ b/queue_job_cron/models/ir_cron.py
@@ -28,13 +28,16 @@ class IrCron(models.Model):
         comodel_name="queue.job.channel",
         compute="_compute_run_as_queue_job",
         readonly=False,
+        store=True,
         string="Channel",
     )
 
     @api.depends("run_as_queue_job")
     def _compute_run_as_queue_job(self):
         for cron in self:
-            if cron.run_as_queue_job and not cron.channel_id:
+            if cron.channel_id:
+                continue
+            if cron.run_as_queue_job:
                 cron.channel_id = self.env.ref("queue_job_cron.channel_root_ir_cron").id
             else:
                 cron.channel_id = False
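
As a closing illustration of the last patch (a sketch, not part of the series; the custom channel is a hypothetical example), `store=True` on the user-editable computed field is what lets a manually chosen channel survive recomputation:

```python
# Sketch only: behaviour of the stored, editable computed channel_id,
# e.g. in an Odoo shell. The "custom" channel is created for the example.
cron = env.ref("queue_job.ir_cron_autovacuum_queue_jobs")
cron.run_as_queue_job = True  # compute assigns the default root.ir_cron channel
root = env.ref("queue_job_cron.channel_root_ir_cron")
custom = env["queue.job.channel"].create({"name": "custom", "parent_id": root.id})
cron.channel_id = custom  # readonly=False lets the user override the default
# The compute now starts with "if cron.channel_id: continue" and the value
# is stored, so the manual choice is kept instead of being reset.
assert cron.channel_id == custom
```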