Merge pull request #386 from DalgoT4D/airbyte-logs-from-subflow
fetch airbyte logs via a subflow
Ishankoradia authored Dec 11, 2023
2 parents 702d000 + f5c5f61 commit 6e79b1d
Showing 2 changed files with 55 additions and 19 deletions.
9 changes: 8 additions & 1 deletion ddpui/tests/helper/test_prefectlogs.py
@@ -1,4 +1,4 @@
-from unittest.mock import patch, Mock
+from unittest.mock import patch
 from ddpui.utils.prefectlogs import (
     remove_timestamps,
     skip_line,
@@ -53,6 +53,12 @@ def test_parse_airbyte_wait_for_completion_log():
     assert parse_airbyte_wait_for_completion_log("Job 123 succeeded") == {
         "pattern": "airbyte-sync-job-succeeded",
         "status": "success",
+        "job_id": 123,
     }
+    assert parse_airbyte_wait_for_completion_log("Job 123 failed") == {
+        "pattern": "airbyte-sync-job-failed",
+        "status": "failed",
+        "job_id": 123,
+    }


@@ -282,6 +288,7 @@ def test_parse_prefect_logs_3():
             "log_lines": ["Job 23 succeeded"],
             "pattern": "airbyte-sync-job-succeeded",
             "status": "success",
+            "job_id": 23,
             "task_name": "airbyte sync",
         }
     ]
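
For reference, the enriched return value these assertions pin down looks like this in a quick interactive session (an illustration, not part of the diff):

    from ddpui.utils.prefectlogs import parse_airbyte_wait_for_completion_log

    parse_airbyte_wait_for_completion_log("Job 123 failed")
    # => {"pattern": "airbyte-sync-job-failed", "status": "failed", "job_id": 123}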
65 changes: 47 additions & 18 deletions ddpui/utils/prefectlogs.py
@@ -10,23 +10,40 @@
 def fetch_logs_from_db(connection_info: dict, flow_run_id: str):
     """fetches the logs from the prefect database"""
 
-    connection = psycopg2.connect(**connection_info)
-    cursor = connection.cursor()
-    query = f"""
-        SELECT "log"."timestamp",
-        "task_run"."name",
-        "task_run"."state_name",
-        "task_run"."state_type",
-        "log"."message"
-        FROM "log"
-        JOIN "task_run"
-        ON "log"."task_run_id" = "task_run"."id"
-        WHERE "log"."flow_run_id" = '{flow_run_id}'
-        ORDER BY "timestamp"
-    """
-    cursor.execute(query)
-    records = cursor.fetchall()
-    cursor.close()
+    with psycopg2.connect(**connection_info) as connection:
+        cursor = connection.cursor()
+        query_tasks_from_flowrun = f"""
+            SELECT "log"."timestamp",
+            "task_run"."name",
+            "task_run"."state_name",
+            "task_run"."state_type",
+            "log"."message"
+            FROM "log"
+            JOIN "task_run"
+            ON "log"."task_run_id" = "task_run"."id"
+            WHERE "log"."flow_run_id" = '{flow_run_id}'
+            ORDER BY "timestamp"
+        """
+        cursor.execute(query_tasks_from_flowrun)
+        records = cursor.fetchall()
+
+        # for airbyte jobs the parent flow starts a subflow which runs the tasks
+        if len(records) == 0:
+            query_get_subflow_id = f"""
+                SELECT "flow_run"."id"
+                FROM "flow_run"
+                JOIN "task_run"
+                ON "task_run"."id" = "flow_run"."parent_task_run_id"
+                WHERE "task_run"."flow_run_id" = '{flow_run_id}'
+            """
+            cursor.execute(query_get_subflow_id)
+            records = cursor.fetchall()
+            if len(records) == 1:
+                subflow_id = records[0][0]
+                cursor.close()
+                return fetch_logs_from_db(connection_info, subflow_id)
+
+        cursor.close()
     connection.close()
 
     header = ["timestamp", "task_name", "state_name", "state_type", "message"]
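
Design note: for an airbyte sync the parent flow run carries no task logs of its own; the work happens in a subflow linked through flow_run.parent_task_run_id, so when the first query comes back empty the function looks up that subflow and recurses into it. One caveat worth flagging: both queries splice flow_run_id into the SQL with an f-string. A parameterized variant of the subflow lookup could look like the sketch below (fetch_subflow_id is a hypothetical helper, not part of this commit):

    import psycopg2

    def fetch_subflow_id(connection_info: dict, flow_run_id: str):
        """returns the id of the subflow spawned by this flow run, or None"""
        with psycopg2.connect(**connection_info) as connection:
            with connection.cursor() as cursor:
                # %s placeholders let psycopg2 escape the value instead of
                # interpolating it into the SQL string
                cursor.execute(
                    """
                    SELECT "flow_run"."id"
                    FROM "flow_run"
                    JOIN "task_run"
                    ON "task_run"."id" = "flow_run"."parent_task_run_id"
                    WHERE "task_run"."flow_run_id" = %s
                    """,
                    (flow_run_id,),
                )
                rows = cursor.fetchall()
        return rows[0][0] if len(rows) == 1 else None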

@@ -79,11 +96,23 @@ def parse_airbyte_wait_for_completion_log(line: str):
             "pattern": "airbyte-sync-job-failed",
             "status": "failed",
         }
-    pattern_2 = re.compile(r"Job \d+ succeeded")
+    pattern_2 = re.compile(r"Job (\d+) succeeded")
     if pattern_2.match(line):
+        match = pattern_2.match(line)
+        job_id = int(match.groups()[0])
         return {
             "pattern": "airbyte-sync-job-succeeded",
             "status": "success",
+            "job_id": job_id,
         }
+    pattern_3 = re.compile(r"Job (\d+) failed")
+    if pattern_3.match(line):
+        match = pattern_3.match(line)
+        job_id = int(match.groups()[0])
+        return {
+            "pattern": "airbyte-sync-job-failed",
+            "status": "failed",
+            "job_id": job_id,
+        }
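
Since pattern_2 and pattern_3 now differ only in the final verb, the two branches could be folded into a single regex. A possible consolidation, sketched assuming the module already imports re (parse_job_result is a hypothetical name, not what the commit ships):

    import re

    JOB_RESULT_RE = re.compile(r"Job (\d+) (succeeded|failed)")

    def parse_job_result(line: str):
        # returns the same dict shape the committed code builds, or None
        match = JOB_RESULT_RE.match(line)
        if match is None:
            return None
        outcome = match.group(2)  # "succeeded" or "failed"
        return {
            "pattern": f"airbyte-sync-job-{outcome}",
            "status": "success" if outcome == "succeeded" else "failed",
            "job_id": int(match.group(1)),
        }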


