Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions python/benchmarks/non_asv/bench_col_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import argparse
import json
import signal
import threading
import time

import numpy as np
import pandas as pd
import psutil
from arcticdb import Arctic

LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
SYMBOL_NAME = "test_symbol"

signal.signal(signal.SIGINT, lambda *_: exit(130))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't do cleanup like this. This file should end with:

if __name__ == "__main__":
  try:
    run()
  finally:
    cleanup()

or you could put the same idea in your run(). There's no need for exit handlers.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this??

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


parser = argparse.ArgumentParser()
parser.add_argument("--scenario", required=True, help="Scenario as ROWSxCOLS, e.g. 100000x1000")
parser.add_argument("--operation", required=True, choices=["write_symbol", "create_stats"])
args = parser.parse_args()

rows, cols = map(int, args.scenario.split("x"))

ac = Arctic(f"lmdb://{LMDB_PATH}")
if not ac.has_library("bench"):
ac.create_library("bench")
lib = ac.get_library("bench")
nvs = lib._nvs


def measure_peak_rss(operation_fn):
process = psutil.Process()
rss_baseline_mb = process.memory_info().rss / 1e6
peak_rss_delta_mb = [0.0]
stop_sampling = threading.Event()

def rss_sampler():
while not stop_sampling.is_set():
delta_mb = process.memory_info().rss / 1e6 - rss_baseline_mb
if delta_mb > peak_rss_delta_mb[0]:
peak_rss_delta_mb[0] = delta_mb
time.sleep(0.01)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10 ms sampling is too coarse for the currently-active scenarios in run_bench_col_stats.py (≤1000 rows × 800 cols). The sampler loop samples once, then sleeps; if the operation finishes during the first sleep, only the baseline sample is taken and peak_rss_delta_mb returns 0. Either drop the period (e.g. 1 ms) or guard against operations that completed below the sampler resolution (e.g. log a warning when elapsed_seconds < period).


sampler_thread = threading.Thread(target=rss_sampler, daemon=True)
sampler_thread.start()
start_time = time.time()
operation_fn()
elapsed_seconds = time.time() - start_time
stop_sampling.set()
sampler_thread.join()
return elapsed_seconds, peak_rss_delta_mb[0]


if args.operation == "write_symbol":
dataframe = pd.DataFrame(
np.random.rand(rows, cols).astype(np.float64),
columns=[f"col_{i}" for i in range(cols)],
)
elapsed_seconds, peak_rss_delta_mb = measure_peak_rss(lambda: lib.write(SYMBOL_NAME, dataframe))
else:
column_stats_spec = {f"col_{i}": {"MINMAX"} for i in range(cols)}
elapsed_seconds, peak_rss_delta_mb = measure_peak_rss(
lambda: nvs.create_column_stats(SYMBOL_NAME, column_stats_spec)
)
nvs.drop_column_stats(SYMBOL_NAME)

print(json.dumps({"elapsed_seconds": elapsed_seconds, "peak_rss_delta_mb": peak_rss_delta_mb}))
3 changes: 3 additions & 0 deletions python/benchmarks/non_asv/bench_col_stats.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash
set -euo pipefail
python "$(dirname "$0")/bench_col_stats.py" "$@"
119 changes: 119 additions & 0 deletions python/benchmarks/non_asv/run_bench_col_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import atexit
import json
import shutil
import signal
import statistics
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path

from arcticdb import Arctic

LMDB_PATH = "/tmp/arcticdb_bench_col_stats"
SYMBOL_NAME = "test_symbol"
CREATE_STATS_RUNS = 10
WORKER_SCRIPT = Path(__file__).parent / "bench_col_stats.py"

atexit.register(lambda: shutil.rmtree(LMDB_PATH, ignore_errors=True))
signal.signal(signal.SIGINT, lambda *_: exit(130))
signal.signal(signal.SIGTERM, lambda *_: exit(143))

SCENARIOS = [
(500, 500),
(500, 100),
(600, 600),
(700, 700),
(800, 500),
(900, 800),
(1_000, 100),
]

