Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,91 @@ To visualize data:

- Interact with the course to generate data:

Complete a few activities within the course (e.g., enroll, take quizzes, watch videos) to generate real data. This will provide a more realistic dataset for analytics.
Complete a few activities within the course (e.g., enroll, take quizzes, watch videos) to generate real data. This will provide a more realistic dataset for analytics.



xAPI S3 Sink Configuration
-------------------------

The S3 sink serves as a backup and safeguard for xAPI events. If ClickHouse is unavailable or encounters errors during event ingestion, events are stored in S3 as a safeguard. This ensures data durability and allows you to recover missed events later using the ``xapi-backfill`` command.

To enable this backup mechanism, configure the following settings:

.. code-block:: yaml

ASPECTS_XAPI_S3_ACCESS_KEY=openedx
ASPECTS_XAPI_S3_BUCKET=xapi-events
ASPECTS_XAPI_S3_ENDPOINT=http://minio:9000
ASPECTS_XAPI_S3_REGION=us-east-1
ASPECTS_XAPI_S3_SECRET_KEY=...

Vector S3 sink options:

.. code-block:: bash

ASPECTS_XAPI_S3_SINK_MAX_EVENTS=10000
ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS=600

.. note::

- ``ASPECTS_XAPI_S3_SINK_MAX_EVENTS`` controls how many events are batched before writing to S3
- ``ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS`` controls how long to wait before flushing a batch
- Setting ``ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS`` too low can create many small files in S3


xAPI S3 Backfill
----------------

If you have xAPI events stored in S3 (configured via ``ASPECTS_XAPI_S3_BUCKET``), you can backfill them into ClickHouse using the ``xapi-backfill`` command. This is useful for:

- Restoring data from a backup
- Importing data from another environment
- Re-processing historical events

Basic usage:

.. code-block:: bash

tutor local do xapi-backfill

By default, this imports all events. You can filter by date using year, month, day, and hour options:

.. code-block:: bash

tutor local do xapi-backfill --year 2026 --month 3
tutor local do xapi-backfill --year 2026 --month 03 --day 19
tutor local do xapi-backfill --year 2026 --month 03 --day 19 --hour 14

For flexible path matching, use the ``--path`` option to specify a custom S3 path:

.. code-block:: bash

