Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Mime;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Shared.Diagnostics;
Expand Down Expand Up @@ -46,6 +48,53 @@ public virtual async Task<IngestionDocument> ReadAsync(FileInfo source, string i
return await ReadAsync(stream, identifier, string.IsNullOrEmpty(mediaType) ? GetMediaType(source) : mediaType!, cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Reads all files in the specified directory that match the given search pattern and option,
/// and converts each to an <see cref="IngestionDocument"/>.
/// </summary>
/// <param name="directory">The directory to read.</param>
/// <param name="searchPattern">The search pattern for file selection.</param>
/// <param name="searchOption">The search option for directory traversal.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>An asynchronous sequence of <see cref="IngestionDocument"/> instances.</returns>
/// <exception cref="ArgumentNullException"><paramref name="directory"/> or <paramref name="searchPattern"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentException"><paramref name="searchPattern"/> is empty.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="searchOption"/> is not a valid <see cref="SearchOption"/> value.</exception>
public async IAsyncEnumerable<IngestionDocument> ReadAsync(
    DirectoryInfo directory,
    string searchPattern = "*.*",
    SearchOption searchOption = SearchOption.TopDirectoryOnly,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    _ = Throw.IfNull(directory);
    _ = Throw.IfNullOrEmpty(searchPattern);
    _ = Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);

    // Delegate to the IEnumerable<FileInfo> overload. EnumerateFiles is lazy,
    // so the directory is walked only as the returned sequence is consumed.
    await foreach (var document in ReadAsync(directory.EnumerateFiles(searchPattern, searchOption), cancellationToken).ConfigureAwait(false))
    {
        yield return document;
    }
}

/// <summary>
/// Converts each of the supplied files into an <see cref="IngestionDocument"/>, one at a time.
/// </summary>
/// <param name="files">The files to read.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>An asynchronous sequence of <see cref="IngestionDocument"/> instances, in the order the files are enumerated.</returns>
/// <exception cref="ArgumentNullException"><paramref name="files"/> is <see langword="null"/>.</exception>
public async IAsyncEnumerable<IngestionDocument> ReadAsync(
    IEnumerable<FileInfo> files,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    _ = Throw.IfNull(files);

    foreach (FileInfo currentFile in files)
    {
        // Bail out promptly between files when cancellation has been requested.
        cancellationToken.ThrowIfCancellationRequested();

        IngestionDocument document = await ReadAsync(currentFile, cancellationToken).ConfigureAwait(false);
        yield return document;
    }
}

/// <summary>
/// Reads a stream and converts it to an <see cref="IngestionDocument"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,13 @@ internal static class DiagnosticsConstants
internal const string ActivitySourceName = "Experimental.Microsoft.Extensions.DataIngestion";
internal const string ErrorTypeTagName = "error.type";

internal static class ProcessDirectory
internal static class ProcessDocument
{
internal const string ActivityName = "ProcessDirectory";
internal const string DirectoryPathTagName = "rag.directory.path";
internal const string SearchPatternTagName = "rag.directory.search.pattern";
internal const string SearchOptionTagName = "rag.directory.search.option";
}

internal static class ProcessFiles
{
internal const string ActivityName = "ProcessFiles";
internal const string FileCountTagName = "rag.file.count";
internal const string ActivityName = "ProcessDocument";
}

internal static class ProcessSource
{
internal const string DocumentIdTagName = "rag.document.id";
}

internal static class ProcessFile
{
internal const string ActivityName = "ProcessFile";
internal const string FilePathTagName = "rag.file.path";
}
}
131 changes: 50 additions & 81 deletions src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -17,14 +16,14 @@ namespace Microsoft.Extensions.DataIngestion;
#pragma warning disable IDE0058 // Expression value is never used
#pragma warning disable IDE0063 // Use simple 'using' statement
#pragma warning disable CA1031 // Do not catch general exception types
#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task

