Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ jobs:
go-version: ${{ matrix.go-version }}

- name: Integration tests (without cache)
run: go run . integration-test -dev-server
# TODO: temporary -run filter to test 1s sleep fix for TestMultipleUpdateOrdering flake
run: go run . integration-test -dev-server -run "TestIntegrationSuite/TestMultipleUpdateOrdering"
working-directory: ./internal/cmd/build
env:
WORKFLOW_CACHE_SIZE: "0"
Expand Down Expand Up @@ -106,7 +107,8 @@ jobs:
go-version: ${{ matrix.go-version }}

- name: Integration tests (with cache)
run: go run . integration-test -dev-server
# TODO: temporary -run filter to test 1s sleep fix for TestMultipleUpdateOrdering flake
run: go run . integration-test -dev-server -run "TestIntegrationSuite/TestMultipleUpdateOrdering"
working-directory: ./internal/cmd/build

docker-compose-test:
Expand Down
25 changes: 7 additions & 18 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,24 +310,13 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
}()

if bp.workerPollCompleteOnShutdown != nil && bp.workerPollCompleteOnShutdown.Load() {
// Don't kill the gRPC stream. After ShutdownWorker, the server returns empty responses.
select {
case <-doneC:
return result, err
case <-bp.stopC:
// TEMP FIX: Give the server a reasonable window to complete the poll after
// ShutdownWorker. Fall back to cancelling the poll if it takes too
// long, e.g. when the gRPC connection was closed before Stop().
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case <-doneC:
case <-timer.C:
cancel()
<-doneC
}
return result, err
}
// Don't cancel the gRPC stream. After ShutdownWorker, the server
// completes the poll with an empty response. The poll is bounded by
// the gRPC timeout (pollTaskServiceTimeOut). The worker's stop
// timeout (WorkerStopTimeout) controls how long Stop() blocks;
// goroutines clean up in the background within the gRPC deadline.
<-doneC
return result, err
}

