diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs index 762ee3e5770..d1d78f9889b 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs @@ -43,6 +43,7 @@ public enum ChunkVersions : byte { private static readonly ILogger Log = Serilog.Log.ForContext(); public bool IsReadOnly { + //qq EnsureInitializedAsync get { return Interlocked.CompareExchange(ref _isReadOnly, 0, 0) == 1; } set { Interlocked.Exchange(ref _isReadOnly, value ? 1 : 0); } } @@ -52,15 +53,18 @@ public bool IsCached { } // if inMem, _handle is null but not remote + //qq EnsureInitializedAsync public bool IsRemote => _handle is not null and not ChunkFileHandle; // the logical size of (untransformed) data (could be > PhysicalDataSize if scavenged chunk) public long LogicalDataSize { + //qq EnsureInitializedAsync get { return Interlocked.Read(ref _logicalDataSize); } } // the physical size of (untransformed) data public int PhysicalDataSize { + //qq EnsureInitializedAsync get { return _physicalDataSize; } } @@ -69,18 +73,22 @@ public string FileName { } public int FileSize { + //qq EnsureInitializedAsync get { return _fileSize; } } public ChunkHeader ChunkHeader { + //qq EnsureInitializedAsync get { return _chunkHeader; } } public ChunkFooter ChunkFooter { + //qq EnsureInitializedAsync get { return _chunkFooter; } } public ChunkInfo ChunkInfo { + //qq EnsureInitializedAsync get => new() { ChunkFileName = _filename, ChunkStartNumber = _chunkHeader.ChunkStartNumber, @@ -92,6 +100,7 @@ public ChunkInfo ChunkInfo { } public ReadOnlyMemory TransformHeader { + //qq EnsureInitializedAsync get { return _transformHeader; } } @@ -99,6 +108,7 @@ public ReadOnlyMemory TransformHeader { public int RawWriterPosition { get { + // no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously return 
(int)(_writerWorkItem?.WorkingStream.Position ?? throw new InvalidOperationException(string.Format("TFChunk {0} is not in write mode.", _filename))); } @@ -173,12 +183,17 @@ private enum CacheStatus { private IChunkTransform _transform; private ReadOnlyMemory _transformHeader; + private int _initialized; + private Func _initialize; + private TFChunk(string filename, int midpointsDepth, bool inMem, bool unbuffered, bool writethrough, - bool reduceFileCachePressure) { + bool reduceFileCachePressure, + Func initialize) { + Ensure.NotNullOrEmpty(filename, "filename"); Ensure.Nonnegative(midpointsDepth, "midpointsDepth"); @@ -188,6 +203,7 @@ private TFChunk(string filename, _unbuffered = unbuffered; _writeThrough = writethrough; _reduceFileCachePressure = reduceFileCachePressure; + _initialize = initialize; _memStreams = new(); _fileStreams = new(); @@ -202,20 +218,23 @@ private TFChunk(string filename, } // local or remote - public static async ValueTask FromCompletedFile(IBlobFileSystem fileSystem, string filename, bool verifyHash, bool unbufferedRead, + public static ValueTask FromCompletedFile(IBlobFileSystem fileSystem, string filename, bool verifyHash, bool unbufferedRead, ITransactionFileTracker tracker, Func getTransformFactory, bool reduceFileCachePressure = false, CancellationToken token = default) { - var chunk = new TFChunk(filename, - TFConsts.MidpointsDepth, false, unbufferedRead, false, reduceFileCachePressure); try { - await chunk.InitCompleted(fileSystem, verifyHash, tracker, getTransformFactory, token); - } catch { - chunk.Dispose(); - throw; + //qq do we want any different behaviour here between local and remote completed chunks? 
+ var chunk = new TFChunk(filename, + TFConsts.MidpointsDepth, false, unbufferedRead, false, reduceFileCachePressure, + initialize: (chunk, token) => + chunk.InitCompleted(fileSystem, verifyHash, tracker, getTransformFactory, token)) { + IsReadOnly = true + }; + + return new(chunk); + } catch (Exception ex) { + return ValueTask.FromException(ex); } - - return chunk; } // always local @@ -228,7 +247,8 @@ public static async ValueTask FromOngoingFile(string filename, int writ false, unbuffered, writethrough, - reduceFileCachePressure); + reduceFileCachePressure, + initialize: static (_, _) => ValueTask.CompletedTask); try { await chunk.InitOngoing(writePosition, tracker, getTransformFactory, token); } catch { @@ -285,7 +305,8 @@ public static async ValueTask CreateWithHeader(string filename, inMem, unbuffered, writethrough, - reduceFileCachePressure); + reduceFileCachePressure, + initialize: static (_, _) => ValueTask.CompletedTask); try { await chunk.InitNew(header, fileSize, tracker, transformFactory, transformHeader, token); } catch { @@ -301,8 +322,6 @@ private async ValueTask InitCompleted(IBlobFileSystem fileSystem, bool verifyHas _handle = await fileSystem.OpenForReadAsync(_filename, _reduceFileCachePressure, token); _fileSize = (int)_handle.Length; - IsReadOnly = true; - await using (var stream = _handle.CreateStream()) { _chunkHeader = await ReadHeader(stream, token); Log.Debug("Opened completed {chunk} as version {version} (min. 
compatible version: {minCompatibleVersion})", _filename, _chunkHeader.Version, _chunkHeader.MinCompatibleVersion); @@ -340,7 +359,42 @@ private async ValueTask InitCompleted(IBlobFileSystem fileSystem, bool verifyHas _readSide.RequestCaching(); if (verifyHash) - await VerifyFileHash(token);
+			await VerifyFileHash(token); //qq do not do this for remote
+	}
+
+	// Runs the deferred _initialize delegate until it has succeeded once; afterwards this is a no-op.
+	// Intended properties, each of which deserves a dedicated test:
+	// - a failed initialization is logged and rethrown, and is attempted again on the next call
+	// - concurrent callers are serialized by _cachedDataLock, so only one of them runs the delegate
+	// - token only stops this caller from waiting; the initialization itself is never cancelled
+	// NOTE(review): InitCompleted awaits VerifyFileHash when verifyHash is set and VerifyFileHash
+	// awaits this method, so unless _cachedDataLock is reentrant that path waits on itself - confirm.
+	private async ValueTask EnsureInitializedAsync(CancellationToken token) {
+		if (Volatile.Read(ref _initialized) is not 0)
+			return; // fast common case
+
+		// AsTask is needed because WaitAsync is only available on Task, not on ValueTask.
+		await InitializeOnce().AsTask().WaitAsync(token);
+
+		// The lock is acquired and released inside the detached operation: if the caller stops
+		// waiting, the lock stays held until initialization has finished, so a second caller
+		// can never start the delegate (or observe _initialize as null) while it is running.
+		async ValueTask InitializeOnce() {
+			await _cachedDataLock.AcquireAsync(CancellationToken.None);
+			try {
+				if (Volatile.Read(ref _initialized) is not 0)
+					return; // initialized by whoever held the lock before us
+
+				await _initialize(this, CancellationToken.None);
+				Volatile.Write(ref _initialized, 1); // publish only after the delegate's writes
+				_initialize = null;
+			} catch (Exception ex) {
+				Log.Warning(ex, "Failed to initialize chunk {Chunk}", _filename);
+				throw;
+			} finally {
+				_cachedDataLock.Release();
+			}
+		}
} } private async ValueTask InitNew(ChunkHeader chunkHeader, int fileSize, ITransactionFileTracker tracker, @@ -589,6 +643,8 @@ public async ValueTask VerifyFileHash(CancellationToken token) { if (!IsReadOnly) throw new InvalidOperationException("You can't verify hash of not-completed TFChunk."); + await EnsureInitializedAsync(token); + Log.Debug("Verifying hash for TFChunk '{chunk}'...", _filename); using var reader = AcquireRawReader(); reader.Stream.Seek(0, SeekOrigin.Begin); @@ -649,6 +705,7 @@ private static long GetDataPosition(WriterWorkItem workItem) { // this method takes (b) and returns (d) public async ValueTask GetActualRawPosition(long logicalPosition, CancellationToken token) { ArgumentOutOfRangeException.ThrowIfNegative(logicalPosition); + await EnsureInitializedAsync(token); var actualPosition = await _readSide.GetActualPosition(logicalPosition, token); @@ -656,6 +713,8 @@ public async ValueTask GetActualRawPosition(long logicalPosition, Cancella } public async ValueTask CacheInMemory(CancellationToken token) { + await EnsureInitializedAsync(token); + if (_inMem) return; @@ -797,29 +856,44 @@ public void UnCacheFromMemory() { } } - public ValueTask ExistsAt(long logicalPosition, CancellationToken token) - => _readSide.ExistsAt(logicalPosition, token); + public async ValueTask ExistsAt(long logicalPosition, CancellationToken token) { + await EnsureInitializedAsync(token); + return await _readSide.ExistsAt(logicalPosition, token); + } - public ValueTask TryReadAt(long
logicalPosition, bool couldBeScavenged, CancellationToken token) - => _readSide.TryReadAt(logicalPosition, couldBeScavenged, token); + public async ValueTask TryReadAt(long logicalPosition, bool couldBeScavenged, CancellationToken token) { + await EnsureInitializedAsync(token); + return await _readSide.TryReadAt(logicalPosition, couldBeScavenged, token); + } - public ValueTask TryReadFirst(CancellationToken token) - => _readSide.TryReadFirst(token); + public async ValueTask TryReadFirst(CancellationToken token) { + await EnsureInitializedAsync(token); + return await _readSide.TryReadFirst(token); + } - public ValueTask TryReadClosestForward(long logicalPosition, CancellationToken token) - => _readSide.TryReadClosestForward(logicalPosition, token); + public async ValueTask TryReadClosestForward(long logicalPosition, CancellationToken token) { + await EnsureInitializedAsync(token); + return await _readSide.TryReadClosestForward(logicalPosition, token); + } - public ValueTask TryReadClosestForwardRaw(long logicalPosition, Func getBuffer, - CancellationToken token) - => _readSide.TryReadClosestForwardRaw(logicalPosition, getBuffer, token); + public async ValueTask TryReadClosestForwardRaw(long logicalPosition, Func getBuffer, + CancellationToken token) { + await EnsureInitializedAsync(token); + return await _readSide.TryReadClosestForwardRaw(logicalPosition, getBuffer, token); + } - public ValueTask TryReadLast(CancellationToken token) - => _readSide.TryReadLast(token); + public async ValueTask TryReadLast(CancellationToken token) { + await EnsureInitializedAsync(token); + return await _readSide.TryReadLast(token); + } - public ValueTask TryReadClosestBackward(long logicalPosition, CancellationToken token) - => _readSide.TryReadClosestBackward(logicalPosition, token); + public async ValueTask TryReadClosestBackward(long logicalPosition, CancellationToken token) { + await EnsureInitializedAsync(token); + return await _readSide.TryReadClosestBackward(logicalPosition, 
token); + } public async ValueTask TryAppend(ILogRecord record, CancellationToken token) { + // no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously if (IsReadOnly) throw new InvalidOperationException("Cannot write to a read-only block."); @@ -865,6 +939,7 @@ static MemoryOwner SerializeLogRecord(ILogRecord record, out int recordLen } public async ValueTask TryAppendRawData(ReadOnlyMemory buffer, CancellationToken token) { + // no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously var workItem = _writerWorkItem; if (workItem.WorkingStream.Position + buffer.Length > workItem.WorkingStream.Length) return false; @@ -872,8 +947,10 @@ public async ValueTask TryAppendRawData(ReadOnlyMemory buffer, Cance return true; } - public ValueTask Flush(CancellationToken token) - => IsReadOnly ? ValueTask.CompletedTask : _writerWorkItem.FlushToDisk(token); + public ValueTask Flush(CancellationToken token) { + // no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously + return IsReadOnly ? 
ValueTask.CompletedTask : _writerWorkItem.FlushToDisk(token); + } public ValueTask Complete(CancellationToken token) { return ChunkHeader.IsScavenged @@ -890,6 +967,8 @@ public ValueTask CompleteScavenge(IReadOnlyCollection mapping, Cancellat } private async ValueTask CompleteNonRaw(IReadOnlyCollection mapping, CancellationToken token) { + await EnsureInitializedAsync(token); + if (IsReadOnly) throw new InvalidOperationException("Cannot complete a read-only TFChunk."); @@ -907,6 +986,8 @@ private async ValueTask CompleteNonRaw(IReadOnlyCollection mapping, Canc } public async ValueTask CompleteRaw(CancellationToken token) { + await EnsureInitializedAsync(token); + if (IsReadOnly) throw new InvalidOperationException("Cannot complete a read-only TFChunk."); if (_writerWorkItem.WorkingStream.Position != _writerWorkItem.WorkingStream.Length) @@ -1002,6 +1083,7 @@ static int WriteMapping(Span buffer, IReadOnlyCollection mapping) public void Dispose() => TryClose(); public bool TryClose() { + //qq EnsureInitializedAsync _selfdestructin54321 = true; Thread.MemoryBarrier(); @@ -1014,6 +1096,7 @@ public bool TryClose() { } public void MarkForDeletion() { + //qq EnsureInitializedAsync _selfdestructin54321 = true; _deleteFile = true; @@ -1248,6 +1331,7 @@ private void ReturnReaderWorkItem(ReaderWorkItem item) { } public TFChunkBulkReader AcquireDataReader() { + //qq EnsureInitializedAsync if (TryAcquireBulkMemReader(raw: false, out var reader)) return reader; @@ -1255,6 +1339,7 @@ public TFChunkBulkReader AcquireDataReader() { } public TFChunkBulkReader AcquireRawReader() { + //qq EnsureInitializedAsync if (TryAcquireBulkMemReader(raw: true, out var reader)) return reader; @@ -1387,11 +1472,12 @@ public void ReleaseReader(TFChunkBulkReader reader) { } public override string ToString() { + //qq EnsureInitializedAsync return string.Format("#{0}-{1} ({2})", _chunkHeader.ChunkStartNumber, _chunkHeader.ChunkEndNumber, Path.GetFileName(_filename)); } - private struct Midpoint { + 
private readonly struct Midpoint { public readonly int ItemIndex; public readonly long LogPos;