Merged (23 commits)
48 changes: 48 additions & 0 deletions resource_pool/README.md
@@ -0,0 +1,48 @@
# Resource Pool Sample

This sample shows how to use a long-lived `ResourcePoolWorkflow` to allocate resources to `ResourceUserWorkflow`s.
Each `ResourceUserWorkflow` runs several activities while it holds a resource. Note that
`ResourcePoolWorkflow` makes its resource allocation decisions based on in-memory state.

Run the following from this directory to start the worker:

uv run worker.py

Then, in another terminal, run the following to execute several `ResourceUserWorkflow`s:

uv run starter.py

You should see output indicating that the `ResourcePoolWorkflow` serialized access to each resource.

You can query the set of current resource holders with:

tctl wf query -w resource_pool --qt get_current_holders

# Other approaches

There are simpler ways to manage concurrent access to resources. Consider using resource-specific workers/task queues
and limiting the number of activity slots on those workers. The Go SDK also has [sessions](https://docs.temporal.io/develop/go/sessions)
that allow workflows to pin themselves to workers.

The technique in this sample is capable of more complex resource allocation than the options above, but it doesn't scale
as well. Specifically, it can:
- Manage access to a set of resources that is decoupled from the set of workers and task queues
- Run arbitrary code to place workloads on resources as they become available
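
As an illustration of the last point, this sample's own placement policy is simply "first free resource wins"; more elaborate policies would replace this selection step. A standalone sketch (the function name is illustrative, not from the sample):

```python
from typing import Optional


def pick_free_resource(resources: dict[str, Optional[str]]) -> Optional[str]:
    # resources maps resource name -> current holder (None when free).
    # Return the first free resource, or None when everything is held.
    return next((r for r, holder in resources.items() if holder is None), None)


print(pick_free_resource({"gpu-0": "workflow-a", "gpu-1": None}))  # gpu-1
print(pick_free_resource({"gpu-0": "workflow-a"}))  # None
```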

# Caveats

This sample uses true locking (not leasing!) to avoid the complexity and scaling concerns associated with heartbeating via
signals. Locking carries a risk: a failure to unlock permanently removes a resource from the pool. However, with
Temporal's durable execution guarantees, this can only happen if:

- A `ResourceUserWorkflow` times out (prohibited in the sample code)
- An operator terminates a `ResourceUserWorkflow` (Temporal recommends canceling workflows instead of terminating them whenever possible)
- You shut down your workers and never restart them (unhandled, but irrelevant)

If a leak were to happen, you could discover the identity of the leaker using the query above, then:

    tctl wf signal -w resource_pool --name release_resource --input '{ "release_key": "<the key from the query above>" }'

Performance: a single `ResourcePoolWorkflow` scales to tens, but not hundreds, of request/release events per second. It is
best suited for allocating resources to long-running workflows. Actual performance will depend on your Temporal server's
persistence layer.
Empty file added resource_pool/__init__.py
Empty file.
97 changes: 97 additions & 0 deletions resource_pool/resource_pool_client.py
@@ -0,0 +1,97 @@
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import AsyncGenerator, Optional

from temporalio import workflow

from resource_pool.shared import (
AcquiredResource,
AcquireRequest,
AcquireResponse,
DetachedResource,
)


# Use this class in workflow code that needs to run on locked resources.
class ResourcePoolClient:
def __init__(self, pool_workflow_id: str) -> None:
self.pool_workflow_id = pool_workflow_id
self.acquired_resources: list[AcquiredResource] = []

async def send_acquire_signal(self) -> None:
handle = workflow.get_external_workflow_handle(self.pool_workflow_id)
await handle.signal(
"acquire_resource", AcquireRequest(workflow.info().workflow_id)
)
> **Contributor** (suggested change, "to be more type safe; same everywhere"):
>
>     await handle.signal(
>         ResourcePoolWorkflow.acquire_resource, AcquireRequest(workflow.info().workflow_id)
>     )
>
> **Contributor Author:** Almost everywhere - the pool workflow should signal the requester in an un-typesafe way (since it can't know what kind of workflow the requester is).

async def send_release_signal(self, acquired_resource: AcquiredResource) -> None:
handle = workflow.get_external_workflow_handle(self.pool_workflow_id)
await handle.signal(
"release_resource",
AcquireResponse(
resource=acquired_resource.resource,
release_key=acquired_resource.release_key,
),
)

def lazy_register_signal_handler(self) -> None:
if workflow.get_signal_handler("assign_resource") is None:
> **Contributor:** This name prevents use of multiple resource pool clients in a single workflow. If that's OK for this sample, might as well change the constructor of this client to default the workflow ID to the known, hardcoded, shared ID (or remove the parameter altogether).
>
> **Contributor Author:** Good callout. I prefer to include the workflow id in the signal name.

workflow.set_signal_handler("assign_resource", self.assign_resource)

def assign_resource(self, response: AcquireResponse) -> None:
self.acquired_resources.append(
AcquiredResource(
resource=response.resource, release_key=response.release_key
)
)

@asynccontextmanager
async def acquire_resource(
self,
*,
reattach: Optional[DetachedResource] = None,
max_wait_time: timedelta = timedelta(minutes=5),
) -> AsyncGenerator[AcquiredResource, None]:
warn_when_workflow_has_timeouts()
self.lazy_register_signal_handler()

if reattach is None:
await self.send_acquire_signal()
await workflow.wait_condition(
lambda: len(self.acquired_resources) > 0, timeout=max_wait_time
)
resource = self.acquired_resources.pop(0)
else:
resource = AcquiredResource(
resource=reattach.resource, release_key=reattach.release_key
)

# Can't happen, but the typechecker doesn't know about workflow.wait_condition
if resource is None:
raise RuntimeError("resource was None when it can't be")

# During the yield, the calling workflow owns the resource. Note that this is a lock, not a lease! Our
# finally block will release the resource if an activity fails. This is why we asserted the lack of
# workflow-level timeouts above - the finally block wouldn't run if there was a timeout.
try:
yield resource
finally:
if not resource.detached:
await self.send_release_signal(resource)


def warn_when_workflow_has_timeouts() -> None:
if has_timeout(workflow.info().run_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have a run_timeout (found {workflow.info().run_timeout}) - this will leak locks"
)
if has_timeout(workflow.info().execution_timeout):
workflow.logger.warning(
f"ResourceLockingWorkflow cannot have an execution_timeout (found {workflow.info().execution_timeout}) - this will leak locks"
)


def has_timeout(timeout: Optional[timedelta]) -> bool:
# After continue_as_new, timeouts are 0, even if they were None before continue_as_new (and were not set in the
# continue_as_new call).
return timeout is not None and timeout > timedelta(0)
141 changes: 141 additions & 0 deletions resource_pool/resource_pool_workflow.py
@@ -0,0 +1,141 @@
from dataclasses import dataclass
from typing import Optional

from temporalio import workflow

from resource_pool.shared import AcquireRequest, AcquireResponse


# Internal to this workflow, we'll associate randomly generated release signal names with each acquire request.
@dataclass
class InternalAcquireRequest(AcquireRequest):
release_signal: Optional[str]


@dataclass
class ResourcePoolWorkflowInput:
# Key is resource, value is current holder of the resource (None if not held)
resources: dict[str, Optional[InternalAcquireRequest]]
waiters: list[InternalAcquireRequest]


@workflow.defn
class ResourcePoolWorkflow:
@workflow.init
def __init__(self, input: ResourcePoolWorkflowInput) -> None:
self.resources = input.resources
self.waiters = input.waiters
self.release_key_to_resource: dict[str, str] = {}

for resource, holder in self.resources.items():
if holder is not None and holder.release_signal is not None:
self.release_key_to_resource[holder.release_signal] = resource

@workflow.signal
async def add_resources(self, resources: list[str]) -> None:
for resource in resources:
if resource in self.resources:
workflow.logger.warning(
f"Ignoring attempt to add already-existing resource: {resource}"
)
continue
else:
self.resources[resource] = None

@workflow.signal
async def acquire_resource(self, request: AcquireRequest) -> None:
self.waiters.append(
InternalAcquireRequest(workflow_id=request.workflow_id, release_signal=None)
)
workflow.logger.info(
f"workflow_id={request.workflow_id} is waiting for a resource"
)

@workflow.signal
async def release_resource(self, acquire_response: AcquireResponse) -> None:
release_key = acquire_response.release_key
resource = self.release_key_to_resource.get(release_key)
if resource is None:
workflow.logger.warning(f"Ignoring unknown release_key: {release_key}")
return

holder = self.resources[resource]
if holder is None:
workflow.logger.warning(
f"Ignoring request to release resource that is not held: {resource}"
)
return

# Remove the current holder
workflow.logger.info(
f"workflow_id={holder.workflow_id} released resource {resource}"
)
self.resources[resource] = None
del self.release_key_to_resource[release_key]

@workflow.query
def get_current_holders(self) -> dict[str, Optional[InternalAcquireRequest]]:
return self.resources

async def assign_resource(
self, resource: str, internal_request: InternalAcquireRequest
) -> None:
self.resources[resource] = internal_request
workflow.logger.info(
f"workflow_id={internal_request.workflow_id} acquired resource {resource}"
)
internal_request.release_signal = str(workflow.uuid4())
self.release_key_to_resource[internal_request.release_signal] = resource

requester = workflow.get_external_workflow_handle(internal_request.workflow_id)
await requester.signal(
"assign_resource",
AcquireResponse(
release_key=internal_request.release_signal, resource=resource
),
)

async def assign_next_resource(self) -> bool:
if len(self.waiters) == 0:
return False

next_free_resource = self.get_free_resource()
if next_free_resource is None:
return False

next_waiter = self.waiters.pop(0)
await self.assign_resource(next_free_resource, next_waiter)
return True

def get_free_resource(self) -> Optional[str]:
return next(
(resource for resource, holder in self.resources.items() if holder is None),
None,
)

def can_assign_resource(self) -> bool:
return len(self.waiters) > 0 and self.get_free_resource() is not None

def should_continue_as_new(self) -> bool:
return (
workflow.info().is_continue_as_new_suggested()
and workflow.all_handlers_finished()
)

@workflow.run
async def run(self, _: ResourcePoolWorkflowInput) -> None:
while True:
await workflow.wait_condition(
lambda: self.can_assign_resource() or self.should_continue_as_new()
)

if await self.assign_next_resource():
continue

if self.should_continue_as_new():
workflow.continue_as_new(
ResourcePoolWorkflowInput(
resources=self.resources,
waiters=self.waiters,
)
)
88 changes: 88 additions & 0 deletions resource_pool/resource_user_workflow.py
@@ -0,0 +1,88 @@
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional

from temporalio import activity, workflow

from resource_pool.resource_pool_client import ResourcePoolClient
from resource_pool.shared import DetachedResource


@dataclass
class UseResourceActivityInput:
resource: str
iteration: str


@activity.defn
async def use_resource(input: UseResourceActivityInput) -> None:
info = activity.info()
activity.logger.info(
f"{info.workflow_id} starts using {input.resource} the {input.iteration} time"
)
await asyncio.sleep(3)
activity.logger.info(
f"{info.workflow_id} done using {input.resource} the {input.iteration} time"
)


@dataclass
class ResourceUserWorkflowInput:
# The id of the resource pool workflow to request a resource from
resource_pool_workflow_id: str

# If set, this workflow will fail after the "first" or "second" activity.
iteration_to_fail_after: Optional[str]

# If True, this workflow will continue as new after the last activity. The next iteration will run more activities,
# but will not continue as new.
should_continue_as_new: bool

# Used to transfer resource ownership between iterations during continue_as_new
already_acquired_resource: Optional[DetachedResource] = field(default=None)


class FailWorkflowException(Exception):
pass


# Wait this long for a resource before giving up
MAX_RESOURCE_WAIT_TIME = timedelta(minutes=5)


@workflow.defn(failure_exception_types=[FailWorkflowException])
class ResourceUserWorkflow:
@workflow.run
async def run(self, input: ResourceUserWorkflowInput) -> None:
pool_client = ResourcePoolClient(input.resource_pool_workflow_id)

async with pool_client.acquire_resource(
reattach=input.already_acquired_resource
) as acquired_resource:
for iteration in ["first", "second"]:
await workflow.execute_activity(
use_resource,
UseResourceActivityInput(acquired_resource.resource, iteration),
start_to_close_timeout=timedelta(seconds=10),
)

if iteration == input.iteration_to_fail_after:
workflow.logger.info(
f"Failing after iteration {input.iteration_to_fail_after}"
)
raise FailWorkflowException()

# This workflow only continues as new so it can demonstrate how to pass acquired resources across
# iterations. Ordinarily, such a short workflow would not use continue as new.
if input.should_continue_as_new:
detached_resource = acquired_resource.detach()

next_input = ResourceUserWorkflowInput(
resource_pool_workflow_id=input.resource_pool_workflow_id,
iteration_to_fail_after=input.iteration_to_fail_after,
should_continue_as_new=False,
already_acquired_resource=detached_resource,
)

workflow.continue_as_new(next_input)