diff --git a/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocument.cs b/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocument.cs
index 119a4acee91..b66e9b82605 100644
--- a/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocument.cs
+++ b/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocument.cs
@@ -27,6 +27,11 @@ public IngestionDocument(string identifier)
///
public string Identifier { get; }
+ ///
+ /// Gets the exception that occurred while reading the document, if any.
+ ///
+ internal Exception? ReadException { get; init; }
+
///
/// Gets the sections of the document.
///
diff --git a/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocumentReader.cs b/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocumentReader.cs
index cffd56f3c1c..96b47a73441 100644
--- a/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocumentReader.cs
+++ b/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/IngestionDocumentReader.cs
@@ -2,8 +2,10 @@
// The .NET Foundation licenses this file to you under the MIT license.
using System;
+using System.Collections.Generic;
using System.IO;
using System.Net.Mime;
+using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
@@ -46,6 +48,64 @@ public virtual async Task ReadAsync(FileInfo source, string i
return await ReadAsync(stream, identifier, string.IsNullOrEmpty(mediaType) ? GetMediaType(source) : mediaType!, cancellationToken).ConfigureAwait(false);
}
+ ///
+ /// Reads all files in the specified directory that match the given search pattern and option,
+    /// and converts each to an <see cref="IngestionDocument"/>.
+ ///
+ /// The directory to read.
+ /// The search pattern for file selection.
+ /// The search option for directory traversal.
+ /// The token to monitor for cancellation requests.
+ /// An asynchronous sequence of instances.
+ /// is .
+ /// is or empty.
+    public async IAsyncEnumerable<IngestionDocument> ReadAsync(
+        DirectoryInfo directory,
+        string searchPattern = "*.*",
+        SearchOption searchOption = SearchOption.TopDirectoryOnly,
+        [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ _ = Throw.IfNull(directory);
+ _ = Throw.IfNullOrEmpty(searchPattern);
+ _ = Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);
+
+ await foreach (var document in ReadAsync(directory.EnumerateFiles(searchPattern, searchOption), cancellationToken).ConfigureAwait(false))
+ {
+ yield return document;
+ }
+ }
+
+ ///
+ /// Reads the specified files and converts each to an .
+ ///
+ /// The files to read.
+ /// The token to monitor for cancellation requests.
+ /// An asynchronous sequence of instances.
+ /// is .
+    public async IAsyncEnumerable<IngestionDocument> ReadAsync(
+        IEnumerable<FileInfo> files,
+        [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ _ = Throw.IfNull(files);
+
+ foreach (FileInfo file in files)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+
+ IngestionDocument document;
+ try
+ {
+ document = await ReadAsync(file, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex) when (ex is not OperationCanceledException)
+ {
+ document = new IngestionDocument(file.FullName) { ReadException = ex };
+ }
+
+ yield return document;
+ }
+ }
+
///
/// Reads a stream and converts it to an .
///
diff --git a/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/Microsoft.Extensions.DataIngestion.Abstractions.csproj b/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/Microsoft.Extensions.DataIngestion.Abstractions.csproj
index 459bd2cc850..43de72f2d7e 100644
--- a/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/Microsoft.Extensions.DataIngestion.Abstractions.csproj
+++ b/src/Libraries/Microsoft.Extensions.DataIngestion.Abstractions/Microsoft.Extensions.DataIngestion.Abstractions.csproj
@@ -19,6 +19,10 @@
true
+
+
+
+
diff --git a/src/Libraries/Microsoft.Extensions.DataIngestion/DiagnosticsConstants.cs b/src/Libraries/Microsoft.Extensions.DataIngestion/DiagnosticsConstants.cs
index 4251bef6ae3..a32300fcab0 100644
--- a/src/Libraries/Microsoft.Extensions.DataIngestion/DiagnosticsConstants.cs
+++ b/src/Libraries/Microsoft.Extensions.DataIngestion/DiagnosticsConstants.cs
@@ -8,28 +8,13 @@ internal static class DiagnosticsConstants
internal const string ActivitySourceName = "Experimental.Microsoft.Extensions.DataIngestion";
internal const string ErrorTypeTagName = "error.type";
- internal static class ProcessDirectory
+ internal static class ProcessDocument
{
- internal const string ActivityName = "ProcessDirectory";
- internal const string DirectoryPathTagName = "rag.directory.path";
- internal const string SearchPatternTagName = "rag.directory.search.pattern";
- internal const string SearchOptionTagName = "rag.directory.search.option";
- }
-
- internal static class ProcessFiles
- {
- internal const string ActivityName = "ProcessFiles";
- internal const string FileCountTagName = "rag.file.count";
+ internal const string ActivityName = "ProcessDocument";
}
internal static class ProcessSource
{
internal const string DocumentIdTagName = "rag.document.id";
}
-
- internal static class ProcessFile
- {
- internal const string ActivityName = "ProcessFile";
- internal const string FilePathTagName = "rag.file.path";
- }
}
diff --git a/src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs b/src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs
index 1eeb94058ee..ab13b28efdb 100644
--- a/src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs
+++ b/src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs
@@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
-using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -24,7 +23,6 @@ namespace Microsoft.Extensions.DataIngestion;
/// The type of the chunk content.
public sealed class IngestionPipeline : IDisposable
{
- private readonly IngestionDocumentReader _reader;
private readonly IngestionChunker _chunker;
private readonly IngestionChunkWriter _writer;
private readonly ActivitySource _activitySource;
@@ -33,19 +31,16 @@ public sealed class IngestionPipeline : IDisposable
///
/// Initializes a new instance of the class.
///
- /// The reader for ingestion documents.
/// The chunker to split documents into chunks.
/// The writer for processing chunks.
/// The options for the ingestion pipeline.
/// The logger factory for creating loggers.
public IngestionPipeline(
- IngestionDocumentReader reader,
IngestionChunker chunker,
IngestionChunkWriter writer,
IngestionPipelineOptions? options = default,
ILoggerFactory? loggerFactory = default)
{
- _reader = Throw.IfNull(reader);
_chunker = Throw.IfNull(chunker);
_writer = Throw.IfNull(writer);
_activitySource = new((options ?? new()).ActivitySourceName);
@@ -70,106 +65,58 @@ public void Dispose()
public IList> ChunkProcessors { get; } = [];
///
- /// Processes all files in the specified directory that match the given search pattern and option.
+ /// Processes the specified documents.
///
- /// The directory to process.
- /// The search pattern for file selection.
- /// The search option for directory traversal.
+ /// The asynchronous sequence of documents to process.
/// The cancellation token for the operation.
- /// A task representing the asynchronous operation.
- public async IAsyncEnumerable ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*",
- SearchOption searchOption = SearchOption.TopDirectoryOnly, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ /// An asynchronous sequence of instances.
+    public async IAsyncEnumerable<IngestionResult> ProcessAsync(
+        IAsyncEnumerable<IngestionDocument> documents,
+        [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
- Throw.IfNull(directory);
- Throw.IfNullOrEmpty(searchPattern);
- Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);
+ Throw.IfNull(documents);
- using (Activity? rootActivity = _activitySource.StartActivity(ProcessDirectory.ActivityName))
+ await foreach (IngestionDocument document in documents.WithCancellation(cancellationToken).ConfigureAwait(false))
{
- rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName)
- .SetTag(ProcessDirectory.SearchPatternTagName, searchPattern)
- .SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
- _logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);
-
- await foreach (var ingestionResult in ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
+ using (Activity? processDocumentActivity = _activitySource.StartActivity(ProcessDocument.ActivityName))
{
- yield return ingestionResult;
- }
- }
- }
+ processDocumentActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
- ///
- /// Processes the specified files.
- ///
- /// The collection of files to process.
- /// The cancellation token for the operation.
- /// A task representing the asynchronous operation.
- public async IAsyncEnumerable ProcessAsync(IEnumerable files, [EnumeratorCancellation] CancellationToken cancellationToken = default)
- {
- Throw.IfNull(files);
-
- using (Activity? rootActivity = _activitySource.StartActivity(ProcessFiles.ActivityName))
- {
- await foreach (var ingestionResult in ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
- {
- yield return ingestionResult;
- }
- }
- }
-
- private static string GetShortName(object any) => any.GetType().Name;
-
- private static void TraceException(Activity? activity, Exception ex)
- {
- activity?.SetTag(ErrorTypeTagName, ex.GetType().FullName)
- .SetStatus(ActivityStatusCode.Error, ex.Message);
- }
-
- private async IAsyncEnumerable ProcessAsync(IEnumerable files, Activity? rootActivity,
- [EnumeratorCancellation] CancellationToken cancellationToken)
- {
-#if NET
- if (System.Linq.Enumerable.TryGetNonEnumeratedCount(files, out int count))
-#else
- if (files is IReadOnlyCollection { Count: int count })
-#endif
- {
- rootActivity?.SetTag(ProcessFiles.FileCountTagName, count);
- _logger?.LogFileCount(count);
- }
-
- foreach (FileInfo fileInfo in files)
- {
- using (Activity? processFileActivity = _activitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
- {
- processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);
- _logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader));
+ if (document.ReadException is not null)
+ {
+ TraceException(processDocumentActivity, document.ReadException);
+ _logger?.IngestingFailed(document.ReadException, document.Identifier);
+ yield return new IngestionResult(document.Identifier, null, document.ReadException);
+ continue;
+ }
- IngestionDocument? document = null;
+ IngestionDocument? processedDocument = null;
Exception? failure = null;
try
{
- document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);
-
- processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
- _logger?.ReadDocument(document.Identifier);
-
- document = await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false);
+ processedDocument = await IngestAsync(document, processDocumentActivity, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
- TraceException(processFileActivity, ex);
- _logger?.IngestingFailed(ex, document?.Identifier ?? fileInfo.FullName);
+ TraceException(processDocumentActivity, ex);
+ _logger?.IngestingFailed(ex, document.Identifier);
failure = ex;
}
- string documentId = document?.Identifier ?? fileInfo.FullName;
- yield return new IngestionResult(documentId, document, failure);
+ yield return new IngestionResult(document.Identifier, processedDocument, failure);
}
}
}
+ private static string GetShortName(object any) => any.GetType().Name;
+
+ private static void TraceException(Activity? activity, Exception ex)
+ {
+ activity?.SetTag(ErrorTypeTagName, ex.GetType().FullName)
+ .SetStatus(ActivityStatusCode.Error, ex.Message);
+ }
+
private async Task IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken)
{
foreach (IngestionDocumentProcessor processor in DocumentProcessors)
diff --git a/src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs b/src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs
index 58732e8ead7..350de10e24b 100644
--- a/src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs
+++ b/src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs
@@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.
using System;
-using System.IO;
using Microsoft.Extensions.Logging;
#pragma warning disable S109 // Magic numbers should not be used
@@ -11,18 +10,6 @@ namespace Microsoft.Extensions.DataIngestion
{
internal static partial class Log
{
- [LoggerMessage(0, LogLevel.Information, "Starting to process files in directory '{directory}' with search pattern '{searchPattern}' and search option '{searchOption}'.")]
- internal static partial void ProcessingDirectory(this ILogger logger, string directory, string searchPattern, System.IO.SearchOption searchOption);
-
- [LoggerMessage(1, LogLevel.Information, "Processing {fileCount} files.")]
- internal static partial void LogFileCount(this ILogger logger, int fileCount);
-
- [LoggerMessage(2, LogLevel.Information, "Reading file '{filePath}' using '{reader}'.")]
- internal static partial void ReadingFile(this ILogger logger, string filePath, string reader);
-
- [LoggerMessage(3, LogLevel.Information, "Read document '{documentId}'.")]
- internal static partial void ReadDocument(this ILogger logger, string documentId);
-
[LoggerMessage(4, LogLevel.Information, "Writing chunks using {writer}.")]
internal static partial void WritingChunks(this ILogger logger, string writer);
diff --git a/src/Libraries/Microsoft.Extensions.DataIngestion/README.md b/src/Libraries/Microsoft.Extensions.DataIngestion/README.md
index 030ac8da43b..2549f88dfe2 100644
--- a/src/Libraries/Microsoft.Extensions.DataIngestion/README.md
+++ b/src/Libraries/Microsoft.Extensions.DataIngestion/README.md
@@ -29,9 +29,59 @@ Or directly in the C# project file:
```
-## Writing chunks to a vector store
+## Creating and using an ingestion pipeline
-### Basic usage
+### Basic usage for reading from directory
+
+Use `IngestionDocumentReader.ReadAsync` to read documents from files or a directory, and pass the result to `IngestionPipeline.ProcessAsync`:
+
+```csharp
+VectorStoreCollection> collection =
+ vectorStore.GetIngestionRecordCollection("chunks", dimensionCount: 1536);
+
+using VectorStoreWriter> writer = new(collection);
+using IngestionPipeline pipeline = new(chunker, writer);
+
+// Read from a directory and ingest all matching files
+MarkdownReader reader = new();
+await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, "*.md")))
+{
+ Console.WriteLine($"Processed '{result.DocumentId}': {(result.Succeeded ? "success" : "failed")}");
+}
+```
+
+### Reading from a list of files
+
+```csharp
+IEnumerable<FileInfo> files = [ new FileInfo("doc1.md"), new FileInfo("doc2.md") ];
+await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(files)))
+{
+ Console.WriteLine($"Processed '{result.DocumentId}': {(result.Succeeded ? "success" : "failed")}");
+}
+```
+
+### Using the pipeline without a reader
+
+You can also create documents directly and pass them to the pipeline without using a reader:
+
+```csharp
+async IAsyncEnumerable<IngestionDocument> GetDocumentsAsync()
+{
+ var document = new IngestionDocument("my-document-id");
+ document.Sections.Add(new IngestionDocumentSection
+ {
+ Elements = { new IngestionDocumentParagraph("Document content goes here.") }
+ });
+ yield return document;
+}
+
+await foreach (var result in pipeline.ProcessAsync(GetDocumentsAsync()))
+{
+ Console.WriteLine($"Processed '{result.DocumentId}': {(result.Succeeded ? "success" : "failed")}");
+}
+```
+
+### Basic usage for writing to a vector store
The simplest way to store ingestion chunks in a vector store is to use the `GetIngestionRecordCollection` extension method to create a collection, and then pass it to a `VectorStoreWriter`:
diff --git a/src/ProjectTemplates/Microsoft.Extensions.AI.Templates/templates/AIChatWeb-CSharp/AIChatWeb-CSharp.Web/Services/Ingestion/DataIngestor.cs b/src/ProjectTemplates/Microsoft.Extensions.AI.Templates/templates/AIChatWeb-CSharp/AIChatWeb-CSharp.Web/Services/Ingestion/DataIngestor.cs
index 76168b6e632..186ee1e5e5e 100644
--- a/src/ProjectTemplates/Microsoft.Extensions.AI.Templates/templates/AIChatWeb-CSharp/AIChatWeb-CSharp.Web/Services/Ingestion/DataIngestor.cs
+++ b/src/ProjectTemplates/Microsoft.Extensions.AI.Templates/templates/AIChatWeb-CSharp/AIChatWeb-CSharp.Web/Services/Ingestion/DataIngestor.cs
@@ -20,12 +20,12 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});
using var pipeline = new IngestionPipeline(
- reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);
- await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
+ var reader = new DocumentReader(directory);
+ await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, searchPattern)))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
diff --git a/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/IngestionPipelineTests.cs b/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/IngestionPipelineTests.cs
index e865ff39d9b..c62a7f650de 100644
--- a/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/IngestionPipelineTests.cs
+++ b/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/IngestionPipelineTests.cs
@@ -90,8 +90,9 @@ public async Task CanProcessDocuments()
"chunks", TestEmbeddingGenerator.DimensionCount);
using VectorStoreWriter> vectorStoreWriter = new(collection);
- using IngestionPipeline pipeline = new(CreateReader(), CreateChunker(), vectorStoreWriter);
- List ingestionResults = await pipeline.ProcessAsync(_sampleFiles).ToListAsync();
+ IngestionDocumentReader reader = CreateReader();
+ using IngestionPipeline pipeline = new(CreateChunker(), vectorStoreWriter);
+ List ingestionResults = await pipeline.ProcessAsync(reader.ReadAsync(_sampleFiles)).ToListAsync();
Assert.Equal(_sampleFiles.Count, ingestionResults.Count);
AssertAllIngestionsSucceeded(ingestionResults);
@@ -110,7 +111,7 @@ public async Task CanProcessDocuments()
Assert.Contains(retrieved[i].DocumentId, _sampleFiles.Select(info => info.FullName));
}
- AssertActivities(activities, "ProcessFiles");
+ AssertActivities(activities);
}
[Fact]
@@ -126,10 +127,11 @@ public async Task CanProcessDocumentsInDirectory()
"chunks-dir", TestEmbeddingGenerator.DimensionCount);
using VectorStoreWriter> vectorStoreWriter = new(collection);
- using IngestionPipeline pipeline = new(CreateReader(), CreateChunker(), vectorStoreWriter);
+ IngestionDocumentReader reader = CreateReader();
+ using IngestionPipeline pipeline = new(CreateChunker(), vectorStoreWriter);
DirectoryInfo directory = new("TestFiles");
- List ingestionResults = await pipeline.ProcessAsync(directory, "*.md").ToListAsync();
+ List ingestionResults = await pipeline.ProcessAsync(reader.ReadAsync(directory, "*.md")).ToListAsync();
Assert.Equal(directory.EnumerateFiles("*.md").Count(), ingestionResults.Count);
AssertAllIngestionsSucceeded(ingestionResults);
@@ -147,7 +149,7 @@ public async Task CanProcessDocumentsInDirectory()
Assert.StartsWith(directory.FullName, retrieved[i].DocumentId);
}
- AssertActivities(activities, "ProcessDirectory");
+ AssertActivities(activities);
}
[Fact]
@@ -162,10 +164,12 @@ public async Task ChunksCanBeMoreThanJustText()
var collection = testVectorStore.GetIngestionRecordCollection, DataContent>(
"chunks-img", TestEmbeddingGenerator.DimensionCount);
using VectorStoreWriter> vectorStoreWriter = new(collection);
- using IngestionPipeline pipeline = new(CreateReader(), new ImageChunker(), vectorStoreWriter);
+
+ IngestionDocumentReader reader = CreateReader();
+ using IngestionPipeline pipeline = new(new ImageChunker(), vectorStoreWriter);
Assert.False(embeddingGenerator.WasCalled);
- var ingestionResults = await pipeline.ProcessAsync(_sampleFiles).ToListAsync();
+ var ingestionResults = await pipeline.ProcessAsync(reader.ReadAsync(_sampleFiles)).ToListAsync();
AssertAllIngestionsSucceeded(ingestionResults);
var retrieved = await vectorStoreWriter.VectorStoreCollection
@@ -180,7 +184,7 @@ public async Task ChunksCanBeMoreThanJustText()
Assert.EndsWith(_withImage.Name, retrieved[i].DocumentId);
}
- AssertActivities(activities, "ProcessFiles");
+ AssertActivities(activities);
}
internal class ImageChunker : IngestionChunker
@@ -215,10 +219,10 @@ public async Task SingleFailureDoesNotTearDownEntirePipeline()
"chunks-fail", TestEmbeddingGenerator.DimensionCount);
using VectorStoreWriter> vectorStoreWriter = new(collection);
- using IngestionPipeline pipeline = new(failingForFirstReader, CreateChunker(), vectorStoreWriter);
+ using IngestionPipeline pipeline = new(CreateChunker(), vectorStoreWriter);
- await Verify(pipeline.ProcessAsync(_sampleFiles));
- await Verify(pipeline.ProcessAsync(_sampleDirectory));
+ await Verify(pipeline.ProcessAsync(failingForFirstReader.ReadAsync(_sampleFiles)));
+ await Verify(pipeline.ProcessAsync(failingForFirstReader.ReadAsync(_sampleDirectory)));
async Task Verify(IAsyncEnumerable results)
{
@@ -235,6 +239,72 @@ async Task Verify(IAsyncEnumerable results)
}
}
+ internal sealed class FailingDocumentProcessor(Func action) : IngestionDocumentProcessor
+ {
+ public override async Task ProcessAsync(IngestionDocument document, CancellationToken cancellationToken = default)
+ {
+ await action();
+ return document;
+ }
+ }
+
+ [Fact]
+ public async Task SingleIngestionFailureDoesNotTearDownEntirePipeline()
+ {
+ int failed = 0;
+ IngestionDocumentReader reader = CreateReader();
+
+ List activities = [];
+ using TracerProvider tracerProvider = CreateTraceProvider(activities);
+
+ TestEmbeddingGenerator embeddingGenerator = new();
+ using InMemoryVectorStore testVectorStore = new(new() { EmbeddingGenerator = embeddingGenerator });
+
+ var collection = testVectorStore.GetIngestionRecordCollection, string>(
+ "chunks-ingest-fail", TestEmbeddingGenerator.DimensionCount);
+ using VectorStoreWriter> vectorStoreWriter = new(collection);
+
+ using IngestionPipeline pipeline = new(CreateChunker(), vectorStoreWriter);
+ pipeline.DocumentProcessors.Add(new FailingDocumentProcessor(() => failed++ == 0
+ ? throw new ExpectedException()
+ : Task.CompletedTask));
+
+ List ingestionResults = await pipeline.ProcessAsync(reader.ReadAsync(_sampleFiles)).ToListAsync();
+
+ Assert.Equal(_sampleFiles.Count, ingestionResults.Count);
+ Assert.All(ingestionResults, result => Assert.NotEmpty(result.DocumentId));
+ IngestionResult ingestionResult = Assert.Single(ingestionResults.Where(result => !result.Succeeded));
+ Assert.IsType(ingestionResult.Exception);
+ AssertErrorActivities(activities, expectedFailedActivitiesCount: 1);
+ }
+
+ [Fact]
+ public async Task CanProcessDocumentsWithoutReader()
+ {
+ TestEmbeddingGenerator embeddingGenerator = new();
+ using InMemoryVectorStore testVectorStore = new(new() { EmbeddingGenerator = embeddingGenerator });
+
+ var collection = testVectorStore.GetIngestionRecordCollection, string>(
+ "chunks-no-reader", TestEmbeddingGenerator.DimensionCount);
+ using VectorStoreWriter> vectorStoreWriter = new(collection);
+
+ using IngestionPipeline pipeline = new(CreateChunker(), vectorStoreWriter);
+
+ // Create a document directly without using a reader.
+ IngestionDocument document = new("my-document-id");
+ document.Sections.Add(new IngestionDocumentSection
+ {
+ Elements = { new IngestionDocumentParagraph("This is a test paragraph for direct ingestion.") }
+ });
+
+ List ingestionResults = await pipeline.ProcessAsync(new[] { document }.ToAsyncEnumerable()).ToListAsync();
+
+ Assert.Single(ingestionResults);
+ Assert.True(ingestionResults[0].Succeeded);
+ Assert.Equal("my-document-id", ingestionResults[0].DocumentId);
+ Assert.True(embeddingGenerator.WasCalled, "Embedding generator should have been called.");
+ }
+
private static IngestionDocumentReader CreateReader() => new MarkdownReader();
private static IngestionChunker CreateChunker() => new HeaderChunker(new(TiktokenTokenizer.CreateForModel("gpt-4")));
@@ -255,12 +325,11 @@ private static void AssertAllIngestionsSucceeded(List ingestion
Assert.All(ingestionResults, result => Assert.Null(result.Exception));
}
- private static void AssertActivities(List activities, string rootActivityName)
+ private static void AssertActivities(List activities)
{
Assert.NotEmpty(activities);
Assert.All(activities, a => Assert.Equal("Experimental.Microsoft.Extensions.DataIngestion", a.Source.Name));
- Assert.Single(activities, a => a.OperationName == rootActivityName);
- Assert.Contains(activities, a => a.OperationName == "ProcessFile");
+ Assert.Contains(activities, a => a.OperationName == "ProcessDocument");
}
private static void AssertErrorActivities(List activities, int expectedFailedActivitiesCount)
diff --git a/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/Readers/MarkItDownMcpReaderTests.cs b/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/Readers/MarkItDownMcpReaderTests.cs
index 37142f8b20e..b1af4584a9d 100644
--- a/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/Readers/MarkItDownMcpReaderTests.cs
+++ b/test/Libraries/Microsoft.Extensions.DataIngestion.Tests/Readers/MarkItDownMcpReaderTests.cs
@@ -34,7 +34,7 @@ public async Task ReadAsync_ThrowsWhenSourceIsNull()
{
var reader = new MarkItDownMcpReader(new Uri("http://localhost:3001/sse"));
- await Assert.ThrowsAsync("source", async () => await reader.ReadAsync(null!, "identifier"));
+ await Assert.ThrowsAsync("source", async () => await reader.ReadAsync((FileInfo)null!, "identifier"));
await Assert.ThrowsAsync("source", async () => await reader.ReadAsync((Stream)null!, "identifier", "mediaType"));
}
diff --git a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
index 61088b1225d..27517699fd4 100644
--- a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
+++ b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
@@ -20,12 +20,12 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});
using var pipeline = new IngestionPipeline(
- reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);
- await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
+ var reader = new DocumentReader(directory);
+ await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, searchPattern)))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
diff --git a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A_aoai_aais.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A_aoai_aais.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
index 61088b1225d..27517699fd4 100644
--- a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A_aoai_aais.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
+++ b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.A_aoai_aais.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
@@ -20,12 +20,12 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});
using var pipeline = new IngestionPipeline(
- reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);
- await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
+ var reader = new DocumentReader(directory);
+ await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, searchPattern)))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
diff --git a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb._defaults.verified/aichatweb/Services/Ingestion/DataIngestor.cs b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb._defaults.verified/aichatweb/Services/Ingestion/DataIngestor.cs
index b4675927d47..1bb7e2a65a7 100644
--- a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb._defaults.verified/aichatweb/Services/Ingestion/DataIngestor.cs
+++ b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb._defaults.verified/aichatweb/Services/Ingestion/DataIngestor.cs
@@ -20,12 +20,12 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});
using var pipeline = new IngestionPipeline(
- reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);
- await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
+ var reader = new DocumentReader(directory);
+ await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, searchPattern)))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
diff --git a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.o_q.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.o_q.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
index 61088b1225d..27517699fd4 100644
--- a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.o_q.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
+++ b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.o_q.verified/aichatweb/aichatweb.Web/Services/Ingestion/DataIngestor.cs
@@ -20,12 +20,12 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});
using var pipeline = new IngestionPipeline(
- reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);
- await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
+ var reader = new DocumentReader(directory);
+ await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, searchPattern)))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
diff --git a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.oai_aais.verified/aichatweb/Services/Ingestion/DataIngestor.cs b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.oai_aais.verified/aichatweb/Services/Ingestion/DataIngestor.cs
index b4675927d47..1bb7e2a65a7 100644
--- a/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.oai_aais.verified/aichatweb/Services/Ingestion/DataIngestor.cs
+++ b/test/ProjectTemplates/Microsoft.Extensions.AI.Templates.IntegrationTests/Snapshots/aichatweb/aichatweb.oai_aais.verified/aichatweb/Services/Ingestion/DataIngestor.cs
@@ -20,12 +20,12 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});
using var pipeline = new IngestionPipeline(
- reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);
- await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
+ var reader = new DocumentReader(directory);
+ await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, searchPattern)))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}