Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddr

public void Throttle()
{
if (!garnetClient.IsConnected)
ExceptionUtils.ThrowException(new GarnetException("AOF stream client disconnected!"));

// Trigger flush while we are out of epoch protection
garnetClient.CompletePending(false);
garnetClient.Throttle();
Expand All @@ -108,11 +111,11 @@ public async Task ReplicaSyncTask()

iter = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(startAddress, long.MaxValue, scanUncommitted: true, recover: false, logger: logger);

while (true)
{
if (cts.Token.IsCancellationRequested) break;
await iter.BulkConsumeAllAsync(this, clusterProvider.serverOptions.ReplicaSyncDelayMs, maxChunkSize: 1 << 20, cts.Token);
}
await iter.BulkConsumeAllAsync(
this,
clusterProvider.serverOptions.ReplicaSyncDelayMs,
maxChunkSize: 1 << 20,
cts.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down
15 changes: 6 additions & 9 deletions libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,12 @@ public async void ReplicaReplayTask()
try
{
activeReplay.ReadLock();
while (true)
{
replicaReplayTaskCts.Token.ThrowIfCancellationRequested();
await replayIterator.BulkConsumeAllAsync(
this,
clusterProvider.serverOptions.ReplicaSyncDelayMs,
maxChunkSize: 1 << 20,
replicaReplayTaskCts.Token);
}

await replayIterator.BulkConsumeAllAsync(
this,
clusterProvider.serverOptions.ReplicaSyncDelayMs,
maxChunkSize: 1 << 20,
replicaReplayTaskCts.Token);
}
catch (Exception ex)
{
Expand Down
14 changes: 14 additions & 0 deletions libs/common/ExceptionUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Runtime.CompilerServices;

namespace Garnet.common
{
public static class ExceptionUtils
{
[MethodImpl(MethodImplOptions.NoInlining)]
public static void ThrowException(Exception e) => throw e;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,9 @@ public unsafe bool TryConsumeNext<T>(T consumer) where T : ILogEntryConsumer
/// <returns>whether a next entry is present</returns>
public unsafe bool TryBulkConsumeNext<T>(T consumer, int maxChunkSize = 0) where T : IBulkLogEntryConsumer
{
// Throttle and implicitly check for consumer liveness
consumer.Throttle();

if (maxChunkSize == 0) maxChunkSize = allocator.PageSize;

if (disposed)
Expand All @@ -500,7 +503,7 @@ public unsafe bool TryBulkConsumeNext<T>(T consumer, int maxChunkSize = 0) where
epoch.ProtectAndDrain();
}

var hasNext = GetNextInternal(out long startPhysicalAddress, out int newEntryLength, out long startLogicalAddress, out long endLogicalAddress, out bool isCommitRecord, out bool onFrame);
var hasNext = GetNextInternal(out long startPhysicalAddress, out var newEntryLength, out var startLogicalAddress, out var endLogicalAddress, out bool isCommitRecord, out bool onFrame);

if (!hasNext)
{
Expand All @@ -509,7 +512,7 @@ public unsafe bool TryBulkConsumeNext<T>(T consumer, int maxChunkSize = 0) where
}

// GetNextInternal returns only the payload length, so adjust the totalLength
int totalLength = headerSize + Align(newEntryLength);
var totalLength = headerSize + Align(newEntryLength);

// Expand the records in iteration, as long as as they are on the same physical page
while (ExpandGetNextInternal(startPhysicalAddress, ref totalLength, out _, out endLogicalAddress, out isCommitRecord))
Expand Down
Loading