Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_create_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import json
import logging
import resource
import sys
import time

from ahl.mongo import NativeMongoose

# Raise both loggers to WARNING so their lower-severity records are dropped.
# NOTE(review): presumably this keeps the worker's output limited to the JSON result — confirm.
logging.getLogger("man.vault.client").setLevel(logging.WARNING)
logging.getLogger("man.secrets.api").setLevel(logging.WARNING)


def main():
cols = int(sys.argv[1])

lib = NativeMongoose("mktdatad").get_library("pmarkovski.columns_stats", api="v2")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This push replaces self-contained LMDB with ahl.mongo.NativeMongoose and hardcodes a user-specific library namespace pmarkovski.columns_stats. Two concrete problems:

  1. The benchmark is now unrunnable outside Man Group (and the PR description still describes the LMDB flow / old script names / old CLI args — please update).
  2. The library/symbol pair is hardcoded identically in all three scripts. Two engineers running this concurrently would clobber each other's data on the shared mktdatad instance.

Please parametrise connection/library/symbol via argv or env vars (e.g. ARCTICDB_BENCH_LIBRARY, ARCTICDB_BENCH_SYMBOL) and pass them from the orchestrator, or at least define LIBRARY / SYMBOL constants in a shared module rather than triplicating the literals.

nvs = lib._nvs
column_stats_spec = {f"col_{i}": {"MINMAX"} for i in range(cols)}

start = time.time()
nvs.create_column_stats("test_symbol", column_stats_spec)
end = time.time()

peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

nvs.drop_column_stats("test_symbol")

print(json.dumps({
"elapsed_seconds": end - start,
"peak_rss_mb": peak_rss_mb / 1024,
}))


if __name__ == "__main__":
main()
114 changes: 114 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import json
import statistics
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path

from ahl.mongo import NativeMongoose


# Number of create_stats invocations per scenario whose results are discarded.
WARMUP_RUNS = 2
# Number of create_stats invocations per scenario whose results are recorded.
RUNS = 10
# Worker scripts, resolved relative to the directory containing this file.
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "col_stats_bench_write_symbol.py"
CREATE_STATS_SCRIPT = Path(__file__).parent / "col_stats_bench_create_stats.py"

SCENARIOS = [
(10, 10),
(1_000, 1_000),
(100_000, 1_000),
(100_000, 10_000),
(1_000_000, 1_000),
(1_000_000, 5_000),
(10_000_000, 1_000),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This push reinstates the massive-scenario block that the previous review summary tracked as resolved. Be aware of what this implies for the worker:

  • (100_000, 10_000): a single 100K × 10K float64 chunk is ~8 GB per chunk in col_stats_bench_write_symbol.py. That will OOM most dev machines.
  • (10_000_000, 1_000): ~80 GB total writes across 100 chunks of ~800 MB; many hours against mktdatad.
  • (1_000_000, 5_000): ~40 GB total, 4 GB per chunk.

If these are intentional targets against the shared Man Group mongo, fine — but (a) call this out in the PR description so reviewers know the expected wall-clock and storage cost, and (b) consider gating the heavy scenarios behind a --full flag so a casual run doesn't accidentally write tens of GB to mktdatad under pmarkovski.columns_stats.

]

@dataclass
class Result:
    """Measurements gathered for a single (rows, cols) benchmark scenario."""

    # Shape of the symbol the scenario writes.
    rows: int = 0
    cols: int = 0

    # Elapsed seconds reported by the write_symbol worker.
    symbol_write_time: float = 0.0

    # One entry per measured create_stats run: elapsed seconds and peak RSS (MB).
    # default_factory gives every instance its own list.
    stats_create_times: list = field(default_factory=list)
    stats_rss_use: list = field(default_factory=list)


# One Result slot per entry in SCENARIOS; measure() fills the slot at the scenario's index.
results = [Result() for _ in SCENARIOS]


