diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py index b2b01c43..74fe5f73 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py @@ -10,6 +10,7 @@ from pydantic import Field from starlette.concurrency import run_in_threadpool +from ..common.config import Settings from ..common.dependencies import ClickhouseDep from ..common.metrics import timer from ..common.routers import BaseModel @@ -17,9 +18,12 @@ from ..dependencies import ASNCCReaderDep, SettingsDep from ..metrics import Metrics from ..utils import ( + MeasurementMetadata, + check_measurement_meta, error, generate_report_id, get_cc_asn, + metadata_from_measurement_content, normalize_asn, register_geoip_anomaly, ) @@ -107,7 +111,7 @@ async def receive_measurement( content_encoding: str = Header(default=None), ) -> ReceiveMeasurementResponse | Dict[str, Any]: """ - Submit measurement + Submit measurement. """ setnocacheresponse(response) empty_measurement = {} @@ -118,14 +122,12 @@ async def receive_measurement( f"Unexpected report_id {report_id[:200]}. Error: {e}", ) Metrics.BAD_MEASUREMENTS_CNT.labels(reason="bad_report_id").inc() - raise error("Incorrect format") + error("Incorrect format") - # TODO validate the timestamp? good = len(cc) == 2 and test_name.isalnum() and 1 < len(test_name) < 30 if not good: log.info("Unexpected report_id %r", report_id[:200]) error("Incorrect format") - try: asn_i = int(asn) except ValueError as e: @@ -157,13 +159,20 @@ async def receive_measurement( error("Incorrect format") try: - data = await run_in_threadpool(_set_unverified_flag, data) + data, metadata = await run_in_threadpool( + _process_measurement_body, data + ) except Exception as e: log.info("Failed to parse and modify measurement body") log.exception(e) Metrics.BAD_MEASUREMENTS_CNT.labels(reason="bad_json").inc() error("Incorrect format") + # Raise an exception in case the report_id is not consistent with the body, + # we flag this behavior as faulty data + _compare_report_id_to_body_meta(cc, asn, test_name, metadata) + check_measurement_meta(metadata.test_name, metadata.probe_cc, metadata.probe_asn) + # Write the whole body of the measurement in a directory based on a 1-hour # time window now = datetime.now(timezone.utc) @@ -203,7 +212,7 @@ async def receive_measurement( if success: # Geoip anomaly detection runs only when the measurement was successfully - # submitted to the fastpath, so retries don't cause duplicate anomaly entries. + # submitted to the fastpath with Metrics.COMPARE_CC_TIMING.time(): try: await run_in_threadpool( @@ -214,7 +223,7 @@ async def receive_measurement( cc, asn, msmt_uid, - data, + metadata, ) except Exception as e: log.error(f"Error checking for geoip anomalies: {e}") @@ -242,14 +251,52 @@ async def receive_measurement( Metrics.SEND_S3_CNT.labels(status="fail").inc() return empty_measurement +def _process_measurement_body( + data: bytes +) -> Tuple[bytes, MeasurementMetadata]: + """ + - Parse the measurement body + - extract some metadata fields + - set `is_verified="u"` + - re-serialize. + """ -def _set_unverified_flag(data: bytes) -> bytes: with Metrics.DESERIALIZE_BODY_TIMING.time(): - measurement = ujson.loads(data) - measurement["is_verified"] = "u" + json = ujson.loads(data) + + assert isinstance(json, dict) + + content = json.get("content") + assert isinstance(content, dict) + + metadata = metadata_from_measurement_content(content) + + json["is_verified"] = "u" + with Metrics.SERIALIZE_BODY_TIMING.time(): - return ujson.dumps(measurement).encode("utf-8") + return ujson.dumps(json).encode("utf-8"), metadata +def _compare_report_id_to_body_meta(cc: str, asn: str, test_name: str, metadata: MeasurementMetadata): + """ + Compare the metadata reported by a report_id against the metadata in a + measurement body + + Raise HTTPException on errors + """ + if cc.upper() != metadata.probe_cc.upper(): + log.info(f"CC mismatch: {cc} vs {metadata}") + Metrics.BAD_MEASUREMENTS_CNT.labels(reason="cc_mismatch").inc() + error("Inconsistent measurement") + + if asn.upper().lstrip("AS") != metadata.probe_asn.upper().lstrip("AS"): + log.info(f"ASN mismatch: {asn} vs {metadata.probe_asn}") + Metrics.BAD_MEASUREMENTS_CNT.labels(reason="asn_mismatch").inc() + error("Inconsistent measurement") + + if test_name != metadata.test_name.replace("_",""): + log.info(f"Test name mismatch: {test_name} vs {metadata.test_name}") + Metrics.BAD_MEASUREMENTS_CNT.labels(reason="test_name_mismatch").inc() + error("Inconsistent measurement") def _check_and_register_geoip_anomaly( request: Request, @@ -258,12 +305,10 @@ def _check_and_register_geoip_anomaly( cc: str, asn: str, msmt_uid: str, - data: bytes, + metadata: MeasurementMetadata, ) -> None: actual_cc, actual_asn = get_cc_asn(request, asn_cc_reader) if actual_cc != cc or normalize_asn(actual_asn) != normalize_asn(asn): - # expensive: parses measurement body and sends anomaly to clickhouse - platform, software_name, software_version = _parse_metadata(data) register_geoip_anomaly( cc, actual_cc, @@ -271,33 +316,14 @@ def _check_and_register_geoip_anomaly( actual_asn, clickhouse, msmt_uid, - platform, - software_name, - software_version, + metadata.platform, + metadata.software_name, + metadata.software_version, ) else: Metrics.PROBE_CC_ASN_MATCH.inc() -def _parse_metadata(data: bytes) -> Tuple[str, str, str]: - """ - Parse measurement body, and return the following metadata: - - platform, software_name, software_version - """ - try: - body = ujson.loads(data.decode("utf-8")) - except Exception as e: - log.error(f"Couldn't parse json body: {e}") - return ("", "", "") - content = body.get("content", {}) - annotations = content.get("annotations", {}) - platform = annotations.get("platform", "") - software_name = content.get("software_name", "") - software_version = content.get("software_version", "") - return (platform, software_name, software_version) - - @timer(name="close_report") @router.post("/report/{report_id}/close", tags=["reports"]) def close_report(report_id): diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 472576af..7c789284 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -1,4 +1,4 @@ -import asyncio +from sqlalchemy import desc import io import logging import random @@ -48,10 +48,12 @@ PsiphonConfigDep ) from ...utils import ( + check_measurement_meta, extract_probe_ipaddr, generate_report_id, geolookup_probe, get_cc_asn, + metadata_from_measurement_content, normalize_asn, register_geoip_anomaly, ) @@ -871,10 +873,15 @@ class SubmitMeasurementResponse(BaseModel): default=None, ) + report_id: str = Field( + description="Report ID generated by the server. Note that this is " + "unique to every measurement, so it's not useful for grouping " + "measurements that were taken together" + ) + -@router.post("/submit_measurement/{report_id}", tags=["anonymous_credentials"]) +@router.post("/submit_measurement", tags=["anonymous_credentials"]) async def submit_measurement( - report_id: str, request: Request, submit_request: SubmitMeasurementRequest, response: Response, @@ -900,61 +907,29 @@ async def submit_measurement( - status code 4xx or 5xx: the measurement was not processed nor stored """ setnocacheresponse(response) - try: - rid_timestamp, test_name, cc, asn, format_cid, rand = report_id.split("_") - except Exception: - err_msg = f"Incorrect format: unexpected report_id {report_id[:200]}" - log.info(err_msg) - raise HTTPException( - status_code=400, - detail={"error": "incorrect_format", "message": err_msg}, - ) - # TODO validate the timestamp? - good = len(cc) == 2 and test_name.isalnum() and 1 < len(test_name) < 30 - if not good: - err_msg = f"Incorrect format: unexpected report_id {report_id[:200]}" - log.info(err_msg) - raise HTTPException( - status_code=400, - detail={"error": "incorrect_format", "message": err_msg}, - ) + metadata = metadata_from_measurement_content(submit_request.content) - try: - asn_i = int(asn) - except ValueError: - err_msg = f"Incorrect format: ASN value not parsable {asn}" - log.info(err_msg) - raise HTTPException( - status_code=400, - detail={"error": "incorrect_format", "message": err_msg}, - ) + test_name = metadata.test_name + cc = metadata.probe_cc + asn = metadata.probe_asn - if asn_i == 0: - log.info("Discarding ASN == 0") - Metrics.MSMNT_DISCARD_ASN0.inc() - raise HTTPException(400, detail = {"error" : "asn_0", "message" : "Measurement discarded, ASN == 0"}) + check_measurement_meta(test_name, cc, asn) - if cc.upper() == "ZZ": - log.info("Discarding CC == ZZ") - Metrics.MSMNT_DISCARD_CC_ZZ.inc() - raise HTTPException(400, detail = {"error" : "cc_zz", "message" : "Measurement discarded, CC == ZZ"}) + # generate the new report_id + rid = generate_report_id(test_name, settings, cc, normalize_asn(asn)) # Anonymous credentials verification verification_status, submit_error, submit_response = _verify_submit( submit_request, manifest, settings ) - annotations = submit_request.content.get("annotations", {}) - platform = annotations.get("platform") or "" - software_name = submit_request.content.get("software_name") or "" - software_version = submit_request.content.get("software_version") or "" - data = submit_request.model_dump() # Add verification-related data. # use one-letter code for DB, human readable for clients data["is_verified"] = verification_status.code + data["content"]["report_id"] = rid data_buff = io.BytesIO() stream = io.TextIOWrapper(data_buff, "utf-8") ujson.dump(data, stream) @@ -969,6 +944,7 @@ async def submit_measurement( ts = now.strftime("%Y%m%d%H%M%S.%f") # msmt_uid is a unique id based on upload time, cc, testname and hash + test_name = test_name.replace("_","") msmt_uid = f"{ts}_{cc}_{test_name}_{h}" Metrics.MSMNT_RECEIVED_CNT.inc() @@ -1013,9 +989,9 @@ async def submit_measurement( cc, asn, msmt_uid, - platform, - software_name, - software_version, + metadata.platform, + metadata.software_name, + metadata.software_version, ) except Exception as e: log.error(f"Error checking for geoip anomalies: {e}") @@ -1026,30 +1002,34 @@ async def submit_measurement( verification_status=verification_status, submit_response=submit_response, error=submit_error, + report_id = rid ) Metrics.SEND_FASTPATH_CNT.labels(status="fail").inc() # wasn't possible to send msmnt to fastpath, try to send it to s3 - data_buff.seek(0) + ts_prefix = now.strftime("%Y%m%d%H") + tn = test_name.replace("_", "") + s3_key = f"postcans/{ts_prefix}/{ts_prefix}_{cc}_{tn}/{msmt_uid}.post" try: await run_in_threadpool( request.app.state.s3_client.upload_fileobj, - data_buff, + io.BytesIO(data_bin), Bucket=settings.failed_reports_bucket, - Key=report_id, + Key=s3_key, ) except Exception as exc: log.error(f"Unable to upload measurement to s3. Error: {exc}") Metrics.SEND_S3_FAILURE.inc() - log.error(f"Unable to send report to fastpath. report_id: {report_id}") + log.error(f"Unable to send report to fastpath. measurement_uid: {msmt_uid}") Metrics.MISSED_MSMNTS.inc() return SubmitMeasurementResponse( measurement_uid=msmt_uid, verification_status=verification_status, submit_response=submit_response, error=submit_error or "submission_delivery_failed", + report_id = rid ) def _verify_submit( diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py index 29648478..8a32ae0c 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py @@ -7,14 +7,15 @@ import io import itertools import logging -import ujson from base64 import b64encode +from dataclasses import dataclass from datetime import datetime, timezone from os import urandom from typing import Any, Dict, List, Tuple, TypedDict -import requests import pem +import requests +import ujson from fastapi import HTTPException, Request from mypy_boto3_s3 import S3Client from sqlalchemy.orm import Session @@ -132,6 +133,126 @@ def generate_report_id(test_name, settings: Settings, cc: str, asn_i: int) -> st return rid +@dataclass +class MeasurementMetadata: + """Metadata extracted from a measurement body's `content` object.""" + + test_name: str + probe_cc: str + probe_asn: str + platform: str + software_name: str + software_version: str + + +def metadata_from_measurement_content(content: dict[str, Any]) -> MeasurementMetadata: + """ + Parses metadata from the `content` key in a measurement body, then + formats fields the same way as `generate_report_id` / `open_report` do. + """ + + annotations = content.get("annotations", {}) + if not isinstance(annotations, dict): + annotations = {} + + return MeasurementMetadata( + test_name=content.get("test_name", ""), + probe_cc=content.get("probe_cc", ""), + probe_asn=content.get("probe_asn", ""), + platform=annotations.get("platform", ""), + software_name=content.get("software_name", ""), + software_version=content.get("software_version", ""), + ) + +def check_measurement_meta( + test_name: str, + probe_cc: str, + probe_asn: str, +): + """ + Checks metadata consistency, raising an HTTPException and stopping + ingestion when an inconsistency is detected. + + This metadata is expected to come from the measurement body + """ + + cc_ok = ( + len(probe_cc) == 2 and + probe_cc.isupper() and + probe_cc.isalnum() + ) + test_name_len_ok = 1 < len(test_name) < 30 + test_name_lower_ok = test_name.islower() + asn_starts_as_ok = probe_asn.startswith("AS") + asn_len_ok = len(probe_asn) >= 3 and len(probe_asn) <= 12 + + if not ( + cc_ok and test_name_len_ok and + test_name_lower_ok and asn_starts_as_ok and asn_len_ok + ): + Metrics.BAD_MEASUREMENTS_CNT.labels(reason="bad_metadata").inc() + log.error( + "Bad metadata in measurement body: test_name=" + f"{test_name[:30]}, cc={probe_cc}" + ) + reasons = [] + if not cc_ok: + reasons.append("bad_cc") + if not test_name_len_ok: + reasons.append("tn_len") + if not test_name_lower_ok: + reasons.append("tn_no_lower") + if not asn_len_ok: + reasons.append("asn_len") + if not asn_starts_as_ok: + reasons.append("asn_no_as_prefix") + + raise HTTPException( + status_code=400, + detail={ + "error": "bad_metadata", + "message": f"Errors: {reasons}" + }, + ) + + error = None + asn = str(probe_asn).strip().upper().lstrip("AS") + try: + asn_i = int(asn) + if asn_i == 0: + Metrics.BAD_MEASUREMENTS_CNT.labels(reason="asn_0").inc() + Metrics.MSMNT_DISCARD_ASN0.inc() + log.info("Discarding ASN == 0") + error = "asn_0" + message = "Measurement discarded, ASN == 0" + except Exception as e: + Metrics.BAD_MEASUREMENTS_CNT.labels(reason="asn_invalid").inc() + log.info(f"Discarding ASN == {probe_asn}, error: {e}") + error = "asn_invalid" + message = f"Measurement discarded, ASN == {probe_asn}" + + if error: + raise HTTPException( + 400, + detail={ + "error": error, + "message" : message + } + ) + + # Check probe_cc for ZZ cases + if probe_cc == "ZZ": + log.info("Discarding CC == ZZ") + Metrics.BAD_MEASUREMENTS_CNT.labels(reason="cc_zz").inc() + Metrics.MSMNT_DISCARD_CC_ZZ.inc() + raise HTTPException( + 400, + detail={ + "error": "cc_zz", + "message": "Measurement discarded, CC == ZZ", + }, + ) + def extract_probe_ipaddr(request: Request) -> str: real_ip_headers = ["X-Forwarded-For", "X-Real-IP"] @@ -162,20 +283,21 @@ def error(msg: str | Dict[str, Any], status_code: int = 400): def normalize_asn(asn: str) -> int: - """Return ASN as int (strip 'AS' prefix if present). Invalid values return 0.""" - s = str(asn).strip() - s = s[2:] if s.startswith("AS") else s + """ + Return ASN as int (strip 'AS' prefix if present). Invalid values return 0. + """ + s = str(asn).strip().upper().lstrip("AS") try: return int(s) except (ValueError, TypeError) as e: log.error(f"Invalid asn: {e}") return 0 - def get_cc_asn( request: Request, asn_cc_reader: ASNCCReaderDep ) -> Tuple[str, str]: - """Geo-lookup the request's source IP and return (cc, asn). + """ + Geo-lookup the request's source IP and return (cc, asn). Falls back to ("ZZ", "AS0") when the lookup fails. """ @@ -198,7 +320,9 @@ def register_geoip_anomaly( software_name: str, software_version: str, ) -> None: - """Record a geoip mismatch in faulty_measurements.""" + """ + Record a geoip mismatch in faulty_measurements. + """ sub_asn = normalize_asn(asn) actual_asn_int = normalize_asn(actual_asn) if actual_cc != cc: diff --git a/ooniapi/services/ooniprobe/tests/conftest.py b/ooniapi/services/ooniprobe/tests/conftest.py index e5a7aeb4..7cfbdc31 100644 --- a/ooniapi/services/ooniprobe/tests/conftest.py +++ b/ooniapi/services/ooniprobe/tests/conftest.py @@ -1,6 +1,7 @@ import json import pathlib import time +from contextlib import asynccontextmanager from datetime import datetime from pathlib import Path from typing import Any, Dict @@ -334,26 +335,18 @@ def close(self): pass -@pytest_asyncio.fixture -async def client_with_mocked_fastpath( - clickhouse_server, test_settings, geoip_db_dir, test_creds +@asynccontextmanager +async def _client_with_mocked_fastpath_urls( + test_settings, geoip_db_dir, test_creds, fastpath_urls ): """ - Client variant to test fastpath behaviour, see the mocked fastpath client - above. - - Yields `(client, mock_fastpath, success_url)`. + Shared setup for the client_with_*_mocked_fastpath* fixtures: applies + dependency overrides, installs a `MockFastpathClient` on app state, and + yields `(client, mock_fastpath)` for the test to use. """ - fail_url = "http://fastpath.ooni/bad" - success_url = "http://fastpath.ooni/good" - _, public_key = test_creds - settings = test_settings().model_copy( - update={ - "fastpath_urls": [fail_url, success_url], - } - ) + settings = test_settings().model_copy(update={"fastpath_urls": fastpath_urls}) app.dependency_overrides[get_settings] = lambda: settings app.dependency_overrides[get_s3_client] = get_s3_client_mock app.dependency_overrides[get_tor_targets_from_s3] = get_tor_targets_from_s3_mock @@ -365,7 +358,41 @@ async def client_with_mocked_fastpath( async with lifespan(app, settings, repeating_tasks_active=False): with TestClient(app) as client: app.state.fastpath_client = mock_fastpath - yield client, mock_fastpath, success_url + yield client, mock_fastpath + + +@pytest_asyncio.fixture +async def client_with_mocked_fastpath( + clickhouse_server, test_settings, geoip_db_dir, test_creds +): + """ + Client with one healthy mocked fastpath URL. + + Yields `(client, mock_fastpath, url)`. + """ + url = "http://fastpath.ooni/good" + async with _client_with_mocked_fastpath_urls( + test_settings, geoip_db_dir, test_creds, [url] + ) as (client, mock_fastpath): + yield client, mock_fastpath, url + + +@pytest_asyncio.fixture +async def client_with_one_good_mocked_fastpath( + clickhouse_server, test_settings, geoip_db_dir, test_creds +): + """ + Client with one failing and one healthy mocked fastpath URL, used to + test the fallback path. + + Yields `(client, mock_fastpath, success_url)`. + """ + fail_url = "http://fastpath.ooni/bad" + success_url = "http://fastpath.ooni/good" + async with _client_with_mocked_fastpath_urls( + test_settings, geoip_db_dir, test_creds, [fail_url, success_url] + ) as (client, mock_fastpath): + yield client, mock_fastpath, success_url @pytest_asyncio.fixture @@ -373,29 +400,13 @@ async def client_with_two_working_fastpaths( clickhouse_server, test_settings, geoip_db_dir, test_creds ): """ - Client variant to test successful fastpath submissions with two healthy fastpath instances + Client with two healthy mocked fastpath URLs. Yields `(client, mock_fastpath, first_url, second_url)`. """ first_url = "http://fastpath-a.ooni/good" second_url = "http://fastpath-b.ooni/good" - - _, public_key = test_creds - - settings = test_settings().model_copy( - update={ - "fastpath_urls": [first_url, second_url], - } - ) - app.dependency_overrides[get_settings] = lambda: settings - app.dependency_overrides[get_s3_client] = get_s3_client_mock - app.dependency_overrides[get_tor_targets_from_s3] = get_tor_targets_from_s3_mock - app.dependency_overrides[get_psiphon_config_from_s3] = get_psiphon_config_from_s3_mock - app.dependency_overrides[_get_manifest] = make_manifest_mock_fn(public_key) - try_update(geoip_db_dir) - - mock_fastpath = MockFastpathClient() - async with lifespan(app, settings, repeating_tasks_active=False): - with TestClient(app) as client: - app.state.fastpath_client = mock_fastpath - yield client, mock_fastpath, first_url, second_url + async with _client_with_mocked_fastpath_urls( + test_settings, geoip_db_dir, test_creds, [first_url, second_url] + ) as (client, mock_fastpath): + yield client, mock_fastpath, first_url, second_url diff --git a/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py b/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py index 36298567..ef18f415 100644 --- a/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py +++ b/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py @@ -152,14 +152,13 @@ async def test_geoip_mismatch_anoncred(client, clickhouse_db, clean_faulty_measu # Open a report for the anoncred submit endpoint report_req = make_report_request(probe_cc="VE", probe_asn="AS65550") - c = postj(client, "/report", report_req) - rid = c["report_id"] + postj(client, "/report", report_req) # Create anoncred user and submit_request user, manifest_version, emission_day = setup_user(client) submit_request = make_submit_request(user, "VE", "AS65550") - # Build measurement body used by /api/v1/submit_measurement/{rid} + # Build measurement body for `/api/v1/submit_measurement` msm = make_measurement( submit_request.nym, submit_request.request, @@ -176,7 +175,7 @@ async def test_geoip_mismatch_anoncred(client, clickhouse_db, clean_faulty_measu # matching cc and asn postj( client, - f"/api/v1/submit_measurement/{rid}", + "/api/v1/submit_measurement", msm, headers={"X-Forwarded-For": "123.123.123.123"}, ) @@ -186,7 +185,7 @@ async def test_geoip_mismatch_anoncred(client, clickhouse_db, clean_faulty_measu # cc mismatch only postj( client, - f"/api/v1/submit_measurement/{rid}", + "/api/v1/submit_measurement", msm, headers={"X-Forwarded-For": "123.123.123.124"}, ) @@ -197,7 +196,7 @@ async def test_geoip_mismatch_anoncred(client, clickhouse_db, clean_faulty_measu # ASN mismatch only postj( client, - f"/api/v1/submit_measurement/{rid}", + "/api/v1/submit_measurement", msm, headers={"X-Forwarded-For": "123.123.123.125"}, ) @@ -208,7 +207,7 @@ async def test_geoip_mismatch_anoncred(client, clickhouse_db, clean_faulty_measu # both cc and ASN mismatch postj( client, - f"/api/v1/submit_measurement/{rid}", + "/api/v1/submit_measurement", msm, headers={"X-Forwarded-For": "123.123.123.126"}, ) diff --git a/ooniapi/services/ooniprobe/tests/integ/test_reports.py b/ooniapi/services/ooniprobe/tests/integ/test_reports.py index 1ce6a437..0e0fdee3 100644 --- a/ooniapi/services/ooniprobe/tests/integ/test_reports.py +++ b/ooniapi/services/ooniprobe/tests/integ/test_reports.py @@ -1,4 +1,5 @@ import json +import re import zstd import pytest import ujson @@ -70,39 +71,89 @@ async def test_collector_upload_msmt_valid(client): } assert len(rid) == 61, rid - upload_payload = {"format": "json", "content": {"test_keys": {}}} + upload_payload = { + "format": "json", + "content": { + "test_keys": {}, + "probe_cc": "IE", + "probe_asn": "AS34245", + "test_name": "web_connectivity", + }, + } c = postj(client, f"/report/{rid}", upload_payload) - expected_hash = get_msmt_hash(upload_payload) - assert c["measurement_uid"].endswith(f"_IE_webconnectivity_{expected_hash}"), c + assert re.fullmatch( + r"\d{14}\.\d+_IE_webconnectivity_[0-9a-f]{16}", c["measurement_uid"] + ), c c = postj(client, f"/report/{rid}/close", json={}) assert c == {}, c +@pytest.mark.parametrize( + "content_overrides", + [ + {"probe_cc": "DE"}, + {"probe_asn": "AS2"}, + {"test_name": "ndt"}, + ], +) +@pytest.mark.asyncio +async def test_rejects_body_report_id_mismatch( + client, content_overrides +): + """Measurement upload must reject when report_id and body disagree""" + rid = "20230101T000000Z_integtest_IT_1_n1_integtest0000000" + msmt_payload = { + "format": "json", + "content": { + "test_keys": {}, + "probe_cc": "IT", + "probe_asn": "AS1", + "test_name": "integtest", + **content_overrides, + }, + } + resp = client.post(f"/report/{rid}", json=msmt_payload) + assert resp.status_code == 400, resp.json() + assert resp.json()["detail"] == "Inconsistent measurement" + + @pytest.mark.asyncio async def test_collector_upload_msmt_valid_zstd(client): rid = "20230101T000000Z_integtest_IT_1_n1_integtest0000000" - msmt_payload = {"test_keys": {}} + msmt_payload = { + "format": "json", + "content": { + "test_keys": {}, + "probe_cc": "IT", + "probe_asn": "AS1", + "test_name": "integtest", + }, + } zmsmt = zstd.compress(json.dumps(msmt_payload).encode()) headers = [("Content-Encoding", "zstd")] c = post(client, f"/report/{rid}", zmsmt, headers=headers) - assert len(c) == 1 + assert "measurement_uid" in c, c - expected_hash = get_msmt_hash(msmt_payload) - assert c["measurement_uid"].endswith(f"_IT_integtest_{expected_hash}"), c + assert re.fullmatch( + r"\d{14}\.\d+_IT_integtest_[0-9a-f]{16}", c["measurement_uid"] + ), c @pytest.mark.asyncio -async def test_fastpath_fallback(client_with_mocked_fastpath): +async def test_fastpath_fallback(client_with_one_good_mocked_fastpath): """When the first fastpath URL fails, the second one in the list should still receive the measurement. """ - client, mock_fastpath, success_url = client_with_mocked_fastpath + client, mock_fastpath, success_url = client_with_one_good_mocked_fastpath rid = "20230101T000000Z_integtest_IT_1_n1_integtest0000000" msmt_payload = { "format": "json", "content": { "test_keys": {}, + "probe_cc": "IT", + "probe_asn": "AS1", + "test_name": "integtest", "annotations": {"platform": "test_platform"}, "software_name": "test_software", "software_version": "0.0.0", @@ -118,7 +169,6 @@ async def test_fastpath_fallback(client_with_mocked_fastpath): expected_hash = get_msmt_hash(msmt_payload) assert msmt_uid.endswith(f"_IT_integtest_{expected_hash}"), msmt_uid - # check saved data expected_url = f"{success_url}/{msmt_uid}" assert list(mock_fastpath.uploads.keys()) == [expected_url] @@ -139,6 +189,9 @@ async def test_fastpath_only_submits_once_on_success(client_with_two_working_fas "format": "json", "content": { "test_keys": {}, + "probe_cc": "IT", + "probe_asn": "AS1", + "test_name": "integtest", "annotations": {"platform": "test_platform"}, "software_name": "test_software", "software_version": "0.0.0", @@ -150,13 +203,15 @@ async def test_fastpath_only_submits_once_on_success(client_with_two_working_fas msmt_uid = body.get("measurement_uid") assert msmt_uid, body - # Sanity-check the bytes that were forwarded to the fastpath expected_hash = get_msmt_hash(msmt_payload) assert msmt_uid.endswith(f"_IT_integtest_{expected_hash}"), msmt_uid - # Only the first fastpath URL should have received the measurement expected_url = f"{first_url}/{msmt_uid}" - assert list(mock_fastpath.uploads.keys()) == [expected_url], ( + uploaded_urls = list(mock_fastpath.uploads.keys()) + assert uploaded_urls == [expected_url], ( "measurement should be forwarded to the first fastpath URL only, " - f"got {list(mock_fastpath.uploads.keys())}" - ) \ No newline at end of file + f"got {uploaded_urls}" + ) + + stored = ujson.loads(mock_fastpath.uploads[expected_url]) + assert get_msmt_hash(stored) == expected_hash diff --git a/ooniapi/services/ooniprobe/tests/test_anoncred.py b/ooniapi/services/ooniprobe/tests/test_anoncred.py index 02d56021..b24e662a 100644 --- a/ooniapi/services/ooniprobe/tests/test_anoncred.py +++ b/ooniapi/services/ooniprobe/tests/test_anoncred.py @@ -1,3 +1,4 @@ +import re from typing import Any, Dict import ooniauth_py import pytest @@ -79,11 +80,6 @@ async def test_registration_errors(client): @pytest.mark.asyncio async def test_submission_basic(client): - # open report - j = make_report_request("IE", "AS34245") - resp = postj(client, "/report", json=j) - rid = resp.pop("report_id") - # Create user user, manifest_version, emission_day = setup_user(client) @@ -91,7 +87,7 @@ async def test_submission_basic(client): msm = make_measurement(submit_request.nym, submit_request.request, manifest_version) - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "verified", c assert c['submit_response'], "Submit response should not be null if the proof was verified" @@ -100,24 +96,19 @@ async def test_submission_basic(client): @pytest.mark.asyncio -async def test_fastpath_fallback(client_with_mocked_fastpath): +async def test_fastpath_fallback(client_with_one_good_mocked_fastpath): """ When the first fastpath URL fails, the second one in the list should still receive a verified anonymous-credentials measurement. """ - client, mock_fastpath, success_url = client_with_mocked_fastpath - - # open report - j = make_report_request("IE", "AS34245") - resp = postj(client, "/report", json=j) - rid = resp.pop("report_id") + client, mock_fastpath, success_url = client_with_one_good_mocked_fastpath # build a verifiable submission user, manifest_version, _ = setup_user(client) submit_request = make_submit_request(user, "IE", "AS34245") msm = make_measurement(submit_request.nym, submit_request.request, manifest_version) - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "verified", c assert c["error"] is None, c assert c["submit_response"], ( @@ -126,20 +117,64 @@ async def test_fastpath_fallback(client_with_mocked_fastpath): msmt_uid = c["measurement_uid"] assert msmt_uid, c - # Verified anonymous-credential submissions inject "t" as the - # `is_verified` flag in the stored payload before hashing. - expected_hash = get_msmt_hash(msm, is_verified="t") - assert msmt_uid.endswith(f"_IE_webconnectivity_{expected_hash}"), msmt_uid - - # Check response bytes expected_url = f"{success_url}/{msmt_uid}" assert list(mock_fastpath.uploads.keys()) == [expected_url] + # Verified anonymous-credential submissions inject "t" as the + # `is_verified` flag in the stored payload before hashing. The bytes the + # server hashes also include a freshly generated random `report_id`, so we + # compute the expected hash from the stored body. stored = ujson.loads(mock_fastpath.uploads[expected_url]) - assert get_msmt_hash(stored, is_verified="t") == expected_hash + expected_hash = get_msmt_hash(stored, is_verified="t") + assert msmt_uid.endswith(f"_IE_webconnectivity_{expected_hash}"), msmt_uid assert stored["is_verified"] == "t" +@pytest.mark.asyncio +async def test_fastpath_payload_has_report_id(client_with_mocked_fastpath): + """ + The body forwarded to the fastpath must always include a freshly + generated `report_id`, even if not provided by the client + """ + client, mock_fastpath, fastpath_url = client_with_mocked_fastpath + + # 1) Verified anonymous-credentials submission + user, manifest_version, _ = setup_user(client) + submit_request = make_submit_request(user, "IE", "AS34245") + msm_verified = make_measurement( + submit_request.nym, submit_request.request, manifest_version + ) + c = postj(client, "/api/v1/submit_measurement", msm_verified) + assert c["verification_status"] == "verified", c + verified_uid = c["measurement_uid"] + + # 2) Unverified submission (missing anoncred fields) + msm_unverified = { + "format": "json", + "content": { + "test_name": "web_connectivity", + "probe_asn": "AS34245", + "probe_cc": "IE", + "test_start_time": "2020-09-09 14:11:11", + }, + } + c = postj(client, "/api/v1/submit_measurement", msm_unverified) + assert c["verification_status"] == "unverified", c + unverified_uid = c["measurement_uid"] + + # ____ni_ + rid_re = re.compile( + r"\d{8}T\d{6}Z_webconnectivity_IE_34245_n1_[A-Za-z0-9oo]{16}" + ) + for uid in (verified_uid, unverified_uid): + url = f"{fastpath_url}/{uid}" + assert url in mock_fastpath.uploads, mock_fastpath.uploads + stored = ujson.loads(mock_fastpath.uploads[url]) + rid = stored.get("content", {}).get("report_id") + assert isinstance(rid, str) and rid, stored + assert rid_re.fullmatch(rid), rid + + @pytest.mark.asyncio async def test_fastpath_only_submits_once_on_success(client_with_two_working_fastpaths): """ @@ -147,26 +182,17 @@ async def test_fastpath_only_submits_once_on_success(client_with_two_working_fas """ client, mock_fastpath, first_url, second_url = client_with_two_working_fastpaths - # open report - j = make_report_request("IE", "AS34245") - resp = postj(client, "/report", json=j) - rid = resp.pop("report_id") - # build a verifiable submission user, manifest_version, _ = setup_user(client) submit_request = make_submit_request(user, "IE", "AS34245") msm = make_measurement(submit_request.nym, submit_request.request, manifest_version) - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "verified", c assert c["error"] is None, c msmt_uid = c["measurement_uid"] assert msmt_uid, c - # Sanity-check the bytes that were forwarded to the fastpath - expected_hash = get_msmt_hash(msm, is_verified="t") - assert msmt_uid.endswith(f"_IE_webconnectivity_{expected_hash}"), msmt_uid - # Only the first fastpath URL should have received the measurement expected_url = f"{first_url}/{msmt_uid}" assert list(mock_fastpath.uploads.keys()) == [expected_url], ( @@ -174,16 +200,17 @@ async def test_fastpath_only_submits_once_on_success(client_with_two_working_fas f"got {list(mock_fastpath.uploads.keys())}" ) + # Sanity-check the bytes that were forwarded to the fastpath + stored = ujson.loads(mock_fastpath.uploads[expected_url]) + expected_hash = get_msmt_hash(stored, is_verified="t") + assert msmt_uid.endswith(f"_IE_webconnectivity_{expected_hash}"), msmt_uid + @pytest.mark.asyncio async def test_submission_non_verified(client): """ """ - j = make_report_request() - resp = postj(client, "/report", json=j) - rid = resp.pop("report_id") - msm = { "format": "json", "content": { @@ -195,14 +222,14 @@ async def test_submission_non_verified(client): } # no anoncred fields -> processed but not verified, no error - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "unverified" assert c["submit_response"] is None assert c["error"] is None # unknown manifest -> processed but not verified, manifest error msm["manifest_version"] = "does-not-exist" - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "unverified" assert c["submit_response"] is None assert c["error"] == "manifest_not_found" @@ -211,7 +238,7 @@ async def test_submission_non_verified(client): user, manifest_version, _ = setup_user(client) msm["nym"] = "dummy-nym" msm["manifest_version"] = manifest_version - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "unverified" assert c["submit_response"] is None assert c["error"] == "incomplete_anonc_fields" @@ -222,14 +249,14 @@ async def test_submission_non_verified(client): msm["zkp_request"] = submit_request.request msm["manifest_version"] = manifest_version msm["protocol_version"] = "0.0.1" - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "unverified" assert c["submit_response"] is None assert c["error"] == "protocol_version_too_old" # unparsable protocol version -> invalid protocol version error msm["protocol_version"] = "abc" - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "unverified" assert c["submit_response"] is None assert c["error"] == "invalid_protocol_version" @@ -490,15 +517,11 @@ async def test_credential_update_with_submission(client, client_with_original_ma (user, manifest_version, emission_day) = client_with_original_manifest # first submit: should just work out of the box - j = make_report_request() - resp = postj(client, "/report", json=j) - rid = resp.pop("report_id") - submit_request = make_submit_request(user, "IE", "AS34245") msm = make_measurement(submit_request.nym, submit_request.request, manifest_version) - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) assert c["verification_status"] == "verified" @@ -514,15 +537,11 @@ async def test_credential_update_with_submission(client, client_with_original_ma assert 'update_response' in result user.handle_credential_update_response(result['update_response']) # should not crash - j = make_report_request() - resp = postj(client, "/report", json=j) - rid = resp.pop("report_id") - submit_request = make_submit_request(user, "IE", "AS34245") msm = make_measurement(submit_request.nym, submit_request.request, manifest_version) - c = postj(client, f"/api/v1/submit_measurement/{rid}", msm) + c = postj(client, "/api/v1/submit_measurement", msm) def make_measurement(