Skip to content
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 66 additions & 48 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ internal virtual void AsyncFlushDeltaToDevice(CircularDiskWriteBuffer flushBuffe

// If throttled, convert rest of the method into a truly async task run because issuing IO can take up synchronous time
if (throttleCheckpointFlushDelayMs >= 0)
_ = Task.Run(FlushRunner);
_ = Task.Run(() => FlushRunner());
else
FlushRunner();

Expand All @@ -261,16 +261,16 @@ void FlushRunner()
if (endAddress > GetLogicalAddressOfStartOfPage(endPage))
endPage++;

long prevEndPage = GetPage(prevEndAddress);
deltaLog.Allocate(out int entryLength, out long destPhysicalAddress);
int destOffset = 0;
var prevEndPage = GetPage(prevEndAddress);
deltaLog.Allocate(out var entryLength, out var destPhysicalAddress);
var destOffset = 0;

// We perform delta capture under epoch protection with page-wise refresh for latency reasons
bool epochTaken = epoch.ResumeIfNotProtected();
var epochTaken = epoch.ResumeIfNotProtected();

try
{
for (long p = startPage; p < endPage; p++)
for (var p = startPage; p < endPage; p++)
{
// Check if we have the entire page safely available to process in memory
if (HeadAddress >= GetLogicalAddressOfStartOfPage(p) + PageSize)
Expand Down Expand Up @@ -305,7 +305,7 @@ void FlushRunner()
if (info.Dirty)
{
info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic
int size = sizeof(long) + sizeof(int) + alignedRecordSize;
var size = sizeof(long) + sizeof(int) + alignedRecordSize;
if (destOffset + size > entryLength)
{
deltaLog.Seal(destOffset);
Expand Down Expand Up @@ -338,9 +338,15 @@ void FlushRunner()
epoch.Suspend();
}

if (destOffset > 0)
deltaLog.Seal(destOffset);
_completedSemaphore.Release();
try
{
if (destOffset > 0)
deltaLog.Seal(destOffset);
}
finally
{
_completedSemaphore.Release();
}
}
}

Expand Down Expand Up @@ -1862,8 +1868,10 @@ public void AsyncFlushPagesForRecovery<TContext>(long scanFromAddress, long flus
/// <summary>
/// Flush pages from startPage (inclusive) to endPage (exclusive) to specified log device and obj device for a snapshot checkpoint.
/// </summary>
/// <param name="flushBuffers"></param>
/// <param name="startPage"></param>
/// <param name="endPage"></param>
/// <param name="startLogicalAddress"></param>
/// <param name="endLogicalAddress"></param>
/// <param name="fuzzyStartLogicalAddress"></param>
/// <param name="logDevice"></param>
Expand All @@ -1881,7 +1889,7 @@ public void AsyncFlushPagesForSnapshot(CircularDiskWriteBuffer flushBuffers, lon

// If throttled, convert rest of the method into a truly async task run because issuing IO can take up synchronous time
if (throttleCheckpointFlushDelayMs >= 0)
_ = Task.Run(FlushRunner);
_ = Task.Run(() => FlushRunner());
else
FlushRunner();

Expand All @@ -1891,47 +1899,58 @@ void FlushRunner()

var flushCompletionTracker = new FlushCompletionTracker(_completedSemaphore, throttleCheckpointFlushDelayMs >= 0 ? new SemaphoreSlim(0) : null, totalNumPages);

// Flush each page in sequence
for (long flushPage = startPage; flushPage < endPage; flushPage++)
try
{
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
if (flushSize <= 0)
continue;

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
// Flush each page in sequence
for (var flushPage = startPage; flushPage < endPage; flushPage++)
{
if (throttleCheckpointFlushDelayMs >= 0)
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
// TODO: Should we release completedSemaphore also if the flushSize <=0
if (flushSize <= 0)
continue;

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
{
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
if (throttleCheckpointFlushDelayMs >= 0)
{
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
}
}
else
_ = asyncResult.Release();
}
else
_ = asyncResult.Release();
}
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed while flushing snapshot pages from {startPage} to {endPage}", nameof(AsyncFlushPagesForSnapshot), startPage, endPage);
_completedSemaphore.Release();
throw;
}
}
}
Expand Down Expand Up @@ -2255,9 +2274,8 @@ protected void AsyncFlushPageForSnapshotCallback(uint errorCode, uint numBytes,
{
if (epochTaken)
epoch.Suspend();
_ = result.Release();
}

_ = result.Release();
}
catch when (disposed) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
store.epoch.Resume();
_ = store.hlogBase.ShiftReadOnlyToTail(out var tailAddress, out store._hybridLogCheckpoint.flushedSemaphore);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.FoldOverSMTaskHybridLogFlushed);

// Update final logical address to the flushed tail - this may not be necessary
store._hybridLogCheckpoint.info.finalLogicalAddress = tailAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
out store._hybridLogCheckpoint.flushedSemaphore,
store.ThrottleCheckpointFlushDelayMs);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
break;

case Phase.PERSISTENCE_CALLBACK:
ObjectLog_OnPersistenceCallback();
store._hybridLogCheckpoint.info.deltaTailAddress = store._hybridLogCheckpoint.deltaLog.TailAddress;
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.IncrementalSnapshotCheckpointSMTaskHybridLogFlushed);
store.WriteHybridLogIncrementalMetaInfo(store._hybridLogCheckpoint.deltaLog);
store._hybridLogCheckpoint.info.deltaTailAddress = store._hybridLogCheckpoint.deltaLog.TailAddress;
store._lastSnapshotCheckpoint = store._hybridLogCheckpoint.Transfer();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Threading;

