@@ -15,12 +15,13 @@ namespace Microsoft.Extensions.DataIngestion;
public abstract class IngestionChunkWriter<T> : IDisposable
{
/// <summary>
-/// Writes chunks asynchronously.
+/// Writes the chunks of a single document asynchronously.
Member:
Unimportant: do we still add "asynchronously" to the docs of each and every async function (especially when these don't have a corresponding synchronous overload)? Seems a bit useless to me (but obviously let's follow latest practices and patterns).

/// </summary>
/// <param name="document">The document from which the chunks were extracted.</param>
/// <param name="chunks">The chunks to write.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task representing the asynchronous write operation.</returns>
-public abstract Task WriteAsync(IAsyncEnumerable<IngestionChunk<T>> chunks, CancellationToken cancellationToken = default);
+public abstract Task WriteAsync(IngestionDocument document, IAsyncEnumerable<IngestionChunk<T>> chunks, CancellationToken cancellationToken = default);
Member:
Optional: consider renaming to WriteDocumentAsync rather than adding the document parameter: writing a document is what this function actually does.

I think renaming makes more sense than adding the new document parameter - the parameter has to always be exactly the same as the document referenced by all chunks in the 2nd parameter; in other words, the 1st parameter is useless: it's only there for documentation and/or validation purposes (see below).

But even if you do decide to keep the new parameter (which I'd recommend against), I'd still consider renaming to make the API extra-clear.

Member:
the parameter has to always be exactly the same as the document referenced by all chunks in the 2nd parameter; in other words, the 1st parameter is useless: it's only there for documentation and/or validation purposes (see below).

That is why the original design did not have it ;P

Optional: consider renaming to WriteDocumentAsync rather than adding the document parameter: writing a document is what this function actually does.

Hmm, let me think out loud about it:

We have IngestionDocumentReader, which comes with ReadAsync (not ReadDocumentAsync) and reads a document.
Then we have an IngestionChunkWriter that comes with WriteAsync. It's not called IngestionDocumentWriter because it does not write the document, but the chunks that were created from a document. Because of that, I don't think the method should be called WriteDocumentAsync.

So perhaps we should not rename anything but add doc remarks that clarify that all the chunks belong to a single document?
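If we go the doc-remarks route, a minimal sketch of what that could look like (the exact wording is just a suggestion, not final API docs):

```csharp
/// <summary>
/// Writes the chunks of a single document asynchronously.
/// </summary>
/// <remarks>
/// All chunks in <paramref name="chunks"/> must originate from the same
/// <see cref="IngestionDocument"/>; passing chunks extracted from multiple
/// documents in a single call is not supported.
/// </remarks>
```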

Member:
For one thing, we could simply add validation. That is, if WriteAsync is passed chunks from multiple documents, it throws - should be easy to do, right?

Regardless, I'm a big believer in API names clearly expressing their behavior/semantics; so it feels like somehow the API naming should express that there's a single-document assumption/restriction here.

So... With IngestionDocumentReader, presumably ReadAsync just returns a document type, so there's much less need to clarify anything via naming (the type already does that). But on the writer side things are more complicated, since we receive chunks as input, but there's an unwritten limitation that they have to belong to the same document. That's an important difference IMHO.

So I'd still consider calling the method WriteDocumentChunksAsync, or WriteDocumentAsync if we want to prioritize brevity (the parameter type already tells you that the "document" here is passed in the form of a bunch of chunks).

If we want to 100% lock down this assumption at the type level, we can also rename the type from IngestionChunkWriter to IngestionDocumentChunkWriter (or again, IngestionDocumentWriter to prioritize brevity). I'd do this only if you're 100% sure that you'll never add an API that allows e.g. mixing chunks from different documents; we can do that, but it's maybe a strong commitment to make for the future... It might be safer to only change the naming at the method level; this way, if we ever want to add another method which allows multiple documents, we can.

The way I see it, at the end of the day the WriteAsync method accepts and writes a document to the database; it happens to have already been decomposed into chunks, but that's just a form/shape/format for that document.

BTW your ingestion data model might have been different here, with the document actually holding a list of its chunks; if that were the case, you'd just pass in the document directly as the only parameter and everything would be 100% crystal-clear. But since the document (currently) doesn't reference its chunks, we pass the chunks instead as a replacement for the document; but what's written is still the document.

What do you think?
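For reference, the validation mentioned above could be a small guard inside the write loop. A rough sketch, assuming we keep the document parameter (whether to compare by reference or by Identifier is an open choice, and the exception type/message here are just assumptions):

```csharp
public override async Task WriteAsync(
    IngestionDocument document,
    IAsyncEnumerable<IngestionChunk<TChunk>> chunks,
    CancellationToken cancellationToken = default)
{
    await foreach (IngestionChunk<TChunk> chunk in chunks
        .WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        // Enforce the single-document assumption: every chunk must reference
        // the same document that was passed in as the first argument.
        if (!ReferenceEquals(chunk.Document, document))
        {
            throw new ArgumentException(
                $"Chunk belongs to document '{chunk.Document.Identifier}', " +
                $"but '{document.Identifier}' was expected.",
                nameof(chunks));
        }

        // ... write the chunk as before ...
    }
}
```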


/// <summary>
/// Disposes the writer and releases all associated resources.
@@ -187,7 +187,7 @@ private async Task<IngestionDocument> IngestAsync(IngestionDocument document, Ac
}

_logger?.WritingChunks(GetShortName(_writer));
-await _writer.WriteAsync(chunks, cancellationToken).ConfigureAwait(false);
+await _writer.WriteAsync(document, chunks, cancellationToken).ConfigureAwait(false);
_logger?.WroteChunks(document.Identifier);

return document;
@@ -43,8 +43,9 @@ public VectorStoreWriter(VectorStoreCollection<Guid, TRecord> collection, Vector
public VectorStoreCollection<Guid, TRecord> VectorStoreCollection { get; }

/// <inheritdoc/>
-public override async Task WriteAsync(IAsyncEnumerable<IngestionChunk<TChunk>> chunks, CancellationToken cancellationToken = default)
+public override async Task WriteAsync(IngestionDocument document, IAsyncEnumerable<IngestionChunk<TChunk>> chunks, CancellationToken cancellationToken = default)
{
+_ = Throw.IfNull(document);
_ = Throw.IfNull(chunks);

IReadOnlyList<Guid>? preExistingKeys = null;
@@ -62,13 +63,13 @@ public override async Task WriteAsync(IAsyncEnumerable<IngestionChunk<TChunk>> c
// We obtain the IDs of the pre-existing chunks for given document,
// and delete them after we finish inserting the new chunks,
// to avoid a situation where we delete the chunks and then fail to insert the new ones.
-preExistingKeys ??= await GetPreExistingChunksIdsAsync(chunk.Document, cancellationToken).ConfigureAwait(false);
+preExistingKeys ??= await GetPreExistingChunksIdsAsync(document, cancellationToken).ConfigureAwait(false);

TRecord record = new()
{
Content = chunk.Content,
Context = chunk.Context,
-DocumentId = chunk.Document.Identifier,
+DocumentId = document.Identifier,
Member:
Assuming we keep the document parameter, add validation that the document of all the chunks is the same as the document argument?

};

if (chunk.HasMetadata)
@@ -51,7 +51,7 @@ public async Task CanWriteChunksWithCustomDefinition()

List<IngestionChunk<string>> chunks = [chunk];

-await writer.WriteAsync(chunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, chunks.ToAsyncEnumerable());

IngestionChunkVectorRecord<string> record = await writer.VectorStoreCollection
.GetAsync(filter: record => record.DocumentId == documentId, top: 1)
@@ -82,7 +82,7 @@ public async Task CanWriteChunks()
List<IngestionChunk<string>> chunks = [chunk];

Assert.False(testEmbeddingGenerator.WasCalled);
-await writer.WriteAsync(chunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, chunks.ToAsyncEnumerable());

IngestionChunkVectorRecord<string> record = await writer.VectorStoreCollection
.GetAsync(filter: record => record.DocumentId == documentId, top: 1)
@@ -112,7 +112,7 @@ public async Task CanWriteChunksWithMetadata()

List<IngestionChunk<string>> chunks = [chunk];

-await writer.WriteAsync(chunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, chunks.ToAsyncEnumerable());

TestChunkRecordWithMetadata record = await writer.VectorStoreCollection
.GetAsync(filter: record => record.DocumentId == documentId, top: 1)
@@ -148,7 +148,7 @@ public async Task DoesSupportIncrementalIngestion()

List<IngestionChunk<string>> chunks = [chunk1, chunk2];

-await writer.WriteAsync(chunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, chunks.ToAsyncEnumerable());

int recordCount = await writer.VectorStoreCollection
.GetAsync(filter: record => record.DocumentId == documentId, top: 100)
@@ -160,7 +160,7 @@ public async Task DoesSupportIncrementalIngestion()

List<IngestionChunk<string>> updatedChunks = [updatedChunk];

-await writer.WriteAsync(updatedChunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, updatedChunks.ToAsyncEnumerable());

// We ask for 100 records, but we expect only 1 as the previous 2 should have been deleted.
IngestionChunkVectorRecord<string> record = await writer.VectorStoreCollection
@@ -216,7 +216,7 @@ public async Task BatchesChunks(int? batchTokenCount, int[] chunkTokenCounts)
chunks.Add(new($"chunk {i + 1}", document, context: null, tokenCount: chunkTokenCounts[i]));
}

-await writer.WriteAsync(chunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, chunks.ToAsyncEnumerable());

int recordCount = await writer.VectorStoreCollection
.GetAsync(filter: record => record.DocumentId == documentId, top: 100)
@@ -252,7 +252,7 @@ public async Task IncrementalIngestion_WithManyRecords_DeletesAllPreExistingChun
chunks.Add(TestChunkFactory.CreateChunk($"chunk {i}", document));
}

-await writer.WriteAsync(chunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, chunks.ToAsyncEnumerable());

int recordCount = await writer.VectorStoreCollection
.GetAsync(filter: record => record.DocumentId == documentId, top: 10000)
@@ -266,7 +266,7 @@ public async Task IncrementalIngestion_WithManyRecords_DeletesAllPreExistingChun
TestChunkFactory.CreateChunk("updated chunk 2", document)
];

-await writer.WriteAsync(updatedChunks.ToAsyncEnumerable());
+await writer.WriteAsync(document, updatedChunks.ToAsyncEnumerable());

// Verify that all old records were deleted and only the new ones remain
List<IngestionChunkVectorRecord<string>> records = await writer.VectorStoreCollection