Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions .github/workflows/cenace-data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: CENACE Data

on:
schedule:
- cron: "30 6 * * *"
workflow_dispatch:
inputs:
execution_date:
description: "Execution date YYYY-MM-DD. Target downloaded date is execution_date + 1 day."
required: false
type: string

jobs:
update-cenace-data:
runs-on: ubuntu-latest

env:
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
MODAL_ENVIRONMENT: ${{ secrets.MODAL_ENVIRONMENT }}

steps:
- uses: actions/checkout@v4

- name: Install uv
uses: astral-sh/setup-uv@v6

- name: Set benchmark dates
id: dates
run: |
if [ -n "${{ inputs.execution_date }}" ]; then
execution_date="${{ inputs.execution_date }}"
else
execution_date="$(date -u -d 'yesterday' +%F)"
fi

target_date="$(date -u -d "$execution_date + 1 day" +%F)"

echo "execution_date=$execution_date" >> "$GITHUB_OUTPUT"
echo "target_date=$target_date" >> "$GITHUB_OUTPUT"
echo "evaluation_cutoff=${execution_date}T23:00:00" >> "$GITHUB_OUTPUT"
echo "forecast_cutoff=${target_date}T23:00:00" >> "$GITHUB_OUTPUT"

- name: Update CENACE data
run: make update-cenace-data EXECUTION_DATE=${{ steps.dates.outputs.execution_date }}

- name: Evaluate previous CENACE forecasts
continue-on-error: true
run: make update-cenace-evaluate CUTOFF=${{ steps.dates.outputs.evaluation_cutoff }}

- name: Forecast next CENACE day
run: make update-cenace-forecast CUTOFF=${{ steps.dates.outputs.forecast_cutoff }}
16 changes: 16 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,19 @@ $(addprefix validate-evaluate-,$(EV_FREQUENCIES)): validate-evaluate-%:
.PHONY: leaderboard
leaderboard: # Build leaderboard parquet from all evaluation parquets
$(MODAL) src.evaluation.gh_archive.modal_app::build_leaderboard

## CENACE Data

.PHONY: update-cenace-data
update-cenace-data:
$(MODAL) src.data.cenace.modal_app --execution-date $(EXECUTION_DATE)

## CENACE Forecast/Evaluation

.PHONY: update-cenace-forecast
update-cenace-forecast:
$(MODAL) src.forecast.cenace.modal_app::forecast --cutoff $(CUTOFF)

