Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 71 additions & 49 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,27 @@ protected abstract void ReadAsync<TContext>(ulong alignedSourceAddress, IntPtr d
/// <summary>Flush checkpoint Delta to the Device</summary>
[MethodImpl(MethodImplOptions.NoInlining)]
internal virtual void AsyncFlushDeltaToDevice(CircularDiskWriteBuffer flushBuffers, long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog,
out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
out Task completedTask, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async delta log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;

// If throttled, convert rest of the method into a truly async task run because issuing IO can take up synchronous time
if (throttleCheckpointFlushDelayMs >= 0)
_ = Task.Run(FlushRunner);
{
completedTask = Task.Run(FlushRunner);
}
else
FlushRunner();
{
try
{
FlushRunner();
completedTask = Task.CompletedTask;
}
catch (Exception ex)
{
completedTask = Task.FromException(ex);
}
}

void FlushRunner()
{
Expand Down Expand Up @@ -340,7 +349,6 @@ void FlushRunner()

if (destOffset > 0)
deltaLog.Seal(destOffset);
_completedSemaphore.Release();
}
}

Expand Down Expand Up @@ -1870,16 +1878,16 @@ public void AsyncFlushPagesForRecovery<TContext>(long scanFromAddress, long flus
/// <param name="fuzzyStartLogicalAddress"></param>
/// <param name="logDevice"></param>
/// <param name="objectLogDevice"></param>
/// <param name="completedSemaphore"></param>
/// <param name="completedTask">Task that completes when all pages are flushed, or faults if an exception occurs</param>
/// <param name="throttleCheckpointFlushDelayMs"></param>
[MethodImpl(MethodImplOptions.NoInlining)]
public void AsyncFlushPagesForSnapshot(CircularDiskWriteBuffer flushBuffers, long startPage, long endPage, long startLogicalAddress, long endLogicalAddress,
long fuzzyStartLogicalAddress, IDevice logDevice, IDevice objectLogDevice, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
long fuzzyStartLogicalAddress, IDevice logDevice, IDevice objectLogDevice, out Task completedTask, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async full log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;
var completionTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
completedTask = completionTcs.Task;

// If throttled, convert rest of the method into a truly async task run because issuing IO can take up synchronous time
if (throttleCheckpointFlushDelayMs >= 0)
Expand All @@ -1891,49 +1899,64 @@ void FlushRunner()
{
var totalNumPages = (int)(endPage - startPage);

var flushCompletionTracker = new FlushCompletionTracker(_completedSemaphore, throttleCheckpointFlushDelayMs >= 0 ? new SemaphoreSlim(0) : null, totalNumPages);
var flushCompletionTracker = new FlushCompletionTracker(completionTcs, enableThrottling: throttleCheckpointFlushDelayMs >= 0, totalNumPages);

// Flush each page in sequence
for (long flushPage = startPage; flushPage < endPage; flushPage++)
try
{
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
if (flushSize <= 0)
continue;

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
// Flush each page in sequence
for (long flushPage = startPage; flushPage < endPage; flushPage++)
{
if (throttleCheckpointFlushDelayMs >= 0)
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
if (flushSize <= 0)
{
// No data to flush for this page. Signal completion and drain the
// throttle semaphore so the next real page's WaitOneFlush is not
// satisfied by this page's release.
flushCompletionTracker.CompleteFlush();
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
continue;
}

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
{
if (throttleCheckpointFlushDelayMs >= 0)
{
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
}
}
else
_ = asyncResult.Release();
}
else
_ = asyncResult.Release();
}
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed while flushing snapshot pages from {startPage} to {endPage}", nameof(AsyncFlushPagesForSnapshot), startPage, endPage);
flushCompletionTracker.SetException(ex);
}
}
}
Expand Down Expand Up @@ -2257,9 +2280,8 @@ protected void AsyncFlushPageForSnapshotCallback(uint errorCode, uint numBytes,
{
if (epochTaken)
epoch.Suspend();
_ = result.Release();
}

_ = result.Release();
}
catch when (disposed) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Tsavorite.core
Expand Down Expand Up @@ -732,6 +733,8 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedMainLogFlushPageA
// which will never be less than HeadAddress. So we do not need to worry about whatever values are in the inline
// record space between the current logicalAddress and HeadAddress.
extraRecordOffset = (int)(headAddress - (logicalAddress + logRecordSize));
// Skip object serialization
goto NextRecord;
}
else
{
Expand Down Expand Up @@ -777,6 +780,7 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedMainLogFlushPageA
}
} // endif record id Valid

