refactor(bigframes): Modularize compiler routing as proxy executor #16907
Changes from 10 commits
```diff
@@ -17,15 +17,12 @@
 import concurrent.futures
 import math
 import threading
-import uuid
-import warnings
 from typing import Literal, Mapping, Optional, Sequence, Tuple

 import google.api_core.exceptions
 import google.cloud.bigquery.job as bq_job
 import google.cloud.bigquery.table as bq_table
 import google.cloud.bigquery_storage_v1
-import google.cloud.exceptions
 from google.cloud import bigquery

 import bigframes
```
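Note: the three dropped imports (`uuid`, `warnings`, `google.cloud.exceptions`) were used only by `_compile_with_fallback`, which this PR deletes further down, so they are removed together with it.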
|
|
```diff
@@ -83,11 +80,15 @@ def __init__(
         metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
         enable_polars_execution: bool = False,
         publisher: bigframes.core.events.Publisher,
-        labels: Mapping[str, str] = {},
+        labels: tuple[tuple[str, str], ...] = (),
+        compiler_name: Literal["ibis", "sqlglot"] = "ibis",
+        cache: Optional[execution_cache.ExecutionCache] = None,
     ):
         self.bqclient = bqclient
         self.storage_manager = storage_manager
-        self.cache: execution_cache.ExecutionCache = execution_cache.ExecutionCache()
+        self.cache: execution_cache.ExecutionCache = (
+            cache or execution_cache.ExecutionCache()
+        )
         self.metrics = metrics
         self.loader = loader
         self.bqstoragereadclient = bqstoragereadclient
```
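Two constructor changes here: the `labels` default swaps a mutable `Mapping[str, str] = {}` (the classic shared-mutable-default pitfall, and unhashable besides) for an immutable tuple of pairs, and the execution cache becomes injectable rather than always constructed internally. A minimal sketch of the same pattern; the class and attribute names are illustrative, not the real bigframes signatures:

```python
from typing import Optional


class ExecutionCache:  # stand-in for execution_cache.ExecutionCache
    pass


class Executor:
    def __init__(
        self,
        # Immutable default: safe to evaluate once at def-time and share
        # across instances, unlike the old mutable `{}` default.
        labels: tuple[tuple[str, str], ...] = (),
        # Callers may inject a shared cache; otherwise build a private one.
        cache: Optional[ExecutionCache] = None,
    ):
        self._labels = labels
        self.cache = cache or ExecutionCache()
```

The injectable cache is presumably what lets several executor variants (for example, a proxy that routes between compilers) share one cache instance.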
|
|
```diff
@@ -111,6 +112,7 @@ def __init__(
             polars_executor.PolarsExecutor(),
         )
         self._upload_lock = threading.Lock()
+        self._compiler_name = compiler_name

     def to_sql(
         self,
```
|
|
```diff
@@ -127,7 +129,10 @@ def to_sql(
             else array_value.node
         )
         node = self._substitute_large_local_sources(node)
-        compiled = self._compile(node, ordered=ordered)
+        compiled = compile.compile_sql(
+            compile.CompileRequest(node, sort_rows=ordered),
+            compiler_name=self._compiler_name,
+        )
         return compiled.sql

     def execute(
```
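With the private `_compile` helper gone (see the deletion below), `to_sql` calls the module-level `compile.compile_sql` directly, passing a `CompileRequest` plus the compiler chosen at construction time. A sketch of the request-object dispatch this implies; the `CompileRequest` fields are taken from this diff, but the dispatch body is hypothetical:

```python
import dataclasses
from typing import Literal, Optional


@dataclasses.dataclass(frozen=True)
class CompileRequest:
    node: object  # a nodes.BigFrameNode in the real code
    sort_rows: bool = False
    peek_count: Optional[int] = None
    materialize_all_order_keys: bool = False


def compile_sql(request: CompileRequest, *, compiler_name: Literal["ibis", "sqlglot"]):
    # Route one immutable request object to the configured backend.
    return _BACKENDS[compiler_name](request)


_BACKENDS = {
    "ibis": lambda request: ...,     # legacy compiler (placeholder)
    "sqlglot": lambda request: ...,  # newer compiler (placeholder)
}
```

Because the request carries every compile option, callers no longer need a wrapper that merely repackages keyword arguments.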
|
|
```diff
@@ -158,7 +163,11 @@ def execute(
                 "Ordering and peeking not supported for gbq export"
             )
             # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml
-            result = self._export_gbq(array_value, execution_spec.destination_spec)
+            result = self._export_gbq(
+                array_value,
+                execution_spec.destination_spec,
+                extra_labels=execution_spec.labels,
+            )
             self._publisher.publish(
                 bigframes.core.events.ExecutionFinished(
                     result=result,
```
|
|
@@ -174,6 +183,7 @@ def execute( | |
| if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec) | ||
| else None, | ||
| must_create_table=not execution_spec.promise_under_10gb, | ||
| extra_labels=execution_spec.labels, | ||
| ) | ||
| # post steps: export | ||
| if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): | ||
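Both execution paths now forward per-call labels from the execution spec: the export branch passes `extra_labels=execution_spec.labels` to `_export_gbq`, and the generic branch passes the same tuple to `_execute_plan_gbq`. A simplified sketch of the spec shape this implies; only the three fields visible in this diff are shown, and the rest of the real type is omitted:

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class ExecutionSpec:  # reduced stand-in for the ex_spec type
    destination_spec: object
    promise_under_10gb: bool = False
    # Per-call labels, in the same immutable tuple-of-pairs form as the
    # session-level labels on the executor.
    labels: tuple[tuple[str, str], ...] = ()
```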
|
|
```diff
@@ -233,7 +243,10 @@ def _maybe_find_existing_table(
         return None

     def _export_gbq(
-        self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec
+        self,
+        array_value: bigframes.core.ArrayValue,
+        spec: ex_spec.TableOutputSpec,
+        extra_labels: tuple[tuple[str, str], ...] = (),
     ) -> executor.ExecuteResult:
         """
         Export the ArrayValue to an existing BigQuery table.
```
|
|
```diff
@@ -243,55 +256,48 @@ def _export_gbq(
         # validate destination table
         existing_table = self._maybe_find_existing_table(spec)

-        def run_with_compiler(compiler_name, compiler_id=None):
-            compiled = self._compile(plan, ordered=False, compiler_name=compiler_name)
-            sql = compiled.sql
-
-            if (existing_table is not None) and _is_schema_match(
-                existing_table.schema, array_value.schema
-            ):
-                # b/409086472: Uses DML for table appends and replacements to avoid
-                # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
-                # https://cloud.google.com/bigquery/quotas#standard_tables
-                job_config = bigquery.QueryJobConfig()
-
-                ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
-                if spec.if_exists == "append":
-                    sql = sg_sql.to_sql(
-                        sg_sql.insert(ir.expr.as_select_all(), spec.table)
-                    )
-                else:  # for "replace"
-                    assert spec.if_exists == "replace"
-                    sql = sg_sql.to_sql(
-                        sg_sql.replace(ir.expr.as_select_all(), spec.table)
-                    )
-            else:
-                dispositions = {
-                    "fail": bigquery.WriteDisposition.WRITE_EMPTY,
-                    "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
-                    "append": bigquery.WriteDisposition.WRITE_APPEND,
-                }
-                job_config = bigquery.QueryJobConfig(
-                    write_disposition=dispositions[spec.if_exists],
-                    destination=spec.table,
-                    clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
-                )
-
-            # Attach data type usage to the job labels
-            job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
-            job_config.labels["bigframes-compiler"] = (
-                f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
-            )
-            # TODO(swast): plumb through the api_name of the user-facing api that
-            # caused this query.
-            iterator, job = self._run_execute_query(
-                sql=sql,
-                job_config=job_config,
-                session=array_value.session,
-            )
-            return iterator, job
-
-        iterator, job = self._compile_with_fallback(run_with_compiler)
+        compiled = compile.compile_sql(
+            compile.CompileRequest(plan, sort_rows=False),
+            compiler_name=self._compiler_name,
+        )
+        sql = compiled.sql
+
+        if (existing_table is not None) and _is_schema_match(
+            existing_table.schema, array_value.schema
+        ):
+            # b/409086472: Uses DML for table appends and replacements to avoid
+            # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits:
+            # https://cloud.google.com/bigquery/quotas#standard_tables
+            job_config = bigquery.QueryJobConfig()
+
+            ir = sqlglot_ir.SQLGlotIR.from_unparsed_query(sql)
+            if spec.if_exists == "append":
+                sql = sg_sql.to_sql(sg_sql.insert(ir.expr.as_select_all(), spec.table))
+            else:  # for "replace"
+                assert spec.if_exists == "replace"
+                sql = sg_sql.to_sql(sg_sql.replace(ir.expr.as_select_all(), spec.table))
+        else:
+            dispositions = {
+                "fail": bigquery.WriteDisposition.WRITE_EMPTY,
+                "replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
+                "append": bigquery.WriteDisposition.WRITE_APPEND,
+            }
+            job_config = bigquery.QueryJobConfig(
+                write_disposition=dispositions[spec.if_exists],
+                destination=spec.table,
+                clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
+            )
+
+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
+        # TODO(swast): plumb through the api_name of the user-facing api that
+        # caused this query.
+        iterator, job = self._run_execute_query(
+            sql=sql,
+            job_config=job_config,
+            session=array_value.session,
+            extra_labels=extra_labels,
+        )

         has_special_dtype_col = any(
             t in (bigframes.dtypes.TIMEDELTA_DTYPE, bigframes.dtypes.OBJ_REF_DTYPE)
```
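The control flow is unchanged in substance: when the destination table exists with a matching schema, the compiled `SELECT` is rewritten into DML (`INSERT` for append, a replace statement otherwise) rather than using a write disposition, per the b/409086472 note about `RATE_LIMIT_EXCEEDED` quotas. The diff does this through bigframes' internal `sqlglot_ir`/`sg_sql` wrappers; the sketch below shows an equivalent append rewrite using the public sqlglot API instead, with invented table and column names:

```python
import sqlglot
import sqlglot.expressions as exp

compiled_sql = "SELECT a, b FROM src"  # stand-in for the compiled query
select = sqlglot.parse_one(compiled_sql, dialect="bigquery")

# The "append" case becomes a DML INSERT targeting the existing table.
insert = exp.insert(select, "proj.dataset.dest", dialect="bigquery")
print(insert.sql(dialect="bigquery"))
# INSERT INTO proj.dataset.dest SELECT a, b FROM src
```

What actually changed is the wrapping: the body no longer lives inside a `run_with_compiler` closure, compiles once with the configured backend, drops the `bigframes-compiler` job label, and threads `extra_labels` into `_run_execute_query`.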
|
|
```diff
@@ -359,6 +365,7 @@ def _run_execute_query(
         job_config: Optional[bq_job.QueryJobConfig] = None,
         query_with_job: bool = True,
         session=None,
+        extra_labels: tuple[tuple[str, str], ...] = (),
    ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]:
        """
        Starts BigQuery query job and waits for results.
```
|
|
```diff
@@ -369,8 +376,9 @@
             bigframes.options.compute.maximum_bytes_billed
         )

-        if self._labels:
+        if self._labels or extra_labels:
             job_config.labels.update(self._labels)
+            job_config.labels.update(extra_labels)

         try:
             # Trick the type checker into thinking we got a literal.
```

Review thread on this hunk:

Reviewer (Contributor): Could …

Author (Contributor): Both are enforced to be non-null; for extra safety, this has been split into two separate if conditions.
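Because `extra_labels` arrives as a tuple of key/value pairs, no conversion is needed before merging: `dict.update` accepts any iterable of 2-item pairs, not just mappings. Session-level labels are applied first, so per-call labels win on key collisions:

```python
labels: dict[str, str] = {}
labels.update((("env", "prod"),))              # session labels (invented values)
labels.update((("bigframes-job", "export"),))  # per-call extra_labels
assert labels == {"env": "prod", "bigframes-job": "export"}
```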
|
|
```diff
@@ -420,43 +428,6 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
             self.prepare_plan(array_value.node)
         )

-    def _compile(
-        self,
-        node: nodes.BigFrameNode,
-        *,
-        ordered: bool = False,
-        peek: Optional[int] = None,
-        materialize_all_order_keys: bool = False,
-        compiler_name: Literal["sqlglot", "ibis"] = "sqlglot",
-    ) -> compile.CompileResult:
-        return compile.compile_sql(
-            compile.CompileRequest(
-                node,
-                sort_rows=ordered,
-                peek_count=peek,
-                materialize_all_order_keys=materialize_all_order_keys,
-            ),
-            compiler_name=compiler_name,
-        )
-
-    def _compile_with_fallback(self, run_fn):
-        compiler_option = bigframes.options.experiments.sql_compiler
-        if compiler_option == "legacy":
-            return run_fn("ibis")
-        elif compiler_option == "experimental":
-            return run_fn("sqlglot")
-        else:  # stable
-            compiler_id = f"{uuid.uuid1().hex[:12]}"
-            try:
-                return run_fn("sqlglot", compiler_id=compiler_id)
-            except google.cloud.exceptions.BadRequest as e:
-                msg = bfe.format_message(
-                    f"Compiler ID {compiler_id}: BadRequest on sqlglot. "
-                    f"Falling back to ibis. Details: {e.message}"
-                )
-                warnings.warn(msg, category=UserWarning)
-                return run_fn("ibis", compiler_id=compiler_id)
-
     def prepare_plan(
         self,
         plan: nodes.BigFrameNode,
```
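This deletion removes the in-executor compiler routing: `bigframes.options.experiments.sql_compiler` chose between legacy ibis, experimental sqlglot, or a "stable" mode that tried sqlglot first, tagged the job with a random compiler ID for log correlation, and fell back to ibis on `BadRequest`. Per the PR title, that routing moves into a proxy executor. The new layer is not shown in this file, so the following is only an assumed shape, not the actual implementation:

```python
class ProxyExecutor:
    """Hypothetical router: try the primary executor, fall back on failure."""

    def __init__(self, primary, fallback):
        self._primary = primary    # e.g. an executor built with compiler_name="sqlglot"
        self._fallback = fallback  # e.g. an executor built with compiler_name="ibis"

    def execute(self, array_value, execution_spec):
        try:
            return self._primary.execute(array_value, execution_spec)
        except Exception:
            # The removed in-line version caught only
            # google.cloud.exceptions.BadRequest and warned before retrying.
            return self._fallback.execute(array_value, execution_spec)
```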
|
|
```diff
@@ -547,8 +518,9 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool:
             max_complexity=QUERY_COMPLEXITY_LIMIT,
             cache=self.cache,
             # Heuristic: subtree_compleixty * (copies of subtree)^2
-            heuristic=lambda complexity, count: math.log(complexity)
-            + 2 * math.log(count),
+            heuristic=lambda complexity, count: (
+                math.log(complexity) + 2 * math.log(count)
+            ),
         )
         if selection is None:
             # No good subtrees to cache, just return original tree
```
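The heuristic lambda is reflowed here, not changed. Since `log(complexity) + 2 * log(count)` equals `log(complexity * count**2)`, it ranks subtrees exactly by the "subtree complexity × (copies of subtree)²" figure the comment describes, just in log space where the product cannot overflow. A quick numeric check:

```python
import math

complexity, copies = 1000.0, 4.0  # arbitrary example values
assert math.isclose(
    math.log(complexity) + 2 * math.log(copies),
    math.log(complexity * copies**2),
)
```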
|
|
```diff
@@ -621,6 +593,7 @@ def _execute_plan_gbq(
         peek: Optional[int] = None,
         cache_spec: Optional[ex_spec.CacheSpec] = None,
         must_create_table: bool = True,
+        extra_labels: tuple[tuple[str, str], ...] = (),
     ) -> executor.ExecuteResult:
         """Just execute whatever plan as is, without further caching or decomposition."""
         # TODO(swast): plumb through the api_name of the user-facing api that
```
|
|
```diff
@@ -651,43 +624,36 @@ def _execute_plan_gbq(
             ]
             cluster_cols = cluster_cols[:_MAX_CLUSTER_COLUMNS]

-        def run_with_compiler(compiler_name, compiler_id=None):
-            compiled = self._compile(
-                plan,
-                ordered=ordered,
-                peek=peek,
-                materialize_all_order_keys=(cache_spec is not None),
-                compiler_name=compiler_name,
-            )
-            # might have more columns than og schema, for hidden ordering columns
-            compiled_schema = compiled.sql_schema
-
-            destination_table: Optional[bigquery.TableReference] = None
-
-            job_config = bigquery.QueryJobConfig()
-            if create_table:
-                destination_table = self.storage_manager.create_temp_table(
-                    compiled_schema, cluster_cols
-                )
-                job_config.destination = destination_table
-
-            # Attach data type usage to the job labels
-            job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
-            job_config.labels["bigframes-compiler"] = (
-                f"{compiler_name}-{compiler_id}" if compiler_id else compiler_name
-            )
-            iterator, query_job = self._run_execute_query(
-                sql=compiled.sql,
-                job_config=job_config,
-                query_with_job=(destination_table is not None),
-                session=plan.session,
-            )
-            return iterator, query_job, compiled
-
-        iterator, query_job, compiled = self._compile_with_fallback(run_with_compiler)
-
-        # might have more columns than og schema, for hidden ordering columns
-        compiled_schema = compiled.sql_schema
+        compiled = compile.compile_sql(
+            compile.CompileRequest(
+                plan,
+                sort_rows=ordered,
+                peek_count=peek,
+                materialize_all_order_keys=(cache_spec is not None),
+            ),
+            compiler_name=self._compiler_name,
+        )
+        # might have more columns than og schema, for hidden ordering columns
+        compiled_schema = compiled.sql_schema
+
+        destination_table: Optional[bigquery.TableReference] = None
+
+        job_config = bigquery.QueryJobConfig()
+        if create_table:
+            destination_table = self.storage_manager.create_temp_table(
+                compiled_schema, cluster_cols
+            )
+            job_config.destination = destination_table
+
+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
+        iterator, query_job = self._run_execute_query(
+            sql=compiled.sql,
+            job_config=job_config,
+            query_with_job=(destination_table is not None),
+            session=plan.session,
+            extra_labels=extra_labels,
+        )

         # we could actually cache even when caching is not explicitly requested, but being conservative for now
         result_bq_data = None
```
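With the closure gone, `_execute_plan_gbq` reads as a straight pipeline: compile once with the configured backend, optionally create a clustered temp destination, attach the dtype label, and run with `extra_labels`. One detail worth noting is `query_with_job=(destination_table is not None)`: a created destination requires a full query job, while the no-destination case can presumably take BigQuery's jobless query path. A toy restatement of just that decision, with invented names:

```python
from typing import Optional


def choose_destination(create_table: bool) -> tuple[Optional[str], bool]:
    destination: Optional[str] = None
    if create_table:
        # Stand-in for storage_manager.create_temp_table(schema, cluster_cols).
        destination = "proj.dataset._bf_temp_table"
    # Only queries writing to a concrete destination need a full query job.
    return destination, destination is not None


print(choose_destination(True))   # ('proj.dataset._bf_temp_table', True)
print(choose_destination(False))  # (None, False)
```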
|
|
Review comment on the file:

Reviewer: didn't we change the default to "sqlglot"?