Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions AGENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ Columns: `workflow_id`, `workflow_name`, `stages_state` (JSON), `status`, `creat
* **Datetime:** All timestamps use `datetime.now(timezone.utc)` (timezone-aware). `datetime.utcnow()` is deprecated in Python 3.12+ and must not be re-introduced.
* **Log path propagation pattern:** When an executor knows the log path at submit time, it writes it to `task_spec.metadata["log_path"]`. The runner reads this after `submit_with_retry()` and passes it to `db.update_status(..., log_path=...)`. The runner then retrieves `record.log_path` and passes it as `executor.logs(remote_id, log_path=record.log_path)`. All `logs()` implementations accept the optional `log_path` kwarg.
* **set_e passthrough:** Tasks that need `set -x` without `set -e` (e.g., retry loops) set `metadata["set_e"] = False`. `SlurmExecutor.submit()` reads this and passes it to `generate_sbatch_script(set_e=...)`. The default is `True` (preserving `set -ex` for all existing tasks).
* **Workflow OmegaConf overrides:** `workflow run` resolves `${params.X}` interpolations via OmegaConf. Merge order: YAML base → `--from-job` extracted params → CLI trailing overrides. The `_PARAM_MAPPING` dict in `WorkflowRunner.extract_workflow_params()` maps task-level param keys to workflow-level dotlist keys.
* **Workflow OmegaConf overrides:** `workflow run` resolves `${params.X}` interpolations via OmegaConf. Merge order: YAML base → `--from-job` extracted params → CLI trailing overrides. From-job params use `OmegaConf.merge()` (dict-only keys), while CLI overrides use `OmegaConf.update()` per-key to correctly handle list-indexed paths like `stages.0.params.X`. The `_PARAM_MAPPING` dict in `WorkflowRunner.extract_workflow_params()` maps task-level param keys to workflow-level dotlist keys.
* **Workflow placeholder validation:** Required params use `<REQUIRED: description>` markers. `WorkflowRunner._validate_no_placeholders()` matches `^<REQUIRED(?::\s*.*?)?>$` and raises `ValueError` listing all unfilled fields before any submission.
* **Workflow detach pattern:** `run_detached()` validates and creates the DB record synchronously, then forks via `subprocess.Popen([sys.executable, "-m", "devrun.workflow", "--state-file", ...])` with `start_new_session=True`. The child process drives the heartbeat loop on the pre-created record.

Expand Down Expand Up @@ -125,7 +125,7 @@ python -m pytest tests/ -v

### Test Coverage

- **713 tests passing**, **10 skipped** (infrastructure-dependent: require real SSH/Slurm connectivity)
- **728 tests passing**, **10 skipped** (infrastructure-dependent: require real SSH/Slurm connectivity)
- Unit tests for all major components (models, registry, database, router, runner, tasks, executors, workflow engine)
- Integration tests between modules
- End-to-end workflow tests
Expand Down
180 changes: 162 additions & 18 deletions devrun/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import json
import logging
import sys
from pathlib import Path
from typing import Optional

Expand Down Expand Up @@ -446,54 +445,199 @@ def fetch(
app.add_typer(workflow_app, name="workflow")


def _show_workflow_help(target: str) -> None:
    """Print a config-specific help screen for a single workflow.

    Loads the merged configuration for ``target`` and prints, in order: a
    header panel with the workflow name, a table of workflow-level ``params``
    with their default values (values of the form ``<...>`` are highlighted
    in yellow), a table of ``stages`` (name, task, executor, dependencies),
    and a one-line usage example.

    All text that originates from the config file, the command line, or an
    exception message is escaped before being embedded in Rich markup, so
    square brackets in those values are shown literally instead of being
    parsed as style tags (an unmatched closing tag would otherwise raise).

    Args:
        target: Workflow config path, name, or name/variation; passed
            unchanged to ``load_merged_config``.

    Raises:
        typer.Exit: With exit code 1 when the configuration is missing or
            cannot be loaded.
    """
    from devrun.runner import load_merged_config
    from rich.markup import escape
    from rich.panel import Panel
    from rich.text import Text

    def _cell(value: object) -> str:
        """Return a markup-safe table cell; missing/null values become '?'."""
        return "?" if value is None else escape(str(value))

    safe_target = escape(target)

    try:
        raw = load_merged_config(target)
    except FileNotFoundError:
        console.print(f"[red]Error:[/red] No config found for workflow '{safe_target}'.")
        console.print("Ensure the workflow config exists in one of the config search directories.")
        raise typer.Exit(code=1)
    except Exception as e:
        console.print(f"[red]Failed to load configuration for '{safe_target}':[/red] {escape(str(e))}")
        raise typer.Exit(code=1)

    workflow_name = escape(str(raw.get("workflow", target)))

    console.print(Panel(f"Workflow: [bold cyan]{workflow_name}[/bold cyan] (config: {safe_target})", expand=False))
    console.print()

    # Workflow-level params
    params = raw.get("params", {})
    if params:
        param_table = Table(title="Workflow Parameters", show_edge=False, title_justify="left", header_style="bold cyan")
        param_table.add_column("Override")
        param_table.add_column("Default Value")

        for k, v in params.items():
            plain = str(v)
            val_str = escape(plain)
            # Placeholder-style defaults such as "<REQUIRED: ...>" stand out in yellow.
            if plain.startswith("<") and plain.endswith(">"):
                val_str = f"[yellow]{val_str}[/yellow]"
            param_table.add_row(f"params.[bold]{escape(str(k))}[/bold]", val_str)

        console.print(param_table)
        console.print()

    # Stages
    stages = raw.get("stages", [])
    if stages:
        stage_table = Table(title="Stages", show_edge=False, title_justify="left", header_style="bold cyan")
        stage_table.add_column("Name")
        stage_table.add_column("Task", style="cyan")
        stage_table.add_column("Executor", style="green")
        stage_table.add_column("Depends On", style="dim")

        for s in stages:
            deps = s.get("depends_on", None)
            if isinstance(deps, list):
                # str() each entry so non-string items cannot break the join.
                deps_str = ", ".join(str(d) for d in deps)
            elif deps:
                deps_str = str(deps)
            else:
                deps_str = ""
            stage_table.add_row(
                _cell(s.get("name", "?")),
                _cell(s.get("task", "?")),
                _cell(s.get("executor", "?")),
                # An empty list and a missing key both mean "no dependencies".
                escape(deps_str) or "—",
            )

        console.print(stage_table)
        console.print()

    # Usage example (Text objects are not markup-parsed, so no escaping needed)
    console.print("[dim]Usage Example:[/dim]")
    example_cmd = Text("devrun workflow run ", style="bold")
    example_cmd.append(target, style="bold cyan")
    if params:
        first_param = next(iter(params))
        example_cmd.append(f" params.{first_param}=value", style="green")
    console.print(example_cmd)


# NOTE(review): this span is a rendered diff of `workflow_run`. Removed and
# added lines appear to sit back to back without +/- markers and indentation
# is stripped, so several statements below occur twice (old form, then new).
@workflow_app.command(
"run",
context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
# An empty "help_option_names" presumably disables Click's built-in help
# flags so the explicit `help` option declared below can handle --help/-h.
context_settings={"allow_extra_args": True, "ignore_unknown_options": True, "help_option_names": []},
)
def workflow_run(
ctx: typer.Context,
target: str = typer.Argument(..., help="Workflow config path, name, or name/variation"),
target: Optional[str] = typer.Argument(None, help="Workflow config path, name, or name/variation"),
dry_run: bool = typer.Option(False, "--dry-run", help="Show execution plan without submitting"),
start_after: Optional[str] = typer.Option(None, "--start-after", help="Skip this stage and its dependencies, start from the next"),
from_job: Optional[str] = typer.Option(None, "--from-job", help="Extract workflow params from an existing job"),
detach: bool = typer.Option(False, "--detach", "-d", help="Run workflow in background, return immediately"),
verbose: bool = typer.Option(False, "--verbose", "-v"),
help: bool = typer.Option(False, "--help", "-h", help="Show this message and exit."),
) -> None:
"""Run a multi-stage workflow from a YAML config.

TARGET can be a file path, a workflow name (e.g. swe_bench_workflow),
or name/variation. Configs are resolved through the same hierarchical
search path as task configs. Trailing arguments are OmegaConf overrides.
"""
# Custom help handling: without TARGET print the generic command help,
# with TARGET print the workflow-specific help screen.
if help:
if not target:
console.print(ctx.get_help())
raise typer.Exit()
else:
_show_workflow_help(target)
raise typer.Exit()

# TARGET is declared Optional so --help works without it; a real run still
# requires it, so its absence is reported here with exit code 2.
if not target:
console.print("[red]Missing argument 'TARGET'.[/red]\n")
console.print(ctx.get_help())
raise typer.Exit(code=2)

_setup_logging(verbose)

from devrun.runner import load_merged_config
from omegaconf import OmegaConf
from devrun.runner import find_configs
import devrun.keystore # noqa: F401 — registers ${key:…} resolver
import devrun.presets # noqa: F401 — registers ${preset:…} resolver
from devrun.models import WorkflowConfig
from devrun.workflow import WorkflowRunner

overrides = ctx.args
if overrides:
console.print(f"[dim]Using overrides: {overrides}[/dim]")
runner = WorkflowRunner()
task_name: Optional[str] = None

# Merge order: YAML base (hierarchical) → from-job params → CLI overrides (highest priority)
try:
raw = load_merged_config(target, overrides=overrides)
config_paths = find_configs(target)
except FileNotFoundError:
console.print(f"[red]Error:[/red] Config not found for '{target}'.")
console.print("Ensure the workflow config exists in one of the config search directories.")
raise typer.Exit(code=1)

try:
cfg = WorkflowConfig(**raw)
# Load the first config file, then merge each remaining one over it in order.
raw_cfg = OmegaConf.load(config_paths[0])
for extra_path in config_paths[1:]:
raw_cfg = OmegaConf.merge(raw_cfg, OmegaConf.load(extra_path))

if from_job:
try:
job_params, task_name = runner.extract_workflow_params(from_job)
except ValueError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(code=1)
if job_params:
console.print(f"[dim]From job {from_job}: {list(job_params.keys())}[/dim]")
job_overrides = [f"{k}={v}" for k, v in job_params.items()]
raw_cfg = OmegaConf.merge(raw_cfg, OmegaConf.from_dotlist(job_overrides))

if ctx.args:
console.print(f"[dim]Using overrides: {ctx.args}[/dim]")
for arg in ctx.args:
key, _, value = arg.partition("=")
if key and _ == "=":
# Parse value type (e.g. "30" → int, "true" → bool)
# so numeric/boolean overrides aren't stored as strings.
# NOTE(review): `yaml` is used here but no `import yaml` is visible in
# this view — confirm it is imported at module level.
parsed = yaml.safe_load(value)
OmegaConf.update(raw_cfg, key, parsed)
else:
console.print(f"[yellow]Warning:[/yellow] ignoring malformed override: {arg}")

resolved = OmegaConf.to_container(raw_cfg, resolve=True)
# Re-raise deliberate exits from inside the try (e.g. the --from-job failure)
# so the broad handler below does not report them as config errors.
except typer.Exit:
raise
except Exception as exc:
console.print(f"[red]Error parsing workflow config:[/red] {exc}")
console.print(f"[red]Error loading/resolving workflow config:[/red] {exc}")
raise typer.Exit(code=1)

from devrun.workflow import WorkflowRunner
try:
cfg = WorkflowConfig(**resolved)
except Exception as exc:
console.print(f"[red]Error parsing workflow config:[/red] {exc}")
raise typer.Exit(code=1)

runner = WorkflowRunner()
result = runner.run(cfg, dry_run=dry_run)
# Auto-detect stage to skip when --from-job is used without --start-after
if from_job and not start_after and task_name is not None:
detected_stage = runner.detect_stage_for_task(task_name, cfg)
if detected_stage:
start_after = detected_stage
console.print(
f"[dim]Auto-detected: skipping stage '{detected_stage}' "
f"based on job task type '{task_name}'[/dim]"
)

if dry_run:
console.print(result)
console.print("[yellow]Dry-run complete. No jobs were submitted.[/yellow]")
else:
console.print(f"[green]Workflow completed:[/green] {result}")
try:
if detach:
# NOTE(review): the --detach/--dry-run conflict is only detected here, after
# the config has already been loaded, merged and resolved.
if dry_run:
console.print("[red]Error:[/red] --detach and --dry-run cannot be used together.")
raise typer.Exit(code=1)
wf_id = runner.run_detached(cfg, start_after=start_after)
console.print(
f"[green]Workflow {wf_id} started in background.[/green]\n"
f"Use [bold]devrun workflow status {wf_id}[/bold] to monitor."
)
else:
result = runner.run(cfg, dry_run=dry_run, start_after=start_after)
if dry_run:
console.print(result)
console.print("[yellow]Dry-run complete. No jobs were submitted.[/yellow]")
else:
console.print(f"[green]Workflow completed:[/green] {result}")
# ValueError from the runner calls above is shown as a plain error, exit code 1.
except ValueError as exc:
console.print(f"[red]Error:[/red] {exc}")
raise typer.Exit(code=1)


@workflow_app.command("status")
Expand Down
18 changes: 15 additions & 3 deletions devrun/configs/swe_bench_workflow/default.yaml
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
# devrun/configs/swe_bench_workflow/default.yaml
# Full SWE-bench pipeline: inference → collect → evaluate
#
# Usage:
# devrun workflow run devrun/configs/swe_bench_workflow/default.yaml \
# params.model_name=openai/gpt-4 \
# params.dataset=/data/swebench/SWE-bench_Lite \
# params.working_dir=/home/user/project
#
# Run collect+eval from existing inference:
# devrun workflow run devrun/configs/swe_bench_workflow/default.yaml \
# --from-job <job_id>
#
# See docs/swe-bench-workflow-guide.md for full documentation.
workflow: swe_bench

params:
model_name: "" # REQUIRED: model identifier
dataset: "" # REQUIRED: absolute path to dataset
model_name: "<REQUIRED: model identifier (e.g. openai/gpt-4)>"
dataset: "<REQUIRED: absolute path to SWE-bench dataset>"
split: test
run_name: "run1"
output_dir: "logs/run1"
working_dir: "" # REQUIRED: remote project root
working_dir: "<REQUIRED: remote project root directory>"

stages:
- name: inference
Expand Down
Loading