Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from odoo import SUPERUSER_ID, _, api, http
from odoo.modules.registry import Registry
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY
from odoo.tools import config

from ..delay import chain, group
from ..exception import FailedJobError, RetryableJobError
Expand All @@ -38,8 +39,10 @@ def _prevent_commit(cr):
def forbidden_commit(*args, **kwargs):
raise RuntimeError(
"Commit is forbidden in queue jobs. "
"If the current job is a cron running as queue job, "
"modify it to run as a normal cron."
'You may want to enable the "Allow Commit" option on the Job '
"Function. Alternatively, if the current job is a cron running as "
"queue job, you can modify it to run as a normal cron. More details on: "
"https://github.com/OCA/queue/wiki/%5BDRAFT%5D-Upgrade-warning:-commits-inside-jobs"
Comment thread
guewen marked this conversation as resolved.
Outdated
)

original_commit = cr.commit
Expand Down Expand Up @@ -103,7 +106,8 @@ def _try_perform_job(cls, env, job):
job.set_done()
job.store()
env.flush_all()
env.cr.commit()
if not config["test_enable"]:
env.cr.commit()
_logger.debug("%s done", job)

@classmethod
Expand Down
37 changes: 28 additions & 9 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import uuid
import weakref
from contextlib import contextmanager, nullcontext
from datetime import datetime, timedelta
from random import randint

Expand Down Expand Up @@ -402,14 +403,9 @@ def __init__(
raise TypeError("Job accepts only methods of Models")

recordset = func.__self__
env = recordset.env
self.method_name = func.__name__
self.recordset = recordset

self.env = env
Comment thread
guewen marked this conversation as resolved.
self.job_model = self.env["queue.job"]
self.job_model_name = "queue.job"

self.job_config = (
self.env["queue.job.function"].sudo().job_config(self.job_function_name)
)
Expand Down Expand Up @@ -459,10 +455,10 @@ def __init__(
self.exc_message = None
self.exc_info = None

if "company_id" in env.context:
company_id = env.context["company_id"]
if "company_id" in self.env.context:
company_id = self.env.context["company_id"]
else:
company_id = env.company.id
company_id = self.env.company.id
self.company_id = company_id
self._eta = None
self.eta = eta
Expand All @@ -487,7 +483,12 @@ def perform(self):
"""
self.retry += 1
try:
self.result = self.func(*tuple(self.args), **self.kwargs)
if self.job_config.allow_commit:
env_context_manager = self._with_temporary_env()
else:
env_context_manager = nullcontext()
with env_context_manager:
self.result = self.func(*tuple(self.args), **self.kwargs)
except RetryableJobError as err:
if err.ignore_retry:
self.retry -= 1
Expand All @@ -507,6 +508,16 @@ def perform(self):

return self.result

@contextmanager
def _with_temporary_env(self):
with self.env.registry.cursor() as new_cr:
env = self.recordset.env
Comment thread
guewen marked this conversation as resolved.
Outdated
self.recordset = self.recordset.with_env(env(cr=new_cr))
Comment thread
guewen marked this conversation as resolved.
Outdated
try:
yield
finally:
self.recordset = self.recordset.with_env(env)
Comment thread
guewen marked this conversation as resolved.
Outdated

def _get_common_dependent_jobs_query(self):
return """
UPDATE queue_job
Expand Down Expand Up @@ -665,6 +676,14 @@ def __hash__(self):
def db_record(self):
return self.db_records_from_uuids(self.env, [self.uuid])

@property
def env(self):
return self.recordset.env

@env.setter
def env(self, env):
self.recordset = self.recordset.with_env(env)

@property
def func(self):
recordset = self.recordset.with_context(job_uuid=self.uuid)
Expand Down
11 changes: 10 additions & 1 deletion queue_job/models/queue_job_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class QueueJobFunction(models.Model):
"related_action_enable "
"related_action_func_name "
"related_action_kwargs "
"job_function_id ",
"job_function_id "
"allow_commit",
)

def _default_channel(self):
Expand Down Expand Up @@ -79,6 +80,12 @@ def _default_channel(self):
"enable, func_name, kwargs.\n"
"See the module description for details.",
)
allow_commit = fields.Boolean(
help="Allows the job to commit transactions during execution. "
"Under the hood, this executes the job in a new database cursor, "
"which incurs an overhead as it requires an extra connection to "
"the database. "
)

@api.depends("model_id.model", "method")
def _compute_name(self):
Expand Down Expand Up @@ -149,6 +156,7 @@ def job_default_config(self):
related_action_func_name=None,
related_action_kwargs={},
job_function_id=None,
allow_commit=False,
)

def _parse_retry_pattern(self):
Expand Down Expand Up @@ -184,6 +192,7 @@ def job_config(self, name):
related_action_func_name=config.related_action.get("func_name"),
related_action_kwargs=config.related_action.get("kwargs", {}),
job_function_id=config.id,
allow_commit=config.allow_commit,
)

def _retry_pattern_format_error_message(self):
Expand Down
2 changes: 1 addition & 1 deletion queue_job/tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def _add_job(self, *args, **kwargs):

def _prepare_context(self, job):
# pylint: disable=context-overridden
job_model = job.job_model.with_context({})
job_model = job.env["queue.job"].with_context({})
field_records = job_model._fields["records"]
# Filter the context to simulate store/load of the job
job.recordset = field_records.convert_to_write(job.recordset, job_model)
Expand Down
2 changes: 2 additions & 0 deletions queue_job/tests/test_model_job_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def test_function_job_config(self):
' "func_name": "related_action_foo",'
' "kwargs": {"b": 1}}'
),
"allow_commit": True,
}
)
self.assertEqual(
Expand All @@ -53,5 +54,6 @@ def test_function_job_config(self):
related_action_func_name="related_action_foo",
related_action_kwargs={"b": 1},
job_function_id=job_function.id,
allow_commit=True,
),
)
6 changes: 6 additions & 0 deletions queue_job/tests/test_run_rob_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ def test_get_failure_values(self):
self.assertEqual(
rslt, {"exc_info": "info", "exc_name": "Exception", "exc_message": "zero"}
)

def test_runjob_success(self):
job = self.env["queue.job"].with_delay()._test_job()
RunJobController._runjob(self.env, job)
self.assertEqual(job.state, "done")
self.assertEqual(job.db_record().state, "done")
2 changes: 2 additions & 0 deletions queue_job/views/queue_job_function_views.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<field name="model_id" required="1" />
<field name="method" required="1" />
<field name="channel_id" />
<field name="allow_commit" />
<field name="edit_retry_pattern" widget="ace" />
<field name="edit_related_action" widget="ace" />
</group>
Expand All @@ -24,6 +25,7 @@
<list>
<field name="name" />
<field name="channel_id" />
<field name="allow_commit" />
</list>
</field>
</record>
Expand Down