Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
38435fb
graceful poll completion, upgrade API lib, add test
yuandrew Feb 21, 2026
7d20180
Add integration test
yuandrew Mar 7, 2026
5248f1e
Merge branch 'master' into shutdown-poll-complete
yuandrew Mar 10, 2026
51506f8
Bump dev server to cloud release containing fix
yuandrew Mar 12, 2026
c501a4a
fix test race, fix dynamic config
yuandrew Mar 12, 2026
1d252b2
Forgot to set graceful shutdown..
yuandrew Mar 13, 2026
6bdd8ad
Fix test failures due to server upgrade, add to CONTRIBUTING.md, prev…
yuandrew Mar 16, 2026
208b74e
Merge branch 'master' into shutdown-poll-complete
yuandrew Mar 18, 2026
6accf29
Try with older server version
yuandrew Mar 18, 2026
42f7bf0
Fix sessions, rename gracefulPollShutdown to workerPollCompleteOnShut…
yuandrew Mar 18, 2026
68e3957
Merge branch 'shutdown-poll-complete' into shutdown-poll-complete1
yuandrew Mar 19, 2026
ab40351
revert to newer server version
yuandrew Mar 19, 2026
c1c85ef
try with higher timeouts, probably won't work
yuandrew Mar 19, 2026
da78d7f
tighten timeout
yuandrew Mar 19, 2026
336b7e2
Revert all non-test changes to isolate server version as flake cause
yuandrew Mar 19, 2026
8761325
Revert "Revert all non-test changes to isolate server version as flak…
yuandrew Mar 19, 2026
97f5a1d
Revert shutdown-poll-complete changes to isolate server version as fl…
yuandrew Mar 19, 2026
a463078
Revert "Revert shutdown-poll-complete changes to isolate server versi…
yuandrew Mar 19, 2026
1d6ad2a
only run TestWorkerTunerTestSuite/TestResourceBasedSmallSlots in CI
yuandrew Mar 19, 2026
c8336ab
Test A: Remove only noRepoll (keep TaskQueue/TaskQueueTypes)
yuandrew Mar 19, 2026
0260446
Test B: Remove only TaskQueue/TaskQueueTypes (keep noRepoll)
yuandrew Mar 19, 2026
635ee6f
Test C: test removing just the pollAndProcessSingleTask change
yuandrew Mar 19, 2026
8e52e6f
move tuner creation inside for loop
yuandrew Mar 19, 2026
ef1a0ea
revert change, print s.issuedSlots.Load()
yuandrew Mar 19, 2026
1640fc5
take out print to ensure it fails again
yuandrew Mar 19, 2026
0600432
fully go back
yuandrew Mar 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ jobs:
go-version: ${{ matrix.go-version }}

- name: Integration tests (without cache)
run: go run . integration-test -dev-server
run: go run . integration-test -dev-server -run "TestWorkerTunerTestSuite/TestResourceBasedSmallSlots"
working-directory: ./internal/cmd/build
env:
WORKFLOW_CACHE_SIZE: "0"
Expand Down Expand Up @@ -106,7 +106,7 @@ jobs:
go-version: ${{ matrix.go-version }}

- name: Integration tests (with cache)
run: go run . integration-test -dev-server
run: go run . integration-test -dev-server -run "TestWorkerTunerTestSuite/TestResourceBasedSmallSlots"
working-directory: ./internal/cmd/build

docker-compose-test:
Expand Down
36 changes: 33 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,57 @@ All PR titles should start with Upper case.

## Testing

Tests are managed through the build tool at `internal/cmd/build`. This tool handles starting an embedded Temporal dev
server with the required dynamic configs and search attributes, enforces consistent test flags (`-race`, `-count 1`, no caching),
and manages coverage collection — so you don't need to manually configure a server or remember the right flags.

Run all static analysis tools:

```bash
cd ./internal/cmd/build
go run . check
```

Run the integration tests (requires local server running, or pass `-dev-server`):
### Integration Tests