.PHONY: update-cenace-evaluate
update-cenace-evaluate:
$(MODAL) src.forecast.cenace.modal_app::evaluate --cutoff $(CUTOFF)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dev = [

[project]
dependencies = [
"beautifulsoup4>=4.15.0",
"boto3>=1.42.24",
"duckdb>=1.4.3",
"freezegun>=1.5.5",
Expand Down
Empty file added src/data/cenace/__init__.py
Empty file.
71 changes: 71 additions & 0 deletions src/data/cenace/aggregate/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

import argparse
import shutil
import tempfile
from pathlib import Path

import pandas as pd

from src.data.cenace.config import PROCESSED_CSV, PROCESSED_EVENTS_HOURLY_DIR

INPUT_CSV = PROCESSED_CSV
OUTPUT_ROOT = PROCESSED_EVENTS_HOURLY_DIR


def write_hourly_partitions(
df: pd.DataFrame,
output_root: Path = OUTPUT_ROOT,
) -> int:
df = df.copy()

df["ds"] = pd.to_datetime(df["ds"], errors="coerce")
df["y"] = pd.to_numeric(df["y"], errors="coerce")

df = df.dropna(subset=["unique_id", "ds", "y"]).copy()
df = df.sort_values(["unique_id", "ds"]).drop_duplicates(["unique_id", "ds"])

df["year"] = df["ds"].dt.year
df["month"] = df["ds"].dt.month
df["day"] = df["ds"].dt.day

output_root.mkdir(parents=True, exist_ok=True)

n_written = 0
for (year, month, day), part in df.groupby(["year", "month", "day"], sort=True):
part_dir = (
output_root / f"year={year:04d}" / f"month={month:02d}" / f"day={day:02d}"
)
part_dir.mkdir(parents=True, exist_ok=True)

out_path = part_dir / "series.parquet"
with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp:
part[["unique_id", "ds", "y"]].to_parquet(tmp.name, index=False)
shutil.copyfile(tmp.name, out_path)

print(f"Saved: {out_path}")
n_written += 1

return n_written


def build_hourly_partitions(
input_csv: Path = INPUT_CSV,
output_root: Path = OUTPUT_ROOT,
) -> int:
df = pd.read_csv(input_csv)
return write_hourly_partitions(df, output_root)


def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--input-csv", type=Path, default=INPUT_CSV)
parser.add_argument("--output-root", type=Path, default=OUTPUT_ROOT)
args = parser.parse_args()

n_written = build_hourly_partitions(args.input_csv, args.output_root)
print(f"\nDone. Wrote {n_written} daily partitions.")


if __name__ == "__main__":
main()
18 changes: 18 additions & 0 deletions src/data/cenace/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

import os
from pathlib import Path

ROOT = Path(__file__).resolve().parents[3]

DATA_ROOT = Path(os.environ.get("CENACE_DATA_ROOT", ROOT / "data" / "cenace"))

RAW_DIR = DATA_ROOT / "raw"
TMP_DIR = DATA_ROOT / "tmp"

PROCESSED_DIR = DATA_ROOT / "processed"
PROCESSED_CSV = PROCESSED_DIR / "cenace.csv"

PROCESSED_EVENTS_HOURLY_DIR = DATA_ROOT / "processed-events" / "hourly"
FORECASTS_HOURLY_DIR = DATA_ROOT / "forecasts" / "hourly"
EVALUATIONS_HOURLY_DIR = DATA_ROOT / "evaluations" / "hourly"
169 changes: 169 additions & 0 deletions src/data/cenace/extract/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from __future__ import annotations

import argparse
import zipfile
from datetime import datetime, timedelta
from pathlib import Path

import requests
from bs4 import BeautifulSoup

URL = "https://www.cenace.gob.mx/Paginas/SIM/Reportes/PreEnerServConMTR.aspx"

session = requests.Session()

HEADERS = {
"User-Agent": "Mozilla/5.0",
"Referer": URL,
"Origin": "https://www.cenace.gob.mx",
"Content-Type": "application/x-www-form-urlencoded",
}

# repo root = impermanent/
ROOT_DIR = Path(__file__).resolve().parents[4]
DEFAULT_BASE_DIR = ROOT_DIR / "data" / "cenace"


def target_date_for_execution(execution_date: datetime) -> datetime:
return execution_date + timedelta(days=1)


def raw_zip_path(date: datetime, raw_dir: Path) -> Path:
return raw_dir / f"{date.strftime('%Y%m%d')}.zip"


def get_form_state() -> dict[str, str]:
r = session.get(URL, headers=HEADERS)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")

def get_value(name: str) -> str:
el = soup.find("input", {"name": name})
return el.get("value") if el else ""

return {
"__VIEWSTATE": get_value("__VIEWSTATE"),
"__VIEWSTATEGENERATOR": get_value("__VIEWSTATEGENERATOR"),
"__VIEWSTATEENCRYPTED": get_value("__VIEWSTATEENCRYPTED"),
"__EVENTVALIDATION": get_value("__EVENTVALIDATION"),
}


def download_and_extract(date: datetime, raw_dir: Path, tmp_dir: Path) -> bool:
date_str = date.strftime("%d/%m/%Y")
period_str = f"{date_str} - {date_str}"

state = get_form_state()

payload = {
"ctl00$ContentPlaceHolder1$ddlReporte": "362,325",
"ctl00$ContentPlaceHolder1$ddlPeriodicidad": "D",
"ctl00$ContentPlaceHolder1$ddlSistema": "SIN",
"ctl00$ContentPlaceHolder1$txtPeriodo": period_str,
"ctl00$ContentPlaceHolder1$hdfStartDateSelected": date_str,
"ctl00$ContentPlaceHolder1$hdfEndDateSelected": date_str,
"ctl00$ContentPlaceHolder1$btnDescargarZIP": "Descargar ZIP",
"__VIEWSTATE": state["__VIEWSTATE"],
"__VIEWSTATEGENERATOR": state["__VIEWSTATEGENERATOR"],
"__VIEWSTATEENCRYPTED": state["__VIEWSTATEENCRYPTED"],
"__EVENTVALIDATION": state["__EVENTVALIDATION"],
"__EVENTTARGET": "",
"__EVENTARGUMENT": "",
}

r = session.post(URL, data=payload, headers=HEADERS)
r.raise_for_status()

size = len(r.content)
print(f"{date_str} | {size} bytes")

if size < 10000:
print(f"Skipping {date_str}: file not published or response too small")
return False

raw_dir.mkdir(parents=True, exist_ok=True)
tmp_dir.mkdir(parents=True, exist_ok=True)

zip_path = raw_zip_path(date, raw_dir)

with open(zip_path, "wb") as f:
f.write(r.content)

with zipfile.ZipFile(zip_path, "r") as z:
z.extractall(tmp_dir)

return True


def backfill_missing(start_date: datetime, end_date: datetime, base_dir: Path) -> None:
raw_dir = base_dir / "raw"
tmp_dir = base_dir / "tmp"

current = start_date
while current <= end_date:
zip_path = raw_zip_path(current, raw_dir)
if zip_path.exists():
print(f"Already have {current.strftime('%Y-%m-%d')}, skipping")
else:
try:
ok = download_and_extract(current, raw_dir, tmp_dir)
if not ok:
print(f"Stopping at {current.strftime('%Y-%m-%d')}")
break
except Exception as e:
print(f"Error on {current.strftime('%Y-%m-%d')}: {e}")
break
current += timedelta(days=1)


def run_execution_date(execution_date: datetime, base_dir: Path) -> bool:
raw_dir = base_dir / "raw"
tmp_dir = base_dir / "tmp"
target_date = target_date_for_execution(execution_date)

zip_path = raw_zip_path(target_date, raw_dir)
if zip_path.exists():
print(f"Already have {target_date.strftime('%Y-%m-%d')}, skipping")
return True

return download_and_extract(target_date, raw_dir, tmp_dir)


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--execution-date", default=None)
parser.add_argument("--start-date", default=None)
parser.add_argument("--end-date", default=None)
parser.add_argument("--out", default=str(DEFAULT_BASE_DIR))
return parser.parse_args()


def main() -> None:
args = parse_args()
base_dir = Path(args.out).resolve()

if args.start_date:
start_date = datetime.strptime(args.start_date, "%Y-%m-%d")
if args.end_date:
end_date = datetime.strptime(args.end_date, "%Y-%m-%d")
elif args.execution_date:
end_date = target_date_for_execution(
datetime.strptime(args.execution_date, "%Y-%m-%d")
)
else:
end_date = datetime.today()
backfill_missing(start_date=start_date, end_date=end_date, base_dir=base_dir)
return

if args.execution_date:
execution_date = datetime.strptime(args.execution_date, "%Y-%m-%d")
ok = run_execution_date(execution_date=execution_date, base_dir=base_dir)
if not ok:
print("No new CENACE publication detected; stopping cleanly")
return

raise ValueError("Provide either --start-date or --execution-date")


if __name__ == "__main__":
main()
50 changes: 50 additions & 0 deletions src/data/cenace/modal_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import annotations

import modal

CENACE_DATA_ROOT = "/s3-bucket/v0.1.0/cenace"

app = modal.App(name="timecopilot-cenace-data")
image = (
modal.Image.debian_slim(python_version="3.11")
.pip_install("uv")
.add_local_file("pyproject.toml", "/root/pyproject.toml", copy=True)
.add_local_file(".python-version", "/root/.python-version", copy=True)
.add_local_file("uv.lock", "/root/uv.lock", copy=True)
.workdir("/root")
.run_commands("uv pip install . --system --compile-bytecode")
)

secret = modal.Secret.from_name(
"aws-secret",
required_keys=["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"],
)

volume = {
"/s3-bucket": modal.CloudBucketMount(
bucket_name="impermanent-benchmark",
secret=secret,
)
}


@app.function(
image=image,
volumes=volume,
timeout=60 * 15,
)
def update_cenace_execution_date(execution_date: str) -> int:
import os
from datetime import datetime

os.environ["CENACE_DATA_ROOT"] = CENACE_DATA_ROOT

from src.data.cenace.pipeline import update_execution_date

return update_execution_date(datetime.fromisoformat(execution_date))


@app.local_entrypoint()
def update(execution_date: str):
n_written = update_cenace_execution_date.remote(execution_date)
print(f"Done. Wrote {n_written} CENACE daily partitions.")
Loading
Loading