Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_create_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import json
import resource
import sys
import time

from arcticdb import Arctic

LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
SYMBOL_NAME = "test_symbol"


def main():
    """Time ``create_column_stats`` on the benchmark symbol and print a JSON report.

    Reads the number of columns from ``sys.argv[1]``, builds a ``MINMAX``
    column-stats spec for ``col_0`` .. ``col_{cols - 1}``, times the creation
    call, drops the stats again, and prints one JSON object with the keys
    ``elapsed_seconds`` and ``peak_rss_mb`` on stdout.
    """
    cols = int(sys.argv[1])

    ac = Arctic(f"lmdb://{LMDB_PATH}")
    lib = ac.get_library("bench")
    nvs = lib._nvs
    column_stats_spec = {f"col_{i}": {"MINMAX"} for i in range(cols)}

    # perf_counter() is monotonic and high resolution; time.time() follows the
    # wall clock and can jump if the system time is adjusted mid-measurement.
    start = time.perf_counter()
    nvs.create_column_stats(SYMBOL_NAME, column_stats_spec)
    elapsed_seconds = time.perf_counter() - start

    # Only the creation is timed; dropping happens outside the measured window.
    nvs.drop_column_stats(SYMBOL_NAME)

    # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
    rss_units_per_mb = 1024 * 1024 if sys.platform == "darwin" else 1024
    peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / rss_units_per_mb

    print(json.dumps({
        "elapsed_seconds": elapsed_seconds,
        "peak_rss_mb": peak_rss_mb,
    }))


if __name__ == "__main__":
    main()
119 changes: 119 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import json
import shutil
import statistics
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path

# Scratch LMDB directory used by the benchmark.
LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
# Number of create_stats invocations per scenario whose output is discarded.
WARMUP_RUNS = 2
# Number of create_stats invocations per scenario whose output is recorded.
RUNS = 10
# Worker script that writes the benchmark symbol; it sits next to this file
# and is named col_stats_bench_write_symbol.py.
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "col_stats_bench_write_symbol.py"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Critical: these paths don't match the actual filenames. The worker scripts in this PR are col_stats_bench_write_symbol.py and col_stats_bench_create_stats.py, so every subprocess.run call will fail with FileNotFoundError and the orchestrator can't run at all. Please update the constants:

Suggested change
RUNS = 10
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "bench_write_symbol.py"
WRITE_SYMBOL_SCRIPT = Path(__file__).parent / "col_stats_bench_write_symbol.py"
CREATE_STATS_SCRIPT = Path(__file__).parent / "col_stats_bench_create_stats.py"

# Worker script that times create_column_stats; it sits next to this file
# and is named col_stats_bench_create_stats.py.
CREATE_STATS_SCRIPT = Path(__file__).parent / "col_stats_bench_create_stats.py"

# (rows, cols) pairs to benchmark. Each pair appears once and the list is
# ordered by total cell count so the results table reads smallest to largest.
SCENARIOS = [
    (10, 10),
    (400, 400),
    (500, 500),
    (700, 700),
    (900, 900),
    (1_000, 1_000),
]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(500, 500) is listed twice and the list isn't size-ordered, which makes the output table harder to read. The whitespace style is also mixed ((500,500) vs (1_000, 1_000)). Please dedupe, sort by total cell count, and pick one style — e.g.:

Suggested change
SCENARIOS = [
(10, 10),
(500,500),
(400,400),
(500,500),
(1_000, 1_000),
(700,700),
(900,900),
]
SCENARIOS = [
(10, 10),
(400, 400),
(500, 500),
(700, 700),
(900, 900),
(1_000, 1_000),
]


# SCENARIOS = [
# (10, 10),
# (1_000, 1_000),
# (100_000, 1_000),
# (100_000, 10_000),
# (1_000_000, 1_000),
# (1_000_000, 10_000),
# (10_000_000, 1_000),
# ]

@dataclass
class Result:
    """Measurements collected for one benchmark scenario."""

    rows: int = 0  # row count of the scenario
    cols: int = 0  # column count of the scenario
    symbol_write_time: float = 0.0  # "elapsed_seconds" reported by the write-symbol script
    stats_create_times: list = field(default_factory=list)  # "elapsed_seconds" of each recorded create_stats run
    stats_rss_use: list = field(default_factory=list)  # "peak_rss_mb" of each recorded create_stats run


# One Result slot per scenario, addressed by the scenario's index in SCENARIOS.
results = [Result() for _ in SCENARIOS]


def run_subprocess(script, args, label):
    """Run a worker script in a fresh interpreter and return its decoded JSON output.

    Args:
        script: Path of the Python script to execute.
        args: Positional arguments for the script; each is converted with ``str``.
        label: Short tag that prefixes the error message on failure.

    Returns:
        The object decoded from the JSON text the script wrote to stdout.

    Raises:
        RuntimeError: If the script exits with a non-zero status. The LMDB
            directory is removed before the error is raised.
    """
    command = [sys.executable, str(script)] + [str(arg) for arg in args]
    try:
        completed = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        shutil.rmtree(LMDB_PATH, ignore_errors=True)
        # subprocess reports death-by-signal as a negative return code.
        if e.returncode < 0:
            reason = f"killed by signal {-e.returncode}"
        else:
            reason = f"exit code {e.returncode}"
        raise RuntimeError(f"[{label}] subprocess failed ({reason}):\n{e.stderr}") from None
    return json.loads(completed.stdout)


