Skip to content

Add custom function for delete_dag API #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion plugins/rest_api_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,15 @@ def get_config_boolean_value(section, key, default_value):
"airflow_version": "None - Custom API",
"http_method": ["GET", "POST"],
"arguments": []
},
{
"name": "delete_dag",
"description": "Delete a DAG in the Web Server from Airflow databas and filesystem",
"airflow_version": "None - Custom API",
"http_method": ["GET", "POST"],
"arguments": [
{"name": "dag_id", "description": "The id of the dag", "form_input_type": "text", "required": True}
]
}
]

Expand Down Expand Up @@ -701,6 +710,8 @@ def api(self):
final_response = self.refresh_dag(base_response)
elif api == "refresh_all_dags":
final_response = self.refresh_all_dags(base_response)
elif api == "delete_dag":
final_response = self.delete_dag(base_response)
else:
final_response = self.execute_cli(base_response, api_metadata)

Expand Down Expand Up @@ -904,6 +915,35 @@ def refresh_all_dags(self, base_response):

return REST_API_Response_Util.get_200_response(base_response=base_response, output="All DAGs are now up to date")

# Custom Function for the delete_dag API
def delete_dag(self, base_response):
logging.info("Executing custom 'delete_dag' function")
dag_id = request.args.get('dag_id')
logging.info("dag_id to delete: '" + str(dag_id) + "'")
if self.is_arg_not_provided(dag_id):
return REST_API_Response_Util.get_400_error_response(base_response, "dag_id should be provided")
elif " " in dag_id:
return REST_API_Response_Util.get_400_error_response(base_response, "dag_id contains spaces and is therefore an illegal argument")

try:
dag_full_path = airflow_dags_folder + os.sep + dag_id + ".py"

if os.path.exists(dag_full_path):
os.remove(dag_full_path)

from airflow.api.common.experimental import delete_dag
deleted_dags = delete_dag.delete_dag(dag_id, keep_records_in_log=False)
if deleted_dags > 0:
logging.info("Deleted dag " + dag_id)
else:
logging.info("No dags deleted")
except Exception as e:
error_message = "An error occurred while trying to delete the DAG '" + str(dag_id) + "': " + str(e)
logging.error(error_message)
return REST_API_Response_Util.get_500_error_response(base_response, error_message)

return REST_API_Response_Util.get_200_response(base_response=base_response, output="DAG [{}] deleted".format(dag_id))

# Executes the airflow command passed into it in the background so the function isn't tied to the webserver process
@staticmethod
def execute_cli_command_background_mode(airflow_cmd):
Expand Down Expand Up @@ -993,4 +1033,4 @@ class REST_API_Plugin(AirflowPlugin):
hooks = []
executors = []
admin_views = [rest_api_view]
menu_links = []
menu_links = []