Skip to content

Commit 29c6454

Browse files
committed
Add samples, test, documentation
1 parent 8ac1876 commit 29c6454

File tree

11 files changed

+846
-16
lines changed

11 files changed

+846
-16
lines changed

docs/features.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,88 @@ Orchestrations can schedule durable timers using the `create_timer` API. These t
4848

4949
Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces. Sub-orchestrations can also be versioned in a similar manner to their parent orchestrations, however, they do not inherit the parent orchestrator's version. Instead, they will use the default_version defined in the current worker's VersioningOptions unless otherwise specified during `call_sub_orchestrator`.
5050

51+
### Entities
52+
53+
#### Concepts
54+
55+
Durable Entities provide a way to model small, stateful objects within your orchestration workflows. Each entity has a unique identity and maintains its own state, which is persisted durably. Entities can be interacted with by sending them operations (messages) that mutate or query their state. These operations are processed sequentially, ensuring consistency. Examples of uses for durable entities include counters, accumulators, or any other operation which requires state to persist across orchestrations.
56+
57+
Entities can be invoked from durable clients directly, or from durable orchestrators. They support features like automatic state persistence, concurrency control, and can be locked for exclusive access during critical operations.
58+
59+
Entities are accessed by a unique ID, implemented here as EntityInstanceId. This ID is comprised of two parts, an entity name referring to the function or class that defines the behavior of the entity, and a key which is any string defined in your code. Each entity instance, represented by a distinct EntityInstanceId, has its own state.
60+
61+
#### Syntax
62+
63+
##### Defining Entities
64+
65+
Entities can be defined using either function-based or class-based syntax.
66+
67+
```python
68+
# Funtion-based entity
69+
def counter(ctx: task.EntityContext, input: int):
70+
state = ctx.get_state(int, 0)
71+
if ctx.operation == "add":
72+
state += input
73+
ctx.set_state(state)
74+
elif operation == "get":
75+
return state
76+
77+
# Class-based entity
78+
class Counter(entities.DurableEntity):
79+
def __init__(self):
80+
self.set_state(0)
81+
82+
def add(self, amount: int):
83+
self.set_state(self.get_state(int, 0) + amount)
84+
85+
def get(self):
86+
return self.get_state(int, 0)
87+
```
88+
89+
> Note that the object properties of class-based entities may not be preserved across invocations. Use the derived get_state and set_state methods to access the persisted entity data.
90+
91+
##### Invoking entities
92+
93+
Entities are invoked using the `signal_entity` or `call_entity` APIs. The Durable Client only allows `signal_entity`:
94+
95+
```python
96+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
97+
taskhub=taskhub_name, token_credential=None)
98+
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
99+
c.signal_entity(entity_id, "do_nothing")
100+
```
101+
102+
Whereas orchestrators can choose to use `signal_entity` or `call_entity`:
103+
104+
```python
105+
# Signal an entity (fire-and-forget)
106+
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
107+
ctx.signal_entity(entity_id, operation_name="add", input=5)
108+
109+
# Call an entity (wait for result)
110+
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
111+
result = yield ctx.call_entity(entity_id, operation_name="get")
112+
```
113+
114+
##### Entity actions
115+
116+
Entities can perform actions such signaling other entities or starting new orchestrations
117+
118+
- `ctx.signal_entity(entity_id, operation, input)`
119+
- `ctx.schedule_new_orchestration(orchestrator_name, input)`
120+
121+
##### Locking and concurrency
122+
123+
Because entites can be accessed from multiple running orchestrations at the same time, entities may also be locked by a single orchestrator ensuring exclusive access during the duration of the lock (also known as a critical section). Think semaphores:
124+
125+
```python
126+
with (yield ctx.lock_entities([entity_id_1, entity_id_2]):
127+
# Perform entity call operations that require exclusive access
128+
...
129+
```
130+
131+
Note that locked entities may not be signalled, and every call to a locked entity must return a result before another call to the same entity may be made from within the critical section. For more details and advanced usage, see the examples and API documentation.
132+
51133
### External events
52134

53135
Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
11
from typing import Any, Optional, Type, TypeVar, overload
22

33
from durabletask.entities.entity_instance_id import EntityInstanceId
4+
from durabletask.task import EntityContext
45

56
TState = TypeVar("TState")
67

78

89
class DurableEntity:
9-
def _initialize_entity_context(self, context):
10+
def _initialize_entity_context(self, context: EntityContext):
1011
self.entity_context = context
1112

13+
@overload
14+
def get_state(self, intended_type: Type[TState], default: TState) -> TState: ...
15+
1216
@overload
1317
def get_state(self, intended_type: Type[TState]) -> Optional[TState]: ...
1418

1519
@overload
16-
def get_state(self, intended_type: None = None) -> Any: ...
20+
def get_state(self, intended_type: None = None, default: Any = None) -> Any: ...
1721