# SCENARIOS = [
# (5_000, 5_000),
# (5_000, 10_000),
# (6_000, 6_000),
# (7_000, 7_000),
# (8_000, 5_000),
# (9_000, 8_000),
# (10_000, 10_000),
# ]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't ship a commented-out alternative SCENARIOS block — either expose it as a CLI flag (e.g. --large selecting between two named presets) or delete it. As-is, switching between scenario sets requires editing the source.

Also, the active list maxes out at 1000 rows × 800 cols, which is very small for a create_column_stats performance benchmark — operations may complete in <10 ms and never get sampled by the RSS thread (see comment on bench_col_stats.py).



@dataclass
class ScenarioResult:
rows: int = 0
cols: int = 0
symbol_write_time: float = 0.0
stats_elapsed_time: list = field(default_factory=list)
stats_memory_delta_mb: list = field(default_factory=list)


def print_results(results):
column_width = 16
header = (
f"{'rows':>12} {'cols':>6}"
f" {'write_time_s':>{column_width}}"
f" {'stats_time_min':>{column_width}} {'stats_time_max':>{column_width}} {'stats_time_var':>{column_width}}"
f" {'stats_rss_min_mb':>{column_width}} {'stats_rss_max_mb':>{column_width}} {'stats_rss_var_mb':>{column_width}}"
)
print()
print(header)
print("-" * len(header))

for result in results:
elapsed_times = result.stats_elapsed_time
memory_values = result.stats_memory_delta_mb
print(
f"{result.rows:>12,} {result.cols:>6,}"
f" {result.symbol_write_time:>{column_width}.2f}"
f" {min(elapsed_times):>{column_width}.2f} {max(elapsed_times):>{column_width}.2f} {statistics.variance(elapsed_times):>{column_width}.4f}"
f" {min(memory_values):>{column_width}.1f} {max(memory_values):>{column_width}.1f} {statistics.variance(memory_values):>{column_width}.2f}"
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior review feedback is only partially addressed here:

  1. The reviewer asked for mean, median, and max across measured iterations — this reports min/max/variance. Variance is harder to interpret and sensitive to outliers; please add mean and median (max is fine to keep).
  2. The reviewer also asked for a couple of un-measured warmup iterations before the N measured iterations. All 10 runs in measure() (lines 98-102) are still measured. Discard at least the first 1–2 to stabilise results — first-run latency on LMDB / page-cache effects can dominate.



def run_subprocess(operation, rows, cols):
try:
completed = subprocess.run(
[sys.executable, str(WORKER_SCRIPT), "--scenario", f"{rows}x{cols}", "--operation", operation],
capture_output=True, text=True, check=True,
)
return json.loads(completed.stdout)
except subprocess.CalledProcessError as e:
shutil.rmtree(LMDB_PATH, ignore_errors=True)
killed_by_signal = e.returncode < 0
reason = f"killed by signal {-e.returncode}" if killed_by_signal else f"exit code {e.returncode}"
raise RuntimeError(f"[{operation}] subprocess failed ({reason}):\n{e.stderr}") from None


def measure(scenario, index, results):
rows, cols = scenario
results[index].rows = rows
results[index].cols = cols

print(f" [write_symbol] {rows}x{cols}", file=sys.stderr)
write_result = run_subprocess("write_symbol", rows, cols)
results[index].symbol_write_time = write_result["elapsed_seconds"]

for run_number in range(1, CREATE_STATS_RUNS + 1):
print(f" [create_stats] run {run_number}/{CREATE_STATS_RUNS}", file=sys.stderr)
stats_result = run_subprocess("create_stats", rows, cols)
results[index].stats_elapsed_time.append(stats_result["elapsed_seconds"])
results[index].stats_memory_delta_mb.append(stats_result["peak_rss_delta_mb"])

ac = Arctic(f"lmdb://{LMDB_PATH}")
ac.get_library("bench").delete(SYMBOL_NAME)


shutil.rmtree(LMDB_PATH, ignore_errors=True)

results = [ScenarioResult() for _ in SCENARIOS]

try:
for index, scenario in enumerate(SCENARIOS):
print(f"\n=== scenario {scenario[0]}x{scenario[1]} ===", file=sys.stderr)
measure(scenario, index, results)
finally:
shutil.rmtree(LMDB_PATH, ignore_errors=True)

print_results(results)
Loading