NextRecord:
logicalAddress += logRecordSize + extraRecordOffset; // advance in main log
physicalAddress += logRecordSize + extraRecordOffset; // advance in source buffer
}
Expand Down Expand Up @@ -1067,7 +1071,7 @@ internal override void MemoryPageScan(long beginAddress, long endAddress, IObser
observer?.OnNext(iter);
}

internal override void AsyncFlushDeltaToDevice(CircularDiskWriteBuffer flushBuffers, long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
internal override void AsyncFlushDeltaToDevice(CircularDiskWriteBuffer flushBuffers, long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog, out Task completedTask, int throttleCheckpointFlushDelayMs)
{
throw new TsavoriteException("Incremental snapshots not supported with generic allocator");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Tsavorite.core
Expand Down Expand Up @@ -338,7 +338,7 @@ internal struct HybridLogCheckpointInfo : IDisposable
public IDevice snapshotFileObjectLogDevice;
public IDevice deltaFileDevice;
public DeltaLog deltaLog;
public SemaphoreSlim flushedSemaphore;
public Task flushedTask;
public long prevVersion;
internal CircularDiskWriteBuffer objectLogFlushBuffers;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
try
{
store.epoch.Resume();
_ = store.hlogBase.ShiftReadOnlyToTail(out var tailAddress, out store._hybridLogCheckpoint.flushedSemaphore);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
_ = store.hlogBase.ShiftReadOnlyToTail(out var tailAddress, out var flushedSemaphore);
if (flushedSemaphore != null)
{
store._hybridLogCheckpoint.flushedTask = flushedSemaphore.WaitAsync();
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedTask, StateMachineTaskType.FoldOverSMTaskHybridLogFlushed);
}

// Update final logical address to the flushed tail - this may not be necessary
store._hybridLogCheckpoint.info.finalLogicalAddress = tailAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
store._lastSnapshotCheckpoint.info.finalLogicalAddress,
store._hybridLogCheckpoint.prevVersion,
store._hybridLogCheckpoint.deltaLog,
out store._hybridLogCheckpoint.flushedSemaphore,
out store._hybridLogCheckpoint.flushedTask,
store.ThrottleCheckpointFlushDelayMs);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
if (store._hybridLogCheckpoint.flushedTask != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedTask, StateMachineTaskType.IncrementalSnapshotCheckpointSMTaskHybridLogFlushed);
break;

case Phase.PERSISTENCE_CALLBACK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public override void GlobalBeforeEnteringState(SystemState next, StateMachineDri
fuzzyStartLogicalAddress: store._hybridLogCheckpoint.info.startLogicalAddress,
logDevice: store._hybridLogCheckpoint.snapshotFileDevice,
objectLogDevice: store._hybridLogCheckpoint.snapshotFileObjectLogDevice,
out store._hybridLogCheckpoint.flushedSemaphore,
out store._hybridLogCheckpoint.flushedTask,
store.ThrottleCheckpointFlushDelayMs);
if (store._hybridLogCheckpoint.flushedSemaphore != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedSemaphore);
if (store._hybridLogCheckpoint.flushedTask != null)
stateMachineDriver.AddToWaitingList(store._hybridLogCheckpoint.flushedTask, StateMachineTaskType.SnapshotCheckpointSMTaskHybridLogFlushed);
break;

case Phase.PERSISTENCE_CALLBACK:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class StateMachineDriver
{
SystemState systemState;
IStateMachine stateMachine;
readonly List<SemaphoreSlim> waitingList;
readonly List<(Task task, StateMachineTaskType type)> waitingList;
TaskCompletionSource<bool> stateMachineCompleted;
// All threads have entered the given state
SemaphoreSlim waitForTransitionIn;
Expand Down Expand Up @@ -75,7 +75,7 @@ internal void TrackLastVersion(long version)

// We have to re-check the number of active transactions after assigning lastVersion and lastVersionTransactionsDone
if (GetNumActiveTransactions(version) > 0)
AddToWaitingList(lastVersionTransactionsDone);
AddToWaitingList(lastVersionTransactionsDone, StateMachineTaskType.LastVersionTransactionsDone);
}

internal void ResetLastVersion()
Expand Down Expand Up @@ -155,10 +155,16 @@ public long VerifyTransactionVersion(long txnVersion)
public void EndTransaction(long txnVersion)
=> DecrementActiveTransactions(txnVersion);

internal void AddToWaitingList(SemaphoreSlim waiter)
internal void AddToWaitingList(SemaphoreSlim waiter, StateMachineTaskType type)
{
if (waiter != null)
waitingList.Add(waiter);
waitingList.Add((waiter.WaitAsync(), type));
}

/// <summary>
/// Adds a task, tagged with the kind of work it represents, to the waiting list.
/// </summary>
/// <param name="waiter">Task to add; a null reference is ignored.</param>
/// <param name="type">Tag stored alongside the task in the waiting list.</param>
internal void AddToWaitingList(Task waiter, StateMachineTaskType type)
{
    // Nothing to track when the caller has no outstanding work.
    if (waiter is null)
        return;

    var entry = (waiter, type);
    waitingList.Add(entry);
}

public bool Register(IStateMachine stateMachine, CancellationToken token = default)
Expand Down Expand Up @@ -309,9 +315,17 @@ async Task ProcessWaitingListAsync(CancellationToken token = default)
{
throw waitForTransitionInException;
}
foreach (var waiter in waitingList)
foreach (var (task, type) in waitingList)
{
await waiter.WaitAsync(token);
try
{
await task.WaitAsync(token).ConfigureAwait(false);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger?.LogError(ex, "State machine task '{type}' faulted", type);
throw;
}
}
waitingList.Clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Tsavorite.core
{
    /// <summary>
    /// Tag attached to every waiter placed on the state machine driver's waiting list.
    /// Each value names both what is being waited for and, where applicable, the state
    /// machine task that registered the waiter, so a faulted waiter can be identified
    /// in the driver's error log.
    /// </summary>
    public enum StateMachineTaskType
    {
        /// <summary>
        /// All transactions that belong to the last version must finish.
        /// </summary>
        LastVersionTransactionsDone,

        /// <summary>
        /// Main index checkpoint completion, registered by IndexCheckpointSMTask.
        /// </summary>
        IndexCheckpointSMTaskMainIndexCheckpoint,

        /// <summary>
        /// Overflow buckets checkpoint completion, registered by IndexCheckpointSMTask.
        /// </summary>
        IndexCheckpointSMTaskOverflowBucketsCheckpoint,

        /// <summary>
        /// Hybrid log flush completion, registered by FoldOverSMTask.
        /// </summary>
        FoldOverSMTaskHybridLogFlushed,

        /// <summary>
        /// Hybrid log flush completion, registered by IncrementalSnapshotCheckpointSMTask.
        /// </summary>
        IncrementalSnapshotCheckpointSMTaskHybridLogFlushed,

        /// <summary>
        /// Hybrid log flush completion, registered by SnapshotCheckpointSMTask.
        /// </summary>
        SnapshotCheckpointSMTaskHybridLogFlushed,
    }
}
Loading
Loading