diff --git a/packages/bigframes/bigframes/blob/_functions.py b/packages/bigframes/bigframes/blob/_functions.py index 5114f60058c1..c888847c8971 100644 --- a/packages/bigframes/bigframes/blob/_functions.py +++ b/packages/bigframes/bigframes/blob/_functions.py @@ -103,7 +103,7 @@ def _create_udf(self): \"\"\" """ - bf_io_bigquery.start_query_with_client( + bf_io_bigquery.start_query_with_job( self._session.bqclient, sql, job_config=bigquery.QueryJobConfig(), @@ -111,7 +111,6 @@ def _create_udf(self): location=None, project=None, timeout=None, - query_with_job=True, publisher=self._session._publisher, ) diff --git a/packages/bigframes/bigframes/core/bq_data.py b/packages/bigframes/bigframes/core/bq_data.py index 3f341cd6053e..55ac1270b6c4 100644 --- a/packages/bigframes/bigframes/core/bq_data.py +++ b/packages/bigframes/bigframes/core/bq_data.py @@ -253,6 +253,9 @@ def __post_init__(self): # Optimization field, must be correct if set, don't put maybe-stale number here n_rows: Optional[int] = None + def with_ordering(self, ordering: orderings.RowOrdering) -> BigqueryDataSource: + return dataclasses.replace(self, ordering=ordering) + _WORKER_TIME_INCREMENT = 0.05 diff --git a/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py b/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py index 27b79f266bc1..1e0b561e8c5b 100644 --- a/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py +++ b/packages/bigframes/bigframes/core/compile/sqlglot/sqlglot_ir.py @@ -177,7 +177,7 @@ def from_table( project_id: str, dataset_id: str, table_id: str, - uid_gen: guid.SequentialUIDGenerator, + uid_gen: guid.SequentialUIDGenerator | None = None, columns: typing.Sequence[str] = (), sql_predicate: typing.Optional[str] = None, system_time: typing.Optional[datetime.datetime] = None, @@ -202,6 +202,8 @@ def from_table( if system_time else None ) + if uid_gen is None: + uid_gen = guid.SequentialUIDGenerator() table_alias = next(uid_gen.get_uid_stream("bft_")) table_expr = sge.Table( this=sql.identifier(table_id), diff --git a/packages/bigframes/bigframes/core/nodes.py b/packages/bigframes/bigframes/core/nodes.py index f30df7f30840..e51ae61c725e 100644 --- a/packages/bigframes/bigframes/core/nodes.py +++ b/packages/bigframes/bigframes/core/nodes.py @@ -846,10 +846,10 @@ def remap_refs( ) -> ReadTableNode: return self - def with_order_cols(self): + def pull_out_order(self): # Maybe the ordering should be required to always be in the scan list, and then we won't need this? 
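+        # Returns the node with any source-level ordering stripped out, plus that ordering remapped to scan-list column ids.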
if self.source.ordering is None: - return self, orderings.RowOrdering() + return self, RowOrdering() order_cols = {col.sql for col in self.source.ordering.referenced_columns} scan_cols = {col.source_id for col in self.scan_list.items} @@ -863,10 +863,18 @@ def with_order_cols(self): ] new_scan_list = ScanList(items=(*self.scan_list.items, *new_scan_cols)) new_order = self.source.ordering.remap_column_refs( - {identifiers.ColumnId(item.source_id): item.id for item in new_scan_cols}, + { + identifiers.ColumnId(item.source_id): item.id + for item in new_scan_list.items + }, allow_partial_bindings=True, ) - return dataclasses.replace(self, scan_list=new_scan_list), new_order + new_node = dataclasses.replace( + self, + scan_list=new_scan_list, + source=self.source.with_ordering(RowOrdering()), + ) + return new_node, new_order @dataclasses.dataclass(frozen=True, eq=False) diff --git a/packages/bigframes/bigframes/core/rewrite/__init__.py b/packages/bigframes/bigframes/core/rewrite/__init__.py index ab5559ab6554..ae4b142b1a46 100644 --- a/packages/bigframes/bigframes/core/rewrite/__init__.py +++ b/packages/bigframes/bigframes/core/rewrite/__init__.py @@ -19,7 +19,7 @@ from bigframes.core.rewrite.implicit_align import try_row_join from bigframes.core.rewrite.legacy_align import legacy_join_as_projection from bigframes.core.rewrite.nullity import simplify_join -from bigframes.core.rewrite.order import bake_order, defer_order +from bigframes.core.rewrite.order import bake_order, defer_order, pull_out_order from bigframes.core.rewrite.pruning import column_pruning from bigframes.core.rewrite.scan_reduction import ( try_reduce_to_local_scan, @@ -50,6 +50,7 @@ "rewrite_range_rolling", "try_reduce_to_table_scan", "bake_order", + "pull_out_order", "try_reduce_to_local_scan", "fold_row_counts", "pull_out_window_order", diff --git a/packages/bigframes/bigframes/core/rewrite/order.py b/packages/bigframes/bigframes/core/rewrite/order.py index 4d6612e6798d..b61fca82182f 100644 --- a/packages/bigframes/bigframes/core/rewrite/order.py +++ b/packages/bigframes/bigframes/core/rewrite/order.py @@ -47,6 +47,15 @@ def bake_order( return node +def pull_out_order( + node: bigframes.core.nodes.BigFrameNode, +) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]: + import bigframes.core.rewrite.slices + + node = node.bottom_up(bigframes.core.rewrite.slices.rewrite_slice) + return _pull_up_order(node, order_root=True) + + # Makes ordering explicit in window definitions def _pull_up_order( root: bigframes.core.nodes.BigFrameNode, @@ -153,7 +162,7 @@ def pull_up_order_inner( ) elif isinstance(node, bigframes.core.nodes.ReadTableNode): if node.source.ordering is not None: - return node.with_order_cols() + return node.pull_out_order() else: # No defined ordering return node, bigframes.core.ordering.RowOrdering() @@ -272,7 +281,7 @@ def pull_up_order_inner( offsets_id ) return new_explode, child_order.join(inner_order) - raise ValueError(f"Unexpected node: {node}") + raise ValueError(f"Unexpected node type {type(node).__name__}") def pull_order_concat( node: bigframes.core.nodes.ConcatNode, diff --git a/packages/bigframes/bigframes/functions/_function_client.py b/packages/bigframes/bigframes/functions/_function_client.py index cff35b7484fb..5cf421ceced7 100644 --- a/packages/bigframes/bigframes/functions/_function_client.py +++ b/packages/bigframes/bigframes/functions/_function_client.py @@ -142,7 +142,7 @@ def _create_bq_function(self, create_function_ddl: str) -> None: # TODO(swast): plumb 
through the original, user-facing api_name. import bigframes.session._io.bigquery - _, query_job = bigframes.session._io.bigquery.start_query_with_client( + _, query_job = bigframes.session._io.bigquery.start_query_with_job( cast(bigquery.Client, self._session.bqclient), create_function_ddl, job_config=bigquery.QueryJobConfig(), @@ -150,7 +150,6 @@ def _create_bq_function(self, create_function_ddl: str) -> None: project=None, timeout=None, metrics=None, - query_with_job=True, publisher=self._session._publisher, ) logger.info(f"Created bigframes function {query_job.ddl_target_routine}") diff --git a/packages/bigframes/bigframes/functions/function.py b/packages/bigframes/bigframes/functions/function.py index f197579113ae..59e1e35b0aea 100644 --- a/packages/bigframes/bigframes/functions/function.py +++ b/packages/bigframes/bigframes/functions/function.py @@ -197,10 +197,9 @@ def __call__(self, *args, **kwargs): args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args]) sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})" - iter, job = bf_io_bigquery.start_query_with_client( + iter, job = bf_io_bigquery.start_query_with_job( self._session.bqclient, sql=sql, - query_with_job=True, job_config=bigquery.QueryJobConfig(), publisher=self._session._publisher, ) # type: ignore diff --git a/packages/bigframes/bigframes/session/__init__.py b/packages/bigframes/bigframes/session/__init__.py index e134863b0ee4..22efbcf03975 100644 --- a/packages/bigframes/bigframes/session/__init__.py +++ b/packages/bigframes/bigframes/session/__init__.py @@ -2310,7 +2310,7 @@ def _start_query_ml_ddl( # so we must reset any encryption set in the job config # https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model job_config.destination_encryption_configuration = None - iterator, query_job = bf_io_bigquery.start_query_with_client( + iterator, query_job = bf_io_bigquery.start_query_with_job( self.bqclient, sql, job_config=job_config, @@ -2318,7 +2318,6 @@ def _start_query_ml_ddl( location=None, project=None, timeout=None, - query_with_job=True, job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY, publisher=self._publisher, session=self, @@ -2340,7 +2339,7 @@ def _create_object_table(self, path: str, connection: str) -> str: uris = ['{path}']); """ ) - bf_io_bigquery.start_query_with_client( + bf_io_bigquery.start_query_with_job( self.bqclient, sql, job_config=bigquery.QueryJobConfig(), @@ -2348,7 +2347,6 @@ def _create_object_table(self, path: str, connection: str) -> str: location=None, project=None, timeout=None, - query_with_job=True, publisher=self._publisher, session=self, ) diff --git a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py index 780ba55c50db..77a54b7eedd8 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/__init__.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/__init__.py @@ -262,73 +262,57 @@ def publish_bq_event(event): return publish_bq_event -@overload -def start_query_with_client( +def start_query_with_job( bq_client: bigquery.Client, sql: str, *, job_config: bigquery.QueryJobConfig, - location: Optional[str], - project: Optional[str], - timeout: Optional[float], - metrics: Optional[bigframes.session.metrics.ExecutionMetrics], - query_with_job: Literal[True], - publisher: bigframes.core.events.Publisher, - session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... 
- - -@overload -def start_query_with_client( - bq_client: bigquery.Client, - sql: str, - *, - job_config: bigquery.QueryJobConfig, - location: Optional[str], - project: Optional[str], - timeout: Optional[float], - metrics: Optional[bigframes.session.metrics.ExecutionMetrics], - query_with_job: Literal[False], - publisher: bigframes.core.events.Publisher, - session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... - - -@overload -def start_query_with_client( - bq_client: bigquery.Client, - sql: str, - *, - job_config: bigquery.QueryJobConfig, - location: Optional[str], - project: Optional[str], - timeout: Optional[float], - metrics: Optional[bigframes.session.metrics.ExecutionMetrics], - query_with_job: Literal[True], - job_retry: google.api_core.retry.Retry, + location: Optional[str] = None, + project: Optional[str] = None, + timeout: Optional[float] = None, + metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + # TODO(tswast): We can stop providing our own default once we use a + # google-cloud-bigquery version with + # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely + # version 3.36.0 or later. + job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... +) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: + """ + Starts query job and waits for results. + """ + # Note: Ensure no additional labels are added to job_config after this + # point, as `add_and_trim_labels` ensures the label count does not + # exceed MAX_LABELS_COUNT. + add_and_trim_labels(job_config, session=session) + try: + query_job = bq_client.query( + sql, + job_config=job_config, + location=location, + project=project, + timeout=timeout, + job_retry=job_retry, + ) + except google.api_core.exceptions.Forbidden as ex: + if "Drive credentials" in ex.message: + ex.message += CHECK_DRIVE_PERMISSIONS + raise -@overload -def start_query_with_client( - bq_client: bigquery.Client, - sql: str, - *, - job_config: bigquery.QueryJobConfig, - location: Optional[str], - project: Optional[str], - timeout: Optional[float], - metrics: Optional[bigframes.session.metrics.ExecutionMetrics], - query_with_job: Literal[False], - job_retry: google.api_core.retry.Retry, - publisher: bigframes.core.events.Publisher, - session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... 
+ results_iterator = query_job.result() + _publish_events( + query_job=query_job, + total_rows=results_iterator.total_rows, + sql=sql, + publisher=publisher, + metrics=metrics, + ) + return results_iterator, query_job -def start_query_with_client( +def start_query_job_optional( bq_client: bigquery.Client, sql: str, *, @@ -337,7 +321,6 @@ def start_query_with_client( project: Optional[str] = None, timeout: Optional[float] = None, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, - query_with_job: bool = True, # TODO(tswast): We can stop providing our own default once we use a # google-cloud-bigquery version with # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely @@ -345,43 +328,34 @@ def start_query_with_client( job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, publisher: bigframes.core.events.Publisher, session=None, -) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: - """ - Starts query job and waits for results. - """ - # Note: Ensure no additional labels are added to job_config after this - # point, as `add_and_trim_labels` ensures the label count does not - # exceed MAX_LABELS_COUNT. +) -> google.cloud.bigquery.table.RowIterator: add_and_trim_labels(job_config, session=session) - try: - if not query_with_job: - results_iterator = bq_client._query_and_wait_bigframes( - sql, - job_config=job_config, - location=location, - project=project, - api_timeout=timeout, - job_retry=job_retry, - callback=create_bq_event_callback(publisher), - ) - if metrics is not None: - metrics.count_job_stats(row_iterator=results_iterator) - return results_iterator, None - - query_job = bq_client.query( + results_iterator = bq_client._query_and_wait_bigframes( sql, job_config=job_config, location=location, project=project, - timeout=timeout, + api_timeout=timeout, job_retry=job_retry, + callback=create_bq_event_callback(publisher), ) + if metrics is not None: + metrics.count_job_stats(row_iterator=results_iterator) + return results_iterator except google.api_core.exceptions.Forbidden as ex: if "Drive credentials" in ex.message: ex.message += CHECK_DRIVE_PERMISSIONS raise + +def _publish_events( + query_job: bigquery.QueryJob, + sql: str, + total_rows: Optional[int], + publisher: bigframes.core.events.Publisher, + metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, +): if not query_job.configuration.dry_run: publisher.publish( bigframes.core.events.BigQuerySentEvent( @@ -392,7 +366,6 @@ def start_query_with_client( request_id=None, ) ) - results_iterator = query_job.result() if not query_job.configuration.dry_run: publisher.publish( bigframes.core.events.BigQueryFinishedEvent( @@ -400,7 +373,7 @@ def start_query_with_client( location=query_job.location, job_id=query_job.job_id, destination=query_job.destination, - total_rows=results_iterator.total_rows, + total_rows=total_rows, total_bytes_processed=query_job.total_bytes_processed, slot_millis=query_job.slot_millis, created=query_job.created, @@ -411,7 +384,6 @@ def start_query_with_client( if metrics is not None: metrics.count_job_stats(query_job=query_job) - return results_iterator, query_job def delete_tables_matching_session_id( @@ -473,7 +445,7 @@ def create_bq_dataset_reference( """ job_config = google.cloud.bigquery.QueryJobConfig() - _, query_job = start_query_with_client( + _, query_job = start_query_with_job( bq_client, "SELECT 1", location=location, @@ -481,7 +453,6 @@ def create_bq_dataset_reference( project=project, 
timeout=None, metrics=None, - query_with_job=True, publisher=publisher, ) diff --git a/packages/bigframes/bigframes/session/_io/bigquery/read_gbq_table.py b/packages/bigframes/bigframes/session/_io/bigquery/read_gbq_table.py index 39c5dedab626..faaf0f01912a 100644 --- a/packages/bigframes/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/packages/bigframes/bigframes/session/_io/bigquery/read_gbq_table.py @@ -193,7 +193,7 @@ def is_time_travel_eligible( ) try: # If this succeeds, we know that time travel will for sure work. - bigframes.session._io.bigquery.start_query_with_client( + bigframes.session._io.bigquery.start_query_job_optional( bq_client=bqclient, sql=snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True), @@ -201,7 +201,6 @@ def is_time_travel_eligible( project=None, timeout=None, metrics=None, - query_with_job=False, publisher=publisher, ) return True @@ -266,7 +265,7 @@ def check_if_index_columns_are_unique( index_cols, table.get_table_ref() ) job_config = bigquery.QueryJobConfig() - results, _ = bigframes.session._io.bigquery.start_query_with_client( + results = bigframes.session._io.bigquery.start_query_job_optional( bq_client=bqclient, sql=is_unique_sql, job_config=job_config, @@ -274,7 +273,6 @@ def check_if_index_columns_are_unique( location=None, project=None, metrics=None, - query_with_job=False, publisher=publisher, ) row = next(iter(results)) diff --git a/packages/bigframes/bigframes/session/bigquery_session.py b/packages/bigframes/bigframes/session/bigquery_session.py index 6fe34f53c3a9..e8969326f31e 100644 --- a/packages/bigframes/bigframes/session/bigquery_session.py +++ b/packages/bigframes/bigframes/session/bigquery_session.py @@ -95,7 +95,7 @@ def create_temp_table( ddl = f"CREATE TEMP TABLE `_SESSION`.{sg_sql.to_sql(sg_sql.identifier(table_ref.table_id))} ({fields_string}){cluster_string}" - _, job = bfbqio.start_query_with_client( + _, job = bfbqio.start_query_with_job( self.bqclient, ddl, job_config=job_config, @@ -103,7 +103,6 @@ def create_temp_table( project=None, timeout=None, metrics=None, - query_with_job=True, publisher=self._publisher, ) job.result() @@ -117,7 +116,7 @@ def close(self): self._sessiondaemon.stop() if self._session_id is not None and self.bqclient is not None: - bfbqio.start_query_with_client( + bfbqio.start_query_job_optional( self.bqclient, f"CALL BQ.ABORT_SESSION('{self._session_id}')", job_config=bigquery.QueryJobConfig(), @@ -125,7 +124,6 @@ def close(self): project=None, timeout=None, metrics=None, - query_with_job=False, publisher=self._publisher, ) @@ -137,7 +135,7 @@ def _get_session_id(self) -> str: job_config = bigquery.QueryJobConfig(create_session=True) # Make sure the session is a new one, not one associated with another query. 
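+        # A cache hit could return results tied to an earlier query's session, so force a fresh job.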
job_config.use_query_cache = False - _, query_job = bfbqio.start_query_with_client( + _, query_job = bfbqio.start_query_with_job( self.bqclient, "SELECT 1", job_config=job_config, @@ -145,7 +143,6 @@ def _get_session_id(self) -> str: project=None, timeout=None, metrics=None, - query_with_job=True, publisher=self._publisher, ) query_job.result() # blocks until finished @@ -169,7 +166,7 @@ def _keep_session_alive(self): ] ) try: - bfbqio.start_query_with_client( + bfbqio.start_query_job_optional( self.bqclient, "SELECT 1", job_config=job_config, @@ -177,7 +174,6 @@ def _keep_session_alive(self): project=None, timeout=KEEPALIVE_QUERY_TIMEOUT_SECONDS, metrics=None, - query_with_job=False, publisher=self._publisher, ) except Exception as e: diff --git a/packages/bigframes/bigframes/session/bq_caching_executor.py b/packages/bigframes/bigframes/session/bq_caching_executor.py index 84a82ba4fcc0..4a01df693a8f 100644 --- a/packages/bigframes/bigframes/session/bq_caching_executor.py +++ b/packages/bigframes/bigframes/session/bq_caching_executor.py @@ -15,13 +15,12 @@ from __future__ import annotations import concurrent.futures +import dataclasses import math import threading from typing import Literal, Mapping, Optional, Sequence, Tuple import google.api_core.exceptions -import google.cloud.bigquery.job as bq_job -import google.cloud.bigquery.table as bq_table import google.cloud.bigquery_storage_v1 from google.cloud import bigquery @@ -30,8 +29,8 @@ import bigframes.core import bigframes.core.events import bigframes.core.guid -import bigframes.core.identifiers import bigframes.core.nodes as nodes +import bigframes.core.ordering import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties import bigframes.dtypes @@ -41,11 +40,11 @@ import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temporary_storage -from bigframes import exceptions as bfe -from bigframes.core import bq_data, compile, local_data, rewrite +from bigframes.core import bq_data, compile, guid, identifiers, local_data, rewrite from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir from bigframes.session import ( + direct_gbq_execution, executor, loader, local_scan_executor, @@ -91,10 +90,9 @@ def __init__( ) self.metrics = metrics self.loader = loader - self.bqstoragereadclient = bqstoragereadclient self._enable_polars_execution = enable_polars_execution self._publisher = publisher - self._labels = labels + self._compiler_name = compiler_name # TODO(tswast): Send events from semi-executors, too. 
self._semi_executors: Sequence[semi_executor.SemiExecutor] = ( @@ -112,7 +110,14 @@ def __init__( polars_executor.PolarsExecutor(), ) self._upload_lock = threading.Lock() - self._compiler_name = compiler_name + self._gbq_executor = direct_gbq_execution.DirectGbqExecutor( + bqclient, + compiler=compiler_name, + bqstoragereadclient=bqstoragereadclient, + metrics=self.metrics, + publisher=self._publisher, + labels=dict(labels), + ) def to_sql( self, @@ -141,59 +146,111 @@ def execute( execution_spec: ex_spec.ExecutionSpec, ) -> executor.ExecuteResult: self._publisher.publish(bigframes.core.events.ExecutionStarted()) + maybe_result = self._try_execute_semi_executors(array_value, execution_spec) + if maybe_result is not None: + return maybe_result + result = self._execute_bigquery(array_value, execution_spec) + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=result, + ) + ) + return result - # TODO: Support export jobs in combination with semi executors - if execution_spec.destination_spec is None: - plan = self.prepare_plan(array_value.node, target="simplify") - for exec in self._semi_executors: - maybe_result = exec.execute( - plan, ordered=execution_spec.ordered, peek=execution_spec.peek - ) - if maybe_result: - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=maybe_result, - ) + def _try_execute_semi_executors( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> Optional[executor.ExecuteResult]: + plan = self.prepare_plan(array_value.node, target="simplify") + for exec in self._semi_executors: + maybe_result = exec.execute(plan, execution_spec) + if maybe_result: + self._publisher.publish( + bigframes.core.events.ExecutionFinished( + result=maybe_result, ) - return maybe_result - - if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec): - if execution_spec.peek or execution_spec.ordered: - raise NotImplementedError( - "Ordering and peeking not supported for gbq export" ) - # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml - result = self._export_gbq( - array_value, - execution_spec.destination_spec, - extra_labels=execution_spec.labels, + return maybe_result + return None + + def _execute_bigquery( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> executor.ExecuteResult: + dest_spec = execution_spec.destination_spec + # Recursive handlers for different cases, maybe extract to explicit interface. 
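+        # GCS export: first materialize the result to an ephemeral BigQuery table, then export that table to GCS.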
+ if isinstance(dest_spec, ex_spec.GcsOutputSpec): + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.EphemeralTableSpec() ) - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=result, + results = self._execute_bigquery(array_value, execution_spec) + self._export_result_gcs(results, dest_spec) + return results + elif isinstance(dest_spec, ex_spec.TableOutputSpec): + return self._execute_gbq_table_export(array_value, execution_spec) + # Force table creation if result might be large (and user explicitly allowed large results) + elif isinstance(dest_spec, ex_spec.EphemeralTableSpec) or (dest_spec is None): + if not execution_spec.promise_under_10gb: + table = self.storage_manager.create_temp_table( + array_value.schema.to_bigquery() ) + execution_spec = dataclasses.replace( + execution_spec, + destination_spec=ex_spec.TableOutputSpec( + table=table, if_exists="append" + ), + ) + # We don't use _execute_gbq_table_export, as this result is internal, not exported. + return self._execute_gbq_query_only(array_value, execution_spec) + # At this point, dst should be unspecified, a specific bq table, or an ephemeral temp table that fits in <10gb + return self._execute_gbq_query_only(array_value, execution_spec) + + def _execute_gbq_table_export( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> executor.ExecuteResult: + dest_spec = execution_spec.destination_spec + assert isinstance(dest_spec, ex_spec.TableOutputSpec) + existing_table = self._maybe_find_existing_table(dest_spec) + if (existing_table is not None) and _is_schema_match( + existing_table.schema, array_value.schema + ): + # Special DML path - maybe this should be configurable, dml vs query destination has tradeoffs + execution_spec = dataclasses.replace( + execution_spec, destination_spec=ex_spec.EphemeralTableSpec() ) - return result + results = self._execute_bigquery(array_value, execution_spec) + assert isinstance(results, executor.BQTableExecuteResult) + self._export_gbq_with_dml(results, dest_spec) + result: executor.ExecuteResult = results + else: + result = self._execute_gbq_query_only(array_value, execution_spec) - result = self._execute_plan_gbq( - array_value.node, - ordered=execution_spec.ordered, - peek=execution_spec.peek, - cache_spec=execution_spec.destination_spec - if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec) - else None, - must_create_table=not execution_spec.promise_under_10gb, - extra_labels=execution_spec.labels, + has_special_dtype_col = any( + t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE) + for t in array_value.schema.dtypes ) - # post steps: export - if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): - self._export_result_gcs(result, execution_spec.destination_spec) + if dest_spec.if_exists != "append" and has_special_dtype_col: + table = self.bqclient.get_table(dest_spec.table) + table.schema = array_value.schema.to_bigquery() + self.bqclient.update_table(table, ["schema"]) - self._publisher.publish( - bigframes.core.events.ExecutionFinished( - result=result, + return result + + def _execute_gbq_query_only( + self, + array_value: bigframes.core.ArrayValue, + execution_spec: ex_spec.ExecutionSpec, + ) -> executor.ExecuteResult: + gbq_plan = self.prepare_plan(array_value.node, target="bq_execution") + result = self._gbq_executor.execute(gbq_plan, execution_spec) + if result is None: + raise ValueError( + f"Couldn't execute plan 
{array_value.node} with {execution_spec}" ) - ) return result def _export_result_gcs( @@ -209,7 +266,7 @@ def _export_result_gcs( format=gcs_export_spec.format, export_options=dict(gcs_export_spec.export_options), ) - bq_io.start_query_with_client( + bq_io.start_query_with_job( self.bqclient, export_data_statement, job_config=bigquery.QueryJobConfig(), @@ -217,105 +274,38 @@ def _export_result_gcs( project=None, location=None, timeout=None, - query_with_job=True, publisher=self._publisher, ) - def _maybe_find_existing_table( - self, spec: ex_spec.TableOutputSpec - ) -> Optional[bigquery.Table]: - # validate destination table - try: - table = self.bqclient.get_table(spec.table) - if spec.if_exists == "fail": - raise ValueError(f"Table already exists: {spec.table.__str__()}") - - if len(spec.cluster_cols) != 0: - if (table.clustering_fields is None) or ( - tuple(table.clustering_fields) != spec.cluster_cols - ): - raise ValueError( - "Table clustering fields cannot be changed after the table has " - f"been created. Requested clustering fields: {spec.cluster_cols}, existing clustering fields: {table.clustering_fields}" - ) - return table - except google.api_core.exceptions.NotFound: - return None - - def _export_gbq( - self, - array_value: bigframes.core.ArrayValue, - spec: ex_spec.TableOutputSpec, - extra_labels: tuple[tuple[str, str], ...] = (), - ) -> executor.ExecuteResult: + def _export_gbq_with_dml( + self, result: executor.BQTableExecuteResult, spec: ex_spec.TableOutputSpec + ): """ - Export the ArrayValue to an existing BigQuery table. + Export the ArrayValue to an existing BigQuery table, using DML. """ - plan = self.prepare_plan(array_value.node, target="bq_execution") - - # validate destination table - existing_table = self._maybe_find_existing_table(spec) - - compiled = compile.compile_sql( - compile.CompileRequest(plan, sort_rows=False), - compiler_name=self._compiler_name, - ) - sql = compiled.sql - - if (existing_table is not None) and _is_schema_match( - existing_table.schema, array_value.schema - ): - # b/409086472: Uses DML for table appends and replacements to avoid - # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: - # https://cloud.google.com/bigquery/quotas#standard_tables - job_config = bigquery.QueryJobConfig() - - ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql) - if spec.if_exists == "append": - sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) - else: # for "replace" - assert spec.if_exists == "replace" - sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) - else: - dispositions = { - "fail": bigquery.WriteDisposition.WRITE_EMPTY, - "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, - "append": bigquery.WriteDisposition.WRITE_APPEND, - } - job_config = bigquery.QueryJobConfig( - write_disposition=dispositions[spec.if_exists], - destination=spec.table, - clustering_fields=spec.cluster_cols if spec.cluster_cols else None, - ) - - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. 
- iterator, job = self._run_execute_query( - sql=sql, - job_config=job_config, - session=array_value.session, - extra_labels=extra_labels, - ) - - has_special_dtype_col = any( - t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE) - for t in array_value.schema.dtypes + # b/409086472: Uses DML for table appends and replacements to avoid + # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: + # https://cloud.google.com/bigquery/quotas#standard_tables + assert result.query_job is not None + assert result.query_job.destination is not None + ir = sqlglot_ir.SQLGlotIR.from_table( + result.query_job.destination.project, + result.query_job.destination.dataset_id, + result.query_job.destination.table_id, ) - - if spec.if_exists != "append" and has_special_dtype_col: - # Only update schema if this is not modifying an existing table, and the - # new table contains special columns (like timedelta or obj_ref). - table = self.bqclient.get_table(spec.table) - table.schema = array_value.schema.to_bigquery() - self.bqclient.update_table(table, ["schema"]) - - return executor.EmptyExecuteResult( - bf_schema=array_value.schema, - execution_metadata=executor.ExecutionMetadata.from_iterator_and_job( - iterator, job - ), + sql = "" + if spec.if_exists == "append": + sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table)) + else: # for "replace" + assert spec.if_exists == "replace" + sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table)) + + bq_io.start_query_with_job( + self.bqclient, + sql, + job_config=bigquery.QueryJobConfig(), + metrics=self.metrics, + publisher=self._publisher, ) def dry_run( @@ -358,66 +348,42 @@ def cached( array_value, cluster_cols=config.optimize_for.columns ) - # Helpers - def _run_execute_query( - self, - sql: str, - job_config: Optional[bq_job.QueryJobConfig] = None, - query_with_job: bool = True, - session=None, - extra_labels: tuple[tuple[str, str], ...] = (), - ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: - """ - Starts BigQuery query job and waits for results. - """ - job_config = bq_job.QueryJobConfig() if job_config is None else job_config - if bigframes.options.compute.maximum_bytes_billed is not None: - job_config.maximum_bytes_billed = ( - bigframes.options.compute.maximum_bytes_billed + def _execute_to_cached_table( + self, plan: nodes.BigFrameNode, cache_spec: ex_spec.CacheSpec + ) -> executor.ExecuteResult: + # "ephemeral" temp tables created in the course of execution don't need to be allocated + # materialized ordering only really makes sense for internal temp tables used by caching + cluster_cols = cache_spec.cluster_cols + # Rewrite plan to materialize ordering as extra columns + if cache_spec.ordering == "offsets_col": + order_col_id = guid.generate_guid() + plan = nodes.PromoteOffsetsNode(plan, identifiers.ColumnId(order_col_id)) + cluster_cols = (order_col_id,) + ordering: bigframes.core.ordering.RowOrdering = ( + bigframes.core.ordering.TotalOrdering.from_offset_col(order_col_id) ) + elif cache_spec.ordering == "order_key": + plan, ordering = rewrite.pull_out_order(plan) + destination_table = self.storage_manager.create_temp_table( + plan.schema.to_bigquery(), cluster_cols + ) + arr_value = bigframes.core.ArrayValue(plan) + execution_spec = ex_spec.ExecutionSpec( + destination_spec=ex_spec.TableOutputSpec( + table=destination_table, + cluster_cols=cluster_cols, + if_exists="replace", + ) + ) + # We don't use _execute_gbq_table_export, as this result is internal, not exported.
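+        # Attach the derived ordering to the cached table's metadata below, so later reads can reconstruct row order.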
+ result = self._execute_gbq_query_only(arr_value, execution_spec) + assert isinstance(result, executor.BQTableExecuteResult), ( + "expected result to be BQTableExecuteResult" + ) + result._data = dataclasses.replace(result._data, ordering=ordering) + return result - if self._labels: - job_config.labels.update(self._labels) - if extra_labels: - job_config.labels.update(extra_labels) - - try: - # Trick the type checker into thinking we got a literal. - if query_with_job: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=True, - publisher=self._publisher, - session=session, - ) - else: - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config, - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=False, - publisher=self._publisher, - session=session, - ) - - except google.api_core.exceptions.BadRequest as e: - # Unfortunately, this error type does not have a separate error code or exception type - if "Resources exceeded during query execution" in e.message: - new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." - raise bfe.QueryComplexityError(new_message) from e - else: - raise - + # Helpers def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): """ Can the block be evaluated very cheaply? @@ -460,23 +426,29 @@ def _cache_with_cluster_cols( self, array_value: bigframes.core.ArrayValue, cluster_cols: Sequence[str] ): """Executes the query and uses the resulting table to rewrite future executions.""" - execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols)) - ) - self.execute( - array_value, - execution_spec=execution_spec, + cluster_cols = [ + col + for col in cluster_cols + if bigframes.dtypes.is_clusterable(array_value.schema.get_type(col)) + ] + cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] + result = self._execute_to_cached_table( + array_value.node, + ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols), ordering="order_key"), ) + assert isinstance(result, executor.BQTableExecuteResult) + assert result._data.ordering is not None + self.cache.cache_results_table(array_value.node, result._data) def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" - execution_spec = ex_spec.ExecutionSpec( - destination_spec=ex_spec.CacheSpec(cluster_cols=tuple()) - ) - self.execute( - array_value, - execution_spec=execution_spec, + result = self._execute_to_cached_table( + array_value.node, + ex_spec.CacheSpec(ordering="offsets_col"), ) + assert isinstance(result, executor.BQTableExecuteResult) + assert result._data.ordering is not None + self.cache.cache_results_table(array_value.node, result._data) def _cache_with_session_awareness( self, @@ -587,130 +559,26 @@ def map_local_scans(node: nodes.BigFrameNode): return original_root.bottom_up(map_local_scans) - def _execute_plan_gbq( - self, - plan: nodes.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, - cache_spec: Optional[ex_spec.CacheSpec] = None, - must_create_table: bool = True, - extra_labels: tuple[tuple[str, str], ...] 
= (), - ) -> executor.ExecuteResult: - """Just execute whatever plan as is, without further caching or decomposition.""" - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - - og_plan = plan - og_schema = plan.schema - - plan = self.prepare_plan(plan, target="bq_execution") - create_table = must_create_table - cluster_cols: Sequence[str] = [] - if cache_spec is not None: - if peek is not None: - raise ValueError("peek is not compatible with caching.") - - create_table = True - if not cache_spec.cluster_cols: - offsets_id = bigframes.core.identifiers.ColumnId( - bigframes.core.guid.generate_guid() - ) - plan = nodes.PromoteOffsetsNode(plan, offsets_id) - cluster_cols = [offsets_id.sql] - else: - cluster_cols = [ - col - for col in cache_spec.cluster_cols - if bigframes.dtypes.is_clusterable(plan.schema.get_type(col)) - ] - cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS] - - compiled = compile.compile_sql( - compile.CompileRequest( - plan, - sort_rows=ordered, - peek_count=peek, - materialize_all_order_keys=(cache_spec is not None), - ), - compiler_name=self._compiler_name, - ) - # might have more columns than og schema, for hidden ordering columns - compiled_schema = compiled.sql_schema - - destination_table: Optional[bigquery.TableReference] = None - - job_config = bigquery.QueryJobConfig() - if create_table: - destination_table = self.storage_manager.create_temp_table( - compiled_schema, cluster_cols - ) - job_config.destination = destination_table - - # Attach data type usage to the job labels - job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs - iterator, query_job = self._run_execute_query( - sql=compiled.sql, - job_config=job_config, - query_with_job=(destination_table is not None), - session=plan.session, - extra_labels=extra_labels, - ) - - # we could actually cache even when caching is not explicitly requested, but being conservative for now - result_bq_data = None - if query_job and query_job.destination: - # we might add extra sql columns in compilation, esp if caching w ordering, infer a bigframes type for them - result_bf_schema = _result_schema(og_schema, list(compiled.sql_schema)) - dst = query_job.destination - result_bq_data = bq_data.BigqueryDataSource( - table=bq_data.GbqNativeTable.from_ref_and_schema( - dst, - tuple(compiled_schema), - cluster_cols=tuple(cluster_cols), - location=iterator.location or self.storage_manager.location, - table_type="TABLE", - ), - schema=result_bf_schema, - ordering=compiled.row_order, - n_rows=iterator.total_rows, - ) - - if cache_spec is not None: - assert result_bq_data is not None - assert compiled.row_order is not None - self.cache.cache_results_table(og_plan, result_bq_data) - - execution_metadata = executor.ExecutionMetadata.from_iterator_and_job( - iterator, query_job - ) - result_mostly_cached = ( - hasattr(iterator, "_is_almost_completely_cached") - and iterator._is_almost_completely_cached() - ) - if result_bq_data is not None and not result_mostly_cached: - return executor.BQTableExecuteResult( - data=result_bq_data, - project_id=self.bqclient.project, - storage_client=self.bqstoragereadclient, - execution_metadata=execution_metadata, - selected_fields=tuple((col, col) for col in og_schema.names), - ) - else: - return executor.LocalExecuteResult( - data=iterator.to_arrow().select(og_schema.names), - bf_schema=plan.schema, - execution_metadata=execution_metadata, - ) - + def _maybe_find_existing_table( + self, spec: ex_spec.TableOutputSpec + ) -> 
Optional[bigquery.Table]: + # validate destination table + try: + table = self.bqclient.get_table(spec.table) + if spec.if_exists == "fail": + raise ValueError(f"Table already exists: {spec.table.__str__()}") -def _result_schema( - logical_schema: schemata.ArraySchema, sql_schema: list[bigquery.SchemaField] -) -> schemata.ArraySchema: - inferred_schema = bigframes.dtypes.bf_type_from_type_kind(sql_schema) - inferred_schema.update(logical_schema._mapping) - return schemata.ArraySchema( - tuple(schemata.SchemaItem(col, dtype) for col, dtype in inferred_schema.items()) - ) + if len(spec.cluster_cols) != 0: + if (table.clustering_fields is None) or ( + tuple(table.clustering_fields) != spec.cluster_cols + ): + raise ValueError( + "Table clustering fields cannot be changed after the table has " + f"been created. Requested clustering fields: {spec.cluster_cols}, existing clustering fields: {table.clustering_fields}" + ) + return table + except google.api_core.exceptions.NotFound: + return None def _is_schema_match( diff --git a/packages/bigframes/bigframes/session/direct_gbq_execution.py b/packages/bigframes/bigframes/session/direct_gbq_execution.py index 95cf4b204764..5f838b416baa 100644 --- a/packages/bigframes/bigframes/session/direct_gbq_execution.py +++ b/packages/bigframes/bigframes/session/direct_gbq_execution.py @@ -13,86 +13,179 @@ # limitations under the License. from __future__ import annotations -from typing import Literal, Optional, Tuple +from typing import Callable, Literal, Mapping, Optional, Tuple +import google.api_core.exceptions import google.cloud.bigquery.job as bq_job import google.cloud.bigquery.table as bq_table +import google.cloud.bigquery_storage_v1 from google.cloud import bigquery +import bigframes.core.compile import bigframes.core.compile.ibis_compiler.ibis_compiler as ibis_compiler import bigframes.core.compile.sqlglot.compiler as sqlglot_compiler import bigframes.core.events +import bigframes.core.schema as schemata import bigframes.session._io.bigquery as bq_io -from bigframes.core import compile, nodes -from bigframes.session import executor, semi_executor +import bigframes.session.metrics +from bigframes import exceptions as bfe +from bigframes.core import bq_data, compile, nodes +from bigframes.core.compile.configs import CompileRequest, CompileResult +from bigframes.session import execution_spec, executor, semi_executor + +_WRITE_DISPOSITIONS = { + "fail": bigquery.WriteDisposition.WRITE_EMPTY, + "replace": bigquery.WriteDisposition.WRITE_TRUNCATE, + "append": bigquery.WriteDisposition.WRITE_APPEND, +} -# used only in testing right now, BigQueryCachingExecutor is the fully featured engine -# simplified, doesnt not do large >10 gb result queries, error handling, respect global config -# or record metrics. Also avoids caching, and most pre-compile rewrites, to better serve as a -# reference for validating more complex executors. 
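+# Executes plans directly against BigQuery. Now also serves as the GBQ execution engine for
+# BigQueryCachingExecutor: it applies labels and maximum_bytes_billed, records metrics, and
+# translates "Resources exceeded" failures into QueryComplexityError.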
class DirectGbqExecutor(semi_executor.SemiExecutor): def __init__( self, bqclient: bigquery.Client, - compiler: Literal["ibis", "sqlglot"] = "ibis", + bqstoragereadclient: google.cloud.bigquery_storage_v1.BigQueryReadClient, *, publisher: bigframes.core.events.Publisher, + compiler: Literal["ibis", "sqlglot"] = "ibis", + metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + labels: Mapping[str, str] = {}, ): self.bqclient = bqclient - self._compile_fn = ( - ibis_compiler.compile_sql - if compiler == "ibis" - else sqlglot_compiler.compile_sql - ) + self._compiler_name = compiler + self._bqstoragereadclient = bqstoragereadclient self._publisher = publisher + self._metrics = metrics + self._labels = labels def execute( self, plan: nodes.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + spec: execution_spec.ExecutionSpec, ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. - - compiled = self._compile_fn( - compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek) + compiled = compile.compile_sql( + CompileRequest( + plan, + sort_rows=spec.ordered, + peek_count=spec.peek, + ), + compiler_name=self._compiler_name, ) + job_config = bigquery.QueryJobConfig() + dest_spec = spec.destination_spec + cluster_cols = None + can_skip_job = True + if isinstance(dest_spec, execution_spec.TableOutputSpec): + job_config.destination = dest_spec.table + job_config.write_disposition = _WRITE_DISPOSITIONS[dest_spec.if_exists] + cluster_cols = dest_spec.cluster_cols if dest_spec.cluster_cols else None + job_config.clustering_fields = cluster_cols + can_skip_job = False + elif isinstance(dest_spec, execution_spec.EphemeralTableSpec): + # Need destination table, but jobless execution might not create a destination table + can_skip_job = False + elif dest_spec is not None: + raise ValueError( + f"Direct GBQ Executor does not support destination: {dest_spec}" + ) + + job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs + if spec.labels: + job_config.labels.update(spec.labels) iterator, query_job = self._run_execute_query( sql=compiled.sql, + job_config=job_config, + query_with_job=(not can_skip_job), session=plan.session, ) + result_bq_data = None + if query_job and query_job.destination: + dst = query_job.destination + result_bq_data = bq_data.BigqueryDataSource( + table=bq_data.GbqNativeTable.from_ref_and_schema( + dst, + tuple(compiled.sql_schema), + cluster_cols=cluster_cols or (), + location=iterator.location or self.bqclient.location, + table_type="TABLE", + ), + schema=plan.schema, + ordering=compiled.row_order, + n_rows=iterator.total_rows, + ) - # just immediately downlaod everything for simplicity - return executor.LocalExecuteResult( - data=iterator.to_arrow(), - bf_schema=plan.schema, - execution_metadata=executor.ExecutionMetadata.from_iterator_and_job( - iterator, query_job - ), + execution_metadata = executor.ExecutionMetadata.from_iterator_and_job( + iterator, query_job + ) + result_mostly_cached = ( + hasattr(iterator, "_is_almost_completely_cached") + and iterator._is_almost_completely_cached() ) + if (isinstance(dest_spec, execution_spec.EphemeralTableSpec)) or ( + (result_bq_data is not None) and not result_mostly_cached + ): + assert result_bq_data is not None, "expected result table but none exists" + return executor.BQTableExecuteResult( + data=result_bq_data, + 
project_id=self.bqclient.project, + storage_client=self._bqstoragereadclient, + execution_metadata=execution_metadata, + selected_fields=tuple((col, col) for col in plan.schema.names), + ) + else: + return executor.LocalExecuteResult( + data=iterator.to_arrow().select(plan.schema.names), + bf_schema=plan.schema, + execution_metadata=execution_metadata, + ) + def _run_execute_query( self, sql: str, - job_config: Optional[bq_job.QueryJobConfig] = None, - session=None, + job_config: bq_job.QueryJobConfig, + query_with_job: bool, + session, ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts BigQuery query job and waits for results. """ - return bq_io.start_query_with_client( - self.bqclient, - sql, - job_config=job_config or bq_job.QueryJobConfig(), - project=None, - location=None, - timeout=None, - metrics=None, - query_with_job=False, - publisher=self._publisher, - session=session, - ) + if bigframes.options.compute.maximum_bytes_billed is not None: + job_config.maximum_bytes_billed = ( + bigframes.options.compute.maximum_bytes_billed + ) + + if self._labels: + job_config.labels.update(self._labels) + + try: + if query_with_job: + return bq_io.start_query_with_job( + self.bqclient, + sql, + job_config=job_config, + metrics=self._metrics, + publisher=self._publisher, + session=session, + ) + else: + return ( + bq_io.start_query_job_optional( + self.bqclient, + sql, + job_config=job_config, + metrics=self._metrics, + publisher=self._publisher, + session=session, + ), + None, + ) + except google.api_core.exceptions.BadRequest as e: + # Unfortunately, this error type does not have a separate error code or exception type + if "Resources exceeded during query execution" in e.message: + new_message = "Computation is too complex to execute as a single query. Try using DataFrame.cache() on intermediate results, or setting bigframes.options.compute.enable_multi_query_execution." + raise bfe.QueryComplexityError(new_message) from e + else: + raise diff --git a/packages/bigframes/bigframes/session/execution_spec.py b/packages/bigframes/bigframes/session/execution_spec.py index 7ba4666a3616..764c3a042efe 100644 --- a/packages/bigframes/bigframes/session/execution_spec.py +++ b/packages/bigframes/bigframes/session/execution_spec.py @@ -22,11 +22,16 @@ @dataclasses.dataclass(frozen=True) class ExecutionSpec: - destination_spec: Union[TableOutputSpec, GcsOutputSpec, CacheSpec, None] = None + # destination for the result of the operation. Executor may also incidentally create other temporary tables for its own purposes. + destination_spec: Union[ + TableOutputSpec, GcsOutputSpec, EphemeralTableSpec, None + ] = None + # If set, the result will be truncated to the given number of rows. Which N rows is + # implementation dependent and not stable. peek: Optional[int] = None - ordered: bool = ( - False # ordered and promise_under_10gb must both be together for bq execution - ) + # Controls whether output iterator is ordered. Cannot be true if destination is not + # guaranteed to be ordered. 
+ ordered: bool = False # This is an optimization flag for gbq execution, it doesn't change semantics, but if promise is falsely made, errors may occur promise_under_10gb: bool = False @@ -36,17 +41,49 @@ def add_labels(self, labels: Mapping[str, str]) -> ExecutionSpec: return dataclasses.replace(self, labels=self.labels + tuple(labels.items())) -# This one is temporary, in future, caching will not be done through immediate execution, but will label nodes -# that will be cached only when a super-tree is executed +# Used internally by execution +@dataclasses.dataclass(frozen=True) +class EphemeralTableSpec: + """ + Specifies that the result of an operation should be a temporary table of some sort. + + No guarantees on lifetime: it may be a session temp table or a bq-created temp table with a <24hr life. + + Used internally when results need temporary staging because they are large (>10GB) or needed in subsequent operations. + """ + + pass + + @dataclasses.dataclass(frozen=True) class CacheSpec: - cluster_cols: tuple[str, ...] + """ + Specifies that the result of an operation should be a session temp table. + The table will be automatically deleted after the session ends. + """ + + cluster_cols: tuple[ + str, ... + ] = () # if empty, will cluster using the order key if `ordering` is set + # Controls ordering and whether extra columns are materialized to preserve ordering. + # Any extra columns will be appended to the end of the schema. + # None: ordering may be discarded entirely (ordering metadata will still be provided if ordering is derivable from materialized columns) + # order_rows: the result iterator itself will be ordered. For gbq execution, the result cannot exceed 10GB. + # order_key: the result set is ordered by a key; may materialize extra columns. + # offsets_col: orders the result set by an offsets column; materializes one extra column. + ordering: Literal["order_rows", "offsets_col", "order_key"] | None = None @dataclasses.dataclass(frozen=True) class TableOutputSpec: + """ + Specifies that the result of an operation should be exported to a specific named table. + + The executor is not responsible for managing the lifecycle of the table. + """ + table: bigquery.TableReference - cluster_cols: tuple[str, ...] + cluster_cols: tuple[str, ...] = () if_exists: Literal["fail", "replace", "append"] = "fail" diff --git a/packages/bigframes/bigframes/session/loader.py b/packages/bigframes/bigframes/session/loader.py index c671bbbc6d3d..43f45a500f00 100644 --- a/packages/bigframes/bigframes/session/loader.py +++ b/packages/bigframes/bigframes/session/loader.py @@ -1442,7 +1442,7 @@ def _prepare_job_config( job_config = bigquery.QueryJobConfig() if job_config is None else job_config if bigframes.options.compute.maximum_bytes_billed is not None: - # Maybe this should be pushed down into start_query_with_client + # Maybe this should be pushed down into start_query_with_job job_config.maximum_bytes_billed = ( bigframes.options.compute.maximum_bytes_billed ) @@ -1462,7 +1462,7 @@ def _start_query_with_job_optional( Do not execute dataframe through this API, instead use the executor. 
""" job_config = self._prepare_job_config(job_config) - rows, _ = bf_io_bigquery.start_query_with_client( + rows = bf_io_bigquery.start_query_job_optional( self._bqclient, sql, job_config=job_config, @@ -1470,7 +1470,6 @@ def _start_query_with_job_optional( location=None, project=None, metrics=None, - query_with_job=False, publisher=self._publisher, session=self._session, ) @@ -1489,7 +1488,7 @@ def _start_query_with_job( Do not execute dataframe through this API, instead use the executor. """ job_config = self._prepare_job_config(job_config) - _, query_job = bf_io_bigquery.start_query_with_client( + _, query_job = bf_io_bigquery.start_query_with_job( self._bqclient, sql, job_config=job_config, @@ -1497,7 +1496,6 @@ def _start_query_with_job( location=None, project=None, metrics=None, - query_with_job=True, publisher=self._publisher, session=self._session, ) diff --git a/packages/bigframes/bigframes/session/local_scan_executor.py b/packages/bigframes/bigframes/session/local_scan_executor.py index fee0f557ea76..32a9aaa30d7e 100644 --- a/packages/bigframes/bigframes/session/local_scan_executor.py +++ b/packages/bigframes/bigframes/session/local_scan_executor.py @@ -16,7 +16,7 @@ from typing import Optional from bigframes.core import bigframe_node, rewrite -from bigframes.session import executor, semi_executor +from bigframes.session import execution_spec, executor, semi_executor class LocalScanExecutor(semi_executor.SemiExecutor): @@ -27,15 +27,17 @@ class LocalScanExecutor(semi_executor.SemiExecutor): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: + if execution_spec.destination_spec is not None: + return None + reduced_result = rewrite.try_reduce_to_local_scan(plan) if not reduced_result: return None node, limit = reduced_result - + peek = execution_spec.peek if limit is not None: if peek is None or limit < peek: peek = limit diff --git a/packages/bigframes/bigframes/session/polars_executor.py b/packages/bigframes/bigframes/session/polars_executor.py index 06c7fcb925c4..996d7d365ea3 100644 --- a/packages/bigframes/bigframes/session/polars_executor.py +++ b/packages/bigframes/bigframes/session/polars_executor.py @@ -34,7 +34,7 @@ numeric_ops, string_ops, ) -from bigframes.session import executor, semi_executor +from bigframes.session import execution_spec, executor, semi_executor if TYPE_CHECKING: import polars as pl @@ -140,20 +140,20 @@ def __init__(self): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: if not self._can_execute(plan): return None - # Note: Ignoring ordered flag, as just executing totally ordered is fine. 
+ if execution_spec.destination_spec is not None: + return None try: lazy_frame: pl.LazyFrame = self._compiler.compile( array_value.ArrayValue(plan).node ) except Exception: return None - if peek is not None: - lazy_frame = lazy_frame.limit(peek) + if execution_spec.peek is not None: + lazy_frame = lazy_frame.limit(execution_spec.peek) pa_table = lazy_frame.collect().to_arrow() return executor.LocalExecuteResult( data=pa_table, diff --git a/packages/bigframes/bigframes/session/read_api_execution.py b/packages/bigframes/bigframes/session/read_api_execution.py index 9f2d196ce8eb..01e08243da15 100644 --- a/packages/bigframes/bigframes/session/read_api_execution.py +++ b/packages/bigframes/bigframes/session/read_api_execution.py @@ -18,7 +18,7 @@ from google.cloud import bigquery_storage_v1 from bigframes.core import bigframe_node, bq_data, nodes, rewrite -from bigframes.session import executor, semi_executor +from bigframes.session import execution_spec, executor, semi_executor class ReadApiSemiExecutor(semi_executor.SemiExecutor): @@ -37,14 +37,16 @@ def __init__( def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: - adapt_result = self._try_adapt_plan(plan, ordered) + if execution_spec.destination_spec is not None: + return None + + adapt_result = self._try_adapt_plan(plan, execution_spec.ordered) if not adapt_result: return None node, limit = adapt_result - if node.explicitly_ordered and ordered: + if node.explicitly_ordered and execution_spec.ordered: return None if not isinstance(node.source.table, bq_data.GbqNativeTable): @@ -53,6 +55,7 @@ def execute( if not node.source.table.is_physically_stored: return None + peek = execution_spec.peek if limit is not None: if peek is None or limit < peek: peek = limit diff --git a/packages/bigframes/bigframes/session/semi_executor.py b/packages/bigframes/bigframes/session/semi_executor.py index c41d7c96d3e9..27f1179a2f82 100644 --- a/packages/bigframes/bigframes/session/semi_executor.py +++ b/packages/bigframes/bigframes/session/semi_executor.py @@ -15,7 +15,7 @@ from typing import Optional from bigframes.core import bigframe_node -from bigframes.session import executor +from bigframes.session import execution_spec, executor # Unstable interface, in development @@ -27,7 +27,6 @@ class SemiExecutor(abc.ABC): def execute( self, plan: bigframe_node.BigFrameNode, - ordered: bool, - peek: Optional[int] = None, + execution_spec: execution_spec.ExecutionSpec, ) -> Optional[executor.ExecuteResult]: raise NotImplementedError("execute not implemented for this executor") diff --git a/packages/bigframes/bigframes/testing/engine_utils.py b/packages/bigframes/bigframes/testing/engine_utils.py index edb68c3a9b0a..456d6b463840 100644 --- a/packages/bigframes/bigframes/testing/engine_utils.py +++ b/packages/bigframes/bigframes/testing/engine_utils.py @@ -15,7 +15,12 @@ import pandas.testing from bigframes.core import nodes -from bigframes.session import semi_executor +from bigframes.session import semi_executor, execution_spec + + +SPEC = execution_spec.ExecutionSpec( + ordered=True, +) def assert_equivalence_execution( @@ -23,8 +28,8 @@ def assert_equivalence_execution( engine1: semi_executor.SemiExecutor, engine2: semi_executor.SemiExecutor, ): - e1_result = engine1.execute(node, ordered=True) - e2_result = engine2.execute(node, ordered=True) + e1_result = engine1.execute(node, SPEC) + e2_result = engine2.execute(node, SPEC) assert 
     assert e2_result is not None
     # Convert to pandas, as pandas has better comparison utils than arrow
diff --git a/packages/bigframes/tests/system/conftest.py b/packages/bigframes/tests/system/conftest.py
index 0742fbfeac17..8c6de23c9bd3 100644
--- a/packages/bigframes/tests/system/conftest.py
+++ b/packages/bigframes/tests/system/conftest.py
@@ -29,6 +29,7 @@
 import google.cloud.bigquery_connection_v1 as bigquery_connection_v1
+import google.cloud.bigquery_storage_v1
 import google.cloud.exceptions
 import google.cloud.functions_v2 as functions_v2
 import google.cloud.resourcemanager_v3 as resourcemanager_v3
 import google.cloud.storage as storage  # type: ignore
 import numpy as np
@@ -114,6 +115,13 @@ def bigquery_client(session: bigframes.Session) -> bigquery.Client:
     return session.bqclient
 
 
+@pytest.fixture(scope="session")
+def bigquery_storage_read_client(
+    session: bigframes.Session,
+) -> google.cloud.bigquery_storage_v1.BigQueryReadClient:
+    return session.bqstoragereadclient
+
+
 @pytest.fixture(scope="session")
 def bigquery_client_tokyo(session_tokyo: bigframes.Session) -> bigquery.Client:
     return session_tokyo.bqclient
diff --git a/packages/bigframes/tests/system/small/engines/conftest.py b/packages/bigframes/tests/system/small/engines/conftest.py
index cea505dd28a6..002781caefb7 100644
--- a/packages/bigframes/tests/system/small/engines/conftest.py
+++ b/packages/bigframes/tests/system/small/engines/conftest.py
@@ -14,6 +14,7 @@
 import pathlib
 from typing import Generator
 
+import google.cloud.bigquery_storage_v1
 import pandas as pd
 import pytest
 from google.cloud import bigquery
@@ -44,21 +45,45 @@ def fake_session() -> Generator[bigframes.Session, None, None]:
         yield session
 
 
+@pytest.fixture(scope="session")
+def sqlglot_engine(
+    bigquery_client: bigquery.Client,
+    bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient,
+):
+    return direct_gbq_execution.DirectGbqExecutor(
+        bigquery_client,
+        bqstoragereadclient=bigquery_storage_read_client,
+        compiler="sqlglot",
+        publisher=events.Publisher(),
+    )
+
+
+@pytest.fixture(scope="session")
+def bq_engine(
+    bigquery_client: bigquery.Client,
+    bigquery_storage_read_client: google.cloud.bigquery_storage_v1.BigQueryReadClient,
+):
+    return direct_gbq_execution.DirectGbqExecutor(
+        bigquery_client,
+        bqstoragereadclient=bigquery_storage_read_client,
+        publisher=events.Publisher(),
+    )
+
+
 @pytest.fixture(scope="session", params=["pyarrow", "polars", "bq", "bq-sqlglot"])
-def engine(request, bigquery_client: bigquery.Client) -> semi_executor.SemiExecutor:
+def engine(
+    request,
+    sqlglot_engine,
+    bq_engine,
+) -> semi_executor.SemiExecutor:
     if request.param == "pyarrow":
         return local_scan_executor.LocalScanExecutor()
     if request.param == "polars":
         return polars_executor.PolarsExecutor()
-    publisher = events.Publisher()
     if request.param == "bq":
-        return direct_gbq_execution.DirectGbqExecutor(
-            bigquery_client, publisher=publisher
-        )
+        return bq_engine
     if request.param == "bq-sqlglot":
-        return direct_gbq_execution.DirectGbqExecutor(
-            bigquery_client, compiler="sqlglot", publisher=publisher
-        )
+        return sqlglot_engine
     raise ValueError(f"Unrecognized param: {request.param}")
 
 
diff --git a/packages/bigframes/tests/system/small/engines/test_aggregation.py b/packages/bigframes/tests/system/small/engines/test_aggregation.py
index b95781d21b14..e6e4ac571578 100644
--- a/packages/bigframes/tests/system/small/engines/test_aggregation.py
+++ b/packages/bigframes/tests/system/small/engines/test_aggregation.py
@@ -128,19 +128,13 @@
 def test_sql_engines_median_op_aggregates(
     scalars_array_value: array_value.ArrayValue,
-    bigquery_client: bigquery.Client,
+    bq_engine,
+    sqlglot_engine,
 ):
     node = apply_agg_to_all_valid(
         scalars_array_value,
         agg_ops.MedianOp(),
     ).node
-    publisher = events.Publisher()
-    left_engine = direct_gbq_execution.DirectGbqExecutor(
-        bigquery_client, publisher=publisher
-    )
-    right_engine = direct_gbq_execution.DirectGbqExecutor(
-        bigquery_client, compiler="sqlglot", publisher=publisher
-    )
-    assert_equivalence_execution(node, left_engine, right_engine)
+    assert_equivalence_execution(node, bq_engine, sqlglot_engine)
 
 
 @pytest.mark.parametrize("engine", ["polars", "bq", "bq-sqlglot"], indirect=True)
diff --git a/packages/bigframes/tests/system/small/engines/test_sorting.py b/packages/bigframes/tests/system/small/engines/test_sorting.py
index 920282bc9319..bcab19f324dd 100644
--- a/packages/bigframes/tests/system/small/engines/test_sorting.py
+++ b/packages/bigframes/tests/system/small/engines/test_sorting.py
@@ -16,7 +16,7 @@
 
 import bigframes.operations as bf_ops
 from bigframes.core import array_value, nodes, ordering
-from bigframes.session import polars_executor
+from bigframes.session import execution_spec, polars_executor
 from bigframes.testing.engine_utils import assert_equivalence_execution
 
 pytest.importorskip("polars")
@@ -96,7 +96,7 @@ def test_polars_engines_skips_unrecognized_order_expr(
         ),
     )
     node = nodes.OrderByNode(node, ORDER_EXPRESSIONS)
-    assert engine.execute(node, ordered=True) is None
+    assert engine.execute(node, execution_spec.ExecutionSpec(ordered=True)) is None
 
 
 def apply_reverse(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
diff --git a/packages/bigframes/tests/system/small/engines/test_windowing.py b/packages/bigframes/tests/system/small/engines/test_windowing.py
index c748947d997b..fcc1dad928f8 100644
--- a/packages/bigframes/tests/system/small/engines/test_windowing.py
+++ b/packages/bigframes/tests/system/small/engines/test_windowing.py
@@ -46,8 +46,9 @@ def test_engines_with_offsets(
 @pytest.mark.parametrize("agg_op", [agg_ops.sum_op, agg_ops.count_op])
 def test_engines_with_rows_window(
     scalars_array_value: array_value.ArrayValue,
-    bigquery_client: bigquery.Client,
     agg_op,
+    bq_engine,
+    sqlglot_engine,
 ):
     window = window_spec.WindowSpec(
         bounds=window_spec.RowsWindowBounds.from_window_size(3, "left"),
@@ -62,12 +63,4 @@ def test_engines_with_rows_window(
         ),
         window_spec=window,
     )
-
-    publisher = events.Publisher()
-    bq_executor = direct_gbq_execution.DirectGbqExecutor(
-        bigquery_client, publisher=publisher
-    )
-    bq_sqlgot_executor = direct_gbq_execution.DirectGbqExecutor(
-        bigquery_client, compiler="sqlglot", publisher=publisher
-    )
-    assert_equivalence_execution(window_node, bq_executor, bq_sqlgot_executor)
+    assert_equivalence_execution(window_node, bq_engine, sqlglot_engine)
diff --git a/packages/bigframes/tests/system/small/functions/test_remote_function.py b/packages/bigframes/tests/system/small/functions/test_remote_function.py
index 2fc577bdf66a..eb0593e1ab6d 100644
--- a/packages/bigframes/tests/system/small/functions/test_remote_function.py
+++ b/packages/bigframes/tests/system/small/functions/test_remote_function.py
@@ -768,7 +768,7 @@ def test_read_gbq_function_runs_existing_udf_4_params(session):
 
 
 def test_read_gbq_function_runs_existing_udf_array_output(session, routine_id_unique):
-    bigframes.session._io.bigquery.start_query_with_client(
+    bigframes.session._io.bigquery.start_query_with_job(
         session.bqclient,
         textwrap.dedent(
             f"""
@@ -784,7 +784,6 @@ def test_read_gbq_function_runs_existing_udf_array_output(session, routine_id_un
         project=None,
         timeout=None,
         metrics=None,
-        query_with_job=True,
         publisher=bigframes.core.events.Publisher(),
     )
     func = session.read_gbq_function(routine_id_unique)
@@ -807,7 +806,7 @@ def test_read_gbq_function_runs_existing_udf_array_output(session, routine_id_un
 def test_read_gbq_function_runs_existing_udf_2_params_array_output(
     session, routine_id_unique
 ):
-    bigframes.session._io.bigquery.start_query_with_client(
+    bigframes.session._io.bigquery.start_query_with_job(
         session.bqclient,
         textwrap.dedent(
             f"""
@@ -823,7 +822,6 @@ def test_read_gbq_function_runs_existing_udf_2_params_array_output(
         project=None,
         timeout=None,
         metrics=None,
-        query_with_job=True,
         publisher=bigframes.core.events.Publisher(),
     )
     func = session.read_gbq_function(routine_id_unique)
@@ -848,7 +846,7 @@ def test_read_gbq_function_runs_existing_udf_2_params_array_output(
 def test_read_gbq_function_runs_existing_udf_4_params_array_output(
     session, routine_id_unique
 ):
-    bigframes.session._io.bigquery.start_query_with_client(
+    bigframes.session._io.bigquery.start_query_with_job(
         session.bqclient,
         textwrap.dedent(
             f"""
@@ -864,7 +862,6 @@ def test_read_gbq_function_runs_existing_udf_4_params_array_output(
         project=None,
         timeout=None,
         metrics=None,
-        query_with_job=True,
         publisher=bigframes.core.events.Publisher(),
     )
     func = session.read_gbq_function(routine_id_unique)
diff --git a/packages/bigframes/tests/system/small/session/test_session_logging.py b/packages/bigframes/tests/system/small/session/test_session_logging.py
index eb8ec3f49e51..4618e110687b 100644
--- a/packages/bigframes/tests/system/small/session/test_session_logging.py
+++ b/packages/bigframes/tests/system/small/session/test_session_logging.py
@@ -23,8 +23,8 @@ def test_data_type_logging(scalars_df_index):
     # We want to check the job_config passed to _query_and_wait_bigframes
     with mock.patch(
-        "bigframes.session._io.bigquery.start_query_with_client",
-        wraps=bq_io.start_query_with_client,
+        "bigframes.session._io.bigquery.start_query_job_optional",
+        wraps=bq_io.start_query_job_optional,
     ) as mock_query:
         s.to_pandas()
diff --git a/packages/bigframes/tests/system/small/test_session.py b/packages/bigframes/tests/system/small/test_session.py
index e4101d4e941b..76788da8a111 100644
--- a/packages/bigframes/tests/system/small/test_session.py
+++ b/packages/bigframes/tests/system/small/test_session.py
@@ -114,11 +114,10 @@ def test_read_gbq_tokyo(
     df.sort_index(inplace=True)
     expected = scalars_pandas_df_index
 
-    # use_explicit_destination=True, otherwise might use path with no query_job
     exec_result = session_tokyo._executor.execute(
         df._block.expr,
         bigframes.session.execution_spec.ExecutionSpec(
-            bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False
+            destination_spec=bigframes.session.execution_spec.EphemeralTableSpec()
         ),
     )
     assert exec_result.query_job is not None
@@ -951,7 +950,7 @@ def test_read_pandas_tokyo(
     result = session_tokyo._executor.execute(
         df._block.expr,
         bigframes.session.execution_spec.ExecutionSpec(
-            bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False
+            destination_spec=bigframes.session.execution_spec.EphemeralTableSpec()
         ),
     )
     assert result.query_job is not None
diff --git a/packages/bigframes/tests/unit/session/test_io_bigquery.py b/packages/bigframes/tests/unit/session/test_io_bigquery.py
index 79996e185ecf..5783e2b208a1 100644
--- a/packages/bigframes/tests/unit/session/test_io_bigquery.py
+++ b/packages/bigframes/tests/unit/session/test_io_bigquery.py
@@ -197,7 +197,7 @@ def test_add_and_trim_labels_length_limit_met():
     ("timeout", "api_name"),
     [(None, None), (30.0, "test_api")],
 )
-def test_start_query_with_client_labels_length_limit_met(
+def test_start_query_with_job_labels_length_limit_met(
     mock_bq_client: bigquery.Client, timeout: Optional[float], api_name
 ):
     sql = "select * from abc"
@@ -221,7 +221,7 @@ def test_start_query_with_client_labels_length_limit_met(
     for _ in range(52):
         df.head()
 
-    io_bq.start_query_with_client(
+    io_bq.start_query_with_job(
         mock_bq_client,
         sql,
         job_config=job_config,
@@ -229,7 +229,6 @@ def test_start_query_with_client_labels_length_limit_met(
         project=None,
         timeout=timeout,
         metrics=None,
-        query_with_job=True,
         publisher=bigframes.core.events.Publisher(),
     )
 
diff --git a/packages/bigframes/tests/unit/session/test_local_scan_executor.py b/packages/bigframes/tests/unit/session/test_local_scan_executor.py
index fc59253153b6..4059d928f9df 100644
--- a/packages/bigframes/tests/unit/session/test_local_scan_executor.py
+++ b/packages/bigframes/tests/unit/session/test_local_scan_executor.py
@@ -17,9 +17,13 @@
 import pytest
 
 from bigframes.core import identifiers, local_data, nodes
-from bigframes.session import local_scan_executor
+from bigframes.session import execution_spec, local_scan_executor
 from bigframes.testing import mocks
 
+SPEC = execution_spec.ExecutionSpec(
+    ordered=True,
+)
+
 
 @pytest.fixture
 def object_under_test():
@@ -72,7 +76,7 @@ def test_local_scan_executor_with_slice(start, stop, expected_rows, object_under
         stop=stop,
     )
 
-    result = object_under_test.execute(plan, ordered=True)
+    result = object_under_test.execute(plan, SPEC)
     result_table = pyarrow.Table.from_batches(result.batches().arrow_batches)
     assert result_table.num_rows == expected_rows
 
@@ -98,4 +102,4 @@ def test_local_scan_executor_with_slice_unsupported_inputs(
         stop=stop,
         step=step,
     )
-    assert object_under_test.execute(plan, ordered=True) is None
+    assert object_under_test.execute(plan, SPEC) is None
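
Note (not part of the patch): the mechanical change repeated across the executors above replaces the old (ordered, peek) keyword arguments with a single ExecutionSpec value, and semi-executors now decline any request that carries a destination_spec. Below is a minimal sketch of the calling convention, assuming only the fields this diff itself reads (ordered, peek, destination_spec); the names "engine" and "plan", and the peek= constructor keyword, are illustrative stand-ins inferred from the attribute reads rather than confirmed API.

    from bigframes.session import execution_spec

    # Old shape: engine.execute(plan, ordered=True, peek=10)
    # New shape: the same knobs travel in one spec object.
    spec = execution_spec.ExecutionSpec(ordered=True, peek=10)
    result = engine.execute(plan, spec)

    # Semi-executors return None rather than writing to a destination,
    # leaving destination-bound work to the main BigQuery executor.
    dest_spec = execution_spec.ExecutionSpec(
        destination_spec=execution_spec.EphemeralTableSpec(),
    )
    assert engine.execute(plan, dest_spec) is None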