18-
def get_state(self, intended_type: Optional[Type[TState]] = None) -> Optional[TState] | Any:
19-
return self.entity_context.get_state(intended_type)
22+
def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Optional[TState] | Any:
23+
return self.entity_context.get_state(intended_type, default)
2024

2125
def set_state(self, state: Any):
2226
self.entity_context.set_state(state)
2327

2428
def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Optional[Any] = None) -> None:
2529
self.entity_context.signal_entity(entity_instance_id, operation, input)
2630

27-
def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> None:
28-
self.entity_context.schedule_new_orchestration(orchestration_name, input, instance_id=instance_id)
31+
def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> str:
32+
return self.entity_context.schedule_new_orchestration(orchestration_name, input, instance_id=instance_id)

durabletask/internal/entity_state_shim.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,23 @@ def __init__(self, start_state):
1313
self._operation_actions: list[pb.OperationAction] = []
1414
self._actions_checkpoint_state: int = 0
1515

16+
@overload
17+
def get_state(self, intended_type: Type[TState], default: TState) -> TState: ...
18+
1619
@overload
1720
def get_state(self, intended_type: Type[TState]) -> Optional[TState]: ...
1821

1922
@overload
20-
def get_state(self, intended_type: None = None) -> Any: ...
23+
def get_state(self, intended_type: None = None, default: Any = None) -> Any: ...
24+
25+
def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Optional[TState] | Any:
26+
if self._current_state is None and default is not None:
27+
return default
2128

22-
def get_state(self, intended_type: Optional[Type[TState]] = None) -> Optional[TState] | Any:
2329
if intended_type is None:
2430
return self._current_state
2531

26-
if isinstance(self._current_state, intended_type) or self._current_state is None:
32+
if isinstance(self._current_state, intended_type):
2733
return self._current_state
2834

2935
try:

durabletask/task.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ def call_activity(self, activity: Union[Activity[TInput, TOutput], str], *,
144144

145145
@abstractmethod
146146
def call_entity(self, entity: EntityInstanceId,
147-
operation: str, *,
148-
input: Optional[TInput] = None):
147+
operation: str,
148+
input: Optional[TInput] = None) -> Task:
149149
"""Schedule entity function for execution.
150150
151151
Parameters
@@ -548,14 +548,17 @@ def operation(self) -> str:
548548
"""
549549
return self._operation
550550

551+
@overload
552+
def get_state(self, intended_type: Type[TState], default: TState) -> TState: ...
553+
551554
@overload
552555
def get_state(self, intended_type: Type[TState]) -> Optional[TState]: ...
553556

554557
@overload
555-
def get_state(self, intended_type: None = None) -> Any: ...
558+
def get_state(self, intended_type: None = None, default: Any = None) -> Any: ...
556559

557-
def get_state(self, intended_type: Optional[Type[TState]] = None) -> Optional[TState] | Any:
558-
return self._state.get_state(intended_type)
560+
def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Optional[TState] | Any:
561+
return self._state.get_state(intended_type, default)
559562

560563
def set_state(self, new_state: Any):
561564
self._state.set_state(new_state)
@@ -575,12 +578,14 @@ def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, in
575578
)
576579
)
577580

