2222"""
2323import json
2424import logging
25+ import os
2526from typing import Dict , List , Optional , Text , Union
2627from aixplain .enums .data_type import DataType
2728from aixplain .enums .function import Function
@@ -207,7 +208,7 @@ def list(
207208 output_data_types = [output_data_types ]
208209 payload ["inputDataTypes" ] = [data_type .value for data_type in output_data_types ]
209210
210- logging .info (f"Start service for POST List Dataset - { url } - { headers } - { json .dumps (payload )} " )
211+ logging .info (f"Start service for POST List Pipeline - { url } - { headers } - { json .dumps (payload )} " )
211212 r = _request_with_retry ("post" , url , headers = headers , json = payload )
212213 resp = r .json ()
213214
@@ -220,3 +221,40 @@ def list(
220221 for pipeline in results :
221222 pipelines .append (cls .__from_response (pipeline ))
222223 return {"results" : pipelines , "page_total" : page_total , "page_number" : page_number , "total" : total }
224+
225+ @classmethod
226+ def create (cls , name : Text , pipeline : Union [Text , Dict ], status : Text = "draft" ) -> Pipeline :
227+ """Pipeline Creation
228+
229+ Args:
230+ name (Text): Pipeline Name
231+ pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file
232+ status (Text, optional): Status of the pipeline. Currently only draft pipelines can be saved. Defaults to "draft".
233+
234+ Raises:
235+ Exception: Currently just the creation of draft pipelines are supported
236+
237+ Returns:
238+ Pipeline: instance of the new pipeline
239+ """
240+ try :
241+ assert status == "draft" , "Pipeline Creation Error: Currently just the creation of draft pipelines are supported."
242+ if isinstance (pipeline , str ) is True :
243+ _ , ext = os .path .splitext (pipeline )
244+ assert (
245+ os .path .exists (pipeline ) and ext == ".json"
246+ ), "Pipeline Creation Error: Make sure the pipeline to be save is in a JSON file."
247+ with open (pipeline ) as f :
248+ pipeline = json .load (f )
249+
250+ # prepare payload
251+ payload = {"name" : name , "status" : "draft" , "architecture" : pipeline }
252+ url = urljoin (cls .backend_url , "sdk/pipelines" )
253+ headers = {"Authorization" : f"Token { config .TEAM_API_KEY } " , "Content-Type" : "application/json" }
254+ logging .info (f"Start service for POST Create Pipeline - { url } - { headers } - { json .dumps (payload )} " )
255+ r = _request_with_retry ("post" , url , headers = headers , json = payload )
256+ response = r .json ()
257+
258+ return Pipeline (response ["id" ], name , config .TEAM_API_KEY )
259+ except Exception as e :
260+ raise Exception (e )
0 commit comments