Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e38bc1a
remove report id from legacy measurement submission function
LDiazN Apr 28, 2026
393d254
Remove report_id from anonymous credentials submit
LDiazN Apr 28, 2026
c804c84
Disaggregate bad measurements count by type of error
LDiazN Apr 29, 2026
77a5ecf
disaggregate type of error in anonc submission path
LDiazN Apr 30, 2026
dff6de5
Refactor metadata checking in submission
LDiazN Apr 30, 2026
69a1628
Remove open report from anonc tests
LDiazN Apr 30, 2026
dc5ee4e
simplify typing
LDiazN Apr 30, 2026
198bb1e
Improve docstring
LDiazN Apr 30, 2026
5374c5c
remove unused function
LDiazN Apr 30, 2026
90d52f3
remove unused imports
LDiazN Apr 30, 2026
eea7d17
Set report_id in measurement submission path and add test for checkin…
LDiazN May 7, 2026
a973560
Add check to verify that submit_measurement always sets the report_id
LDiazN May 7, 2026
0582837
Add report_id to the content key instead of root of json
LDiazN May 8, 2026
2e9739d
Refactor validation into check function
LDiazN May 14, 2026
1843154
Add report_id to submission result
LDiazN May 14, 2026
22bc4f4
Move reformatting to function constructing msm uid
LDiazN May 14, 2026
b383f67
Restore report_id parsing; add consistency check between report_id an…
LDiazN May 14, 2026
45e46d9
Update tests
LDiazN May 14, 2026
acbf2ba
Add tests to check metadata consistency against report_id
LDiazN May 14, 2026
72c2b81
Remove outdated comment
LDiazN May 14, 2026
41d6bff
Remove unnecessary indentation
LDiazN May 14, 2026
8d7dba6
Improve test name and docs
LDiazN May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 62 additions & 36 deletions ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@
from pydantic import Field
from starlette.concurrency import run_in_threadpool

from ..common.config import Settings
from ..common.dependencies import ClickhouseDep
from ..common.metrics import timer
from ..common.routers import BaseModel
from ..common.utils import setnocacheresponse
from ..dependencies import ASNCCReaderDep, SettingsDep
from ..metrics import Metrics
from ..utils import (
MeasurementMetadata,
check_measurement_meta,
error,
generate_report_id,
get_cc_asn,
metadata_from_measurement_content,
normalize_asn,
register_geoip_anomaly,
)
Expand Down Expand Up @@ -107,7 +111,7 @@ async def receive_measurement(
content_encoding: str = Header(default=None),
) -> ReceiveMeasurementResponse | Dict[str, Any]:
"""
Submit measurement
Submit measurement.
"""
setnocacheresponse(response)
empty_measurement = {}
Expand All @@ -118,14 +122,12 @@ async def receive_measurement(
f"Unexpected report_id {report_id[:200]}. Error: {e}",
)
Metrics.BAD_MEASUREMENTS_CNT.labels(reason="bad_report_id").inc()
raise error("Incorrect format")
error("Incorrect format")

# TODO validate the timestamp?
good = len(cc) == 2 and test_name.isalnum() and 1 < len(test_name) < 30
if not good:
log.info("Unexpected report_id %r", report_id[:200])
error("Incorrect format")

