Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 85 additions & 56 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,27 @@ protected abstract void ReadAsync<TContext>(ulong alignedSourceAddress, IntPtr d
/// <summary>Flush checkpoint Delta to the Device</summary>
[MethodImpl(MethodImplOptions.NoInlining)]
internal virtual void AsyncFlushDeltaToDevice(CircularDiskWriteBuffer flushBuffers, long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog,
out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
out Task completedTask, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async delta log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;

// If throttled, convert rest of the method into a truly async task run because issuing IO can take up synchronous time
if (throttleCheckpointFlushDelayMs >= 0)
_ = Task.Run(FlushRunner);
{
completedTask = Task.Run(FlushRunner);
}
else
FlushRunner();
{
try
{
FlushRunner();
completedTask = Task.CompletedTask;
}
catch (Exception ex)
{
completedTask = Task.FromException(ex);
}
}

void FlushRunner()
{
Expand Down Expand Up @@ -340,7 +349,6 @@ void FlushRunner()

if (destOffset > 0)
deltaLog.Seal(destOffset);
_completedSemaphore.Release();
}
}

Expand Down Expand Up @@ -403,7 +411,8 @@ public virtual void Dispose()
bufferPool.Free();

flushEvent.Dispose();
notifyFlushedUntilAddressSemaphore?.Dispose();
notifyFlushedUntilAddressTcs?.TrySetCanceled();
notifyFlushedUntilAddressTcs = null;

onReadOnlyObserver?.OnCompleted();
onEvictionObserver?.OnCompleted();
Expand Down Expand Up @@ -1304,15 +1313,15 @@ internal long CalculateReadOnlyAddress(long tailAddress, long headAddress)
}

/// <summary>Used by applications to make the current state of the database immutable quickly</summary>
public bool ShiftReadOnlyToTail(out long tailAddress, out SemaphoreSlim notifyDone)
public bool ShiftReadOnlyToTail(out long tailAddress, out Task notifyDone)
{
notifyDone = null;
tailAddress = GetTailAddress();
var localTailAddress = tailAddress;
if (MonotonicUpdate(ref ReadOnlyAddress, tailAddress, out _))
{
notifyFlushedUntilAddressSemaphore = new SemaphoreSlim(0);
notifyDone = notifyFlushedUntilAddressSemaphore;
notifyFlushedUntilAddressTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
notifyDone = notifyFlushedUntilAddressTcs.Task;
notifyFlushedUntilAddress = localTailAddress;
epoch.BumpCurrentEpoch(() => OnPagesMarkedReadOnly(localTailAddress));
return true;
Expand Down Expand Up @@ -1573,7 +1582,7 @@ protected void ShiftFlushedUntilAddress()
flushEvent.Set();

if ((oldFlushedUntilAddress < notifyFlushedUntilAddress) && (currentFlushedUntilAddress >= notifyFlushedUntilAddress))
_ = notifyFlushedUntilAddressSemaphore.Release();
_ = notifyFlushedUntilAddressTcs?.TrySetResult(true);
}
}

Expand All @@ -1592,8 +1601,8 @@ protected void ShiftFlushedUntilAddress()
/// <summary>Address for notification of flushed-until</summary>
public long notifyFlushedUntilAddress;

/// <summary>Semaphore for notification of flushed-until</summary>
public SemaphoreSlim notifyFlushedUntilAddressSemaphore;
/// <summary>TaskCompletionSource for notification of flushed-until</summary>
public TaskCompletionSource<bool> notifyFlushedUntilAddressTcs;

