Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ Improvements

* GITHUB#15225: Improve package documentation for org.apache.lucene.util. (Syed Mohammad Saad)

* GITHUB#15795: Introduce MemoryAccountingBitsetCollectorManager to parallelize search when using MemoryAccountingBitsetCollector. (Binlong Gao)

Optimizations
---------------------
* GITHUB#15681: Replace pre-sized array or empty array with lambda expression to call Collection#toArray. (Zhou Hui)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ public void updateBytes(long bytes) {
public long getBytes() {
return memoryUsage.get();
}

public String getName() {
return name;
}

public long getMemoryLimit() {
return memoryLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,46 @@
import org.apache.lucene.search.SimpleCollector;
import org.apache.lucene.util.FixedBitSet;

/** Bitset collector which supports memory tracking */
/**
* Bitset collector which supports memory tracking. Can operate in two modes: Full index mode:
* allocates bitset for entire index (for single-threaded search), Segment-local mode: allocates
* bitset only for segments processed (memory-efficient for concurrent search)
*/
public class MemoryAccountingBitsetCollector extends SimpleCollector {

final CollectorMemoryTracker tracker;
final boolean segmentLocal;
FixedBitSet bitSet = new FixedBitSet(0);
int length = 0;
int docBase = 0;

// For segment-local mode
int minDocBase = Integer.MAX_VALUE;
int maxDocEnd = 0;

public MemoryAccountingBitsetCollector(CollectorMemoryTracker tracker) {
this(tracker, false);
}

public MemoryAccountingBitsetCollector(CollectorMemoryTracker tracker, boolean segmentLocal) {
this.tracker = tracker;
this.segmentLocal = segmentLocal;
tracker.updateBytes(bitSet.ramBytesUsed());
}

@Override
protected void doSetNextReader(LeafReaderContext context) throws IOException {
docBase = context.docBase;
length += context.reader().maxDoc();

if (segmentLocal) {
int docEnd = docBase + context.reader().maxDoc();
minDocBase = Math.min(minDocBase, docBase);
maxDocEnd = Math.max(maxDocEnd, docEnd);
length = maxDocEnd - minDocBase;
} else {
length += context.reader().maxDoc();
}

FixedBitSet newBitSet = FixedBitSet.ensureCapacity(bitSet, length);
if (newBitSet != bitSet) {
tracker.updateBytes(newBitSet.ramBytesUsed() - bitSet.ramBytesUsed());
Expand All @@ -50,11 +73,23 @@ protected void doSetNextReader(LeafReaderContext context) throws IOException {

@Override
public void collect(int doc) {
bitSet.set(docBase + doc);
if (segmentLocal) {
bitSet.set(docBase - minDocBase + doc);
} else {
bitSet.set(docBase + doc);
}
}

@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}

int getMinDocBase() {
return segmentLocal ? minDocBase : 0;
}

int getMaxDocEnd() {
return segmentLocal ? maxDocEnd : length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.misc.search;

import java.util.Collection;
import org.apache.lucene.misc.CollectorMemoryTracker;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.util.FixedBitSet;

/**
* CollectorManager for MemoryAccountingBitsetCollector that supports concurrent search.
*
* <p>Creates multiple collectors for concurrent execution, each collector only allocates bitset for
* segments it processes, then merges with proper offset in reduce().
*/
public class MemoryAccountingBitsetCollectorManager
implements CollectorManager<MemoryAccountingBitsetCollector, FixedBitSet> {

private final CollectorMemoryTracker tracker;
private long totalBytesUsed;

public MemoryAccountingBitsetCollectorManager(CollectorMemoryTracker tracker) {
this.tracker = tracker;
}

@Override
public MemoryAccountingBitsetCollector newCollector() {
return new MemoryAccountingBitsetCollector(
new CollectorMemoryTracker(tracker.getName() + "-collector", tracker.getMemoryLimit()),
true);
}

@Override
public FixedBitSet reduce(Collection<MemoryAccountingBitsetCollector> collectors) {
if (collectors.isEmpty()) {
return new FixedBitSet(0);
}

if (collectors.size() == 1) {
MemoryAccountingBitsetCollector collector = collectors.iterator().next();
this.totalBytesUsed = collector.tracker.getBytes();

if (collector.getMinDocBase() == 0) {
return collector.bitSet;
}

FixedBitSet result = new FixedBitSet(collector.getMaxDocEnd());
int length = collector.getMaxDocEnd() - collector.getMinDocBase();
FixedBitSet.orRange(collector.bitSet, 0, result, collector.getMinDocBase(), length);
return result;
}

// Find global doc range across all collectors
int globalMinDocBase = Integer.MAX_VALUE;
int globalMaxDocEnd = 0;
for (MemoryAccountingBitsetCollector collector : collectors) {
globalMinDocBase = Math.min(globalMinDocBase, collector.getMinDocBase());
globalMaxDocEnd = Math.max(globalMaxDocEnd, collector.getMaxDocEnd());
long collectorBytes = collector.tracker.getBytes();
this.totalBytesUsed += collectorBytes;
}

FixedBitSet result = new FixedBitSet(globalMaxDocEnd);

for (MemoryAccountingBitsetCollector collector : collectors) {
if (collector.bitSet != null && collector.bitSet.length() > 0) {
int length = collector.getMaxDocEnd() - collector.getMinDocBase();
FixedBitSet.orRange(collector.bitSet, 0, result, collector.getMinDocBase(), length);
}
}

return result;
}

public long getBytes() {
return this.totalBytesUsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;

public class TestMemoryAccountingBitsetCollector extends LuceneTestCase {

Expand Down Expand Up @@ -57,31 +58,43 @@ public void tearDown() throws Exception {
dir.close();
}

public void testMemoryAccountingBitsetCollectorMemoryLimit() {
public void testMemoryAccountingBitsetCollectorMemoryLimit() throws Exception {
long perCollectorMemoryLimit = 150;
CollectorMemoryTracker tracker =
new CollectorMemoryTracker("testMemoryTracker", perCollectorMemoryLimit);
MemoryAccountingBitsetCollector bitSetCollector = new MemoryAccountingBitsetCollector(tracker);

MemoryAccountingBitsetCollectorManager bitsetCollectorManager =
new MemoryAccountingBitsetCollectorManager(tracker);
IndexSearcher searcher = new IndexSearcher(reader);
expectThrows(
IllegalStateException.class,
() -> {
searcher.search(MatchAllDocsQuery.INSTANCE, bitSetCollector);
});
() -> searcher.search(MatchAllDocsQuery.INSTANCE, bitsetCollectorManager));
}

public void testConcurrentMemoryLimit() throws Exception {
// For collector with collecting only 1 doc, 80 bytes are required.
long perCollectorMemoryLimit = 79;
CollectorMemoryTracker tracker =
new CollectorMemoryTracker("testMemoryTracker", perCollectorMemoryLimit);
MemoryAccountingBitsetCollectorManager bitsetCollectorManager =
new MemoryAccountingBitsetCollectorManager(tracker);
IndexSearcher searcher = newSearcher(reader);
expectThrows(
IllegalStateException.class,
() -> searcher.search(MatchAllDocsQuery.INSTANCE, bitsetCollectorManager));
}

public void testCollectedResult() throws Exception {
CollectorMemoryTracker tracker =
new CollectorMemoryTracker("testMemoryTracker", Long.MAX_VALUE);
MemoryAccountingBitsetCollector collector = new MemoryAccountingBitsetCollector(tracker);
MemoryAccountingBitsetCollectorManager bitsetCollectorManager =
new MemoryAccountingBitsetCollectorManager(tracker);

IndexSearcher searcher = new IndexSearcher(reader);
searcher.search(MatchAllDocsQuery.INSTANCE, collector);
IndexSearcher searcher = newSearcher(reader);
FixedBitSet result = searcher.search(MatchAllDocsQuery.INSTANCE, bitsetCollectorManager);

assertEquals(1000, collector.bitSet.cardinality());
assertEquals(1000, result.cardinality());
for (int i = 0; i < 1000; i++) {
assertTrue(collector.bitSet.get(i));
assertTrue(result.get(i));
}
}
}
Loading