diff --git a/CHANGELOG.rst b/CHANGELOG.rst index a4537ce4..c3b84a33 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,26 @@ Change Log Unreleased __________ +Added +~~~~~ + +* ``OpenEdxPublicSignal.send_event`` now accepts two new keyword arguments: + + * ``send_on_commit`` (bool, default ``False``): defers sending the event until + the current database transaction commits, using ``django.db.transaction.on_commit``. + Sends immediately if there is no open transaction, and does not send at all + if the transaction rolls back. + * ``send_async`` (bool, default ``False``): sends the event from a Celery + task instead of the caller's thread. Event data is serialized with the Avro + serializer so that attrs-based payload classes can round-trip through + Celery's JSON transport. Requires a Celery worker that imports + ``openedx_events.tasks``. + + The two options compose: combining them defers the Celery dispatch until the + transaction commits. + +* Added ``celery`` to ``requirements/base.in``. + [11.2.0] - 2026-04-20 --------------------- diff --git a/docs/how-tos/create-a-new-event.rst b/docs/how-tos/create-a-new-event.rst index 6232edf0..6c1eadc1 100644 --- a/docs/how-tos/create-a-new-event.rst +++ b/docs/how-tos/create-a-new-event.rst @@ -248,6 +248,25 @@ Here is how the integration could look like: - Ensure that the event is triggered consistently and only when the event should be triggered. Avoid triggering the event multiple times for the same event unless necessary, e.g., when there is no other way to ensure that the event is triggered consistently. - Try placing the event after the triggering logic completes successfully to ensure that it is triggered only when needed. This will help ensure that the event is triggered only for factual events. If the triggering logic fails, the event should not be triggered. 
+Deferring and Backgrounding Event Sends +----------------------------------------- + +``send_event`` accepts two optional keyword arguments that control *when* and *where* receivers run: + +- ``send_on_commit=True``: Defer the send until the current database transaction commits (via ``django.db.transaction.on_commit``). If there is no open transaction, the event is sent immediately. If the transaction rolls back, the event is not sent. Use this when the event reports a database change that may still be rolled back, to avoid "counterfactual" events. + +- ``send_async=True``: Send the event from an asynchronous Celery task instead of the caller's thread. Event data is serialized with the Avro serializer (so that ``attrs`` payload classes round-trip through Celery's JSON transport) and handed off to ``openedx_events.tasks.send_async_event``. Use this when receivers are slow, non-critical, or you want to decouple their cost from the triggering request. Requires a Celery worker that imports ``openedx_events.tasks``. + +Both options can be combined. For example, to send a notification event only if the enrollment commits, and to run the (possibly slow) receivers in a worker: + +.. code-block:: python + + COURSE_ENROLLMENT_CREATED.send_event( + enrollment=enrollment_data, + send_on_commit=True, + send_async=True, + ) + Step 7: Test the Event ======================== diff --git a/openedx_events/tasks.py b/openedx_events/tasks.py new file mode 100644 index 00000000..1f5cc0b7 --- /dev/null +++ b/openedx_events/tasks.py @@ -0,0 +1,41 @@ +""" +Celery tasks for sending Open edX events asynchronously. +""" + +import base64 +from logging import getLogger + +from celery import shared_task + +log = getLogger(__name__) + + +@shared_task(name="openedx_events.tasks.send_async_event") +def send_async_event(event_type: str, metadata_json: str, event_data_b64: str) -> None: + """ + Re-send an event from a Celery worker. 
+ + Event data is serialized using the Avro serializer (because event payloads + are typically attrs classes that are not JSON-serializable) and base64-encoded + so that it can be passed as a JSON-safe argument to Celery. Metadata is + passed as a JSON string. + + Arguments: + event_type (str): The event type of the signal to re-send. + metadata_json (str): JSON-serialized EventsMetadata. + event_data_b64 (str): Base64-encoded Avro-serialized event data. + """ + # Imported here to avoid a circular import at module load time. + from .data import EventsMetadata # pylint: disable=import-outside-toplevel + from .event_bus.avro.deserializer import deserialize_bytes_to_event_data # pylint: disable=import-outside-toplevel + from .tooling import OpenEdxPublicSignal # pylint: disable=import-outside-toplevel,cyclic-import + + signal = OpenEdxPublicSignal.get_signal_by_type(event_type) + metadata = EventsMetadata.from_json(metadata_json) + event_data = deserialize_bytes_to_event_data( + base64.b64decode(event_data_b64), signal + ) + signal._send_event_with_metadata( # pylint: disable=protected-access + metadata=metadata, + **event_data, + ) diff --git a/openedx_events/tests/test_tooling.py b/openedx_events/tests/test_tooling.py index e152620e..814f26f7 100644 --- a/openedx_events/tests/test_tooling.py +++ b/openedx_events/tests/test_tooling.py @@ -4,19 +4,23 @@ Classes: EventsToolingTest: Test events tooling. 
""" +import base64 import datetime import sys from contextlib import contextmanager -from unittest.mock import Mock, patch +from unittest.mock import ANY, Mock, patch from uuid import UUID, uuid1 import attr import ddt import pytest -from django.test import TestCase, override_settings +from django.db import transaction +from django.test import TestCase, TransactionTestCase, override_settings from openedx_events.data import EventsMetadata from openedx_events.exceptions import SenderValidationError +from openedx_events.learning.data import UserData, UserPersonalData +from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED from openedx_events.testing import FreezeSignalCacheMixin from openedx_events.tooling import OpenEdxPublicSignal, _process_all_signals_modules, load_all_signals @@ -257,7 +261,8 @@ def test_send_event_with_custom_metadata(self, mock_send_event_with_metadata): assert response == expected_response mock_send_event_with_metadata.assert_called_once_with( - metadata=metadata, send_robust=True, foo="bar", from_event_bus=True + metadata=metadata, send_robust=True, foo="bar", from_event_bus=True, + send_on_commit=False, send_async=False, ) @ddt.data( @@ -333,6 +338,225 @@ def test_send_event_disabled(self, send_mock): self.assertListEqual([], result) +def _make_user_data(): + """Build a UserData payload for the real SESSION_LOGIN_COMPLETED signal.""" + return UserData( + pii=UserPersonalData(username="alice", email="alice@example.com", name="Alice"), + id=1, + is_active=True, + ) + + +class SendEventOnCommitTests(TestCase): + """ + Tests for the ``send_on_commit`` parameter of ``send_event``. + + Uses ``TestCase.captureOnCommitCallbacks`` to observe callbacks that + ``transaction.on_commit`` registers inside the outer test-level + transaction. 
+ """ + + def setUp(self): + super().setUp() + self.receiver = Mock(return_value="ok") + SESSION_LOGIN_COMPLETED.connect(self.receiver) + self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver) + + def test_send_on_commit_defers_until_commit(self): + """ + With ``send_on_commit=True`` inside a transaction, the receiver is + only called once the enclosing transaction commits. + """ + with self.captureOnCommitCallbacks(execute=False) as callbacks: + result = SESSION_LOGIN_COMPLETED.send_event( + send_on_commit=True, user=_make_user_data(), + ) + + self.receiver.assert_not_called() + self.assertEqual(result, []) + self.assertEqual(len(callbacks), 1) + + # Now "commit" and verify receiver runs. + for cb in callbacks: + cb() + self.receiver.assert_called_once() + + def test_send_on_commit_callback_runs_receiver(self): + """ + Executing the captured ``on_commit`` callback (simulating a commit) + triggers the receiver, verifying that the work registered under the + callback is the actual signal send. + """ + with self.captureOnCommitCallbacks(execute=True): + SESSION_LOGIN_COMPLETED.send_event( + send_on_commit=True, user=_make_user_data(), + ) + self.receiver.assert_called_once() + + +class SendEventOnCommitRollbackTests(TransactionTestCase): + """ + Tests that ``send_on_commit`` suppresses sending when the transaction + rolls back. Uses ``TransactionTestCase`` so transactions actually commit + and roll back. + """ + + def setUp(self): + super().setUp() + self.receiver = Mock(return_value="ok") + SESSION_LOGIN_COMPLETED.connect(self.receiver) + self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver) + + def test_send_on_commit_immediate_when_no_transaction(self): + """ + Outside any transaction, ``send_on_commit=True`` sends immediately + (per Django's ``transaction.on_commit`` contract). 
+ """ + SESSION_LOGIN_COMPLETED.send_event( + send_on_commit=True, user=_make_user_data(), + ) + self.receiver.assert_called_once() + + def test_send_on_commit_not_sent_on_rollback(self): + """ + If the transaction rolls back, the on_commit callback is never run, + so the event is not sent. + """ + class _Rollback(Exception): + pass + + with self.assertRaises(_Rollback): + with transaction.atomic(): + SESSION_LOGIN_COMPLETED.send_event( + send_on_commit=True, user=_make_user_data(), + ) + raise _Rollback() + + self.receiver.assert_not_called() + + +class SendEventAsyncTests(TestCase): + """ + Tests for the ``send_async`` parameter of ``send_event``. + """ + + def setUp(self): + super().setUp() + self.receiver = Mock(return_value="ok") + SESSION_LOGIN_COMPLETED.connect(self.receiver) + self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver) + + @patch("openedx_events.tasks.send_async_event.delay") + def test_send_async_dispatches_celery_task(self, mock_delay): + """ + With ``send_async=True``, the receiver is not called synchronously. + Instead a Celery task is dispatched with the serialized event data. + """ + result = SESSION_LOGIN_COMPLETED.send_event( + send_async=True, user=_make_user_data(), + ) + + self.assertEqual(result, []) + self.receiver.assert_not_called() + mock_delay.assert_called_once_with( + SESSION_LOGIN_COMPLETED.event_type, ANY, ANY, + ) + _, metadata_json, event_data_b64 = mock_delay.call_args.args + # The metadata round-trips through JSON. + self.assertEqual( + EventsMetadata.from_json(metadata_json).event_type, + SESSION_LOGIN_COMPLETED.event_type, + ) + # The event data is valid base64. + base64.b64decode(event_data_b64) + + def test_send_async_task_sends_event(self): + """ + End-to-end: when ``send_async=True``, running the dispatched Celery + task synchronously delivers the event to the receiver with the same + metadata and a payload that round-trips through Avro. 
+ """ + captured = {} + + def _capture_delay(event_type, metadata_json, event_data_b64): + captured["args"] = (event_type, metadata_json, event_data_b64) + + with patch("openedx_events.tasks.send_async_event.delay", side_effect=_capture_delay): + SESSION_LOGIN_COMPLETED.send_event( + send_async=True, user=_make_user_data(), + ) + + self.receiver.assert_not_called() + + # Now run the task body directly, simulating a Celery worker. + from openedx_events.tasks import send_async_event # pylint: disable=import-outside-toplevel + send_async_event(*captured["args"]) + + self.receiver.assert_called_once() + call_kwargs = self.receiver.call_args.kwargs + self.assertEqual(call_kwargs["signal"], SESSION_LOGIN_COMPLETED) + self.assertEqual(call_kwargs["user"], _make_user_data()) + self.assertEqual(call_kwargs["metadata"].event_type, SESSION_LOGIN_COMPLETED.event_type) + self.assertEqual(call_kwargs["from_event_bus"], False) + + +class SendEventAsyncOnCommitTests(TestCase): + """ + Tests combining ``send_on_commit=True`` with ``send_async=True``: the + Celery task dispatch should be deferred until the transaction commits. + """ + + def setUp(self): + super().setUp() + self.receiver = Mock(return_value="ok") + SESSION_LOGIN_COMPLETED.connect(self.receiver) + self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver) + + @patch("openedx_events.tasks.send_async_event.delay") + def test_async_on_commit_defers_dispatch(self, mock_delay): + """ + ``send_async=True`` + ``send_on_commit=True`` in a transaction: the + Celery dispatch is registered as an on_commit callback, not invoked + immediately. 
+ """ + with self.captureOnCommitCallbacks(execute=False) as callbacks: + SESSION_LOGIN_COMPLETED.send_event( + send_async=True, send_on_commit=True, user=_make_user_data(), + ) + + mock_delay.assert_not_called() + self.assertEqual(len(callbacks), 1) + + for cb in callbacks: + cb() + mock_delay.assert_called_once() + + +class SendEventAsyncOnCommitRollbackTests(TransactionTestCase): + """Rollback behavior for ``send_async`` + ``send_on_commit``.""" + + def setUp(self): + super().setUp() + self.receiver = Mock(return_value="ok") + SESSION_LOGIN_COMPLETED.connect(self.receiver) + self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver) + + @patch("openedx_events.tasks.send_async_event.delay") + def test_async_on_commit_not_dispatched_on_rollback(self, mock_delay): + class _Rollback(Exception): + pass + + with self.assertRaises(_Rollback): + with transaction.atomic(): + SESSION_LOGIN_COMPLETED.send_event( + send_async=True, send_on_commit=True, user=_make_user_data(), + ) + raise _Rollback() + + mock_delay.assert_not_called() + self.receiver.assert_not_called() + + class TestLoadAllSignals(FreezeSignalCacheMixin, TestCase): """ Tests for the load_all_signals method""" def setUp(self): diff --git a/openedx_events/tooling.py b/openedx_events/tooling.py index 9565b392..3e677d5e 100644 --- a/openedx_events/tooling.py +++ b/openedx_events/tooling.py @@ -1,13 +1,15 @@ """ Tooling necessary to use Open edX events. 
""" + +import base64 import pkgutil import warnings from importlib import import_module from logging import getLogger from django.conf import settings -from django.db import connection +from django.db import connection, transaction from django.dispatch import Signal from edx_django_utils.cache import RequestCache @@ -111,7 +113,16 @@ def generate_signal_metadata(self, time=None): time=time, ) - def _send_event_with_metadata(self, metadata, send_robust=True, from_event_bus=False, **kwargs): + def _send_event_with_metadata( + self, + *, + metadata, + send_robust=True, + from_event_bus=False, + send_on_commit=False, + send_async=False, + **kwargs, + ): """ Send events to all connected receivers with the provided metadata. @@ -124,6 +135,13 @@ def _send_event_with_metadata(self, metadata, send_robust=True, from_event_bus=F being sent from the event bus. This is used to prevent infinite loops when the event bus is consuming events. It should not be used when sending events from the application. + send_on_commit (bool): Defaults to False. If True, defer sending + the event until the current database transaction commits. If no + transaction is open, the event is sent immediately. If the + transaction rolls back, the event is not sent. + send_async (bool): Defaults to False. If True, serialize the event + and dispatch it to a Celery task so that receivers run in a + worker process instead of the caller's thread. See ``send_event`` docstring for more details on its usage and behavior. """ @@ -162,20 +180,56 @@ def validate_sender(): validate_sender() + if send_async: + # Serialize event data up-front so that (a) serialization errors surface + # in the caller and (b) the task argument is JSON-safe for Celery. 
+ # pylint: disable=import-outside-toplevel + from openedx_events.event_bus.avro.serializer import serialize_event_data_to_bytes + from openedx_events.tasks import send_async_event + + event_data_b64 = base64.b64encode( + serialize_event_data_to_bytes(kwargs, self) + ).decode("ascii") + metadata_json = metadata.to_json() + + def _dispatch_async(): + send_async_event.delay(self.event_type, metadata_json, event_data_b64) + + if send_on_commit: + transaction.on_commit(_dispatch_async) + else: + _dispatch_async() + return [] + kwargs["metadata"] = metadata kwargs[SIGNAL_PROCESSED_FROM_EVENT_BUS] = from_event_bus - if self._allow_send_event_failure or settings.DEBUG or not send_robust: - return super().send(sender=None, **kwargs) + signal_send = super().send + signal_send_robust = super().send_robust - responses = super().send_robust(sender=None, **kwargs) - log.info( - f"Responses of the Open edX Event <{self.event_type}>: \n{format_responses(responses, depth=2)}", - ) + def _dispatch_sync(): + if self._allow_send_event_failure or settings.DEBUG or not send_robust: + return signal_send(sender=None, **kwargs) + + responses = signal_send_robust(sender=None, **kwargs) + log.info(f"Responses of the Open edX Event <{self.event_type}>: \n{format_responses(responses, depth=2)}") + return responses - return responses + if send_on_commit: + transaction.on_commit(_dispatch_sync) + return [] + + return _dispatch_sync() - def send_event(self, send_robust=True, time=None, **kwargs): + def send_event( + self, + *, + send_robust=True, + time=None, + send_on_commit=False, + send_async=False, + **kwargs, + ): """ Send events to all connected receivers. @@ -194,11 +248,26 @@ def send_event(self, send_robust=True, time=None, **kwargs): >>> [(, 'callback response')] Arguments: - send_robust (bool): Defaults to True. See Django signal docs. + send_robust (bool): Defaults to True unless settings.DEBUG is True or you have previously + called .allow_send_event_failure() for this signal. 
When this is True, exceptions that + occur during event handling will be ignored, ensuring that every registered receiver + gets the event. This can make debugging difficult, however. See Django signal docs. time (datetime): (Optional - see note) Timestamp when the event was sent with UTC timezone. For events requiring a DB create or update, use the timestamp from the DB record. Defaults to current time in UTC. This argument is optional for backward compatibility, but ideally would be explicitly set. See OEP-41 for details. + send_on_commit (bool): Defaults to False. If True, defer sending the event until the + current database transaction commits (using ``django.db.transaction.on_commit``). If + there is no open transaction the event is sent immediately. If the transaction rolls + back the event is not sent. Useful when the event reports a DB change that may yet + be rolled back. + send_async (bool): Defaults to False. If True, the event is sent from an asynchronous + Celery task instead of the caller's thread. The event payload is serialized with the + Avro serializer so that attrs-based payload classes can round-trip through Celery's + JSON transport. Requires a running Celery worker configured to import + ``openedx_events.tasks``. Can be combined with ``send_on_commit`` to defer the Celery + dispatch until after the transaction commits. When this is True the returned list is + always empty, because receivers run in the worker; ``send_robust`` is not forwarded to it. Keyword Arguments: kwargs: Data to be sent to the signal's receivers. The keys must match the attributes defined in @@ -206,17 +275,31 @@ def send_event(self, send_robust=True, time=None, **kwargs): Returns: list: response of each receiver following the format [(receiver, response), ... ]. - The list is empty if the event is disabled. + The list is empty if the event is disabled, if ``send_on_commit`` is True (even when no + transaction is open and the event is sent immediately), or if ``send_async`` is used. 
Raises: SenderValidationError: raised when there's a mismatch between arguments passed to this method and arguments used to initialize the event. """ metadata = self.generate_signal_metadata(time=time) - return self._send_event_with_metadata(metadata=metadata, send_robust=send_robust, **kwargs) + return self._send_event_with_metadata( + metadata=metadata, + send_robust=send_robust, + send_on_commit=send_on_commit, + send_async=send_async, + **kwargs, + ) def send_event_with_custom_metadata( - self, metadata, /, *, send_robust=True, **kwargs + self, + metadata, + /, + *, + send_robust=True, + send_on_commit=False, + send_async=False, + **kwargs, ): """ Send events to all connected receivers using the provided metadata. @@ -230,12 +313,19 @@ def send_event_with_custom_metadata( Arguments: metadata (EventsMetadata): The metadata to be sent with the signal. send_robust (bool): Defaults to True. See Django signal docs. + send_on_commit (bool): Defaults to False. See ``send_event``. + send_async (bool): Defaults to False. See ``send_event``. kwargs: Data to be sent to the signal's receivers. See ``send_event`` docstring for more details. """ return self._send_event_with_metadata( - metadata=metadata, send_robust=send_robust, from_event_bus=True, **kwargs + metadata=metadata, + send_robust=send_robust, + from_event_bus=True, + send_on_commit=send_on_commit, + send_async=send_async, + **kwargs, ) def send(self, sender, **kwargs): # pylint: disable=unused-argument @@ -266,8 +356,9 @@ def disable(self): def allow_send_event_failure(self): """ - Allow Django signal to fail. Meaning, uses send_robust instead of send. + Do not silence exceptions in event handlers. + Meaning, uses ``send()`` instead of ``send_robust()``. More information on send_robust in the Django official documentation. 
""" self._allow_send_event_failure = True diff --git a/requirements/base.in b/requirements/base.in index b4b5eaa8..d8814f0a 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -1,6 +1,7 @@ # Core requirements for using this application -c constraints.txt +celery edx_django_utils django attrs