Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 70 additions & 46 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,16 @@ void FlushRunner()
if (endAddress > GetLogicalAddressOfStartOfPage(endPage))
endPage++;

long prevEndPage = GetPage(prevEndAddress);
deltaLog.Allocate(out int entryLength, out long destPhysicalAddress);
int destOffset = 0;
var prevEndPage = GetPage(prevEndAddress);
deltaLog.Allocate(out var entryLength, out var destPhysicalAddress);
var destOffset = 0;

// We perform delta capture under epoch protection with page-wise refresh for latency reasons
bool epochTaken = epoch.ResumeIfNotProtected();
var epochTaken = epoch.ResumeIfNotProtected();

try
{
for (long p = startPage; p < endPage; p++)
for (var p = startPage; p < endPage; p++)
{
// Check if we have the entire page safely available to process in memory
if (HeadAddress >= GetLogicalAddressOfStartOfPage(p) + PageSize)
Expand Down Expand Up @@ -305,7 +305,7 @@ void FlushRunner()
if (info.Dirty)
{
info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic
int size = sizeof(long) + sizeof(int) + alignedRecordSize;
var size = sizeof(long) + sizeof(int) + alignedRecordSize;
if (destOffset + size > entryLength)
{
deltaLog.Seal(destOffset);
Expand All @@ -332,15 +332,27 @@ void FlushRunner()
epoch.ProtectAndDrain();
}
}
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed while flushing snapshot pages from {startPage} to {endPage}", nameof(AsyncFlushDeltaToDevice), startPage, endPage);
_ = _completedSemaphore.Release();
throw;
}
finally
{
if (epochTaken)
epoch.Suspend();
}

if (destOffset > 0)
deltaLog.Seal(destOffset);
_completedSemaphore.Release();
try
{
if (destOffset > 0)
deltaLog.Seal(destOffset);
}
finally
{
_completedSemaphore.Release();
}
}
}

Expand Down Expand Up @@ -1864,8 +1876,10 @@ public void AsyncFlushPagesForRecovery<TContext>(long scanFromAddress, long flus
/// <summary>
/// Flush pages from startPage (inclusive) to endPage (exclusive) to specified log device and obj device for a snapshot checkpoint.
/// </summary>
/// <param name="flushBuffers"></param>
/// <param name="startPage"></param>
/// <param name="endPage"></param>
/// <param name="startLogicalAddress"></param>
/// <param name="endLogicalAddress"></param>
/// <param name="fuzzyStartLogicalAddress"></param>
/// <param name="logDevice"></param>
Expand Down Expand Up @@ -1893,47 +1907,58 @@ void FlushRunner()

var flushCompletionTracker = new FlushCompletionTracker(_completedSemaphore, throttleCheckpointFlushDelayMs >= 0 ? new SemaphoreSlim(0) : null, totalNumPages);

// Flush each page in sequence
for (long flushPage = startPage; flushPage < endPage; flushPage++)
try
{
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
if (flushSize <= 0)
continue;

var asyncResult = new PageAsyncFlushResult<Empty>
// Flush each page in sequence
for (var flushPage = startPage; flushPage < endPage; flushPage++)
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
{
if (throttleCheckpointFlushDelayMs >= 0)
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
// TODO: Should we also release completedSemaphore when flushSize <= 0?
if (flushSize <= 0)
continue;

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If no flush write was issued (due to HeadAddress moving past flushPage), flushRequestState is WriteNotIssued and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
{
if (throttleCheckpointFlushDelayMs >= 0)
{
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
}
}
else
_ = asyncResult.Release();
}
else
_ = asyncResult.Release();
}
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed while flushing snapshot pages from {startPage} to {endPage}", nameof(AsyncFlushPagesForSnapshot), startPage, endPage);
_completedSemaphore.Release();
throw;
}
}
}
Expand Down Expand Up @@ -2257,9 +2282,8 @@ protected void AsyncFlushPageForSnapshotCallback(uint errorCode, uint numBytes,
{
if (epochTaken)
epoch.Suspend();
_ = result.Release();
}

_ = result.Release();
}
catch when (disposed) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
store.epoch.Resume();
_ = store.hlogBase.ShiftReadOnlyToTail(out var tailAddress, out store._hybridLogCheckpoint.flushedSemaphore);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.FoldOverSMTaskHybridLogFlushed);

// Update final logical address to the flushed tail - this may not be necessary
store._hybridLogCheckpoint.info.finalLogicalAddress = tailAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
out store._hybridLogCheckpoint.flushedSemaphore,
store.ThrottleCheckpointFlushDelayMs);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
break;

