diff --git a/jetstreamext/channelpublisher.go b/jetstreamext/channelpublisher.go new file mode 100644 index 0000000..3b20015 --- /dev/null +++ b/jetstreamext/channelpublisher.go @@ -0,0 +1,396 @@ +// Copyright 2025 Synadia Communications Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jetstreamext + +import ( + "encoding/json" + "fmt" + "math/rand" + "strconv" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type ( + // ChannelPublisher publishes messages asynchronously and delivers + // acks/errors on a single channel. + ChannelPublisher interface { + // Publish publishes a message asynchronously. + Publish(subject string, payload []byte, opts ...PublishOpt) error + + // PublishMsg publishes a nats.Msg asynchronously. + PublishMsg(msg *nats.Msg, opts ...PublishOpt) error + + // Results returns the channel where acks/errors are delivered. + Results() <-chan PubAckResult + + // Close stops the publisher and closes the results channel. + // It processes any pending acks received by the client before closing. + Close() + + // Pending returns number of publishes waiting for ack. + Pending() int + } + + // PubAckResult contains the result of an async publish operation. + PubAckResult struct { + // Ack is the publish acknowledgment from the server. + Ack *jetstream.PubAck + // Err is any error that occurred during publish. + Err error + // Msg is the original message that was published. + Msg *nats.Msg + } + + channelPublisher struct { + js jetstream.JetStream + results chan PubAckResult + sub *nats.Subscription + replyPrefix string + mu sync.Mutex + pending map[string]*pendingPub + closed bool + rr *rand.Rand + stallCh chan struct{} + } + + pendingPub struct { + msg *nats.Msg + timeout *time.Timer + } + + channelPublisherOpts struct { + channelBuffer int + } + + // ChannelPublisherOpt is a functional option for configuring a ChannelPublisher. + ChannelPublisherOpt func(*channelPublisherOpts) error + + // PublishOpt is a functional option for configuring a publish operation. + PublishOpt func(*publishOpts) error + + publishOpts struct { + msgID string + expectedStream string + expectedLastSeq *uint64 + expectedLastSubjSeq *uint64 + expectedLastSubject string + expectedLastMsgID string + ttl time.Duration + stallWait time.Duration + } +) + +const ( + defaultChannelBuffer = 1000 + defaultStallWait = 200 * time.Millisecond + aReplyTokensize = 6 + rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + base = 62 +) + +// NewChannelPublisher creates a new channel-based async publisher. +func NewChannelPublisher(js jetstream.JetStream, opts ...ChannelPublisherOpt) (ChannelPublisher, error) { + pubOpts := channelPublisherOpts{ + channelBuffer: defaultChannelBuffer, + } + + for _, opt := range opts { + if err := opt(&pubOpts); err != nil { + return nil, err + } + } + + cp := &channelPublisher{ + js: js, + results: make(chan PubAckResult, pubOpts.channelBuffer), + pending: make(map[string]*pendingPub), + replyPrefix: js.Conn().NewRespInbox() + ".", + rr: rand.New(rand.NewSource(time.Now().UnixNano())), + } + + // Create single subscription for all async acks + sub, err := js.Conn().Subscribe(cp.replyPrefix+"*", cp.handleAsyncReply) + if err != nil { + return nil, fmt.Errorf("failed to create reply subscription: %w", err) + } + cp.sub = sub + + return cp, nil +} + +// Publish publishes a message asynchronously. +func (cp *channelPublisher) Publish(subject string, payload []byte, opts ...PublishOpt) error { + return cp.PublishMsg(&nats.Msg{Subject: subject, Data: payload}, opts...) +} + +// PublishMsg publishes a nats.Msg asynchronously. +func (cp *channelPublisher) PublishMsg(msg *nats.Msg, opts ...PublishOpt) error { + if msg.Reply != "" { + return ErrChannelPublisherReplySubjectSet + } + o := publishOpts{ + stallWait: defaultStallWait, + } + for _, opt := range opts { + if err := opt(&o); err != nil { + return err + } + } + + cp.mu.Lock() + if cp.closed { + cp.mu.Unlock() + return ErrPublisherClosed + } + + jsOpts := cp.js.Options() + maxPending := jsOpts.PublisherOpts.MaxAckPending + + if maxPending > 0 && len(cp.pending) >= maxPending { + // Create stall channel if needed + if cp.stallCh == nil { + cp.stallCh = make(chan struct{}) + } + stallCh := cp.stallCh + + cp.mu.Unlock() + + // Wait for pending to drop below max or timeout + select { + case <-stallCh: + // Channel closed - pending dropped below max, continue with publish + cp.mu.Lock() + if cp.closed { + return ErrPublisherClosed + } + case <-time.After(o.stallWait): + return ErrTooManyStalledMsgs + } + } + + var sb strings.Builder + sb.WriteString(cp.replyPrefix) + for { + rn := cp.rr.Int63() + var b [aReplyTokensize]byte + for i, l := 0, rn; i < len(b); i++ { + b[i] = rdigits[l%base] + l /= base + } + if _, ok := cp.pending[string(b[:])]; ok { + continue + } + sb.Write(b[:]) + break + + } + id := sb.String()[len(cp.replyPrefix):] + + // Apply publish options to message headers + if err := applyPublishOptsHeaders(msg, &o); err != nil { + cp.mu.Unlock() + return err + } + + pending := &pendingPub{msg: msg} + cp.pending[id] = pending + + // Set reply subject for ack + msg.Reply = sb.String() + + cp.mu.Unlock() + + if err := cp.js.Conn().PublishMsg(msg); err != nil { + cp.mu.Lock() + delete(cp.pending, id) + cp.mu.Unlock() + return err + } + + // Set up ack timeout if configured + if jsOpts.PublisherOpts.AckTimeout > 0 { + pending.timeout = time.AfterFunc(jsOpts.PublisherOpts.AckTimeout, func() { + cp.mu.Lock() + defer cp.mu.Unlock() + + // Check if ack already received + if _, ok := cp.pending[id]; !ok { + return + } + + // Timeout occurred - remove from pending + delete(cp.pending, id) + + result := PubAckResult{ + Msg: msg, + Err: ErrAckTimeout, + } + select { + case cp.results <- result: + default: + // Channel full or closed + } + }) + } + + return nil +} + +// applyPublishOptsHeaders applies publish options to message headers. +func applyPublishOptsHeaders(msg *nats.Msg, o *publishOpts) error { + if msg.Header == nil { + msg.Header = nats.Header{} + } + if o.msgID != "" { + msg.Header.Set(jetstream.MsgIDHeader, o.msgID) + } + if o.expectedStream != "" { + msg.Header.Set(jetstream.ExpectedStreamHeader, o.expectedStream) + } + if o.expectedLastSeq != nil { + msg.Header.Set(jetstream.ExpectedLastSeqHeader, strconv.FormatUint(*o.expectedLastSeq, 10)) + } + if o.expectedLastSubjSeq != nil { + msg.Header.Set(jetstream.ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.expectedLastSubjSeq, 10)) + } + if o.expectedLastSubject != "" { + msg.Header.Set(jetstream.ExpectedLastSubjSeqSubjHeader, o.expectedLastSubject) + } + if o.expectedLastMsgID != "" { + msg.Header.Set(jetstream.ExpectedLastMsgIDHeader, o.expectedLastMsgID) + } + if o.ttl > 0 { + msg.Header.Set(jetstream.MsgTTLHeader, o.ttl.String()) + } + + return nil +} + +// handleAsyncReply processes incoming ack messages from the subscription. +func (cp *channelPublisher) handleAsyncReply(m *nats.Msg) { + // Extract token from reply subject + token := strings.TrimPrefix(m.Subject, cp.replyPrefix) + + // Look up pending publish + cp.mu.Lock() + pending, found := cp.pending[token] + if !found { + cp.mu.Unlock() + return + } + delete(cp.pending, token) + + // Stop timeout timer if it exists + if pending.timeout != nil { + pending.timeout.Stop() + } + + // Close stall channel if pending dropped below max + jsOpts := cp.js.Options() + maxPending := jsOpts.PublisherOpts.MaxAckPending + if cp.stallCh != nil && maxPending > 0 && len(cp.pending) < maxPending { + close(cp.stallCh) + cp.stallCh = nil + } + + cp.mu.Unlock() + + result := PubAckResult{Msg: pending.msg} + + // Parse ack response + if len(m.Data) > 0 { + var ackResp struct { + jetstream.PubAck + Error *jetstream.APIError `json:"error,omitempty"` + } + + if err := json.Unmarshal(m.Data, &ackResp); err != nil { + result.Err = fmt.Errorf("failed to parse ack: %w", err) + } else if ackResp.Error != nil { + result.Err = ackResp.Error + } else { + result.Ack = &ackResp.PubAck + } + } else { + result.Err = fmt.Errorf("empty ack response") + } + + select { + case cp.results <- result: + default: + // Channel is full or closed, drop result + } +} + +// Results returns the channel where acks/errors are delivered. +func (cp *channelPublisher) Results() <-chan PubAckResult { + cp.mu.Lock() + defer cp.mu.Unlock() + return cp.results +} + +// Close stops the publisher and closes the results channel. +func (cp *channelPublisher) Close() { + cp.mu.Lock() + if cp.closed { + cp.mu.Unlock() + return + } + cp.closed = true + + // Close stall channel to unblock any waiting publishers + if cp.stallCh != nil { + close(cp.stallCh) + cp.stallCh = nil + } + cp.mu.Unlock() + + // drain the subscription to make sure all incoming acks are processed + if cp.sub != nil { + cp.sub.Drain() + } + + cp.mu.Lock() + for token, pending := range cp.pending { + if pending.timeout != nil { + pending.timeout.Stop() + } + + result := PubAckResult{ + Msg: pending.msg, + Err: ErrPublisherClosed, + } + select { + case cp.results <- result: + default: + } + delete(cp.pending, token) + } + cp.mu.Unlock() + + // Close the results channel + close(cp.results) +} + +// Pending returns number of publishes waiting for ack. +func (cp *channelPublisher) Pending() int { + cp.mu.Lock() + defer cp.mu.Unlock() + return len(cp.pending) +} diff --git a/jetstreamext/channelpublisher_options.go b/jetstreamext/channelpublisher_options.go new file mode 100644 index 0000000..67d7568 --- /dev/null +++ b/jetstreamext/channelpublisher_options.go @@ -0,0 +1,107 @@ +// Copyright 2025 Synadia Communications Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jetstreamext + +import ( + "fmt" + "time" +) + +// ChannelPublisher options + +// WithChannelBuffer sets the result channel buffer size. +// Default is 1000. +func WithChannelBuffer(size int) ChannelPublisherOpt { + return func(opts *channelPublisherOpts) error { + if size <= 0 { + return fmt.Errorf("%w: channel buffer size must be greater than 0", ErrInvalidOption) + } + opts.channelBuffer = size + return nil + } +} + +// Publish options + +// WithMsgID sets the message ID for deduplication. +func WithMsgID(id string) PublishOpt { + return func(opts *publishOpts) error { + opts.msgID = id + return nil + } +} + +// WithExpectStream sets the expected stream the message should be published to. +func WithExpectStream(stream string) PublishOpt { + return func(opts *publishOpts) error { + opts.expectedStream = stream + return nil + } +} + +// WithExpectLastSequence sets the expected sequence number of the last message in the stream. +func WithExpectLastSequence(seq uint64) PublishOpt { + return func(opts *publishOpts) error { + opts.expectedLastSeq = &seq + return nil + } +} + +// WithExpectLastSequencePerSubject sets the expected sequence number of the last message +// on the subject the message is published to. +func WithExpectLastSequencePerSubject(seq uint64) PublishOpt { + return func(opts *publishOpts) error { + opts.expectedLastSubjSeq = &seq + return nil + } +} + +// WithExpectLastSequenceForSubject sets the sequence and subject for which the +// last sequence number should be checked. +func WithExpectLastSequenceForSubject(seq uint64, subject string) PublishOpt { + return func(opts *publishOpts) error { + if subject == "" { + return fmt.Errorf("%w: subject cannot be empty", ErrInvalidOption) + } + opts.expectedLastSubjSeq = &seq + opts.expectedLastSubject = subject + return nil + } +} + +// WithExpectLastMsgID sets the expected last message ID. +func WithExpectLastMsgID(id string) PublishOpt { + return func(opts *publishOpts) error { + opts.expectedLastMsgID = id + return nil + } +} + +// WithMsgTTL sets per-message TTL. +func WithMsgTTL(ttl time.Duration) PublishOpt { + return func(opts *publishOpts) error { + opts.ttl = ttl + return nil + } +} + +// WithStallWait sets the max wait when the producer becomes stalled. +// If a publish call is blocked for this long due to max pending acks, +// ErrTooManyStalledMsgs is returned. +func WithStallWait(wait time.Duration) PublishOpt { + return func(opts *publishOpts) error { + opts.stallWait = wait + return nil + } +} diff --git a/jetstreamext/errors.go b/jetstreamext/errors.go index e60a6dd..877b0d7 100644 --- a/jetstreamext/errors.go +++ b/jetstreamext/errors.go @@ -39,6 +39,20 @@ var ( // ErrInvalidBatchAck is returned when JetStream ack from batch publish is // invalid. ErrInvalidBatchAck jetstream.JetStreamError = &jsError{message: "invalid jetstream batch publish response"} + + // Channel publisher errors + + // ErrPublisherClosed is returned when attempting to publish on a closed publisher. + ErrPublisherClosed = &jsError{message: "channel publisher closed"} + + // ErrTooManyStalledMsgs is returned when max pending acks is reached and stall wait expires. + ErrTooManyStalledMsgs = &jsError{message: "too many stalled messages"} + + // ErrAckTimeout is returned when an ack is not received within the configured timeout. + ErrAckTimeout = &jsError{message: "ack timeout"} + + // ErrChannelPublisherReplySubjectSet is returned when a message to be published already has a reply subject set. + ErrChannelPublisherReplySubjectSet = &jsError{message: "channel publisher cannot publish messages with reply subject set"} ) type jsError struct { diff --git a/jetstreamext/go.mod b/jetstreamext/go.mod index 3db3b27..fac61fe 100644 --- a/jetstreamext/go.mod +++ b/jetstreamext/go.mod @@ -3,7 +3,7 @@ module github.com/synadia-io/orbit.go/jetstreamext go 1.24.0 require ( - github.com/nats-io/nats.go v1.46.0 + github.com/nats-io/nats.go v1.47.1-0.20251217134354-a5a509fddade github.com/nats-io/nuid v1.0.1 github.com/synadia-io/orbit.go/natsext v0.1.1 ) diff --git a/jetstreamext/go.sum b/jetstreamext/go.sum index 3961421..1ad6ca1 100644 --- a/jetstreamext/go.sum +++ b/jetstreamext/go.sum @@ -1,7 +1,7 @@ github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= -github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8= -github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.47.1-0.20251217134354-a5a509fddade h1:9Z7NnuuCDqdf/4EofXkgxOtwobfxB3o/2vTFJZA+d9Y= +github.com/nats-io/nats.go v1.47.1-0.20251217134354-a5a509fddade/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/jetstreamext/go.work b/jetstreamext/go.work index 8913bc7..d8f1634 100644 --- a/jetstreamext/go.work +++ b/jetstreamext/go.work @@ -1,4 +1,4 @@ -go 1.24.0 +go 1.25.0 use ( . diff --git a/jetstreamext/go.work.sum b/jetstreamext/go.work.sum index ff4c61e..a301eb9 100644 --- a/jetstreamext/go.work.sum +++ b/jetstreamext/go.work.sum @@ -1,23 +1,30 @@ github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm-tools v0.3.13-0.20230620182252-4639ecce2aba h1:qJEJcuLzH5KDR0gKc0zcktin6KSAwL7+jWKBYceddTc= github.com/google/go-tpm-tools v0.3.13-0.20230620182252-4639ecce2aba/go.mod h1:EFYHy8/1y2KfgTAsx7Luu7NGhoxtuVHnNo8jE7FikKc= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= +github.com/nats-io/nats.go v1.47.1-0.20251217134354-a5a509fddade h1:9Z7NnuuCDqdf/4EofXkgxOtwobfxB3o/2vTFJZA+d9Y= +github.com/nats-io/nats.go v1.47.1-0.20251217134354-a5a509fddade/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= @@ -33,12 +40,14 @@ golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw= +golang.org/x/term v0.35.0 h1:bZBVKBudEyhRcajGcNc3jIfWPqV4y/Kt2XcoigOWtDQ= golang.org/x/term v0.35.0/go.mod h1:TPGtkTLesOwf2DE8CgVYiZinHAOuy5AYUYT1lENIZnA= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= diff --git a/jetstreamext/test/channelpublisher_test.go b/jetstreamext/test/channelpublisher_test.go new file mode 100644 index 0000000..6699bc6 --- /dev/null +++ b/jetstreamext/test/channelpublisher_test.go @@ -0,0 +1,638 @@ +// Copyright 2025 Synadia Communications Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/synadia-io/orbit.go/jetstreamext" +) + +func TestChannelPublisher_BasicFlow(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + if err := pub.Publish("test.foo", []byte("message 1")); err != nil { + t.Fatalf("Unexpected error publishing: %v", err) + } + + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error in result: %v", result.Err) + } + if result.Ack == nil { + t.Fatal("Expected non-nil ack") + } + if result.Ack.Stream != "TEST" { + t.Fatalf("Expected stream TEST, got %s", result.Ack.Stream) + } + if result.Ack.Sequence != 1 { + t.Fatalf("Expected sequence 1, got %d", result.Ack.Sequence) + } + if result.Msg == nil { + t.Fatal("Expected non-nil message") + } + if result.Msg.Subject != "test.foo" { + t.Fatalf("Expected subject test.foo, got %s", result.Msg.Subject) + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for result") + } + + // Verify pending count is 0 + if pending := pub.Pending(); pending != 0 { + t.Fatalf("Expected 0 pending, got %d", pending) + } +} + +func TestChannelPublisher_MultipleMessages(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + count := 10 + for i := range count { + if err := pub.Publish("test.multi", []byte("message")); err != nil { + t.Fatalf("Unexpected error publishing message %d: %v", i, err) + } + } + + // Collect all results + received := 0 + timeout := time.After(3 * time.Second) + for range count { + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error in result: %v", result.Err) + } + if result.Ack == nil { + t.Fatal("Expected non-nil ack") + } + case <-timeout: + t.Fatalf("Timeout waiting for results, received %d/%d", received, count) + } + } + // verify there are no more results + select { + case result := <-pub.Results(): + t.Fatalf("Expected no more results, got %v", result) + case <-time.After(100 * time.Millisecond): + } + +} + +func TestChannelPublisher_PublishError(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + // Publish with wrong expected stream using PublishOpt + if err := pub.Publish("test.foo", []byte("message"), jetstreamext.WithExpectStream("WRONG")); err != nil { + t.Fatalf("Unexpected error publishing: %v", err) + } + + // Should get error in results + select { + case result := <-pub.Results(): + if result.Err == nil { + t.Fatal("Expected error in result") + } + var apiErr *jetstream.APIError + if !errors.As(result.Err, &apiErr) { + t.Fatalf("Expected APIError, got %T", result.Err) + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for error result") + } +} + +func TestChannelPublisher_SemaphorePattern(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js, jetstreamext.WithChannelBuffer(100)) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + totalMessages := 100 + maxInFlight := 10 + sem := make(chan struct{}, maxInFlight) + + var wg sync.WaitGroup + + // Track errors + publishErrors := 0 + resultErrors := 0 + successCount := 0 + + // consume acks + wg.Go(func() { + for range totalMessages { + result := <-pub.Results() + if result.Err != nil { + resultErrors++ + } else { + successCount++ + } + <-sem + } + }) + + for range totalMessages { + sem <- struct{}{} + + if err := pub.Publish("test.sem", []byte("message")); err != nil { + publishErrors++ + <-sem + } + } + + wg.Wait() + + if publishErrors > 0 { + t.Fatalf("Expected 0 publish errors, got %d", publishErrors) + } + if resultErrors > 0 { + t.Fatalf("Expected 0 result errors, got %d", resultErrors) + } + if successCount != totalMessages { + t.Fatalf("Expected %d successful publishes, got %d", totalMessages, successCount) + } +} + +func TestChannelPublisher_Close(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + + // Publish some messages + for i := range 5 { + if err := pub.Publish("test.close", []byte("message")); err != nil { + t.Fatalf("Unexpected error publishing message %d: %v", i, err) + } + } + + // Wait a bit before closing to allow acks to be sent from server + time.Sleep(100 * time.Millisecond) + + // Close publisher - should wait for pending and close channel + pub.Close() + + count := 0 + for range 5 { + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error in result: %v", result.Err) + } + count++ + case <-time.After(2 * time.Second): + t.Fatalf("Timeout waiting for results, received %d/5", count) + } + } + if count != 5 { + t.Errorf("Expected 5 results, got %d", count) + } + + // Verify pending is 0 + if pending := pub.Pending(); pending != 0 { + t.Fatalf("Expected 0 pending after close, got %d", pending) + } + + // Close again should be safe + pub.Close() + + // Try to publish after close + err = pub.Publish("test.closed", []byte("message")) + if !errors.Is(err, jetstreamext.ErrPublisherClosed) { + t.Fatalf("Expected ErrPublisherClosed, got %v", err) + } +} + +func TestChannelPublisher_ConcurrentPublish(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + publishers := 10 + messagesPerPublisher := 20 + totalMessages := publishers * messagesPerPublisher + + // Start multiple publishers + for range publishers { + go func() { + for range messagesPerPublisher { + if err := pub.Publish("test.concurrent", []byte("message")); err != nil { + t.Logf("Publish error: %v", err) + } + } + }() + } + + received := 0 + timeout := time.After(5 * time.Second) + for range totalMessages { + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error in result: %v", result.Err) + } + case <-timeout: + t.Fatalf("Timeout waiting for results, received %d/%d", received, totalMessages) + } + } +} + +func TestChannelPublisher_WithPublishOptions(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + // Publish with expected stream option + if err := pub.Publish("test.opts", []byte("message"), jetstreamext.WithExpectStream("TEST")); err != nil { + t.Fatalf("Unexpected error publishing: %v", err) + } + + // Verify result + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error in result: %v", result.Err) + } + if result.Ack == nil { + t.Fatal("Expected non-nil ack") + } + if result.Ack.Stream != "TEST" { + t.Fatalf("Expected stream TEST, got %s", result.Ack.Stream) + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for result") + } + + // Publish with message ID option + msgID := "unique-id-123" + if err := pub.Publish("test.opts", []byte("message 2"), jetstreamext.WithMsgID(msgID)); err != nil { + t.Fatalf("Unexpected error publishing with msg ID: %v", err) + } + + // Verify result + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error in result: %v", result.Err) + } + if result.Ack == nil { + t.Fatal("Expected non-nil ack") + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for result with msg ID") + } + + // Try to publish duplicate message ID - should be deduplicated + if err := pub.Publish("test.opts", []byte("duplicate"), jetstreamext.WithMsgID(msgID)); err != nil { + t.Fatalf("Unexpected error publishing duplicate: %v", err) + } + + // should get ack with Duplicate=true + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error for duplicate: %v", result.Err) + } + if result.Ack == nil { + t.Fatal("Expected non-nil ack for duplicate") + } + if !result.Ack.Duplicate { + t.Fatal("Expected duplicate flag to be true") + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for duplicate result") + } +} + +func TestChannelPublisher_WithChannelBuffer(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Create a stream + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + // Create with small buffer + pub, err := jetstreamext.NewChannelPublisher(js, jetstreamext.WithChannelBuffer(5)) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + // Publish messages + for i := range 5 { + if err := pub.Publish("test.buffer", []byte("message")); err != nil { + t.Fatalf("Unexpected error publishing message %d: %v", i, err) + } + } + + // Read all results + for range 5 { + select { + case result := <-pub.Results(): + if result.Err != nil { + t.Fatalf("Unexpected error in result: %v", result.Err) + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for result") + } + } +} + +func TestChannelPublisher_InvalidOptions(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + // Invalid buffer size + _, err := jetstreamext.NewChannelPublisher(js, jetstreamext.WithChannelBuffer(0)) + if !errors.Is(err, jetstreamext.ErrInvalidOption) { + t.Fatalf("Expected ErrInvalidOption, got %v", err) + } + + _, err = jetstreamext.NewChannelPublisher(js, jetstreamext.WithChannelBuffer(-1)) + if !errors.Is(err, jetstreamext.ErrInvalidOption) { + t.Fatalf("Expected ErrInvalidOption for negative buffer, got %v", err) + } +} + +func TestChannelPublisher_MaxPending(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer nc.Close() + + js, err := jetstream.New(nc, jetstream.WithPublishAsyncMaxPending(5)) + if err != nil { + t.Fatalf("Failed to create JetStream: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + NoAck: true, // Use NoAck to force server to not send acks back + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(js) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + // Publish up to max pending + for i := range 5 { + if err := pub.Publish("test.maxpending", []byte("message")); err != nil { + t.Fatalf("Unexpected error publishing message %d: %v", i, err) + } + } + if pub.Pending() != 5 { + t.Fatalf("Expected 5 pending, got %d", pub.Pending()) + } + + // Try to publish one more - should hit max and stall + start := time.Now() + err = pub.Publish("test.maxpending", []byte("message"), jetstreamext.WithStallWait(100*time.Millisecond)) + elapsed := time.Since(start) + + if !errors.Is(err, jetstreamext.ErrTooManyStalledMsgs) { + t.Fatalf("Expected ErrTooManyStalledMsgs, got %v", err) + } + if elapsed < 80*time.Millisecond || elapsed > 200*time.Millisecond { + t.Fatalf("Stall wait took %v, expected ~100ms", elapsed) + } +} + +func TestChannelPublisher_AckTimeout(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Create a stream + _, err := js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "TEST", + Subjects: []string{"test.>"}, + NoAck: true, // Use NoAck to force server to not send acks back + }) + if err != nil { + t.Fatalf("Unexpected error creating stream: %v", err) + } + + // Create JetStream with short publish async timeout + jsWithTimeout, err := jetstream.New(nc, jetstream.WithPublishAsyncTimeout(100*time.Millisecond)) + if err != nil { + t.Fatalf("Unexpected error creating jetstream: %v", err) + } + + pub, err := jetstreamext.NewChannelPublisher(jsWithTimeout) + if err != nil { + t.Fatalf("Unexpected error creating channel publisher: %v", err) + } + defer pub.Close() + + if err := pub.Publish("test.timeout", []byte("message")); err != nil { + t.Fatalf("Unexpected error publishing: %v", err) + } + if pub.Pending() != 1 { + t.Fatalf("Expected 1 pending, got %d", pub.Pending()) + } + + select { + case result := <-pub.Results(): + if result.Err == nil { + t.Fatal("Expected timeout error in result") + } + if !errors.Is(result.Err, jetstreamext.ErrAckTimeout) { + t.Fatalf("Expected ErrAckTimeout, got %v", result.Err) + } + case <-time.After(2 * time.Second): + t.Fatal("Timeout waiting for result") + } + + // Verify Pending() returns 0 after ack times out + if pending := pub.Pending(); pending != 0 { + t.Fatalf("Expected 0 pending, got %d", pending) + } +} diff --git a/jetstreamext/test/go.mod b/jetstreamext/test/go.mod index 73a37e6..411a82e 100644 --- a/jetstreamext/test/go.mod +++ b/jetstreamext/test/go.mod @@ -1,10 +1,10 @@ module tests -go 1.24.0 +go 1.25.0 require ( github.com/nats-io/nats-server/v2 v2.12.0 - github.com/nats-io/nats.go v1.46.0 + github.com/nats-io/nats.go v1.47.1-0.20251217134354-a5a509fddade github.com/synadia-io/orbit.go/jetstreamext v0.1.0 ) diff --git a/jetstreamext/test/go.sum b/jetstreamext/test/go.sum index 5be68fb..1f2f072 100644 --- a/jetstreamext/test/go.sum +++ b/jetstreamext/test/go.sum @@ -10,8 +10,7 @@ github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74= github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww= -github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8= -github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.47.1-0.20251217134354-a5a509fddade h1:9Z7NnuuCDqdf/4EofXkgxOtwobfxB3o/2vTFJZA+d9Y= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=