From 99a996ecd51978a2def11a14c0fb32c02c70d279 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Thu, 4 Jun 2026 12:56:09 +0200 Subject: [PATCH 1/3] fix: close redis connections on unsubscribe --- router/pkg/pubsub/redis/adapter.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/router/pkg/pubsub/redis/adapter.go b/router/pkg/pubsub/redis/adapter.go index 699670f1f7..e7eb559acc 100644 --- a/router/pkg/pubsub/redis/adapter.go +++ b/router/pkg/pubsub/redis/adapter.go @@ -100,10 +100,15 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, conf datasource.Subscri msgChan := sub.Channel() cleanup := func() { - err := sub.PUnsubscribe(ctx, subConf.Channels...) + err := sub.PUnsubscribe(context.Background(), subConf.Channels...) if err != nil { log.Error(fmt.Sprintf("error unsubscribing from redis for topics %v", subConf.Channels), zap.Error(err)) } + + err = sub.Close() + if err != nil { + log.Error(fmt.Sprintf("error closing connection to redis: %w", zap.Error(err))) + } } p.closeWg.Add(1) From 30c4dbb9c2bc58d1c963ce21aa0f52b0dcfd2f0d Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:56:05 +0200 Subject: [PATCH 2/3] fix: use correct format identifier in Sprintf --- router/pkg/pubsub/redis/adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/pkg/pubsub/redis/adapter.go b/router/pkg/pubsub/redis/adapter.go index e7eb559acc..dfde518886 100644 --- a/router/pkg/pubsub/redis/adapter.go +++ b/router/pkg/pubsub/redis/adapter.go @@ -107,7 +107,7 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, conf datasource.Subscri err = sub.Close() if err != nil { - log.Error(fmt.Sprintf("error closing connection to redis: %w", zap.Error(err))) + log.Error(fmt.Sprintf("error closing connection to redis: %v", zap.Error(err))) } } From 7a44cae6638a7fbbd4d9a5bb1279e0ebecbcec4f Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Thu, 4 Jun 2026 18:43:39 +0200 Subject: [PATCH 3/3] fix: use fixed timeout for unsubscribe --- router/pkg/pubsub/redis/adapter.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/router/pkg/pubsub/redis/adapter.go b/router/pkg/pubsub/redis/adapter.go index dfde518886..7a1d9beac6 100644 --- a/router/pkg/pubsub/redis/adapter.go +++ b/router/pkg/pubsub/redis/adapter.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/wundergraph/cosmo/router/pkg/metric" @@ -100,7 +101,10 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, conf datasource.Subscri msgChan := sub.Channel() cleanup := func() { - err := sub.PUnsubscribe(context.Background(), subConf.Channels...) + unsubCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := sub.PUnsubscribe(unsubCtx, subConf.Channels...) if err != nil { log.Error(fmt.Sprintf("error unsubscribing from redis for topics %v", subConf.Channels), zap.Error(err)) }