// Legacy: cancel in-flight polls immediately on shutdown
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,10 @@ func (aw *AggregatedWorker) Stop() {

close(aw.stopC)

// TODO: temporary sleep to test if the CancelOutstandingWorkerPolls race
// is caused by polls still being registered when ShutdownWorker runs.
time.Sleep(1 * time.Second)

aw.shutdownWorker()

// Issue stop through plugins
Expand Down
51 changes: 29 additions & 22 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ type (
lastPollTaskErrLock sync.Mutex

noRepoll atomic.Bool
pollerWG sync.WaitGroup
}

eagerOrPolledTask interface {
Expand Down Expand Up @@ -391,6 +392,7 @@ func (bw *baseWorker) Start() {

for i := 0; i < taskWorker.pollerCount; i++ {
bw.stopWG.Add(1)
bw.pollerWG.Add(1)
go bw.runPoller(taskWorker)
}

Expand All @@ -403,6 +405,15 @@ func (bw *baseWorker) Start() {
}
}

// When all pollers have exited, close taskQueueCh so the dispatcher
// knows no more polled tasks will arrive and can drain what remains.
bw.stopWG.Add(1)
go func() {
defer bw.stopWG.Done()
bw.pollerWG.Wait()
close(bw.taskQueueCh)
}()

bw.stopWG.Add(1)
go bw.runTaskDispatcher()

Expand All @@ -428,6 +439,10 @@ func (bw *baseWorker) isStop() bool {

func (bw *baseWorker) runPoller(taskWorker scalableTaskPoller) {
defer bw.stopWG.Done()
defer func() {
bw.logger.Info("Poller exiting", "pollerType", taskWorker.taskPollerType)
bw.pollerWG.Done()
}()
// Note: With poller autoscaling, this metric doesn't make a lot of sense since the number of pollers can go up and down.
bw.metricsHandler.Counter(metrics.PollerStartCounter).Inc(1)

Expand Down Expand Up @@ -561,24 +576,17 @@ func (bw *baseWorker) processTaskAsync(eagerOrPolled eagerOrPolledTask) {
func (bw *baseWorker) runTaskDispatcher() {
defer bw.stopWG.Done()

for {
// wait for new task or worker stop
select {
case <-bw.stopCh:
// Currently we can drop any tasks received when closing.
// https://github.com/temporalio/sdk-go/issues/1197
return
case task := <-bw.taskQueueCh:
// for non-polled-task (local activity result as task or eager task), we don't need to rate limit
_, isPolledTask := task.(*polledTask)
if isPolledTask && bw.taskLimiter.Wait(bw.limiterContext) != nil {
if bw.isStop() {
bw.releaseSlot(task.getPermit(), SlotReleaseReasonUnused)
return
}
}
bw.processTaskAsync(task)
for task := range bw.taskQueueCh {
// For non-polled-task (local activity result as task or eager task),
// we don't need to rate limit. During shutdown the limiter context
// is cancelled, so Wait returns immediately — we still process the
// task rather than dropping it.
if _, isPolledTask := task.(*polledTask); isPolledTask {
// Ignore error: during shutdown the limiter context is
// cancelled, but we still process remaining tasks.
_ = bw.taskLimiter.Wait(bw.limiterContext)
}
bw.processTaskAsync(task)
}
}

Expand Down Expand Up @@ -639,11 +647,10 @@ func (bw *baseWorker) pollTask(taskWorker scalableTaskPoller, slotPermit *SlotPe
taskWorker.pollerAutoscalerReportHandle.handleTask(task)
}

select {
case bw.taskQueueCh <- &polledTask{task: task, permit: slotPermit}:
didSendTask = true
case <-bw.stopCh:
}
// The dispatcher is guaranteed to be alive: it only exits after
// taskQueueCh is closed, which happens after all pollers finish.
bw.taskQueueCh <- &polledTask{task: task, permit: slotPermit}
didSendTask = true
}
}

Expand Down
88 changes: 88 additions & 0 deletions internal/internal_worker_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,94 @@ type noopTaskProcessor struct{}

// ProcessTask discards the task and always reports success; a stand-in for
// tests that do not care about task handling.
func (noopTaskProcessor) ProcessTask(any) error { return nil }

// TestTaskNotDroppedDuringShutdown verifies the two-stage shutdown: when a
// poller receives a task during shutdown, the task is still dispatched and
// processed rather than silently dropped.
func TestTaskNotDroppedDuringShutdown(t *testing.T) {
	// Capacity 1 so the helpers' non-blocking (select/default) sends can
	// never be dropped when this goroutine has not yet reached the matching
	// receive. With unbuffered channels the lone "poll started" signal could
	// be lost (hanging the test on <-pollStarted forever) and the lone
	// "task processed" signal could be lost (flaking the 5s select below).
	taskProcessed := make(chan struct{}, 1)
	pollStarted := make(chan struct{}, 1)

	// A poller that blocks until returnTask is closed, then returns a task.
	tp := &shutdownTaskPoller{
		pollStarted: pollStarted,
		returnTask:  make(chan struct{}),
		task:        &testTask{},
	}

	processor := &recordingTaskProcessor{
		processed: taskProcessed,
	}

	bw := newBaseWorker(baseWorkerOptions{
		slotSupplier:     &testSlotSupplier{},
		maxTaskPerSecond: 1000,
		taskPollers: []scalableTaskPoller{
			{taskPollerType: "test", pollerCount: 1, taskPoller: tp},
		},
		taskProcessor:  processor,
		workerType:     "ShutdownTest",
		logger:         ilog.NewNopLogger(),
		stopTimeout:    5 * time.Second,
		metricsHandler: metrics.NopHandler,
	})

	bw.Start()

	// Wait for the poller to start polling.
	<-pollStarted

	// Signal the poller to return a task, then stop the worker.
	// The task should be processed, not dropped.
	bw.noRepoll.Store(true)
	close(bw.stopCh)
	close(tp.returnTask)
	bw.limiterContextCancel()

	select {
	case <-taskProcessed:
		// Success: the task was dispatched and processed.
	case <-time.After(5 * time.Second):
		t.Fatal("task polled during shutdown was not processed (dropped)")
	}

	// Wait for full cleanup. We already closed stopCh manually, so
	// replicate the remaining Stop() logic.
	awaitWaitGroup(&bw.stopWG, bw.options.stopTimeout)
}

// shutdownTaskPoller is a test double: PollTask blocks until returnTask is
// closed, then returns task exactly once. Subsequent polls return nil.
type shutdownTaskPoller struct {
	// pollStarted receives a best-effort signal when polling begins
	// (the send is non-blocking and may be dropped — see PollTask).
	pollStarted chan struct{}
	// returnTask, once closed, releases the blocked poll.
	returnTask chan struct{}
	// task is the single task handed out after release.
	task taskForWorker
	// returned flips to true once task has been handed out.
	returned atomic.Bool
}

// PollTask signals pollStarted (non-blocking), waits until returnTask is
// closed, and hands out the stored task exactly once; every later call
// returns (nil, nil).
//
// NOTE(review): the pollStarted send uses select/default, so the signal is
// silently dropped if no receiver is ready at that instant (e.g. an
// unbuffered channel with the test goroutine not yet parked on the
// receive). Pass a buffered channel or ensure the receiver is waiting
// before Start — confirm against the test that constructs this poller.
func (p *shutdownTaskPoller) PollTask() (taskForWorker, error) {
	// Best-effort "polling has begun" notification; never blocks.
	select {
	case p.pollStarted <- struct{}{}:
	default:
	}
	// Park here until the test releases the poll.
	<-p.returnTask
	// First caller past this point wins the task; the rest get nothing.
	if p.returned.CompareAndSwap(false, true) {
		return p.task, nil
	}
	return nil, nil
}

// recordingTaskProcessor lets a test observe that a task was dispatched:
// ProcessTask sends a signal on processed.
type recordingTaskProcessor struct {
	// processed receives one best-effort signal per processed task
	// (sends are non-blocking and dropped if nobody is ready).
	processed chan struct{}
}

// ProcessTask records (non-blocking) that a task reached the processor and
// always reports success. The signal is dropped if the processed channel
// cannot accept it immediately, so receivers should be waiting or the
// channel should be buffered.
func (p *recordingTaskProcessor) ProcessTask(any) error {
	select {
	case p.processed <- struct{}{}:
	default:
	}
	return nil
}

func (s *PollScalerReportHandleSuite) TestAutoscaleDownOnTimeoutWithCapability() {
targetSuggestion := 0
ps := newPollScalerReportHandle(pollScalerReportHandleOptions{
Expand Down
Loading