/// <summary>Reset for recovery</summary>
[MethodImpl(MethodImplOptions.NoInlining)]
Expand Down Expand Up @@ -1870,16 +1879,16 @@ public void AsyncFlushPagesForRecovery<TContext>(long scanFromAddress, long flus
/// <param name="fuzzyStartLogicalAddress"></param>
/// <param name="logDevice"></param>
/// <param name="objectLogDevice"></param>
/// <param name="completedSemaphore"></param>
/// <param name="completedTask">Task that completes when all pages are flushed, or faults if an exception occurs</param>
/// <param name="throttleCheckpointFlushDelayMs"></param>
[MethodImpl(MethodImplOptions.NoInlining)]
public void AsyncFlushPagesForSnapshot(CircularDiskWriteBuffer flushBuffers, long startPage, long endPage, long startLogicalAddress, long endLogicalAddress,
long fuzzyStartLogicalAddress, IDevice logDevice, IDevice objectLogDevice, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
long fuzzyStartLogicalAddress, IDevice logDevice, IDevice objectLogDevice, out Task completedTask, int throttleCheckpointFlushDelayMs)
{
logger?.LogTrace("Starting async full log flush with throttling {throttlingEnabled}", throttleCheckpointFlushDelayMs >= 0 ? $"enabled ({throttleCheckpointFlushDelayMs}ms)" : "disabled");

var _completedSemaphore = new SemaphoreSlim(0);
completedSemaphore = _completedSemaphore;
var completionTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
completedTask = completionTcs.Task;

// If throttled, convert rest of the method into a truly async task run because issuing IO can take up synchronous time
if (throttleCheckpointFlushDelayMs >= 0)
Expand All @@ -1891,49 +1900,69 @@ void FlushRunner()
{
var totalNumPages = (int)(endPage - startPage);

var flushCompletionTracker = new FlushCompletionTracker(_completedSemaphore, throttleCheckpointFlushDelayMs >= 0 ? new SemaphoreSlim(0) : null, totalNumPages);
var flushCompletionTracker = new FlushCompletionTracker(completionTcs, enableThrottling: throttleCheckpointFlushDelayMs >= 0, totalNumPages);

// Flush each page in sequence
for (long flushPage = startPage; flushPage < endPage; flushPage++)
try
{
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
if (flushSize <= 0)
continue;

var asyncResult = new PageAsyncFlushResult<Empty>
// Flush each page in sequence
for (long flushPage = startPage; flushPage < endPage; flushPage++)
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
{
if (throttleCheckpointFlushDelayMs >= 0)
// For the first page, startLogicalAddress may be in the middle of the page; for the last page, endLogicalAddress may be in the middle of the page;
// for middle pages, we flush the entire page.
var flushStartAddress = GetLogicalAddressOfStartOfPage(flushPage);
if (startLogicalAddress > flushStartAddress)
flushStartAddress = startLogicalAddress;
var flushEndAddress = GetLogicalAddressOfStartOfPage(flushPage + 1);
if (endLogicalAddress < flushEndAddress)
flushEndAddress = endLogicalAddress;
var flushSize = flushEndAddress - flushStartAddress;
if (flushSize <= 0)
{
// No data to flush for this page. Signal completion and drain the
// throttle semaphore so the next real page's WaitOneFlush is not
// satisfied by this page's release.
flushCompletionTracker.CompleteFlush();
flushCompletionTracker.WaitOneFlush();
continue;
}

var asyncResult = new PageAsyncFlushResult<Empty>
{
flushCompletionTracker = flushCompletionTracker,
page = flushPage,
fromAddress = flushStartAddress,
untilAddress = flushEndAddress,
count = 1,
flushRequestState = FlushRequestState.Snapshot,
flushBuffers = flushBuffers
};

// Intended destination is flushPage
WriteAsyncToDeviceForSnapshot(startPage, flushPage, (int)flushSize, AsyncFlushPageForSnapshotCallback, asyncResult, logDevice, objectLogDevice, fuzzyStartLogicalAddress);

// If we did not issue a flush write (due to HeadAddress moving past flushPage), then WriteAsync set isForSnapshot false and we release the asyncResult here;
// otherwise, we wait for the completion of the flush (and the callback will release the asyncResult).
if (asyncResult.flushRequestState != FlushRequestState.WriteNotIssued)
{
if (throttleCheckpointFlushDelayMs >= 0)
{
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
}
}
else
{
_ = asyncResult.Release();
// Release() called CompleteFlush() which released the throttle semaphore.
// Drain it so the next real page's WaitOneFlush is not satisfied by this no-op.
flushCompletionTracker.WaitOneFlush();
Thread.Sleep(throttleCheckpointFlushDelayMs);
}
}
else
_ = asyncResult.Release();
}
catch (Exception ex)
{
logger?.LogError(ex, "{method} failed while flushing snapshot pages from {startPage} to {endPage}", nameof(AsyncFlushPagesForSnapshot), startPage, endPage);
flushCompletionTracker.SetException(ex);
}
}
}
Expand Down Expand Up @@ -2250,16 +2279,16 @@ protected void AsyncFlushPageForSnapshotCallback(uint errorCode, uint numBytes,
if (info.Dirty)
info.ClearDirtyAtomic(); // there may be read locks being taken, hence atomic
physicalAddress += alignedRecordSize;
startAddress += alignedRecordSize;
}
}
}
finally
{
if (epochTaken)
epoch.Suspend();
_ = result.Release();
}

_ = result.Release();
}
catch when (disposed) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public sealed class MallocFixedPageSize<T> : IDisposable
internal static bool IsBlittable => Utility.IsBlittable<T>();

private int checkpointCallbackCount;
private SemaphoreSlim checkpointSemaphore;
private TaskCompletionSource<bool> checkpointTcs;

private readonly ConcurrentQueue<long> freeList;

