Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ Optimizations

Bug Fixes
---------------------
* GITHUB#15967: Fix IndexWriter's HNSW InfoStream progress during concurrent merges to show elapsed
time since merge start instead of duplicate incremental/total times. Add per-chunk completion
logging with sub-millisecond precision to aid merge concurrency debugging. (Prithvi S)

* GITHUB#14049: Randomize KNN codec params in RandomCodec. Fixes scalar quantization div-by-zero
when all values are identical. (Mike Sokolov)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.lucene.internal.hppc.IntHashSet;
import org.apache.lucene.search.TaskExecutor;
Expand Down Expand Up @@ -76,11 +78,17 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException {
if (frozen) {
throw new IllegalStateException("graph has already been built");
}
long mergeStartTimeNS = System.nanoTime();
if (infoStream.isEnabled(HNSW_COMPONENT)) {
infoStream.message(
HNSW_COMPONENT,
"build graph from " + maxOrd + " vectors, with " + workers.length + " workers");
}
AtomicLong cumulativeWorkTimeNS = new AtomicLong();
for (ConcurrentMergeWorker worker : workers) {
worker.setMergeStartTimeNS(mergeStartTimeNS);
worker.setCumulativeWorkTimeNS(cumulativeWorkTimeNS);
}
List<Callable<Void>> futures = new ArrayList<>();
for (int i = 0; i < workers.length; i++) {
int finalI = i;
Expand All @@ -91,6 +99,20 @@ public OnHeapHnswGraph build(int maxOrd) throws IOException {
});
}
taskExecutor.invokeAll(futures);
if (infoStream.isEnabled(HNSW_COMPONENT)) {
double wallClockMs = (System.nanoTime() - mergeStartTimeNS) / 1_000_000.0;
double totalWorkerMs = cumulativeWorkTimeNS.get() / 1_000_000.0;
double effectiveConcurrency = wallClockMs > 0 ? totalWorkerMs / wallClockMs : 0;
infoStream.message(
HNSW_COMPONENT,
String.format(
Locale.ROOT,
"merge completed: %d vectors, %.2f ms wall clock, %.2f ms cumulative worker time, %.2fx effective concurrency",
maxOrd,
wallClockMs,
totalWorkerMs,
effectiveConcurrency));
}
return getCompletedGraph();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.Locale;
import java.util.Objects;
import java.util.SplittableRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.lucene.internal.hppc.IntHashSet;
import org.apache.lucene.search.KnnCollector;
Expand Down Expand Up @@ -85,6 +85,29 @@ public class HnswGraphBuilder implements HnswBuilder {
protected InfoStream infoStream = InfoStream.getDefault();
protected boolean frozen;

/**
* Merge-level start time in nanoseconds. When set, the periodic progress prints (every 10K
* vectors) show elapsed time since the overall merge began rather than since the current chunk
* began. A value of -1 means not set (non-concurrent path).
*/
private long mergeStartTimeNS = -1;

/**
* Shared accumulator for total worker time across all concurrent merge workers. Each chunk's
* elapsed time is added here so that effective concurrency can be computed at merge end.
*/
private AtomicLong cumulativeWorkTimeNS;

/**
 * Set the merge-level start time so the periodic progress prints (every 10K vectors) report
 * elapsed time since the overall merge began rather than since this builder's chunk began.
 * Called by the concurrent merge coordinator before workers start; when never called, the
 * field keeps its -1 sentinel and progress prints fall back to the chunk-local start time.
 *
 * @param mergeStartTimeNS merge start timestamp from {@code System.nanoTime()}
 */
void setMergeStartTimeNS(long mergeStartTimeNS) {
  this.mergeStartTimeNS = mergeStartTimeNS;
}

/**
 * Set the shared accumulator into which each chunk adds its elapsed build time. All concurrent
 * merge workers receive the same {@link AtomicLong} so that, at merge end, the coordinator can
 * divide the accumulated worker time by wall-clock time to report effective concurrency. When
 * never called, the field stays null and per-chunk accumulation is skipped.
 *
 * @param cumulativeWorkTimeNS shared nanosecond accumulator, one instance per merge
 */
void setCumulativeWorkTimeNS(AtomicLong cumulativeWorkTimeNS) {
  this.cumulativeWorkTimeNS = cumulativeWorkTimeNS;
}

public static HnswGraphBuilder create(
RandomVectorScorerSupplier scorerSupplier, int M, int beamWidth, long seed)
throws IOException {
Expand Down Expand Up @@ -204,16 +227,30 @@ protected void addVectors(int minOrd, int maxOrd) throws IOException {
if (frozen) {
throw new IllegalStateException("This HnswGraphBuilder is frozen and cannot be updated");
}
long start = System.nanoTime(), t = start;
if (infoStream.isEnabled(HNSW_COMPONENT)) {
infoStream.message(HNSW_COMPONENT, "addVectors [" + minOrd + " " + maxOrd + ")");
}
long startNs = System.nanoTime();
long progressStartNs = mergeStartTimeNS != -1 ? mergeStartTimeNS : startNs;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm we still have some NSs mergeStartTimeNS, cumulativeWorkTimeNS at least? Let's fix all of them to be consistent? Otherwise this looks awesome, thanks @iprithv.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, done. Thanks!

for (int node = minOrd; node < maxOrd; node++) {
addGraphNode(node);
if ((node % 10000 == 0) && infoStream.isEnabled(HNSW_COMPONENT)) {
t = printGraphBuildStatus(node, start, t);
printGraphBuildStatus(node, progressStartNs);
}
}
long chunkElapsedNS = System.nanoTime() - startNs;
if (cumulativeWorkTimeNS != null) {
cumulativeWorkTimeNS.addAndGet(chunkElapsedNS);
}
if (infoStream.isEnabled(HNSW_COMPONENT)) {
double elapsedMs = chunkElapsedNS / 1_000_000.0;
infoStream.message(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is awesome! I wonder if, separately from this new InfoStream log line, we could also aggregate these times and then (when HNSW merge is completely done, upstairs) add another summary InfoStream log line stating the effective concurrency? Could maybe just be an AtomicLong that each chunk increments its elapsed time into; then upstairs, at HNSW merge end, divide that value by elapsed wall-clock time to get the implied concurrency and report it via InfoStream.message.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Added a shared AtomicLong cumulativeWorkTimeNS that each chunk increments with its elapsed time. At the end of HnswConcurrentMergeBuilder.build(), after all workers finish, it logs:

merge completed: 100000 vectors, 12500.00 ms wall clock, 42300.45 ms cumulative worker time, 3.38x effective concurrency

This should make it straightforward to detect the single threaded merge scenario, it would show ~1.0x effective concurrency.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay, this is awesome! Can't wait for nightly build to run on this then I scrutinize the InfoStream -- first light for this new metric!

HNSW_COMPONENT,
String.format(
Locale.ROOT,
"addVectors [%d %d): %d vectors in %.2f ms",
minOrd,
maxOrd,
maxOrd - minOrd,
elapsedMs));
}
}

private void addVectors(int maxOrd) throws IOException {
Expand Down Expand Up @@ -339,17 +376,10 @@ public void addGraphNode(int node, IntHashSet eps0) throws IOException {
addGraphNodeInternal(node, scorer, eps0);
}

private long printGraphBuildStatus(int node, long start, long t) {
long now = System.nanoTime();
private void printGraphBuildStatus(int node, long startNs) {
double elapsedMs = (System.nanoTime() - startNs) / 1_000_000.0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can we be consistent about startNs vs chunkElapsedNS ("treat it like a word" --> chunkElapsedNs)? Darned camel-casing rules intersecting with acronyms/abbreviations... Wikipedia's CamelCase page has a whole paragraph dedicated to the conundrum.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, updated. Thanks!

infoStream.message(
HNSW_COMPONENT,
String.format(
Locale.ROOT,
"built %d in %d/%d ms",
node,
TimeUnit.NANOSECONDS.toMillis(now - t),
TimeUnit.NANOSECONDS.toMillis(now - start)));
return now;
HNSW_COMPONENT, String.format(Locale.ROOT, "built %d in %.2f ms", node, elapsedMs));
}

void addDiverseNeighbors(
Expand Down
Loading