Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,26 @@ Change Log
Unreleased
__________

Added
~~~~~

* ``OpenEdxPublicSignal.send_event`` now accepts two new keyword arguments:

* ``send_on_commit`` (bool, default ``False``): defers sending the event until
the current database transaction commits, using ``django.db.transaction.on_commit``.
Sends immediately if there is no open transaction, and does not send at all
if the transaction rolls back.
* ``send_async`` (bool, default ``False``): sends the event from a Celery
task instead of the caller's thread. Event data is serialized with the Avro
serializer so that attrs-based payload classes can round-trip through
Celery's JSON transport. Requires a Celery worker that imports
``openedx_events.tasks``.

The two options compose: combining them defers the Celery dispatch until the
transaction commits.

* Added ``celery`` to ``requirements/base.in``.

[11.2.0] - 2026-04-20
---------------------

Expand Down
19 changes: 19 additions & 0 deletions docs/how-tos/create-a-new-event.rst
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,25 @@ Here is how the integration could look like:
- Ensure that the event is triggered consistently and only when the event should be triggered. Avoid triggering the event multiple times for the same event unless necessary, e.g., when there is no other way to ensure that the event is triggered consistently.
- Try placing the event after the triggering logic completes successfully to ensure that it is triggered only when needed. This will help ensure that the event is triggered only for factual events. If the triggering logic fails, the event should not be triggered.

Deferring and Backgrounding Event Sends
-----------------------------------------

``send_event`` accepts two optional keyword arguments that control *when* and *where* receivers run:

- ``send_on_commit=True``: Defer the send until the current database transaction commits (via ``django.db.transaction.on_commit``). If there is no open transaction, the event is sent immediately. If the transaction rolls back, the event is not sent. Use this when the event reports a database change that may still be rolled back, to avoid "counterfactual" events.

- ``send_async=True``: Send the event from an asynchronous Celery task instead of the caller's thread. Event data is serialized with the Avro serializer (so that ``attrs`` payload classes round-trip through Celery's JSON transport) and handed off to ``openedx_events.tasks.send_async_event``. Use this when receivers are slow, non-critical, or you want to decouple their cost from the triggering request. Requires a Celery worker that imports ``openedx_events.tasks``.

Both options can be combined. For example, to send a notification event only if the enrollment commits, and to run the (possibly slow) receivers in a worker:

.. code-block:: python

COURSE_ENROLLMENT_CREATED.send_event(
enrollment=enrollment_data,
send_on_commit=True,
send_async=True,
)

Step 7: Test the Event
========================

Expand Down
41 changes: 41 additions & 0 deletions openedx_events/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Celery tasks for sending Open edX events asynchronously.
"""

import base64
from logging import getLogger

from celery import shared_task

log = getLogger(__name__)


@shared_task(name="openedx_events.tasks.send_async_event")
def send_async_event(event_type: str, metadata_json: str, event_data_b64: str) -> None:
"""
Re-send an event from a Celery worker.

Event data is serialized using the Avro serializer (because event payloads
are typically attrs classes that are not JSON-serializable) and base64-encoded
so that it can be passed as a JSON-safe argument to Celery. Metadata is
passed as a JSON string.

Arguments:
event_type (str): The event type of the signal to re-send.
metadata_json (str): JSON-serialized EventsMetadata.
event_data_b64 (str): Base64-encoded Avro-serialized event data.
"""
# Imported here to avoid a circular import at module load time.
from .data import EventsMetadata # pylint: disable=import-outside-toplevel
from .event_bus.avro.deserializer import deserialize_bytes_to_event_data # pylint: disable=import-outside-toplevel
from .tooling import OpenEdxPublicSignal # pylint: disable=import-outside-toplevel,cyclic-import

signal = OpenEdxPublicSignal.get_signal_by_type(event_type)
metadata = EventsMetadata.from_json(metadata_json)
event_data = deserialize_bytes_to_event_data(
base64.b64decode(event_data_b64), signal
)
signal._send_event_with_metadata( # pylint: disable=protected-access
metadata=metadata,
**event_data,
)
230 changes: 227 additions & 3 deletions openedx_events/tests/test_tooling.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,23 @@
Classes:
EventsToolingTest: Test events tooling.
"""
import base64
import datetime
import sys
from contextlib import contextmanager
from unittest.mock import Mock, patch
from unittest.mock import ANY, Mock, patch
from uuid import UUID, uuid1

