@@ -48,7 +48,7 @@ class IncrementalGraph:
4848
4949 _root_nodes : dict [SubsequentResultRecord , None ]
5050 _completed_queue : list [IncrementalDataRecordResult ]
51- _next_queue : list [Future [Iterable [IncrementalDataRecordResult ]]]
51+ _next_queue : list [Future [Iterable [IncrementalDataRecordResult ] | None ]]
5252
5353 _tasks : set [Task [Any ]]
5454
@@ -87,24 +87,31 @@ def add_completed_reconcilable_deferred_grouped_field_set(
8787 incremental_data_records , deferred_records
8888 )
8989
90- async def completed_incremental_data (
90+ def current_completed_batch (
9191 self ,
92- ) -> AsyncGenerator [Iterable [IncrementalDataRecordResult ], None ]:
93- """Asynchronously yield completed incremental data record results."""
92+ ) -> Generator [IncrementalDataRecordResult , None , None ]:
93+ """Yield the current completed batch of incremental data record results."""
94+ queue = self ._completed_queue
95+ while queue :
96+ yield queue .pop (0 )
97+ if not self ._root_nodes :
98+ self .abort ()
99+
100+ def next_completed_batch (
101+ self ,
102+ ) -> Future [Iterable [IncrementalDataRecordResult ] | None ]:
103+ """Return a future that resolves to the next completed batch."""
94104 loop = get_running_loop ()
95- while True :
96- if self ._completed_queue :
97- first_result = self ._completed_queue .pop (0 )
98- yield self ._yield_current_completed_incremental_data (first_result )
99- else :
100- future : Future [Iterable [IncrementalDataRecordResult ]] = (
101- loop .create_future ()
102- )
103- self ._next_queue .append (future )
104- try :
105- yield await future
106- except CancelledError :
107- break # pragma: no cover
105+ future : Future [Iterable [IncrementalDataRecordResult ] | None ] = (
106+ loop .create_future ()
107+ )
108+ self ._next_queue .append (future )
109+ return future
110+
111+ def abort (self ) -> None :
112+ """Abort the incremental graph execution."""
113+ for resolve in self ._next_queue :
114+ resolve .set_result (None )
108115
109116 def has_next (self ) -> bool :
110117 """Check if there are more results to process."""
@@ -332,11 +339,7 @@ def _yield_current_completed_incremental_data(
332339 ) -> Generator [IncrementalDataRecordResult , None , None ]:
333340 """Yield the current completed incremental data."""
334341 yield first_result
335- queue = self ._completed_queue
336- while queue :
337- yield queue .pop (0 )
338- if not self ._root_nodes :
339- self .stop_incremental_data ()
342+ yield from self .current_completed_batch ()
340343
341344 def _enqueue (self , completed : IncrementalDataRecordResult ) -> None :
342345 """Enqueue completed incremental data record result."""
0 commit comments