47 changes: 47 additions & 0 deletions resource_locking/README.md
@@ -0,0 +1,47 @@
# Resource Locking Sample

This sample shows how to use a long-lived `LockManagerWorkflow` to ensure that each `resource` is used by at most one
`ResourceLockingWorkflow` at a time. `ResourceLockingWorkflow` runs several activities while it has ownership of a
resource.
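
The core bookkeeping can be illustrated without Temporal. The following is a minimal sketch, not the sample's implementation: `InMemoryLockManager` is a hypothetical stand-in that mirrors the `resources` map and the first-in, first-out `waiters` list kept by `LockManagerWorkflow`, with plain method calls in place of signals.

```python
from typing import Optional


class InMemoryLockManager:
    """Hypothetical, Temporal-free model of the lock manager's bookkeeping."""

    def __init__(self, resources: list[str]) -> None:
        # Key is resource, value is the current holder (None if not locked).
        self.resources: dict[str, Optional[str]] = {r: None for r in resources}
        # Requests that arrived while every resource was held, in FIFO order.
        self.waiters: list[str] = []

    def acquire(self, workflow_id: str) -> Optional[str]:
        """Return a free resource, or queue the request and return None."""
        for resource, holder in self.resources.items():
            if holder is None:
                self.resources[resource] = workflow_id
                return resource
        self.waiters.append(workflow_id)
        return None

    def release(self, resource: str) -> Optional[str]:
        """Free a resource and hand it to the next waiter, if any."""
        self.resources[resource] = None
        if self.waiters:
            next_holder = self.waiters.pop(0)
            self.resources[resource] = next_holder
            return next_holder
        return None


manager = InMemoryLockManager(["resource_a"])
first = manager.acquire("workflow-1")      # gets resource_a immediately
second = manager.acquire("workflow-2")     # queued, because resource_a is taken
handed_to = manager.release("resource_a")  # passes resource_a to workflow-2
```

Because a resource is only ever handed out when its entry is `None`, two callers can never hold the same resource at once.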

Run the following from this directory to start the worker:

    poetry run python worker.py

Then, in another terminal, run the following to execute several `ResourceLockingWorkflow`s:

    poetry run python starter.py

You should see output indicating that the LockManagerWorkflow serialized access to each resource.

You can query the set of current lock holders with:

    tctl wf query -w lock_manager --qt get_current_holders

# Other approaches

There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues,
and limiting the number of activity slots on the workers. The Go SDK also has [sessions](https://docs.temporal.io/develop/go/sessions)
that allow workflows to pin themselves to workers.
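
Limiting activity slots bounds concurrency in much the same way a semaphore does. The sketch below is only an analogy built on `asyncio.Semaphore` from the standard library; it is not Temporal worker configuration, and `SLOTS` and `use_resource` are illustrative names.

```python
import asyncio

SLOTS = 1  # hypothetical: one slot means one user of the resource at a time


async def use_resource(slots: asyncio.Semaphore, stats: dict[str, int]) -> None:
    async with slots:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for real work on the resource
        stats["active"] -= 1


async def main() -> dict[str, int]:
    slots = asyncio.Semaphore(SLOTS)
    stats = {"active": 0, "peak": 0}
    # Five callers compete, but at most SLOTS of them run the body at once.
    await asyncio.gather(*(use_resource(slots, stats) for _ in range(5)))
    return stats


stats = asyncio.run(main())
```

With a single slot, `stats["peak"]` never exceeds 1, which is the property a one-slot worker dedicated to a resource would give you.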

The technique in this sample is capable of more complex resource locking than the options above, but it doesn't scale
as well. Specifically, it can:
- Manage access to a set of resources that is decoupled from the set of workers and task queues
- Run arbitrary code to place workloads on resources as they become available

# Caveats

This sample uses true locking (not leasing!) to avoid the complexity and scaling concerns associated with heartbeating
via signals. Locking carries a risk: a failure to unlock permanently removes a resource from the pool. However, with
Temporal's durable execution guarantees, this can only happen if:

- A `ResourceLockingWorkflow` times out (the sample code logs a warning if a workflow timeout is set)
- You shut down your workers and never restart them (unhandled, but irrelevant)
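
To make the lock/lease distinction concrete: a lease lapses unless its holder keeps renewing it, while a lock stays held until it is explicitly released. The following is a minimal sketch with hypothetical `Lease` and `Lock` classes and an explicit `now` value in place of a real clock; it is not code from the sample.

```python
from dataclasses import dataclass


@dataclass
class Lease:
    """Hypothetical lease: ownership lapses unless renewed before it expires."""

    holder: str
    expires_at: float

    def renew(self, now: float, ttl: float) -> None:
        self.expires_at = now + ttl

    def is_held(self, now: float) -> bool:
        return now < self.expires_at


@dataclass
class Lock:
    """Hypothetical lock: ownership lasts until an explicit release."""

    holder: str
    released: bool = False

    def release(self) -> None:
        self.released = True

    def is_held(self, now: float) -> bool:
        # Time plays no role; only an explicit release frees the lock.
        return not self.released


lease = Lease(holder="workflow-1", expires_at=10.0)
lock = Lock(holder="workflow-1")

# Suppose the holder stops responding at t=0 and never renews or releases.
lease_held_later = lease.is_held(now=60.0)  # the lease has lapsed
lock_held_later = lock.is_held(now=60.0)    # the lock is still held (leaked)
```

The lease recovers on its own but requires the holder to keep renewing it, which is the heartbeating cost this sample avoids.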

If a leak were to happen, you could discover the identity of the leaker and its release signal using the query above,
then send that release signal to the lock manager to free the resource:

    tctl wf signal -w lock_manager --name "<release_signal of the holder>"
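
In the workflow code in this change, `get_current_holders` returns each holder's `workflow_id` and `release_signal`. Assuming the query result decodes to a dictionary of that shape (an assumption, as are all the values below), a small hypothetical helper can pick out what a given workflow still holds:

```python
from typing import Optional

Holder = dict[str, Optional[str]]

# Illustrative query result; resource names, workflow IDs and signal names are made up.
holders: dict[str, Optional[Holder]] = {
    "resource_a": {"workflow_id": "load-workflow-1", "release_signal": "3f2c-example"},
    "resource_b": None,
}


def find_leaked(
    holders: dict[str, Optional[Holder]], workflow_id: str
) -> list[tuple[str, Optional[str]]]:
    """Return (resource, release_signal) pairs currently held by workflow_id."""
    return [
        (resource, holder["release_signal"])
        for resource, holder in holders.items()
        if holder is not None and holder["workflow_id"] == workflow_id
    ]


leaked = find_leaked(holders, "load-workflow-1")
```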

Performance: A single `LockManagerWorkflow` scales to tens, but not hundreds, of lock/unlock events per second. It is
best suited for locking resources during long-running workflows. Actual performance will depend on your Temporal
server's persistence layer.
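
As a rough back-of-the-envelope check (an estimate under stated assumptions, not a benchmark): if every resource is always busy and each holder produces one lock and one unlock event, the steady-state event rate is about `2 * resources / hold_seconds`. The function name and the workloads below are hypothetical.

```python
def lock_events_per_second(resources: int, hold_seconds: float) -> float:
    """Estimate steady-state lock + unlock events per second.

    Assumes every resource is always busy and each holder keeps it for
    hold_seconds, producing one lock and one unlock event per hold.
    """
    return 2 * resources / hold_seconds


# Hypothetical workloads: 100 resources held for 10 seconds vs. 10 minutes each.
short_holds = lock_events_per_second(resources=100, hold_seconds=10)
long_holds = lock_events_per_second(resources=100, hold_seconds=600)
```

Under these assumptions, short holds already sit in the "tens per second" range, while long holds stay well below one event per second, which is why long-running workflows are the better fit.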
Empty file added resource_locking/__init__.py
Empty file.
116 changes: 116 additions & 0 deletions resource_locking/lock_manager_workflow.py
@@ -0,0 +1,116 @@
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

from temporalio import workflow

from resource_locking.shared import (
    AcquireRequest,
    AcquireResponse,
)

# Internal to this workflow, we'll associate randomly generated release signal names with each acquire request.
@dataclass
class InternalAcquireRequest(AcquireRequest):
    release_signal: Optional[str]

@dataclass
class LockManagerWorkflowInput:
    # Key is resource, value is current lock holder for the resource (None if not locked)
    resources: dict[str, Optional[InternalAcquireRequest]]
    waiters: list[InternalAcquireRequest]

@workflow.defn
class LockManagerWorkflow:
    @workflow.init
    def __init__(self, input: LockManagerWorkflowInput):
        self.resources = input.resources
        self.waiters = input.waiters
        # Reverse index, rebuilt from the carried-over state after continue-as-new
        self.release_signal_to_resource: dict[str, str] = {}
        for resource, holder in self.resources.items():
            if holder is not None:
                self.release_signal_to_resource[holder.release_signal] = resource

    @workflow.signal
    async def add_resources(self, resources: list[str]):
        for resource in resources:
            if resource in self.resources:
                workflow.logger.warning(
                    f"Ignoring attempt to add already-existing resource: {resource}"
                )
                continue

            self.resources[resource] = None
            if len(self.waiters) > 0:
                next_holder = self.waiters.pop(0)
                await self.allocate_resource(resource, next_holder)

    @workflow.signal
    async def acquire_resource(self, request: AcquireRequest):
        internal_request = InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None)

        for resource, holder in self.resources.items():
            # Naively give out the first free resource, if we have one
            if holder is None:
                await self.allocate_resource(resource, internal_request)
                return

        # Otherwise queue the request
        self.waiters.append(internal_request)
        workflow.logger.info(
            f"workflow_id={request.workflow_id} is waiting for a resource"
        )

    async def allocate_resource(self, resource: str, internal_request: InternalAcquireRequest):
        self.resources[resource] = internal_request
        workflow.logger.info(
            f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
        )
        internal_request.release_signal = str(workflow.uuid4())
        self.release_signal_to_resource[internal_request.release_signal] = resource

        requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
        await requester.signal("assign_resource", AcquireResponse(release_signal_name=internal_request.release_signal, resource=resource))

    @workflow.signal(dynamic=True)
    async def release_resource(self, signal_name, *args):
        if signal_name not in self.release_signal_to_resource:
            workflow.logger.warning(f"Ignoring unknown signal: {signal_name} was not a valid release signal.")
            return

        resource = self.release_signal_to_resource[signal_name]

        holder = self.resources[resource]
        if holder is None:
            workflow.logger.warning(
                f"Ignoring request to release resource that is not locked: {resource}"
            )
            # Nothing to release; returning avoids dereferencing a None holder below
            return

        # Remove the current holder
        workflow.logger.info(
            f"workflow_id={holder.workflow_id} released resource {resource}"
        )
        self.resources[resource] = None
        del self.release_signal_to_resource[signal_name]

        # If there are queued requests, assign the resource to the next one
        if len(self.waiters) > 0:
            next_holder = self.waiters.pop(0)
            await self.allocate_resource(resource, next_holder)

    @workflow.query
    def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
        return {k: v if v else None for k, v in self.resources.items()}

    @workflow.run
    async def run(self, _: LockManagerWorkflowInput) -> None:
        # Continue as new either when Temporal tells us to, or every 12 hours (so it occurs semi-frequently)
        try:
            await workflow.wait_condition(
                lambda: workflow.info().is_continue_as_new_suggested(),
                timeout=timedelta(hours=12),
            )
        except asyncio.TimeoutError:
            # wait_condition raises on timeout; treat that as the 12-hour trigger
            pass

        workflow.continue_as_new(LockManagerWorkflowInput(
            resources=self.resources,
            waiters=self.waiters,
        ))
83 changes: 83 additions & 0 deletions resource_locking/resource_allocator.py
@@ -0,0 +1,83 @@
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import Optional, AsyncGenerator

from temporalio.client import Client
from temporalio import workflow, activity
from temporalio.common import WorkflowIDConflictPolicy

from resource_locking.lock_manager_workflow import LockManagerWorkflowInput, LockManagerWorkflow
from resource_locking.shared import AcquireResponse, LOCK_MANAGER_WORKFLOW_ID, AcquireRequest, AcquiredResource

# Use this class in workflow code that needs to run on locked resources.
class ResourceAllocator:
    def __init__(self, client: Client):
        self.client = client

    @activity.defn
    async def send_acquire_signal(self):
        info = activity.info()

        # This will start and signal the workflow if it isn't running, otherwise it will signal the current run.
        await self.client.start_workflow(
            workflow=LockManagerWorkflow.run,
            arg=LockManagerWorkflowInput(
                resources={},
                waiters=[],
            ),
            id=LOCK_MANAGER_WORKFLOW_ID,
            task_queue="default",
            id_conflict_policy=WorkflowIDConflictPolicy.USE_EXISTING,
            start_signal="acquire_resource",
            start_signal_args=[AcquireRequest(info.workflow_id)]
        )

    @classmethod
    @asynccontextmanager
    async def acquire_resource(cls, *, already_acquired_resource: Optional[AcquiredResource] = None, max_wait_time: timedelta = timedelta(minutes=5)):
        warn_when_workflow_has_timeouts()

        resource = already_acquired_resource
        if resource is None:
            async def assign_resource(input: AcquireResponse):
                workflow.set_signal_handler("assign_resource", None)
                nonlocal resource
                resource = AcquiredResource(
                    resource=input.resource,
                    release_signal_name=input.release_signal_name,
                )

            workflow.set_signal_handler("assign_resource", assign_resource)

            await workflow.execute_activity(
                ResourceAllocator.send_acquire_signal,
                start_to_close_timeout=timedelta(seconds=10),
            )

            await workflow.wait_condition(lambda: resource is not None, timeout=max_wait_time)

        # During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
        # finally block will release the resource if an activity fails. This is why we warn about workflow-level
        # timeouts above - the finally block wouldn't run if there was a timeout.
        try:
            resource.autorelease = True
            yield resource
        finally:
            if resource.autorelease:
Contributor Author commented:

Blah. I want there to be a way to tell whether the workflow code is CAN'ing here, but I believe there isn't one.

Contributor commented:

I think you should always release it if a user is using async with. It will be very confusing if I as a user use async with and the resource remains when that exits. If we need to let callers keep the resource across continue as new, they should "detach" it (e.g. can offer a helper for this that returns some type) and "reattach" it on next workflow use. Granted I also think including continue as new from the caller side in this sample is a bit confusing, but maybe it's needed.

                handle = workflow.get_external_workflow_handle(LOCK_MANAGER_WORKFLOW_ID)
                await handle.signal(resource.release_signal_name)

def warn_when_workflow_has_timeouts():
    if has_timeout(workflow.info().run_timeout):
        workflow.logger.warning(
            f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
        )
    if has_timeout(workflow.info().execution_timeout):
        workflow.logger.warning(
            f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
        )

def has_timeout(timeout: Optional[timedelta]) -> bool:
    # After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
    # continue_as_new call).
    return timeout is not None and timeout > timedelta(0)
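
The `has_timeout` check treats both `None` and a zero duration as "no timeout". A small standalone sketch of that behavior follows; the function body is copied from the file above, and the variable names and sample values are illustrative.

```python
from datetime import timedelta
from typing import Optional


def has_timeout(timeout: Optional[timedelta]) -> bool:
    # Same check as in resource_allocator.py: None and zero both mean "no timeout".
    return timeout is not None and timeout > timedelta(0)


unset = has_timeout(None)
zero_after_continue_as_new = has_timeout(timedelta(0))
one_hour = has_timeout(timedelta(hours=1))
```

Only the last call reports a timeout, so the warning fires only for a positive `run_timeout` or `execution_timeout`.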
82 changes: 82 additions & 0 deletions resource_locking/resource_locking_workflow.py
@@ -0,0 +1,82 @@
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional, Callable

from temporalio import activity, workflow

from resource_locking.resource_allocator import ResourceAllocator
from resource_locking.shared import (
    LOCK_MANAGER_WORKFLOW_ID,
    AcquireRequest,
    AcquireResponse,
    AcquiredResource,
)

@dataclass
class UseResourceActivityInput:
    resource: str
    iteration: str

@activity.defn
async def use_resource(input: UseResourceActivityInput) -> None:
    info = activity.info()
    activity.logger.info(
        f"{info.workflow_id} starts using {input.resource} the {input.iteration} time"
    )
    await asyncio.sleep(3)
    activity.logger.info(
        f"{info.workflow_id} done using {input.resource} the {input.iteration} time"
    )


@dataclass
class ResourceLockingWorkflowInput:
    # If set, this workflow will fail after the "first", "second", or "third" activity.
    iteration_to_fail_after: Optional[str]

    # If True, this workflow will continue as new after the third activity. The next iteration will run three more
    # activities, but will not continue as new. This lets us exercise the handoff logic.
    should_continue_as_new: bool

    # Used to transfer resource ownership between iterations during continue_as_new
    already_acquired_resource: Optional[AcquiredResource] = field(default=None)


class FailWorkflowException(Exception):
    pass


# Wait this long for a resource before giving up
MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)


@workflow.defn(failure_exception_types=[FailWorkflowException])
class ResourceLockingWorkflow:
    @workflow.run
    async def run(self, input: ResourceLockingWorkflowInput):
        async with ResourceAllocator.acquire_resource(already_acquired_resource=input.already_acquired_resource) as resource:
            for iteration in ["first", "second", "third"]:
                await workflow.execute_activity(
                    use_resource,
                    UseResourceActivityInput(resource.resource, iteration),
                    start_to_close_timeout=timedelta(seconds=10),
                )

                if iteration == input.iteration_to_fail_after:
                    workflow.logger.info(
                        f"Failing after iteration {input.iteration_to_fail_after}"
                    )
                    raise FailWorkflowException()

            if input.should_continue_as_new:
                next_input = ResourceLockingWorkflowInput(
                    iteration_to_fail_after=input.iteration_to_fail_after,
                    should_continue_as_new=False,
                    already_acquired_resource=resource,
                )

                # By default, ResourceAllocator will release the resource when we return. We want to hold the resource
                # across continue-as-new for the sake of demonstration.
                resource.autorelease = False

                workflow.continue_as_new(next_input)
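
The release-in-`finally` pattern with an opt-out flag can be tried without Temporal. The following is a minimal sketch using only the standard library; `FakeResource`, `acquire`, and the `released` list are hypothetical stand-ins for `AcquiredResource`, `ResourceAllocator.acquire_resource`, and the release signal.

```python
import asyncio
from contextlib import asynccontextmanager
from dataclasses import dataclass


@dataclass
class FakeResource:
    """Hypothetical stand-in for AcquiredResource."""

    name: str
    autorelease: bool = True


released: list[str] = []  # records which resources the sketch released


@asynccontextmanager
async def acquire(name: str):
    resource = FakeResource(name)
    try:
        yield resource
    finally:
        # Runs on normal exit and when the body raises.
        if resource.autorelease:
            released.append(resource.name)


async def main() -> None:
    # Normal exit: the resource is released.
    async with acquire("a"):
        pass

    # The body fails: the resource is still released.
    try:
        async with acquire("b"):
            raise RuntimeError("activity failed")
    except RuntimeError:
        pass

    # The caller opts out before leaving, as the workflow does before continue-as-new.
    async with acquire("c") as resource:
        resource.autorelease = False


asyncio.run(main())
```

After this runs, `released` contains `"a"` and `"b"` but not `"c"`, which mirrors how the workflow keeps its resource across continue-as-new.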
17 changes: 17 additions & 0 deletions resource_locking/shared.py
@@ -0,0 +1,17 @@
from dataclasses import dataclass, field
from typing import Optional

LOCK_MANAGER_WORKFLOW_ID = "lock_manager"

@dataclass
class AcquireRequest:
    workflow_id: str

@dataclass
class AcquireResponse:
    release_signal_name: str
    resource: str

@dataclass
class AcquiredResource(AcquireResponse):
    autorelease: bool = field(default=True)