Skip to content

Commit 0a08027

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
fix: Identify nested Vertex Tensorboard profile runs for uploading when standard event data is not present
PiperOrigin-RevId: 671909634
1 parent 5a32eb7 commit 0a08027

File tree

3 files changed

+106
-3
lines changed

3 files changed

+106
-3
lines changed

google/cloud/aiplatform/tensorboard/logdir_loader.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import collections
2121
import os
22+
import tensorflow as tf
2223

2324
from tensorboard.backend.event_processing import directory_watcher
2425
from tensorboard.backend.event_processing import io_wrapper
@@ -28,6 +29,37 @@
2829
logger = tb_logging.get_logger()
2930

3031

32+
def is_plugins_subdirectory(path):
33+
"""Returns true if the path is a profile subdirectory."""
34+
if not tf.io.gfile.isdir(path):
35+
return False
36+
dirs = tf.io.gfile.listdir(path)
37+
return "plugins/" in dirs or "plugins" in dirs
38+
39+
40+
def get_plugins_subdirectories(path):
41+
"""Returns a list of plugins subdirectories within the given path."""
42+
if not tf.io.gfile.exists(path):
43+
# No directory to traverse.
44+
logger.warning("Directory does not exist: %s", str(path))
45+
return ()
46+
47+
current_glob_string = os.path.join(path, "*")
48+
while True:
49+
globs = tf.io.gfile.glob(current_glob_string)
50+
51+
if not globs:
52+
# This subdirectory level lacks files. Terminate.
53+
return
54+
55+
for glob in globs:
56+
if is_plugins_subdirectory(glob):
57+
yield glob
58+
59+
# Iterate to the next level of subdirectories.
60+
current_glob_string = os.path.join(current_glob_string, "*")
61+
62+
3163
class LogdirLoader:
3264
"""Loader for a root log directory, maintaining multiple DirectoryLoaders.
3365
@@ -58,13 +90,11 @@ def __init__(self, logdir, directory_loader_factory):
5890
self._directory_loaders = {}
5991

6092
def synchronize_runs(self):
61-
"""Finds new runs within `logdir` and makes `DirectoryLoaders` for
62-
them.
93+
"""Finds new runs within `logdir` and makes `DirectoryLoaders` for them.
6394
6495
In addition, any existing `DirectoryLoader` whose run directory
6596
no longer exists will be deleted.
6697
67-
Modify run name to work with Experiments restrictions.
6898
"""
6999
logger.info("Starting logdir traversal of %s", self._logdir)
70100
runs_seen = set()
@@ -74,6 +104,12 @@ def synchronize_runs(self):
74104
if run not in self._directory_loaders:
75105
logger.info("- Adding run for relative directory %s", run)
76106
self._directory_loaders[run] = self._directory_loader_factory(subdir)
107+
for subdir in get_plugins_subdirectories(self._logdir):
108+
run = os.path.relpath(subdir, self._logdir)
109+
runs_seen.add(run)
110+
if run not in self._directory_loaders:
111+
logger.info("- Adding run for relative directory %s", run)
112+
self._directory_loaders[run] = self._directory_loader_factory(subdir)
77113
stale_runs = set(self._directory_loaders) - runs_seen
78114
if stale_runs:
79115
for run in stale_runs:

tests/unit/aiplatform/test_logdir_loader.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,34 @@ def test_single_event_logdir(self):
181181
# A second load should indicate no new data for the run.
182182
self.assertEqual(self._extract_run_to_tags(loader.get_run_events()), {".": []})
183183

184+
def test_profile_logdir(self):
185+
logdir = self.get_temp_dir()
186+
profile_dir = os.path.join(logdir, "foo/plugins/profile")
187+
os.makedirs(profile_dir, exist_ok=True)
188+
tempfile.NamedTemporaryFile(
189+
prefix="bar", suffix=".xplane.pb", dir=profile_dir, delete=False
190+
)
191+
self.assertNotEmpty(os.listdir(profile_dir))
192+
loader = self._create_logdir_loader(logdir)
193+
loader.synchronize_runs()
194+
self.assertEqual(
195+
self._extract_run_to_tags(loader.get_run_events()), {"foo": []}
196+
)
197+
198+
def test_profile_subdirectories(self):
199+
logdir = self.get_temp_dir()
200+
profile_dir = os.path.join(logdir, "foo/bar/subdir/plugins/profile")
201+
os.makedirs(profile_dir, exist_ok=True)
202+
tempfile.NamedTemporaryFile(
203+
prefix="bar", suffix=".xplane.pb", dir=profile_dir, delete=False
204+
)
205+
self.assertNotEmpty(os.listdir(profile_dir))
206+
loader = self._create_logdir_loader(logdir)
207+
loader.synchronize_runs()
208+
self.assertEqual(
209+
self._extract_run_to_tags(loader.get_run_events()), {"foo/bar/subdir": []}
210+
)
211+
184212
def test_multiple_writes_to_logdir(self):
185213
logdir = self.get_temp_dir()
186214
with FileWriter(os.path.join(logdir, "a")) as writer:

tests/unit/aiplatform/test_uploader.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,45 @@ def test_profile_plugin_included_by_default(
13751375
self.assertIn(run_name, profile_sender._run_to_file_request_sender)
13761376
experiment_tracker_mock.set_experiment.assert_called_once()
13771377

1378+
@patch.object(
1379+
uploader_utils.OnePlatformResourceManager,
1380+
"get_run_resource_name",
1381+
autospec=True,
1382+
)
1383+
@patch.object(metadata, "_experiment_tracker", autospec=True)
1384+
@patch.object(experiment_resources, "Experiment", autospec=True)
1385+
def test_nested_profile_files_are_uploaded(
1386+
self, experiment_resources_mock, experiment_tracker_mock, run_resource_mock
1387+
):
1388+
experiment_resources_mock.get.return_value = _TEST_EXPERIMENT_NAME
1389+
experiment_tracker_mock.set_experiment.return_value = _TEST_EXPERIMENT_NAME
1390+
experiment_tracker_mock.set_tensorboard.return_value = (
1391+
_TEST_TENSORBOARD_RESOURCE_NAME
1392+
)
1393+
run_name = "profile_test_run"
1394+
run_resource_mock.return_value = _TEST_ONE_PLATFORM_RUN_NAME
1395+
with tempfile.TemporaryDirectory() as logdir:
1396+
prof_path = os.path.join(
1397+
logdir, run_name, profile_uploader.ProfileRequestSender.PROFILE_PATH
1398+
)
1399+
os.makedirs(prof_path)
1400+
1401+
uploader = _create_uploader(
1402+
self.mock_client,
1403+
logdir,
1404+
one_shot=True,
1405+
)
1406+
1407+
uploader.create_experiment()
1408+
uploader._upload_once()
1409+
senders = uploader._dispatcher._additional_senders
1410+
self.assertIn("profile", senders.keys())
1411+
1412+
profile_sender = senders["profile"]
1413+
self.assertIn(run_name, profile_sender._run_to_profile_loaders)
1414+
self.assertIn(run_name, profile_sender._run_to_file_request_sender)
1415+
experiment_tracker_mock.set_experiment.assert_called_once()
1416+
13781417
@patch.object(metadata, "_experiment_tracker", autospec=True)
13791418
@patch.object(experiment_resources, "Experiment", autospec=True)
13801419
def test_active_experiment_set_experiment_not_called(

0 commit comments

Comments
 (0)