case Phase.PERSISTENCE_CALLBACK:
ObjectLog_OnPersistenceCallback();
store._hybridLogCheckpoint.info.deltaTailAddress = store._hybridLogCheckpoint.deltaLog.TailAddress;
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.IncrementalSnapshotCheckpointSMTaskHybridLogFlushed);
store.WriteHybridLogIncrementalMetaInfo(store._hybridLogCheckpoint.deltaLog);
store._hybridLogCheckpoint.info.deltaTailAddress = store._hybridLogCheckpoint.deltaLog.TailAddress;
store._lastSnapshotCheckpoint = store._hybridLogCheckpoint.Transfer();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Threading;

namespace Tsavorite.core
{
    /// <summary>
    /// Labels a semaphore placed on the state machine driver's waiting list, naming both
    /// what is being waited for and the state machine task that registered it.
    /// </summary>
    public enum StateMachineSemaphoreType
    {
        /// <summary>
        /// All transactions in the last version must complete.
        /// </summary>
        LastVersionTransactionsDone = 0,

        /// <summary>
        /// The main index checkpoint must complete (registered by IndexCheckpointSMTask).
        /// </summary>
        IndexCheckpointSMTaskMainIndexCheckpoint = 1,

        /// <summary>
        /// The overflow buckets checkpoint must complete (registered by IndexCheckpointSMTask).
        /// </summary>
        IndexCheckpointSMTaskOverflowBucketsCheckpoint = 2,

        /// <summary>
        /// The hybrid log flush must complete (registered by FoldOverSMTask).
        /// </summary>
        FoldOverSMTaskHybridLogFlushed = 3,

        /// <summary>
        /// The hybrid log flush must complete (registered by IncrementalSnapshotCheckpointSMTask).
        /// </summary>
        IncrementalSnapshotCheckpointSMTaskHybridLogFlushed = 4,

        /// <summary>
        /// The hybrid log flush must complete (registered by SnapshotCheckpointSMTask).
        /// </summary>
        SnapshotCheckpointSMTaskHybridLogFlushed = 5,
    }

    /// <summary>
    /// Immutable pairing of a <see cref="SemaphoreSlim"/> and the
    /// <see cref="StateMachineSemaphoreType"/> describing it, so the state machine driver
    /// can tell which operation a given waiting-list entry belongs to.
    /// </summary>
    internal readonly struct SemaphoreWaiterMonitor
    {
        // Declared in the same order as the original auto-properties.
        private readonly SemaphoreSlim _semaphore;
        private readonly StateMachineSemaphoreType _type;

        /// <summary>
        /// Creates an entry that associates <paramref name="semaphore"/> with <paramref name="type"/>.
        /// </summary>
        /// <param name="semaphore">The semaphore to wait on; stored as given, without validation.</param>
        /// <param name="type">The label identifying the operation behind the semaphore.</param>
        public SemaphoreWaiterMonitor(SemaphoreSlim semaphore, StateMachineSemaphoreType type)
            => (_semaphore, _type) = (semaphore, type);

        /// <summary>The semaphore supplied at construction.</summary>
        public SemaphoreSlim Semaphore => _semaphore;

        /// <summary>The label supplied at construction.</summary>
        public StateMachineSemaphoreType Type => _type;

        /// <summary>Returns the name of <see cref="Type"/>.</summary>
        public override string ToString() => _type.ToString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
out store._hybridLogCheckpoint.flushedSemaphore,
store.ThrottleCheckpointFlushDelayMs);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.SnapshotCheckpointSMTaskHybridLogFlushed);
break;

case Phase.PERSISTENCE_CALLBACK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class StateMachineDriver
{
SystemState systemState;
IStateMachine stateMachine;
readonly List<SemaphoreSlim> waitingList;
readonly List<SemaphoreWaiterMonitor> waitingList;
TaskCompletionSource<bool> stateMachineCompleted;
// All threads have entered the given state
SemaphoreSlim waitForTransitionIn;
Expand Down Expand Up @@ -75,7 +75,7 @@ internal void TrackLastVersion(long version)

// We have to re-check the number of active transactions after assigning lastVersion and lastVersionTransactionsDone
if (GetNumActiveTransactions(version) > 0)
AddToWaitingList(lastVersionTransactionsDone);
AddToWaitingList(lastVersionTransactionsDone, StateMachineSemaphoreType.LastVersionTransactionsDone);
}