try:
asn_i = int(asn)
except ValueError as e:
Expand Down Expand Up @@ -157,13 +159,20 @@ async def receive_measurement(
error("Incorrect format")

try:
data = await run_in_threadpool(_set_unverified_flag, data)
data, metadata = await run_in_threadpool(
_process_measurement_body, data
)
except Exception as e:
log.info("Failed to parse and modify measurement body")
log.exception(e)
Metrics.BAD_MEASUREMENTS_CNT.labels(reason="bad_json").inc()
error("Incorrect format")

# Raise an exception in case the report_id is not consistent with the body,
# we flag this behavior as faulty data
_compare_report_id_to_body_meta(cc, asn, test_name, metadata)
check_measurement_meta(metadata.test_name, metadata.probe_cc, metadata.probe_asn)

# Write the whole body of the measurement in a directory based on a 1-hour
# time window
now = datetime.now(timezone.utc)
Expand Down Expand Up @@ -203,7 +212,7 @@ async def receive_measurement(

if success:
# Geoip anomaly detection runs only when the measurement was successfully
# submitted to the fastpath, so retries don't cause duplicate anomaly entries.
# submitted to the fastpath
with Metrics.COMPARE_CC_TIMING.time():
try:
await run_in_threadpool(
Expand All @@ -214,7 +223,7 @@ async def receive_measurement(
cc,
asn,
msmt_uid,
data,
metadata,
)
except Exception as e:
log.error(f"Error checking for geoip anomalies: {e}")
Expand Down Expand Up @@ -242,14 +251,52 @@ async def receive_measurement(
Metrics.SEND_S3_CNT.labels(status="fail").inc()
return empty_measurement

def _process_measurement_body(
    data: bytes
) -> Tuple[bytes, MeasurementMetadata]:
    """
    Parse the measurement body, extract some metadata fields, set
    `is_verified="u"`, and re-serialize.

    :param data: raw JSON measurement body as received from the probe
    :return: the re-serialized body plus the metadata extracted from its
        `content` key
    :raises TypeError: when the body or its `content` key is not a JSON
        object; the caller treats any exception here as a bad measurement
    """
    with Metrics.DESERIALIZE_BODY_TIMING.time():
        measurement = ujson.loads(data)

    # `assert` is stripped under `python -O`, so validate with explicit
    # raises; the caller catches any Exception and counts it as bad_json.
    if not isinstance(measurement, dict):
        raise TypeError("measurement body must be a JSON object")

    content = measurement.get("content")
    if not isinstance(content, dict):
        raise TypeError("measurement content must be a JSON object")

    metadata = metadata_from_measurement_content(content)

    # Mark the measurement as unverified ("u") until verification runs.
    measurement["is_verified"] = "u"

    with Metrics.SERIALIZE_BODY_TIMING.time():
        return ujson.dumps(measurement).encode("utf-8"), metadata

def _compare_report_id_to_body_meta(cc: str, asn: str, test_name: str, metadata: MeasurementMetadata):
    """
    Compare the metadata reported by a report_id against the metadata in a
    measurement body.

    :param cc: country code parsed from the report_id
    :param asn: ASN parsed from the report_id (with or without "AS" prefix)
    :param test_name: test name parsed from the report_id (no underscores)
    :param metadata: metadata extracted from the measurement body
    :raises HTTPException: (via `error`) when any field is inconsistent
    """
    if cc.upper() != metadata.probe_cc.upper():
        # Log the body-side value, consistent with the branches below.
        log.info(f"CC mismatch: {cc} vs {metadata.probe_cc}")
        Metrics.BAD_MEASUREMENTS_CNT.labels(reason="cc_mismatch").inc()
        error("Inconsistent measurement")

    # NOTE: `lstrip("AS")` strips any run of leading "A"/"S" characters, not
    # the literal "AS" prefix; `removeprefix` (3.9+) removes exactly "AS".
    if asn.upper().removeprefix("AS") != metadata.probe_asn.upper().removeprefix("AS"):
        log.info(f"ASN mismatch: {asn} vs {metadata.probe_asn}")
        Metrics.BAD_MEASUREMENTS_CNT.labels(reason="asn_mismatch").inc()
        error("Inconsistent measurement")

    # report_id test names have underscores removed, so normalize before
    # comparing against the body-side test name.
    if test_name != metadata.test_name.replace("_",""):
        log.info(f"Test name mismatch: {test_name} vs {metadata.test_name}")
        Metrics.BAD_MEASUREMENTS_CNT.labels(reason="test_name_mismatch").inc()
        error("Inconsistent measurement")

def _check_and_register_geoip_anomaly(
request: Request,
Expand All @@ -258,46 +305,25 @@ def _check_and_register_geoip_anomaly(
cc: str,
asn: str,
msmt_uid: str,
data: bytes,
metadata: MeasurementMetadata,
) -> None:
actual_cc, actual_asn = get_cc_asn(request, asn_cc_reader)
if actual_cc != cc or normalize_asn(actual_asn) != normalize_asn(asn):
# expensive: parses measurement body and sends anomaly to clickhouse
platform, software_name, software_version = _parse_metadata(data)
register_geoip_anomaly(
cc,
actual_cc,
asn,
actual_asn,
clickhouse,
msmt_uid,
platform,
software_name,
software_version,
metadata.platform,
metadata.software_name,
metadata.software_version,
)
else:
Metrics.PROBE_CC_ASN_MATCH.inc()


def _parse_metadata(data: bytes) -> Tuple[str, str, str]:
    """
    Parse a raw measurement body and return descriptive metadata as a
    ``(platform, software_name, software_version)`` tuple.

    Every element falls back to the empty string when the body cannot be
    parsed or the corresponding field is missing.
    """
    try:
        parsed = ujson.loads(data.decode("utf-8"))
    except Exception as e:
        # Best-effort: a malformed body yields empty metadata instead of
        # failing the submission path.
        log.error(f"Couldn't parse json body: {e}")
        return ("", "", "")

    content = parsed.get("content", {})
    annotations = content.get("annotations", {})
    return (
        annotations.get("platform", ""),
        content.get("software_name", ""),
        content.get("software_version", ""),
    )


@timer(name="close_report")
@router.post("/report/{report_id}/close", tags=["reports"])
def close_report(report_id):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import asyncio
from sqlalchemy import desc
import io
import logging
import random
Expand Down Expand Up @@ -48,10 +48,12 @@
PsiphonConfigDep
)
from ...utils import (
check_measurement_meta,
extract_probe_ipaddr,
generate_report_id,
geolookup_probe,
get_cc_asn,
metadata_from_measurement_content,
normalize_asn,
register_geoip_anomaly,
)
Expand Down Expand Up @@ -871,10 +873,15 @@ class SubmitMeasurementResponse(BaseModel):
default=None,
)

report_id: str = Field(
description="Report ID generated by the server. Note that this is "
"unique to every measurement, so it's not useful for grouping "
"measurements that were taken together"
)


@router.post("/submit_measurement/{report_id}", tags=["anonymous_credentials"])
@router.post("/submit_measurement", tags=["anonymous_credentials"])
async def submit_measurement(
report_id: str,
request: Request,
submit_request: SubmitMeasurementRequest,
response: Response,
Expand All @@ -900,61 +907,29 @@ async def submit_measurement(
- status code 4xx or 5xx: the measurement was not processed nor stored
"""
setnocacheresponse(response)
try:
rid_timestamp, test_name, cc, asn, format_cid, rand = report_id.split("_")
except Exception:
err_msg = f"Incorrect format: unexpected report_id {report_id[:200]}"
log.info(err_msg)
raise HTTPException(
status_code=400,
detail={"error": "incorrect_format", "message": err_msg},
)

# TODO validate the timestamp?
good = len(cc) == 2 and test_name.isalnum() and 1 < len(test_name) < 30
if not good:
err_msg = f"Incorrect format: unexpected report_id {report_id[:200]}"
log.info(err_msg)
raise HTTPException(
status_code=400,
detail={"error": "incorrect_format", "message": err_msg},
)
metadata = metadata_from_measurement_content(submit_request.content)

try:
asn_i = int(asn)
except ValueError:
err_msg = f"Incorrect format: ASN value not parsable {asn}"
log.info(err_msg)
raise HTTPException(
status_code=400,
detail={"error": "incorrect_format", "message": err_msg},
)
test_name = metadata.test_name
cc = metadata.probe_cc
asn = metadata.probe_asn

if asn_i == 0:
log.info("Discarding ASN == 0")
Metrics.MSMNT_DISCARD_ASN0.inc()
raise HTTPException(400, detail = {"error" : "asn_0", "message" : "Measurement discarded, ASN == 0"})
check_measurement_meta(test_name, cc, asn)

if cc.upper() == "ZZ":
log.info("Discarding CC == ZZ")
Metrics.MSMNT_DISCARD_CC_ZZ.inc()
raise HTTPException(400, detail = {"error" : "cc_zz", "message" : "Measurement discarded, CC == ZZ"})
# generate the new report_id
rid = generate_report_id(test_name, settings, cc, normalize_asn(asn))

# Anonymous credentials verification
verification_status, submit_error, submit_response = _verify_submit(
submit_request, manifest, settings
)

annotations = submit_request.content.get("annotations", {})
platform = annotations.get("platform") or ""
software_name = submit_request.content.get("software_name") or ""
software_version = submit_request.content.get("software_version") or ""

data = submit_request.model_dump()

# Add verification-related data.
# use one-letter code for DB, human readable for clients
data["is_verified"] = verification_status.code
data["content"]["report_id"] = rid
data_buff = io.BytesIO()
stream = io.TextIOWrapper(data_buff, "utf-8")
ujson.dump(data, stream)
Expand All @@ -969,6 +944,7 @@ async def submit_measurement(
ts = now.strftime("%Y%m%d%H%M%S.%f")

# msmt_uid is a unique id based on upload time, cc, testname and hash
test_name = test_name.replace("_","")
msmt_uid = f"{ts}_{cc}_{test_name}_{h}"
Metrics.MSMNT_RECEIVED_CNT.inc()

Expand Down Expand Up @@ -1013,9 +989,9 @@ async def submit_measurement(
cc,
asn,
msmt_uid,
platform,
software_name,
software_version,
metadata.platform,
metadata.software_name,
metadata.software_version,
)
except Exception as e:
log.error(f"Error checking for geoip anomalies: {e}")
Expand All @@ -1026,30 +1002,34 @@ async def submit_measurement(
verification_status=verification_status,
submit_response=submit_response,
error=submit_error,
report_id = rid
)

Metrics.SEND_FASTPATH_CNT.labels(status="fail").inc()

# wasn't possible to send msmnt to fastpath, try to send it to s3
data_buff.seek(0)
ts_prefix = now.strftime("%Y%m%d%H")
tn = test_name.replace("_", "")
s3_key = f"postcans/{ts_prefix}/{ts_prefix}_{cc}_{tn}/{msmt_uid}.post"
try:
await run_in_threadpool(
request.app.state.s3_client.upload_fileobj,
data_buff,
io.BytesIO(data_bin),
Bucket=settings.failed_reports_bucket,
Key=report_id,
Key=s3_key,
)
except Exception as exc:
log.error(f"Unable to upload measurement to s3. Error: {exc}")
Metrics.SEND_S3_FAILURE.inc()

log.error(f"Unable to send report to fastpath. report_id: {report_id}")
log.error(f"Unable to send report to fastpath. measurement_uid: {msmt_uid}")
Metrics.MISSED_MSMNTS.inc()
return SubmitMeasurementResponse(
Copy link
Copy Markdown
Member

@hellais hellais May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to return to the caller the report_id that was generated as well? I suppose it doesn't hurt since it's a value which we generated in here and the client has no other way of knowing it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the report_id is redundant to the measurement_uid in this context (it's unique per measurement). The best value we could get out of it is restoring it in the future, but in that case the probe would probably already know it before sending the measurement

But I agree it wouldn't hurt to add it

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree that it's redundant. We should check though with @aanorbel and @sdsantos if this is though still required by the mobile apps for something (I seem to recall this was still being stored in the in-app DB).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Norbel suggested that we keep the report_id in the response model, so I added that 👍

measurement_uid=msmt_uid,
verification_status=verification_status,
submit_response=submit_response,
error=submit_error or "submission_delivery_failed",
report_id = rid
)

def _verify_submit(
Expand Down
Loading
Loading