feat: enable graceful shutdown of persistent subscription consumer #5619

Open
avens19 wants to merge 1 commit into kurrent-io:master from avens19:master
Conversation

@avens19

@avens19 avens19 commented May 7, 2026

This pull request adds a new client message type to the persistent subscription channel. Sending stop from the client removes the consumer from the strategy pool but leaves the gRPC connection alive. This allows the consumer to finish processing in-flight events, ack/nack as needed, and then unsubscribe. This reduces the likelihood of double-processing in-flight events, because calling unsubscribe on its own causes in-flight events to be retried automatically.

NOTE: this PR was created with the help of Claude (Opus 4.7)

@avens19 avens19 requested review from a team as code owners May 7, 2026 22:02
@CLAassistant

CLAassistant commented May 7, 2026


CLA assistant check
All committers have signed the CLA.

@timothycoleman

Contributor

Hi, thanks for this! We'll take a look over the next few days.

@timothycoleman

Contributor

/review

@qodo-code-review

Contributor

PR Reviewer Guide 🔍

Warning

/review is deprecated. Use /agentic_review instead (removed after 2026-05-31).

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Concurrency

StopClient mutates client state (MarkStopped) and calls ClientRemoved on the consumer strategy, while other paths (push scheduling, disconnect/removal, timeouts) may also interact with the same client/strategy. Validate that all accesses to the client collection and consumer strategy are consistently protected by the same lock and that there are no races leading to duplicate removals or inconsistent routing decisions.

public IEnumerable<OutstandingMessage> RemoveClientByCorrelationId(Guid correlationId,
	bool sendDropNotification) {
	PersistentSubscriptionClient client;
	if (!_hash.TryGetValue(correlationId, out client))
		return new OutstandingMessage[0];
	_hash.Remove(client.CorrelationId);
	// A stopped client was already removed from the consumer strategy by StopClient.
	if (!client.IsStopped)
		_consumerStrategy.ClientRemoved(client);
	if (sendDropNotification) {
		client.SendDropNotification();
	}

	return client.GetUnconfirmedEvents();
}

// Removes the client from the consumer strategy so it stops receiving new
// events, but keeps it in the hash so acks/nacks for in-flight events are
// still delivered to it. Idempotent — calling Stop a second time is a no-op.
public bool StopClient(Guid correlationId) {
	if (!_hash.TryGetValue(correlationId, out var client))
		return false;
	if (client.IsStopped)
		return true;
	client.MarkStopped();
	_consumerStrategy.ClientRemoved(client);
	return true;
}
Missing response

The stop command is handled as a fire-and-forget message (no reply/ack to the caller and the boolean result is ignored). Confirm this is intended for the gRPC API semantics and that clients have a reliable way to detect “unknown subscription id / unknown correlation id / already stopped” vs “stop accepted”.

public void Handle(ClientMessage.PersistentSubscriptionNackEvents message) {
	if (!_started)
		return;
	PersistentSubscription subscription;
	if (_subscriptionsById.TryGetValue(message.SubscriptionId, out subscription)) {
		subscription.NotAcknowledgeMessagesProcessed(message.CorrelationId, message.ProcessedEventIds,
			(NakAction)message.Action, message.Message);
	}
}

public void Handle(ClientMessage.PersistentSubscriptionStopFromConsumer message) {
	if (!_started)
		return;
	if (_subscriptionsById.TryGetValue(message.SubscriptionId, out var subscription)) {
		subscription.StopClient(message.CorrelationId);
	}
}
State correctness

Stopping is implemented by gating CanSend() on IsStopped, which prevents new deliveries but keeps the client alive for ack/nack. Validate that all code paths that enqueue/schedule pushes respect CanSend() (and don’t bypass it), and that stop cannot unintentionally stall delivery when the stopped client is the only consumer and buffering/backpressure behavior is expected.

public Guid InstanceId { get; } = Guid.NewGuid();

// True once the client has been removed from the consumer pool via Stop.
// The client object is retained so in-flight events can still be acked or
// nacked over the still-open connection, but no new events are pushed to it.
public bool IsStopped { get; private set; }

/// <summary>
/// Raised whenever an in-flight event has been confirmed. This could be because of ack, nak, timeout or disconnection.
/// </summary>
public event Action<PersistentSubscriptionClient, ResolvedEvent> EventConfirmed;

public int InflightMessages {
	get { return _unconfirmedEvents.Count; }
}

public int AvailableSlots {
	get { return _allowedMessages; }
}

public Guid ConnectionId {
	get { return _connectionId; }
}

public string ConnectionName {
	get { return _connectionName; }
}

public long TotalItems {
	get { return _totalItems; }
}

public long LastTotalItems { get; set; }

public Guid CorrelationId {
	get { return _correlationId; }
}

internal bool RemoveFromProcessing(Guid[] processedEventIds) {
	bool removedAny = false;
	foreach (var processedEventId in processedEventIds) {
		if (_extraStatistics != null)
			_extraStatistics.EndOperation(processedEventId);
		OutstandingMessage ev;
		if (!_unconfirmedEvents.TryGetValue(processedEventId, out ev))
			continue;
		_unconfirmedEvents.Remove(processedEventId);
		removedAny = true;
		_allowedMessages++;
		OnEventConfirmed(ev);
	}

	return removedAny;
}

internal void MarkStopped() {
	IsStopped = true;
}

public bool Push(OutstandingMessage message) {
	if (!CanSend()) {
		return false;
	}

	var evnt = message.ResolvedEvent;
	_allowedMessages--;
	Interlocked.Increment(ref _totalItems);
	if (_extraStatistics != null)
		_extraStatistics.StartOperation(evnt.OriginalEvent.EventId);

	_envelope.ReplyWith(
		new ClientMessage.PersistentSubscriptionStreamEventAppeared(CorrelationId, evnt, message.RetryCount));
	if (!_unconfirmedEvents.ContainsKey(evnt.OriginalEvent.EventId)) {
		_unconfirmedEvents.Add(evnt.OriginalEvent.EventId, message);
	}

	return true;
}

public IEnumerable<OutstandingMessage> GetUnconfirmedEvents() {
	return _unconfirmedEvents.Values;
}

internal void SendDropNotification() {
	_envelope.ReplyWith(
		new ClientMessage.SubscriptionDropped(CorrelationId, SubscriptionDropReason.Unsubscribed));
}

internal ObservedTimingMeasurement GetExtraStats() {
	return _extraStatistics == null ? null : _extraStatistics.GetMeasurementDetails();
}

private bool CanSend() {
	return !IsStopped && AvailableSlots > 0;
}

@timothycoleman timothycoleman self-assigned this May 12, 2026
@timothycoleman

Contributor

Heya,

Just to clarify: when we say we're looking to improve the graceful termination behaviour with respect to duplicate processing of in-flight/'still being worked on' events, do we mean:

  1. When unsubscribing, Acks/Nacks that the consumer had previously sent are not being received by the server because the gRPC call is being terminated too soon, resulting in some events being unnecessarily processed twice

  2. When unsubscribing, the consumer may be in the middle of processing some events that it will subsequently try to Ack/Nack but be unable to because the gRPC call will have closed, resulting in some events being unnecessarily processed twice

  3. When unsubscribing, events that the server had sent to the consumer but that the consumer never processes get redistributed to other consumers but this produces some unideal behaviour

(or more than one of the above)

Thanks!
Tim

@avens19

avens19 commented May 12, 2026

Author

#2 was my primary goal, though #3 is also not ideal.

@avens19

avens19 commented May 14, 2026

Author

@timothycoleman do you need any more info?

@timothycoleman

Contributor

Hi @avens19,

Sorry it's taking a while. It affects the API (albeit in a small way), and therefore the client SDKs, so we're discussing it internally.

What client SDK are you using, and do you also have code changes on the client side? I'm mostly wondering how, after sending Stop, you're determining when the in-flight messages have been cleared and that the server isn't currently sending some more. Maybe it's approximate and based on a timeout?

It may be that, since we're not getting a 'Stopped' message in response, it'll behave the same as a client-side only mechanism that:

  1. prevents any further events from starting to be processed
  2. waits until we've sent an ack or nack for any event currently being processed
  3. terminates the subscription

Without involving the server before the termination?
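
For illustration, the three client-side steps listed above might be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the NodeJS client's API: `SubscriptionLike`, `DrainingConsumer`, `onEvent` and `shutdown` are hypothetical names introduced only for this example.

```typescript
// Hypothetical sketch: none of these names come from an actual client SDK.
interface SubscriptionLike<E> {
  ack(event: E): Promise<void>;
  nack(event: E, reason: string): Promise<void>;
  unsubscribe(): Promise<void>;
}

class DrainingConsumer<E> {
  private readonly subscription: SubscriptionLike<E>;
  private readonly handler: (event: E) => Promise<void>;
  private draining = false;
  private inFlight = 0;
  private idleWaiters: Array<() => void> = [];

  constructor(subscription: SubscriptionLike<E>, handler: (event: E) => Promise<void>) {
    this.subscription = subscription;
    this.handler = handler;
  }

  // Called for every event the server pushes. Returns false if the event is ignored.
  onEvent(event: E): boolean {
    if (this.draining) return false; // step 1: do not start any new work
    void this.process(event);
    return true;
  }

  private async process(event: E): Promise<void> {
    this.inFlight++;
    try {
      await this.handler(event);
      await this.subscription.ack(event);
    } catch (err) {
      // Best effort: a failed nack is swallowed so process() never rejects.
      await this.subscription.nack(event, String(err)).catch(() => undefined);
    } finally {
      this.inFlight--;
      if (this.inFlight === 0) {
        const waiters = this.idleWaiters;
        this.idleWaiters = [];
        for (const wake of waiters) wake();
      }
    }
  }

  private whenIdle(): Promise<void> {
    if (this.inFlight === 0) return Promise.resolve();
    return new Promise<void>((resolve) => this.idleWaiters.push(resolve));
  }

  async shutdown(): Promise<void> {
    this.draining = true;                  // step 1
    await this.whenIdle();                 // step 2: every started event is acked or nacked
    await this.subscription.unsubscribe(); // step 3
  }
}
```

In this sketch, events that the server pushes after `shutdown()` has begun are dropped on the client without an ack or nack, so they would still be redistributed by the server.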

@avens19

avens19 commented May 15, 2026

Author

I have not implemented any clients; we're using the NodeJS client.

Ya, my thinking was that it ultimately doesn't matter, since the fallback is the existing behaviour (i.e., if there's a race and the server tries to send me an event while I send stop and have no items in the queue, I will exit immediately and ignore the event, and the server will retry it on another client as it does today). I plan to continue working on any events I've received, adding any new events I receive during the drain to the queue, and then calling the existing unsubscribe. All of this is best effort, with the existing behaviour as the fallback.
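
A rough sketch of that drain loop, under stated assumptions: `sendStop` stands in for the new stop message proposed in this PR, and `StoppableSubscription` and `QueueDrainer` are hypothetical names, not part of the NodeJS client.

```typescript
// Hypothetical sketch: `sendStop` models the proposed stop message; no SDK exposes it today.
interface StoppableSubscription<E> {
  sendStop(): Promise<void>;
  ack(event: E): Promise<void>;
  unsubscribe(): Promise<void>;
}

class QueueDrainer<E> {
  private readonly sub: StoppableSubscription<E>;
  private readonly handle: (event: E) => Promise<void>;
  private readonly queue: E[] = [];
  private worker: Promise<void> | null = null;
  private closed = false;

  constructor(sub: StoppableSubscription<E>, handle: (event: E) => Promise<void>) {
    this.sub = sub;
    this.handle = handle;
  }

  // Events are still accepted while draining; only ignored once the queue has emptied.
  onEvent(event: E): boolean {
    if (this.closed) return false;
    this.queue.push(event);
    if (this.worker === null) this.worker = this.run();
    return true;
  }

  private async run(): Promise<void> {
    // Yield once so `worker` is assigned before the loop can possibly finish.
    await Promise.resolve();
    while (this.queue.length > 0) {
      const event = this.queue.shift() as E;
      try {
        await this.handle(event);
        await this.sub.ack(event);
      } catch {
        // Best effort: an event that is never acked falls back to the server's retry.
      }
    }
    this.worker = null;
  }

  async shutdown(): Promise<void> {
    await this.sub.sendStop();           // ask the server to stop routing new events here
    while (this.worker !== null) {
      await this.worker;                 // drain, including events that raced with stop
    }
    this.closed = true;                  // queue is empty: ignore anything that arrives now
    await this.sub.unsubscribe();        // existing behaviour takes over from here
  }
}
```

If the queue is already empty when `shutdown()` runs, the sketch unsubscribes immediately, which matches the fallback described above: a late event is ignored and left for the server to retry elsewhere.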

You're right that it's similar to a client-only solution, which is why I mentioned that this also helps with your #3 from above. Those events that I'm silently ignoring will end up being delayed and out of order relative to other events. I know there are no guarantees for this in general, but ideally we'd do our best to prevent it.

@avens19

avens19 commented May 15, 2026

Author

I did implement the client-side solution on my side, which you're right does solve #2 from above, which I said was my primary concern, so fair callout.


4 participants