Skip to content

Commit 7003d1a

Browse files
perf: Utilize ORDER BY LIMIT over ROW_NUMBER where possible (#1077)
1 parent 51cdd33 commit 7003d1a

File tree

9 files changed

+459
-100
lines changed

9 files changed

+459
-100
lines changed

bigframes/core/compile/api.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import google.cloud.bigquery as bigquery
1919

2020
import bigframes.core.compile.compiler as compiler
21+
import bigframes.core.rewrite as rewrites
2122

2223
if TYPE_CHECKING:
2324
import bigframes.core.nodes
@@ -42,6 +43,7 @@ def compile_unordered(
4243
col_id_overrides: Mapping[str, str] = {},
4344
) -> str:
4445
"""Compile node into sql where rows are unsorted, and no ordering information is preserved."""
46+
# TODO: Enable limit pullup, but only if not being used to write to clustered table.
4547
return self._compiler.compile_unordered_ir(node).to_sql(
4648
col_id_overrides=col_id_overrides
4749
)
@@ -53,8 +55,10 @@ def compile_ordered(
5355
col_id_overrides: Mapping[str, str] = {},
5456
) -> str:
5557
"""Compile node into sql where rows are sorted with ORDER BY."""
56-
return self._compiler.compile_ordered_ir(node).to_sql(
57-
col_id_overrides=col_id_overrides, ordered=True
58+
# If we are ordering the query anyways, compiling the slice as a limit is probably a good idea.
59+
new_node, limit = rewrites.pullup_limit_from_slice(node)
60+
return self._compiler.compile_ordered_ir(new_node).to_sql(
61+
col_id_overrides=col_id_overrides, ordered=True, limit=limit
5862
)
5963

6064
def compile_raw(

bigframes/core/compile/compiled.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -943,8 +943,9 @@ def to_sql(
943943
self,
944944
col_id_overrides: typing.Mapping[str, str] = {},
945945
ordered: bool = False,
946+
limit: Optional[int] = None,
946947
) -> str:
947-
if ordered:
948+
if ordered or limit:
948949
# Need to bake ordering expressions into the selected column in order for our ordering clause builder to work.
949950
baked_ir = self._bake_ordering()
950951
sql = ibis_bigquery.Backend().compile(
@@ -969,7 +970,11 @@ def to_sql(
969970
order_by_clause = bigframes.core.sql.ordering_clause(
970971
baked_ir._ordering.all_ordering_columns
971972
)
972-
sql += f"{order_by_clause}\n"
973+
sql += f"\n{order_by_clause}"
974+
if limit is not None:
975+
if not isinstance(limit, int):
976+
raise TypeError(f"Limit param: {limit} must be an int.")
977+
sql += f"\nLIMIT {limit}"
973978
else:
974979
sql = ibis_bigquery.Backend().compile(
975980
self._to_ibis_expr(

bigframes/core/nodes.py

Lines changed: 125 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import functools
2121
import itertools
2222
import typing
23-
from typing import Callable, Iterable, Optional, Sequence, Tuple
23+
from typing import Callable, cast, Iterable, Optional, Sequence, Tuple
2424

2525
import google.cloud.bigquery as bq
2626

@@ -30,6 +30,7 @@
3030
import bigframes.core.identifiers as bfet_ids
3131
from bigframes.core.ordering import OrderingExpression
3232
import bigframes.core.schema as schemata
33+
import bigframes.core.slices as slices
3334
import bigframes.core.window_spec as window
3435
import bigframes.dtypes
3536
import bigframes.operations.aggregations as agg_ops
@@ -82,6 +83,11 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
8283
"""Direct children of this node"""
8384
return tuple([])
8485

86+
@property
87+
@abc.abstractmethod
88+
def row_count(self) -> typing.Optional[int]:
89+
return None
90+
8591
@functools.cached_property
8692
def session(self):
8793
sessions = []
@@ -304,6 +310,26 @@ def variables_introduced(self) -> int:
304310
def relation_ops_created(self) -> int:
305311
return 2
306312

313+
@property
314+
def is_limit(self) -> bool:
315+
"""Returns whether this is equivalent to a ORDER BY ... LIMIT N."""
316+
# TODO: Handle tail case.
317+
return (
318+
(not self.start)
319+
and (self.step == 1)
320+
and (self.stop is not None)
321+
and (self.stop > 0)
322+
)
323+
324+
@property
325+
def row_count(self) -> typing.Optional[int]:
326+
child_length = self.child.row_count
327+
if child_length is None:
328+
return None
329+
return slices.slice_output_rows(
330+
(self.start, self.stop, self.step), child_length
331+
)
332+
307333

308334
@dataclass(frozen=True, eq=False)
309335
class JoinNode(BigFrameNode):
@@ -351,6 +377,15 @@ def variables_introduced(self) -> int:
351377
def joins(self) -> bool:
352378
return True
353379

380+
@property
381+
def row_count(self) -> Optional[int]:
382+
if self.type == "cross":
383+
if self.left_child.row_count is None or self.right_child.row_count is None:
384+
return None
385+
return self.left_child.row_count * self.right_child.row_count
386+
387+
return None
388+
354389
def transform_children(
355390
self, t: Callable[[BigFrameNode], BigFrameNode]
356391
) -> BigFrameNode:
@@ -412,6 +447,16 @@ def variables_introduced(self) -> int:
412447
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
413448
return len(self.schema.items) + OVERHEAD_VARIABLES
414449

450+
@property
451+
def row_count(self) -> Optional[int]:
452+
sub_counts = [node.row_count for node in self.child_nodes]
453+
total = 0
454+
for count in sub_counts:
455+
if count is None:
456+
return None
457+
total += count
458+
return total
459+
415460
def transform_children(
416461
self, t: Callable[[BigFrameNode], BigFrameNode]
417462
) -> BigFrameNode:
@@ -460,6 +505,10 @@ def variables_introduced(self) -> int:
460505
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
461506
return len(self.schema.items) + OVERHEAD_VARIABLES
462507

508+
@property
509+
def row_count(self) -> Optional[int]:
510+
return None
511+
463512
def transform_children(
464513
self, t: Callable[[BigFrameNode], BigFrameNode]
465514
) -> BigFrameNode:
@@ -484,19 +533,18 @@ def roots(self) -> typing.Set[BigFrameNode]:
484533
return {self}
485534

486535
@property
487-
def supports_fast_head(self) -> bool:
536+
def fast_offsets(self) -> bool:
537+
return False
538+
539+
@property
540+
def fast_ordered_limit(self) -> bool:
488541
return False
489542

490543
def transform_children(
491544
self, t: Callable[[BigFrameNode], BigFrameNode]
492545
) -> BigFrameNode:
493546
return self
494547

495-
@property
496-
def row_count(self) -> typing.Optional[int]:
497-
"""How many rows are in the data source. None means unknown."""
498-
return None
499-
500548

501549
class ScanItem(typing.NamedTuple):
502550
id: bfet_ids.ColumnId
@@ -528,7 +576,11 @@ def variables_introduced(self) -> int:
528576
return len(self.scan_list.items) + 1
529577

530578
@property
531-
def supports_fast_head(self) -> bool:
579+
def fast_offsets(self) -> bool:
580+
return True
581+
582+
@property
583+
def fast_ordered_limit(self) -> bool:
532584
return True
533585

534586
@property
@@ -635,12 +687,27 @@ def relation_ops_created(self) -> int:
635687
return 3
636688

637689
@property
638-
def supports_fast_head(self) -> bool:
639-
# Fast head is only supported when row offsets are available.
640-
# In the future, ORDER BY+LIMIT optimizations may allow fast head when
641-
# clustered and/or partitioned on ordering key
690+
def fast_offsets(self) -> bool:
691+
# Fast head is only supported when row offsets are available or data is clustered over ordering key.
642692
return (self.source.ordering is not None) and self.source.ordering.is_sequential
643693

694+
@property
695+
def fast_ordered_limit(self) -> bool:
696+
if self.source.ordering is None:
697+
return False
698+
order_cols = self.source.ordering.all_ordering_columns
699+
# monotonicity would probably be fine
700+
if not all(col.scalar_expression.is_identity for col in order_cols):
701+
return False
702+
order_col_ids = tuple(
703+
cast(ex.DerefOp, col.scalar_expression).id.name for col in order_cols
704+
)
705+
cluster_col_ids = self.source.table.cluster_cols
706+
if cluster_col_ids is None:
707+
return False
708+
709+
return order_col_ids == cluster_col_ids[: len(order_col_ids)]
710+
644711
@property
645712
def order_ambiguous(self) -> bool:
646713
return (
@@ -706,6 +773,10 @@ def relation_ops_created(self) -> int:
706773
def variables_introduced(self) -> int:
707774
return 1
708775

776+
@property
777+
def row_count(self) -> Optional[int]:
778+
return self.child.row_count
779+
709780
def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
710781
if self.col_id not in used_cols:
711782
return self.child.prune(used_cols)
@@ -726,6 +797,10 @@ def row_preserving(self) -> bool:
726797
def variables_introduced(self) -> int:
727798
return 1
728799

800+
@property
801+
def row_count(self) -> Optional[int]:
802+
return None
803+
729804
def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
730805
consumed_ids = used_cols.union(self.predicate.column_references)
731806
pruned_child = self.child.prune(consumed_ids)
@@ -749,6 +824,10 @@ def relation_ops_created(self) -> int:
749824
def explicitly_ordered(self) -> bool:
750825
return True
751826

827+
@property
828+
def row_count(self) -> Optional[int]:
829+
return self.child.row_count
830+
752831
def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
753832
ordering_cols = itertools.chain.from_iterable(
754833
map(lambda x: x.referenced_columns, self.by)
@@ -772,6 +851,10 @@ def relation_ops_created(self) -> int:
772851
# Doesnt directly create any relational operations
773852
return 0
774853

854+
@property
855+
def row_count(self) -> Optional[int]:
856+
return self.child.row_count
857+
775858

776859
@dataclass(frozen=True, eq=False)
777860
class SelectionNode(UnaryNode):
@@ -798,6 +881,10 @@ def variables_introduced(self) -> int:
798881
def defines_namespace(self) -> bool:
799882
return True
800883

884+
@property
885+
def row_count(self) -> Optional[int]:
886+
return self.child.row_count
887+
801888
def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
802889
pruned_selections = tuple(
803890
select for select in self.input_output_pairs if select[1] in used_cols
@@ -842,6 +929,10 @@ def variables_introduced(self) -> int:
842929
new_vars = sum(1 for i in self.assignments if not i[0].is_identity)
843930
return new_vars
844931

932+
@property
933+
def row_count(self) -> Optional[int]:
934+
return self.child.row_count
935+
845936
def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
846937
pruned_assignments = tuple(i for i in self.assignments if i[1] in used_cols)
847938
if len(pruned_assignments) == 0:
@@ -877,6 +968,10 @@ def variables_introduced(self) -> int:
877968
def defines_namespace(self) -> bool:
878969
return True
879970

971+
@property
972+
def row_count(self) -> Optional[int]:
973+
return 1
974+
880975

881976
@dataclass(frozen=True, eq=False)
882977
class AggregateNode(UnaryNode):
@@ -926,6 +1021,12 @@ def explicitly_ordered(self) -> bool:
9261021
def defines_namespace(self) -> bool:
9271022
return True
9281023

1024+
@property
1025+
def row_count(self) -> Optional[int]:
1026+
if not self.by_column_ids:
1027+
return 1
1028+
return None
1029+
9291030
def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
9301031
by_ids = (ref.id for ref in self.by_column_ids)
9311032
pruned_aggs = tuple(agg for agg in self.aggregations if agg[1] in used_cols)
@@ -963,6 +1064,10 @@ def relation_ops_created(self) -> int:
9631064
# Assume that if not reprojecting, that there is a sequence of window operations sharing the same window
9641065
return 0 if self.skip_reproject_unsafe else 4
9651066

1067+
@property
1068+
def row_count(self) -> Optional[int]:
1069+
return self.child.row_count
1070+
9661071
@functools.cached_property
9671072
def added_field(self) -> Field:
9681073
input_type = self.child.get_type(self.column_name.id)
@@ -994,6 +1099,10 @@ def row_preserving(self) -> bool:
9941099
def variables_introduced(self) -> int:
9951100
return 1
9961101

1102+
@property
1103+
def row_count(self) -> Optional[int]:
1104+
return None
1105+
9971106

9981107
# TODO: Explode should create a new column instead of overriding the existing one
9991108
@dataclass(frozen=True, eq=False)
@@ -1030,6 +1139,10 @@ def variables_introduced(self) -> int:
10301139
def defines_namespace(self) -> bool:
10311140
return True
10321141

1142+
@property
1143+
def row_count(self) -> Optional[int]:
1144+
return None
1145+
10331146
def prune(self, used_cols: COLUMN_SET) -> BigFrameNode:
10341147
# Cannot prune explode op
10351148
return self.transform_children(

0 commit comments

Comments
 (0)