def run_subprocess(script, args, label):
    """Run a worker script in a fresh interpreter and return its JSON result.

    The worker is launched with the current interpreter (``sys.executable``).
    Its stderr is forwarded to this process's stderr; its stdout is captured
    and decoded as a single JSON document.

    Args:
        script: Path of the Python script to execute.
        args: Iterable of command-line arguments; each is converted with ``str``.
        label: Short tag placed in error messages to identify the worker.

    Returns:
        The object decoded from the worker's stdout.

    Raises:
        RuntimeError: If the worker exits with a non-zero status, is killed by
            a signal, or writes something other than valid JSON to stdout.
    """
    command = [sys.executable, str(script), *map(str, args)]
    try:
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=sys.stderr,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        # subprocess reports death-by-signal as the negated signal number.
        killed_by_signal = e.returncode < 0
        reason = f"killed by signal {-e.returncode}" if killed_by_signal else f"exit code {e.returncode}"
        raise RuntimeError(f"[{label}] subprocess failed ({reason})") from None

    try:
        return json.loads(completed.stdout)
    except json.JSONDecodeError as e:
        # Without this, empty or polluted stdout surfaces as an unlabelled
        # JSONDecodeError that gives no hint which worker misbehaved.
        snippet = completed.stdout[:200]
        raise RuntimeError(
            f"[{label}] subprocess wrote invalid JSON to stdout ({e}): {snippet!r}"
        ) from e


def measure(scenario, index):
    """Benchmark one scenario and store its measurements in ``results[index]``.

    The symbol is written once by the write_symbol worker. The create_stats
    worker is then run ``WARMUP_RUNS`` times with its output discarded and
    ``RUNS`` times with its output recorded. ``cleanup()`` is called once all
    runs have finished.

    Args:
        scenario: ``(rows, cols)`` pair describing the symbol to write.
        index: Position of the scenario's slot in the module-level ``results``.
    """
    rows, cols = scenario
    entry = results[index]
    entry.rows = rows
    entry.cols = cols

    print(f" [write_symbol] {rows}x{cols}", file=sys.stderr)
    write_report = run_subprocess(WRITE_SYMBOL_SCRIPT, [rows, cols], "write_symbol")
    entry.symbol_write_time = write_report["elapsed_seconds"]

    # Warm-up invocations: executed, but nothing is recorded.
    for i in range(1, WARMUP_RUNS + 1):
        print(f" [create_stats] warmup {i}/{WARMUP_RUNS}", file=sys.stderr)
        run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")

    # Measured invocations: keep elapsed time and peak RSS from each one.
    for i in range(1, RUNS + 1):
        print(f" [create_stats] run {i}/{RUNS}", file=sys.stderr)
        sample = run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")
        entry.stats_create_times.append(sample["elapsed_seconds"])
        entry.stats_rss_use.append(sample["peak_rss_mb"])

    cleanup()


def _format_stats(samples, width, precision):
    """Return right-aligned mean/median/max columns, or ``n/a`` columns when empty."""
    if not samples:
        # statistics.mean/median and max() all raise on empty input.
        return " ".join(f"{'n/a':>{width}}" for _ in range(3))
    values = (statistics.mean(samples), statistics.median(samples), max(samples))
    return " ".join(f"{value:>{width}.{precision}f}" for value in values)


def print_results():
    """Print a fixed-width summary table of the module-level ``results`` to stdout.

    Each row shows the scenario shape, the symbol write time, and the
    mean/median/max of the recorded create_stats times and peak RSS values.
    A scenario with no recorded samples (for example one that never ran) gets
    ``n/a`` in its statistics columns instead of raising.
    """
    cw = 14
    header = (
        f"{'rows':>12} {'cols':>8}"
        f" {'write_s':>{cw}}"
        f" {'time_mean':>{cw}} {'time_median':>{cw}} {'time_max':>{cw}}"
        f" {'rss_mean_mb':>{cw}} {'rss_median_mb':>{cw}} {'rss_max_mb':>{cw}}"
    )
    print()
    print(header)
    print("-" * len(header))

    for r in results:
        time_columns = _format_stats(r.stats_create_times, cw, 2)
        rss_columns = _format_stats(r.stats_rss_use, cw, 1)
        print(
            f"{r.rows:>12,} {r.cols:>8,}"
            f" {r.symbol_write_time:>{cw}.2f}"
            f" {time_columns}"
            f" {rss_columns}"
        )