/// <summary>
/// Represents a pipeline for ingesting data from documents and processing it into chunks.
/// </summary>
/// <typeparam name="T">The type of the chunk content.</typeparam>
public sealed class IngestionPipeline<T> : IDisposable
{
private readonly IngestionDocumentReader _reader;
private readonly IngestionChunker<T> _chunker;
private readonly IngestionChunkWriter<T> _writer;
private readonly ActivitySource _activitySource;
Expand All @@ -33,19 +32,16 @@ public sealed class IngestionPipeline<T> : IDisposable
/// <summary>
/// Initializes a new instance of the <see cref="IngestionPipeline{T}"/> class.
/// </summary>
/// <param name="reader">The reader for ingestion documents.</param>
/// <param name="chunker">The chunker to split documents into chunks.</param>
/// <param name="writer">The writer for processing chunks.</param>
/// <param name="options">The options for the ingestion pipeline.</param>
/// <param name="loggerFactory">The logger factory for creating loggers.</param>
public IngestionPipeline(
IngestionDocumentReader reader,
IngestionChunker<T> chunker,
IngestionChunkWriter<T> writer,
IngestionPipelineOptions? options = default,
ILoggerFactory? loggerFactory = default)
{
_reader = Throw.IfNull(reader);
_chunker = Throw.IfNull(chunker);
_writer = Throw.IfNull(writer);
_activitySource = new((options ?? new()).ActivitySourceName);
Expand All @@ -70,106 +66,79 @@ public void Dispose()
public IList<IngestionChunkProcessor<T>> ChunkProcessors { get; } = [];

/// <summary>
/// Processes all files in the specified directory that match the given search pattern and option.
/// Processes the specified documents.
/// </summary>
/// <param name="directory">The directory to process.</param>
/// <param name="searchPattern">The search pattern for file selection.</param>
/// <param name="searchOption">The search option for directory traversal.</param>
/// <param name="documents">The asynchronous sequence of documents to process.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*",
SearchOption searchOption = SearchOption.TopDirectoryOnly, [EnumeratorCancellation] CancellationToken cancellationToken = default)
/// <returns>An asynchronous sequence of <see cref="IngestionResult"/> instances.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(
IAsyncEnumerable<IngestionDocument> documents,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(directory);
Throw.IfNullOrEmpty(searchPattern);
Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);
Throw.IfNull(documents);

using (Activity? rootActivity = _activitySource.StartActivity(ProcessDirectory.ActivityName))
await using IAsyncEnumerator<IngestionDocument> enumerator = documents.GetAsyncEnumerator(cancellationToken);
while (true)
{
rootActivity?.SetTag(ProcessDirectory.DirectoryPathTagName, directory.FullName)
.SetTag(ProcessDirectory.SearchPatternTagName, searchPattern)
.SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
_logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);
IngestionDocument? document = null;
Exception? fetchException = null;

await foreach (var ingestionResult in ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
try
{
yield return ingestionResult;
}
}
}

/// <summary>
/// Processes the specified files.
/// </summary>
/// <param name="files">The collection of files to process.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(files);
if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
{
break;
}

using (Activity? rootActivity = _activitySource.StartActivity(ProcessFiles.ActivityName))
{
await foreach (var ingestionResult in ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
document = enumerator.Current;
}
catch (OperationCanceledException)
{
yield return ingestionResult;
throw;
}
}
}

private static string GetShortName(object any) => any.GetType().Name;

private static void TraceException(Activity? activity, Exception ex)
{
activity?.SetTag(ErrorTypeTagName, ex.GetType().FullName)
.SetStatus(ActivityStatusCode.Error, ex.Message);
}

private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, Activity? rootActivity,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
#if NET
if (System.Linq.Enumerable.TryGetNonEnumeratedCount(files, out int count))
#else
if (files is IReadOnlyCollection<FileInfo> { Count: int count })
#endif
{
rootActivity?.SetTag(ProcessFiles.FileCountTagName, count);
_logger?.LogFileCount(count);
}

foreach (FileInfo fileInfo in files)
{
using (Activity? processFileActivity = _activitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
catch (Exception ex)
{
processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);
_logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader));
fetchException = ex;
}

IngestionDocument? document = null;
Exception? failure = null;
try
using (Activity? processDocumentActivity = _activitySource.StartActivity(ProcessDocument.ActivityName))
{
if (fetchException is not null)
{
document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);
TraceException(processDocumentActivity, fetchException);
_logger?.IngestingFailed(fetchException, "unknown");
yield return new IngestionResult("unknown", null, fetchException);
yield break; // Enumerator is in a faulted state and cannot produce more items
}

processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
_logger?.ReadDocument(document.Identifier);
processDocumentActivity?.SetTag(ProcessSource.DocumentIdTagName, document!.Identifier);

document = await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false);
IngestionDocument? processedDocument = null;
Exception? ingestException = null;
try
{
processedDocument = await IngestAsync(document!, processDocumentActivity, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
TraceException(processFileActivity, ex);
_logger?.IngestingFailed(ex, document?.Identifier ?? fileInfo.FullName);

failure = ex;
TraceException(processDocumentActivity, ex);
_logger?.IngestingFailed(ex, document!.Identifier);
ingestException = ex;
}

string documentId = document?.Identifier ?? fileInfo.FullName;
yield return new IngestionResult(documentId, document, failure);
yield return new IngestionResult(document!.Identifier, processedDocument, ingestException);
}
}
}

