Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions libs/server/AOF/AofProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,8 @@ public void Dispose()
activeVectorManager?.WaitForVectorOperationsToComplete();
activeVectorManager?.ShutdownReplayTasks();

var databaseSessionsSnapshot = respServerSession.GetDatabaseSessionsSnapshot();
foreach (var dbSession in databaseSessionsSnapshot)
{
dbSession.StorageSession.stringBasicContext.Session?.Dispose();
dbSession.StorageSession.objectBasicContext.Session?.Dispose();
dbSession.StorageSession.unifiedBasicContext.Session?.Dispose();
}
aofReplayCoordinator.Dispose();
respServerSession.Dispose();
}

/// <summary>
Expand Down Expand Up @@ -179,11 +174,6 @@ unsafe void ProcessAofRecord(IMemoryOwner<byte> entry, int length)
if (storeWrapper.serverOptions.FailOnRecoveryError)
throw;
}
finally
{
aofReplayCoordinator.Dispose();
respServerSession.Dispose();
}

return -1;
}
Expand Down
8 changes: 8 additions & 0 deletions libs/server/Storage/Functions/ObjectStore/RMWMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ bool InPlaceUpdaterWorker(ref LogRecord logRecord, ref ObjectInput input, ref Ob
// Can't access 'this' in a lambda so dispose directly and pass a no-op lambda.
functionsState.storeFunctions.DisposeValueObject(logRecord.ValueObject, DisposeReason.Deleted);
logRecord.ClearValueIfHeap(obj => { });
if (!logRecord.Info.Modified)
functionsState.watchVersionMap.IncrementVersion(rmwInfo.KeyHash);
if (functionsState.appendOnlyFile != null)
rmwInfo.UserData |= NeedAofLog;
rmwInfo.Action = RMWAction.ExpireAndStop;
return false;
}
Expand Down Expand Up @@ -210,6 +214,10 @@ public bool PostCopyUpdater<TSourceLogRecord>(in TSourceLogRecord srcLogRecord,
return true;
if (output.HasRemoveKey)
{
// Log to AOF before returning, so the mutation that emptied the collection
// is persisted and replayed correctly on recovery.
if (functionsState.appendOnlyFile != null)
rmwInfo.UserData |= NeedAofLog;
rmwInfo.Action = RMWAction.ExpireAndStop;
return false;
}
Expand Down
39 changes: 39 additions & 0 deletions test/Garnet.test/MultiDatabaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,45 @@ public void MultiDatabaseAofRecoverObjectTest()
}
}

[Test]
public void MultiDatabaseAofObjectMutationRecoverTest()
{
// Verify that object mutation operations (LPOP, HDEL) that empty collections
// are persisted correctly across multiple databases during AOF recovery
var listKey = "list:key1";
var hashKey = "hash:key1";

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
// DB 0: push and pop a list element (empties the list)
var db0 = redis.GetDatabase(0);
db0.ListLeftPush(listKey, "value1");
db0.ListLeftPop(listKey);
ClassicAssert.IsFalse(db0.KeyExists(listKey));

// DB 1: add and delete hash fields (empties the hash)
var db1 = redis.GetDatabase(1);
db1.HashSet(hashKey, [new HashEntry("f1", "v1"), new HashEntry("f2", "v2")]);
db1.HashDelete(hashKey, ["f1", "f2"]);
ClassicAssert.IsFalse(db1.KeyExists(hashKey));
}

server.Store.CommitAOF(true);
server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
// After recovery, both keys should remain absent
var db0 = redis.GetDatabase(0);
ClassicAssert.IsFalse(db0.KeyExists(listKey));

var db1 = redis.GetDatabase(1);
ClassicAssert.IsFalse(db1.KeyExists(hashKey));
}
}

[Test]
public void MultiDatabaseSaveInProgressTest()
{
Expand Down
190 changes: 190 additions & 0 deletions test/Garnet.test/RespAofTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,196 @@ public void AofListObjectStoreRecoverTest()
}
}

[Test]
public void AofObjectStoreRMWDeleteRecoverListTest()
{
// Verify LPOP that empties a list is persisted to AOF and recovered correctly
var key = "AofObjectStoreRMWDeleteRecoverListKey";
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

db.ListLeftPush(key, "value1");
var popped = db.ListLeftPop(key);
ClassicAssert.AreEqual("value1", popped.ToString());

// Key should not exist after popping the only element
ClassicAssert.IsFalse(db.KeyExists(key));
}

server.Store.CommitAOF(true);
server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);
// After recovery, the key should still not exist
ClassicAssert.IsFalse(db.KeyExists(key));
}
}

[Test]
public void AofObjectStoreRMWDeleteRecoverSortedSetTest()
{
// Verify ZREM that removes one member from a sorted set is persisted to AOF and recovered correctly
var key = "AofObjectStoreRMWDeleteRecoverSortedSetKey";
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

db.SortedSetAdd(key, [new SortedSetEntry("top1", 50), new SortedSetEntry("top2", 60)]);
db.SortedSetRemove(key, "top1");

// Sorted set should have one remaining member
var score = db.SortedSetScore(key, "top1");
ClassicAssert.IsFalse(score.HasValue);

score = db.SortedSetScore(key, "top2");
ClassicAssert.IsTrue(score.HasValue);
ClassicAssert.AreEqual(60, score.Value);
}

