Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions a4d-python/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ A4D_DATASET=tracker
A4D_DOWNLOAD_BUCKET=a4dphase2_upload
A4D_UPLOAD_BUCKET=a4dphase2_output

# GCP Authentication (optional - uses Application Default Credentials if not set)
# For local development: run `gcloud auth application-default login`
# For CI/CD or VM: set path to service account key file
# GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json

# Paths
A4D_DATA_ROOT=/path/to/tracker/files
A4D_OUTPUT_DIR=output
Expand Down
154 changes: 154 additions & 0 deletions a4d-python/src/a4d/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,160 @@ def create_tables_cmd(
raise typer.Exit(1) from e


@app.command("upload-tables")
def upload_tables_cmd(
tables_dir: Annotated[
Path,
typer.Option("--tables-dir", "-t", help="Directory containing parquet table files"),
],
dataset: Annotated[
str | None,
typer.Option("--dataset", "-d", help="BigQuery dataset name (default: from config)"),
] = None,
project_id: Annotated[
str | None,
typer.Option("--project", "-p", help="GCP project ID (default: from config)"),
] = None,
append: Annotated[
bool,
typer.Option("--append", help="Append to existing tables instead of replacing"),
] = False,
):
"""Upload pipeline output tables to BigQuery.

Loads parquet files from the tables directory into the configured
BigQuery dataset. By default, existing tables are replaced (matching
the R pipeline behavior).

\b
Examples:
# Upload tables from default output directory
uv run a4d upload-tables --tables-dir output/tables

# Upload to a specific dataset
uv run a4d upload-tables --tables-dir output/tables --dataset tracker_dev

# Append instead of replace
uv run a4d upload-tables --tables-dir output/tables --append
"""
from a4d.gcp.bigquery import load_pipeline_tables

console.print("\n[bold blue]A4D BigQuery Upload[/bold blue]\n")
console.print(f"Tables directory: {tables_dir}")

if not tables_dir.exists():
console.print(f"[bold red]Error: Directory not found: {tables_dir}[/bold red]\n")
raise typer.Exit(1)

try:
results = load_pipeline_tables(
tables_dir=tables_dir,
dataset=dataset,
project_id=project_id,
replace=not append,
)

if results:
result_table = Table(title="Uploaded Tables")
result_table.add_column("Table", style="cyan")
result_table.add_column("Rows", justify="right", style="green")
result_table.add_column("Status", style="green")

for table_name, job in results.items():
result_table.add_row(
table_name,
f"{job.output_rows:,}" if job.output_rows else "?",
"✓",
)

console.print(result_table)
console.print(
f"\n[bold green]✓ Uploaded {len(results)} tables to BigQuery[/bold green]\n"
)
else:
console.print("[bold yellow]No tables found to upload[/bold yellow]\n")

except Exception as e:
console.print(f"\n[bold red]Error: {e}[/bold red]\n")
raise typer.Exit(1) from e


@app.command("download-trackers")
def download_trackers_cmd(
destination: Annotated[
Path,
typer.Option("--destination", "-d", help="Local directory to download files to"),
],
bucket: Annotated[
str | None,
typer.Option("--bucket", "-b", help="GCS bucket name (default: from config)"),
] = None,
):
"""Download tracker files from Google Cloud Storage.

\b
Examples:
# Download to local directory
uv run a4d download-trackers --destination /data/trackers

# Download from specific bucket
uv run a4d download-trackers --destination /data/trackers --bucket my-bucket
"""
from a4d.gcp.storage import download_tracker_files

console.print("\n[bold blue]A4D Tracker Download[/bold blue]\n")
console.print(f"Destination: {destination}")

try:
downloaded = download_tracker_files(destination=destination, bucket_name=bucket)
console.print(f"\n[bold green]✓ Downloaded {len(downloaded)} files[/bold green]\n")
except Exception as e:
console.print(f"\n[bold red]Error: {e}[/bold red]\n")
raise typer.Exit(1) from e


@app.command("upload-output")
def upload_output_cmd(
source_dir: Annotated[
Path,
typer.Option("--source", "-s", help="Output directory to upload"),
],
bucket: Annotated[
str | None,
typer.Option("--bucket", "-b", help="GCS bucket name (default: from config)"),
] = None,
prefix: Annotated[
str,
typer.Option("--prefix", help="Prefix for uploaded blob names"),
] = "",
):
"""Upload pipeline output to Google Cloud Storage.

\b
Examples:
# Upload output directory
uv run a4d upload-output --source output/

# Upload with prefix
uv run a4d upload-output --source output/ --prefix 2024-01
"""
from a4d.gcp.storage import upload_output

console.print("\n[bold blue]A4D Output Upload[/bold blue]\n")
console.print(f"Source: {source_dir}")

if not source_dir.exists():
console.print(f"[bold red]Error: Directory not found: {source_dir}[/bold red]\n")
raise typer.Exit(1)

try:
uploaded = upload_output(source_dir=source_dir, bucket_name=bucket, prefix=prefix)
console.print(f"\n[bold green]✓ Uploaded {len(uploaded)} files to GCS[/bold green]\n")
except Exception as e:
console.print(f"\n[bold red]Error: {e}[/bold red]\n")
raise typer.Exit(1) from e


@app.command("version")
def version_cmd():
"""Show version information."""
Expand Down
21 changes: 21 additions & 0 deletions a4d-python/src/a4d/gcp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from a4d.gcp.bigquery import (
TABLE_CONFIGS,
get_bigquery_client,
load_pipeline_tables,
load_table,
)
from a4d.gcp.storage import (
download_tracker_files,
get_storage_client,
upload_output,
)

__all__ = [
"TABLE_CONFIGS",
"download_tracker_files",
"get_bigquery_client",
"get_storage_client",
"load_pipeline_tables",
"load_table",
"upload_output",
]
187 changes: 187 additions & 0 deletions a4d-python/src/a4d/gcp/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""BigQuery table loading from parquet files.

Replaces the R pipeline's `ingest_data()` function which used the `bq` CLI tool.
Uses the google-cloud-bigquery Python client for loading parquet files with
clustering configuration matching the R pipeline.
"""

from pathlib import Path

from google.cloud import bigquery
from loguru import logger

from a4d.config import settings

# Table configurations matching the R pipeline's clustering fields.
# Each table maps to the clustering fields used for optimal query performance.
TABLE_CONFIGS: dict[str, list[str]] = {
"patient_data_monthly": ["clinic_id", "patient_id", "tracker_date"],
"patient_data_annual": ["patient_id", "tracker_date"],
"patient_data_static": ["clinic_id", "patient_id", "tracker_date"],
"patient_data_hba1c": ["clinic_id", "patient_id", "tracker_date"],
"product_data": [
"clinic_id",
"product_released_to",
"product_table_year",
"product_table_month",
],
"clinic_data_static": ["clinic_id"],
"logs": ["level", "log_file", "file_name"],
"tracker_metadata": ["file_name", "clinic_code"],
}

# Maps the pipeline output file names to BigQuery table names.
# Note: table_logs.parquet uses this name from create_table_logs() in tables/logs.py.
PARQUET_TO_TABLE: dict[str, str] = {
"patient_data_static.parquet": "patient_data_static",
"patient_data_monthly.parquet": "patient_data_monthly",
"patient_data_annual.parquet": "patient_data_annual",
"table_logs.parquet": "logs",
}


def get_bigquery_client(project_id: str | None = None) -> bigquery.Client:
"""Create a BigQuery client.

Authentication uses Application Default Credentials (ADC):
- In Cloud Run / GCE: automatic via metadata server
- Locally: via `gcloud auth application-default login`
- In CI: via GOOGLE_APPLICATION_CREDENTIALS environment variable

Args:
project_id: GCP project ID (defaults to settings.project_id)

Returns:
Configured BigQuery client
"""
return bigquery.Client(project=project_id or settings.project_id)


def load_table(
parquet_path: Path,
table_name: str,
client: bigquery.Client | None = None,
dataset: str | None = None,
project_id: str | None = None,
replace: bool = True,
) -> bigquery.LoadJob:
"""Load a parquet file into a BigQuery table.

Replicates the R pipeline's `ingest_data()` function:
1. Optionally deletes the existing table (replace=True, matching R's delete=T default)
2. Loads the parquet file with clustering fields

Args:
parquet_path: Path to the parquet file to load
table_name: BigQuery table name (e.g., "patient_data_monthly")
client: BigQuery client (created if not provided)
dataset: Dataset name (defaults to settings.dataset)
project_id: GCP project ID (defaults to settings.project_id)
replace: If True, replaces the existing table (default matches R pipeline)

Returns:
Completed LoadJob

Raises:
FileNotFoundError: If parquet file doesn't exist
ValueError: If table_name is not in TABLE_CONFIGS
google.api_core.exceptions.GoogleAPIError: On BigQuery API errors
"""
if not parquet_path.exists():
raise FileNotFoundError(f"Parquet file not found: {parquet_path}")

dataset = dataset or settings.dataset
project_id = project_id or settings.project_id

if client is None:
client = get_bigquery_client(project_id)

table_ref = f"{project_id}.{dataset}.{table_name}"
logger.info(f"Loading {parquet_path.name} → {table_ref}")

# Configure the load job
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
write_disposition=(
bigquery.WriteDisposition.WRITE_TRUNCATE
if replace
else bigquery.WriteDisposition.WRITE_APPEND
),
)

# Add clustering if configured for this table
clustering_fields = TABLE_CONFIGS.get(table_name)
if clustering_fields:
job_config.clustering_fields = clustering_fields
logger.info(f"Clustering fields: {clustering_fields}")

# Load the parquet file
with open(parquet_path, "rb") as f:
load_job = client.load_table_from_file(f, table_ref, job_config=job_config)

# Wait for completion
load_job.result()

logger.info(
f"Loaded {load_job.output_rows} rows into {table_ref} "
f"({parquet_path.stat().st_size / 1024 / 1024:.2f} MB)"
)
return load_job


def load_pipeline_tables(
tables_dir: Path,
client: bigquery.Client | None = None,
dataset: str | None = None,
project_id: str | None = None,
replace: bool = True,
) -> dict[str, bigquery.LoadJob]:
"""Load all pipeline output tables into BigQuery.

Scans the tables directory for known parquet files and loads each one
into the corresponding BigQuery table.

Args:
tables_dir: Directory containing parquet table files (e.g., output/tables/)
client: BigQuery client (created if not provided)
dataset: Dataset name (defaults to settings.dataset)
project_id: GCP project ID (defaults to settings.project_id)
replace: If True, replaces existing tables

Returns:
Dictionary mapping table name to completed LoadJob

Raises:
FileNotFoundError: If tables_dir doesn't exist
"""
if not tables_dir.exists():
raise FileNotFoundError(f"Tables directory not found: {tables_dir}")

if client is None:
project_id = project_id or settings.project_id
client = get_bigquery_client(project_id)

logger.info(f"Loading pipeline tables from: {tables_dir}")

results: dict[str, bigquery.LoadJob] = {}

for parquet_name, table_name in PARQUET_TO_TABLE.items():
parquet_path = tables_dir / parquet_name
if parquet_path.exists():
try:
job = load_table(
parquet_path=parquet_path,
table_name=table_name,
client=client,
dataset=dataset,
project_id=project_id,
replace=replace,
)
results[table_name] = job
except Exception:
logger.exception(f"Failed to load table: {table_name}")
else:
logger.warning(f"Table file not found, skipping: {parquet_name}")

logger.info(f"Successfully loaded {len(results)}/{len(PARQUET_TO_TABLE)} tables")
return results
Loading
Loading