def cleanup():
lib = NativeMongoose("mktdatad").get_library("pmarkovski.columns_stats", api="v2")
try:
lib.delete("test_symbol")
except Exception:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A broad except Exception: pass swallows real failures here (mongo auth, connection drops, permission errors, library-not-found). If cleanup silently fails, the next run sees a stale symbol and the measurements are garbage. At minimum log it:

Suggested change
except Exception:
try:
lib.delete("test_symbol")
except Exception as exc:
print(f"cleanup: lib.delete failed: {exc}", file=sys.stderr)

Also note that this is called from if __name__ == "__main__": before any scenario runs — if mongo is unreachable at startup the script will only fail later on the first write/create_stats subprocess, with no obvious clue that the issue is auth/connectivity.

pass


# Script entry point: clear leftovers, measure every scenario, then report.
if __name__ == "__main__":
# Delete any "test_symbol" left behind by an earlier run before measuring.
cleanup()
try:
for i, scenario in enumerate(SCENARIOS):
print(f"\n=== scenario {scenario[0]}x{scenario[1]} ===", file=sys.stderr)
measure(scenario, i)
finally:
# Runs whether or not a scenario raised, so the symbol is not left behind.
cleanup()
print_results()
54 changes: 54 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_write_symbol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import json
import logging
import resource
import sys
import time

import numpy as np
import pandas as pd
from ahl.mongo import NativeMongoose

# Raise both loggers to WARNING so their lower-severity records are dropped.
# NOTE(review): presumably this keeps the worker's output limited to the JSON result — confirm.
logging.getLogger("man.vault.client").setLevel(logging.WARNING)
logging.getLogger("man.secrets.api").setLevel(logging.WARNING)


# Upper bound on the number of rows generated and written per chunk in main().
CHUNK_ROWS = 100_000

def main():
rows, cols = int(sys.argv[1]), int(sys.argv[2])
column_names = [f"col_{i}" for i in range(cols)]

lib = NativeMongoose("mktdatad").get_library("pmarkovski.columns_stats", api="v2")

total_elapsed = 0.0

for chunk_start in range(0, rows, CHUNK_ROWS):
chunk_row_count = min(CHUNK_ROWS, rows - chunk_start)

chunk = pd.DataFrame(
np.random.rand(chunk_row_count, cols).astype(np.float64),
columns=column_names,
)

chunk_mb = chunk.memory_usage(deep=True).sum() / 1024 / 1024
print(f" chunk [{chunk_start}:{chunk_start + chunk_row_count}] {chunk.shape} {chunk_mb:.1f} MB", file=sys.stderr, flush=True)

start_time = time.time()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: start_time is now reassigned on every iteration of the loop, so the post-loop elapsed_seconds = time.time() - start_time (line 40) only measures the last chunk's write/append — not the total write time. With the active scenarios (rows ≤ 1000 vs CHUNK_ROWS = 100_000) there is only ever one chunk, so the bug is silent today, but it will silently under-report as soon as a scenario exceeds 100k rows (and that's exactly when accurate write timing matters most).

If the intent of this move was to exclude np.random.rand(...) from the measurement, accumulate per chunk instead:

total_elapsed = 0.0
for chunk_start in range(0, rows, CHUNK_ROWS):
    chunk_row_count = min(CHUNK_ROWS, rows - chunk_start)
    chunk = pd.DataFrame(
        np.random.rand(chunk_row_count, cols).astype(np.float64),
        columns=column_names,
    )
    start_time = time.time()
    if chunk_start == 0:
        lib.write(SYMBOL_NAME, chunk)
    else:
        lib.append(SYMBOL_NAME, chunk)
    total_elapsed += time.time() - start_time

elapsed_seconds = total_elapsed

Also: line 27 has trailing whitespace and lines 24/32 add stray blank lines — these will likely be flagged by make lint.


if chunk_start == 0:
lib.write("test_symbol", chunk)
else:
lib.append("test_symbol", chunk)

total_elapsed += (time.time() - start_time)

peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 # ru_maxrss is KB on Linux

print(json.dumps({
"elapsed_seconds": total_elapsed,
"peak_rss_mb": peak_rss_mb,
}))


if __name__ == "__main__":
main()
Loading