48 changes: 48 additions & 0 deletions resource_locking/README.md
@@ -0,0 +1,48 @@
# Resource Locking Sample

This sample shows how to use a long-lived `LockManagerWorkflow` to allocate `resources` to `ResourceLockingWorkflows`.
Each `ResourceLockingWorkflow` runs several activities while it has ownership of a resource. Note that
`LockManagerWorkflow` makes resource allocation decisions based on in-memory state.

Run the following from this directory to start the worker:

uv run worker.py

Then, in another terminal, run the following to execute several `ResourceLockingWorkflows`:

uv run starter.py

You should see output indicating that the `LockManagerWorkflow` serialized access to each resource.

You can query the set of current lock holders with:

tctl wf query -w lock_manager --qt get_current_holders
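
The same query can also be issued from the Python client; a minimal sketch, assuming the server runs at the default local address:

```python
import asyncio

from temporalio.client import Client


async def main() -> None:
    # Connect to the same server the worker uses (assumed to be the local default).
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle("lock_manager")
    holders = await handle.query("get_current_holders")
    print(holders)


asyncio.run(main())
```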

# Other approaches

There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues,
and limiting the number of activity slots on the workers. The Go SDK also offers [sessions](https://docs.temporal.io/develop/go/sessions),
which allow workflows to pin themselves to workers.
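
As a rough illustration of the worker-centric alternative (a sketch only; the task queue name and activity are made up
for this example, and `max_concurrent_activities` caps the activity slots as described above):

```python
from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
async def use_gpu() -> None:
    ...  # work that must not run concurrently on this resource


async def run_resource_worker() -> None:
    client = await Client.connect("localhost:7233")
    # One worker and task queue per physical resource; a single activity slot means
    # at most one activity can use the resource at a time.
    worker = Worker(
        client,
        task_queue="resource-gpu-1",
        activities=[use_gpu],
        max_concurrent_activities=1,
    )
    await worker.run()
```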

The technique in this sample is capable of more complex resource locking than the options above, but it doesn't scale
as well. Specifically, it can:
- Manage access to a set of resources that is decoupled from the set of workers and task queues
- Run arbitrary code to place workloads on resources as they become available

# Caveats

This sample uses true locking (not leasing!) to avoid the complexity and scaling concerns associated with heartbeating via
signals. Locking carries a risk: a failure to unlock permanently removes a resource from the pool. However, with
Temporal's durable execution guarantees, this can only happen if:

- A `ResourceLockingWorkflow` times out (the sample code warns against configuring workflow-level timeouts)
- An operator terminates a `ResourceLockingWorkflow` (Temporal recommends canceling workflows instead of terminating them whenever possible)
- You shut down your workers and never restart them (unhandled, but irrelevant)

If a leak were to happen, you could use the query above to discover the leaked resource's holder and its release signal,
then send that signal to the lock manager:

tctl wf signal -w lock_manager --name <release signal from the query output>

Performance: a single `LockManagerWorkflow` scales to tens, but not hundreds, of lock/unlock events per second. It is
best suited for locking resources during long-running workflows. Actual performance will depend on your Temporal
server's persistence layer.
Empty file added resource_locking/__init__.py
Empty file.
130 changes: 130 additions & 0 deletions resource_locking/lock_manager_workflow.py
@@ -0,0 +1,130 @@
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

from temporalio import workflow

from resource_locking.shared import AcquireRequest, AcquireResponse


# Internal to this workflow, we'll associate randomly generated release signal names with each acquire request.
@dataclass
class InternalAcquireRequest(AcquireRequest):
release_signal: Optional[str]


@dataclass
class LockManagerWorkflowInput:
# Key is resource, value is current lock holder for the resource (None if not locked)
resources: dict[str, Optional[InternalAcquireRequest]]
waiters: list[InternalAcquireRequest]


@workflow.defn
class LockManagerWorkflow:
@workflow.init
def __init__(self, input: LockManagerWorkflowInput):
self.resources = input.resources
self.waiters = input.waiters
self.release_signal_to_resource: dict[str, str] = {}
for resource, holder in self.resources.items():
if holder is not None and holder.release_signal is not None:
self.release_signal_to_resource[holder.release_signal] = resource

@workflow.signal
async def add_resources(self, resources: list[str]):
for resource in resources:
if resource in self.resources:
workflow.logger.warning(
f"Ignoring attempt to add already-existing resource: {resource}"
)
continue

self.resources[resource] = None
if len(self.waiters) > 0:
next_holder = self.waiters.pop(0)
await self.allocate_resource(resource, next_holder)

@workflow.signal
async def acquire_resource(self, request: AcquireRequest):
internal_request = InternalAcquireRequest(
workflow_id=request.workflow_id, release_signal=None
)

for resource, holder in self.resources.items():
# Naively give out the first free resource, if we have one
if holder is None:
await self.allocate_resource(resource, internal_request)
return

# Otherwise queue the request
self.waiters.append(internal_request)
workflow.logger.info(
f"workflow_id={request.workflow_id} is waiting for a resource"
)

async def allocate_resource(
self, resource: str, internal_request: InternalAcquireRequest
):
self.resources[resource] = internal_request
workflow.logger.info(
f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
)
internal_request.release_signal = str(workflow.uuid4())
self.release_signal_to_resource[internal_request.release_signal] = resource

requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
await requester.signal(
"assign_resource",
AcquireResponse(
release_signal_name=internal_request.release_signal, resource=resource
),
)

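# Dynamic signal handler: it receives every signal that has no dedicated handler. Release signal names are UUIDs
# generated in allocate_resource, so the signal name itself identifies which resource to release.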
@workflow.signal(dynamic=True)
async def release_resource(self, signal_name, *args):
if signal_name not in self.release_signal_to_resource:
workflow.logger.warning(
f"Ignoring unknown signal: {signal_name} was not a valid release signal."
)
return

resource = self.release_signal_to_resource[signal_name]

holder = self.resources[resource]
if holder is None:
workflow.logger.warning(
f"Ignoring request to release resource that is not locked: {resource}"
)
return

# Remove the current holder
workflow.logger.info(
f"workflow_id={holder.workflow_id} released resource {resource}"
)
self.resources[resource] = None
del self.release_signal_to_resource[signal_name]

# If there are queued requests, assign the resource to the next one
if len(self.waiters) > 0:
next_holder = self.waiters.pop(0)
await self.allocate_resource(resource, next_holder)

@workflow.query
def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
return {k: v if v else None for k, v in self.resources.items()}

@workflow.run
async def run(self, _: LockManagerWorkflowInput) -> None:
# Continue as new either when Temporal tells us to, or every 12 hours (so it occurs semi-frequently)
await workflow.wait_condition(
lambda: workflow.info().is_continue_as_new_suggested(),
timeout=timedelta(hours=12),
)

workflow.continue_as_new(
LockManagerWorkflowInput(
resources=self.resources,
waiters=self.waiters,
)
)
106 changes: 106 additions & 0 deletions resource_locking/resource_allocator.py
@@ -0,0 +1,106 @@
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Optional

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import WorkflowIDConflictPolicy

from resource_locking.lock_manager_workflow import (
LockManagerWorkflow,
LockManagerWorkflowInput,
)
from resource_locking.shared import (
LOCK_MANAGER_WORKFLOW_ID,
AcquiredResource,
AcquireRequest,
AcquireResponse,
)


# Use this class in workflow code that needs to run on locked resources.
class ResourceAllocator:
def __init__(self, client: Client):
self.client = client

@activity.defn
async def send_acquire_signal(self) -> None:
info = activity.info()

# This will start and signal the workflow if it isn't running, otherwise it will signal the current run.
await self.client.start_workflow(
workflow=LockManagerWorkflow.run,
arg=LockManagerWorkflowInput(
resources={},
waiters=[],
),
id=LOCK_MANAGER_WORKFLOW_ID,
task_queue="default",
id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
start_signal="acquire_resource",
start_signal_args=[AcquireRequest(info.workflow_id)],
Review comment (Contributor):
This would usually be a good opportunity to use our new update-with-start functionality, but if you must span continue-as-new on the lock manager workflow, you have to stay with signals for now.

Reply (Contributor Author):
I do think it's important to keep CAN in. Without CAN, the sample can't demonstrate how to detach/reattach, so it seems incomplete to me.
)

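# Acquire a resource (or reattach to one acquired before continue-as-new) for the duration of an `async with`
# block. Unless the caller sets autorelease to False, the resource is released when the block exits.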
@classmethod
@asynccontextmanager
async def acquire_resource(
cls,
*,
already_acquired_resource: Optional[AcquiredResource] = None,
Review comment (Contributor Author, @nagl-temporal, Apr 11, 2025):
I don't like this, but... (thought finished after ⭐)

max_wait_time: timedelta = timedelta(minutes=5),
):
warn_when_workflow_has_timeouts()

resource = already_acquired_resource
if resource is None:

async def assign_resource(input: AcquireResponse):
workflow.set_signal_handler("assign_resource", None)
nonlocal resource
resource = AcquiredResource(
resource=input.resource,
release_signal_name=input.release_signal_name,
)

workflow.set_signal_handler("assign_resource", assign_resource)

await workflow.start_activity(
ResourceAllocator.send_acquire_signal, # type: ignore[arg-type]
start_to_close_timeout=timedelta(seconds=10),
)

await workflow.wait_condition(
lambda: resource is not None, timeout=max_wait_time
)

# Can't happen, but the typechecker doesn't know about workflow.wait_condition
if resource is None:
raise RuntimeError("resource was None when it can't be")

# During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
# finally block will release the resource if an activity fails. This is why we warned about workflow-level
# timeouts above - the finally block wouldn't run if the workflow timed out.
try:
resource.autorelease = True
yield resource
finally:
if resource.autorelease:
Review comment (Contributor Author):
Blah. I want there to be a way to tell whether the workflow code is CAN'ing here, but I believe there isn't one.

Reply (Contributor):
I think you should always release it if a user is using async with. It will be very confusing if I as a user use async with and the resource remains when that exits. If we need to let callers keep the resource across continue-as-new, they should "detach" it (e.g. we can offer a helper for this that returns some type) and "reattach" it on next workflow use. Granted, I also think including continue-as-new from the caller side in this sample is a bit confusing, but maybe it's needed.
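
A rough sketch of that detach/reattach idea (the `DetachedResource` type and the `detach`/`reattach` helpers are
hypothetical, not part of this PR):

```python
from dataclasses import dataclass

from resource_locking.shared import AcquiredResource


@dataclass
class DetachedResource:
    # Serializable token that a workflow can carry across continue-as-new.
    resource: str
    release_signal_name: str


def detach(acquired: AcquiredResource) -> DetachedResource:
    # The caller promises to reattach in the next run, so suppress the automatic release.
    acquired.autorelease = False
    return DetachedResource(acquired.resource, acquired.release_signal_name)


def reattach(detached: DetachedResource) -> AcquiredResource:
    # Rebuild the acquired resource in the next run without re-signaling the lock manager.
    return AcquiredResource(
        resource=detached.resource,
        release_signal_name=detached.release_signal_name,
    )
```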

handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
await handle.signal(resource.release_signal_name)


def warn_when_workflow_has_timeouts():
if has_timeout(workflow.info().run_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
)
if has_timeout(workflow.info().execution_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
)


def has_timeout(timeout: Optional[timedelta]) -> bool:
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
# continue_as_new call).
return timeout is not None and timeout > timedelta(0)
87 changes: 87 additions & 0 deletions resource_locking/resource_locking_workflow.py
@@ -0,0 +1,87 @@
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Callable, Optional

from temporalio import activity, workflow

from resource_locking.resource_allocator import ResourceAllocator
from resource_locking.shared import (
LOCK_MANAGER_WORKFLOW_ID,
AcquiredResource,
AcquireRequest,
AcquireResponse,
)


@dataclass
class UseResourceActivityInput:
resource: str
iteration: str


@activity.defn
async def use_resource(input: UseResourceActivityInput) -> None:
info = activity.info()
activity.logger.info(
f"{info.workflow_id} starts using {input.resource} the {input.iteration} time"
)
await asyncio.sleep(3)
activity.logger.info(
f"{info.workflow_id} done using {input.resource} the {input.iteration} time"
)


@dataclass
class ResourceLockingWorkflowInput:
# If set, this workflow will fail after the "first", "second", or "third" activity.
iteration_to_fail_after: Optional[str]

# If True, this workflow will continue as new after the third activity. The next iteration will run three more
# activities, but will not continue as new. This lets us exercise the handoff logic.
should_continue_as_new: bool

# Used to transfer resource ownership between iterations during continue_as_new
already_acquired_resource: Optional[AcquiredResource] = field(default=None)


class FailWorkflowException(Exception):
pass


# Wait this long for a resource before giving up
MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)


@workflow.defn(failure_exception_types=[FailWorkflowException])
class ResourceLockingWorkflow:
@workflow.run
async def run(self, input: ResourceLockingWorkflowInput):
async with ResourceAllocator.acquire_resource(
already_acquired_resource=input.already_acquired_resource
Review comment (Contributor Author, @nagl-temporal):
⭐ ...I had to have some way of handling the case where my CAN-predecessor already locked the resource. It was either an optional param to acquire_resource or branching around the async with, which felt much more awkward.

Reply (Contributor, @cretz, Apr 11, 2025):
Not sure this sample needs the complication of including continue-as-new support for this workflow using the resource manager. This workflow isn't large enough to ever need to continue as new, so including support for it is a bit confusing (people aren't going to understand why it is here, but will think they need to do the same).

However, if you do keep this, I would call this reattach= and have it accept a typed object that came from detach in the pre-CAN workflow.

) as resource:
for iteration in ["first", "second", "third"]:
await workflow.execute_activity(
use_resource,
UseResourceActivityInput(resource.resource, iteration),
start_to_close_timeout=timedelta(seconds=10),
)

if iteration == input.iteration_to_fail_after:
workflow.logger.info(
f"Failing after iteration {input.iteration_to_fail_after}"
)
raise FailWorkflowException()

if input.should_continue_as_new:
next_input = ResourceLockingWorkflowInput(
iteration_to_fail_after=input.iteration_to_fail_after,
should_continue_as_new=False,
already_acquired_resource=resource,
)

# By default, ResourceAllocator will release the resource when we return. We want to hold the resource
# across continue-as-new for the sake of demonstration.
resource.autorelease = False

workflow.continue_as_new(next_input)
19 changes: 19 additions & 0 deletions resource_locking/shared.py
@@ -0,0 +1,19 @@
from dataclasses import dataclass, field

LOCK_MANAGER_WORKFLOW_ID = "lock_manager"


@dataclass
class AcquireRequest:
workflow_id: str


@dataclass
class AcquireResponse:
release_signal_name: str
resource: str


@dataclass
class AcquiredResource(AcquireResponse):
autorelease: bool = field(default=True)