48
48
[gca_pipeline_state_v1beta1 .PipelineState .PIPELINE_STATE_FAILED ]
49
49
)
50
50
51
- # Vertex AI Pipelines service API job name relative name prefix pattern.
52
- _JOB_NAME_PATTERN = "{parent}/pipelineJobs/{job_id}"
53
-
54
51
# Pattern for valid names used as a Vertex resource name.
55
52
_VALID_NAME_PATTERN = re .compile ("^[a-z][-a-z0-9]{0,127}$" )
56
53
@@ -178,19 +175,18 @@ def __init__(
178
175
)
179
176
180
177
pipeline_name = pipeline_job ["pipelineSpec" ]["pipelineInfo" ]["name" ]
181
- job_id = job_id or "{pipeline_name}-{timestamp}" .format (
178
+ self . job_id = job_id or "{pipeline_name}-{timestamp}" .format (
182
179
pipeline_name = re .sub ("[^-0-9a-z]+" , "-" , pipeline_name .lower ())
183
180
.lstrip ("-" )
184
181
.rstrip ("-" ),
185
182
timestamp = _get_current_time ().strftime ("%Y%m%d%H%M%S" ),
186
183
)
187
- if not _VALID_NAME_PATTERN .match (job_id ):
184
+ if not _VALID_NAME_PATTERN .match (self . job_id ):
188
185
raise ValueError (
189
186
"Generated job ID: {} is illegal as a Vertex pipelines job ID. "
190
187
"Expecting an ID following the regex pattern "
191
188
'"[a-z][-a-z0-9]{{0,127}}"' .format (job_id )
192
189
)
193
- job_name = _JOB_NAME_PATTERN .format (parent = self ._parent , job_id = job_id )
194
190
195
191
builder = pipeline_utils .PipelineRuntimeConfigBuilder .from_job_spec_json (
196
192
pipeline_job
@@ -206,7 +202,6 @@ def __init__(
206
202
207
203
self ._gca_resource = gca_pipeline_job_v1beta1 .PipelineJob (
208
204
display_name = display_name ,
209
- name = job_name ,
210
205
pipeline_spec = pipeline_job ["pipelineSpec" ],
211
206
labels = labels ,
212
207
runtime_config = runtime_config ,
@@ -215,18 +210,6 @@ def __init__(
215
210
),
216
211
)
217
212
218
- def _assert_gca_resource_is_available (self ) -> None :
219
- # TODO(b/193800063) Change this to name after this fix
220
- if not getattr (self ._gca_resource , "create_time" , None ):
221
- raise RuntimeError (
222
- f"{ self .__class__ .__name__ } resource has not been created."
223
- + (
224
- f" Resource failed with: { self ._exception } "
225
- if self ._exception
226
- else ""
227
- )
228
- )
229
-
230
213
@base .optional_sync ()
231
214
def run (
232
215
self ,
@@ -257,12 +240,10 @@ def run(
257
240
258
241
_LOGGER .log_create_with_lro (self .__class__ )
259
242
260
- # PipelineJob.name is not used by pipeline service
261
- pipeline_job_id = self ._gca_resource .name .split ("/" )[- 1 ]
262
243
self ._gca_resource = self .api_client .create_pipeline_job (
263
244
parent = self ._parent ,
264
245
pipeline_job = self ._gca_resource ,
265
- pipeline_job_id = pipeline_job_id ,
246
+ pipeline_job_id = self . job_id ,
266
247
)
267
248
268
249
_LOGGER .log_create_complete_with_getter (
0 commit comments