// Returns the simple (non-namespace-qualified) runtime type name of the given instance; used for log messages.
private static string GetShortName(object any)
{
    return any.GetType().Name;
}

// Records a failure on the given activity (if any): tags the fully-qualified exception
// type under the error-type tag and marks the activity status as Error with the message.
private static void TraceException(Activity? activity, Exception ex)
{
    if (activity is not null)
    {
        activity.SetTag(ErrorTypeTagName, ex.GetType().FullName)
            .SetStatus(ActivityStatusCode.Error, ex.Message);
    }
}

private async Task<IngestionDocument> IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken)
{
foreach (IngestionDocumentProcessor processor in DocumentProcessors)
Expand Down
13 changes: 0 additions & 13 deletions src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using Microsoft.Extensions.Logging;

#pragma warning disable S109 // Magic numbers should not be used
Expand All @@ -11,18 +10,6 @@ namespace Microsoft.Extensions.DataIngestion
{
internal static partial class Log
{
[LoggerMessage(0, LogLevel.Information, "Starting to process files in directory '{directory}' with search pattern '{searchPattern}' and search option '{searchOption}'.")]
internal static partial void ProcessingDirectory(this ILogger logger, string directory, string searchPattern, System.IO.SearchOption searchOption);

[LoggerMessage(1, LogLevel.Information, "Processing {fileCount} files.")]
internal static partial void LogFileCount(this ILogger logger, int fileCount);

[LoggerMessage(2, LogLevel.Information, "Reading file '{filePath}' using '{reader}'.")]
internal static partial void ReadingFile(this ILogger logger, string filePath, string reader);

[LoggerMessage(3, LogLevel.Information, "Read document '{documentId}'.")]
internal static partial void ReadDocument(this ILogger logger, string documentId);

[LoggerMessage(4, LogLevel.Information, "Writing chunks using {writer}.")]
internal static partial void WritingChunks(this ILogger logger, string writer);

Expand Down
54 changes: 52 additions & 2 deletions src/Libraries/Microsoft.Extensions.DataIngestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,59 @@ Or directly in the C# project file:
</ItemGroup>
```

## Writing chunks to a vector store
## Creating and using an ingestion pipeline

### Basic usage
### Basic usage for reading from a directory

Use `IngestionDocumentReader.ReadAsync` to read documents from files or a directory, and pass the result to `IngestionPipeline.ProcessAsync`:

```csharp
VectorStoreCollection<Guid, IngestionChunkVectorRecord<string>> collection =
vectorStore.GetIngestionRecordCollection("chunks", dimensionCount: 1536);

using VectorStoreWriter<string, IngestionChunkVectorRecord<string>> writer = new(collection);
using IngestionPipeline<string> pipeline = new(chunker, writer);

// Read from a directory and ingest all matching files
MarkdownReader reader = new();
await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, "*.md")))
{
Console.WriteLine($"Processed '{result.DocumentId}': {(result.Succeeded ? "success" : "failed")}");
}
```

### Reading from a list of files

```csharp
IEnumerable<FileInfo> files = [ new FileInfo("doc1.md"), new FileInfo("doc2.md") ];
await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(files)))
{
Console.WriteLine($"Processed '{result.DocumentId}': {(result.Succeeded ? "success" : "failed")}");
}
```

### Using the pipeline without a reader

You can also create documents directly and pass them to the pipeline without using a reader:

```csharp
async IAsyncEnumerable<IngestionDocument> GetDocumentsAsync()
{
var document = new IngestionDocument("my-document-id");
document.Sections.Add(new IngestionDocumentSection
{
Elements = { new IngestionDocumentParagraph("Document content goes here.") }
});
yield return document;
}

await foreach (var result in pipeline.ProcessAsync(GetDocumentsAsync()))
{
Console.WriteLine($"Processed '{result.DocumentId}': {(result.Succeeded ? "success" : "failed")}");
}
```

### Basic usage for writing to a vector store

The simplest way to store ingestion chunks in a vector store is to use the `GetIngestionRecordCollection` extension method to create a collection, and then pass it to a `VectorStoreWriter`:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
});

using var pipeline = new IngestionPipeline<string>(
reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);

await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
var reader = new DocumentReader(directory);
await foreach (var result in pipeline.ProcessAsync(reader.ReadAsync(directory, searchPattern)))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
Expand Down
Loading
Loading