Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 215 additions & 0 deletions index/scorch/scorch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3297,6 +3297,221 @@ func TestPersistenceWithoutExclude(t *testing.T) {
}
}

// TestPersistSnapshotMaybeMergeLegacy exercises persistSnapshotMaybeMerge in
// its legacy single-worker configuration (MaxSizeInMemoryMergePerWorker == 0):
// two in-memory segments are introduced at the root, and the persister is
// expected to merge them into a single persisted segment recorded in bolt.
func TestPersistSnapshotMaybeMergeLegacy(t *testing.T) {
	// Name the scratch config after this test; the original reused
	// "TestPersistSnapshotMaybeMerge", which could collide with a sibling
	// test's scratch directory of the same name.
	cfg := CreateConfig("TestPersistSnapshotMaybeMergeLegacy")
	err := InitTest(cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		_ = DestroyTest(cfg)
	}()

	analysisQueue := index.NewAnalysisQueue(1)
	idx, err := NewScorch(Name, cfg, analysisQueue)
	if err != nil {
		t.Fatalf("failed to create Scorch: %v", err)
	}
	defer idx.Close()

	s, ok := idx.(*Scorch)
	if !ok {
		t.Fatalf("expected *Scorch, got %T", idx)
	}

	// The test drives the persister path by hand, so open bolt directly
	// rather than starting the background goroutines.
	err = s.openBolt()
	if err != nil {
		t.Fatalf("failed to open bolt: %v", err)
	}

	// Introduce two single-document in-memory segments at the root.
	for i, docID := range []string{"doc1", "doc2"} {
		doc := document.NewDocument(docID)
		doc.AddField(document.NewTextField("name", []uint64{}, []byte("test")))
		doc.AddIDField()
		doc.VisitFields(func(field index.Field) {
			field.Analyze()
		})

		seg, _, err := s.segPlugin.New([]index.Document{doc})
		if err != nil {
			t.Fatalf("failed to create segment %d: %v", i, err)
		}

		intro := &segmentIntroduction{
			id:      atomic.AddUint64(&s.nextSegmentID, 1),
			data:    seg,
			ids:     []string{docID},
			applied: make(chan error),
		}

		err = s.introduceSegment(intro)
		if err != nil {
			t.Fatalf("introduceSegment failed for %s: %v", docID, err)
		}
	}

	snapshot := s.root
	if len(snapshot.segment) != 2 {
		t.Fatalf("expected 2 segments at root, got %d", len(snapshot.segment))
	}

	// Stand in for the merger loop: apply the single merge that
	// persistSnapshotMaybeMerge is expected to emit on s.merges.
	mergeDone := make(chan struct{})
	go func() {
		defer close(mergeDone)
		select {
		case nextMerge := <-s.merges:
			s.introduceMerge(nextMerge)
		case <-s.closeCh:
		}
	}()

	// MaxSizeInMemoryMergePerWorker == 0 selects the legacy
	// (non-size-bounded, single merge) behavior.
	po := &persisterOptions{
		NumPersisterWorkers:           1,
		MaxSizeInMemoryMergePerWorker: 0,
	}

	persisted, err := s.persistSnapshotMaybeMerge(snapshot, po)
	if err != nil {
		t.Fatalf("persistSnapshotMaybeMerge failed: %v", err)
	}
	if !persisted {
		t.Fatalf("expected persistSnapshotMaybeMerge to persist merged segment")
	}

	select {
	case <-mergeDone:
	case <-time.After(2 * time.Second):
		t.Fatalf("timed out waiting for merge introduction")
	}

	// Both in-memory segments should have been merged into one ...
	if len(s.root.segment) != 1 {
		t.Fatalf("expected merged root to have 1 segment, got %d", len(s.root.segment))
	}

	// ... and that one segment must be the persisted (on-disk) form.
	if _, ok := s.root.segment[0].segment.(segment.PersistedSegment); !ok {
		t.Fatalf("expected merged segment to be persisted")
	}

	// The persisted snapshot must also have been recorded in the bolt store.
	epochs, err := s.RootBoltSnapshotEpochs()
	if err != nil {
		t.Fatalf("failed to read bolt snapshot epochs: %v", err)
	}
	if len(epochs) == 0 {
		t.Fatalf("expected at least one persisted bolt snapshot")
	}
}

