-
Notifications
You must be signed in to change notification settings - Fork 105
Add a sample that uses a workflow to lock resources #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
6fbfdd9
00c1d4d
cd31a61
7fa82b1
583be40
be1d760
ca184ed
0e0bcd2
90aaf99
72a24b0
eb92f48
69751bb
2ab9dfa
bb9cb64
d9a5f50
d75d2d3
c38f803
f31cd1b
d57b65a
bbe3383
104b43f
7ac42cc
883f159
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| # Resource Pool Sample | ||
|
|
||
| This sample shows how to use a long-lived `ResourcePoolWorkflow` to allocate `resources` to `ResourceUserWorkflows`. | ||
| Each `ResourceUserWorkflow` runs several activities while it has ownership of a resource. Note that | ||
| `ResourcePoolWorkflow` is making resource allocation decisions based on in-memory state. | ||
|
|
||
| Run the following from this directory to start the worker: | ||
|
|
||
| uv run worker.py | ||
|
|
||
| This will start the worker. Then, in another terminal, run the following to execute several `ResourceUserWorkflows`. | ||
|
|
||
| uv run starter.py | ||
|
|
||
| You should see output indicating that the `ResourcePoolWorkflow` serialized access to each resource. | ||
|
|
||
| You can query the set of current resource resource holders with: | ||
|
|
||
| tctl wf query -w resource_pool --qt get_current_holders | ||
|
|
||
| # Other approaches | ||
|
|
||
| There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues, | ||
| and limiting the number of activity slots on the workers. The golang SDK also [sessions](https://docs.temporal.io/develop/go/sessions) | ||
| that allow workflows to pin themselves to workers. | ||
|
|
||
| The technique in this sample is capable of more complex resource allocation than the options above, but it doesn't scale | ||
| as well. Specifically, it can: | ||
| - Manage access to a set of resources that is decoupled from the set of workers and task queues | ||
| - Run arbitrary code to place workloads on resources as they become available | ||
|
|
||
| # Caveats | ||
|
|
||
| This sample uses true locking (not leasing!) to avoid complexity and scaling concerns associated with heartbeating via | ||
| signals. Locking carries a risk where failure to unlock permanently removing a resource from the pool. However, with | ||
| Temporal's durable execution guarantees, this can only happen if: | ||
|
|
||
| - A ResourceUserWorkflows times out (prohibited in the sample code) | ||
| - An operator terminates a ResourceUserWorkflows. (Temporal recommends canceling workflows instead of terminating them whenever possible.) | ||
| - You shut down your workers and never restart them (unhandled, but irrelevant) | ||
|
|
||
| If a leak were to happen, you could discover the identity of the leaker using the query above, then: | ||
|
|
||
| tctl wf signal -w resource_pool --name release_resource --input '{ "release_key": "<the key from the query above>" } | ||
|
|
||
| Performance: A single ResourcePoolWorkflow scales to tens, but not hundreds, of request/release events per second. It is | ||
| best suited for allocating resources to long-running workflows. Actual performance will depend on your temporal server's | ||
| persistence layer. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| from .resource_pool_client import ResourcePoolClient |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| from contextlib import asynccontextmanager | ||
| from datetime import timedelta | ||
| from typing import AsyncGenerator, Optional | ||
|
|
||
| from temporalio import workflow | ||
|
|
||
| from resource_pool.resource_pool_workflow import ResourcePoolWorkflow | ||
| from resource_pool.shared import ( | ||
| AcquiredResource, | ||
| AcquireRequest, | ||
| AcquireResponse, | ||
| DetachedResource, | ||
| ) | ||
|
|
||
|
|
||
| # Use this class in workflow code that that needs to run on locked resources. | ||
| class ResourcePoolClient: | ||
| def __init__(self, pool_workflow_id: str) -> None: | ||
| self.pool_workflow_id = pool_workflow_id | ||
| self.acquired_resources: list[AcquiredResource] = [] | ||
| self.register_signal_handler() | ||
|
|
||
| def register_signal_handler(self) -> None: | ||
| signal_name = f"assign_resource_{self.pool_workflow_id}" | ||
| if workflow.get_signal_handler(signal_name) is None: | ||
| workflow.set_signal_handler(signal_name, self.assign_resource) | ||
| else: | ||
| raise RuntimeError( | ||
| f"{signal_name} already registered - if you use multiple ResourcePoolClients within the " | ||
| f"same workflow, they must use different pool_workflow_ids" | ||
| ) | ||
|
|
||
| async def send_acquire_signal(self) -> None: | ||
|
nagl-temporal marked this conversation as resolved.
Outdated
|
||
| handle = workflow.get_external_workflow_handle_for( | ||
| ResourcePoolWorkflow.run, self.pool_workflow_id | ||
| ) | ||
| await handle.signal( | ||
| "acquire_resource", AcquireRequest(workflow.info().workflow_id) | ||
| ) | ||
|
|
||
| async def send_release_signal(self, acquired_resource: AcquiredResource) -> None: | ||
| handle = workflow.get_external_workflow_handle_for( | ||
| ResourcePoolWorkflow.run, self.pool_workflow_id | ||
| ) | ||
| await handle.signal( | ||
| "release_resource", | ||
| AcquireResponse( | ||
| resource=acquired_resource.resource, | ||
| release_key=acquired_resource.release_key, | ||
| ), | ||
| ) | ||
|
|
||
| def assign_resource(self, response: AcquireResponse) -> None: | ||
| self.acquired_resources.append( | ||
| AcquiredResource( | ||
| resource=response.resource, release_key=response.release_key | ||
| ) | ||
| ) | ||
|
|
||
| @asynccontextmanager | ||
| async def acquire_resource( | ||
| self, | ||
| *, | ||
| reattach: Optional[DetachedResource] = None, | ||
| max_wait_time: timedelta = timedelta(minutes=5), | ||
| ) -> AsyncGenerator[AcquiredResource, None]: | ||
| _warn_when_workflow_has_timeouts() | ||
|
|
||
| if reattach is None: | ||
| await self.send_acquire_signal() | ||
| await workflow.wait_condition( | ||
| lambda: len(self.acquired_resources) > 0, timeout=max_wait_time | ||
| ) | ||
| resource = self.acquired_resources.pop(0) | ||
| else: | ||
| resource = AcquiredResource( | ||
| resource=reattach.resource, release_key=reattach.release_key | ||
| ) | ||
|
|
||
| # Can't happen, but the typechecker doesn't know about workflow.wait_condition | ||
| if resource is None: | ||
| raise RuntimeError("resource was None when it can't be") | ||
|
|
||
| # During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our | ||
| # finally block will release the resource if an activity fails. This is why we asserted the lack of | ||
| # workflow-level timeouts above - the finally block wouldn't run if there was a timeout. | ||
| try: | ||
| yield resource | ||
| finally: | ||
| if not resource.detached: | ||
| await self.send_release_signal(resource) | ||
|
|
||
|
|
||
| def _warn_when_workflow_has_timeouts() -> None: | ||
| def has_timeout(timeout: Optional[timedelta]) -> bool: | ||
| # After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the | ||
| # continue_as_new call). | ||
| return timeout is not None and timeout > timedelta(0) | ||
|
|
||
| if has_timeout(workflow.info().run_timeout): | ||
| workflow.logger.warning( | ||
| f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks" | ||
| ) | ||
| if has_timeout(workflow.info().execution_timeout): | ||
| workflow.logger.warning( | ||
| f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks" | ||
| ) | ||
|
nagl-temporal marked this conversation as resolved.
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| from dataclasses import dataclass | ||
| from typing import Optional | ||
|
|
||
| from temporalio import workflow | ||
| from temporalio.exceptions import ApplicationError | ||
|
|
||
| from resource_pool.shared import AcquireRequest, AcquireResponse | ||
|
|
||
|
|
||
| # Internal to this workflow, we'll associate randomly generated release signal names with each acquire request. | ||
| @dataclass | ||
| class InternalAcquireRequest(AcquireRequest): | ||
| release_signal: Optional[str] | ||
|
|
||
|
|
||
| @dataclass | ||
| class ResourcePoolWorkflowInput: | ||
| # Key is resource, value is current holder of the resource (None if not held) | ||
| resources: dict[str, Optional[InternalAcquireRequest]] | ||
| waiters: list[InternalAcquireRequest] | ||
|
|
||
|
|
||
| @workflow.defn | ||
| class ResourcePoolWorkflow: | ||
| @workflow.init | ||
| def __init__(self, input: ResourcePoolWorkflowInput) -> None: | ||
| self.resources = input.resources | ||
| self.waiters = input.waiters | ||
| self.release_key_to_resource: dict[str, str] = {} | ||
|
|
||
| for resource, holder in self.resources.items(): | ||
| if holder is not None and holder.release_signal is not None: | ||
| self.release_key_to_resource[holder.release_signal] = resource | ||
|
|
||
| @workflow.signal | ||
| async def add_resources(self, resources: list[str]) -> None: | ||
| for resource in resources: | ||
| if resource in self.resources: | ||
| workflow.logger.warning( | ||
| f"Ignoring attempt to add already-existing resource: {resource}" | ||
| ) | ||
| else: | ||
| self.resources[resource] = None | ||
|
|
||
| @workflow.signal | ||
| async def acquire_resource(self, request: AcquireRequest) -> None: | ||
| self.waiters.append( | ||
| InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None) | ||
| ) | ||
| workflow.logger.info( | ||
| f"workflow_id={request.workflow_id} is waiting for a resource" | ||
| ) | ||
|
|
||
| @workflow.signal | ||
| async def release_resource(self, acquire_response: AcquireResponse) -> None: | ||
| release_key = acquire_response.release_key | ||
| resource = self.release_key_to_resource.get(release_key) | ||
| if resource is None: | ||
| workflow.logger.warning(f"Ignoring unknown release_key: {release_key}") | ||
| return | ||
|
|
||
| holder = self.resources[resource] | ||
| if holder is None: | ||
| workflow.logger.warning( | ||
| f"Ignoring request to release resource that is not held: {resource}" | ||
| ) | ||
| return | ||
|
|
||
| # Remove the current holder | ||
| workflow.logger.info( | ||
| f"workflow_id={holder.workflow_id} released resource {resource}" | ||
| ) | ||
| self.resources[resource] = None | ||
| del self.release_key_to_resource[release_key] | ||
|
|
||
| @workflow.query | ||
| def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]: | ||
| return self.resources | ||
|
|
||
| async def assign_resource( | ||
| self, resource: str, internal_request: InternalAcquireRequest | ||
| ) -> None: | ||
| workflow.logger.info( | ||
| f"workflow_id={internal_request.workflow_id} acquired resource {resource}" | ||
| ) | ||
|
|
||
| requester = workflow.get_external_workflow_handle(internal_request.workflow_id) | ||
| try: | ||
| release_signal = str(workflow.uuid4()) | ||
| await requester.signal( | ||
| f"assign_resource_{workflow.info().workflow_id}", | ||
| AcquireResponse(release_key=release_signal, resource=resource), | ||
| ) | ||
|
|
||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's no race here, right? Imagine the release signal arrives immediately. The fix is obvious if there is a race, but the code smells nicer this way. Why I think there's no race: I believe that this handler must run to completion before the handler for the release signal can start.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct, there is no race here because this is only called in one place, serially, in the primary loop. However, this can show the dangers of overly extracting/modularizing single-use methods - it can be hard to see the constraints they expect of the callers. Can technically add a "Not safe for concurrent use" docstring if concerned. |
||
| internal_request.release_signal = release_signal | ||
| self.resources[resource] = internal_request | ||
| self.release_key_to_resource[release_signal] = resource | ||
| except ApplicationError as e: | ||
| if e.type == "ExternalWorkflowExecutionNotFound": | ||
| workflow.logger.info( | ||
| f"Could not assign resource {resource} to {internal_request.workflow_id}: {e.message}" | ||
| ) | ||
| else: | ||
| raise e | ||
|
|
||
| async def assign_next_resource(self) -> bool: | ||
| if len(self.waiters) == 0: | ||
| return False | ||
|
|
||
| next_free_resource = self.get_free_resource() | ||
| if next_free_resource is None: | ||
| return False | ||
|
|
||
| next_waiter = self.waiters.pop(0) | ||
| await self.assign_resource(next_free_resource, next_waiter) | ||
| return True | ||
|
|
||
| def get_free_resource(self) -> Optional[str]: | ||
| return next( | ||
| (resource for resource, holder in self.resources.items() if holder is None), | ||
| None, | ||
| ) | ||
|
|
||
| def can_assign_resource(self) -> bool: | ||
| return len(self.waiters) > 0 and self.get_free_resource() is not None | ||
|
|
||
| def should_continue_as_new(self) -> bool: | ||
| return ( | ||
| workflow.info().is_continue_as_new_suggested() | ||
| and workflow.all_handlers_finished() | ||
| ) | ||
|
|
||
| @workflow.run | ||
| async def run(self, _: ResourcePoolWorkflowInput) -> None: | ||
| while True: | ||
| await workflow.wait_condition( | ||
| lambda: self.can_assign_resource() or self.should_continue_as_new() | ||
| ) | ||
|
|
||
| if await self.assign_next_resource(): | ||
| continue | ||
|
|
||
| if self.should_continue_as_new(): | ||
| workflow.continue_as_new( | ||
| ResourcePoolWorkflowInput( | ||
| resources=self.resources, | ||
| waiters=self.waiters, | ||
| ) | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| import asyncio | ||
| from dataclasses import dataclass, field | ||
| from datetime import timedelta | ||
| from typing import Optional | ||
|
|
||
| from temporalio import activity, workflow | ||
|
|
||
| from resource_pool.pool_client import ResourcePoolClient | ||
| from resource_pool.shared import DetachedResource | ||
|
|
||
|
|
||
| @dataclass | ||
| class UseResourceActivityInput: | ||
| resource: str | ||
| iteration: str | ||
|
|
||
|
|
||
| @activity.defn | ||
| async def use_resource(input: UseResourceActivityInput) -> None: | ||
| info = activity.info() | ||
| activity.logger.info( | ||
| f"{info.workflow_id} starts using {input.resource} the {input.iteration} time" | ||
| ) | ||
| await asyncio.sleep(3) | ||
| activity.logger.info( | ||
| f"{info.workflow_id} done using {input.resource} the {input.iteration} time" | ||
| ) | ||
|
|
||
|
|
||
| @dataclass | ||
| class ResourceUserWorkflowInput: | ||
| # The id of the resource pool workflow to request a resource from | ||
| resource_pool_workflow_id: str | ||
|
|
||
| # If set, this workflow will fail after the "first" or "second" activity. | ||
| iteration_to_fail_after: Optional[str] | ||
|
|
||
| # If True, this workflow will continue as new after the last activity. The next iteration will run more activities, | ||
| # but will not continue as new. | ||
| should_continue_as_new: bool | ||
|
|
||
| # Used to transfer resource ownership between iterations during continue_as_new | ||
| already_acquired_resource: Optional[DetachedResource] = field(default=None) | ||
|
|
||
|
|
||
| class FailWorkflowException(Exception): | ||
| pass | ||
|
|
||
|
|
||
| # Wait this long for a resource before giving up | ||
| MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5) | ||
|
|
||
|
|
||
| @workflow.defn(failure_exception_types=[FailWorkflowException]) | ||
| class ResourceUserWorkflow: | ||
| @workflow.run | ||
| async def run(self, input: ResourceUserWorkflowInput) -> None: | ||
| pool_client = ResourcePoolClient(input.resource_pool_workflow_id) | ||
|
|
||
| async with pool_client.acquire_resource( | ||
| reattach=input.already_acquired_resource | ||
| ) as acquired_resource: | ||
| for iteration in ["first", "second"]: | ||
| await workflow.execute_activity( | ||
| use_resource, | ||
| UseResourceActivityInput(acquired_resource.resource, iteration), | ||
| start_to_close_timeout=timedelta(seconds=10), | ||
| ) | ||
|
|
||
| if iteration == input.iteration_to_fail_after: | ||
| workflow.logger.info( | ||
| f"Failing after iteration {input.iteration_to_fail_after}" | ||
| ) | ||
| raise FailWorkflowException() | ||
|
|
||
| # This workflow only continues as new so it can demonstrate how to pass acquired resources across | ||
| # iterations. Ordinarily, such a short workflow would not use continue as new. | ||
| if input.should_continue_as_new: | ||
| detached_resource = acquired_resource.detach() | ||
|
|
||
| next_input = ResourceUserWorkflowInput( | ||
| resource_pool_workflow_id=input.resource_pool_workflow_id, | ||
| iteration_to_fail_after=input.iteration_to_fail_after, | ||
| should_continue_as_new=False, | ||
| already_acquired_resource=detached_resource, | ||
| ) | ||
|
|
||
| workflow.continue_as_new(next_input) |
Uh oh!
There was an error while loading. Please reload this page.