def measure(scenario, index):
    """Benchmark one scenario and record its measurements in ``results[index]``.

    Writes the symbol once, runs ``WARMUP_RUNS`` create_stats invocations
    whose output is discarded, then ``RUNS`` invocations whose elapsed time
    and peak RSS are appended to the result. The LMDB directory is removed
    at the end.

    Args:
        scenario: ``(rows, cols)`` pair describing the symbol to write.
        index: Position of the scenario's slot in ``results``.
    """
    rows, cols = scenario
    record = results[index]
    record.rows, record.cols = rows, cols

    print(f" [write_symbol] {rows}x{cols}", file=sys.stderr)
    write_report = run_subprocess(WRITE_SYMBOL_SCRIPT, [rows, cols], "write_symbol")
    record.symbol_write_time = write_report["elapsed_seconds"]

    # Warm-up invocations: executed but not recorded.
    for warmup_no in range(1, WARMUP_RUNS + 1):
        print(f" [create_stats] warmup {warmup_no}/{WARMUP_RUNS}", file=sys.stderr)
        run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")

    # Recorded invocations.
    for run_no in range(1, RUNS + 1):
        print(f" [create_stats] run {run_no}/{RUNS}", file=sys.stderr)
        report = run_subprocess(CREATE_STATS_SCRIPT, [cols], "create_stats")
        record.stats_create_times.append(report["elapsed_seconds"])
        record.stats_rss_use.append(report["peak_rss_mb"])

    shutil.rmtree(LMDB_PATH, ignore_errors=True)


def _format_stats(samples, width, precision):
    """Return right-aligned mean/median/max columns, or ``n/a`` placeholders when empty."""
    if not samples:
        return " ".join(f"{'n/a':>{width}}" for _ in range(3))
    summary = (statistics.mean(samples), statistics.median(samples), max(samples))
    return " ".join(f"{value:>{width}.{precision}f}" for value in summary)


def print_results():
    """Print a table with one row per entry of ``results`` to stdout.

    Columns: rows, cols, symbol write time, then mean/median/max of the
    create_stats timings and of the peak RSS samples. An entry without
    samples gets ``n/a`` in the statistics columns instead of raising from
    ``statistics.mean`` / ``max`` on an empty list.
    """
    cw = 14
    header = (
        f"{'rows':>12} {'cols':>8}"
        f" {'write_s':>{cw}}"
        f" {'time_mean':>{cw}} {'time_median':>{cw}} {'time_max':>{cw}}"
        f" {'rss_mean_mb':>{cw}} {'rss_median_mb':>{cw}} {'rss_max_mb':>{cw}}"
    )
    print()
    print(header)
    print("-" * len(header))

    for r in results:
        print(
            f"{r.rows:>12,} {r.cols:>8,}"
            f" {r.symbol_write_time:>{cw}.2f}"
            f" {_format_stats(r.stats_create_times, cw, 2)}"
            f" {_format_stats(r.stats_rss_use, cw, 1)}"
        )


def cleanup():
    """Delete the LMDB directory at ``LMDB_PATH``, ignoring any removal errors."""
    shutil.rmtree(LMDB_PATH, ignore_errors=True)


if __name__ == "__main__":
    # Remove any existing LMDB directory before the first scenario.
    cleanup()
    try:
        for i, scenario in enumerate(SCENARIOS):
            print(f"\n=== scenario {scenario[0]}x{scenario[1]} ===", file=sys.stderr)
            measure(scenario, i)
    finally:
        # Remove the benchmark data even when a scenario raised.
        cleanup()
    # NOTE(review): the table is printed only after every scenario succeeded.
    # Confirm this placement (rather than inside ``finally``) is intended.
    print_results()
47 changes: 47 additions & 0 deletions python/benchmarks/non_asv/col_stats_bench_write_symbol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import json
import resource
import sys
import time

import numpy as np
import pandas as pd
from arcticdb import Arctic

LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
SYMBOL_NAME = "test_symbol"
CHUNK_ROWS = 100_000


def main():
    """Write the benchmark symbol in chunks and print a JSON timing report.

    Reads ``rows`` and ``cols`` from ``sys.argv[1]`` and ``sys.argv[2]``,
    creates the ``bench`` library if it does not exist, and writes ``rows``
    rows of random float64 data in chunks of at most ``CHUNK_ROWS`` rows
    (``write`` for the first chunk, ``append`` for the rest). Prints one JSON
    object with the keys ``elapsed_seconds`` and ``peak_rss_mb`` on stdout.
    """
    rows, cols = int(sys.argv[1]), int(sys.argv[2])
    column_names = [f"col_{i}" for i in range(cols)]

    ac = Arctic(f"lmdb://{LMDB_PATH}")
    if not ac.has_library("bench"):
        ac.create_library("bench")
    lib = ac.get_library("bench")

    # Accumulate the time of every write/append. Taking one timestamp inside
    # the loop and subtracting after it would cover only the last chunk, and
    # would leave the timestamp unbound when rows == 0.
    elapsed_seconds = 0.0
    for chunk_start in range(0, rows, CHUNK_ROWS):
        chunk_row_count = min(CHUNK_ROWS, rows - chunk_start)

        # Data generation stays outside the timed region.
        chunk = pd.DataFrame(
            np.random.rand(chunk_row_count, cols).astype(np.float64),
            columns=column_names,
        )

        # perf_counter() is monotonic; time.time() can jump with the wall clock.
        start_time = time.perf_counter()
        if chunk_start == 0:
            lib.write(SYMBOL_NAME, chunk)
        else:
            lib.append(SYMBOL_NAME, chunk)
        elapsed_seconds += time.perf_counter() - start_time

    # ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
    rss_units_per_mb = 1024 * 1024 if sys.platform == "darwin" else 1024
    peak_rss_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / rss_units_per_mb

    print(json.dumps({"elapsed_seconds": elapsed_seconds, "peak_rss_mb": peak_rss_mb}))


if __name__ == "__main__":
    main()
Loading