Merged
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
@@ -14,7 +14,7 @@ serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1.49.0", features = ["rt", "rt-multi-thread", "macros", "fs", "io-std", "io-util"] }
tokio-rustls = "0.26"
tokio-stream = { version = "0.1", features = ["sync"] }
-tokio-util = { version = "0.7", features = ["io", "compat"] }
+tokio-util = { version = "0.7", features = ["io", "compat", "rt"] }
futures = "0.3.31"
tracing = "0.1.44"
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
@@ -36,6 +36,8 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus
url = "2.5"
shadow-rs = "1.5.0"
snafu = { version = "0.8.9", features = ["futures"] }
+kube-leader-election = { path = "crates/leader-election" }
+hostname = "0.4"

# Console dependencies
axum = { version = "0.7", features = ["macros", "json"] }
@@ -57,6 +59,9 @@ shadow-rs = { version = "1.5.0", features = ["build"] }
[lints.rust]
unused_variables = "allow"

+[workspace]
+members = ["crates/leader-election"]

[lints.clippy]
unwrap_used = "deny"
expect_used = "deny"
22 changes: 22 additions & 0 deletions crates/leader-election/Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "kube-leader-election"
version = "0.1.0"
edition = "2024"
license = "Apache-2.0"
description = "Kubernetes leader election for RustFS Operator"

[dependencies]
async-stream = "0.3"
async-trait = { version = "0.1.89", default-features = false }
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3.31"
k8s-openapi = { version = "0.26.1", features = ["v1_30"] }
kube = { version = "2.0.1", features = ["client", "rustls-tls"] }
rand = "0.9"
snafu = { version = "0.8.9", features = ["futures"] }
tokio = { version = "1.49.0", features = ["sync", "time", "macros", "rt"] }
tokio-util = { version = "0.7", features = ["rt"] }
tracing = "0.1.44"

[dev-dependencies]
tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros"] }
134 changes: 134 additions & 0 deletions crates/leader-election/README.md
@@ -0,0 +1,134 @@
# kube-leader-election

Kubernetes leader election for Rust operators, using Lease resources as the lock backend.

Semantics are aligned with [client-go leaderelection](https://github.com/kubernetes/client-go/tree/master/tools/leaderelection).

## Features

- **Lease-based locking** — uses `coordination.k8s.io/v1` Lease objects (K8s 1.14+)
- **Structured concurrency** — no implicit task spawning; caller controls lifecycle via `CancellationToken`
- **Lock trait abstraction** — pluggable backends, easy to test with fakes
- **Clock trait injection** — deterministic tests with `MockClock`
- **Observability** — `LeaderElectorHandle` with watch-channel state stream
- **Jittered retry** — randomized retry intervals to avoid thundering herd

## Quick Start

```rust
use kube_leader_election::{
    LeaderCallbacks, LeaderElector, LeaderElectorConfig, LeaseLock, SystemClock,
};
use kube::Client;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

let client = Client::try_default().await?;

// Create the lock (one per Lease name/namespace)
let lock = LeaseLock::new(client, "my-operator-lease", "default");

// Configure the elector
let config = LeaderElectorConfig {
    identity: "pod-abc123".into(),
    lease_duration: Duration::from_secs(15),
    renew_deadline: Duration::from_secs(10),
    retry_period: Duration::from_secs(2),
    release_on_cancel: true,
};

// Create the elector
let elector = LeaderElector::new(config, lock, SystemClock);

// Define callbacks
struct MyCallbacks;

#[async_trait::async_trait]
impl LeaderCallbacks for MyCallbacks {
    async fn on_started_leading(&self, cancel: CancellationToken) {
        // Run your controller loop here
        my_controller(cancel).await;
    }

    async fn on_stopped_leading(&self) {
        tracing::warn!("lost leadership, shutting down");
    }

    async fn on_new_leader(&self, identity: String) {
        tracing::info!("new leader: {}", identity);
    }
}

// Run: blocks until cancellation, retrying on transient leadership loss
let cancel = CancellationToken::new();
elector.run(MyCallbacks, cancel).await;
```

## Configuration

| Parameter | Default | Description |
|-----------|---------|-------------|
| `identity` | *(required)* | Unique ID for this instance (typically pod name) |
| `lease_duration` | 15s | How long non-leaders wait after the last observed renewal before attempting to take over the lease |
| `renew_deadline` | 10s | How long the leader keeps retrying a renewal before giving up leadership |
| `retry_period` | 2s | Base interval between acquire/renew attempts (jitter is added on top) |
| `release_on_cancel` | `true` | Whether to release the lease when the `CancellationToken` is cancelled |

**Constraint:** `lease_duration > renew_deadline > retry_period × 1.2`
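
The constraint can be checked up front, before the elector starts. The following is a minimal sketch only: `validate_timings` and `JITTER_FACTOR` are hypothetical names that are not confirmed to exist in this crate, and the factor 1.2 is taken from the constraint stated above.

```rust
use std::time::Duration;

/// Jitter factor from the constraint above (assumed to match client-go's 1.2).
const JITTER_FACTOR: f64 = 1.2;

/// Hypothetical helper, not the crate's confirmed API.
/// Checks `lease_duration > renew_deadline > retry_period * 1.2`.
fn validate_timings(
    lease_duration: Duration,
    renew_deadline: Duration,
    retry_period: Duration,
) -> Result<(), String> {
    if lease_duration <= renew_deadline {
        return Err(format!(
            "lease_duration ({:?}) must be greater than renew_deadline ({:?})",
            lease_duration, renew_deadline
        ));
    }
    // The longest jittered retry must still fit inside the renew window.
    let min_renew = retry_period.mul_f64(JITTER_FACTOR);
    if renew_deadline <= min_renew {
        return Err(format!(
            "renew_deadline ({:?}) must be greater than retry_period x 1.2 ({:?})",
            renew_deadline, min_renew
        ));
    }
    Ok(())
}
```

With the values from the table (15s, 10s, 2s) the check passes. Setting `renew_deadline` to 2s fails, because 2s is not greater than 2s × 1.2 = 2.4s.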

## RBAC

The operator needs the following permissions on the Lease resource:

```yaml
- apiGroups: ["coordination.k8s.io"]
  resources: ["leases"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
```

## Architecture

```
LeaderElector
├── acquire() — loop: try_acquire_or_renew → sleep(jittered retry_period)
├── renew() — loop: try_acquire_or_renew within renew_deadline window
├── release() — clear holder_identity, preserve transition count
└── run() — orchestrate acquire → on_started_leading → renew → on_stopped_leading (retries on lease loss)
```
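
The jittered sleep in `acquire()` can be illustrated as follows. This sketch assumes the crate follows the client-go formula (`period + random × max_factor × period`); the function name `jittered` is hypothetical, and the random sample is passed in by the caller so the example needs no external crate.

```rust
use std::time::Duration;

/// Hypothetical helper: stretches `period` by up to `max_factor` times itself.
/// `unit_sample` is a caller-supplied random value expected in [0.0, 1.0];
/// out-of-range values are clamped into that interval.
fn jittered(period: Duration, max_factor: f64, unit_sample: f64) -> Duration {
    let sample = unit_sample.clamp(0.0, 1.0);
    period + period.mul_f64(max_factor * sample)
}
```

With `retry_period = 2s` and a factor of 1.2, the resulting sleep falls between 2s and 4.4s, so replicas that start together spread their API requests out rather than retrying in lockstep.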

The `Lock` trait abstracts the backend:

```rust
#[async_trait]
pub trait Lock: Send + Sync {
    async fn get(&self) -> Result<Option<LeaderElectionRecord>, Error>;
    async fn create(&self, record: LeaderElectionRecord) -> Result<(), Error>;
    async fn update(&self, record: LeaderElectionRecord) -> Result<(), Error>;
    fn identity(&self) -> &str;
    fn describe(&self) -> String;
}
```
```

`LeaseLock` is the built-in implementation backed by Kubernetes Lease objects.
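
To show why the abstraction makes fakes easy, here is a minimal in-memory backend. It is a synchronous, std-only analogue rather than an implementation of the real async trait: `SyncLock`, `InMemoryLock`, and the `Record` fields are all hypothetical stand-ins, since the actual `LeaderElectionRecord` layout is not shown here.

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for `LeaderElectionRecord`; real fields may differ.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    holder_identity: String,
    leader_transitions: u32,
}

/// Synchronous analogue of the `Lock` trait's storage methods (the real trait is async).
trait SyncLock {
    fn get(&self) -> Result<Option<Record>, String>;
    fn create(&self, record: Record) -> Result<(), String>;
    fn update(&self, record: Record) -> Result<(), String>;
}

/// In-memory backend holding at most one record, like a single Lease object.
struct InMemoryLock {
    state: Mutex<Option<Record>>,
}

impl InMemoryLock {
    fn new() -> Self {
        Self { state: Mutex::new(None) }
    }
}

impl SyncLock for InMemoryLock {
    fn get(&self) -> Result<Option<Record>, String> {
        let guard = self.state.lock().map_err(|e| e.to_string())?;
        Ok(guard.clone())
    }

    fn create(&self, record: Record) -> Result<(), String> {
        let mut guard = self.state.lock().map_err(|e| e.to_string())?;
        // Creating twice is an error, mirroring an "already exists" response.
        if guard.is_some() {
            return Err("record already exists".to_string());
        }
        *guard = Some(record);
        Ok(())
    }

    fn update(&self, record: Record) -> Result<(), String> {
        let mut guard = self.state.lock().map_err(|e| e.to_string())?;
        // Updating a missing record is an error, mirroring a "not found" response.
        if guard.is_none() {
            return Err("record not found".to_string());
        }
        *guard = Some(record);
        Ok(())
    }
}
```

A test can call `create` once and then drive acquire/renew logic through `get` and `update` without a cluster. The sketch avoids `unwrap` and `expect` to match the clippy lints this workspace denies.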

## Testing

```bash
# Unit tests
cargo test -p kube-leader-election --lib

# Integration tests (uses FakeLock, no cluster needed)
cargo test -p kube-leader-election --test integration_tests
```

## Design Documentation

See [`docs/design.md`](docs/design.md) for the full technical design including:
- Client-go source analysis
- API design decisions
- Algorithm details (acquire/renew/release)
- Concurrency model

## License

Apache-2.0
32 changes: 32 additions & 0 deletions crates/leader-election/src/callbacks.rs
@@ -0,0 +1,32 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Leader callbacks trait.

use async_trait::async_trait;
use tokio_util::sync::CancellationToken;

/// Callbacks invoked during leader election lifecycle events.
#[async_trait]
pub trait LeaderCallbacks: Send + Sync {
    /// Called when this instance becomes the leader.
    /// Receives a cancellation token that is cancelled when leadership is lost.
    async fn on_started_leading(&self, cancel: CancellationToken);

    /// Called when this instance stops being the leader (always called, even if never led).
    async fn on_stopped_leading(&self);

    /// Called when a new leader is observed (fire-and-forget, runs in a separate task).
    async fn on_new_leader(&self, identity: String);
}
33 changes: 33 additions & 0 deletions crates/leader-election/src/clock.rs
@@ -0,0 +1,33 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Injectable clock trait for testability.

use chrono::{DateTime, Utc};

/// Abstraction over time source, allowing mock clocks in tests.
pub trait Clock: Send + Sync {
    /// Returns the current time.
    fn now(&self) -> DateTime<Utc>;
}

/// Production clock backed by the system time.
#[derive(Debug, Clone, Copy)]
pub struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> DateTime<Utc> {
        Utc::now()
    }
}
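
Note: the README mentions a `MockClock`, which this file does not define. A manually advanced clock could look like the sketch below. It is an assumption-laden analogue, not the crate's code: it uses `std::time::SystemTime` instead of `chrono::DateTime<Utc>` to stay dependency-free, and the `advance` method is hypothetical.

```rust
use std::sync::Mutex;
use std::time::{Duration, SystemTime};

/// Std-only analogue of the `Clock` trait above (the real one returns `DateTime<Utc>`).
trait Clock: Send + Sync {
    fn now(&self) -> SystemTime;
}

/// Hypothetical test clock whose time only moves when `advance` is called.
struct MockClock {
    now: Mutex<SystemTime>,
}

impl MockClock {
    fn new(start: SystemTime) -> Self {
        Self { now: Mutex::new(start) }
    }

    /// Moves the clock forward by `by`; a poisoned mutex is recovered, not unwrapped.
    fn advance(&self, by: Duration) {
        let mut guard = self.now.lock().unwrap_or_else(|p| p.into_inner());
        *guard += by;
    }
}

impl Clock for MockClock {
    fn now(&self) -> SystemTime {
        *self.now.lock().unwrap_or_else(|p| p.into_inner())
    }
}
```

A test can then advance the clock by `lease_duration` to simulate lease expiry without sleeping.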
34 changes: 34 additions & 0 deletions crates/leader-election/src/config.rs
@@ -0,0 +1,34 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Leader election configuration and validation.

use std::time::Duration;

/// Configuration for the leader elector.
///
/// Constraint: `lease_duration > renew_deadline > retry_period * 1.2`
#[derive(Debug, Clone)]
pub struct LeaderElectorConfig {
    /// Unique identity for this instance (typically pod name or hostname).
    pub identity: String,
    /// Duration a non-leader waits before attempting to acquire (default 15s).
    pub lease_duration: Duration,
    /// Deadline within which the leader must successfully renew (default 10s).
    pub renew_deadline: Duration,
    /// Interval between retry attempts (default 2s).
    pub retry_period: Duration,
    /// Whether to release the lock when the cancel token fires (default true).
    pub release_on_cancel: bool,
}