import attr
import ddt
import pytest
from django.test import TestCase, override_settings
from django.db import transaction
from django.test import TestCase, TransactionTestCase, override_settings

from openedx_events.data import EventsMetadata
from openedx_events.exceptions import SenderValidationError
from openedx_events.learning.data import UserData, UserPersonalData
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
from openedx_events.testing import FreezeSignalCacheMixin
from openedx_events.tooling import OpenEdxPublicSignal, _process_all_signals_modules, load_all_signals

Expand Down Expand Up @@ -257,7 +261,8 @@ def test_send_event_with_custom_metadata(self, mock_send_event_with_metadata):

assert response == expected_response
mock_send_event_with_metadata.assert_called_once_with(
metadata=metadata, send_robust=True, foo="bar", from_event_bus=True
metadata=metadata, send_robust=True, foo="bar", from_event_bus=True,
send_on_commit=False, send_async=False,
)

@ddt.data(
Expand Down Expand Up @@ -333,6 +338,225 @@ def test_send_event_disabled(self, send_mock):
self.assertListEqual([], result)


def _make_user_data():
"""Build a UserData payload for the real SESSION_LOGIN_COMPLETED signal."""
return UserData(
pii=UserPersonalData(username="alice", email="alice@example.com", name="Alice"),
id=1,
is_active=True,
)


class SendEventOnCommitTests(TestCase):
"""
Tests for the ``send_on_commit`` parameter of ``send_event``.

Uses ``TestCase.captureOnCommitCallbacks`` to observe callbacks that
``transaction.on_commit`` registers inside the outer test-level
transaction.
"""

def setUp(self):
super().setUp()
self.receiver = Mock(return_value="ok")
SESSION_LOGIN_COMPLETED.connect(self.receiver)
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)

def test_send_on_commit_defers_until_commit(self):
"""
With ``send_on_commit=True`` inside a transaction, the receiver is
only called once the enclosing transaction commits.
"""
with self.captureOnCommitCallbacks(execute=False) as callbacks:
result = SESSION_LOGIN_COMPLETED.send_event(
send_on_commit=True, user=_make_user_data(),
)

self.receiver.assert_not_called()
self.assertEqual(result, [])
self.assertEqual(len(callbacks), 1)

# Now "commit" and verify receiver runs.
for cb in callbacks:
cb()
self.receiver.assert_called_once()

def test_send_on_commit_callback_runs_receiver(self):
"""
Executing the captured ``on_commit`` callback (simulating a commit)
triggers the receiver, verifying that the work registered under the
callback is the actual signal send.
"""
with self.captureOnCommitCallbacks(execute=True):
SESSION_LOGIN_COMPLETED.send_event(
send_on_commit=True, user=_make_user_data(),
)
self.receiver.assert_called_once()


class SendEventOnCommitRollbackTests(TransactionTestCase):
"""
Tests that ``send_on_commit`` suppresses sending when the transaction
rolls back. Uses ``TransactionTestCase`` so transactions actually commit
and roll back.
"""

def setUp(self):
super().setUp()
self.receiver = Mock(return_value="ok")
SESSION_LOGIN_COMPLETED.connect(self.receiver)
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)

def test_send_on_commit_immediate_when_no_transaction(self):
"""
Outside any transaction, ``send_on_commit=True`` sends immediately
(per Django's ``transaction.on_commit`` contract).
"""
SESSION_LOGIN_COMPLETED.send_event(
send_on_commit=True, user=_make_user_data(),
)
self.receiver.assert_called_once()

def test_send_on_commit_not_sent_on_rollback(self):
"""
If the transaction rolls back, the on_commit callback is never run,
so the event is not sent.
"""
class _Rollback(Exception):
pass

with self.assertRaises(_Rollback):
with transaction.atomic():
SESSION_LOGIN_COMPLETED.send_event(
send_on_commit=True, user=_make_user_data(),
)
raise _Rollback()

self.receiver.assert_not_called()


class SendEventAsyncTests(TestCase):
"""
Tests for the ``send_async`` parameter of ``send_event``.
"""

