Skip to content

Commit 1614b3f

Browse files
authored
feat: query profiling part 1: synchronous (#938)
* feat: support query profiling * collection * fix unit tests * unit tests * vector get and stream, unit tests * aggregation get and stream, unit tests * docstring * query profile unit tests * update base classes' method signature * documentsnapshotlist unit tests * func signatures * undo client.py change * transaction.get() * lint * system test * fix shim test * fix sys test * fix sys test * system test * another system test * skip system test in emulator * stream generator unit tests * coverage * add system tests * small fixes * undo document change * add system tests * vector query system tests * format * fix system test * comments * add system tests * improve stream generator * type checking * adding stars * delete comment * remove coverage requirements for type checking part * add explain_options to StreamGenerator * yield tuple instead * raise exception when explain_metrics is absent * refactor documentsnapshotlist into queryresultslist * add comment * improve type hint * lint * move QueryResultsList to stream_generator.py * aggregation related type annotation * transaction return type hint * refactor QueryResultsList * change stream generator to return ExplainMetrics instead of yield * update aggregation query to use the new generator * update query to use the new generator * update vector query to use the new generator * lint * type annotations * fix type annotation to be python 3.9 compatible * fix type hint for python 3.8 * fix system test * add test coverage * use class method get_explain_metrics() instead of property explain_metrics * address comments * remove more Optional * add type hint for async stream generator * simplify yield in aggregation stream * stream generator type annotation * more type hints * remove "Integer" * docstring format * mypy * add more input verification for query_results.py
1 parent 3a546d3 commit 1614b3f

31 files changed

+2274
-176
lines changed

google/cloud/firestore/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from google.cloud.firestore_v1 import DocumentSnapshot
3939
from google.cloud.firestore_v1 import DocumentTransform
4040
from google.cloud.firestore_v1 import ExistsOption
41+
from google.cloud.firestore_v1 import ExplainOptions
4142
from google.cloud.firestore_v1 import FieldFilter
4243
from google.cloud.firestore_v1 import GeoPoint
4344
from google.cloud.firestore_v1 import Increment
@@ -78,6 +79,7 @@
7879
"DocumentSnapshot",
7980
"DocumentTransform",
8081
"ExistsOption",
82+
"ExplainOptions",
8183
"FieldFilter",
8284
"GeoPoint",
8385
"Increment",

google/cloud/firestore_v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from google.cloud.firestore_v1.collection import CollectionReference
5151
from google.cloud.firestore_v1.document import DocumentReference
5252
from google.cloud.firestore_v1.query import CollectionGroup, Query
53+
from google.cloud.firestore_v1.query_profile import ExplainOptions
5354
from google.cloud.firestore_v1.transaction import Transaction, transactional
5455
from google.cloud.firestore_v1.transforms import (
5556
DELETE_FIELD,
@@ -131,6 +132,7 @@
131132
"DocumentSnapshot",
132133
"DocumentTransform",
133134
"ExistsOption",
135+
"ExplainOptions",
134136
"FieldFilter",
135137
"GeoPoint",
136138
"Increment",

google/cloud/firestore_v1/aggregation.py

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030
BaseAggregationQuery,
3131
_query_response_to_result,
3232
)
33-
from google.cloud.firestore_v1.base_document import DocumentSnapshot
33+
from google.cloud.firestore_v1.query_results import QueryResultsList
3434
from google.cloud.firestore_v1.stream_generator import StreamGenerator
3535

3636
# Types needed only for Type Hints
37-
if TYPE_CHECKING:
38-
from google.cloud.firestore_v1 import transaction # pragma: NO COVER
37+
if TYPE_CHECKING: # pragma: NO COVER
38+
from google.cloud.firestore_v1 import transaction
39+
from google.cloud.firestore_v1.query_profile import ExplainMetrics
40+
from google.cloud.firestore_v1.query_profile import ExplainOptions
3941

4042

4143
class AggregationQuery(BaseAggregationQuery):
@@ -54,10 +56,14 @@ def get(
5456
retries.Retry, None, gapic_v1.method._MethodDefault
5557
] = gapic_v1.method.DEFAULT,
5658
timeout: float | None = None,
57-
) -> List[AggregationResult]:
59+
*,
60+
explain_options: Optional[ExplainOptions] = None,
61+
) -> QueryResultsList[AggregationResult]:
5862
"""Runs the aggregation query.
5963
60-
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
64+
This sends a ``RunAggregationQuery`` RPC and returns a list of
65+
aggregation results in the stream of ``RunAggregationQueryResponse``
66+
messages.
6167
6268
Args:
6369
transaction
@@ -70,20 +76,39 @@ def get(
7076
should be retried. Defaults to a system-specified policy.
7177
timeout (float): The timeout for this request. Defaults to a
7278
system-specified value.
79+
explain_options
80+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
81+
Options to enable query profiling for this query. When set,
82+
explain_metrics will be available on the returned list.
7383
7484
Returns:
75-
list: The aggregation query results
85+
QueryResultsList[AggregationResult]: The aggregation query results.
7686
7787
"""
78-
result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
79-
return list(result) # type: ignore
88+
explain_metrics: ExplainMetrics | None = None
8089

