Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rdagent/app/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import typer

from rdagent.app.crypto_signal_writer import btcusdt_15m_signal_writer_v1
from rdagent.app.data_science.loop import main as data_science
from rdagent.app.general_model.general_model import (
extract_models_and_implement as general_model,
Expand Down Expand Up @@ -73,6 +74,7 @@ def ds_user_interact(port=19900):
app.command(name="fin_model")(fin_model)
app.command(name="fin_quant")(fin_quant)
app.command(name="fin_factor_report")(fin_factor_report)
app.command(name="btcusdt_15m_signal_writer_v1")(btcusdt_15m_signal_writer_v1)
app.command(name="general_model")(general_model)
app.command(name="data_science")(data_science)
app.command(name="grade_summary")(grade_summary)
Expand Down
34 changes: 34 additions & 0 deletions rdagent/app/crypto_signal_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from pathlib import Path

import typer

from rdagent.crypto.btcusdt_15m_signal_writer_v1 import (
BTCUSDT15mSignalWriterV1Config,
generate_btcusdt_15m_signals_csv_v1,
)


def btcusdt_15m_signal_writer_v1(
candles_csv_path: str = typer.Option(..., "--input", help="Input candles CSV with 'timestamp' and 'close'"),
output_csv_path: str = typer.Option(..., "--output", help="Output CSV path for signals"),
symbol: str = typer.Option("BTCUSDT", help="Trading symbol"),
interval: str = typer.Option("15m", help="Bar interval"),
fast_span: int = typer.Option(12, help="Fast EMA span"),
slow_span: int = typer.Option(26, help="Slow EMA span"),
) -> str:
"""Write BTCUSDT 15m crossover signals (v1) to CSV."""

cfg = BTCUSDT15mSignalWriterV1Config(
symbol=symbol,
interval=interval,
fast_span=fast_span,
slow_span=slow_span,
)
out = generate_btcusdt_15m_signals_csv_v1(
candles_csv_path=Path(candles_csv_path),
output_csv_path=Path(output_csv_path),
config=cfg,
)
return str(out)
5 changes: 5 additions & 0 deletions rdagent/crypto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Crypto helpers (small, self-contained utilities)."""

from __future__ import annotations

__all__: list[str] = []
115 changes: 115 additions & 0 deletions rdagent/crypto/btcusdt_15m_signal_writer_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path

import pandas as pd

BTCUSDT_15M_SIGNAL_WRITER_VERSION = "v1"


@dataclass(frozen=True)
class BTCUSDT15mSignalWriterV1Config:
symbol: str = "BTCUSDT"
interval: str = "15m"
fast_span: int = 12
slow_span: int = 26


def _coerce_timestamp_to_utc(ts: pd.Series) -> pd.Series:
"""Coerce timestamps to timezone-aware UTC pandas Timestamps.

Accepts:
- datetime-like strings
- epoch seconds
- epoch milliseconds
"""

if pd.api.types.is_datetime64_any_dtype(ts):
out = pd.to_datetime(ts, utc=True)
else:
s = pd.to_numeric(ts, errors="coerce")
if s.isna().any():
raise ValueError("timestamp contains non-parseable values")
unit = "ms" if s.max() >= 1_000_000_000_000 else "s"
out = pd.to_datetime(s, unit=unit, utc=True)
if out.isna().any():
raise ValueError("timestamp contains NaT values after parsing")
return out


def compute_btcusdt_15m_signals_v1(
candles: pd.DataFrame,
*,
config: BTCUSDT15mSignalWriterV1Config | None = None,
) -> pd.DataFrame:
"""Compute BTCUSDT 15m crossover signals.

Input schema:
- timestamp: datetime-like, epoch seconds, or epoch milliseconds
- close: numeric

Output schema (CSV-ready):
- timestamp: UTC ISO timestamp
- symbol: trading pair (default BTCUSDT)
- interval: bar interval (default 15m)
- signal: 1 (bull cross), -1 (bear cross), 0 (no signal)
- version: writer version
"""

cfg = config or BTCUSDT15mSignalWriterV1Config()

if "timestamp" not in candles.columns or "close" not in candles.columns:
raise ValueError("candles must include 'timestamp' and 'close' columns")

df = candles[["timestamp", "close"]].copy()
df["timestamp"] = _coerce_timestamp_to_utc(df["timestamp"])
df["close"] = pd.to_numeric(df["close"], errors="coerce")
if df["close"].isna().any():
raise ValueError("close contains non-numeric values")

df = df.sort_values("timestamp").reset_index(drop=True)
if cfg.fast_span <= 0 or cfg.slow_span <= 0:
raise ValueError("fast_span and slow_span must be positive")
if cfg.fast_span >= cfg.slow_span:
raise ValueError("fast_span must be < slow_span")

ema_fast = df["close"].ewm(span=cfg.fast_span, adjust=False).mean()
ema_slow = df["close"].ewm(span=cfg.slow_span, adjust=False).mean()

bull = (ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))
bear = (ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))

signal = pd.Series(0, index=df.index, dtype="int64")
signal[bull] = 1
signal[bear] = -1

out = pd.DataFrame(
{
"timestamp": df["timestamp"].dt.strftime("%Y-%m-%dT%H:%M:%SZ"),
"symbol": cfg.symbol,
"interval": cfg.interval,
"signal": signal,
"version": BTCUSDT_15M_SIGNAL_WRITER_VERSION,
}
)
return out


def generate_btcusdt_15m_signals_csv_v1(
*,
candles_csv_path: str | Path,
output_csv_path: str | Path,
config: BTCUSDT15mSignalWriterV1Config | None = None,
) -> Path:
"""Read candles from CSV, compute signals, and write a signal CSV."""

candles_csv_path = Path(candles_csv_path)
output_csv_path = Path(output_csv_path)

candles = pd.read_csv(candles_csv_path)
signals = compute_btcusdt_15m_signals_v1(candles, config=config)

output_csv_path.parent.mkdir(parents=True, exist_ok=True)
signals.to_csv(output_csv_path, index=False)
return output_csv_path
60 changes: 60 additions & 0 deletions test/utils/test_btcusdt_15m_signal_writer_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from pathlib import Path

import pandas as pd

from rdagent.crypto.btcusdt_15m_signal_writer_v1 import (
BTCUSDT15mSignalWriterV1Config,
compute_btcusdt_15m_signals_v1,
generate_btcusdt_15m_signals_csv_v1,
)


def _make_candles(epoch_unit: str) -> pd.DataFrame:
# Make a close series that forces a bull cross and then a bear cross.
close = [1] * 10 + [10] * 10 + [1] * 10
start = 1_700_000_000 # epoch seconds
ts = [start + i * 900 for i in range(len(close))] # 15m steps
if epoch_unit == "ms":
ts = [t * 1000 for t in ts]
return pd.DataFrame({"timestamp": ts, "close": close})


def test_compute_signals_seconds_epoch() -> None:
candles = _make_candles("s")
cfg = BTCUSDT15mSignalWriterV1Config(fast_span=2, slow_span=5)
out = compute_btcusdt_15m_signals_v1(candles, config=cfg)

assert list(out.columns) == ["timestamp", "symbol", "interval", "signal", "version"]
assert out["symbol"].nunique() == 1
assert out["interval"].nunique() == 1
assert out["version"].nunique() == 1
assert 1 in set(out["signal"].tolist())
assert -1 in set(out["signal"].tolist())


def test_compute_signals_milliseconds_epoch() -> None:
candles = _make_candles("ms")
cfg = BTCUSDT15mSignalWriterV1Config(fast_span=2, slow_span=5)
out = compute_btcusdt_15m_signals_v1(candles, config=cfg)
assert out["timestamp"].str.endswith("Z").all()


def test_generate_csv(tmp_path: Path) -> None:
candles = _make_candles("s")
candles_path = tmp_path / "candles.csv"
out_path = tmp_path / "signals.csv"
candles.to_csv(candles_path, index=False)

cfg = BTCUSDT15mSignalWriterV1Config(fast_span=2, slow_span=5)
wrote = generate_btcusdt_15m_signals_csv_v1(
candles_csv_path=candles_path,
output_csv_path=out_path,
config=cfg,
)
assert wrote == out_path
assert out_path.exists()

signals = pd.read_csv(out_path)
assert list(signals.columns) == ["timestamp", "symbol", "interval", "signal", "version"]