// TestPersistSnapshotMaybeMergeMultipleWorkers verifies that
// persistSnapshotMaybeMerge, configured with two persister workers and a
// per-worker in-memory merge budget of half the total segment size, merges
// 100 single-document in-memory segments down to two persisted segments.
func TestPersistSnapshotMaybeMergeMultipleWorkers(t *testing.T) {
	cfg := CreateConfig("TestPersistSnapshotMaybeMergeMultipleWorkers")
	if err := InitTest(cfg); err != nil {
		t.Fatal(err)
	}
	defer func() {
		_ = DestroyTest(cfg)
	}()

	aq := index.NewAnalysisQueue(1)
	idx, err := NewScorch(Name, cfg, aq)
	if err != nil {
		t.Fatalf("failed to create Scorch: %v", err)
	}
	defer idx.Close()

	sc, ok := idx.(*Scorch)
	if !ok {
		t.Fatalf("expected *Scorch, got %T", idx)
	}

	// Persister path is driven manually below; only bolt needs to be open.
	if err = sc.openBolt(); err != nil {
		t.Fatalf("failed to open bolt: %v", err)
	}

	// Introduce numDocs single-document segments, tracking their combined
	// in-memory size so the per-worker budget can be derived from it.
	const numDocs = 100
	memTotal := 0
	for i := 0; i < numDocs; i++ {
		docID := fmt.Sprintf("doc%d", i)
		d := document.NewDocument(docID)
		d.AddField(document.NewTextField("name", []uint64{}, []byte(fmt.Sprintf("test %v", docID))))
		d.AddIDField()
		d.VisitFields(func(f index.Field) {
			f.Analyze()
		})

		newSeg, _, segErr := sc.segPlugin.New([]index.Document{d})
		if segErr != nil {
			t.Fatalf("failed to create segment %d: %v", i, segErr)
		}
		memTotal += newSeg.Size()

		if introErr := sc.introduceSegment(&segmentIntroduction{
			id:      atomic.AddUint64(&sc.nextSegmentID, 1),
			data:    newSeg,
			ids:     []string{docID},
			applied: make(chan error),
		}); introErr != nil {
			t.Fatalf("introduceSegment failed for %s: %v", docID, introErr)
		}
	}

	rootSnap := sc.root
	if got := len(rootSnap.segment); got != numDocs {
		t.Fatalf("expected 100 segments at root, got %d", got)
	}

	// Stand in for the merger loop: apply the merge emitted on sc.merges.
	mergeApplied := make(chan struct{})
	go func() {
		defer close(mergeApplied)
		select {
		case nextMerge := <-sc.merges:
			sc.introduceMerge(nextMerge)
		case <-sc.closeCh:
		}
	}()

	// Two workers, each budgeted half of the total in-memory size, so the
	// snapshot should split into two merged segments.
	persisted, err := sc.persistSnapshotMaybeMerge(rootSnap, &persisterOptions{
		NumPersisterWorkers:           2,
		MaxSizeInMemoryMergePerWorker: memTotal / 2,
	})
	if err != nil {
		t.Fatalf("persistSnapshotMaybeMerge failed: %v", err)
	}
	if !persisted {
		t.Fatalf("expected persistSnapshotMaybeMerge to persist merged segment")
	}

	select {
	case <-mergeApplied:
	case <-time.After(2 * time.Second):
		t.Fatalf("timed out waiting for merge introduction")
	}

	// One merged segment per worker should remain at the root ...
	if got := len(sc.root.segment); got != 2 {
		t.Fatalf("expected merged root to have 2 segments, got %d", got)
	}

	// ... and each of them must be in its persisted (on-disk) form.
	for _, snapSeg := range sc.root.segment {
		if _, isPersisted := snapSeg.segment.(segment.PersistedSegment); !isPersisted {
			t.Fatalf("expected merged segment to be persisted")
		}
	}

	// The persisted snapshot must also have been recorded in the bolt store.
	epochs, err := sc.RootBoltSnapshotEpochs()
	if err != nil {
		t.Fatalf("failed to read bolt snapshot epochs: %v", err)
	}

	if len(epochs) == 0 {
		t.Fatalf("expected at least one persisted bolt snapshot")
	}
}

// mockSegmentBase satisfies segment.Segment but does NOT implement
// VectorFieldStatsReporter. Both mock types embed this so the stubs are
// not duplicated, while keeping the interface sets distinct.
Expand Down
3 changes: 3 additions & 0 deletions index/scorch/train_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ func (t *vectorTrainer) loadTrainedData(bucket *util.BoltBucketImpl) error {
func (t *vectorTrainer) train(batch *index.Batch) error {
// regulate the Train function
t.parent.FireIndexEvent()
if t.trainingComplete.Load() {
return fmt.Errorf("training is already complete, cannot accept more training data")
}

var trainData []index.Document
for _, doc := range batch.IndexOps {
Expand Down
Loading
Loading