81-
def _get_stream_iterator(self, transaction, retry, timeout):
90+
result = self.stream(
91+
transaction=transaction,
92+
retry=retry,
93+
timeout=timeout,
94+
explain_options=explain_options,
95+
)
96+
result_list = list(result)
97+
98+
if explain_options is None:
99+
explain_metrics = None
100+
else:
101+
explain_metrics = result.get_explain_metrics()
102+
103+
return QueryResultsList(result_list, explain_options, explain_metrics)
104+
105+
def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None):
82106
"""Helper method for :meth:`stream`."""
83107
request, kwargs = self._prep_stream(
84108
transaction,
85109
retry,
86110
timeout,
111+
explain_options,
87112
)
88113

89114
return self._client._firestore_api.run_aggregation_query(
@@ -106,9 +131,12 @@ def _retry_query_after_exception(self, exc, retry, transaction):
106131
def _make_stream(
107132
self,
108133
transaction: Optional[transaction.Transaction] = None,
109-
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
134+
retry: Union[
135+
retries.Retry, None, gapic_v1.method._MethodDefault
136+
] = gapic_v1.method.DEFAULT,
110137
timeout: Optional[float] = None,
111-
) -> Union[Generator[List[AggregationResult], Any, None]]:
138+
explain_options: Optional[ExplainOptions] = None,
139+
) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
112140
"""Internal method for stream(). Runs the aggregation query.
113141
114142
This sends a ``RunAggregationQuery`` RPC and then returns a generator
@@ -127,16 +155,27 @@ def _make_stream(
127155
system-specified policy.
128156
timeout (Optional[float]): The timeout for this request. Defaults
129157
to a system-specified value.
158+
explain_options
159+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
160+
Options to enable query profiling for this query. When set,
161+
explain_metrics will be available on the returned generator.
130162
131163
Yields:
132-
:class:`~google.cloud.firestore_v1.base_aggregation.AggregationResult`:
164+
List[AggregationResult]:
133165
The result of aggregations of this query.
166+
167+
Returns:
168+
(Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrics]):
169+
The results of query profiling, if received from the service.
170+
134171
"""
172+
metrics: ExplainMetrics | None = None
135173

