Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_create_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import json
import logging
import resource
import sys
import time

from ahl.mongo import NativeMongoose

logging.getLogger("man.vault.client").setLevel(logging.WARNING)
logging.getLogger("man.secrets.api").setLevel(logging.WARNING)


def main():
cols = int(sys.argv[1])

lib = NativeMongoose("mktdatad").get_library("pmarkovski.columns_stats", api="v2")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This push replaces self-contained LMDB with ahl.mongo.NativeMongoose and hardcodes a user-specific library namespace pmarkovski.columns_stats. Two concrete problems:

  1. The benchmark is now unrunnable outside Man Group (and the PR description still describes the LMDB flow / old script names / old CLI args — please update).
  2. The library/symbol pair is hardcoded identically in all three scripts. Two engineers running this concurrently would clobber each other's data on the shared mktdatad instance.

Please parametrise connection/library/symbol via argv or env vars (e.g. ARCTICDB_BENCH_LIBRARY, ARCTICDB_BENCH_SYMBOL) and pass them from the orchestrator, or at least define LIBRARY / SYMBOL constants in a shared module rather than triplicating the literals.

nvs = lib._nvs
column_stats_spec = {f"col_{i}": {"MINMAX"} for i in range(cols)}

start = time.time()
nvs.create_column_stats("test_symbol", column_stats_spec)
end = time.time()

peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

nvs.drop_column_stats("test_symbol")

print(json.dumps({
"elapsed_seconds": end - start,
"peak_rss_mb": peak_rss_mb / 1024,
}))


if __name__ == "__main__":
main()
135 changes: 135 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import json
import statistics
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path

from ahl.mongo import NativeMongoose


WARMUP_RUNS = 2
RUNS = 10
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "col_stats_bench_write_symbol.py"
CREATE_STATS_SCRIPT = Path(__file__).parent / "col_stats_bench_create_stats.py"

BASE_ROWS = 100_000
BASE_COLS = 127


@dataclass
class Result:
rows: int = 0
cols: int = 0
symbol_write_time: float = 0.0
stats_create_times: list = field(default_factory=list)
stats_rss_use: list = field(default_factory=list)


results = []


def run_subprocess(script, args, label):
try:
completed = subprocess.run(
[sys.executable, str(script), *map(str, args)],
stdout=subprocess.PIPE, stderr=sys.stderr, text=True, check=True,
)
return json.loads(completed.stdout)
except subprocess.CalledProcessError as e:
raise MemoryError(f"[{label}] exited with code {e.returncode}") from None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise MemoryError(...) classifies every non-zero subprocess exit as an OOM — a mongo connection blip, a KeyError looking up the library, an unhandled AssertionError, a programmer bug in the worker, all become "MemoryError" here. That trips run_phase's except MemoryError and silently terminates the row/col walk, hiding the real failure.

It also discards the previous distinction between SIGKILL (negative returncode, the actual OOM-killer signature) and a regular exit code (almost always a bug or environment issue).

Either:

  • only re-classify as OOM when e.returncode < 0 (or specifically -signal.SIGKILL), and re-raise other failures as RuntimeError so they propagate up to the user, or
  • inspect the worker's stderr / dmesg for an OOM marker before deciding.

As written, a single mongo hiccup in the warmup of the first scenario will silently end the entire benchmark.



def measure(rows, cols):
print(f" [write_symbol] {rows}x{cols}", file=sys.stderr)
write_time = run_subprocess(
WRITE_SYMBOL_SCRIPT, [rows, cols], "write_symbol"
)["elapsed_seconds"]

result = Result(rows=rows, cols=cols, symbol_write_time=write_time)
results.append(result)

try:
for i in range(1, WARMUP_RUNS + 1):
print(f" [create_stats] warmup {i}/{WARMUP_RUNS}", file=sys.stderr)
run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")

for i in range(1, RUNS + 1):
print(f" [create_stats] run {i}/{RUNS}", file=sys.stderr)
r = run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")
result.stats_create_times.append(r["elapsed_seconds"])
result.stats_rss_use.append(r["peak_rss_mb"])
except MemoryError:
print(f" OOM during stats collection after {len(result.stats_create_times)} run(s), stopping phase", file=sys.stderr)
raise
finally:
cleanup()


