Skip to content

Commit 00611d4

Browse files
perf: Use simple null constraints to simplify queries (#1381)
1 parent d6fbb5b commit 00611d4

File tree

9 files changed

+287
-206
lines changed

9 files changed

+287
-206
lines changed

bigframes/core/bigframe_node.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,17 @@
3737
class Field:
3838
id: identifiers.ColumnId
3939
dtype: bigframes.dtypes.Dtype
40+
# Best effort, nullable=True if not certain
41+
nullable: bool = True
42+
43+
def with_nullable(self) -> Field:
44+
return Field(self.id, self.dtype, nullable=True)
45+
46+
def with_nonnull(self) -> Field:
47+
return Field(self.id, self.dtype, nullable=False)
48+
49+
def with_id(self, id: identifiers.ColumnId) -> Field:
50+
return Field(id, self.dtype, nullable=self.nullable)
4051

4152

4253
@dataclasses.dataclass(eq=False, frozen=True)
@@ -274,10 +285,15 @@ def defined_variables(self) -> set[str]:
274285
def get_type(self, id: identifiers.ColumnId) -> bigframes.dtypes.Dtype:
275286
return self._dtype_lookup[id]
276287

288+
# TODO: Deprecate in favor of field_by_id, and eventually, by rich references
277289
@functools.cached_property
278-
def _dtype_lookup(self):
290+
def _dtype_lookup(self) -> dict[identifiers.ColumnId, bigframes.dtypes.Dtype]:
279291
return {field.id: field.dtype for field in self.fields}
280292

293+
@functools.cached_property
294+
def field_by_id(self) -> Mapping[identifiers.ColumnId, Field]:
295+
return {field.id: field for field in self.fields}
296+
281297
# Plan algorithms
282298
def unique_nodes(
283299
self: BigFrameNode,

bigframes/core/blocks.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2049,7 +2049,6 @@ def concat(
20492049

20502050
def isin(self, other: Block):
20512051
# TODO: Support multiple other columns and match on label
2052-
# TODO: Model as explicit "IN" subquery/join to better allow db to optimize
20532052
assert len(other.value_columns) == 1
20542053
unique_other_values = other.expr.select_columns(
20552054
[other.value_columns[0]]

bigframes/core/compile/compiled.py

Lines changed: 167 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import functools
1717
import itertools
1818
import typing
19-
from typing import Optional, Sequence
19+
from typing import Literal, Optional, Sequence
2020

2121
import bigframes_vendored.ibis
2222
import bigframes_vendored.ibis.backends.bigquery.backend as ibis_bigquery
@@ -94,7 +94,7 @@ def to_sql(
9494
return typing.cast(str, sql)
9595

9696
@property
97-
def columns(self) -> typing.Tuple[ibis_types.Value, ...]:
97+
def columns(self) -> tuple[ibis_types.Value, ...]:
9898
return self._columns
9999

100100
@property
@@ -107,7 +107,7 @@ def _ibis_bindings(self) -> dict[str, ibis_types.Value]:
107107

108108
def projection(
109109
self,
110-
expression_id_pairs: typing.Tuple[typing.Tuple[ex.Expression, str], ...],
110+
expression_id_pairs: tuple[tuple[ex.Expression, str], ...],
111111
) -> UnorderedIR:
112112
"""Apply an expression to the ArrayValue and assign the output to a column."""
113113
cannot_inline = any(expr.expensive for expr, _ in expression_id_pairs)
@@ -126,7 +126,7 @@ def projection(
126126

127127
def selection(
128128
self,
129-
input_output_pairs: typing.Tuple[typing.Tuple[ex.DerefOp, str], ...],
129+
input_output_pairs: tuple[tuple[ex.DerefOp, str], ...],
130130
) -> UnorderedIR:
131131
"""Apply an expression to the ArrayValue and assign the output to a column."""
132132
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
@@ -203,7 +203,7 @@ def filter(self, predicate: ex.Expression) -> UnorderedIR:
203203

204204
def aggregate(
205205
self,
206-
aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]],
206+
aggregations: typing.Sequence[tuple[ex.Aggregation, str]],
207207
by_column_ids: typing.Sequence[ex.DerefOp] = (),
208208
dropna: bool = True,
209209
order_by: typing.Sequence[OrderingExpression] = (),
@@ -323,7 +323,105 @@ def from_pandas(
323323
columns=columns,
324324
)
325325

326-
## Methods that only work with ordering
326+
def join(
327+
self: UnorderedIR,
328+
right: UnorderedIR,
329+
conditions: tuple[tuple[str, str], ...],
330+
type: Literal["inner", "outer", "left", "right", "cross"],
331+
*,
332+
join_nulls: bool = True,
333+
) -> UnorderedIR:
334+
"""Join two expressions by column equality.
335+
336+
Arguments:
337+
left: Expression for left table to join.
338+
left_column_ids: Column IDs (not label) to join by.
339+
right: Expression for right table to join.
340+
right_column_ids: Column IDs (not label) to join by.
341+
how: The type of join to perform.
342+
join_nulls (bool):
343+
If True, will joins NULL keys to each other.
344+
Returns:
345+
The joined expression. The resulting columns will be, in order,
346+
first the coalesced join keys, then, all the left columns, and
347+
finally, all the right columns.
348+
"""
349+
# Shouldn't need to select the column ids explicitly, but it seems that ibis has some
350+
# bug resolving column ids otherwise, potentially because of the "JoinChain" op
351+
left_table = self._to_ibis_expr().select(self.column_ids)
352+
right_table = right._to_ibis_expr().select(right.column_ids)
353+
354+
join_conditions = [
355+
_join_condition(
356+
left_table[left_index], right_table[right_index], nullsafe=join_nulls
357+
)
358+
for left_index, right_index in conditions
359+
]
360+
361+
combined_table = bigframes_vendored.ibis.join(
362+
left_table,
363+
right_table,
364+
predicates=join_conditions,
365+
how=type, # type: ignore
366+
)
367+
columns = [combined_table[col.get_name()] for col in self.columns] + [
368+
combined_table[col.get_name()] for col in right.columns
369+
]
370+
return UnorderedIR(
371+
combined_table,
372+
columns=columns,
373+
)
374+
375+
def isin_join(
376+
self: UnorderedIR,
377+
right: UnorderedIR,
378+
indicator_col: str,
379+
conditions: tuple[str, str],
380+
*,
381+
join_nulls: bool = True,
382+
) -> UnorderedIR:
383+
"""Join two expressions by column equality.
384+
385+
Arguments:
386+
left: Expression for left table to join.
387+
right: Expression for right table to join.
388+
conditions: Id pairs to compare
389+
Returns:
390+
The joined expression.
391+
"""
392+
left_table = self._to_ibis_expr()
393+
right_table = right._to_ibis_expr()
394+
if join_nulls: # nullsafe isin join must actually use "exists" subquery
395+
new_column = (
396+
(
397+
_join_condition(
398+
left_table[conditions[0]],
399+
right_table[conditions[1]],
400+
nullsafe=True,
401+
)
402+
)
403+
.any()
404+
.name(indicator_col)
405+
)
406+
407+
else: # Can do simpler "in" subquery
408+
new_column = (
409+
(left_table[conditions[0]])
410+
.isin((right_table[conditions[1]]))
411+
.name(indicator_col)
412+
)
413+
414+
columns = tuple(
415+
itertools.chain(
416+
(left_table[col.get_name()] for col in self.columns), (new_column,)
417+
)
418+
)
419+
420+
return UnorderedIR(
421+
left_table,
422+
columns=columns,
423+
)
424+
327425
def project_window_op(
328426
self,
329427
expression: ex.Aggregation,
@@ -429,7 +527,7 @@ def _ibis_window_from_spec(self, window_spec: WindowSpec):
429527
group_by: typing.List[ibis_types.Value] = (
430528
[
431529
typing.cast(
432-
ibis_types.Column, _as_identity(self._compile_expression(column))
530+
ibis_types.Column, _as_groupable(self._compile_expression(column))
433531
)
434532
for column in window_spec.grouping_keys
435533
]
@@ -514,7 +612,68 @@ def _convert_ordering_to_table_values(
514612
return ordering_values
515613

516614

517-
def _as_identity(value: ibis_types.Value):
615+
def _string_cast_join_cond(
616+
lvalue: ibis_types.Column, rvalue: ibis_types.Column
617+
) -> ibis_types.BooleanColumn:
618+
result = (
619+
lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0"))
620+
== rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("0"))
621+
) & (
622+
lvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1"))
623+
== rvalue.cast(ibis_dtypes.str).fill_null(ibis_types.literal("1"))
624+
)
625+
return typing.cast(ibis_types.BooleanColumn, result)
626+
627+
628+
def _numeric_join_cond(
629+
lvalue: ibis_types.Column, rvalue: ibis_types.Column
630+
) -> ibis_types.BooleanColumn:
631+
lvalue1 = lvalue.fill_null(ibis_types.literal(0))
632+
lvalue2 = lvalue.fill_null(ibis_types.literal(1))
633+
rvalue1 = rvalue.fill_null(ibis_types.literal(0))
634+
rvalue2 = rvalue.fill_null(ibis_types.literal(1))
635+
if lvalue.type().is_floating() and rvalue.type().is_floating():
636+
# NaN aren't equal so need to coalesce as well with diff constants
637+
lvalue1 = (
638+
typing.cast(ibis_types.FloatingColumn, lvalue)
639+
.isnan()
640+
.ifelse(ibis_types.literal(2), lvalue1)
641+
)
642+
lvalue2 = (
643+
typing.cast(ibis_types.FloatingColumn, lvalue)
644+
.isnan()
645+
.ifelse(ibis_types.literal(3), lvalue2)
646+
)
647+
rvalue1 = (
648+
typing.cast(ibis_types.FloatingColumn, rvalue)
649+
.isnan()
650+
.ifelse(ibis_types.literal(2), rvalue1)
651+
)
652+
rvalue2 = (
653+
typing.cast(ibis_types.FloatingColumn, rvalue)
654+
.isnan()
655+
.ifelse(ibis_types.literal(3), rvalue2)
656+
)
657+
result = (lvalue1 == rvalue1) & (lvalue2 == rvalue2)
658+
return typing.cast(ibis_types.BooleanColumn, result)
659+
660+
661+
def _join_condition(
662+
lvalue: ibis_types.Column, rvalue: ibis_types.Column, nullsafe: bool
663+
) -> ibis_types.BooleanColumn:
664+
if (lvalue.type().is_floating()) and (lvalue.type().is_floating()):
665+
# Need to always make safe join condition to handle nan, even if no nulls
666+
return _numeric_join_cond(lvalue, rvalue)
667+
if nullsafe:
668+
# TODO: Define more coalesce constants for non-numeric types to avoid cast
669+
if (lvalue.type().is_numeric()) and (lvalue.type().is_numeric()):
670+
return _numeric_join_cond(lvalue, rvalue)
671+
else:
672+
return _string_cast_join_cond(lvalue, rvalue)
673+
return typing.cast(ibis_types.BooleanColumn, lvalue == rvalue)
674+
675+
676+
def _as_groupable(value: ibis_types.Value):
518677
# Some types need to be converted to string to enable groupby
519678
if value.type().is_float64() or value.type().is_geospatial():
520679
return value.cast(ibis_dtypes.str)

bigframes/core/compile/compiler.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@
2929
import bigframes.core.compile.concat as concat_impl
3030
import bigframes.core.compile.explode
3131
import bigframes.core.compile.ibis_types
32-
import bigframes.core.compile.isin
3332
import bigframes.core.compile.scalar_op_compiler
3433
import bigframes.core.compile.scalar_op_compiler as compile_scalar
3534
import bigframes.core.compile.schema_translator
36-
import bigframes.core.compile.single_column
3735
import bigframes.core.expression as ex
3836
import bigframes.core.identifiers as ids
3937
import bigframes.core.nodes as nodes
@@ -130,24 +128,25 @@ def compile_join(self, node: nodes.JoinNode):
130128
condition_pairs = tuple(
131129
(left.id.sql, right.id.sql) for left, right in node.conditions
132130
)
131+
133132
left_unordered = self.compile_node(node.left_child)
134133
right_unordered = self.compile_node(node.right_child)
135-
return bigframes.core.compile.single_column.join_by_column_unordered(
136-
left=left_unordered,
134+
return left_unordered.join(
137135
right=right_unordered,
138136
type=node.type,
139137
conditions=condition_pairs,
138+
join_nulls=node.joins_nulls,
140139
)
141140

142141
@_compile_node.register
143142
def compile_isin(self, node: nodes.InNode):
144143
left_unordered = self.compile_node(node.left_child)
145144
right_unordered = self.compile_node(node.right_child)
146-
return bigframes.core.compile.isin.isin_unordered(
147-
left=left_unordered,
145+
return left_unordered.isin_join(
148146
right=right_unordered,
149147
indicator_col=node.indicator_col.sql,
150148
conditions=(node.left_col.id.sql, node.right_col.id.sql),
149+
join_nulls=node.joins_nulls,
151150
)
152151

153152
@_compile_node.register

bigframes/core/compile/isin.py

Lines changed: 0 additions & 71 deletions
This file was deleted.

0 commit comments

Comments
 (0)