136174
response_iterator = self._get_stream_iterator(
137175
transaction,
138176
retry,
139177
timeout,
178+
explain_options,
140179
)
141180
while True:
142181
try:
@@ -154,15 +193,26 @@ def _make_stream(
154193

155194
if response is None: # EOI
156195
break
196+
197+
if metrics is None and response.explain_metrics:
198+
metrics = response.explain_metrics
199+
157200
result = _query_response_to_result(response)
158-
yield result
201+
if result:
202+
yield result
203+
204+
return metrics
159205

160206
def stream(
161207
self,
162208
transaction: Optional["transaction.Transaction"] = None,
163-
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
209+
retry: Union[
210+
retries.Retry, None, gapic_v1.method._MethodDefault
211+
] = gapic_v1.method.DEFAULT,
164212
timeout: Optional[float] = None,
165-
) -> "StreamGenerator[DocumentSnapshot]":
213+
*,
214+
explain_options: Optional[ExplainOptions] = None,
215+
) -> StreamGenerator[List[AggregationResult]]:
166216
"""Runs the aggregation query.
167217
168218
This sends a ``RunAggregationQuery`` RPC and then returns a generator
@@ -181,13 +231,19 @@ def stream(
181231
system-specified policy.
182232
timeout (Optional[float]): The timeout for this request. Defaults
183233
to a system-specified value.
234+
explain_options
235+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
236+
Options to enable query profiling for this query. When set,
237+
explain_metrics will be available on the returned generator.
184238
185239
Returns:
186-
`StreamGenerator[DocumentSnapshot]`: A generator of the query results.
240+
`StreamGenerator[List[AggregationResult]]`:
241+
A generator of the query results.
187242
"""
188243
inner_generator = self._make_stream(
189244
transaction=transaction,
190245
retry=retry,
191246
timeout=timeout,
247+
explain_options=explain_options,
192248
)
193-
return StreamGenerator(inner_generator)
249+
return StreamGenerator(inner_generator, explain_options)

google/cloud/firestore_v1/async_aggregation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ async def get(
5353
retries.AsyncRetry, None, gapic_v1.method._MethodDefault
5454
] = gapic_v1.method.DEFAULT,
5555
timeout: float | None = None,
56-
) -> List[AggregationResult]:
56+
) -> List[List[AggregationResult]]:
5757
"""Runs the aggregation query.
5858
5959
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
@@ -71,7 +71,7 @@ async def get(
7171
system-specified value.
7272
7373
Returns:
74-
list: The aggregation query results
74+
List[List[AggregationResult]]: The aggregation query results
7575
7676
"""
7777
stream_result = self.stream(

google/cloud/firestore_v1/async_stream_generator.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,28 @@
1616
Firestore API.
1717
"""
1818

19-
from collections import abc
19+
from typing import Any, AsyncGenerator, Awaitable, TypeVar
2020

2121

22-
class AsyncStreamGenerator(abc.AsyncGenerator):
22+
T = TypeVar("T")
23+
24+
25+
class AsyncStreamGenerator(AsyncGenerator[T, Any]):
2326
"""Asynchronous generator for the streamed results."""
2427

25-
def __init__(self, response_generator):
28+
def __init__(self, response_generator: AsyncGenerator[T, Any]):
2629
self._generator = response_generator
2730

28-
def __aiter__(self):
29-
return self._generator
31+
def __aiter__(self) -> AsyncGenerator[T, Any]:
32+
return self
3033

31-
def __anext__(self):
34+
def __anext__(self) -> Awaitable[T]:
3235
return self._generator.__anext__()
3336

34-
def asend(self, value=None):
37+
def asend(self, value=None) -> Awaitable[Any]:
3538
return self._generator.asend(value)
3639

37-
def athrow(self, exp=None):
40+
def athrow(self, exp=None) -> Awaitable[Any]:
3841
return self._generator.athrow(exp)
3942

4043
def aclose(self):

google/cloud/firestore_v1/base_aggregation.py

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,7 @@
2424

2525
import abc
2626
from abc import ABC
27-
from typing import (
28-
TYPE_CHECKING,
29-
Any,
30-
AsyncGenerator,
31-
Coroutine,
32-
Generator,
33-
List,
34-
Optional,
35-
Tuple,
36-
Union,
37-
)
27+
from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union
3828

3929
from google.api_core import gapic_v1
4030
from google.api_core import retry as retries
@@ -47,8 +37,14 @@
4737
)
4838

4939
# Types needed only for Type Hints
50-
if TYPE_CHECKING:
51-
from google.cloud.firestore_v1 import transaction # pragma: NO COVER
40+
if TYPE_CHECKING: # pragma: NO COVER
41+
from google.cloud.firestore_v1 import transaction
42+
from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
43+
from google.cloud.firestore_v1.query_profile import ExplainOptions
44+
from google.cloud.firestore_v1.query_results import QueryResultsList
45+
from google.cloud.firestore_v1.stream_generator import (
46+
StreamGenerator,
47+
)
5248

5349

5450
class AggregationResult(object):
@@ -62,7 +58,7 @@ class AggregationResult(object):
6258
:param value: The resulting read_time
6359
"""
6460

65-
def __init__(self, alias: str, value: int, read_time=None):
61+
def __init__(self, alias: str, value: float, read_time=None):
6662
self.alias = alias
6763
self.value = value
6864
self.read_time = read_time
@@ -211,13 +207,16 @@ def _prep_stream(
211207
transaction=None,
212208
retry: Union[retries.Retry, None, gapic_v1.method._MethodDefault] = None,
213209
timeout: float | None = None,
210+
explain_options: Optional[ExplainOptions] = None,
214211
) -> Tuple[dict, dict]:
215212
parent_path, expected_prefix = self._collection_ref._parent_info()
216213
request = {
217214
"parent": parent_path,
218215
"structured_aggregation_query": self._to_protobuf(),
219216
"transaction": _helpers.get_transaction_id(transaction),
220217
}
218+
if explain_options:
219+
request["explain_options"] = explain_options._to_dict()
221220
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
222221

223222
return request, kwargs
@@ -230,10 +229,17 @@ def get(
230229
retries.Retry, None, gapic_v1.method._MethodDefault
231230
] = gapic_v1.method.DEFAULT,
232231
timeout: float | None = None,
233-
) -> List[AggregationResult] | Coroutine[Any, Any, List[AggregationResult]]:
232+
*,
233+
explain_options: Optional[ExplainOptions] = None,
234+
) -> (
235+
QueryResultsList[AggregationResult]
236+
| Coroutine[Any, Any, List[List[AggregationResult]]]
237+
):
234238
"""Runs the aggregation query.
235239
236-
This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages.
240+
This sends a ``RunAggregationQuery`` RPC and returns a list of
241+
aggregation results in the stream of ``RunAggregationQueryResponse``
242+
messages.
237243
238244
Args:
239245
transaction
@@ -246,22 +252,27 @@ def get(
246252
should be retried. Defaults to a system-specified policy.
247253
timeout (float): The timeout for this request. Defaults to a
248254
system-specified value.
255+
explain_options
256+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
257+
Options to enable query profiling for this query. When set,
258+
explain_metrics will be available on the returned list.
249259
250260
Returns:
251-
list: The aggregation query results
252-
261+
(QueryResultsList[List[AggregationResult]] | Coroutine[Any, Any, List[List[AggregationResult]]]):
262+
The aggregation query results.
253263
"""
254264

