diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs index e5b18f790a5..3c92b0f5570 100644 --- a/libs/server/Resp/CmdStrings.cs +++ b/libs/server/Resp/CmdStrings.cs @@ -328,6 +328,7 @@ static partial class CmdStrings public const string GenericUnknownClientType = "ERR Unknown client type '{0}'"; public const string GenericErrDuplicateFilter = "ERR Filter '{0}' defined multiple times"; public const string GenericPubSubCommandDisabled = "ERR {0} is disabled, enable it with --pubsub option."; + public const string GenericPubSubCommandNotAllowed = "ERR Can't execute '{0}': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context"; public const string GenericErrLonLat = "ERR invalid longitude,latitude pair {0:F6},{1:F6}"; public const string GenericErrStoreCommand = "ERR STORE option in {0} is not compatible with WITHDIST, WITHHASH and WITHCOORD options"; public const string GenericErrCommandDisallowedWithOption = diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs index cc81121b1df..1f657c06c5e 100644 --- a/libs/server/Resp/Parser/RespCommand.cs +++ b/libs/server/Resp/Parser/RespCommand.cs @@ -627,6 +627,23 @@ public static bool IsClusterSubCommand(this RespCommand cmd) bool inRange = test <= (RespCommand.CLUSTER_SYNC - RespCommand.CLUSTER_ADDSLOTS); return inRange; } + + /// + /// Returns true if is allowed while a session is in + /// pub/sub subscription mode (RESP2). Per the Redis protocol, only + /// (P|S)SUBSCRIBE, (P|S)UNSUBSCRIBE, PING, and QUIT are valid in this state. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static bool IsAllowedInSubscriptionMode(this RespCommand cmd) + { + return cmd is RespCommand.SUBSCRIBE + or RespCommand.UNSUBSCRIBE + or RespCommand.PSUBSCRIBE + or RespCommand.PUNSUBSCRIBE + or RespCommand.SSUBSCRIBE + or RespCommand.PING + or RespCommand.QUIT; + } } /// diff --git a/libs/server/Resp/PubSubCommands.cs b/libs/server/Resp/PubSubCommands.cs index 66ff39ccb2d..67c50d34eb5 100644 --- a/libs/server/Resp/PubSubCommands.cs +++ b/libs/server/Resp/PubSubCommands.cs @@ -3,7 +3,6 @@ using System; using System.Collections.Generic; -using System.Diagnostics; using Garnet.common; using Tsavorite.core; @@ -21,9 +20,15 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase /// public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value) { + // When a session publishes to a channel it is itself subscribed to, the Broadcast + // callback is invoked on the same thread that already holds the network sender lock + // via TryConsumeMessages. Detect this reentrant case and write directly to the + // existing output buffer instead of re-entering the lock. + var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId; try { - networkSender.EnterAndGetResponseObject(out dcurr, out dend); + if (!reentrant) + networkSender.EnterAndGetResponseObject(out dcurr, out dend); WritePushLength(3); @@ -35,7 +40,7 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value) WriteDirectLargeRespString(value.ReadOnlySpan); // Flush the publish message for this subscriber - if (dcurr > networkSender.GetResponseObjectHead()) + if (!reentrant && dcurr > networkSender.GetResponseObjectHead()) Send(networkSender.GetResponseObjectHead()); } catch @@ -44,16 +49,19 @@ public override unsafe void Publish(PinnedSpanByte key, PinnedSpanByte value) } finally { - networkSender.ExitAndReturnResponseObject(); + if (!reentrant) + networkSender.ExitAndReturnResponseObject(); } } /// public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByte key, PinnedSpanByte value) { + var reentrant = commandProcessingThreadId == Environment.CurrentManagedThreadId; try { - networkSender.EnterAndGetResponseObject(out dcurr, out dend); + if (!reentrant) + networkSender.EnterAndGetResponseObject(out dcurr, out dend); WritePushLength(4); @@ -65,7 +73,7 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt WriteDirectLargeRespString(key.ReadOnlySpan); WriteDirectLargeRespString(value.ReadOnlySpan); - if (dcurr > networkSender.GetResponseObjectHead()) + if (!reentrant && dcurr > networkSender.GetResponseObjectHead()) Send(networkSender.GetResponseObjectHead()); } catch @@ -74,7 +82,8 @@ public override unsafe void PatternPublish(PinnedSpanByte pattern, PinnedSpanByt } finally { - networkSender.ExitAndReturnResponseObject(); + if (!reentrant) + networkSender.ExitAndReturnResponseObject(); } } @@ -100,7 +109,6 @@ private bool NetworkPUBLISH(RespCommand cmd) return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_CLUSTER_DISABLED); } - Debug.Assert(isSubscriptionSession == false); // PUBLISH channel message => [*3\r\n$7\r\nPUBLISH\r\n$]7\r\nchannel\r\n$7\r\message\r\n var key = parseState.GetArgSliceByRef(0); diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index f85e4a968cc..4596ce0f8df 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -80,6 +80,10 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase int opCount; + // Thread ID of the thread currently processing commands (used by Publish/PatternPublish + // callbacks to detect reentrant calls when the same session publishes to a self-subscribed channel) + int commandProcessingThreadId; + /// /// Current database session items /// @@ -441,8 +445,8 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) clusterSession?.AcquireCurrentEpoch(); recvBufferPtr = reqBuffer; networkSender.EnterAndGetResponseObject(out dcurr, out dend); + commandProcessingThreadId = Environment.CurrentManagedThreadId; ProcessMessages(); - recvBufferPtr = null; } catch (RespParsingException ex) { @@ -497,6 +501,7 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived) } finally { + commandProcessingThreadId = 0; networkSender.ExitAndReturnResponseObject(); clusterSession?.ReleaseCurrentEpoch(); scratchBufferBuilder.Reset(); @@ -575,7 +580,14 @@ private void ProcessMessages() if (CheckACLPermissions(cmd) && (noScriptPassed = CheckScriptPermissions(cmd))) { - if (txnManager.state != TxnState.None) + // In RESP2, only a small set of commands are allowed while in subscription mode. + // RESP3 uses distinct push types for subscription messages, so all commands are valid. + if (isSubscriptionSession && respProtocolVersion == 2 && !cmd.IsAllowedInSubscriptionMode()) + { + while (!RespWriteUtils.TryWriteError(string.Format(CmdStrings.GenericPubSubCommandNotAllowed, cmd.ToString()), ref dcurr, dend)) + SendAndReset(); + } + else if (txnManager.state != TxnState.None) { if (txnManager.state == TxnState.Running) { diff --git a/test/Garnet.test/RespPubSubTests.cs b/test/Garnet.test/RespPubSubTests.cs index ea65051fe94..48160a0323f 100644 --- a/test/Garnet.test/RespPubSubTests.cs +++ b/test/Garnet.test/RespPubSubTests.cs @@ -6,6 +6,7 @@ using System.Security.Cryptography; using System.Threading; using Allure.NUnit; +using Garnet.common; using NUnit.Framework; using NUnit.Framework.Legacy; using StackExchange.Redis; @@ -230,5 +231,244 @@ private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel cha ClassicAssert.IsTrue(repeat != 0, "Timeout waiting for subscription receive"); } } + + /// + /// Verifies that disallowed commands (GET, SET, PUBLISH, MULTI) are rejected + /// with an appropriate error when a RESP2 session is in subscription mode. + /// + [Test] + public void PubSubModeRejectsDisallowedCommandsInResp2() + { + using var lightClientRequest = TestUtils.CreateRequest(countResponseType: CountResponseType.Bytes); + + // Subscribe to enter subscription mode + var subscribeResp = "*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n"; + var response = lightClientRequest.Execute("SUBSCRIBE foo", subscribeResp.Length); + ClassicAssert.AreEqual(subscribeResp, response); + + // GET should be rejected + var errorResp = "-ERR Can't execute 'GET': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"; + response = lightClientRequest.Execute("GET bar", errorResp.Length); + ClassicAssert.AreEqual(errorResp, response); + + // SET should be rejected + errorResp = "-ERR Can't execute 'SET': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"; + response = lightClientRequest.Execute("SET bar value", errorResp.Length); + ClassicAssert.AreEqual(errorResp, response); + + // PUBLISH should be rejected + errorResp = "-ERR Can't execute 'PUBLISH': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"; + response = lightClientRequest.Execute("PUBLISH foo bar", errorResp.Length); + ClassicAssert.AreEqual(errorResp, response); + + // MULTI should be rejected + errorResp = "-ERR Can't execute 'MULTI': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"; + response = lightClientRequest.Execute("MULTI", errorResp.Length); + ClassicAssert.AreEqual(errorResp, response); + } + + /// + /// Verifies that allowed commands (PING, SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, + /// PUNSUBSCRIBE, QUIT) work correctly in RESP2 subscription mode. + /// + [Test] + public void PubSubModeAllowsValidCommandsInResp2() + { + using var lightClientRequest = TestUtils.CreateRequest(countResponseType: CountResponseType.Bytes); + + // Enter subscription mode + var subscribeResp = "*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n"; + var response = lightClientRequest.Execute("SUBSCRIBE foo", subscribeResp.Length); + ClassicAssert.AreEqual(subscribeResp, response); + + // PING should return subscription-mode PONG + var pongResp = "*2\r\n$4\r\npong\r\n$0\r\n\r\n"; + response = lightClientRequest.Execute("PING", pongResp.Length); + ClassicAssert.AreEqual(pongResp, response); + + // Another SUBSCRIBE should work (channel count goes to 2) + subscribeResp = "*3\r\n$9\r\nsubscribe\r\n$3\r\nbar\r\n:2\r\n"; + response = lightClientRequest.Execute("SUBSCRIBE bar", subscribeResp.Length); + ClassicAssert.AreEqual(subscribeResp, response); + + // PSUBSCRIBE should work (channel count goes to 3) + var psubResp = "*3\r\n$10\r\npsubscribe\r\n$4\r\nbaz*\r\n:3\r\n"; + response = lightClientRequest.Execute("PSUBSCRIBE baz*", psubResp.Length); + ClassicAssert.AreEqual(psubResp, response); + + // UNSUBSCRIBE should work (channel count goes to 2) + var unsubResp = "*3\r\n$11\r\nunsubscribe\r\n$3\r\nbar\r\n:2\r\n"; + response = lightClientRequest.Execute("UNSUBSCRIBE bar", unsubResp.Length); + ClassicAssert.AreEqual(unsubResp, response); + + // PUNSUBSCRIBE should work (channel count goes to 1) + var punsubResp = "*3\r\n$12\r\npunsubscribe\r\n$4\r\nbaz*\r\n:1\r\n"; + response = lightClientRequest.Execute("PUNSUBSCRIBE baz*", punsubResp.Length); + ClassicAssert.AreEqual(punsubResp, response); + + // Still in subscription mode - GET should be rejected + var errorResp = "-ERR Can't execute 'GET': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"; + response = lightClientRequest.Execute("GET bar", errorResp.Length); + ClassicAssert.AreEqual(errorResp, response); + + // UNSUBSCRIBE last channel to exit subscription mode (channel count goes to 0) + unsubResp = "*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"; + response = lightClientRequest.Execute("UNSUBSCRIBE foo", unsubResp.Length); + ClassicAssert.AreEqual(unsubResp, response); + + // No longer in subscription mode - GET should work (key doesn't exist = null) + var getResp = "$-1\r\n"; + response = lightClientRequest.Execute("GET bar", getResp.Length); + ClassicAssert.AreEqual(getResp, response); + } + + /// + /// Verifies that a RESP3 session in subscription mode can execute PUBLISH + /// (including self-publish to its own subscribed channel) without a + /// SynchronizationLockException crash. This was the core lock bug in issue #1615. + /// + [Test] + public void PubSubSelfPublishResp3NoLockError() + { + // Use Newlines counting for HELLO 3 (variable-length map response) + using var lightClientRequest = TestUtils.CreateRequest(countResponseType: CountResponseType.Newlines); + + // Switch to RESP3 + var response = lightClientRequest.Execute("HELLO 3", 30); + ClassicAssert.IsTrue(response.Contains("proto")); + + // Switch to Bytes counting for precise response control + lightClientRequest.countResponseType = CountResponseType.Bytes; + + // Subscribe to a channel + var subscribeResp = "*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n"; + response = lightClientRequest.Execute("SUBSCRIBE foo", subscribeResp.Length); + ClassicAssert.AreEqual(subscribeResp, response); + + // Self-publish: this triggers the reentrant Publish() callback on the same session. + // Before the fix, this would crash with SynchronizationLockException. + // + // Expected response consists of: + // 1. Push message (self-notification): >3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n + // 2. PUBLISH response: :1\r\n + var pushMsg = ">3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$3\r\nbar\r\n"; + var publishResp = ":1\r\n"; + var expectedPublishTotal = pushMsg + publishResp; + response = lightClientRequest.Execute("PUBLISH foo bar", expectedPublishTotal.Length); + ClassicAssert.AreEqual(expectedPublishTotal, response); + + // Unsubscribe and verify server is still responsive + var unsubResp = "*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"; + response = lightClientRequest.Execute("UNSUBSCRIBE foo", unsubResp.Length); + ClassicAssert.AreEqual(unsubResp, response); + + // PING to confirm server health + response = lightClientRequest.Execute("PING", "+PONG\r\n".Length); + ClassicAssert.AreEqual("+PONG\r\n", response); + } + + /// + /// Verifies that the PatternPublish reentrant path works correctly in RESP3. + /// When a session has a pattern subscription (PSUBSCRIBE) and publishes to a + /// matching channel, the PatternPublish() callback is invoked on the same thread. + /// + [Test] + public void PubSubSelfPatternPublishResp3NoLockError() + { + using var lightClientRequest = TestUtils.CreateRequest(countResponseType: CountResponseType.Newlines); + + // Switch to RESP3 + var response = lightClientRequest.Execute("HELLO 3", 30); + ClassicAssert.IsTrue(response.Contains("proto")); + + lightClientRequest.countResponseType = CountResponseType.Bytes; + + // Pattern subscribe + var psubResp = "*3\r\n$10\r\npsubscribe\r\n$4\r\nfoo*\r\n:1\r\n"; + response = lightClientRequest.Execute("PSUBSCRIBE foo*", psubResp.Length); + ClassicAssert.AreEqual(psubResp, response); + + // Self-publish to a matching channel — triggers reentrant PatternPublish() callback. + // Expected response: + // 1. Push pmessage: >4\r\n$8\r\npmessage\r\n$4\r\nfoo*\r\n$6\r\nfoobar\r\n$3\r\nbaz\r\n + // 2. PUBLISH response: :1\r\n + var pushMsg = ">4\r\n$8\r\npmessage\r\n$4\r\nfoo*\r\n$6\r\nfoobar\r\n$3\r\nbaz\r\n"; + var publishResp = ":1\r\n"; + var expectedTotal = pushMsg + publishResp; + response = lightClientRequest.Execute("PUBLISH foobar baz", expectedTotal.Length); + ClassicAssert.AreEqual(expectedTotal, response); + + // Clean up and verify server health + var punsubResp = "*3\r\n$12\r\npunsubscribe\r\n$4\r\nfoo*\r\n:0\r\n"; + response = lightClientRequest.Execute("PUNSUBSCRIBE foo*", punsubResp.Length); + ClassicAssert.AreEqual(punsubResp, response); + + response = lightClientRequest.Execute("PING", "+PONG\r\n".Length); + ClassicAssert.AreEqual("+PONG\r\n", response); + } + + /// + /// Verifies that in RESP3 subscription mode, regular commands (GET, SET) + /// are allowed and execute correctly alongside active subscriptions. + /// + [Test] + public void PubSubModeAllowsRegularCommandsInResp3() + { + using var lightClientRequest = TestUtils.CreateRequest(countResponseType: CountResponseType.Newlines); + + // Switch to RESP3 + var response = lightClientRequest.Execute("HELLO 3", 30); + ClassicAssert.IsTrue(response.Contains("proto")); + + lightClientRequest.countResponseType = CountResponseType.Bytes; + + // Subscribe to a channel + var subscribeResp = "*3\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n:1\r\n"; + response = lightClientRequest.Execute("SUBSCRIBE foo", subscribeResp.Length); + ClassicAssert.AreEqual(subscribeResp, response); + + // SET should work in RESP3 subscription mode + response = lightClientRequest.Execute("SET mykey myval", "+OK\r\n".Length); + ClassicAssert.AreEqual("+OK\r\n", response); + + // GET should work in RESP3 subscription mode + var getResp = "$5\r\nmyval\r\n"; + response = lightClientRequest.Execute("GET mykey", getResp.Length); + ClassicAssert.AreEqual(getResp, response); + + // Clean up + var unsubResp = "*3\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n:0\r\n"; + response = lightClientRequest.Execute("UNSUBSCRIBE foo", unsubResp.Length); + ClassicAssert.AreEqual(unsubResp, response); + } + + /// + /// Verifies that entering subscription mode via PSUBSCRIBE alone (without SUBSCRIBE) + /// also correctly restricts commands in RESP2. + /// + [Test] + public void PubSubModeViaPsubscribeRejectsCommandsInResp2() + { + using var lightClientRequest = TestUtils.CreateRequest(countResponseType: CountResponseType.Bytes); + + // Enter subscription mode via PSUBSCRIBE only + var psubResp = "*3\r\n$10\r\npsubscribe\r\n$4\r\nfoo*\r\n:1\r\n"; + var response = lightClientRequest.Execute("PSUBSCRIBE foo*", psubResp.Length); + ClassicAssert.AreEqual(psubResp, response); + + // GET should be rejected + var errorResp = "-ERR Can't execute 'GET': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context\r\n"; + response = lightClientRequest.Execute("GET bar", errorResp.Length); + ClassicAssert.AreEqual(errorResp, response); + + // PUNSUBSCRIBE to exit subscription mode + var punsubResp = "*3\r\n$12\r\npunsubscribe\r\n$4\r\nfoo*\r\n:0\r\n"; + response = lightClientRequest.Execute("PUNSUBSCRIBE foo*", punsubResp.Length); + ClassicAssert.AreEqual(punsubResp, response); + + // GET should work again + response = lightClientRequest.Execute("GET bar", "$-1\r\n".Length); + ClassicAssert.AreEqual("$-1\r\n", response); + } } } \ No newline at end of file