diff --git a/libs/server/AOF/AofProcessor.cs b/libs/server/AOF/AofProcessor.cs index eefd47a1132..6b7da227fe0 100644 --- a/libs/server/AOF/AofProcessor.cs +++ b/libs/server/AOF/AofProcessor.cs @@ -88,13 +88,8 @@ public void Dispose() activeVectorManager?.WaitForVectorOperationsToComplete(); activeVectorManager?.ShutdownReplayTasks(); - var databaseSessionsSnapshot = respServerSession.GetDatabaseSessionsSnapshot(); - foreach (var dbSession in databaseSessionsSnapshot) - { - dbSession.StorageSession.stringBasicContext.Session?.Dispose(); - dbSession.StorageSession.objectBasicContext.Session?.Dispose(); - dbSession.StorageSession.unifiedBasicContext.Session?.Dispose(); - } + aofReplayCoordinator.Dispose(); + respServerSession.Dispose(); } /// @@ -179,11 +174,6 @@ unsafe void ProcessAofRecord(IMemoryOwner entry, int length) if (storeWrapper.serverOptions.FailOnRecoveryError) throw; } - finally - { - aofReplayCoordinator.Dispose(); - respServerSession.Dispose(); - } return -1; } diff --git a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs index 57fa4aeec0e..1f8d2ea5cb5 100644 --- a/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs +++ b/libs/server/Storage/Functions/ObjectStore/RMWMethods.cs @@ -136,6 +136,10 @@ bool InPlaceUpdaterWorker(ref LogRecord logRecord, ref ObjectInput input, ref Ob // Can't access 'this' in a lambda so dispose directly and pass a no-op lambda. functionsState.storeFunctions.DisposeValueObject(logRecord.ValueObject, DisposeReason.Deleted); logRecord.ClearValueIfHeap(obj => { }); + if (!logRecord.Info.Modified) + functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash); + if (functionsState.appendOnlyFile != null) + rmwInfo.UserData |= NeedAofLog; rmwInfo.Action = RMWAction.ExpireAndStop; return false; } @@ -210,6 +214,10 @@ public bool PostCopyUpdater(in TSourceLogRecord srcLogRecord, return true; if (output.HasRemoveKey) { + // Log to AOF before returning, so the mutation that emptied the collection + // is persisted and replayed correctly on recovery. + if (functionsState.appendOnlyFile != null) + rmwInfo.UserData |= NeedAofLog; rmwInfo.Action = RMWAction.ExpireAndStop; return false; } diff --git a/test/Garnet.test/MultiDatabaseTests.cs b/test/Garnet.test/MultiDatabaseTests.cs index 250992dd782..86ee40bd79f 100644 --- a/test/Garnet.test/MultiDatabaseTests.cs +++ b/test/Garnet.test/MultiDatabaseTests.cs @@ -1291,6 +1291,45 @@ public void MultiDatabaseAofRecoverObjectTest() } } + [Test] + public void MultiDatabaseAofObjectMutationRecoverTest() + { + // Verify that object mutation operations (LPOP, HDEL) that empty collections + // are persisted correctly across multiple databases during AOF recovery + var listKey = "list:key1"; + var hashKey = "hash:key1"; + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + // DB 0: push and pop a list element (empties the list) + var db0 = redis.GetDatabase(0); + db0.ListLeftPush(listKey, "value1"); + db0.ListLeftPop(listKey); + ClassicAssert.IsFalse(db0.KeyExists(listKey)); + + // DB 1: add and delete hash fields (empties the hash) + var db1 = redis.GetDatabase(1); + db1.HashSet(hashKey, [new HashEntry("f1", "v1"), new HashEntry("f2", "v2")]); + db1.HashDelete(hashKey, ["f1", "f2"]); + ClassicAssert.IsFalse(db1.KeyExists(hashKey)); + } + + server.Store.CommitAOF(true); + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + // After recovery, both keys should remain absent + var db0 = redis.GetDatabase(0); + ClassicAssert.IsFalse(db0.KeyExists(listKey)); + + var db1 = redis.GetDatabase(1); + ClassicAssert.IsFalse(db1.KeyExists(hashKey)); + } + } + [Test] public void MultiDatabaseSaveInProgressTest() { diff --git a/test/Garnet.test/RespAofTests.cs b/test/Garnet.test/RespAofTests.cs index af664b6c4a1..3d1a1abaf24 100644 --- a/test/Garnet.test/RespAofTests.cs +++ b/test/Garnet.test/RespAofTests.cs @@ -808,6 +808,196 @@ public void AofListObjectStoreRecoverTest() } } + [Test] + public void AofObjectStoreRMWDeleteRecoverListTest() + { + // Verify LPOP that empties a list is persisted to AOF and recovered correctly + var key = "AofObjectStoreRMWDeleteRecoverListKey"; + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + db.ListLeftPush(key, "value1"); + var popped = db.ListLeftPop(key); + ClassicAssert.AreEqual("value1", popped.ToString()); + + // Key should not exist after popping the only element + ClassicAssert.IsFalse(db.KeyExists(key)); + } + + server.Store.CommitAOF(true); + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + // After recovery, the key should still not exist + ClassicAssert.IsFalse(db.KeyExists(key)); + } + } + + [Test] + public void AofObjectStoreRMWDeleteRecoverSortedSetTest() + { + // Verify ZREM that removes one member from a sorted set is persisted to AOF and recovered correctly + var key = "AofObjectStoreRMWDeleteRecoverSortedSetKey"; + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + db.SortedSetAdd(key, [new SortedSetEntry("top1", 50), new SortedSetEntry("top2", 60)]); + db.SortedSetRemove(key, "top1"); + + // Sorted set should have one remaining member + var score = db.SortedSetScore(key, "top1"); + ClassicAssert.IsFalse(score.HasValue); + + score = db.SortedSetScore(key, "top2"); + ClassicAssert.IsTrue(score.HasValue); + ClassicAssert.AreEqual(60, score.Value); + } + + server.Store.CommitAOF(true); + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + // After recovery, "top1" should still be absent + var score = db.SortedSetScore(key, "top1"); + ClassicAssert.IsFalse(score.HasValue); + + score = db.SortedSetScore(key, "top2"); + ClassicAssert.IsTrue(score.HasValue); + ClassicAssert.AreEqual(60, score.Value); + } + } + + [Test] + public void AofObjectStoreRMWDeleteRecoverSortedSetEmptyTest() + { + // Verify ZREM that empties a sorted set completely is persisted to AOF and recovered correctly + var key = "AofObjectStoreRMWDeleteRecoverSortedSetEmptyKey"; + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + db.SortedSetAdd(key, [new SortedSetEntry("top1", 50)]); + db.SortedSetRemove(key, "top1"); + + // Key should not exist after removing the only member + ClassicAssert.IsFalse(db.KeyExists(key)); + } + + server.Store.CommitAOF(true); + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + ClassicAssert.IsFalse(db.KeyExists(key)); + } + } + + [Test] + public void AofObjectStoreRMWDeleteRecoverHashTest() + { + // Verify HDEL that removes fields from a hash is persisted to AOF and recovered correctly + var key = "AofObjectStoreRMWDeleteRecoverHashKey"; + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + db.HashSet(key, [new HashEntry("hkey1", "v1"), new HashEntry("hkey2", "v2")]); + db.HashDelete(key, ["hkey1", "hkey2"]); + + // Key should not exist after deleting all hash fields + ClassicAssert.IsFalse(db.KeyExists(key)); + } + + server.Store.CommitAOF(true); + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + ClassicAssert.IsFalse(db.KeyExists(key)); + } + } + + [Test] + public void AofObjectStoreRMWDeleteRecoverSetTest() + { + // Verify SREM that empties a set is persisted to AOF and recovered correctly + var key = "AofObjectStoreRMWDeleteRecoverSetKey"; + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + db.SetAdd(key, ["member1", "member2"]); + db.SetRemove(key, ["member1", "member2"]); + + ClassicAssert.IsFalse(db.KeyExists(key)); + } + + server.Store.CommitAOF(true); + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + ClassicAssert.IsFalse(db.KeyExists(key)); + } + } + + [Test] + public void AofObjectStoreRMWPartialDeleteRecoverHashTest() + { + // Verify HDEL that removes only some fields (not emptying the hash) is persisted to AOF and recovered correctly + var key = "AofObjectStoreRMWPartialDeleteRecoverHashKey"; + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + db.HashSet(key, [new HashEntry("hkey1", "v1"), new HashEntry("hkey2", "v2"), new HashEntry("hkey3", "v3")]); + db.HashDelete(key, "hkey1"); + db.HashDelete(key, "hkey2"); + + // Key should still exist with remaining field + ClassicAssert.IsTrue(db.KeyExists(key)); + ClassicAssert.AreEqual("v3", db.HashGet(key, "hkey3").ToString()); + ClassicAssert.IsFalse(db.HashGet(key, "hkey1").HasValue); + ClassicAssert.IsFalse(db.HashGet(key, "hkey2").HasValue); + } + + server.Store.CommitAOF(true); + server.Dispose(false); + server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true); + server.Start(); + + using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig())) + { + var db = redis.GetDatabase(0); + + // After recovery, deleted fields should remain deleted + ClassicAssert.IsTrue(db.KeyExists(key)); + ClassicAssert.AreEqual("v3", db.HashGet(key, "hkey3").ToString()); + ClassicAssert.IsFalse(db.HashGet(key, "hkey1").HasValue); + ClassicAssert.IsFalse(db.HashGet(key, "hkey2").HasValue); + } + } + [Test] public void AofCustomTxnRecoverTest() { diff --git a/test/Garnet.test/TransactionTests.cs b/test/Garnet.test/TransactionTests.cs index a78bf67fcff..676194b2337 100644 --- a/test/Garnet.test/TransactionTests.cs +++ b/test/Garnet.test/TransactionTests.cs @@ -587,5 +587,45 @@ private static void updateKey(string key, string value) var expectedResponse = "+OK\r\n"; TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response); } + + [Test] + public void WatchFailsWhenListEmptiedByLPop() + { + // WATCH a list key, then LPOP all elements on the same connection. + // The LPOP that empties the list should increment the watch version, + // causing the subsequent EXEC to fail. + using var lightClientRequest = TestUtils.CreateRequest(); + var key = "watchlist"; + + // Create a single-element list + var response = lightClientRequest.SendCommand($"LPUSH {key} value1"); + var expectedResponse = ":1\r\n"; + TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response); + + // WATCH the list key + response = lightClientRequest.SendCommand($"WATCH {key}"); + expectedResponse = "+OK\r\n"; + TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response); + + // LPOP the only element (empties and deletes the list) — same connection, before MULTI + response = lightClientRequest.SendCommand($"LPOP {key}"); + expectedResponse = "$6\r\nvalue1\r\n"; + TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response); + + // Start a transaction + response = lightClientRequest.SendCommand("MULTI"); + expectedResponse = "+OK\r\n"; + TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response); + + // Queue a command + response = lightClientRequest.SendCommand($"LPUSH {key} value2"); + expectedResponse = "+QUEUED\r\n"; + TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response); + + // EXEC should fail because the watched key was modified by LPOP + response = lightClientRequest.SendCommand("EXEC"); + expectedResponse = "*-1"; + TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response); + } } } \ No newline at end of file