Skip to content
Open
3 changes: 1 addition & 2 deletions packages/bigframes/bigframes/blob/_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,14 @@ def _create_udf(self):
\"\"\"
"""

bf_io_bigquery.start_query_with_client(
bf_io_bigquery.start_query_with_job(
self._session.bqclient,
sql,
job_config=bigquery.QueryJobConfig(),
metrics=self._session._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
publisher=self._session._publisher,
)

Expand Down
3 changes: 3 additions & 0 deletions packages/bigframes/bigframes/core/bq_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ def __post_init__(self):
# Optimization field: must be correct if set; don't put a maybe-stale number here.
n_rows: Optional[int] = None

def with_ordering(self, ordering: orderings.RowOrdering) -> BigqueryDataSource:
    """Return a copy of this data source with ``ordering`` swapped in.

    The receiver is left untouched; every other field is carried over
    unchanged by ``dataclasses.replace``.

    Args:
        ordering: The row ordering to store on the returned copy.

    Returns:
        A new instance that differs from ``self`` only in its ``ordering``.
    """
    reordered = dataclasses.replace(self, ordering=ordering)
    return reordered


_WORKER_TIME_INCREMENT = 0.05

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def from_table(
project_id: str,
dataset_id: str,
table_id: str,
uid_gen: guid.SequentialUIDGenerator,
uid_gen: guid.SequentialUIDGenerator | None = None,
columns: typing.Sequence[str] = (),
sql_predicate: typing.Optional[str] = None,
system_time: typing.Optional[datetime.datetime] = None,
Expand All @@ -202,6 +202,8 @@ def from_table(
if system_time
else None
)
if uid_gen is None:
uid_gen = guid.SequentialUIDGenerator()
table_alias = next(uid_gen.get_uid_stream("bft_"))
table_expr = sge.Table(
this=sql.identifier(table_id),
Expand Down
16 changes: 12 additions & 4 deletions packages/bigframes/bigframes/core/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -846,10 +846,10 @@ def remap_refs(
) -> ReadTableNode:
return self

def with_order_cols(self):
def pull_out_order(self):
# Maybe the ordering should be required to always be in the scan list, and then we won't need this?
if self.source.ordering is None:
return self, orderings.RowOrdering()
return self, RowOrdering()

order_cols = {col.sql for col in self.source.ordering.referenced_columns}
scan_cols = {col.source_id for col in self.scan_list.items}
Expand All @@ -863,10 +863,18 @@ def with_order_cols(self):
]
new_scan_list = ScanList(items=(*self.scan_list.items, *new_scan_cols))
new_order = self.source.ordering.remap_column_refs(
{identifiers.ColumnId(item.source_id): item.id for item in new_scan_cols},
{
identifiers.ColumnId(item.source_id): item.id
for item in new_scan_list.items
},
allow_partial_bindings=True,
)
return dataclasses.replace(self, scan_list=new_scan_list), new_order
new_node = dataclasses.replace(
self,
scan_list=new_scan_list,
source=self.source.with_ordering(RowOrdering()),
)
return new_node, new_order


@dataclasses.dataclass(frozen=True, eq=False)
Expand Down
3 changes: 2 additions & 1 deletion packages/bigframes/bigframes/core/rewrite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from bigframes.core.rewrite.implicit_align import try_row_join
from bigframes.core.rewrite.legacy_align import legacy_join_as_projection
from bigframes.core.rewrite.nullity import simplify_join
from bigframes.core.rewrite.order import bake_order, defer_order
from bigframes.core.rewrite.order import bake_order, defer_order, pull_out_order
from bigframes.core.rewrite.pruning import column_pruning
from bigframes.core.rewrite.scan_reduction import (
try_reduce_to_local_scan,
Expand Down Expand Up @@ -50,6 +50,7 @@
"rewrite_range_rolling",
"try_reduce_to_table_scan",
"bake_order",
"pull_out_order",
"try_reduce_to_local_scan",
"fold_row_counts",
"pull_out_window_order",
Expand Down
13 changes: 11 additions & 2 deletions packages/bigframes/bigframes/core/rewrite/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ def bake_order(
return node


def pull_out_order(
    node: bigframes.core.nodes.BigFrameNode,
) -> Tuple[bigframes.core.nodes.BigFrameNode, bigframes.core.ordering.RowOrdering]:
    """Rewrite slices in the tree, then pull its ordering up to the root.

    Args:
        node: Root of the expression tree to process.

    Returns:
        The ``(node, ordering)`` pair produced by ``_pull_up_order`` when
        called with ``order_root=True`` on the slice-rewritten tree.
    """
    # Imported at call time rather than module level.
    # NOTE(review): presumably this avoids an import cycle -- confirm.
    import bigframes.core.rewrite.slices

    # Apply the slice rewrite bottom-up before the ordering is pulled up.
    slice_rewriter = bigframes.core.rewrite.slices.rewrite_slice
    sliceless_tree = node.bottom_up(slice_rewriter)
    return _pull_up_order(sliceless_tree, order_root=True)


# Makes ordering explicit in window definitions
def _pull_up_order(
root: bigframes.core.nodes.BigFrameNode,
Expand Down Expand Up @@ -153,7 +162,7 @@ def pull_up_order_inner(
)
elif isinstance(node, bigframes.core.nodes.ReadTableNode):
if node.source.ordering is not None:
return node.with_order_cols()
return node.pull_out_order()
else:
# No defined ordering
return node, bigframes.core.ordering.RowOrdering()
Expand Down Expand Up @@ -272,7 +281,7 @@ def pull_up_order_inner(
offsets_id
)
return new_explode, child_order.join(inner_order)
raise ValueError(f"Unexpected node: {node}")
raise ValueError(f"Unexpected node type {type(node).__name__}")

def pull_order_concat(
node: bigframes.core.nodes.ConcatNode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,14 @@ def _create_bq_function(self, create_function_ddl: str) -> None:
# TODO(swast): plumb through the original, user-facing api_name.
import bigframes.session._io.bigquery

_, query_job = bigframes.session._io.bigquery.start_query_with_client(
_, query_job = bigframes.session._io.bigquery.start_query_with_job(
cast(bigquery.Client, self._session.bqclient),
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
location=None,
project=None,
timeout=None,
metrics=None,
query_with_job=True,
publisher=self._session._publisher,
)
logger.info(f"Created bigframes function {query_job.ddl_target_routine}")
Expand Down
3 changes: 1 addition & 2 deletions packages/bigframes/bigframes/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,9 @@ def __call__(self, *args, **kwargs):

args_string = ", ".join([sg_sql.to_sql(sg_sql.literal(v)) for v in args])
sql = f"SELECT `{str(self._udf_def.routine_ref)}`({args_string})"
iter, job = bf_io_bigquery.start_query_with_client(
iter, job = bf_io_bigquery.start_query_with_job(
self._session.bqclient,
sql=sql,
query_with_job=True,
job_config=bigquery.QueryJobConfig(),
publisher=self._session._publisher,
) # type: ignore
Expand Down
6 changes: 2 additions & 4 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2310,15 +2310,14 @@ def _start_query_ml_ddl(
# so we must reset any encryption set in the job config
# https://cloud.google.com/bigquery/docs/customer-managed-encryption#encrypt-model
job_config.destination_encryption_configuration = None
iterator, query_job = bf_io_bigquery.start_query_with_client(
iterator, query_job = bf_io_bigquery.start_query_with_job(
self.bqclient,
sql,
job_config=job_config,
metrics=self._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY,
publisher=self._publisher,
session=self,
Expand All @@ -2340,15 +2339,14 @@ def _create_object_table(self, path: str, connection: str) -> str:
uris = ['{path}']);
"""
)
bf_io_bigquery.start_query_with_client(
bf_io_bigquery.start_query_with_job(
self.bqclient,
sql,
job_config=bigquery.QueryJobConfig(),
metrics=self._metrics,
location=None,
project=None,
timeout=None,
query_with_job=True,
publisher=self._publisher,
session=self,
)
Expand Down
Loading
Loading