diff --git a/src/msg/producer/writer/consumer_writer.go b/src/msg/producer/writer/consumer_writer.go index 57bdca89b2..a8426f4894 100644 --- a/src/msg/producer/writer/consumer_writer.go +++ b/src/msg/producer/writer/consumer_writer.go @@ -27,6 +27,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/uber-go/tally" @@ -45,6 +46,7 @@ const ( var ( errInvalidConnection = errors.New("connection is invalid") + errWriteInProgress = errors.New("write in progress") u uninitializedReadWriter ) @@ -107,10 +109,13 @@ type consumerWriterImpl struct { connRetrier retry.Retrier logger *zap.Logger - resetCh chan struct{} - doneCh chan struct{} - wg sync.WaitGroup - m consumerWriterMetrics + resetCh chan struct{} + doneCh chan struct{} + wg sync.WaitGroup + m consumerWriterMetrics + isActive int32 + activeWriteDoneCh chan error + activeWriteTimer *time.Timer nowFn clock.NowFn connectFn connectFn @@ -150,17 +155,20 @@ func newConsumerWriter( connOpts := opts.ConnectionOptions() w := &consumerWriterImpl{ - addr: addr, - router: router, - opts: opts, - connOpts: connOpts, - ackRetrier: retry.NewRetrier(opts.AckErrorRetryOptions()), - connRetrier: retry.NewRetrier(connOpts.RetryOptions().SetForever(defaultRetryForever)), - logger: opts.InstrumentOptions().Logger(), - resetCh: make(chan struct{}, 1), - doneCh: make(chan struct{}), - m: m, - nowFn: time.Now, + addr: addr, + router: router, + opts: opts, + connOpts: connOpts, + ackRetrier: retry.NewRetrier(opts.AckErrorRetryOptions()), + connRetrier: retry.NewRetrier(connOpts.RetryOptions().SetForever(defaultRetryForever)), + logger: opts.InstrumentOptions().Logger(), + resetCh: make(chan struct{}, 1), + activeWriteDoneCh: make(chan error, 1), + activeWriteTimer: time.NewTimer(200 * time.Millisecond), + isActive: 0, + doneCh: make(chan struct{}), + m: m, + nowFn: time.Now, } w.connectFn = w.connectNoRetry @@ -195,6 +203,38 @@ func (w *consumerWriterImpl) Address() string { // Write should fail fast so that the write could be tried on other // consumer writers that are sharing the message queue. func (w *consumerWriterImpl) Write(connIndex int, b []byte) error { + // mark as active if not already active. + if !atomic.CompareAndSwapInt32(&w.isActive, 0, 1) { + return errWriteInProgress + } + + // start the timer and write in a goroutine. + w.activeWriteTimer.Reset(200 * time.Millisecond) + go func() { + // write and notify done. + // make it non-blocking by dropping the returned err. + select { + case w.activeWriteDoneCh <- w.write(connIndex, b): + default: + <-w.activeWriteDoneCh + } + + // stop the timer. + w.activeWriteTimer.Stop() + + // mark the write as inactive. + atomic.CompareAndSwapInt32(&w.isActive, 1, 0) + }() + + select { + case err := <-w.activeWriteDoneCh: + return err + case <-w.activeWriteTimer.C: + return errWriteInProgress + } +} + +func (w *consumerWriterImpl) write(connIndex int, b []byte) error { w.writeState.RLock() if !w.writeState.validConns || len(w.writeState.conns) == 0 { w.writeState.RUnlock()