Skip to content

Commit e499937

Browse files
committed
Feedback, fix tests
1 parent c63be78 commit e499937

File tree

2 files changed

+18
-14
lines changed

2 files changed

+18
-14
lines changed

durabletask/worker.py

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,7 @@ def _execute_entity_batch(
803803

804804
def _cancel_entity_batch(
805805
self,
806-
req: pb.EntityBatchRequest,
806+
req: Union[pb.EntityBatchRequest, pb.EntityRequest],
807807
stub: stubs.TaskHubSidecarServiceStub,
808808
completionToken,
809809
):
@@ -812,7 +812,7 @@ def _cancel_entity_batch(
812812
completionToken=completionToken
813813
)
814814
)
815-
self._logger.info(f"Cancelled entity batch task for entity instance ID: {req.instanceId}")
815+
self._logger.info(f"Cancelled entity batch task for instance ID: {req.instanceId}")
816816

817817

818818
class _RuntimeOrchestrationContext(task.OrchestrationContext):
@@ -2089,13 +2089,14 @@ async def run(self):
20892089
)
20902090
)
20912091
except Exception as queue_exception:
2092-
self._logger.error(f"Shutting down worker - Uncaught error in activity manager thread pool: {queue_exception}")
2092+
self._logger.error(f"Shutting down worker - Uncaught error in worker manager: {queue_exception}")
20932093
while self.activity_queue is not None and not self.activity_queue.empty():
20942094
try:
20952095
func, cancellation_func, args, kwargs = self.activity_queue.get_nowait()
20962096
await self._run_func(cancellation_func, *args, **kwargs)
20972097
self._logger.error(f"Activity work item args: {args}, kwargs: {kwargs}")
20982098
except asyncio.QueueEmpty:
2099+
# Queue was empty, no cancellation needed
20992100
pass
21002101
except Exception as cancellation_exception:
21012102
self._logger.error(f"Uncaught error while cancelling activity work item: {cancellation_exception}")
@@ -2105,6 +2106,7 @@ async def run(self):
21052106
await self._run_func(cancellation_func, *args, **kwargs)
21062107
self._logger.error(f"Orchestration work item args: {args}, kwargs: {kwargs}")
21072108
except asyncio.QueueEmpty:
2109+
# Queue was empty, no cancellation needed
21082110
pass
21092111
except Exception as cancellation_exception:
21102112
self._logger.error(f"Uncaught error while cancelling orchestration work item: {cancellation_exception}")
@@ -2114,6 +2116,7 @@ async def run(self):
21142116
await self._run_func(cancellation_func, *args, **kwargs)
21152117
self._logger.error(f"Entity batch work item args: {args}, kwargs: {kwargs}")
21162118
except asyncio.QueueEmpty:
2119+
# Queue was empty, no cancellation needed
21172120
pass
21182121
except Exception as cancellation_exception:
21192122
self._logger.error(f"Uncaught error while cancelling entity batch work item: {cancellation_exception}")
@@ -2150,7 +2153,8 @@ async def _process_work_item(
21502153
async with semaphore:
21512154
try:
21522155
await self._run_func(func, *args, **kwargs)
2153-
except Exception:
2156+
except Exception as work_exception:
2157+
self._logger.error(f"Uncaught error while processing work item, item will be abandoned: {work_exception}")
21542158
await self._run_func(cancellation_func, *args, **kwargs)
21552159
finally:
21562160
queue.task_done()
@@ -2218,22 +2222,22 @@ async def reset_for_new_run(self):
22182222
while not self.activity_queue.empty():
22192223
func, cancellation_func, args, kwargs = self.activity_queue.get_nowait()
22202224
await self._run_func(cancellation_func, *args, **kwargs)
2221-
except Exception:
2222-
pass
2225+
except Exception as reset_exception:
2226+
self._logger.warning(f"Error while clearing activity queue during reset: {reset_exception}")
22232227
if self.orchestration_queue is not None:
22242228
try:
22252229
while not self.orchestration_queue.empty():
22262230
func, cancellation_func, args, kwargs = self.orchestration_queue.get_nowait()
22272231
await self._run_func(cancellation_func, *args, **kwargs)
2228-
except Exception:
2229-
pass
2232+
except Exception as reset_exception:
2233+
self._logger.warning(f"Error while clearing orchestration queue during reset: {reset_exception}")
22302234
if self.entity_batch_queue is not None:
22312235
try:
22322236
while not self.entity_batch_queue.empty():
22332237
func, cancellation_func, args, kwargs = self.entity_batch_queue.get_nowait()
22342238
await self._run_func(cancellation_func, *args, **kwargs)
2235-
except Exception:
2236-
pass
2239+
except Exception as reset_exception:
2240+
self._logger.warning(f"Error while clearing entity queue during reset: {reset_exception}")
22372241
# Clear pending work lists
22382242
self._pending_activity_work.clear()
22392243
self._pending_orchestration_work.clear()

tests/durabletask/test_worker_concurrency_loop_async.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,18 @@ def test_worker_concurrency_loop_async():
4646
grpc_worker = TaskHubGrpcWorker(concurrency_options=options)
4747
stub = DummyStub()
4848

49-
async def dummy_orchestrator(self, req, stub, completionToken):
49+
async def dummy_orchestrator(req, stub, completionToken):
5050
await asyncio.sleep(0.1)
5151
stub.CompleteOrchestratorTask('ok')
5252

53-
async def cancel_dummy_orchestrator(self, req, stub, completionToken):
53+
async def cancel_dummy_orchestrator(req, stub, completionToken):
5454
pass
5555

56-
async def dummy_activity(self, req, stub, completionToken):
56+
async def dummy_activity(req, stub, completionToken):
5757
await asyncio.sleep(0.1)
5858
stub.CompleteActivityTask('ok')
5959

60-
async def cancel_dummy_activity(self, req, stub, completionToken):
60+
async def cancel_dummy_activity(req, stub, completionToken):
6161
pass
6262

6363
# Patch the worker's _execute_orchestrator and _execute_activity

0 commit comments

Comments
 (0)