Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ using Microsoft.Extensions.DataIngestion;
IngestionDocumentReader reader =
new MarkItDownReader(new FileInfo(@"pathToMarkItDown.exe"), extractImages: true);

using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
using IngestionPipeline<string> pipeline = new(CreateChunker(), CreateWriter());

await foreach (IngestionResult result in pipeline.ProcessAsync(reader, directory, "*.pdf"))
{
Console.WriteLine($"Processed '{result.DocumentId}'. Succeeded: {result.Succeeded}");
}
```

### Creating a MarkItDownMcpReader for Data Ingestion (MCP Server)
Expand All @@ -44,7 +49,12 @@ using Microsoft.Extensions.DataIngestion;
IngestionDocumentReader reader =
new MarkItDownMcpReader(new Uri("http://localhost:3001/mcp"));

using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
using IngestionPipeline<string> pipeline = new(CreateChunker(), CreateWriter());

await foreach (IngestionResult result in pipeline.ProcessAsync(reader, directory, "*.*"))
{
Console.WriteLine($"Processed '{result.DocumentId}'. Succeeded: {result.Succeeded}");
}
```

The MarkItDown MCP server can be run using Docker:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ using Microsoft.Extensions.DataIngestion;

IngestionDocumentReader reader = new MarkdownReader();

using IngestionPipeline<string> pipeline = new(reader, CreateChunker(), CreateWriter());
using IngestionPipeline<string> pipeline = new(CreateChunker(), CreateWriter());

