16
16
#
17
17
18
18
import os
19
+ import time
19
20
from urllib import request
20
21
21
22
import pytest
45
46
"median_income" : 3.014700 ,
46
47
}
47
48
49
+ _PERMANENT_AUTOML_MODEL_RESOURCE_NAME = f"projects/{ e2e_base ._PROJECT_NUMBER } /locations/us-central1/models/6591277539400876032"
50
+
48
51
49
52
@pytest .mark .usefixtures (
50
53
"prepare_staging_bucket" , "delete_staging_bucket" , "tear_down_resources"
@@ -78,7 +81,6 @@ def test_end_to_end_tabular(self, shared_state):
78
81
)
79
82
80
83
# Create and import to single managed dataset for both training jobs
81
-
82
84
dataset_gcs_source = f'gs://{ shared_state ["staging_bucket_name" ]} /{ _BLOB_PATH } '
83
85
84
86
ds = aiplatform .TabularDataset .create (
@@ -91,7 +93,6 @@ def test_end_to_end_tabular(self, shared_state):
91
93
shared_state ["resources" ].extend ([ds ])
92
94
93
95
# Define both training jobs
94
-
95
96
custom_job = aiplatform .CustomTrainingJob (
96
97
display_name = self ._make_display_name ("train-housing-custom" ),
97
98
script_path = _LOCAL_TRAINING_SCRIPT_PATH ,
@@ -106,8 +107,7 @@ def test_end_to_end_tabular(self, shared_state):
106
107
optimization_objective = "minimize-rmse" ,
107
108
)
108
109
109
- # Kick off both training jobs, AutoML job will take approx one hour to run
110
-
110
+ # Kick off both training jobs to check they are started correctly, then cancel the AutoML job
111
111
custom_model = custom_job .run (
112
112
ds ,
113
113
replica_count = 1 ,
@@ -119,21 +119,32 @@ def test_end_to_end_tabular(self, shared_state):
119
119
create_request_timeout = None ,
120
120
)
121
121
122
- automl_model = automl_job .run (
122
+ automl_job .run (
123
123
dataset = ds ,
124
124
target_column = "median_house_value" ,
125
125
model_display_name = self ._make_display_name ("automl-housing-model" ),
126
126
sync = False ,
127
127
)
128
128
129
- shared_state ["resources" ].extend (
130
- [automl_job , automl_model , custom_job , custom_model ]
131
- )
129
+ while (
130
+ automl_job .state != gca_pipeline_state .PipelineState .PIPELINE_STATE_RUNNING
131
+ ):
132
+ time .sleep (5 )
133
+
134
+ # Cancel the AutoML job once it's successfully been created, this is async
135
+ automl_job .cancel ()
132
136
133
- # Deploy both models after training completes
137
+ shared_state ["resources" ].extend ([custom_job , custom_model ])
138
+
139
+ # Deploy the custom model after training completes
134
140
custom_endpoint = custom_model .deploy (machine_type = "n1-standard-4" , sync = False )
141
+
142
+ # Create a reference to the permanent AutoML model and deploy it to a temporary endpoint
143
+ automl_model = aiplatform .Model (
144
+ model_name = _PERMANENT_AUTOML_MODEL_RESOURCE_NAME
145
+ )
135
146
automl_endpoint = automl_model .deploy (machine_type = "n1-standard-4" , sync = False )
136
- shared_state ["resources" ].extend ([automl_endpoint , custom_endpoint ])
147
+ shared_state ["resources" ].extend ([custom_endpoint , automl_endpoint ])
137
148
138
149
custom_batch_prediction_job = custom_model .batch_predict (
139
150
job_display_name = self ._make_display_name ("automl-housing-model" ),
@@ -149,7 +160,6 @@ def test_end_to_end_tabular(self, shared_state):
149
160
in_progress_done_check = custom_job .done ()
150
161
custom_job .wait_for_resource_creation ()
151
162
152
- automl_job .wait_for_resource_creation ()
153
163
custom_batch_prediction_job .wait_for_resource_creation ()
154
164
155
165
# Send online prediction with same instance to both deployed models
@@ -172,7 +182,6 @@ def test_end_to_end_tabular(self, shared_state):
172
182
173
183
custom_batch_prediction_job .wait ()
174
184
175
- automl_endpoint .wait ()
176
185
automl_prediction = automl_endpoint .predict (
177
186
[{k : str (v ) for k , v in _INSTANCE .items ()}], # Cast int values to strings
178
187
timeout = 180.0 ,
@@ -189,14 +198,14 @@ def test_end_to_end_tabular(self, shared_state):
189
198
custom_job .state
190
199
== gca_pipeline_state .PipelineState .PIPELINE_STATE_SUCCEEDED
191
200
)
192
- assert (
193
- automl_job .state
194
- == gca_pipeline_state .PipelineState .PIPELINE_STATE_SUCCEEDED
195
- )
196
201
assert (
197
202
custom_batch_prediction_job .state
198
203
== gca_job_state .JobState .JOB_STATE_SUCCEEDED
199
204
)
205
+ assert (
206
+ automl_job .state
207
+ == gca_pipeline_state .PipelineState .PIPELINE_STATE_CANCELLED
208
+ )
200
209
201
210
# Ensure a single prediction was returned
202
211
assert len (custom_prediction .predictions ) == 1
0 commit comments