diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs index f125768992c..e009e2b8f18 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs @@ -261,16 +261,16 @@ void FlushRunner() if (endAddress > GetLogicalAddressOfStartOfPage(endPage)) endPage++; - long prevEndPage = GetPage(prevEndAddress); - deltaLog.Allocate(out int entryLength, out long destPhysicalAddress); - int destOffset = 0; + var prevEndPage = GetPage(prevEndAddress); + deltaLog.Allocate(out var entryLength, out var destPhysicalAddress); + var destOffset = 0; // We perform delta capture under epoch protection with page-wise refresh for latency reasons - bool epochTaken = epoch.ResumeIfNotProtected(); + var epochTaken = epoch.ResumeIfNotProtected(); try { - for (long p = startPage; p < endPage; p++) + for (var p = startPage; p < endPage; p++) { // Check if we have the entire page safely available to process in memory if (HeadAddress >= GetLogicalAddressOfStartOfPage(p) + PageSize) @@ -305,7 +305,7 @@ void FlushRunner() if (info.Dirty) { info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic - int size = sizeof(long) + sizeof(int) + alignedRecordSize; + var size = sizeof(long) + sizeof(int) + alignedRecordSize; if (destOffset + size > entryLength) { deltaLog.Seal(destOffset); @@ -332,15 +332,27 @@ void FlushRunner() epoch.ProtectAndDrain(); } } + catch (Exception ex) + { + logger?.LogError(ex, "{method} failed while flushing snapshot pages from {startPage} to {endPage}", nameof(AsyncFlushDeltaToDevice), startPage, endPage); + _ = _completedSemaphore.Release(); + throw; + } finally { if (epochTaken) epoch.Suspend(); } - if (destOffset > 0) - deltaLog.Seal(destOffset); - _completedSemaphore.Release(); + try + { + if (destOffset > 0) + deltaLog.Seal(destOffset); + } + finally + { + _completedSemaphore.Release(); + } } } @@ -1864,8 +1876,10 @@ public void AsyncFlushPagesForRecovery(long scanFromAddress, long flus /// /// Flush pages from startPage (inclusive) to endPage (exclusive) to specified log device and obj device for a snapshot checkpoint. /// + /// /// /// + /// /// /// /// @@ -1893,47 +1907,58 @@ void FlushRunner() var flushCompletionTracker = new FlushCompletionTracker(_completedSemaphore, throttleCheckpointFlushDelayMs >= 0 ? new SemaphoreSlim(0) : null, totalNumPages); - // Flush each page in sequence - for (long flushPage = startPage; flushPage < endPage; flushPage++) + try { - // For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page; - // for middle pages, we flush the entire page. - var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage); - if (startLogicalAddress > flushStartAddress) - flushStartAddress = startLogicalAddress; - var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1); - if (endLogicalAddress < flushEndAddress) - flushEndAddress = endLogicalAddress; - var flushSize = flushEndAddress - flushStartAddress; - if (flushSize <= 0) - continue; - var asyncResult = new PageAsyncFlushResult + // Flush each page in sequence + for (var flushPage = startPage; flushPage < endPage; flushPage++) { - flushCompletionTracker = flushCompletionTracker, - page = flushPage, - fromAddress = flushStartAddress, - untilAddress = flushEndAddress, - count = 1, - flushRequestState = FlushRequestState.Snapshot, - flushBuffers = flushBuffers - }; - - // Intended destination is flushPage - WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress); - - // If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here; - // otherwise, we wait for the completion of the flush (and the callback will release the asyncResult). - if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued) - { - if (throttleCheckpointFlushDelayMs >= 0) + // For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page; + // for middle pages, we flush the entire page. + var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage); + if (startLogicalAddress > flushStartAddress) + flushStartAddress = startLogicalAddress; + var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1); + if (endLogicalAddress < flushEndAddress) + flushEndAddress = endLogicalAddress; + var flushSize = flushEndAddress - flushStartAddress; + // TODO: Should we release completedSemaphore also if the flushSize <=0 + if (flushSize <= 0) + continue; + + var asyncResult = new PageAsyncFlushResult { - flushCompletionTracker.WaitOneFlush(); - Thread.Sleep(throttleCheckpointFlushDelayMs); + flushCompletionTracker = flushCompletionTracker, + page = flushPage, + fromAddress = flushStartAddress, + untilAddress = flushEndAddress, + count = 1, + flushRequestState = FlushRequestState.Snapshot, + flushBuffers = flushBuffers + }; + + // Intended destination is flushPage + WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress); + + // If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here; + // otherwise, we wait for the completion of the flush (and the callback will release the asyncResult). + if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued) + { + if (throttleCheckpointFlushDelayMs >= 0) + { + flushCompletionTracker.WaitOneFlush(); + Thread.Sleep(throttleCheckpointFlushDelayMs); + } } + else + _ = asyncResult.Release(); } - else - _ = asyncResult.Release(); + } + catch (Exception ex) + { + logger?.LogError(ex, "{method} failed while flushing snapshot pages from {startPage} to {endPage}", nameof(AsyncFlushPagesForSnapshot), startPage, endPage); + _completedSemaphore.Release(); + throw; } } } @@ -2257,9 +2282,8 @@ protected void AsyncFlushPageForSnapshotCallback(uint errorCode, uint numBytes, { if (epochTaken) epoch.Suspend(); + _ = result.Release(); } - - _ = result.Release(); } catch when (disposed) { } } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FoldOverSMTask.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FoldOverSMTask.cs index aae6d23f474..c28b794af5f 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FoldOverSMTask.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/FoldOverSMTask.cs @@ -39,7 +39,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri store.epoch.Resume(); _ = store.hlogBase.ShiftReadOnlyToTail(out var tailAddress, out store._hybridLogCheckpoint.flushedSemaphore); if (store._hybridLogCheckpoint.flushedSemaphore != null) - stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore); + stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.FoldOverSMTaskHybridLogFlushed); // Update final logical address to the flushed tail - this may not be necessary store._hybridLogCheckpoint.info.finalLogicalAddress = tailAddress; diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IncrementalSnapshotCheckpointSMTask.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IncrementalSnapshotCheckpointSMTask.cs index 7b3798aa2e7..3068240328e 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IncrementalSnapshotCheckpointSMTask.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/IncrementalSnapshotCheckpointSMTask.cs @@ -59,12 +59,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri out store._hybridLogCheckpoint.flushedSemaphore, store.ThrottleCheckpointFlushDelayMs); if (store._hybridLogCheckpoint.flushedSemaphore != null) - stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore); - break; - - case Phase.PERSISTENCE_CALLBACK: - ObjectLog_OnPersistenceCallback(); - store._hybridLogCheckpoint.info.deltaTailAddress = store._hybridLogCheckpoint.deltaLog.TailAddress; + stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.IncrementalSnapshotCheckpointSMTaskHybridLogFlushed); store.WriteHybridLogIncrementalMetaInfo(store._hybridLogCheckpoint.deltaLog); store._hybridLogCheckpoint.info.deltaTailAddress = store._hybridLogCheckpoint.deltaLog.TailAddress; store._lastSnapshotCheckpoint = store._hybridLogCheckpoint.Transfer(); diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/SemaphoreWaiterMonitor.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/SemaphoreWaiterMonitor.cs new file mode 100644 index 00000000000..7f2487a1f44 --- /dev/null +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/SemaphoreWaiterMonitor.cs @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System.Threading; + +namespace Tsavorite.core +{ + /// + /// Identifies the type of semaphore added to the state machine driver waiting list, + /// including the originating state machine task. + /// + public enum StateMachineSemaphoreType + { + /// + /// Waiting for all transactions in the last version to complete. + /// + LastVersionTransactionsDone, + + /// + /// Waiting for the main index checkpoint to complete (IndexCheckpointSMTask). + /// + IndexCheckpointSMTaskMainIndexCheckpoint, + + /// + /// Waiting for the overflow buckets checkpoint to complete (IndexCheckpointSMTask). + /// + IndexCheckpointSMTaskOverflowBucketsCheckpoint, + + /// + /// Waiting for the hybrid log flush to complete (FoldOverSMTask). + /// + FoldOverSMTaskHybridLogFlushed, + + /// + /// Waiting for the hybrid log flush to complete (IncrementalSnapshotCheckpointSMTask). + /// + IncrementalSnapshotCheckpointSMTaskHybridLogFlushed, + + /// + /// Waiting for the hybrid log flush to complete (SnapshotCheckpointSMTask). + /// + SnapshotCheckpointSMTaskHybridLogFlushed, + } + + /// + /// Pairs a with a + /// so the state machine driver can identify which operation a semaphore belongs to. + /// + internal readonly struct SemaphoreWaiterMonitor + { + public SemaphoreSlim Semaphore { get; } + public StateMachineSemaphoreType Type { get; } + + public SemaphoreWaiterMonitor(SemaphoreSlim semaphore, StateMachineSemaphoreType type) + { + Semaphore = semaphore; + Type = type; + } + + public override string ToString() => Type.ToString(); + } +} \ No newline at end of file diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/SnapshotCheckpointSMTask.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/SnapshotCheckpointSMTask.cs index c2d3e4015ca..ece9f961035 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/SnapshotCheckpointSMTask.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/SnapshotCheckpointSMTask.cs @@ -70,7 +70,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri out store._hybridLogCheckpoint.flushedSemaphore, store.ThrottleCheckpointFlushDelayMs); if (store._hybridLogCheckpoint.flushedSemaphore != null) - stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore); + stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.SnapshotCheckpointSMTaskHybridLogFlushed); break; case Phase.PERSISTENCE_CALLBACK: diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs index a03f631bf47..b235f01557f 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs @@ -17,7 +17,7 @@ public class StateMachineDriver { SystemState systemState; IStateMachine stateMachine; - readonly List waitingList; + readonly List waitingList; TaskCompletionSource stateMachineCompleted; // All threads have entered the given state SemaphoreSlim waitForTransitionIn; @@ -75,7 +75,7 @@ internal void TrackLastVersion(long version) // We have to re-check the number of active transactions after assigning lastVersion and lastVersionTransactionsDone if (GetNumActiveTransactions(version) > 0) - AddToWaitingList(lastVersionTransactionsDone); + AddToWaitingList(lastVersionTransactionsDone, StateMachineSemaphoreType.LastVersionTransactionsDone); } internal void ResetLastVersion() @@ -155,10 +155,10 @@ public long VerifyTransactionVersion(long txnVersion) public void EndTransaction(long txnVersion) => DecrementActiveTransactions(txnVersion); - internal void AddToWaitingList(SemaphoreSlim waiter) + internal void AddToWaitingList(SemaphoreSlim waiter, StateMachineSemaphoreType type) { if (waiter != null) - waitingList.Add(waiter); + waitingList.Add(new SemaphoreWaiterMonitor(waiter, type)); } public bool Register(IStateMachine stateMachine, CancellationToken token = default) @@ -311,7 +311,7 @@ async Task ProcessWaitingListAsync(CancellationToken token = default) } foreach (var waiter in waitingList) { - await waiter.WaitAsync(token); + await waiter.Semaphore.WaitAsync(token); } waitingList.Clear(); } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs index a2d78b88694..9c32b8cea72 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs +++ b/libs/storage/Tsavorite/cs/src/core/Index/Recovery/IndexCheckpoint.cs @@ -54,8 +54,8 @@ internal bool IsIndexFuzzyCheckpointCompleted() internal void AddIndexCheckpointWaitingList(StateMachineDriver stateMachineDriver) { - stateMachineDriver.AddToWaitingList(mainIndexCheckpointSemaphore); - stateMachineDriver.AddToWaitingList(overflowBucketsAllocator.GetCheckpointSemaphore()); + stateMachineDriver.AddToWaitingList(mainIndexCheckpointSemaphore, StateMachineSemaphoreType.IndexCheckpointSMTaskMainIndexCheckpoint); + stateMachineDriver.AddToWaitingList(overflowBucketsAllocator.GetCheckpointSemaphore(), StateMachineSemaphoreType.IndexCheckpointSMTaskOverflowBucketsCheckpoint); } internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken token = default) @@ -67,7 +67,6 @@ internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken await t2.ConfigureAwait(false); } - // Implementation of an asynchronous checkpointing scheme // for main hash index of Tsavorite private int mainIndexCheckpointCallbackCount; @@ -160,7 +159,7 @@ private async ValueTask IsMainIndexCheckpointCompletedAsync(CancellationToken to s.Release(); } - private unsafe void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context) + private void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context) { // Set the page status to flushed var mem = ((HashIndexPageAsyncFlushResult)context).mem; diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index 449799d5444..8b663751ea2 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -769,31 +769,31 @@ public void ClusterFailoverSpinWait(int replicaNodeIndex, ILogger logger) } } - public void AttachAndWaitForSync(int primary_count, int replica_count, bool disableObjects) + public void AttachAndWaitForSync(int primaryIndex, int replicaStartIndex, int replicaCount, bool disableObjects) { - var primaryId = clusterTestUtils.GetNodeIdFromNode(0, logger); + var primaryId = clusterTestUtils.GetNodeIdFromNode(primaryIndex, logger); // Wait until primary node is known so as not to fail replicate - for (var i = primary_count; i < primary_count + replica_count; i++) + for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++) clusterTestUtils.WaitUntilNodeIdIsKnown(i, primaryId, logger: logger); // Issue cluster replicate and bump epoch manually to capture config. - for (var i = primary_count; i < primary_count + replica_count; i++) + for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++) _ = clusterTestUtils.ClusterReplicate(i, primaryId, async: true, logger: logger); - if (!checkpointTask.Wait(TimeSpan.FromSeconds(100))) Assert.Fail("Checkpoint task timeout"); + if (!checkpointTask.Wait(TimeSpan.FromSeconds(60))) Assert.Fail("Checkpoint task timeout"); // Wait for recovery and AofSync - for (var i = primary_count; i < replica_count; i++) + for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++) { clusterTestUtils.WaitForReplicaRecovery(i, logger); - clusterTestUtils.WaitForReplicaAofSync(0, i, logger); + clusterTestUtils.WaitForReplicaAofSync(primaryIndex, i, logger); } - clusterTestUtils.WaitForConnectedReplicaCount(0, replica_count, logger: logger); + clusterTestUtils.WaitForConnectedReplicaCount(primaryIndex, replicaCount, logger: logger); // Validate data on replicas - for (var i = primary_count; i < replica_count; i++) + for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++) { if (disableObjects) ValidateKVCollectionAgainstReplica(ref kvPairs, i); diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs index d7007f1b6a5..1073bce88ce 100644 --- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs @@ -732,7 +732,16 @@ public void ClusterReplicationCheckpointCleanupTest([Values] bool performRMW, [V var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, lowMemory: true, segmentSize: "4k", EnableIncrementalSnapshots: enableIncrementalSnapshots, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay, deviceType: Tsavorite.core.DeviceType.Native); + context.CreateInstances(nodes_count, + tryRecover: true, + disableObjects: disableObjects, + lowMemory: true, + segmentSize: "4k", + EnableIncrementalSnapshots: enableIncrementalSnapshots, + enableAOF: true, + useTLS: useTLS, + asyncReplay: asyncReplay, + deviceType: Tsavorite.core.DeviceType.Native); context.CreateConnection(useTLS: useTLS); // Setup cluster @@ -743,7 +752,7 @@ public void ClusterReplicationCheckpointCleanupTest([Values] bool performRMW, [V var slotRangesStr = string.Join(",", myself.Slots.Select(x => $"({x.From}-{x.To})").ToList()); ClassicAssert.AreEqual(1, myself.Slots.Count, $"Setup failed slot ranges count greater than 1 {slotRangesStr}"); - var shards = context.clusterTestUtils.ClusterShards(0, context.logger); + var shards = context.clusterTestUtils.ClusterShards(primaryIndex, context.logger); ClassicAssert.AreEqual(2, shards.Count); ClassicAssert.AreEqual(1, shards[0].slotRanges.Count); ClassicAssert.AreEqual(0, shards[0].slotRanges[0].Item1); @@ -752,13 +761,11 @@ public void ClusterReplicationCheckpointCleanupTest([Values] bool performRMW, [V context.kvPairs = []; context.kvPairsObj = []; context.checkpointTask = Task.Run(() => context.PopulatePrimaryAndTakeCheckpointTask(performRMW, disableObjects, takeCheckpoint: true)); - var attachReplicaTask = Task.Run(() => context.AttachAndWaitForSync(primary_count, replica_count, disableObjects)); - - if (!context.checkpointTask.Wait(TimeSpan.FromSeconds(60))) - Assert.Fail("checkpointTask timeout"); + var attachReplicaTask = Task.Run(() => context.AttachAndWaitForSync(primaryIndex, primary_count, replica_count, disableObjects)); - if (!attachReplicaTask.Wait(TimeSpan.FromSeconds(60))) - Assert.Fail("attachReplicaTask timeout"); + var tasks = new Task[] { context.checkpointTask, attachReplicaTask }; + if (!Task.WhenAll(tasks).Wait(TimeSpan.FromSeconds(60))) + Assert.Fail($"checkpointTask timeout checkpointTask: {context.checkpointTask.Status}, attachReplicaTask:{attachReplicaTask.Status}"); context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex: primaryIndex, secondaryIndex: replicaIndex, logger: context.logger); }