Skip to content

Commit 01d6bbb

Browse files
authored
test: restore remote function stickiness in small tests (#847)
* feat: support remote function cleanup with `session.close` * accept the possibility that the artifact may have already been deleted * add cleanup by previous session id * add more documentation * hold session artifacts in a remote function session class * fix the missing return keyword * test: restore stickiness in small `remote_function` tests * docs: make `close_session`/`reset_session` appear in the docs
1 parent 676a410 commit 01d6bbb

File tree

4 files changed

+178
-95
lines changed

4 files changed

+178
-95
lines changed

bigframes/functions/remote_function.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -204,9 +204,12 @@ class IbisSignature(NamedTuple):
204204
output_type: IbisDataType
205205

206206

207-
def get_cloud_function_name(function_hash, session_id, uniq_suffix=None):
207+
def get_cloud_function_name(function_hash, session_id=None, uniq_suffix=None):
208208
"Get a name for the cloud function for the given user defined function."
209-
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX, session_id, function_hash]
209+
parts = [_BIGFRAMES_REMOTE_FUNCTION_PREFIX]
210+
if session_id:
211+
parts.append(session_id)
212+
parts.append(function_hash)
210213
if uniq_suffix:
211214
parts.append(uniq_suffix)
212215
return _GCF_FUNCTION_NAME_SEPERATOR.join(parts)
@@ -566,10 +569,13 @@ def provision_bq_remote_function(
566569
)
567570

568571
# Derive the name of the cloud function underlying the intended BQ
569-
# remote function, also collect updated package requirements as
570-
# determined in the name resolution
572+
# remote function. Use the session id to identify the GCF for unnamed
573+
# functions. The named remote functions are treated as persistent
574+
# artifacts, so let's keep them independent of session id, which also
575+
# makes their naming more stable for the same udf code
576+
session_id = None if name else self._session.session_id
571577
cloud_function_name = get_cloud_function_name(
572-
function_hash, self._session.session_id, uniq_suffix
578+
function_hash, session_id, uniq_suffix
573579
)
574580
cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name)
575581

@@ -635,13 +641,12 @@ def get_remote_function_specs(self, remote_function_name):
635641
)
636642
try:
637643
for routine in routines:
644+
routine = cast(bigquery.Routine, routine)
638645
if routine.reference.routine_id == remote_function_name:
639-
# TODO(shobs): Use first class properties when they are available
640-
# https://github.com/googleapis/python-bigquery/issues/1552
641-
rf_options = routine._properties.get("remoteFunctionOptions")
646+
rf_options = routine.remote_function_options
642647
if rf_options:
643-
http_endpoint = rf_options.get("endpoint")
644-
bq_connection = rf_options.get("connection")
648+
http_endpoint = rf_options.endpoint
649+
bq_connection = rf_options.connection
645650
if bq_connection:
646651
bq_connection = os.path.basename(bq_connection)
647652
break
@@ -731,15 +736,15 @@ class _RemoteFunctionSession:
731736

732737
def __init__(self):
733738
# Session level mapping of remote function artifacts
734-
self._temp_session_artifacts: Dict[str, str] = dict()
739+
self._temp_artifacts: Dict[str, str] = dict()
735740

736-
# Lock to synchronize the update of the session level mapping
737-
self._session_artifacts_lock = threading.Lock()
741+
# Lock to synchronize the update of the session artifacts
742+
self._artifacts_lock = threading.Lock()
738743

739-
def _update_artifacts(self, bqrf_routine: str, gcf_path: str):
744+
def _update_temp_artifacts(self, bqrf_routine: str, gcf_path: str):
740745
"""Update remote function artifacts in the current session."""
741-
with self._session_artifacts_lock:
742-
self._temp_session_artifacts[bqrf_routine] = gcf_path
746+
with self._artifacts_lock:
747+
self._temp_artifacts[bqrf_routine] = gcf_path
743748

744749
def clean_up(
745750
self,
@@ -748,8 +753,8 @@ def clean_up(
748753
session_id: str,
749754
):
750755
"""Delete remote function artifacts in the current session."""
751-
with self._session_artifacts_lock:
752-
for bqrf_routine, gcf_path in self._temp_session_artifacts.items():
756+
with self._artifacts_lock:
757+
for bqrf_routine, gcf_path in self._temp_artifacts.items():
753758
# Let's accept the possibility that the remote function may have
754759
# been deleted directly by the user
755760
bqclient.delete_routine(bqrf_routine, not_found_ok=True)
@@ -761,7 +766,7 @@ def clean_up(
761766
except google.api_core.exceptions.NotFound:
762767
pass
763768

764-
self._temp_session_artifacts.clear()
769+
self._temp_artifacts.clear()
765770

766771
# Inspired by @udf decorator implemented in ibis-bigquery package
767772
# https://github.com/ibis-project/ibis-bigquery/blob/main/ibis_bigquery/udf/__init__.py
@@ -1206,7 +1211,7 @@ def try_delattr(attr):
12061211
# explicit name, we are assuming that the user wants to persist them
12071212
# with that name and would directly manage their lifecycle.
12081213
if created_new and (not name):
1209-
self._update_artifacts(
1214+
self._update_temp_artifacts(
12101215
func.bigframes_remote_function, func.bigframes_cloud_function
12111216
)
12121217
return func

bigframes/pandas/__init__.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -847,10 +847,28 @@ def clean_up_by_session_id(
847847
option_context = config.option_context
848848
"""Global :class:`~bigframes._config.option_context` to configure BigQuery DataFrames."""
849849

850+
850851
# Session management APIs
851-
get_global_session = global_session.get_global_session
852-
close_session = global_session.close_session
853-
reset_session = global_session.close_session
852+
def get_global_session():
853+
return global_session.get_global_session()
854+
855+
856+
get_global_session.__doc__ = global_session.get_global_session.__doc__
857+
858+
859+
def close_session():
860+
return global_session.close_session()
861+
862+
863+
close_session.__doc__ = global_session.close_session.__doc__
864+
865+
866+
def reset_session():
867+
return global_session.close_session()
868+
869+
870+
reset_session.__doc__ = global_session.close_session.__doc__
871+
854872

855873
# SQL Compilation uses recursive algorithms on deep trees
856874
# 10M tree depth should be sufficient to generate any sql that is under bigquery limit

bigframes/session/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ def _clean_up_tables(self):
400400
def close(self):
401401
"""Delete resources that were created with this session's session_id.
402402
This includes BigQuery tables, remote functions and cloud functions
403-
serving the remote functions"""
403+
serving the remote functions."""
404404
self._clean_up_tables()
405405
self._remote_function_session.clean_up(
406406
self.bqclient, self.cloudfunctionsclient, self.session_id

0 commit comments

Comments
 (0)