Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_create_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import json
import resource
import sys
import time

from arcticdb import Arctic

# Filesystem location of the LMDB-backed Arctic store opened by main().
LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
# Symbol whose column stats main() creates and then drops.
SYMBOL_NAME = "test_symbol"


def main():
    """Time one create/drop cycle of MINMAX column stats and print a JSON report.

    The number of columns is read from the first command-line argument. A
    ``MINMAX`` column-stats entry is requested for ``col_0`` .. ``col_{cols-1}``
    on ``SYMBOL_NAME`` in the ``bench`` library of the LMDB store at
    ``LMDB_PATH``.

    A single JSON object is written to stdout with two keys:

    * ``elapsed_seconds`` - time spent in the create + drop calls.
    * ``peak_rss_mb`` - ``ru_maxrss`` of this process divided by 1024
      (``ru_maxrss`` is reported in KB on Linux, which makes this MB there).
    """
    cols = int(sys.argv[1])

    ac = Arctic(f"lmdb://{LMDB_PATH}")
    lib = ac.get_library("bench")
    nvs = lib._nvs
    column_stats_spec = {f"col_{i}": {"MINMAX"} for i in range(cols)}

    # perf_counter() is the clock intended for measuring durations; unlike
    # time.time() it cannot jump when the system clock is adjusted mid-run.
    start = time.perf_counter()
    nvs.create_column_stats(SYMBOL_NAME, column_stats_spec)
    # NOTE(review): the drop is inside the timed region, so the reported
    # figure is create + drop, not create alone - confirm this is intended.
    nvs.drop_column_stats(SYMBOL_NAME)
    elapsed_seconds = time.perf_counter() - start

    print(json.dumps({
        "elapsed_seconds": elapsed_seconds,
        "peak_rss_mb": resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024,
    }))


if __name__ == "__main__":
    main()
119 changes: 119 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import json
import shutil
import statistics
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path

# Filesystem location of the LMDB store shared with the worker scripts.
LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
# Number of untimed create-stats invocations run before measuring.
WARMUP_RUNS = 2
# Number of measured create-stats invocations per scenario.
RUNS = 10
# Worker that writes the benchmark symbol; the name must match the file
# shipped next to this orchestrator (col_stats_bench_write_symbol.py).
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "col_stats_bench_write_symbol.py"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical: these paths don't match the actual filenames. The worker scripts in this PR are col_stats_bench_write_symbol.py and col_stats_bench_create_stats.py, so every subprocess.run call will fail with FileNotFoundError and the orchestrator can't run at all. Please update the constants:

Suggested change
RUNS = 10
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "bench_write_symbol.py"
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "col_stats_bench_write_symbol.py"
CREATE_STATS_SCRIPT = Path(__file__).parent / "col_stats_bench_create_stats.py"

# Worker that creates and drops column stats; the name must match the file
# shipped next to this orchestrator (col_stats_bench_create_stats.py).
CREATE_STATS_SCRIPT = Path(__file__).parent / "col_stats_bench_create_stats.py"

# (rows, cols) shapes to benchmark. Each shape appears once and the list is
# ordered by total cell count so the results table reads smallest to largest.
SCENARIOS = [
    (10, 10),
    (400, 400),
    (500, 500),
    (700, 700),
    (900, 900),
    (1_000, 1_000),
]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(500, 500) is listed twice and the list isn't size-ordered, which makes the output table harder to read. The whitespace style is also mixed ((500,500) vs (1_000, 1_000)). Please dedupe, sort by total cell count, and pick one style — e.g.:

Suggested change
SCENARIOS = [
(10, 10),
(500,500),
(400,400),
(500,500),
(1_000, 1_000),
(700,700),
(900,900),
]
SCENARIOS = [
(10, 10),
(400, 400),
(500, 500),
(700, 700),
(900, 900),
(1_000, 1_000),
]


# SCENARIOS = [
# (10, 10),
# (1_000, 1_000),
# (100_000, 1_000),
# (100_000, 10_000),
# (1_000_000, 1_000),
# (1_000_000, 10_000),
# (10_000_000, 1_000),
# ]

@dataclass
class Result:
    """Measurements gathered for a single (rows, cols) benchmark scenario."""

    # Shape of the benchmarked symbol.
    rows: int = 0
    cols: int = 0
    # "elapsed_seconds" value reported by the write-symbol worker.
    symbol_write_time: float = 0.0
    # One entry per measured create-stats run: the worker's "elapsed_seconds"
    # and "peak_rss_mb" values respectively. Each instance gets its own lists.
    stats_create_times: list = field(default_factory=list)
    stats_rss_use: list = field(default_factory=list)


# One Result per entry of SCENARIOS, in the same order; measure() fills in the
# entry at the index it is given and print_results() reads them all back.
results = [Result() for _ in SCENARIOS]


def run_subprocess(script, args, label):
    """Run ``script`` in a fresh interpreter and return its stdout parsed as JSON.

    Args:
        script: Path of the Python script to execute with ``sys.executable``.
        args: Command-line arguments; each one is converted with ``str``.
        label: Tag placed in the error message when the child fails.

    Returns:
        The object decoded from the child's stdout by ``json.loads``.

    Raises:
        RuntimeError: The child exited with a non-zero status or was killed
            by a signal. ``LMDB_PATH`` is removed before raising and the
            message carries the child's stderr.
    """
    command = [sys.executable, str(script)]
    command.extend(str(arg) for arg in args)

    try:
        finished = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as failure:
        # Drop the store so a failed run leaves nothing behind.
        shutil.rmtree(LMDB_PATH, ignore_errors=True)
        # subprocess reports death-by-signal as a negative return code.
        if failure.returncode < 0:
            reason = f"killed by signal {-failure.returncode}"
        else:
            reason = f"exit code {failure.returncode}"
        raise RuntimeError(f"[{label}] subprocess failed ({reason}):\n{failure.stderr}") from None

    return json.loads(finished.stdout)