await foreach (IngestionResult result in pipeline.ProcessAsync(reader, directory, "*.md"))
{
Console.WriteLine($"Processed '{result.DocumentId}'. Succeeded: {result.Succeeded}");
}
```

## Feedback & Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 10.1.0-preview.1

- Introduced `SectionChunker` class for treating each document section as a separate entity (https://github.com/dotnet/extensions/pull/7015)
- Extended `IngestionPipeline<T>` with a new `ProcessAsync(IAsyncEnumerable<IngestionDocument>)` overload that enables processing documents without a file system reader. The `IngestionDocumentReader` has been moved from the constructor to a parameter on the file-system-oriented `ProcessAsync` overloads.

## 10.0.0-preview.1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ internal static class ProcessFiles
internal const string FileCountTagName = "rag.file.count";
}

/// <summary>
/// Tracing constants for the root activity started when a sequence of documents is processed.
/// </summary>
internal static class ProcessDocuments
{
/// <summary>The name given to the root activity that spans the whole document sequence.</summary>
internal const string ActivityName = "ProcessDocuments";
}

/// <summary>
/// Tracing constants for the activity started for each individual document being processed.
/// </summary>
internal static class ProcessDocument
{
/// <summary>The name given to the per-document activity.</summary>
internal const string ActivityName = "ProcessDocument";
}

internal static class ProcessSource
{
internal const string DocumentIdTagName = "rag.document.id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ namespace Microsoft.Extensions.DataIngestion;
/// <typeparam name="T">The type of the chunk content.</typeparam>
public sealed class IngestionPipeline<T> : IDisposable
{
private readonly IngestionDocumentReader _reader;
private readonly IngestionChunker<T> _chunker;
private readonly IngestionChunkWriter<T> _writer;
private readonly ActivitySource _activitySource;
Expand All @@ -33,19 +32,16 @@ public sealed class IngestionPipeline<T> : IDisposable
/// <summary>
/// Initializes a new instance of the <see cref="IngestionPipeline{T}"/> class.
/// </summary>
/// <param name="reader">The reader for ingestion documents.</param>
/// <param name="chunker">The chunker to split documents into chunks.</param>
/// <param name="writer">The writer for processing chunks.</param>
/// <param name="options">The options for the ingestion pipeline.</param>
/// <param name="loggerFactory">The logger factory for creating loggers.</param>
public IngestionPipeline(
IngestionDocumentReader reader,
IngestionChunker<T> chunker,
IngestionChunkWriter<T> writer,
IngestionPipelineOptions? options = default,
ILoggerFactory? loggerFactory = default)
{
_reader = Throw.IfNull(reader);
_chunker = Throw.IfNull(chunker);
_writer = Throw.IfNull(writer);
_activitySource = new((options ?? new()).ActivitySourceName);
Expand All @@ -69,17 +65,58 @@ public void Dispose()
/// </summary>
public IList<IngestionChunkProcessor<T>> ChunkProcessors { get; } = [];

/// <summary>
/// Runs each document from the supplied asynchronous sequence through the pipeline.
/// </summary>
/// <remarks>
/// An exception thrown while ingesting a document is traced, logged, and reported through the
/// yielded <see cref="IngestionResult"/>; enumeration then continues with the next document.
/// </remarks>
/// <param name="documents">The documents to process.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>An async enumerable that yields one ingestion result per document.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IAsyncEnumerable<IngestionDocument> documents, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    Throw.IfNull(documents);

    // The root activity stays open for the whole enumeration and is disposed when the iterator completes.
    using Activity? batchActivity = _activitySource.StartActivity(ProcessDocuments.ActivityName);

    await foreach (IngestionDocument current in documents.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        // Each document gets its own activity, explicitly parented to the root one; it is disposed
        // at the end of the iteration, i.e. after the consumer has asked for the next result.
        ActivityContext parentContext = batchActivity?.Context ?? default;
        using Activity? documentActivity = _activitySource.StartActivity(ProcessDocument.ActivityName, ActivityKind.Internal, parentContext: parentContext);

        documentActivity?.SetTag(ProcessSource.DocumentIdTagName, current.Identifier);
        _logger?.ReadDocument(current.Identifier);

        IngestionDocument? ingested = null;
        Exception? error = null;
        try
        {
            ingested = await IngestAsync(current, documentActivity, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            TraceException(documentActivity, exception);
            _logger?.IngestingFailed(exception, current.Identifier);

            error = exception;
        }

        // The yield has to sit outside the try/catch: C# does not allow yielding from a try block that has a catch clause.
        yield return new IngestionResult(current.Identifier, ingested, error);
    }
}

/// <summary>
/// Processes all files in the specified directory that match the given search pattern and option.
/// </summary>
/// <param name="reader">The reader to use for reading documents from files.</param>
/// <param name="directory">The directory to process.</param>
/// <param name="searchPattern">The search pattern for file selection.</param>
/// <param name="searchOption">The search option for directory traversal.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>An async enumerable of ingestion results.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*",
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IngestionDocumentReader reader, DirectoryInfo directory, string searchPattern = "*.*",
SearchOption searchOption = SearchOption.TopDirectoryOnly, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(reader);
Throw.IfNull(directory);
Throw.IfNullOrEmpty(searchPattern);
Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);
Expand All @@ -91,7 +128,7 @@ public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo direct
.SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
_logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);

await foreach (var ingestionResult in ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
await foreach (IngestionResult ingestionResult in ProcessFilesAsync(reader, directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
{
yield return ingestionResult;
}
Expand All @@ -101,16 +138,18 @@ public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo direct
/// <summary>
/// Processes the specified files.
/// </summary>
/// <param name="reader">The reader to use for reading documents from files.</param>
/// <param name="files">The collection of files to process.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>An async enumerable of ingestion results.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IngestionDocumentReader reader, IEnumerable<FileInfo> files, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(reader);
Throw.IfNull(files);

using (Activity? rootActivity = _activitySource.StartActivity(ProcessFiles.ActivityName))
{
await foreach (var ingestionResult in ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
await foreach (IngestionResult ingestionResult in ProcessFilesAsync(reader, files, rootActivity, cancellationToken).ConfigureAwait(false))
{
yield return ingestionResult;
}
Expand All @@ -125,7 +164,7 @@ private static void TraceException(Activity? activity, Exception ex)
.SetStatus(ActivityStatusCode.Error, ex.Message);
}

private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, Activity? rootActivity,
private async IAsyncEnumerable<IngestionResult> ProcessFilesAsync(IngestionDocumentReader reader, IEnumerable<FileInfo> files, Activity? rootActivity,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
#if NET
Expand All @@ -143,13 +182,13 @@ private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInf
using (Activity? processFileActivity = _activitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
{
processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);
_logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader));
_logger?.ReadingFile(fileInfo.FullName, GetShortName(reader));

IngestionDocument? document = null;
Exception? failure = null;
try
{
document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);
document = await reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);

processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
_logger?.ReadDocument(document.Identifier);
Expand Down Expand Up @@ -181,7 +220,7 @@ private async Task<IngestionDocument> IngestAsync(IngestionDocument document, Ac
}

IAsyncEnumerable<IngestionChunk<T>> chunks = _chunker.ProcessAsync(document, cancellationToken);
foreach (var processor in ChunkProcessors)
foreach (IngestionChunkProcessor<T> processor in ChunkProcessors)
{
chunks = processor.ProcessAsync(chunks, cancellationToken);
}
Expand Down
37 changes: 37 additions & 0 deletions src/Libraries/Microsoft.Extensions.DataIngestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,43 @@ VectorStoreCollection<Guid, IngestionChunkVectorRecord<string>> collection =
using VectorStoreWriter<string, IngestionChunkVectorRecord<string>> writer = new(collection);
```