namespace Tsavorite.core
{
/// <summary>
/// Identifies the type of semaphore added to the state machine driver waiting list,
/// including the originating state machine task.
/// </summary>
public enum StateMachineSemaphoreType
{
/// <summary>
/// Waiting for all transactions in the last version to complete.
/// </summary>
LastVersionTransactionsDone,

/// <summary>
/// Waiting for the main index checkpoint to complete (IndexCheckpointSMTask).
/// </summary>
IndexCheckpointSMTaskMainIndexCheckpoint,

/// <summary>
/// Waiting for the overflow buckets checkpoint to complete (IndexCheckpointSMTask).
/// </summary>
IndexCheckpointSMTaskOverflowBucketsCheckpoint,

/// <summary>
/// Waiting for the hybrid log flush to complete (FoldOverSMTask).
/// </summary>
FoldOverSMTaskHybridLogFlushed,

/// <summary>
/// Waiting for the hybrid log flush to complete (IncrementalSnapshotCheckpointSMTask).
/// </summary>
IncrementalSnapshotCheckpointSMTaskHybridLogFlushed,

/// <summary>
/// Waiting for the hybrid log flush to complete (SnapshotCheckpointSMTask).
/// </summary>
SnapshotCheckpointSMTaskHybridLogFlushed,
}

/// <summary>
/// Pairs a <see cref="SemaphoreSlim"/> with a <see cref="StateMachineSemaphoreType"/>
/// so the state machine driver can identify which operation a semaphore belongs to.
/// </summary>
internal readonly struct SemaphoreWaiterMonitor
{
public SemaphoreSlim Semaphore { get; }
public StateMachineSemaphoreType Type { get; }

public SemaphoreWaiterMonitor(SemaphoreSlim semaphore, StateMachineSemaphoreType type)
{
Semaphore = semaphore;
Type = type;
}

public override string ToString() => Type.ToString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
out store._hybridLogCheckpoint.flushedSemaphore,
store.ThrottleCheckpointFlushDelayMs);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore, StateMachineSemaphoreType.SnapshotCheckpointSMTaskHybridLogFlushed);
break;

case Phase.PERSISTENCE_CALLBACK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class StateMachineDriver
{
SystemState systemState;
IStateMachine stateMachine;
readonly List<SemaphoreSlim> waitingList;
readonly List<SemaphoreWaiterMonitor> waitingList;
TaskCompletionSource<bool> stateMachineCompleted;
// All threads have entered the given state
SemaphoreSlim waitForTransitionIn;
Expand Down Expand Up @@ -66,6 +66,12 @@ void DecrementActiveTransactions(long txnVersion)

internal void TrackLastVersion(long version)
{
// Only create and enqueue one semaphore per version, if we create a
// new one on each call, the earlier semaphore is orphaned in the waitingList
// and never released, and we permanently block ProcessWaitingListAsync.
if (lastVersion == version)
return;

if (GetNumActiveTransactions(version) > 0)
{
// Set version number first, then create semaphore
Expand All @@ -75,7 +81,7 @@ internal void TrackLastVersion(long version)

// We have to re-check the number of active transactions after assigning lastVersion and lastVersionTransactionsDone
if (GetNumActiveTransactions(version) > 0)
AddToWaitingList(lastVersionTransactionsDone);
AddToWaitingList(lastVersionTransactionsDone, StateMachineSemaphoreType.LastVersionTransactionsDone);
}

internal void ResetLastVersion()
Expand Down Expand Up @@ -155,10 +161,10 @@ public long VerifyTransactionVersion(long txnVersion)
public void EndTransaction(long txnVersion)
=> DecrementActiveTransactions(txnVersion);

internal void AddToWaitingList(SemaphoreSlim waiter)
internal void AddToWaitingList(SemaphoreSlim waiter, StateMachineSemaphoreType type)
{
if (waiter != null)
waitingList.Add(waiter);
waitingList.Add(new SemaphoreWaiterMonitor(waiter, type));
}

public bool Register(IStateMachine stateMachine, CancellationToken token = default)
Expand Down Expand Up @@ -311,7 +317,7 @@ async Task ProcessWaitingListAsync(CancellationToken token = default)
}
foreach (var waiter in waitingList)
{
await waiter.WaitAsync(token);
await waiter.Semaphore.WaitAsync(token);
}
waitingList.Clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ internal bool IsIndexFuzzyCheckpointCompleted()

internal void AddIndexCheckpointWaitingList(StateMachineDriver stateMachineDriver)
{
stateMachineDriver.AddToWaitingList(mainIndexCheckpointSemaphore);
stateMachineDriver.AddToWaitingList(overflowBucketsAllocator.GetCheckpointSemaphore());
stateMachineDriver.AddToWaitingList(mainIndexCheckpointSemaphore, StateMachineSemaphoreType.IndexCheckpointSMTaskMainIndexCheckpoint);
stateMachineDriver.AddToWaitingList(overflowBucketsAllocator.GetCheckpointSemaphore(), StateMachineSemaphoreType.IndexCheckpointSMTaskOverflowBucketsCheckpoint);
}

internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken token = default)
Expand All @@ -67,7 +67,6 @@ internal async ValueTask IsIndexFuzzyCheckpointCompletedAsync(CancellationToken
await t2.ConfigureAwait(false);
}


// Implementation of an asynchronous checkpointing scheme
// for main hash index of Tsavorite
private int mainIndexCheckpointCallbackCount;
Expand Down Expand Up @@ -160,7 +159,7 @@ private async ValueTask IsMainIndexCheckpointCompletedAsync(CancellationToken to
s.Release();
}

private unsafe void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context)
private void AsyncPageFlushCallback(uint errorCode, uint numBytes, object context)
{
// Set the page status to flushed
var mem = ((HashIndexPageAsyncFlushResult)context).mem;
Expand Down
Loading
Loading