@@ -101,12 +101,12 @@ def __polling(
101101 time .sleep (wait_time )
102102 if wait_time < 60 :
103103 wait_time *= 1.1
104- except Exception as e :
104+ except Exception :
105105 logging .error (f"Polling for Pipeline: polling for { name } : Continue" )
106106 if response_body and response_body ["status" ] == "SUCCESS" :
107107 try :
108108 logging .debug (f"Polling for Pipeline: Final status of polling for { name } : SUCCESS - { response_body } " )
109- except Exception as e :
109+ except Exception :
110110 logging .error (f"Polling for Pipeline: Final status of polling for { name } : ERROR - { response_body } " )
111111 else :
112112 logging .error (
@@ -130,7 +130,7 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict:
130130 try :
131131 resp = r .json ()
132132 logging .info (f"Single Poll for Pipeline: Status of polling for { name } : { resp } " )
133- except Exception as e :
133+ except Exception :
134134 resp = {"status" : "FAILED" }
135135 return resp
136136
@@ -206,7 +206,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[
206206 if isinstance (payload , int ) is True or isinstance (payload , float ) is True :
207207 payload = str (payload )
208208 payload = {"data" : payload }
209- except Exception as e :
209+ except Exception :
210210 payload = {"data" : data }
211211 else :
212212 payload = {}
@@ -251,7 +251,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[
251251 if target_row .id == data [node_label ]:
252252 data_found = True
253253 break
254- if data_found == True :
254+ if data_found is True :
255255 break
256256 except Exception :
257257 data_asset_found = False
@@ -303,17 +303,19 @@ def run_async(
303303
304304 poll_url = resp ["url" ]
305305 response = {"status" : "IN_PROGRESS" , "url" : poll_url }
306- except Exception as e :
306+ except Exception :
307307 response = {"status" : "FAILED" }
308308 if resp is not None :
309309 response ["error" ] = resp
310310 return response
311311
312- def update (self , pipeline : Union [Text , Dict ]):
312+ def update (self , pipeline : Union [Text , Dict ], save_as_asset : bool = False , api_key : Optional [ Text ] = None ):
313313 """Update Pipeline
314314
315315 Args:
316316 pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file
317+ save_as_asset (bool, optional): Save as asset (True) or draft (False). Defaults to False.
318+ api_key (Optional[Text], optional): Team API Key to create the Pipeline. Defaults to None.
317319
318320 Raises:
319321 Exception: Make sure the pipeline to be save is in a JSON file.
@@ -323,17 +325,38 @@ def update(self, pipeline: Union[Text, Dict]):
323325 _ , ext = os .path .splitext (pipeline )
324326 assert (
325327 os .path .exists (pipeline ) and ext == ".json"
326- ), "Pipeline Update Error: Make sure the pipeline to be save is in a JSON file."
328+ ), "Pipeline Update Error: Make sure the pipeline to be saved is in a JSON file."
327329 with open (pipeline ) as f :
328330 pipeline = json .load (f )
329331
332+ for i , node in enumerate (pipeline ["nodes" ]):
333+ if "functionType" in node and node ["functionType" ] == "AI" :
334+ pipeline ["nodes" ][i ]["functionType" ] = pipeline ["nodes" ][i ]["functionType" ].lower ()
330335 # prepare payload
331- payload = {"name" : self .name , "status" : "draft" , "architecture" : pipeline }
336+ status = "draft"
337+ if save_as_asset is True :
338+ status = "onboarded"
339+ payload = {"name" : self .name , "status" : status , "architecture" : pipeline }
332340 url = urljoin (config .BACKEND_URL , f"sdk/pipelines/{ self .id } " )
333- headers = {"Authorization" : f"Token { config .TEAM_API_KEY } " , "Content-Type" : "application/json" }
341+ api_key = api_key if api_key is not None else config .TEAM_API_KEY
342+ headers = {"Authorization" : f"Token { api_key } " , "Content-Type" : "application/json" }
334343 logging .info (f"Start service for PUT Update Pipeline - { url } - { headers } - { json .dumps (payload )} " )
335344 r = _request_with_retry ("put" , url , headers = headers , json = payload )
336345 response = r .json ()
337346 logging .info (f"Pipeline { response ['id' ]} Updated." )
338347 except Exception as e :
339348 raise Exception (e )
349+
350+ def delete (self ) -> None :
351+ """Delete Dataset service"""
352+ try :
353+ url = urljoin (config .BACKEND_URL , f"sdk/pipelines/{ self .id } " )
354+ headers = {"Authorization" : f"Token { config .TEAM_API_KEY } " , "Content-Type" : "application/json" }
355+ logging .info (f"Start service for DELETE Pipeline - { url } - { headers } " )
356+ r = _request_with_retry ("delete" , url , headers = headers )
357+ if r .status_code != 200 :
358+ raise Exception ()
359+ except Exception :
360+ message = "Pipeline Deletion Error: Make sure the pipeline exists and you are the owner."
361+ logging .error (message )
362+ raise Exception (f"{ message } " )
0 commit comments