server.Store.CommitAOF(true);
server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

// After recovery, "top1" should still be absent
var score = db.SortedSetScore(key, "top1");
ClassicAssert.IsFalse(score.HasValue);

score = db.SortedSetScore(key, "top2");
ClassicAssert.IsTrue(score.HasValue);
ClassicAssert.AreEqual(60, score.Value);
}
}

[Test]
public void AofObjectStoreRMWDeleteRecoverSortedSetEmptyTest()
{
// Verify ZREM that empties a sorted set completely is persisted to AOF and recovered correctly
var key = "AofObjectStoreRMWDeleteRecoverSortedSetEmptyKey";
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

db.SortedSetAdd(key, [new SortedSetEntry("top1", 50)]);
db.SortedSetRemove(key, "top1");

// Key should not exist after removing the only member
ClassicAssert.IsFalse(db.KeyExists(key));
}

server.Store.CommitAOF(true);
server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);
ClassicAssert.IsFalse(db.KeyExists(key));
}
}

[Test]
public void AofObjectStoreRMWDeleteRecoverHashTest()
{
// Verify HDEL that removes fields from a hash is persisted to AOF and recovered correctly
var key = "AofObjectStoreRMWDeleteRecoverHashKey";
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

db.HashSet(key, [new HashEntry("hkey1", "v1"), new HashEntry("hkey2", "v2")]);
db.HashDelete(key, ["hkey1", "hkey2"]);

// Key should not exist after deleting all hash fields
ClassicAssert.IsFalse(db.KeyExists(key));
}

server.Store.CommitAOF(true);
server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);
ClassicAssert.IsFalse(db.KeyExists(key));
}
}

[Test]
public void AofObjectStoreRMWDeleteRecoverSetTest()
{
// Verify SREM that empties a set is persisted to AOF and recovered correctly
var key = "AofObjectStoreRMWDeleteRecoverSetKey";
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

db.SetAdd(key, ["member1", "member2"]);
db.SetRemove(key, ["member1", "member2"]);

ClassicAssert.IsFalse(db.KeyExists(key));
}

server.Store.CommitAOF(true);
server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);
ClassicAssert.IsFalse(db.KeyExists(key));
}
}

[Test]
public void AofObjectStoreRMWPartialDeleteRecoverHashTest()
{
// Verify HDEL that removes only some fields (not emptying the hash) is persisted to AOF and recovered correctly
var key = "AofObjectStoreRMWPartialDeleteRecoverHashKey";
using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

db.HashSet(key, [new HashEntry("hkey1", "v1"), new HashEntry("hkey2", "v2"), new HashEntry("hkey3", "v3")]);
db.HashDelete(key, "hkey1");
db.HashDelete(key, "hkey2");

// Key should still exist with remaining field
ClassicAssert.IsTrue(db.KeyExists(key));
ClassicAssert.AreEqual("v3", db.HashGet(key, "hkey3").ToString());
ClassicAssert.IsFalse(db.HashGet(key, "hkey1").HasValue);
ClassicAssert.IsFalse(db.HashGet(key, "hkey2").HasValue);
}

server.Store.CommitAOF(true);
server.Dispose(false);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, tryRecover: true, enableAOF: true);
server.Start();

using (var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()))
{
var db = redis.GetDatabase(0);

// After recovery, deleted fields should remain deleted
ClassicAssert.IsTrue(db.KeyExists(key));
ClassicAssert.AreEqual("v3", db.HashGet(key, "hkey3").ToString());
ClassicAssert.IsFalse(db.HashGet(key, "hkey1").HasValue);
ClassicAssert.IsFalse(db.HashGet(key, "hkey2").HasValue);
}
}

[Test]
public void AofCustomTxnRecoverTest()
{
Expand Down
40 changes: 40 additions & 0 deletions test/Garnet.test/TransactionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -587,5 +587,45 @@ private static void updateKey(string key, string value)
var expectedResponse = "+OK\r\n";
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
}

[Test]
public void WatchFailsWhenListEmptiedByLPop()
{
// WATCH a list key, then LPOP all elements on the same connection.
// The LPOP that empties the list should increment the watch version,
// causing the subsequent EXEC to fail.
using var lightClientRequest = TestUtils.CreateRequest();
var key = "watchlist";

// Create a single-element list
var response = lightClientRequest.SendCommand($"LPUSH {key} value1");
var expectedResponse = ":1\r\n";
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);

// WATCH the list key
response = lightClientRequest.SendCommand($"WATCH {key}");
expectedResponse = "+OK\r\n";
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);

// LPOP the only element (empties and deletes the list) — same connection, before MULTI
response = lightClientRequest.SendCommand($"LPOP {key}");
expectedResponse = "$6\r\nvalue1\r\n";
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);

// Start a transaction
response = lightClientRequest.SendCommand("MULTI");
expectedResponse = "+OK\r\n";
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);

// Queue a command
response = lightClientRequest.SendCommand($"LPUSH {key} value2");
expectedResponse = "+QUEUED\r\n";
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);

// EXEC should fail because the watched key was modified by LPOP
response = lightClientRequest.SendCommand("EXEC");
expectedResponse = "*-1";
TestUtils.AssertEqualUpToExpectedLength(expectedResponse, response);
}
}
}
Loading