Skip to content

Commit 031f253

Browse files
authored
fix: use table clone instead of system time for read_gbq_table (#109)
* fix: use table clone instead of system time for `read_gbq_table` * accept expiration datetime instead of timedelta for easier testing * don't use table clone on _session tables * remove unnecessary assert * add docstrings
1 parent 6d1953b commit 031f253

File tree

6 files changed

+137
-84
lines changed

6 files changed

+137
-84
lines changed

bigframes/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@
2626

2727
ABSTRACT_METHOD_ERROR_MESSAGE = f"Abstract method. You have likely encountered a bug. Please share this stacktrace and how you reached it with the BigQuery DataFrames team. {FEEDBACK_LINK}"
2828

29-
DEFAULT_EXPIRATION = datetime.timedelta(days=1)
29+
DEFAULT_EXPIRATION = datetime.timedelta(days=7)

bigframes/dataframe.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from __future__ import annotations
1818

19+
import datetime
1920
import re
2021
import textwrap
2122
import typing
@@ -2309,7 +2310,8 @@ def to_gbq(
23092310
self._session.bqclient,
23102311
self._session._anonymous_dataset,
23112312
# TODO(swast): allow custom expiration times, probably via session configuration.
2312-
constants.DEFAULT_EXPIRATION,
2313+
datetime.datetime.now(datetime.timezone.utc)
2314+
+ constants.DEFAULT_EXPIRATION,
23132315
)
23142316

23152317
if if_exists is not None and if_exists != "replace":

bigframes/session/__init__.py

Lines changed: 24 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
from __future__ import annotations
1818

19+
import datetime
1920
import logging
2021
import os
2122
import re
@@ -430,7 +431,9 @@ def _read_gbq_query(
430431
index_cols = list(index_col)
431432

432433
destination, query_job = self._query_to_destination(
433-
query, index_cols, api_name="read_gbq_query"
434+
query,
435+
index_cols,
436+
api_name=api_name,
434437
)
435438

436439
# If there was no destination table, that means the query must have
@@ -508,6 +511,12 @@ def _read_gbq_table_to_ibis_with_total_ordering(
508511
If we can get a total ordering from the table, such as via primary key
509512
column(s), then return those too so that ordering generation can be
510513
avoided.
514+
515+
For tables that aren't already read-only, this creates a table
516+
clone so that any changes to the underlying table don't affect the
517+
DataFrame and break our assumptions, especially with regards to unique
518+
index and ordering. See:
519+
https://cloud.google.com/bigquery/docs/table-clones-create
511520
"""
512521
if table_ref.dataset_id.upper() == "_SESSION":
513522
# _SESSION tables aren't supported by the tables.get REST API.
@@ -518,15 +527,24 @@ def _read_gbq_table_to_ibis_with_total_ordering(
518527
None,
519528
)
520529

530+
now = datetime.datetime.now(datetime.timezone.utc)
531+
destination = bigframes_io.create_table_clone(
532+
table_ref,
533+
self._anonymous_dataset,
534+
# TODO(swast): Allow the default expiration to be configured.
535+
now + constants.DEFAULT_EXPIRATION,
536+
self,
537+
api_name,
538+
)
521539
table_expression = self.ibis_client.table(
522-
table_ref.table_id,
523-
database=f"{table_ref.project}.{table_ref.dataset_id}",
540+
destination.table_id,
541+
database=f"{destination.project}.{destination.dataset_id}",
524542
)
525543

526544
# If there are primary keys defined, the query engine assumes these
527545
# columns are unique, even if the constraint is not enforced. We make
528546
# the same assumption and use these columns as the total ordering keys.
529-
table = self.bqclient.get_table(table_ref)
547+
table = self.bqclient.get_table(destination)
530548

531549
# TODO(b/305264153): Use public properties to fetch primary keys once
532550
# added to google-cloud-bigquery.
@@ -535,23 +553,7 @@ def _read_gbq_table_to_ibis_with_total_ordering(
535553
.get("primaryKey", {})
536554
.get("columns")
537555
)
538-
539-
if not primary_keys:
540-
return table_expression, None
541-
else:
542-
# Read from a snapshot since we won't have to copy the table data to create a total ordering.
543-
job_config = bigquery.QueryJobConfig()
544-
job_config.labels["bigframes-api"] = api_name
545-
current_timestamp = list(
546-
self.bqclient.query(
547-
"SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
548-
job_config=job_config,
549-
).result()
550-
)[0][0]
551-
table_expression = self.ibis_client.sql(
552-
bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
553-
)
554-
return table_expression, primary_keys
556+
return table_expression, primary_keys
555557

556558
def _read_gbq_table(
557559
self,
@@ -662,20 +664,7 @@ def _read_gbq_table(
662664
total_ordering_columns=frozenset(index_cols),
663665
)
664666

665-
# We have a total ordering, so query via "time travel" so that
666-
# the underlying data doesn't mutate.
667-
if is_total_ordering:
668-
# Get the timestamp from the job metadata rather than the query
669-
# text so that the query for determining uniqueness of the ID
670-
# columns can be cached.
671-
current_timestamp = query_job.started
672-
673-
# The job finished, so we should have a start time.
674-
assert current_timestamp is not None
675-
table_expression = self.ibis_client.sql(
676-
bigframes_io.create_snapshot_sql(table_ref, current_timestamp)
677-
)
678-
else:
667+
if not is_total_ordering:
679668
# Make sure when we generate an ordering, the row_number()
680669
# corresponds to the index columns.
681670
table_expression = table_expression.order_by(index_cols)

bigframes/session/_io/bigquery.py

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,21 @@
1414

1515
"""Private module: Helpers for I/O operations."""
1616

17+
from __future__ import annotations
18+
1719
import datetime
1820
import textwrap
1921
import types
22+
import typing
2023
from typing import Dict, Iterable, Union
2124
import uuid
2225

2326
import google.cloud.bigquery as bigquery
2427

28+
if typing.TYPE_CHECKING:
29+
import bigframes.session
30+
31+
2532
IO_ORDERING_ID = "bqdf_row_nums"
2633
TEMP_TABLE_PREFIX = "bqdf{date}_{random_id}"
2734

@@ -69,43 +76,83 @@ def create_export_data_statement(
6976
)
7077

7178

72-
def create_snapshot_sql(
73-
table_ref: bigquery.TableReference, current_timestamp: datetime.datetime
74-
) -> str:
75-
"""Query a table via 'time travel' for consistent reads."""
79+
def random_table(dataset: bigquery.DatasetReference) -> bigquery.TableReference:
80+
"""Generate a random table ID with BigQuery DataFrames prefix.
81+
82+
Args:
83+
dataset (google.cloud.bigquery.DatasetReference):
84+
The dataset to make the table reference in. Usually the anonymous
85+
dataset for the session.
86+
87+
Returns:
88+
google.cloud.bigquery.TableReference:
89+
Fully qualified table ID of a table that doesn't exist.
90+
"""
91+
now = datetime.datetime.now(datetime.timezone.utc)
92+
random_id = uuid.uuid4().hex
93+
table_id = TEMP_TABLE_PREFIX.format(
94+
date=now.strftime("%Y%m%d"), random_id=random_id
95+
)
96+
return dataset.table(table_id)
97+
7698

77-
# If we have a _SESSION table, assume that it's already a copy. Nothing to do here.
78-
if table_ref.dataset_id.upper() == "_SESSION":
79-
return f"SELECT * FROM `_SESSION`.`{table_ref.table_id}`"
99+
def table_ref_to_sql(table: bigquery.TableReference) -> str:
100+
"""Format a table reference as escaped SQL."""
101+
return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`"
80102

103+
104+
def create_table_clone(
105+
source: bigquery.TableReference,
106+
dataset: bigquery.DatasetReference,
107+
expiration: datetime.datetime,
108+
session: bigframes.session.Session,
109+
api_name: str,
110+
) -> bigquery.TableReference:
111+
"""Create a table clone for consistent reads."""
81112
# If we have an anonymous query results table, it can't be modified and
82113
# there isn't any BigQuery time travel.
83-
if table_ref.dataset_id.startswith("_"):
84-
return f"SELECT * FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`"
114+
if source.dataset_id.startswith("_"):
115+
return source
85116

86-
return textwrap.dedent(
117+
fully_qualified_source_id = table_ref_to_sql(source)
118+
destination = random_table(dataset)
119+
fully_qualified_destination_id = table_ref_to_sql(destination)
120+
121+
# Include a label so that Dataplex Lineage can identify temporary
122+
# tables that BigQuery DataFrames creates. Googlers: See internal issue
123+
# 296779699.
124+
ddl = textwrap.dedent(
87125
f"""
88-
SELECT *
89-
FROM `{table_ref.project}`.`{table_ref.dataset_id}`.`{table_ref.table_id}`
90-
FOR SYSTEM_TIME AS OF TIMESTAMP({repr(current_timestamp.isoformat())})
126+
CREATE OR REPLACE TABLE
127+
{fully_qualified_destination_id}
128+
CLONE {fully_qualified_source_id}
129+
OPTIONS(
130+
expiration_timestamp=TIMESTAMP "{expiration.isoformat()}",
131+
labels=[
132+
("source", "bigquery-dataframes-temp"),
133+
("bigframes-api", {repr(api_name)})
134+
]
135+
)
91136
"""
92137
)
138+
job_config = bigquery.QueryJobConfig()
139+
job_config.labels = {
140+
"source": "bigquery-dataframes-temp",
141+
"bigframes-api": api_name,
142+
}
143+
session._start_query(ddl, job_config=job_config)
144+
return destination
93145

94146

95147
def create_temp_table(
96148
bqclient: bigquery.Client,
97149
dataset: bigquery.DatasetReference,
98-
expiration: datetime.timedelta,
150+
expiration: datetime.datetime,
99151
) -> str:
100152
"""Create an empty table with an expiration in the desired dataset."""
101-
now = datetime.datetime.now(datetime.timezone.utc)
102-
random_id = uuid.uuid4().hex
103-
table_id = TEMP_TABLE_PREFIX.format(
104-
date=now.strftime("%Y%m%d"), random_id=random_id
105-
)
106-
table_ref = dataset.table(table_id)
153+
table_ref = random_table(dataset)
107154
destination = bigquery.Table(table_ref)
108-
destination.expires = now + expiration
155+
destination.expires = expiration
109156
bqclient.create_table(destination)
110157
return f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}"
111158

tests/system/small/test_session.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,6 @@ def test_read_gbq_w_primary_keys_table(
252252
sorted_result = result.sort_values(primary_keys)
253253
pd.testing.assert_frame_equal(result, sorted_result)
254254

255-
# Verify that we're working from a snapshot rather than a copy of the table.
256-
assert "FOR SYSTEM_TIME AS OF TIMESTAMP" in df.sql
257-
258255

259256
@pytest.mark.parametrize(
260257
("query_or_table", "max_results"),

tests/unit/session/test_io_bigquery.py

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,46 +19,63 @@
1919
import google.cloud.bigquery as bigquery
2020
import pytest
2121

22+
import bigframes.session
2223
import bigframes.session._io.bigquery
2324

2425

25-
def test_create_snapshot_sql_doesnt_timetravel_anonymous_datasets():
26-
table_ref = bigquery.TableReference.from_string(
26+
def test_create_table_clone_doesnt_clone_anonymous_datasets():
27+
session = mock.create_autospec(bigframes.session.Session)
28+
source = bigquery.TableReference.from_string(
2729
"my-test-project._e8166e0cdb.anonbb92cd"
2830
)
2931

30-
sql = bigframes.session._io.bigquery.create_snapshot_sql(
31-
table_ref, datetime.datetime.now(datetime.timezone.utc)
32+
destination = bigframes.session._io.bigquery.create_table_clone(
33+
source,
34+
bigquery.DatasetReference("other-project", "other_dataset"),
35+
datetime.datetime(2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc),
36+
session,
37+
"test_api",
3238
)
3339

34-
# Anonymous query results tables don't support time travel.
35-
assert "SYSTEM_TIME" not in sql
40+
# Anonymous query results tables don't support CLONE
41+
assert destination is source
42+
session._start_query.assert_not_called()
3643

37-
# Need fully-qualified table name.
38-
assert "`my-test-project`.`_e8166e0cdb`.`anonbb92cd`" in sql
3944

40-
41-
def test_create_snapshot_sql_doesnt_timetravel_session_tables():
42-
table_ref = bigquery.TableReference.from_string("my-test-project._session.abcdefg")
43-
44-
sql = bigframes.session._io.bigquery.create_snapshot_sql(
45-
table_ref, datetime.datetime.now(datetime.timezone.utc)
45+
def test_create_table_clone_sets_expiration():
46+
session = mock.create_autospec(bigframes.session.Session)
47+
source = bigquery.TableReference.from_string(
48+
"my-test-project.test_dataset.some_table"
4649
)
4750

48-
# We aren't modifying _SESSION tables, so don't use time travel.
49-
assert "SYSTEM_TIME" not in sql
51+
expiration = datetime.datetime(
52+
2023, 11, 2, 15, 43, 21, tzinfo=datetime.timezone.utc
53+
)
54+
bigframes.session._io.bigquery.create_table_clone(
55+
source,
56+
bigquery.DatasetReference("other-project", "other_dataset"),
57+
expiration,
58+
session,
59+
"test_api",
60+
)
5061

51-
# Don't need the project ID for _SESSION tables.
52-
assert "my-test-project" not in sql
62+
session._start_query.assert_called_once()
63+
call_args = session._start_query.call_args
64+
query = call_args.args[0]
65+
assert "CREATE OR REPLACE TABLE" in query
66+
assert "CLONE" in query
67+
assert f'expiration_timestamp=TIMESTAMP "{expiration.isoformat()}"' in query
68+
assert '("source", "bigquery-dataframes-temp")' in query
69+
assert call_args.kwargs["job_config"].labels["bigframes-api"] == "test_api"
5370

5471

5572
def test_create_temp_table_default_expiration():
5673
"""Make sure the created table has an expiration."""
5774
bqclient = mock.create_autospec(bigquery.Client)
5875
dataset = bigquery.DatasetReference("test-project", "test_dataset")
59-
now = datetime.datetime.now(datetime.timezone.utc)
60-
expiration = datetime.timedelta(days=3)
61-
expected_expires = now + expiration
76+
expiration = datetime.datetime(
77+
2023, 11, 2, 13, 44, 55, 678901, datetime.timezone.utc
78+
)
6279

6380
bigframes.session._io.bigquery.create_temp_table(bqclient, dataset, expiration)
6481

@@ -68,10 +85,11 @@ def test_create_temp_table_default_expiration():
6885
assert table.project == "test-project"
6986
assert table.dataset_id == "test_dataset"
7087
assert table.table_id.startswith("bqdf")
88+
# TODO(swast): Why isn't the expiration exactly what we set it to?
7189
assert (
72-
(expected_expires - datetime.timedelta(minutes=1))
90+
(expiration - datetime.timedelta(minutes=1))
7391
< table.expires
74-
< (expected_expires + datetime.timedelta(minutes=1))
92+
< (expiration + datetime.timedelta(minutes=1))
7593
)
7694

7795

0 commit comments

Comments
 (0)