Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
<PackageVersion Include="Dapper" Version="2.1.66" />
<PackageVersion Include="DotNext" Version="6.3.0" />
<PackageVersion Include="DotNext.IO" Version="6.3.0" />
<PackageVersion Include="DotNext.Threading" Version="6.3.0" />
<PackageVersion Include="DotNext.Threading" Version="6.4.0" />
<PackageVersion Include="DotNext.Unsafe" Version="6.3.0" />
<PackageVersion Include="DotNext.Net.Cluster" Version="6.4.0" />
<PackageVersion Include="Ductus.FluentDocker" Version="2.85.0" />
<PackageVersion Include="Ductus.FluentDocker.XUnit" Version="2.85.0" />
<PackageVersion Include="DuckDB.NET.Data.Full" Version="1.5.0" />
Expand Down Expand Up @@ -68,8 +69,8 @@
<PackageVersion Include="Kurrent.Surge.Core" Version="1.1.1-alpha.1.71" />
<PackageVersion Include="Kurrent.Surge.DataProtection" Version="1.1.1-alpha.1.71" />
<PackageVersion Include="Kurrent.Surge.DuckDB" Version="1.1.1-alpha.1.71" />
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-alpha.181" />
<PackageVersion Include="Kurrent.Quack.Arrow" Version="0.0.0-alpha.181" />
<PackageVersion Include="Kurrent.Quack" Version="0.0.0-alpha.186" />
<PackageVersion Include="Kurrent.Quack.Arrow" Version="0.0.0-alpha.186" />
<PackageVersion Include="KurrentDB.Client" Version="1.0.0" />
<PackageVersion Include="librdkafka.redist" Version="2.5.0" />
<PackageVersion Include="LruCacheNet" Version="1.2.0" />
Expand Down Expand Up @@ -196,4 +197,4 @@
<PackageVersion Include="Apache.Arrow.Adbc.Drivers.FlightSql" Version="0.21.0" />
<PackageVersion Include="Apache.Arrow.Adbc.Client" Version="0.23.0" />
</ItemGroup>
</Project>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<IsKurrentXUnit>true</IsKurrentXUnit>
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>KurrentDB.KontrolPlane</RootNamespace>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\KurrentDB.Core.Testing.XUnit\KurrentDB.Core.Testing.XUnit.csproj" />
<ProjectReference Include="..\KurrentDB.KontrolPlane\KurrentDB.KontrolPlane.csproj" />
</ItemGroup>

<ItemGroup>
<Using Include="Xunit" />
</ItemGroup>
</Project>
167 changes: 167 additions & 0 deletions src/KurrentDB.KontrolPlane.Tests/LeaderAppointmentTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System.Collections.Immutable;
using System.Net;
using KurrentDB.Core.XUnit.Tests;

namespace KurrentDB.KontrolPlane;

