diff --git a/README.rst b/README.rst index 94a08b5c..db5a5994 100644 --- a/README.rst +++ b/README.rst @@ -81,10 +81,91 @@ To visualize data: - Interact with the course to generate data: - Complete a few activities within the course (e.g., enroll, take quizzes, watch videos) to generate real data. This will provide a more realistic dataset for analytics. + Complete a few activities within the course (e.g., enroll, take quizzes, watch videos) to generate real data. This will provide a more realistic dataset for analytics. +xAPI S3 Sink Configuration +-------------------------- + +The S3 sink serves as a backup for xAPI events. Events are also written to S3, so they are not lost if ClickHouse is unavailable or encounters errors during event ingestion. This ensures data durability and allows you to recover missed events later using the ``xapi-backfill`` command. + +To enable this backup mechanism, configure the following settings: + +.. code-block:: yaml + + ASPECTS_XAPI_S3_ACCESS_KEY=openedx + ASPECTS_XAPI_S3_BUCKET=xapi-events + ASPECTS_XAPI_S3_ENDPOINT=http://minio:9000 + ASPECTS_XAPI_S3_REGION=us-east-1 + ASPECTS_XAPI_S3_SECRET_KEY=... + +Vector S3 sink options: + +.. code-block:: bash + + ASPECTS_XAPI_S3_SINK_MAX_EVENTS=10000 + ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS=600 + +.. note:: + + - ``ASPECTS_XAPI_S3_SINK_MAX_EVENTS`` controls how many events are batched before writing to S3 + - ``ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS`` controls how long to wait before flushing a batch + - Setting ``ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS`` too low can create many small files in S3 + + +xAPI S3 Backfill +---------------- + +If you have xAPI events stored in S3 (configured via ``ASPECTS_XAPI_S3_BUCKET``), you can backfill them into ClickHouse using the ``xapi-backfill`` command. This is useful for: + +- Restoring data from a backup +- Importing data from another environment +- Re-processing historical events + +Basic usage: + +.. 
code-block:: bash + + tutor local do xapi-backfill + +By default, this imports all events. You can filter by date using year, month, day, and hour options: + +.. code-block:: bash + + tutor local do xapi-backfill --year 2026 --month 3 + tutor local do xapi-backfill --year 2026 --month 03 --day 19 + tutor local do xapi-backfill --year 2026 --month 03 --day 19 --hour 14 + +For flexible path matching, use the ``--path`` option to specify a custom S3 path: + +.. code-block:: bash + + tutor local do xapi-backfill --path xapi/2026/03/19/14/*.log.zst + +.. note:: + + - Date options accept both single and double-digit values (``03`` and ``3`` are equivalent) + - Hour should be in 24-hour format + - The ``--path`` option is exclusive with date options + +After backfilling, you can run deduplication to remove duplicate events: + +.. code-block:: bash + + tutor local do xapi-backfill --deduplicate + +Or run deduplication separately: + +.. code-block:: bash + + tutor local do xapi-deduplicate + +.. warning:: + + Deduplication uses ``OPTIMIZE TABLE FINAL`` which can be resource-intensive on large tables. Run during low-traffic periods if you have a large dataset. + + - Sync data from an existing Tutor installation with default settings: .. 
code-block:: bash diff --git a/tutoraspects/commands_v1.py b/tutoraspects/commands_v1.py index 6973e9f5..73e8a679 100644 --- a/tutoraspects/commands_v1.py +++ b/tutoraspects/commands_v1.py @@ -6,8 +6,6 @@ import sys import click -from tutor import env - from tutoraspects.asset_command_helpers import ( ASSETS_PATH, SupersetCommandError, @@ -17,6 +15,8 @@ find_unused_queries, ) +from tutor import env + @click.command() @click.option("-c", "--config_file", default="./xapi-db-load-config.yaml") @@ -107,7 +107,7 @@ def alembic(command: string) -> list[tuple[str, str]]: return [ ( "aspects", - f"bash /app/aspects/scripts/alembic.sh {command} && " "echo 'Done!';", + f"bash /app/aspects/scripts/alembic.sh {command} && echo 'Done!';", ), ] @@ -162,7 +162,7 @@ def init_clickhouse() -> list[tuple[str, str]]: "--slice_name", default="", help="Only run charts for the given slice name, if the name appears in more than " - "one dashboard it will be run for each.", + "one dashboard it will be run for each.", ) @click.option( "--print_sql", is_flag=True, default=False, help="Print the SQL that was run." @@ -171,7 +171,7 @@ def init_clickhouse() -> list[tuple[str, str]]: "--fail_on_error", is_flag=True, default=False, help="Allow errors to fail the run." ) def performance_metrics( # pylint: disable=too-many-arguments,too-many-positional-arguments - org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error + org, course_name, dashboard_slug, slice_name, print_sql, fail_on_error ) -> (list)[tuple[str, str]]: """ Job to measure performance metrics of charts and its queries in Superset and ClickHouse. @@ -257,8 +257,8 @@ def dump_data_to_clickhouse(service, options) -> list[tuple[str, str]]: "--destination_config", type=str, help=( - "A JSON dictionary of configuration for the destination provider. " - "Optional if 'LRS' is used as the destination_provider." + "A JSON dictionary of configuration for the destination provider. 
" + "Optional if 'LRS' is used as the destination_provider." ), ) @click.option( @@ -283,16 +283,16 @@ def dump_data_to_clickhouse(service, options) -> list[tuple[str, str]]: "--dry_run", is_flag=True, help=( - "A flag to determine if this is a dry run. If present, all lines from all files " - "will be attempted to be transformed, but won't be sent to the destination." + "A flag to determine if this is a dry run. If present, all lines from all files " + "will be attempted to be transformed, but won't be sent to the destination." ), ) @click.option( "--deduplicate", is_flag=True, help=( - "This should only be added if you believe events will be duplicated such as replaying logs" - "that have already been added. De-duplication can take a very long time to process." + "This should only be added if you believe events will be duplicated such as replaying logs" + "that have already been added. De-duplication can take a very long time to process." ), ) def transform_tracking_logs(deduplicate, **kwargs) -> list[tuple[str, str]]: @@ -395,6 +395,99 @@ def check_superset_assets(): ) +@click.command() +@click.option("--year", default="*", help="Year (e.g., '2026', default: '*' for all)") +@click.option( + "--month", default="*", help="Month (e.g., '03' or '3', default: '*' for all)" +) +@click.option( + "--day", default="*", help="Day (e.g., '19' or '9', default: '*' for all)" +) +@click.option( + "--hour", + default="*", + help="Hour in 24h format (e.g., '14' or '3', default: '*' for all)", +) +@click.option( + "--path", + default="", + help="Relative S3 path (e.g., 'xapi/2026/03/19/14/*.log.zst'). Exclusive with date options.", +) +@click.option( + "--deduplicate", + is_flag=True, + help="WARNING: Run deduplication after the backfill to remove duplicate events. This could be a resource consuming operation. 
Be careful with it", +) +def xapi_block_storage_backfill(year, month, day, hour, path, deduplicate) -> list[tuple[str, str]]: + """ + Import xAPI events from S3 into ClickHouse. + + Examples:\n + tutor local do xapi-backfill\n + tutor local do xapi-backfill --year 2026 --month 3\n + tutor local do xapi-backfill --year 2026 --month 03 --day 19\n + tutor local do xapi-backfill --year 2026 --month 03 --day 19 --hour 14\n + tutor local do xapi-backfill --path xapi/2026/03/19/14/*.log.zst\n + tutor local do xapi-backfill --deduplicate + """ + if path: + if year != "*" or month != "*" or day != "*" or hour != "*": + raise click.ClickException( + "Cannot use --path with date options (--year, --month, --day, --hour)" + ) + xapi_s3_path = path + else: + month = month.zfill(2) if month != "*" else "*" + day = day.zfill(2) if day != "*" else "*" + hour = hour.zfill(2) if hour != "*" else "*" + xapi_s3_path = f"xapi/{year}/{month}/{day}/{hour}/*.log.zst" + + script = env.read_template_file( + "aspects", "jobs", "init", "clickhouse", "xapi-backfill.sh" + ) + script = script.replace("{{XAPI_S3_PATH}}", xapi_s3_path) + + tasks = [ + ( + "clickhouse", + script, + ), + ] + + if deduplicate: + tasks.append( + ( + "clickhouse", + env.read_template_file( + "aspects", "jobs", "init", "clickhouse", "deduplicate.sh" + ), + ) + ) + + return tasks + + +@click.command() +def xapi_deduplicate() -> list[tuple[str, str]]: + """ + Deduplicate xAPI tables using OPTIMIZE TABLE. + + Run after xapi-backfill to remove duplicate events. + Note: This is an expensive operation on large tables. 
+ + Example: + tutor local do xapi-deduplicate + """ + return [ + ( + "clickhouse", + env.read_template_file( + "aspects", "jobs", "init", "clickhouse", "xapi-deduplicate.sh" + ), + ), + ] + + DO_COMMANDS = ( load_xapi_test_data, dbt, @@ -405,6 +498,8 @@ def check_superset_assets(): performance_metrics, init_clickhouse, collect_dbt_lineage, + xapi_block_storage_backfill, + xapi_deduplicate, ) COMMANDS = (aspects,) diff --git a/tutoraspects/patches/local-docker-compose-services b/tutoraspects/patches/local-docker-compose-services index 371c0abe..0896e117 100644 --- a/tutoraspects/patches/local-docker-compose-services +++ b/tutoraspects/patches/local-docker-compose-services @@ -87,16 +87,16 @@ superset-worker-beat: {% if RUN_VECTOR %} vector: - image: {{ DOCKER_IMAGE_VECTOR }} - command: -c /etc/vector/vector.toml - volumes: - - ../../data/vector:/var/lib/vector - - ../plugins/aspects/apps/vector/local.toml:/etc/vector/vector.toml:ro - {% if ASPECTS_DOCKER_HOST_SOCK_PATH %}- {{ ASPECTS_DOCKER_HOST_SOCK_PATH }}:/var/run/docker.sock:ro{% endif %} - environment: - - DOCKER_HOST=unix:///var/run/docker.sock - - VECTOR_LOG=warn - restart: unless-stopped + image: {{ DOCKER_IMAGE_VECTOR }} + command: -w -c /etc/vector/vector.toml + volumes: + - ../../data/vector:/var/lib/vector + - ../plugins/aspects/apps/vector/local.toml:/etc/vector/vector.toml + {% if ASPECTS_DOCKER_HOST_SOCK_PATH %}- {{ ASPECTS_DOCKER_HOST_SOCK_PATH }}:/var/run/docker.sock:ro{% endif %} + environment: + - DOCKER_HOST=unix:///var/run/docker.sock + - VECTOR_LOG=info + restart: unless-stopped {% endif %} {% if ASPECTS_ENABLE_EVENT_BUS_CONSUMER %} diff --git a/tutoraspects/patches/xapi-db-load-config-yaml b/tutoraspects/patches/xapi-db-load-config-yaml index bccd12e7..fb1b5ab8 100644 --- a/tutoraspects/patches/xapi-db-load-config-yaml +++ b/tutoraspects/patches/xapi-db-load-config-yaml @@ -12,8 +12,8 @@ lrs_password: "{{RALPH_LMS_PASSWORD}}" # Run options log_dir: logs -num_xapi_batches: 10 -batch_size: 
100000 +num_xapi_batches: 3 +batch_size: 100 num_workers: 4 # Overall start and end date for the entire run diff --git a/tutoraspects/plugin.py b/tutoraspects/plugin.py index 44835681..ff6a5437 100644 --- a/tutoraspects/plugin.py +++ b/tutoraspects/plugin.py @@ -179,6 +179,11 @@ ("ASPECTS_DOCKER_HOST_SOCK_PATH", "/var/run/docker.sock"), ("ASPECTS_VECTOR_STORE_TRACKING_LOGS", False), ("ASPECTS_VECTOR_STORE_XAPI", True), + ("ASPECTS_XAPI_S3_BUCKET", ""), + ("ASPECTS_XAPI_S3_REGION", "us-east-1"), + ("ASPECTS_XAPI_S3_ENDPOINT", ""), + ("ASPECTS_XAPI_S3_SINK_MAX_EVENTS", "10000"), + ("ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS", "600"), ("ASPECTS_VECTOR_DATABASE", "openedx"), ("ASPECTS_VECTOR_RAW_TRACKING_LOGS_TABLE", "_tracking"), ("ASPECTS_DATA_TTL_EXPRESSION", "toDateTime(emission_time) + INTERVAL 1 YEAR"), @@ -462,6 +467,8 @@ ("SUPERSET_ADMIN_PASSWORD", "{{ 24|random_string }}"), ("SUPERSET_LMS_USERNAME", "{{ 12|random_string }}"), ("SUPERSET_LMS_PASSWORD", "{{ 24|random_string }}"), + ("ASPECTS_XAPI_S3_ACCESS_KEY", ""), + ("ASPECTS_XAPI_S3_SECRET_KEY", ""), ] ) @@ -548,9 +555,7 @@ def _mount_superset_compose( # run it as part of the `init` job. 
try: for service, template_path, priority in MY_INIT_TASKS: - hooks.Filters.COMMANDS_INIT.add_item( - (service, template_path) - ) # pylint: disable=no-member + hooks.Filters.COMMANDS_INIT.add_item((service, template_path)) # pylint: disable=no-member except AttributeError: for service, template_path, priority in MY_INIT_TASKS: full_path = os.path.join( diff --git a/tutoraspects/templates/aspects/apps/vector/partials/common-post.toml b/tutoraspects/templates/aspects/apps/vector/partials/common-post.toml index dc2d9121..35366ed4 100644 --- a/tutoraspects/templates/aspects/apps/vector/partials/common-post.toml +++ b/tutoraspects/templates/aspects/apps/vector/partials/common-post.toml @@ -128,6 +128,28 @@ database = "{{ ASPECTS_VECTOR_DATABASE }}" table = "{{ ASPECTS_RAW_XAPI_TABLE }}" healthcheck = false +{% if ASPECTS_XAPI_S3_BUCKET %} +[sinks.s3_xapi] +type = "aws_s3" +inputs = ["xapi"] +region = "{{ ASPECTS_XAPI_S3_REGION }}" +bucket = "{{ ASPECTS_XAPI_S3_BUCKET }}" +{% if ASPECTS_XAPI_S3_ACCESS_KEY and ASPECTS_XAPI_S3_SECRET_KEY %} +auth.access_key_id = "{{ ASPECTS_XAPI_S3_ACCESS_KEY }}" +auth.secret_access_key = "{{ ASPECTS_XAPI_S3_SECRET_KEY }}" +{% endif %} +{% if ASPECTS_XAPI_S3_ENDPOINT %} +endpoint = "{{ ASPECTS_XAPI_S3_ENDPOINT }}" +{% endif %} +key_prefix = "xapi/%Y/%m/%d/%H/" +encoding.codec = "json" +compression = "zstd" +batch.max_events = {{ ASPECTS_XAPI_S3_SINK_MAX_EVENTS }} +batch.timeout_secs = {{ ASPECTS_XAPI_S3_SINK_TIMEOUT_SECS }} +framing.method = "newline_delimited" + +{% endif %} + {% endif %} {{ patch("vector-common-toml") }} diff --git a/tutoraspects/templates/aspects/build/aspects/requirements.txt b/tutoraspects/templates/aspects/build/aspects/requirements.txt index 9d2e003a..b018e615 100644 --- a/tutoraspects/templates/aspects/build/aspects/requirements.txt +++ b/tutoraspects/templates/aspects/build/aspects/requirements.txt @@ -1,5 +1,5 @@ # alembic packages alembic==1.14.1 clickhouse-sqlalchemy==0.3.2 
-git+https://github.com/Ian2012/xapi-db-load@tmp-v2 +git+https://github.com/openedx/xapi-db-load@3.1.0 pyyaml diff --git a/tutoraspects/templates/aspects/jobs/init/clickhouse/xapi-backfill.sh b/tutoraspects/templates/aspects/jobs/init/clickhouse/xapi-backfill.sh new file mode 100644 index 00000000..27cdeeba --- /dev/null +++ b/tutoraspects/templates/aspects/jobs/init/clickhouse/xapi-backfill.sh @@ -0,0 +1,35 @@ +#!/bin/bash +set -e + +echo "Initialising xAPI backfill..." +ch_connection_max_attempts=10 +ch_connection_attempt=0 +until clickhouse client --user {{ CLICKHOUSE_ADMIN_USER }} --password="{{ CLICKHOUSE_ADMIN_PASSWORD }}" --host "{{ CLICKHOUSE_HOST }}" {% if CLICKHOUSE_SECURE_CONNECTION %} --secure {% else %} --port {{ CLICKHOUSE_INTERNAL_NATIVE_PORT }}{% endif %} -q 'exit' +do + ch_connection_attempt=$(expr $ch_connection_attempt + 1) + echo " [$ch_connection_attempt/$ch_connection_max_attempts] Waiting for Clickhouse service (this may take a while)..." + if [ $ch_connection_attempt -eq $ch_connection_max_attempts ] + then + echo "Clickhouse connection error" 1>&2 + exit 1 + fi + sleep 10 +done +echo "Clickhouse is up and running" + +XAPI_S3_PATH="{{ ASPECTS_XAPI_S3_ENDPOINT }}/{{ ASPECTS_XAPI_S3_BUCKET }}/{{XAPI_S3_PATH}}" + +echo "Backfilling xAPI events from: ${XAPI_S3_PATH}" + +clickhouse client --user {{ CLICKHOUSE_ADMIN_USER }} --password="{{ CLICKHOUSE_ADMIN_PASSWORD }}" --host "{{ CLICKHOUSE_HOST }}" {% if CLICKHOUSE_SECURE_CONNECTION %} --secure {% else %} --port {{ CLICKHOUSE_INTERNAL_NATIVE_PORT }}{% endif %} --multiquery <