255265
@abc.abstractmethod
256266
def stream(
257267
self,
258268
transaction: Optional[transaction.Transaction] = None,
259-
retry: Optional[retries.Retry] = gapic_v1.method.DEFAULT,
269+
retry: Union[
270+
retries.Retry, None, gapic_v1.method._MethodDefault
271+
] = gapic_v1.method.DEFAULT,
260272
timeout: Optional[float] = None,
261-
) -> (
262-
Generator[List[AggregationResult], Any, None]
263-
| AsyncGenerator[List[AggregationResult], None]
264-
):
273+
*,
274+
explain_options: Optional[ExplainOptions] = None,
275+
) -> StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator:
265276
"""Runs the aggregation query.
266277
267278
This sends a ``RunAggregationQuery`` RPC and returns a generator in the stream of ``RunAggregationQueryResponse`` messages.
@@ -274,8 +285,13 @@ def stream(
274285
errors, if any, should be retried. Defaults to a
275286
system-specified policy.
276287
timeout (Optional[float]): The timeout for this request. Defaults
277-
to a system-specified value.
288+
to a system-specified value.
289+
explain_options
290+
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
291+
Options to enable query profiling for this query. When set,
292+
explain_metrics will be available on the returned generator.
278293
279294
Returns:
295+
StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator:
280296
A generator of the query results.
281297
"""

0 commit comments

Comments
 (0)