Expand Down Expand Up @@ -267,12 +267,10 @@ private unsafe long InternalAllocate(int blockSize)
/// <returns></returns>
public async ValueTask IsCheckpointCompletedAsync(CancellationToken token = default)
{
var s = checkpointSemaphore;
await s.WaitAsync(token).ConfigureAwait(false);
s.Release();
await checkpointTcs.Task.WaitAsync(token).ConfigureAwait(false);
}

public SemaphoreSlim GetCheckpointSemaphore() => checkpointSemaphore;
public Task GetCheckpointTask() => checkpointTcs.Task;

/// <summary>
/// Public facing persistence API
Expand All @@ -299,7 +297,7 @@ internal unsafe void BeginCheckpoint(IDevice device, ulong offset, out ulong num
int numCompleteLevels = localCount >> PageSizeBits;
int numLevels = numCompleteLevels + (recordsCountInLastLevel > 0 ? 1 : 0);
checkpointCallbackCount = numLevels;
checkpointSemaphore = new SemaphoreSlim(0);
checkpointTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
uint alignedPageSize = PageSize * (uint)RecordSize;
uint lastLevelSize = (uint)recordsCountInLastLevel * (uint)RecordSize;

Expand Down Expand Up @@ -353,7 +351,7 @@ private unsafe void AsyncFlushCallback(uint errorCode, uint numBytes, object con

if (Interlocked.Decrement(ref checkpointCallbackCount) == 0)
{
checkpointSemaphore.Release();
checkpointTcs.TrySetResult(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Tsavorite.core
Expand Down Expand Up @@ -519,17 +520,22 @@ protected override void WriteAsyncToDeviceForSnapshot<TContext>(long startPage,
var epochTaken = epoch.ResumeIfNotProtected();
try
{
if (HeadAddress >= asyncResult.untilAddress)
var headAddress = HeadAddress;

if (headAddress >= asyncResult.untilAddress)
{
// Requested span on page is entirely unavailable in memory; ignore it and call the callback directly.
callback(0, 0, asyncResult);
return;
}

// If requested page span is only partly available in memory, adjust the start position. WriteAsync will handle it if HeadAddress is lower,
// but this is faster.
if (HeadAddress > asyncResult.fromAddress)
asyncResult.fromAddress = HeadAddress;
// If requested page span is only partly available in memory, adjust the start position
// and mark as partial so WriteAsync recalculates the flush size from the adjusted range.
if (headAddress > asyncResult.fromAddress)
{
asyncResult.fromAddress = headAddress;
asyncResult.partial = true;
}

// We are writing to a separate device which starts at startPage. Eventually, startPage becomes the basis of
// HybridLogRecoveryInfo.snapshotStartFlushedLogicalAddress, which is the page starting at offset 0 of the snapshot file.
Expand Down Expand Up @@ -732,6 +738,8 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedMainLogFlushPageA
// which will never be less than HeadAddress. So we do not need to worry about whatever values are in the inline
// record space between the current logicalAddress and HeadAddress.
extraRecordOffset = (int)(headAddress - (logicalAddress + logRecordSize));
// Skip object serialization
goto NextRecord;
}
else
{
Expand Down Expand Up @@ -777,6 +785,7 @@ private void WriteAsync<TContext>(long flushPage, ulong alignedMainLogFlushPageA
}
} // endif record id Valid

NextRecord:
logicalAddress += logRecordSize + extraRecordOffset; // advance in main log
physicalAddress += logRecordSize + extraRecordOffset; // advance in source buffer
}
Expand Down Expand Up @@ -1067,7 +1076,7 @@ internal override void MemoryPageScan(long beginAddress, long endAddress, IObser
observer?.OnNext(iter);
}

internal override void AsyncFlushDeltaToDevice(CircularDiskWriteBuffer flushBuffers, long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog, out SemaphoreSlim completedSemaphore, int throttleCheckpointFlushDelayMs)
internal override void AsyncFlushDeltaToDevice(CircularDiskWriteBuffer flushBuffers, long startAddress, long endAddress, long prevEndAddress, long version, DeltaLog deltaLog, out Task completedTask, int throttleCheckpointFlushDelayMs)
{
throw new TsavoriteException("Incremental snapshots not supported with generic allocator");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace Tsavorite.core
Expand Down Expand Up @@ -338,7 +338,7 @@ internal struct HybridLogCheckpointInfo : IDisposable
public IDevice snapshotFileObjectLogDevice;
public IDevice deltaFileDevice;
public DeltaLog deltaLog;
public SemaphoreSlim flushedSemaphore;
public Task flushedTask;
public long prevVersion;
internal CircularDiskWriteBuffer objectLogFlushBuffers;

Expand Down
Loading
Loading