Integration tests live in the `test/` directory and require a Temporal server by default. Use `-dev-server` to start an
embedded server automatically:

```bash
cd ./internal/cmd/build
go run . integration-test -dev-server
```

Run a specific test with `-run` (uses the same syntax as `go test -run`):

```bash
# Run a single test within a suite
cd ./internal/cmd/build
go run . integration-test -dev-server -run "TestIntegrationSuite/TestMyTest"

# Run all tests in a suite
cd ./internal/cmd/build
go run . integration-test
go run . integration-test -dev-server -run "TestWorkerTunerTestSuite"
```

Run the unit tests:
Without `-dev-server`, the tests connect to a server already running on `localhost:7233`.

### Unit Tests

Unit tests cover all packages except `test/`:

```bash
cd ./internal/cmd/build
go run . unit-test
```

Run specific unit tests with `-run`:

```bash
cd ./internal/cmd/build
go run . unit-test -run "TestMyFunction"
```

## Updating go mod files

Sometimes all go.mod files need to be tidied. For an easy way to do this on Linux or (probably) macOS,
Expand Down
3 changes: 2 additions & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (b *builder) integrationTest() error {
if *devServerFlag {
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
CachedDownload: testsuite.CachedDownload{
Version: "v1.6.1-server-1.31.0-151.0",
Version: "v1.6.2-server-1.31.0-151.6",
},
ClientOptions: &client.Options{
HostPort: "127.0.0.1:7233",
Expand Down Expand Up @@ -161,6 +161,7 @@ func (b *builder) integrationTest() error {
"--dynamic-config-value", `component.nexusoperations.useSystemCallbackURL=false`,
"--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`,
"--dynamic-config-value", "frontend.ListWorkersEnabled=true",
"--dynamic-config-value", "frontend.enableCancelWorkerPollsOnShutdown=true",
},
})
if err != nil {
Expand Down
17 changes: 10 additions & 7 deletions internal/internal_nexus_task_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ func newNexusTaskPoller(
) *nexusTaskPoller {
return &nexusTaskPoller{
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.DeploymentOptions.Version,
capabilities: params.capabilities,
pollTimeTracker: params.pollTimeTracker,
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.DeploymentOptions.Version,
capabilities: params.capabilities,
pollTimeTracker: params.pollTimeTracker,
workerInstanceKey: params.workerInstanceKey,
workerPollCompleteOnShutdown: params.workerPollCompleteOnShutdown,
},
taskHandler: taskHandler,
service: service,
Expand Down Expand Up @@ -80,6 +82,7 @@ func (ntp *nexusTaskPoller) poll(ctx context.Context) (taskForWorker, error) {
ntp.useBuildIDVersioning,
ntp.workerDeploymentVersion,
),
WorkerInstanceKey: ntp.workerInstanceKey,
}

response, err := ntp.pollNexusTaskQueue(ctx, request)
Expand Down
49 changes: 33 additions & 16 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"go.temporal.io/sdk/internal/common/retry"
Expand Down Expand Up @@ -83,6 +84,8 @@ type (
pollTimeTracker *pollTimeTracker
// Unique identifier for worker
workerInstanceKey string
// When set and true, in-flight polls are left to complete on worker shutdown instead of being cancelled by doPoll
workerPollCompleteOnShutdown *atomic.Bool
}

// numPollerMetric tracks the number of active pollers and publishes a metric on it.
Expand Down Expand Up @@ -290,6 +293,18 @@ func (bp *basePoller) doPoll(pollFunc func(ctx context.Context) (taskForWorker,
close(doneC)
}()

if bp.workerPollCompleteOnShutdown != nil && bp.workerPollCompleteOnShutdown.Load() {
// Don't kill the gRPC stream. After ShutdownWorker, the server returns empty responses.
select {
case <-doneC:
return result, err
case <-bp.stopC:
<-doneC
return result, err
}
}

// Legacy: cancel in-flight polls immediately on shutdown
select {
case <-doneC:
return result, err
Expand Down Expand Up @@ -320,14 +335,15 @@ func newWorkflowTaskProcessor(
) *workflowTaskProcessor {
return &workflowTaskProcessor{
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.DeploymentOptions.Version,
capabilities: params.capabilities,
pollTimeTracker: params.pollTimeTracker,
workerInstanceKey: params.workerInstanceKey,
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.DeploymentOptions.Version,
capabilities: params.capabilities,
pollTimeTracker: params.pollTimeTracker,
workerInstanceKey: params.workerInstanceKey,
workerPollCompleteOnShutdown: params.workerPollCompleteOnShutdown,
},
service: service,
namespace: params.Namespace,
Expand Down Expand Up @@ -1126,14 +1142,15 @@ func newGetHistoryPageFunc(
func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *activityTaskPoller {
return &activityTaskPoller{
basePoller: basePoller{
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.DeploymentOptions.Version,
capabilities: params.capabilities,
pollTimeTracker: params.pollTimeTracker,
workerInstanceKey: params.workerInstanceKey,
metricsHandler: params.MetricsHandler,
stopC: params.WorkerStopChannel,
workerBuildID: params.getBuildID(),
useBuildIDVersioning: params.UseBuildIDForVersioning,
workerDeploymentVersion: params.DeploymentOptions.Version,
capabilities: params.capabilities,
pollTimeTracker: params.pollTimeTracker,
workerInstanceKey: params.workerInstanceKey,
workerPollCompleteOnShutdown: params.workerPollCompleteOnShutdown,
},
taskHandler: taskHandler,
service: service,
Expand Down
80 changes: 80 additions & 0 deletions internal/internal_task_pollers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,83 @@ func TestWFTPanicInTaskHandler(t *testing.T) {
// Workflow should not be in cache
require.Nil(t, cache.getWorkflowContext(runID))
}

// mockTask is an empty placeholder task used by the poller tests; a pointer to
// it is handed back from the fake poll function as a taskForWorker.
type mockTask struct{}

// scaleDecision returns the zero pollerScaleDecision together with false.
func (mockTask) scaleDecision() (pollerScaleDecision, bool) {
	var none pollerScaleDecision
	return none, false
}

// isEmpty always reports false.
func (mockTask) isEmpty() bool {
	return false
}

func TestDoPollGracefulShutdown(t *testing.T) {
tests := []struct {
name string
gracefulEnabled bool
wantErrStop bool
}{
{
name: "graceful enabled, waits for poll completion",
gracefulEnabled: true,
},
{
name: "graceful disabled, returns errStop",
gracefulEnabled: false,
wantErrStop: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
stopC := make(chan struct{})
graceful := &atomic.Bool{}
graceful.Store(tc.gracefulEnabled)

bp := basePoller{
stopC: stopC,
workerPollCompleteOnShutdown: graceful,
}

pollStarted := make(chan struct{})
pollRelease := make(chan struct{})
expectedTask := &mockTask{}

pollFunc := func(ctx context.Context) (taskForWorker, error) {
close(pollStarted)
select {
case <-pollRelease:
return expectedTask, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}

type pollResult struct {
task taskForWorker
err error
}
resultC := make(chan pollResult, 1)

go func() {
task, err := bp.doPoll(pollFunc)
resultC <- pollResult{task, err}
}()

<-pollStarted
close(stopC)
if tc.gracefulEnabled {
// Graceful mode: doPoll waits for poll to finish
close(pollRelease)
}
// Legacy mode: doPoll cancels context and returns immediately;
// goroutine exits via ctx.Done()

r := <-resultC

if tc.wantErrStop {
require.ErrorIs(t, r.err, errStop)
require.Nil(t, r.task)
} else {
require.NoError(t, r.err)
require.Equal(t, expectedTask, r.task)
}
})
}
}
Loading
Loading