## Using the ingestion pipeline

The `IngestionPipeline<T>` orchestrates document reading, chunking, optional processing, and writing. It can accept documents directly or read them from the file system using an `IngestionDocumentReader`.

### Processing documents from the file system

Create a pipeline, then call `ProcessAsync` with an `IngestionDocumentReader` and a directory or list of files:

```csharp
IngestionDocumentReader reader = new MarkdownReader();

using IngestionPipeline<string> pipeline = new(CreateChunker(), CreateWriter());

await foreach (IngestionResult result in pipeline.ProcessAsync(reader, new DirectoryInfo("docs"), "*.md"))
{
Console.WriteLine($"Processed '{result.DocumentId}'. Succeeded: {result.Succeeded}");
}
```

### Processing documents without a reader

You can also supply `IngestionDocument` instances directly, without any file-system dependency:

```csharp
using IngestionPipeline<string> pipeline = new(CreateChunker(), CreateWriter());

IngestionDocument document = new("my-doc-id");
document.Sections.Add(new IngestionDocumentSection());
document.Sections[0].Elements.Add(new IngestionDocumentHeader("# Hello"));
document.Sections[0].Elements.Add(new IngestionDocumentParagraph("This content was created in memory."));

await foreach (IngestionResult result in pipeline.ProcessAsync(new[] { document }.ToAsyncEnumerable()))
{
Console.WriteLine($"Processed '{result.DocumentId}'. Succeeded: {result.Succeeded}");
}
```

## Feedback & Contributing

We welcome feedback and contributions in [our GitHub repo](https://github.com/dotnet/extensions).
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});

using var pipeline = new IngestionPipeline<string>(
reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);

await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
await foreach (var result in pipeline.ProcessAsync(new DocumentReader(directory), directory, searchPattern))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public async Task CanProcessDocuments()
"chunks", TestEmbeddingGenerator<string>.DimensionCount);
using VectorStoreWriter<string, IngestionChunkVectorRecord<string>> vectorStoreWriter = new(collection);

using IngestionPipeline<string> pipeline = new(CreateReader(), CreateChunker(), vectorStoreWriter);
List<IngestionResult> ingestionResults = await pipeline.ProcessAsync(_sampleFiles).ToListAsync();
using IngestionPipeline<string> pipeline = new(CreateChunker(), vectorStoreWriter);
List<IngestionResult> ingestionResults = await pipeline.ProcessAsync(CreateReader(), _sampleFiles).ToListAsync();

Assert.Equal(_sampleFiles.Count, ingestionResults.Count);
AssertAllIngestionsSucceeded(ingestionResults);
Expand Down Expand Up @@ -126,10 +126,10 @@ public async Task CanProcessDocumentsInDirectory()
"chunks-dir", TestEmbeddingGenerator<string>.DimensionCount);
using VectorStoreWriter<string, IngestionChunkVectorRecord<string>> vectorStoreWriter = new(collection);

using IngestionPipeline<string> pipeline = new(CreateReader(), CreateChunker(), vectorStoreWriter);
using IngestionPipeline<string> pipeline = new(CreateChunker(), vectorStoreWriter);

DirectoryInfo directory = new("TestFiles");
List<IngestionResult> ingestionResults = await pipeline.ProcessAsync(directory, "*.md").ToListAsync();
List<IngestionResult> ingestionResults = await pipeline.ProcessAsync(CreateReader(), directory, "*.md").ToListAsync();
Assert.Equal(directory.EnumerateFiles("*.md").Count(), ingestionResults.Count);
AssertAllIngestionsSucceeded(ingestionResults);