def measure(scenario, index):
    """Benchmark one scenario and store its numbers in ``results[index]``.

    The symbol is written once by the write-symbol worker, then the
    create-stats worker is run ``WARMUP_RUNS`` times with its output
    discarded and ``RUNS`` times with its output recorded. The store at
    ``LMDB_PATH`` is removed afterwards.

    Args:
        scenario: ``(rows, cols)`` pair describing the symbol shape.
        index: Position in ``results`` of the entry to populate.
    """
    rows, cols = scenario
    record = results[index]
    record.rows, record.cols = rows, cols

    print(f" [write_symbol] {rows}x{cols}", file=sys.stderr)
    write_report = run_subprocess(WRITE_SYMBOL_SCRIPT, [rows, cols], "write_symbol")
    record.symbol_write_time = write_report["elapsed_seconds"]

    # Warm-up invocations: their output is intentionally ignored.
    for warmup in range(1, WARMUP_RUNS + 1):
        print(f" [create_stats] warmup {warmup}/{WARMUP_RUNS}", file=sys.stderr)
        run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")

    # Measured invocations: keep the time and peak-RSS figure of each one.
    for attempt in range(1, RUNS + 1):
        print(f" [create_stats] run {attempt}/{RUNS}", file=sys.stderr)
        sample = run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")
        record.stats_create_times.append(sample["elapsed_seconds"])
        record.stats_rss_use.append(sample["peak_rss_mb"])

    shutil.rmtree(LMDB_PATH, ignore_errors=True)


def print_results():
    """Print a right-aligned table summarising every entry of ``results``.

    Each row shows the scenario shape, the symbol write time, and the mean,
    median and maximum of the recorded create-stats times and peak-RSS values.
    """
    width = 14
    stat_titles = (
        "write_s",
        "time_mean", "time_median", "time_max",
        "rss_mean_mb", "rss_median_mb", "rss_max_mb",
    )
    titles = [f"{'rows':>12}", f"{'cols':>8}"]
    titles.extend(f"{title:>{width}}" for title in stat_titles)
    header = " ".join(titles)

    print()
    print(header)
    print("-" * len(header))

    for entry in results:
        times = entry.stats_create_times
        rss = entry.stats_rss_use
        cells = [
            f"{entry.rows:>12,}",
            f"{entry.cols:>8,}",
            f"{entry.symbol_write_time:>{width}.2f}",
            f"{statistics.mean(times):>{width}.2f}",
            f"{statistics.median(times):>{width}.2f}",
            f"{max(times):>{width}.2f}",
            f"{statistics.mean(rss):>{width}.1f}",
            f"{statistics.median(rss):>{width}.1f}",
            f"{max(rss):>{width}.1f}",
        ]
        print(" ".join(cells))


def cleanup():
    """Delete the directory tree at ``LMDB_PATH``, ignoring any errors."""
    store_path = LMDB_PATH
    shutil.rmtree(store_path, ignore_errors=True)


if __name__ == "__main__":
    # Remove any store left over from a previous run before starting.
    cleanup()
    try:
        for index, scenario in enumerate(SCENARIOS):
            row_count, col_count = scenario
            print(f"\n=== scenario {row_count}x{col_count} ===", file=sys.stderr)
            measure(scenario, index)
    finally:
        # Remove the store whether or not every scenario completed.
        cleanup()
    print_results()
44 changes: 44 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_write_symbol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import json
import resource
import sys
import time

import numpy as np
import pandas as pd
from arcticdb import Arctic

# Filesystem location of the LMDB-backed Arctic store that main() writes to.
LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
# Name of the symbol that main() writes and appends to.
SYMBOL_NAME = "test_symbol"
# Maximum number of rows generated and written per write/append call.
CHUNK_ROWS = 100_000


def main():
    """Write a random float64 symbol of the requested shape and print a JSON report.

    The row and column counts are read from the first and second command-line
    arguments. The ``bench`` library is created in the LMDB store at
    ``LMDB_PATH`` if it does not exist, then the data is written in chunks of
    at most ``CHUNK_ROWS`` rows: the first chunk with ``write`` and every
    later chunk with ``append``.

    A single JSON object is written to stdout with two keys:

    * ``elapsed_seconds`` - time spent generating and writing the chunks.
    * ``peak_rss_mb`` - ``ru_maxrss`` of this process divided by 1024.
    """
    rows, cols = int(sys.argv[1]), int(sys.argv[2])
    column_names = [f"col_{i}" for i in range(cols)]

    ac = Arctic(f"lmdb://{LMDB_PATH}")
    if not ac.has_library("bench"):
        ac.create_library("bench")
    lib = ac.get_library("bench")

    # perf_counter() is the clock intended for measuring durations; unlike
    # time.time() it cannot jump when the system clock is adjusted mid-run.
    start_time = time.perf_counter()

    for chunk_start in range(0, rows, CHUNK_ROWS):
        # The final chunk may be shorter than CHUNK_ROWS.
        chunk_row_count = min(CHUNK_ROWS, rows - chunk_start)
        chunk = pd.DataFrame(
            np.random.rand(chunk_row_count, cols).astype(np.float64),
            columns=column_names,
        )
        if chunk_start == 0:
            lib.write(SYMBOL_NAME, chunk)
        else:
            lib.append(SYMBOL_NAME, chunk)

    elapsed_seconds = time.perf_counter() - start_time
    peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024  # ru_maxrss is KB on Linux

    print(json.dumps({"elapsed_seconds": elapsed_seconds, "peak_rss_mb": peak_rss_mb}))


if __name__ == "__main__":
    main()
Loading