Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.venv
.idea
__pycache__
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Some examples require extra dependencies. See each sample's directory for specif
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
* [sentry](sentry) - Report errors to Sentry.
* [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments.
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.

Expand Down
42 changes: 42 additions & 0 deletions updatable_timer/README.md
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like our other samples w/ no special install instructions, may want to link to top-level README for "prerequisites"

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Updatable Timer Sample

Demonstrates a helper class which relies on `workflow.wait_condition` to implement a blocking sleep that can be updated at any moment.

The sample is composed of the three executables:

* `worker.py` hosts the Workflow Executions.
* `starter.py` starts Workflow Executions.
* `wake_up_timer_updater.py` Signals the Workflow Execution with the new time to wake up.

First start the Worker:

```bash
poetry run python worker.py
```
Check the output of the Worker window. The expected output is:

```
Worker started, ctrl+c to exit
```

Then in a different terminal window start the Workflow Execution:

```bash
poetry run python starter.py
```
Check the output of the Worker window. The expected output is:
```
Workflow started: run_id=...
```

Then run the updater as many times as you want to change timer to 10 seconds from now:

```bash
poetry run python wake_up_time_updater.py
```

Check the output of the worker window. The expected output is:

```
Updated wake up time to 10 seconds from now
```
1 change: 1 addition & 0 deletions updatable_timer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TASK_QUEUE = "updatable-timer"
32 changes: 32 additions & 0 deletions updatable_timer/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional

from temporalio import exceptions
from temporalio.client import Client

from updatable_timer import TASK_QUEUE
from updatable_timer.workflow import Workflow


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

client = client or await Client.connect("localhost:7233")
try:
handle = await client.start_workflow(
Workflow.run,
(datetime.now() + timedelta(days=1)).timestamp(),
id=f"updatable-timer-workflow",
task_queue=TASK_QUEUE,
)
logging.info(f"Workflow started: run_id={handle.result_run_id}")
except exceptions.WorkflowAlreadyStartedError as e:
logging.info(
f"Workflow already running: workflow_id={e.workflow_id}, run_id={e.run_id}"
)


if __name__ == "__main__":
asyncio.run(main())
39 changes: 39 additions & 0 deletions updatable_timer/updatable_timer_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import asyncio
from datetime import datetime, timedelta

from temporalio import workflow


class UpdatableTimer:
def __init__(self, wake_up_time: datetime) -> None:
self.wake_up_time = wake_up_time
self.wake_up_time_updated = False

async def sleep(self) -> None:
workflow.logger.info(f"sleep_until: {self.wake_up_time}")
while True:
now = workflow.now()

sleep_interval = self.wake_up_time - now
if sleep_interval <= timedelta(0):
break
workflow.logger.info(f"Going to sleep for {sleep_interval}")

try:
self.wake_up_time_updated = False
await workflow.wait_condition(
lambda: self.wake_up_time_updated,
timeout=sleep_interval,
)
except asyncio.TimeoutError:
# checks condition at the beginning of the loop
continue
workflow.logger.info(f"sleep_until completed")

def update_wake_up_time(self, wake_up_time: datetime) -> None:
workflow.logger.info(f"update_wake_up_time: {wake_up_time}")
self.wake_up_time = wake_up_time
self.wake_up_time_updated = True

def get_wake_up_time(self) -> datetime:
return self.wake_up_time
26 changes: 26 additions & 0 deletions updatable_timer/wake_up_time_updater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Optional

from temporalio.client import Client

from updatable_timer.workflow import Workflow


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

client = client or await Client.connect("localhost:7233")
handle = client.get_workflow_handle(workflow_id="updatable-timer-workflow")
# signal workflow about the wake up time change
await handle.signal(
Workflow.update_wake_up_time,
(datetime.now() + timedelta(seconds=10)).timestamp(),
)

logging.info("Updated wake up time to 10 seconds from now")


if __name__ == "__main__":
asyncio.run(main())
35 changes: 35 additions & 0 deletions updatable_timer/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import asyncio
import logging

from temporalio.client import Client
from temporalio.worker import Worker

from updatable_timer import TASK_QUEUE
from updatable_timer.workflow import Workflow

interrupt_event = asyncio.Event()


async def main():
logging.basicConfig(level=logging.INFO)

client = await Client.connect("localhost:7233")
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[Workflow],
):
logging.info("Worker started, ctrl+c to exit")
# Wait until interrupted
await interrupt_event.wait()
logging.info("Interrupt received, shutting down...")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
35 changes: 35 additions & 0 deletions updatable_timer/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from datetime import datetime, timezone
from typing import Optional

from temporalio import workflow

from updatable_timer.updatable_timer_lib import UpdatableTimer


@workflow.defn
class Workflow:
def __init__(self):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend return type annotations everywhere

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My Java brain protested this for a constructor :).

self.timer: Optional[UpdatableTimer] = None

@workflow.run
async def run(self, wake_up_time: float):
self.timer = UpdatableTimer(
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
)
await self.timer.sleep()

@workflow.signal
async def update_wake_up_time(self, wake_up_time: float):
# Deals with situation when the signal method is called before the run method.
# This happens when a workflow task is executed after a signal is received
# or when a workflow is started using the signal-with-start.
await workflow.wait_condition(lambda: self.timer is not None)
assert self.timer is not None # for mypy
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a neat, fairly new feature for @workflow.init that lets you operate on workflow input on the constructor, so can change this to:

Suggested change
@workflow.defn
class Workflow:
def __init__(self):
self.timer: Optional[UpdatableTimer] = None
@workflow.run
async def run(self, wake_up_time: float):
self.timer = UpdatableTimer(
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
)
await self.timer.sleep()
@workflow.signal
async def update_wake_up_time(self, wake_up_time: float):
# Deals with situation when the signal method is called before the run method.
# This happens when a workflow task is executed after a signal is received
# or when a workflow is started using the signal-with-start.
await workflow.wait_condition(lambda: self.timer is not None)
assert self.timer is not None # for mypy
@workflow.defn
class Workflow:
@workflow.init
def __init__(self, wake_up_time: float) -> None:
self.timer = UpdatableTimer(
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
)
@workflow.run
async def run(self, wake_up_time: float) -> None:
await self.timer.sleep()
@workflow.signal
async def update_wake_up_time(self, wake_up_time: float):

(also can remove the assert self.timer is not None # for mypy in the query)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! So much clearer!

self.timer.update_wake_up_time(
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
)

@workflow.query
def get_wake_up_time(self):
assert self.timer is not None # for mypy
return self.timer.get_wake_up_time()