578-
def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> None:
581+
def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> str:
579582
encoded_input = shared.to_json(input) if input is not None else None
583+
if not instance_id:
584+
instance_id = uuid.uuid4().hex
580585
self._state.add_operation_action(
581586
pb.OperationAction(
582587
startNewOrchestration=pb.StartNewOrchestrationAction(
583-
instanceId=instance_id if instance_id else uuid.uuid4().hex, # TODO: Should this be non-none?
588+
instanceId=instance_id,
584589
name=orchestration_name,
585590
input=pbh.get_string_value(encoded_input),
586591
version=None,
@@ -590,6 +595,7 @@ def schedule_new_orchestration(self, orchestration_name: str, input: Optional[An
590595
)
591596
)
592597
)
598+
return instance_id
593599

594600
@property
595601
def entity_id(self) -> EntityInstanceId:
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
"""End-to-end sample that demonstrates how to configure an orchestrator
2+
that calls an activity function in a sequence and prints the outputs."""
3+
import os
4+
5+
from azure.identity import DefaultAzureCredential
6+
7+
from durabletask import client, task, entities
8+
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
9+
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
10+
11+
12+
class Counter(entities.DurableEntity):
13+
def set(self, input: int):
14+
self.set_state(input)
15+
16+
def add(self, input: int):
17+
current_state = self.get_state(int, 0)
18+
new_state = current_state + (input or 1)
19+
self.set_state(new_state)
20+
return new_state
21+
22+
def get(self):
23+
return self.get_state(int, 0)
24+
25+
26+
def counter_orchestrator(ctx: task.OrchestrationContext, _):
27+
"""Orchestrator function that demonstrates the behavior of the counter entity"""
28+
29+
entity_id = task.EntityInstanceId("Counter", "myCounter")
30+
31+
# Initialize the entity with state 0
32+
ctx.signal_entity(entity_id, "set", 0)
33+
# Increment the counter by 1
34+
yield ctx.call_entity(entity_id, "add", 1)
35+
# Return the entity's current value (should be 1)
36+
return (yield ctx.call_entity(entity_id, "get"))
37+
38+
39+
# Use environment variables if provided, otherwise use default emulator values
40+
taskhub_name = os.getenv("TASKHUB", "default")
41+
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
42+
43+
print(f"Using taskhub: {taskhub_name}")
44+
print(f"Using endpoint: {endpoint}")
45+
46+
# Set credential to None for emulator, or DefaultAzureCredential for Azure
47+
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()
48+
49+
# configure and start the worker - use secure_channel=False for emulator
50+
secure_channel = endpoint != "http://localhost:8080"
51+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
52+
taskhub=taskhub_name, token_credential=credential) as w:
53+
w.add_orchestrator(counter_orchestrator)
54+
w.add_entity(Counter)
55+
w.start()
56+
57+
# Construct the client and run the orchestrations
58+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
59+
taskhub=taskhub_name, token_credential=credential)
60+
instance_id = c.schedule_new_orchestration(counter_orchestrator)
61+
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
62+
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
63+
print(f'Orchestration completed! Result: {state.serialized_output}')
64+
elif state:
65+
print(f'Orchestration failed: {state.failure_details}')
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
"""End-to-end sample that demonstrates how to configure an orchestrator
2+
that calls an activity function in a sequence and prints the outputs."""
3+
import os
4+
5+
from azure.identity import DefaultAzureCredential
6+
7+
from durabletask import client, task, entities
8+
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
9+
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker
10+
11+
12+
class Counter(entities.DurableEntity):
13+
def set(self, input: int):
14+
self.set_state(input)
15+
16+
def add(self, input: int):
17+
current_state = self.get_state(int, 0)
18+
new_state = current_state + (input or 1)
19+
self.set_state(new_state)
20+
return new_state
21+
22+
def update_parent(self):
23+
parent_entity_id = entities.EntityInstanceId("Counter", "parentCounter")
24+
if self.entity_context.entity_id == parent_entity_id:
25+
return # Prevent self-update
26+
self.signal_entity(parent_entity_id, "set", self.get_state(int, 0))
27+
28+
def start_hello(self):
29+
self.schedule_new_orchestration("hello_orchestrator")
30+
31+
def get(self):
32+
return self.get_state(int, 0)
33+
34+
35+
def counter_orchestrator(ctx: task.OrchestrationContext, _):
36+
"""Orchestrator function that demonstrates the behavior of the counter entity"""
37+
38+
entity_id = task.EntityInstanceId("Counter", "myCounter")
39+
parent_entity_id = task.EntityInstanceId("Counter", "parentCounter")
40+
41+
# Use Counter to demonstrate starting an orchestration from an entity
42+
ctx.signal_entity(entity_id, "start_hello")
43+
44+
# User Counter to demonstrate signaling an entity from another entity
45+
# Initialize myCounter with state 0, increment it by 1, and set the state of parentCounter using
46+
# update_parent on myCounter. Retrieve and return the state of parentCounter (should be 1).
47+
ctx.signal_entity(entity_id, "set", 0)
48+
yield ctx.call_entity(entity_id, "add", 1)
49+
yield ctx.call_entity(entity_id, "update_parent")
50+
51+
return (yield ctx.call_entity(parent_entity_id, "get"))
52+
53+
54+
def hello_orchestrator(ctx: task.OrchestrationContext, _):
55+
return f"Hello world!"
56+
57+
58+
# Use environment variables if provided, otherwise use default emulator values
59+
taskhub_name = os.getenv("TASKHUB", "default")
60+
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
61+
62+
print(f"Using taskhub: {taskhub_name}")
63+
print(f"Using endpoint: {endpoint}")
64+
65+
# Set credential to None for emulator, or DefaultAzureCredential for Azure
66+
credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential()
67+
68+
# configure and start the worker - use secure_channel=False for emulator
69+
secure_channel = endpoint != "http://localhost:8080"
70+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel,
71+
taskhub=taskhub_name, token_credential=credential) as w:
72+
w.add_orchestrator(counter_orchestrator)
73+
w.add_orchestrator(hello_orchestrator)
74+
w.add_entity(Counter)
75+
w.start()
76+
77+
# Construct the client and run the orchestrations
78+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel,
79+
taskhub=taskhub_name, token_credential=credential)
80+
instance_id = c.schedule_new_orchestration(counter_orchestrator)
81+
state = c.wait_for_orchestration_completion(instance_id, timeout=60)
82+
if state and state.runtime_status == client.OrchestrationStatus.COMPLETED:
83+
print(f'Orchestration completed! Result: {state.serialized_output}')
84+
elif state:
85+
print(f'Orchestration failed: {state.failure_details}')

0 commit comments

Comments
 (0)