def setUp(self):
super().setUp()
self.receiver = Mock(return_value="ok")
SESSION_LOGIN_COMPLETED.connect(self.receiver)
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)

@patch("openedx_events.tasks.send_async_event.delay")
def test_send_async_dispatches_celery_task(self, mock_delay):
"""
With ``send_async=True``, the receiver is not called synchronously.
Instead a Celery task is dispatched with the serialized event data.
"""
result = SESSION_LOGIN_COMPLETED.send_event(
send_async=True, user=_make_user_data(),
)

self.assertEqual(result, [])
self.receiver.assert_not_called()
mock_delay.assert_called_once_with(
SESSION_LOGIN_COMPLETED.event_type, ANY, ANY,
)
_, metadata_json, event_data_b64 = mock_delay.call_args.args
# The metadata round-trips through JSON.
self.assertEqual(
EventsMetadata.from_json(metadata_json).event_type,
SESSION_LOGIN_COMPLETED.event_type,
)
# The event data is valid base64.
base64.b64decode(event_data_b64)

def test_send_async_task_sends_event(self):
"""
End-to-end: when ``send_async=True``, running the dispatched Celery
task synchronously delivers the event to the receiver with the same
metadata and a payload that round-trips through Avro.
"""
captured = {}

def _capture_delay(event_type, metadata_json, event_data_b64):
captured["args"] = (event_type, metadata_json, event_data_b64)

with patch("openedx_events.tasks.send_async_event.delay", side_effect=_capture_delay):
SESSION_LOGIN_COMPLETED.send_event(
send_async=True, user=_make_user_data(),
)

self.receiver.assert_not_called()

# Now run the task body directly, simulating a Celery worker.
from openedx_events.tasks import send_async_event # pylint: disable=import-outside-toplevel
send_async_event(*captured["args"])

self.receiver.assert_called_once()
call_kwargs = self.receiver.call_args.kwargs
self.assertEqual(call_kwargs["signal"], SESSION_LOGIN_COMPLETED)
self.assertEqual(call_kwargs["user"], _make_user_data())
self.assertEqual(call_kwargs["metadata"].event_type, SESSION_LOGIN_COMPLETED.event_type)
self.assertEqual(call_kwargs["from_event_bus"], False)


class SendEventAsyncOnCommitTests(TestCase):
"""
Tests combining ``send_on_commit=True`` with ``send_async=True``: the
Celery task dispatch should be deferred until the transaction commits.
"""

def setUp(self):
super().setUp()
self.receiver = Mock(return_value="ok")
SESSION_LOGIN_COMPLETED.connect(self.receiver)
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)

@patch("openedx_events.tasks.send_async_event.delay")
def test_async_on_commit_defers_dispatch(self, mock_delay):
"""
``send_async=True`` + ``send_on_commit=True`` in a transaction: the
Celery dispatch is registered as an on_commit callback, not invoked
immediately.
"""
with self.captureOnCommitCallbacks(execute=False) as callbacks:
SESSION_LOGIN_COMPLETED.send_event(
send_async=True, send_on_commit=True, user=_make_user_data(),
)

mock_delay.assert_not_called()
self.assertEqual(len(callbacks), 1)

for cb in callbacks:
cb()
mock_delay.assert_called_once()


class SendEventAsyncOnCommitRollbackTests(TransactionTestCase):
"""Rollback behavior for ``send_async`` + ``send_on_commit``."""

def setUp(self):
super().setUp()
self.receiver = Mock(return_value="ok")
SESSION_LOGIN_COMPLETED.connect(self.receiver)
self.addCleanup(SESSION_LOGIN_COMPLETED.disconnect, self.receiver)

@patch("openedx_events.tasks.send_async_event.delay")
def test_async_on_commit_not_dispatched_on_rollback(self, mock_delay):
class _Rollback(Exception):
pass

with self.assertRaises(_Rollback):
with transaction.atomic():
SESSION_LOGIN_COMPLETED.send_event(
send_async=True, send_on_commit=True, user=_make_user_data(),
)
raise _Rollback()

mock_delay.assert_not_called()
self.receiver.assert_not_called()


class TestLoadAllSignals(FreezeSignalCacheMixin, TestCase):
""" Tests for the load_all_signals method"""
def setUp(self):
Expand Down
Loading