Expand Down Expand Up @@ -162,10 +162,10 @@ public async Task ChunksCanBeMoreThanJustText()
var collection = testVectorStore.GetIngestionRecordCollection<IngestionChunkVectorRecord<DataContent>, DataContent>(
"chunks-img", TestEmbeddingGenerator<DataContent>.DimensionCount);
using VectorStoreWriter<DataContent, IngestionChunkVectorRecord<DataContent>> vectorStoreWriter = new(collection);
using IngestionPipeline<DataContent> pipeline = new(CreateReader(), new ImageChunker(), vectorStoreWriter);
using IngestionPipeline<DataContent> pipeline = new(new ImageChunker(), vectorStoreWriter);

Assert.False(embeddingGenerator.WasCalled);
var ingestionResults = await pipeline.ProcessAsync(_sampleFiles).ToListAsync();
List<IngestionResult> ingestionResults = await pipeline.ProcessAsync(CreateReader(), _sampleFiles).ToListAsync();
AssertAllIngestionsSucceeded(ingestionResults);

var retrieved = await vectorStoreWriter.VectorStoreCollection
Expand Down Expand Up @@ -215,10 +215,10 @@ public async Task SingleFailureDoesNotTearDownEntirePipeline()
"chunks-fail", TestEmbeddingGenerator<string>.DimensionCount);
using VectorStoreWriter<string, IngestionChunkVectorRecord<string>> vectorStoreWriter = new(collection);

using IngestionPipeline<string> pipeline = new(failingForFirstReader, CreateChunker(), vectorStoreWriter);
using IngestionPipeline<string> pipeline = new(CreateChunker(), vectorStoreWriter);

await Verify(pipeline.ProcessAsync(_sampleFiles));
await Verify(pipeline.ProcessAsync(_sampleDirectory));
await Verify(pipeline.ProcessAsync(failingForFirstReader, _sampleFiles));
await Verify(pipeline.ProcessAsync(failingForFirstReader, _sampleDirectory));

async Task Verify(IAsyncEnumerable<IngestionResult> results)
{
Expand All @@ -235,6 +235,35 @@ async Task Verify(IAsyncEnumerable<IngestionResult> results)
}
}

[Fact]
public async Task CanProcessDocumentsWithoutReader()
{
    // Arrange: an in-memory vector store whose embedding generator records whether it was invoked.
    TestEmbeddingGenerator<string> generator = new();
    using InMemoryVectorStore store = new(new() { EmbeddingGenerator = generator });

    VectorStoreCollection<Guid, IngestionChunkVectorRecord<string>> records =
        store.GetIngestionRecordCollection<IngestionChunkVectorRecord<string>, string>(
            "chunks-direct", TestEmbeddingGenerator<string>.DimensionCount);
    using VectorStoreWriter<string, IngestionChunkVectorRecord<string>> writer = new(records);

    using IngestionPipeline<string> pipeline = new(CreateChunker(), writer);

    // The document is assembled by hand, so no IngestionDocumentReader is involved.
    IngestionDocument inMemoryDocument = new("doc-1");
    IngestionDocumentSection body = new()
    {
        Elements =
        {
            new IngestionDocumentHeader("# Hello"),
            new IngestionDocumentParagraph("This is a test document created without a reader."),
        },
    };
    inMemoryDocument.Sections.Add(body);

    // Act
    IAsyncEnumerable<IngestionDocument> input = new[] { inMemoryDocument }.ToAsyncEnumerable();
    List<IngestionResult> outcomes = await pipeline.ProcessAsync(input).ToListAsync();

    // Assert: exactly one successful result carrying the processed document.
    IngestionResult outcome = Assert.Single(outcomes);
    Assert.Equal("doc-1", outcome.DocumentId);
    Assert.True(outcome.Succeeded);
    Assert.NotNull(outcome.Document);
    Assert.True(generator.WasCalled, "Embedding generator should have been called.");
}

private static IngestionDocumentReader CreateReader() => new MarkdownReader();

private static IngestionChunker<string> CreateChunker() => new HeaderChunker(new(TiktokenTokenizer.CreateForModel("gpt-4")));
Expand Down
Loading