tutor local do xapi-backfill --path xapi/2026/03/19/14/*.log.zst

.. note::

- Date options accept both single and double-digit values (``03`` and ``3`` are equivalent)
- Hour should be in 24-hour format
- The ``--path`` option is exclusive with date options

After backfilling, you can run deduplication to remove duplicate events:

.. code-block:: bash

tutor local do xapi-backfill --deduplicate

Or run deduplication separately:

.. code-block:: bash

tutor local do xapi-deduplicate

.. warning::

Deduplication uses ``OPTIMIZE TABLE FINAL`` which can be resource-intensive on large tables. Run during low-traffic periods if you have a large dataset.


- Sync data from an existing Tutor installation with default settings:

.. code-block:: bash
Expand Down
117 changes: 106 additions & 11 deletions tutoraspects/commands_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import sys

import click
from tutor import env

from tutoraspects.asset_command_helpers import (
ASSETS_PATH,
SupersetCommandError,
Expand All @@ -17,6 +15,8 @@
find_unused_queries,
)

from tutor import env


@click.command()
@click.option("-c", "--config_file", default="./xapi-db-load-config.yaml")
Expand Down Expand Up @@ -107,7 +107,7 @@ def alembic(command: string) -> list[tuple[str, str]]:
return [
(
"aspects",
f"bash /app/aspects/scripts/alembic.sh {command} && " "echo 'Done!';",
f"bash /app/aspects/scripts/alembic.sh {command} && echo 'Done!';",
),
]

Expand Down Expand Up @@ -162,7 +162,7 @@ def init_clickhouse() -> list[tuple[str, str]]:
"--slice_name",
default="",
help="Only run charts for the given slice name, if the name appears in more than "
"one dashboard it will be run for each.",
"one dashboard it will be run for each.",
)
@click.option(
"--print_sql", is_flag=True, default=False, help="Print the SQL that was run."
Expand All @@ -171,7 +171,7 @@ def init_clickhouse() -> list[tuple[str, str]]:
"--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run."
)
def performance_metrics( # pylint: disable=too-many-arguments,too-many-positional-arguments
org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error
) -> (list)[tuple[str, str]]:
"""
Job to measure performance metrics of charts and its queries in Superset and ClickHouse.
Expand Down Expand Up @@ -257,8 +257,8 @@ def dump_data_to_clickhouse(service, options) -> list[tuple[str, str]]:
"--destination_config",
type=str,
help=(
"A JSON dictionary of configuration for the destination provider. "
"Optional if 'LRS' is used as the destination_provider."
"A JSON dictionary of configuration for the destination provider. "
"Optional if 'LRS' is used as the destination_provider."
),
)
@click.option(
Expand All @@ -283,16 +283,16 @@ def dump_data_to_clickhouse(service, options) -> list[tuple[str, str]]:
"--dry_run",
is_flag=True,
help=(
"A flag to determine if this is a dry run. If present, all lines from all files "
"will be attempted to be transformed, but won't be sent to the destination."
"A flag to determine if this is a dry run. If present, all lines from all files "
"will be attempted to be transformed, but won't be sent to the destination."
),
)
@click.option(
"--deduplicate",
is_flag=True,
help=(
"This should only be added if you believe events will be duplicated such as replaying logs"
"that have already been added. De-duplication can take a very long time to process."
"This should only be added if you believe events will be duplicated such as replaying logs"
"that have already been added. De-duplication can take a very long time to process."
),
)
def transform_tracking_logs(deduplicate, **kwargs) -> list[tuple[str, str]]:
Expand Down Expand Up @@ -395,6 +395,99 @@ def check_superset_assets():
)


@click.command()
@click.option("--year", default="*", help="Year (e.g., '2026', default: '*' for all)")
@click.option(
"--month", default="*", help="Month (e.g., '03' or '3', default: '*' for all)"
)
@click.option(
"--day", default="*", help="Day (e.g., '19' or '9', default: '*' for all)"
)
@click.option(
"--hour",
default="*",
help="Hour in 24h format (e.g., '14' or '3', default: '*' for all)",
)
@click.option(
"--path",
default="",
help="Relative S3 path (e.g., 'xapi/2026/03/19/14/*.log.zst'). Exclusive with date options.",
)
@click.option(
"--deduplicate",
is_flag=True,
help="WARNING: Run deduplication after the backfill to remove duplicate events. This could be a resource consuming operation. Be careful with it",
)
def xapi_block_storage_backfill(year, month, day, hour, path, deduplicate) -> list[tuple[str, str]]:
"""
Import xAPI events from S3 into ClickHouse.

Examples:\n
tutor local do xapi-backfill\n
tutor local do xapi-backfill --year 2026 --month 3\n
tutor local do xapi-backfill --year 2026 --month 03 --day 19\n
tutor local do xapi-backfill --year 2026 --month 03 --day 19 --hour 14\n
tutor local do xapi-backfill --path xapi/2026/03/19/14/*.log.zst\”\n
tutor local do xapi-backfill --deduplicate
"""
if path:
if year != "*" or month != "*" or day != "*" or hour != "*":
raise click.ClickException(
"Cannot use --path with date options (--year, --month, --day, --hour)"
)
xapi_s3_path = path
else:
month = month.zfill(2) if month != "*" else "*"
day = day.zfill(2) if day != "*" else "*"
hour = hour.zfill(2) if hour != "*" else "*"
xapi_s3_path = f"xapi/{year}/{month}/{day}/{hour}/*.log.zst"
Comment thread
bmtcril marked this conversation as resolved.

script = env.read_template_file(
"aspects", "jobs", "init", "clickhouse", "xapi-backfill.sh"
)
script = script.replace("{{XAPI_S3_PATH}}", xapi_s3_path)

tasks = [
(
"clickhouse",
script,
),
]

if deduplicate:
tasks.append(
(
"clickhouse",
env.read_template_file(
"aspects", "jobs", "init", "clickhouse", "deduplicate.sh"
),
)
)

return tasks


@click.command()
def xapi_deduplicate() -> list[tuple[str, str]]:
"""
Deduplicate xAPI tables using OPTIMIZE TABLE.

Run after xapi-backfill to remove duplicate events.
Note: This is an expensive operation on large tables.

Example:
tutor local do xapi-deduplicate
"""
return [
(
"clickhouse",
env.read_template_file(
"aspects", "jobs", "init", "clickhouse", "xapi-deduplicate.sh"
),
),
]


DO_COMMANDS = (
load_xapi_test_data,
dbt,
Expand All @@ -405,6 +498,8 @@ def check_superset_assets():
performance_metrics,
init_clickhouse,
collect_dbt_lineage,
xapi_block_storage_backfill,
xapi_deduplicate,
)

COMMANDS = (aspects,)
20 changes: 10 additions & 10 deletions tutoraspects/patches/local-docker-compose-services
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ superset-worker-beat:

{% if RUN_VECTOR %}
vector:
image: {{ DOCKER_IMAGE_VECTOR }}
command: -c /etc/vector/vector.toml
volumes:
- ../../data/vector:/var/lib/vector
- ../plugins/aspects/apps/vector/local.toml:/etc/vector/vector.toml:ro
{% if ASPECTS_DOCKER_HOST_SOCK_PATH %}- {{ ASPECTS_DOCKER_HOST_SOCK_PATH }}:/var/run/docker.sock:ro{% endif %}
environment:
- DOCKER_HOST=unix:///var/run/docker.sock
- VECTOR_LOG=warn
restart: unless-stopped
image: {{ DOCKER_IMAGE_VECTOR }}
command: -w -c /etc/vector/vector.toml
volumes:
- ../../data/vector:/var/lib/vector
- ../plugins/aspects/apps/vector/local.toml:/etc/vector/vector.toml

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be writable?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it never writes to it, and the watch option is for development only

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, it looks like -w is on local here and the :ro got removed from the volume?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local should use this too as a vector restart didn't changed the vector run options. I had to delete the container with docker rm for changes to take effect

{% if ASPECTS_DOCKER_HOST_SOCK_PATH %}- {{ ASPECTS_DOCKER_HOST_SOCK_PATH }}:/var/run/docker.sock:ro{% endif %}
environment:
- DOCKER_HOST=unix:///var/run/docker.sock
- VECTOR_LOG=info
restart: unless-stopped
{% endif %}

{% if ASPECTS_ENABLE_EVENT_BUS_CONSUMER %}
Expand Down
4 changes: 2 additions & 2 deletions tutoraspects/patches/xapi-db-load-config-yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ lrs_password: "{{RALPH_LMS_PASSWORD}}"

# Run options
log_dir: logs
num_xapi_batches: 10
batch_size: 100000
num_xapi_batches: 3
batch_size: 100
num_workers: 4

# Overall start and end date for the entire run
Expand Down
11 changes: 8 additions & 3 deletions tutoraspects/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@
("ASPECTS_DOCKER_HOST_SOCK_PATH", "/var/run/docker.sock"),
("ASPECTS_VECTOR_STORE_TRACKING_LOGS", False),
("ASPECTS_VECTOR_STORE_XAPI", True),
("ASPECTS_XAPI_S3_BUCKET", ""),
("ASPECTS_XAPI_S3_REGION", "us-east-1"),
("ASPECTS_XAPI_S3_ENDPOINT", ""),
("ASPECTS_XAPI_S3_SINK_MAX_EVENTS", "10000"),
("ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS", "600"),
("ASPECTS_VECTOR_DATABASE", "openedx"),
("ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE", "_tracking"),
("ASPECTS_DATA_TTL_EXPRESSION", "toDateTime(emission_time) + INTERVAL 1 YEAR"),
Expand Down Expand Up @@ -462,6 +467,8 @@
("SUPERSET_ADMIN_PASSWORD", "{{ 24|random_string }}"),
("SUPERSET_LMS_USERNAME", "{{ 12|random_string }}"),
("SUPERSET_LMS_PASSWORD", "{{ 24|random_string }}"),
("ASPECTS_XAPI_S3_ACCESS_KEY", ""),
("ASPECTS_XAPI_S3_SECRET_KEY", ""),
]
)

Expand Down Expand Up @@ -548,9 +555,7 @@ def _mount_superset_compose(
# run it as part of the `init` job.
try:
for service, template_path, priority in MY_INIT_TASKS:
hooks.Filters.COMMANDS_INIT.add_item(
(service, template_path)
) # pylint: disable=no-member
hooks.Filters.COMMANDS_INIT.add_item((service, template_path)) # pylint: disable=no-member
except AttributeError:
for service, template_path, priority in MY_INIT_TASKS:
full_path = os.path.join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,28 @@ database = "{{ ASPECTS_VECTOR_DATABASE }}"
table = "{{ ASPECTS_RAW_XAPI_TABLE }}"
healthcheck = false

{% if ASPECTS_XAPI_S3_BUCKET %}
[sinks.s3_xapi]
type = "aws_s3"
inputs = ["xapi"]
region = "{{ ASPECTS_XAPI_S3_REGION }}"
bucket = "{{ ASPECTS_XAPI_S3_BUCKET }}"
{% if ASPECTS_XAPI_S3_ACCESS_KEY and ASPECTS_XAPI_S3_SECRET_KEY %}
auth.access_key_id = "{{ ASPECTS_XAPI_S3_ACCESS_KEY }}"
auth.secret_access_key = "{{ ASPECTS_XAPI_S3_SECRET_KEY }}"
{% endif %}
{% if ASPECTS_XAPI_S3_ENDPOINT %}
endpoint = "{{ ASPECTS_XAPI_S3_ENDPOINT }}"
{% endif %}
key_prefix = "xapi/%Y/%m/%d/%H/"
encoding.codec = "json"
compression = "zstd"
batch.max_events = {{ ASPECTS_XAPI_S3_SINK_MAX_EVENTS }}
batch.timeout_secs = {{ ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS }}
framing.method = "newline_delimited"

{% endif %}

{% endif %}

{{ patch("vector-common-toml") }}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# alembic packages
alembic==1.14.1
clickhouse-sqlalchemy==0.3.2
git+https://github.com/Ian2012/xapi-db-load@tmp-v2
git+https://github.com/openedx/xapi-db-load@3.1.0
pyyaml
Loading