internal void ResetLastVersion()
Expand Down Expand Up @@ -155,10 +155,10 @@ public long VerifyTransactionVersion(long txnVersion)
public void EndTransaction(long txnVersion)
=> DecrementActiveTransactions(txnVersion);

internal void AddToWaitingList(SemaphoreSlim waiter)
internal void AddToWaitingList(SemaphoreSlim waiter, StateMachineSemaphoreType type)
{
if (waiter != null)
waitingList.Add(waiter);
waitingList.Add(new SemaphoreWaiterMonitor(waiter, type));
}

public bool Register(IStateMachine stateMachine, CancellationToken token = default)
Expand Down Expand Up @@ -311,7 +311,7 @@ async Task ProcessWaitingListAsync(CancellationToken token = default)
}
foreach (var waiter in waitingList)
{
await waiter.WaitAsync(token);
await waiter.Semaphore.WaitAsync(token);
}
waitingList.Clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ internal bool IsIndexFuzzyCheckpointCompleted()

internal void AddIndexCheckpointWaitingList(StateMachineDriver stateMachineDriver)
{
stateMachineDriver.AddToWaitingList(mainIndexCheckpointSemaphore);
stateMachineDriver.AddToWaitingList(overflowBucketsAllocator.GetCheckpointSemaphore());
stateMachineDriver.AddToWaitingList(mainIndexCheckpointSemaphore, StateMachineSemaphoreType.IndexCheckpointSMTaskMainIndexCheckpoint);
stateMachineDriver.AddToWaitingList(overflowBucketsAllocator.GetCheckpointSemaphore(), StateMachineSemaphoreType.IndexCheckpointSMTaskOverflowBucketsCheckpoint);
}

internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken token = default)
Expand All @@ -67,7 +67,6 @@ internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken
await t2.ConfigureAwait(false);
}


// Implementation of an asynchronous checkpointing scheme
// for main hash index of Tsavorite
private int mainIndexCheckpointCallbackCount;
Expand Down Expand Up @@ -160,7 +159,7 @@ private async ValueTask IsMainIndexCheckpointCompletedAsync(CancellationToken to
s.Release();
}

private unsafe void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context)
private void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context)
{
// Set the page status to flushed
var mem = ((HashIndexPageAsyncFlushResult)context).mem;
Expand Down
18 changes: 9 additions & 9 deletions test/Garnet.test.cluster/ClusterTestContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -769,31 +769,31 @@ public void ClusterFailoverSpinWait(int replicaNodeIndex, ILogger logger)
}
}

public void AttachAndWaitForSync(int primary_count, int replica_count, bool disableObjects)
public void AttachAndWaitForSync(int primaryIndex, int replicaStartIndex, int replicaCount, bool disableObjects)
{
var primaryId = clusterTestUtils.GetNodeIdFromNode(0, logger);
var primaryId = clusterTestUtils.GetNodeIdFromNode(primaryIndex, logger);

// Wait until primary node is known so as not to fail replicate
for (var i = primary_count; i < primary_count + replica_count; i++)
for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++)
clusterTestUtils.WaitUntilNodeIdIsKnown(i, primaryId, logger: logger);

// Issue cluster replicate and bump epoch manually to capture config.
for (var i = primary_count; i < primary_count + replica_count; i++)
for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++)
_ = clusterTestUtils.ClusterReplicate(i, primaryId, async: true, logger: logger);

if (!checkpointTask.Wait(TimeSpan.FromSeconds(100))) Assert.Fail("Checkpoint task timeout");
if (!checkpointTask.Wait(TimeSpan.FromSeconds(60))) Assert.Fail("Checkpoint task timeout");

// Wait for recovery and AofSync
for (var i = primary_count; i < replica_count; i++)
for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++)
{
clusterTestUtils.WaitForReplicaRecovery(i, logger);
clusterTestUtils.WaitForReplicaAofSync(0, i, logger);
clusterTestUtils.WaitForReplicaAofSync(primaryIndex, i, logger);
}

clusterTestUtils.WaitForConnectedReplicaCount(0, replica_count, logger: logger);
clusterTestUtils.WaitForConnectedReplicaCount(primaryIndex, replicaCount, logger: logger);

// Validate data on replicas
for (var i = primary_count; i < replica_count; i++)
for (var i = replicaStartIndex; i < replicaStartIndex + replicaCount; i++)
{
if (disableObjects)
ValidateKVCollectionAgainstReplica(ref kvPairs, i);
Expand Down
Loading
Loading