def run_phase(scenario_gen):
for rows, cols in scenario_gen:
print(f"\n=== scenario {rows:,}x{cols:,} ===", file=sys.stderr)
try:
measure(rows, cols)
except MemoryError as e:
print(f" OOM: {e}, stopping phase", file=sys.stderr)
break


def row_scenarios():
rows = BASE_ROWS
while True:
yield rows, BASE_COLS
rows *= 10


def col_scenarios():
cols = BASE_COLS * 10
while True:
yield BASE_ROWS, cols
cols *= 10
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both row_scenarios() and col_scenarios() are unbounded generators that escalate by 10× indefinitely. The only stopping condition is the MemoryError raised inside run_subprocess — which, per the comment on line 40, currently fires for any non-zero exit, not just real OOMs.

Consequences:

  • On a sufficiently large machine, the row phase never terminates and col_scenarios() is never reached.
  • On any machine, a transient mongo/network blip terminates the phase before its actual memory ceiling is found.
  • The 10× step is also coarse: col_scenarios() jumps (100_000, 1_270) → (100_000, 12_700) → (100_000, 127_000); the second step is already ~10 GB of float64 in the writer (see col_stats_bench_write_symbol.py — it now materialises the whole frame in one np.random.rand call), so the col walk almost certainly OOMs on the second yield and you get one data point for the whole phase.

Please bound the walk explicitly (e.g. max iterations, or rows * cols cap) and consider a 2× step so the failure point is found with usable resolution.



def print_results():
cw = 20
header = (
f"{'rows':>12} {'cols':>8}"
f" {'write_s':>{cw}}"
f" {'col_stats_mean_s':>{cw}} {'col_stats_max_s':>{cw}}"
f" {'col_stats_mean_rss_mb':>{cw}} {'col_stats_max_rss_mb':>{cw}}"
)
print()
print(header)
print("-" * len(header))

for r in results:
if not r.stats_create_times:
continue
t = r.stats_create_times
m = r.stats_rss_use
print(
f"{r.rows:>12,} {r.cols:>8,}"
f" {r.symbol_write_time:>{cw}.2f}"
f" {statistics.mean(t):>{cw}.2f} {max(t):>{cw}.2f}"
f" {statistics.mean(m):>{cw}.1f} {max(m):>{cw}.1f}"
)


def cleanup():
lib = NativeMongoose("mktdatad").get_library("pmarkovski.columns_stats", api="v2")
try:
lib.delete("test_symbol")
except Exception:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bare except Exception: pass swallows real failures here (mongo auth, connection drops, permission errors, library-not-found). If cleanup silently fails, the next run sees a stale symbol and the measurements are garbage. At minimum log it:

Suggested change
except Exception:
try:
lib.delete("test_symbol")
except Exception as exc:
print(f"cleanup: lib.delete failed: {exc}", file=sys.stderr)

Also note that this is called from if __name__ == "__main__": before any scenario runs — if mongo is unreachable at startup the script will only fail later on the first write/create_stats subprocess, with no obvious clue that the issue is auth/connectivity.

pass


if __name__ == "__main__":
cleanup()
try:
run_phase(row_scenarios())
run_phase(col_scenarios())
except Exception as e:
print(f"Benchmark aborted unexpectedly: {e}", file=sys.stderr)
finally:
cleanup()
print_results()
39 changes: 39 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_write_symbol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import json
import logging
import resource
import sys
import time

import numpy as np
import pandas as pd
from ahl.mongo import NativeMongoose

logging.getLogger("man.vault.client").setLevel(logging.WARNING)
logging.getLogger("man.secrets.api").setLevel(logging.WARNING)


def main():
rows, cols = int(sys.argv[1]), int(sys.argv[2])
column_names = [f"col_{i}" for i in range(cols)]

lib = NativeMongoose("mktdatad").get_library("pmarkovski.columns_stats", api="v2")

chunk = pd.DataFrame(
np.random.rand(rows, cols).astype(np.float64),
columns=column_names,
)

start_time = time.time()
lib.write("test_symbol", chunk)
end_time = time.time()

peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 # ru_maxrss is KB on Linux

print(json.dumps({
"elapsed_seconds": end_time - start_time,
"peak_rss_mb": peak_rss_mb,
}))


if __name__ == "__main__":
main()
Loading