Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ static partial class CmdStrings
public const string GenericUnknownClientType = "ERR Unknown client type '{0}'";
public const string GenericErrDuplicateFilter = "ERR Filter '{0}' defined multiple times";
public const string GenericPubSubCommandDisabled = "ERR {0} is disabled, enable it with --pubsub option.";
public const string GenericPubSubCommandNotAllowed = "ERR Can't execute '{0}': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context";
public const string GenericErrLonLat = "ERR invalid longitude,latitude pair {0:F6},{1:F6}";
public const string GenericErrStoreCommand = "ERR STORE option in {0} is not compatible with WITHDIST, WITHHASH and WITHCOORD options";
public const string GenericErrCommandDisallowedWithOption =
Expand Down
17 changes: 17 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,23 @@ public static bool IsClusterSubCommand(this RespCommand cmd)
bool inRange = test <= (RespCommand.CLUSTER_SYNC - RespCommand.CLUSTER_ADDSLOTS);
return inRange;
}

/// <summary>
/// Returns true if <paramref name="cmd"/> is allowed while a session is in
/// pub/sub subscription mode (RESP2). Per the Redis protocol, only
/// (P|S)SUBSCRIBE, (P|S)UNSUBSCRIBE, PING, and QUIT are valid in this state.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool IsAllowedInSubscriptionMode(this RespCommand cmd)
{
return cmd is RespCommand.SUBSCRIBE
or RespCommand.UNSUBSCRIBE
or RespCommand.PSUBSCRIBE
or RespCommand.PUNSUBSCRIBE
or RespCommand.SSUBSCRIBE
or RespCommand.PING
or RespCommand.QUIT;
}
}

/// <summary>
Expand Down
24 changes: 16 additions & 8 deletions libs/server/Resp/PubSubCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using Garnet.common;
using Tsavorite.core;

Expand All @@ -21,9 +20,15 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
/// <inheritdoc />
public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
{
// When a session publishes to a channel it is itself subscribed to, the Broadcast
// callback is invoked on the same thread that already holds the network sender lock
// via TryConsumeMessages. Detect this reentrant case and write directly to the
// existing output buffer instead of re-entering the lock.
var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId;
try
{
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
if (!reentrant)
networkSender.EnterAndGetResponseObject(out dcurr, out dend);

WritePushLength(3);

Expand All @@ -35,7 +40,7 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
WriteDirectLargeRespString(value.ReadOnlySpan);

// Flush the publish message for this subscriber
if (dcurr > networkSender.GetResponseObjectHead())
if (!reentrant && dcurr > networkSender.GetResponseObjectHead())
Send(networkSender.GetResponseObjectHead());
}
catch
Expand All @@ -44,16 +49,19 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value)
}
finally
{
networkSender.ExitAndReturnResponseObject();
if (!reentrant)
networkSender.ExitAndReturnResponseObject();
}
}

/// <inheritdoc />
public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByte key, PinnedSpanByte value)
{
var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId;
try
{
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
if (!reentrant)
networkSender.EnterAndGetResponseObject(out dcurr, out dend);

WritePushLength(4);

Expand All @@ -65,7 +73,7 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt
WriteDirectLargeRespString(key.ReadOnlySpan);
WriteDirectLargeRespString(value.ReadOnlySpan);

if (dcurr > networkSender.GetResponseObjectHead())
if (!reentrant && dcurr > networkSender.GetResponseObjectHead())
Send(networkSender.GetResponseObjectHead());
}
catch
Expand All @@ -74,7 +82,8 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt
}
finally
{
networkSender.ExitAndReturnResponseObject();
if (!reentrant)
networkSender.ExitAndReturnResponseObject();
}
}

Expand All @@ -100,7 +109,6 @@ private bool NetworkPUBLISH(RespCommand cmd)
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_CLUSTER_DISABLED);
}

Debug.Assert(isSubscriptionSession == false);
// PUBLISH channel message => [*3\r\n$7\r\nPUBLISH\r\n$]7\r\nchannel\r\n$7\r\message\r\n

var key = parseState.GetArgSliceByRef(0);
Expand Down
16 changes: 14 additions & 2 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase

int opCount;

// Thread ID of the thread currently processing commands (used by Publish/PatternPublish
// callbacks to detect reentrant calls when the same session publishes to a self-subscribed channel)
int commandProcessingThreadId;

/// <summary>
/// Current database session items
/// </summary>
Expand Down Expand Up @@ -441,8 +445,8 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
clusterSession?.AcquireCurrentEpoch();
recvBufferPtr = reqBuffer;
networkSender.EnterAndGetResponseObject(out dcurr, out dend);
commandProcessingThreadId = Environment.CurrentManagedThreadId;
ProcessMessages();
recvBufferPtr = null;
}
catch (RespParsingException ex)
{
Expand Down Expand Up @@ -497,6 +501,7 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
}
finally
{
commandProcessingThreadId = 0;
networkSender.ExitAndReturnResponseObject();
clusterSession?.ReleaseCurrentEpoch();
scratchBufferBuilder.Reset();
Expand Down Expand Up @@ -575,7 +580,14 @@ private void ProcessMessages()

if (CheckACLPermissions(cmd) && (noScriptPassed = CheckScriptPermissions(cmd)))
{
if (txnManager.state != TxnState.None)
// In RESP2, only a small set of commands are allowed while in subscription mode.
// RESP3 uses distinct push types for subscription messages, so all commands are valid.
if (isSubscriptionSession && respProtocolVersion == 2 && !cmd.IsAllowedInSubscriptionMode())
{
while (!RespWriteUtils.TryWriteError(string.Format(CmdStrings.GenericPubSubCommandNotAllowed, cmd.ToString()), ref dcurr, dend))
SendAndReset();
}
else if (txnManager.state != TxnState.None)
{
if (txnManager.state == TxnState.Running)
{
Expand Down
Loading
Loading