Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion router/pkg/pubsub/redis/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,15 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, conf datasource.Subscri
msgChan := sub.Channel()

cleanup := func() {
err := sub.PUnsubscribe(ctx, subConf.Channels...)
err := sub.PUnsubscribe(context.Background(), subConf.Channels...)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
if err != nil {
log.Error(fmt.Sprintf("error unsubscribing from redis for topics %v", subConf.Channels), zap.Error(err))
}

err = sub.Close()
if err != nil {
log.Error(fmt.Sprintf("error closing connection to redis: %w", zap.Error(err)))
}
Comment thread
dkorittki marked this conversation as resolved.
}

p.closeWg.Add(1)
Expand Down
Loading