[Collection("RaftKontroller")]
public sealed class LeaderAppointmentTests : DirectoryFixture<LeaderAppointmentTests> {
[Fact]
public async Task AppointLeader() {
// initialize members
var replicaSet = new TestDataPlane();
replicaSet.UpdateMember(TestDataPlane.Host1, new(UncommittedOffset: 100, Epoch: 1L));
replicaSet.UpdateMember(TestDataPlane.Host2, new(UncommittedOffset: 200, Epoch: 0L));
replicaSet.UpdateMember(TestDataPlane.Host3, new(UncommittedOffset: 150, Epoch: 1L)); // leader

// initialize Kontroller
await using var kontroller = new RaftKontroller(new RaftKontroller.Options {
ListenAddress = new(IPAddress.Loopback, 3269),
AppointmentDuration = TimeSpan.FromSeconds(1),
ConnectionPoolCapacity = 10,
WalOptions = new() {
Location = Directory,
},
SingleNodeDeployment = true,
}) {
DataPlane = replicaSet,
};

await kontroller.StartAsync(TestToken);
await AddNodesAsync(kontroller);

var leader = await WaitForLeaderAsync(kontroller);
Assert.Equal(TestDataPlane.Host3, leader.Address);
Assert.Equal(1UL, leader.Epoch);

await kontroller.StopAsync(TestToken);
}

[Fact]
public async Task RenewLeaderAppointment() {
var appointmentTimeout = TimeSpan.FromSeconds(1);

// initialize members
var replicaSet = new TestDataPlane();
replicaSet.UpdateMember(TestDataPlane.Host1, new(UncommittedOffset: 100, Epoch: 1L));
replicaSet.UpdateMember(TestDataPlane.Host2, new(UncommittedOffset: 200, Epoch: 0L));
replicaSet.UpdateMember(TestDataPlane.Host3, new(UncommittedOffset: 150, Epoch: 1L)); // leader

// initialize Kontroller
await using var kontroller = new RaftKontroller(new RaftKontroller.Options {
ListenAddress = new(IPAddress.Loopback, 3269),
AppointmentDuration = appointmentTimeout,
ConnectionPoolCapacity = 10,
WalOptions = new() {
Location = Directory,
},
SingleNodeDeployment = true,
}) {
DataPlane = replicaSet,
};

await kontroller.StartAsync(TestToken);
await AddNodesAsync(kontroller);

var leader = await WaitForLeaderAsync(kontroller);
Assert.Equal(TestDataPlane.Host3, leader.Address);

// Start renewal process in the background
var process = new RenewalProcess(kontroller, TestDataPlane.Host3, leader.Epoch, appointmentTimeout);
var renewalTask = process.RunAsync();

// Make the current member as unavailable, but due to renewal process a new leader cannot be appointed
replicaSet.RemoveMember(TestDataPlane.Host3);

await Task.Delay(appointmentTimeout * 3, TestToken);
leader = await WaitForLeaderAsync(kontroller);
Assert.Equal(TestDataPlane.Host3, leader.Address);

// Now stop renewal process and wait for a new leader appointment
process.RequestStop();
await renewalTask.WaitAsync(TestToken);

do {
leader = await WaitForLeaderAsync(kontroller);
} while (!TestDataPlane.Host1.Equals(leader.Address));

await kontroller.StopAsync(TestToken);
}

private sealed class RenewalProcess(IKontroller kontroller, EndPoint address, ulong epoch, TimeSpan appointmentTimeout) {
private volatile bool _stopped;

public void RequestStop() => _stopped = true;

public async Task RunAsync() {
while (!_stopped) {
// Renew every 300 ms
await Task.Delay(appointmentTimeout / 3);

await kontroller.RenewLeaderAppointmentAsync(Database.MainDatabaseId, address, epoch, TestToken);
}
}
}

private static async Task AddNodesAsync(IKontroller kontroller) {
var node = new DatabaseNode { DatabaseId = Database.MainDatabaseId, Address = TestDataPlane.Host1 };
await kontroller.AddOrUpdateDatabaseNodeAsync(node, TestToken);

node = new DatabaseNode { DatabaseId = Database.MainDatabaseId, Address = TestDataPlane.Host2 };
await kontroller.AddOrUpdateDatabaseNodeAsync(node, TestToken);

node = new DatabaseNode { DatabaseId = Database.MainDatabaseId, Address = TestDataPlane.Host3 };
await kontroller.AddOrUpdateDatabaseNodeAsync(node, TestToken);

node = new DatabaseNode { DatabaseId = Database.MainDatabaseId, Address = TestDataPlane.Host4 };
await kontroller.AddOrUpdateDatabaseNodeAsync(node, TestToken);
}

private static async Task<(EndPoint? Address, ulong Epoch)> WaitForLeaderAsync(IKontroller kontroller) {
await foreach (var database in kontroller.ListenDatabaseAsync(Database.MainDatabaseId, TestToken)) {
if (database.LeaderAddress is { } address)
return (address, database.Epoch);
}

return (null, 0L);
}

private static CancellationToken TestToken => TestContext.Current.CancellationToken;

private sealed class TestDataPlane : IDataPlane {
public static readonly IPEndPoint Host1 = new(IPAddress.Loopback, 3269);
public static readonly IPEndPoint Host2 = new(IPAddress.Loopback, 3270);
public static readonly IPEndPoint Host3 = new(IPAddress.Loopback, 3271);
public static readonly IPEndPoint Host4 = new(IPAddress.Loopback, 3271);

private ImmutableDictionary<EndPoint, ReplicaState> _members = ImmutableDictionary<EndPoint, ReplicaState>.Empty;

public void UpdateMember(IPEndPoint endPoint, ReplicaState state) {
for (ImmutableDictionary<EndPoint, ReplicaState> current = _members, tmp;; current = tmp) {
var newDictionary = current.SetItem(endPoint, state);
tmp = Interlocked.CompareExchange(ref _members, newDictionary, current);

if (ReferenceEquals(current, tmp))
break;
}
}

public void RemoveMember(IPEndPoint endPoint) {
for (ImmutableDictionary<EndPoint, ReplicaState> current = _members, tmp;; current = tmp) {
var newDictionary = current.Remove(endPoint);
tmp = Interlocked.CompareExchange(ref _members, newDictionary, current);

if (ReferenceEquals(current, tmp))
break;
}
}

ValueTask<ReplicaState> IDataPlane.GetReplicaStateAsync(EndPoint address, CancellationToken token)
=> Volatile.Read(in _members).TryGetValue(address, out var state)
? new(state)
: ValueTask.FromException<ReplicaState>(new IOException());
}
}
Loading
Loading