Skip to content

Commit ae312db

Browse files
authored
feat: session.bytes_processed_sum will be updated when allow_large_result=False (#1669)
* feat: session.bytes_processed_sum will be updated when allow_large_result=False
* mypy fix
* remove duplicate logic
1 parent 563f0cb commit ae312db

File tree

4 files changed

+45
-61
lines changed

4 files changed

+45
-61
lines changed

bigframes/session/__init__.py

-7
Original file line numberDiff line numberDiff line change
@@ -336,13 +336,6 @@ def _project(self):
336336
@property
337337
def bytes_processed_sum(self):
338338
"""The sum of all bytes processed by bigquery jobs using this session."""
339-
msg = bfe.format_message(
340-
"Queries executed with `allow_large_results=False` within the session will not "
341-
"have their bytes processed counted in this sum. If you need precise "
342-
"bytes processed information, query the `INFORMATION_SCHEMA` tables "
343-
"to get relevant metrics.",
344-
)
345-
warnings.warn(msg, UserWarning)
346339
return self._metrics.bytes_processed
347340

348341
@property

bigframes/session/_io/bigquery/__init__.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
2828
import google.api_core.exceptions
2929
import google.cloud.bigquery as bigquery
30-
import google.cloud.bigquery.table
3130

3231
from bigframes.core import log_adapter
3332
import bigframes.core.compile.googlesql as googlesql
@@ -249,7 +248,7 @@ def start_query_with_client(
249248
max_results=max_results,
250249
)
251250
if metrics is not None:
252-
metrics.count_job_stats(query=sql)
251+
metrics.count_job_stats(row_iterator=results_iterator)
253252
return results_iterator, None
254253

255254
query_job = bq_client.query(
@@ -278,7 +277,7 @@ def start_query_with_client(
278277
)
279278

280279
if metrics is not None:
281-
metrics.count_job_stats(query_job)
280+
metrics.count_job_stats(query_job=query_job)
282281
return results_iterator, query_job
283282

284283

bigframes/session/metrics.py

+26-25
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import google.cloud.bigquery as bigquery
2222
import google.cloud.bigquery.job as bq_job
23+
import google.cloud.bigquery.table as bq_table
2324

2425
LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"
2526

@@ -33,14 +34,22 @@ class ExecutionMetrics:
3334
query_char_count: int = 0
3435

3536
def count_job_stats(
36-
self, query_job: Optional[bq_job.QueryJob] = None, query: str = ""
37+
self,
38+
query_job: Optional[bq_job.QueryJob] = None,
39+
row_iterator: Optional[bq_table.RowIterator] = None,
3740
):
3841
if query_job is None:
39-
query_char_count = len(query)
42+
assert row_iterator is not None
43+
if (row_iterator.total_bytes_processed is None) or (
44+
row_iterator.query is None
45+
):
46+
return
47+
query_char_count = len(row_iterator.query)
48+
bytes_processed = row_iterator.total_bytes_processed
4049
self.execution_count += 1
4150
self.query_char_count += query_char_count
42-
if LOGGING_NAME_ENV_VAR in os.environ:
43-
write_stats_to_disk(query_char_count)
51+
self.bytes_processed += bytes_processed
52+
write_stats_to_disk(query_char_count, bytes_processed)
4453
return
4554

4655
stats = get_performance_stats(query_job)
@@ -51,11 +60,9 @@ def count_job_stats(
5160
self.bytes_processed += bytes_processed
5261
self.slot_millis += slot_millis
5362
self.execution_secs += execution_secs
54-
if LOGGING_NAME_ENV_VAR in os.environ:
55-
# when running notebooks via pytest nbmake
56-
write_stats_to_disk(
57-
query_char_count, bytes_processed, slot_millis, execution_secs
58-
)
63+
write_stats_to_disk(
64+
query_char_count, bytes_processed, slot_millis, execution_secs
65+
)
5966

6067

6168
def get_performance_stats(
@@ -88,32 +95,21 @@ def get_performance_stats(
8895

8996
def write_stats_to_disk(
9097
query_char_count: int,
91-
bytes_processed: Optional[int] = None,
98+
bytes_processed: int,
9299
slot_millis: Optional[int] = None,
93100
exec_seconds: Optional[float] = None,
94101
):
95102
"""For pytest runs only, log information about the query job
96103
to a file in order to create a performance report.
97104
"""
98105
if LOGGING_NAME_ENV_VAR not in os.environ:
99-
raise EnvironmentError(
100-
"Environment variable {env_var} is not set".format(
101-
env_var=LOGGING_NAME_ENV_VAR
102-
)
103-
)
106+
return
107+
108+
# when running notebooks via pytest nbmake and running benchmarks
104109
test_name = os.environ[LOGGING_NAME_ENV_VAR]
105110
current_directory = os.getcwd()
106111

107-
if (
108-
(bytes_processed is not None)
109-
and (slot_millis is not None)
110-
and (exec_seconds is not None)
111-
):
112-
# store bytes processed
113-
bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
114-
with open(bytes_file, "a") as f:
115-
f.write(str(bytes_processed) + "\n")
116-
112+
if (slot_millis is not None) and (exec_seconds is not None):
117113
# store slot milliseconds
118114
slot_file = os.path.join(current_directory, test_name + ".slotmillis")
119115
with open(slot_file, "a") as f:
@@ -132,3 +128,8 @@ def write_stats_to_disk(
132128
)
133129
with open(query_char_count_file, "a") as f:
134130
f.write(str(query_char_count) + "\n")
131+
132+
# store bytes processed
133+
bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
134+
with open(bytes_file, "a") as f:
135+
f.write(str(bytes_processed) + "\n")

scripts/run_and_publish_benchmark.py

+17-26
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ def collect_benchmark_result(
9393
error_files = sorted(path.rglob("*.error"))
9494

9595
if not (
96-
len(bytes_files)
97-
== len(millis_files)
96+
len(millis_files)
9897
== len(bq_seconds_files)
99-
<= len(query_char_count_files)
98+
<= len(bytes_files)
99+
== len(query_char_count_files)
100100
== len(local_seconds_files)
101101
):
102102
raise ValueError(
@@ -108,10 +108,13 @@ def collect_benchmark_result(
108108
for idx in range(len(local_seconds_files)):
109109
query_char_count_file = query_char_count_files[idx]
110110
local_seconds_file = local_seconds_files[idx]
111+
bytes_file = bytes_files[idx]
111112
filename = query_char_count_file.relative_to(path).with_suffix("")
112-
if filename != local_seconds_file.relative_to(path).with_suffix(""):
113+
if filename != local_seconds_file.relative_to(path).with_suffix(
114+
""
115+
) or filename != bytes_file.relative_to(path).with_suffix(""):
113116
raise ValueError(
114-
"File name mismatch between query_char_count and seconds reports."
117+
"File name mismatch among query_char_count, bytes and seconds reports."
115118
)
116119

117120
with open(query_char_count_file, "r") as file:
@@ -123,27 +126,23 @@ def collect_benchmark_result(
123126
lines = file.read().splitlines()
124127
local_seconds = sum(float(line) for line in lines) / iterations
125128

129+
with open(bytes_file, "r") as file:
130+
lines = file.read().splitlines()
131+
total_bytes = sum(int(line) for line in lines) / iterations
132+
126133
if not has_full_metrics:
127-
total_bytes = None
128134
total_slot_millis = None
129135
bq_seconds = None
130136
else:
131-
bytes_file = bytes_files[idx]
132137
millis_file = millis_files[idx]
133138
bq_seconds_file = bq_seconds_files[idx]
134-
if (
135-
filename != bytes_file.relative_to(path).with_suffix("")
136-
or filename != millis_file.relative_to(path).with_suffix("")
137-
or filename != bq_seconds_file.relative_to(path).with_suffix("")
138-
):
139+
if filename != millis_file.relative_to(path).with_suffix(
140+
""
141+
) or filename != bq_seconds_file.relative_to(path).with_suffix(""):
139142
raise ValueError(
140143
"File name mismatch among query_char_count, bytes, millis, and seconds reports."
141144
)
142145

143-
with open(bytes_file, "r") as file:
144-
lines = file.read().splitlines()
145-
total_bytes = sum(int(line) for line in lines) / iterations
146-
147146
with open(millis_file, "r") as file:
148147
lines = file.read().splitlines()
149148
total_slot_millis = sum(int(line) for line in lines) / iterations
@@ -202,11 +201,7 @@ def collect_benchmark_result(
202201
print(
203202
f"{index} - query count: {row['Query_Count']},"
204203
+ f" query char count: {row['Query_Char_Count']},"
205-
+ (
206-
f" bytes processed sum: {row['Bytes_Processed']},"
207-
if has_full_metrics
208-
else ""
209-
)
204+
+ f" bytes processed sum: {row['Bytes_Processed']},"
210205
+ (f" slot millis sum: {row['Slot_Millis']}," if has_full_metrics else "")
211206
+ f" local execution time: {formatted_local_exec_time} seconds"
212207
+ (
@@ -238,11 +233,7 @@ def collect_benchmark_result(
238233
print(
239234
f"---Geometric mean of queries: {geometric_mean_queries},"
240235
+ f" Geometric mean of queries char counts: {geometric_mean_query_char_count},"
241-
+ (
242-
f" Geometric mean of bytes processed: {geometric_mean_bytes},"
243-
if has_full_metrics
244-
else ""
245-
)
236+
+ f" Geometric mean of bytes processed: {geometric_mean_bytes},"
246237
+ (
247238
f" Geometric mean of slot millis: {geometric_mean_slot_millis},"
248239
if has_full_metrics

0 commit comments

Comments
 (0)