3 changes: 3 additions & 0 deletions backend/.gitignore
@@ -180,3 +180,6 @@ runtime/secrets/postgres_password_secret
# This is where the sqlite cache will be stored by default if not running on Docker.
volume_data/*.sqlite3
volume_data/*.yaml

# Temporary submissions directory (for development/testing purposes)
kernelCI_app/management/commands/tmp_submissions/*
13 changes: 12 additions & 1 deletion backend/kernelCI_app/helpers/trees.py
@@ -57,6 +57,7 @@ def sanitize_tree(
"""Sanitizes a checkout that was returned by a 'treelisting-like' query
Returns a Checkout object"""

build_status = StatusCount(
PASS=checkout["pass_builds"],
FAIL=checkout["fail_builds"],
@@ -87,13 +88,23 @@ def sanitize_tree(
"skip": checkout["skip_boots"],
}

if isinstance(checkout.get("git_commit_tags"), str):
# We have to check whether it's a string because SQLite doesn't support
# ArrayFields, so a query served from SQLite returns the tags as a string.
git_commit_tags = checkout.get("git_commit_tags")
if isinstance(git_commit_tags, str):
try:
checkout["git_commit_tags"] = json.loads(checkout["git_commit_tags"])
if not isinstance(checkout["git_commit_tags"], list):
checkout["git_commit_tags"] = []
except json.JSONDecodeError:
checkout["git_commit_tags"] = []
elif git_commit_tags and isinstance(git_commit_tags, list):
first_tag = git_commit_tags[0]
if isinstance(first_tag, str):
# git_commit_tags comes as list[str] on a normal query, but `Checkout`
# expects list[list[str]]. This is a workaround; the queries should *always*
# return a simple list[str].
checkout["git_commit_tags"] = [git_commit_tags]

return Checkout(
**checkout,
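Since the two storage backends return the tags in different shapes, here is a minimal standalone sketch of the normalization in the hunk above (the function name and sample values are hypothetical; the logic mirrors the new code):

```python
import json
from typing import Any


def normalize_git_commit_tags(checkout: dict[str, Any]) -> None:
    """Normalize git_commit_tags in place, mirroring sanitize_tree."""
    tags = checkout.get("git_commit_tags")
    if isinstance(tags, str):
        # SQLite has no ArrayField, so the tags arrive as a JSON-encoded string.
        try:
            decoded = json.loads(tags)
            checkout["git_commit_tags"] = decoded if isinstance(decoded, list) else []
        except json.JSONDecodeError:
            checkout["git_commit_tags"] = []
    elif tags and isinstance(tags, list) and isinstance(tags[0], str):
        # Postgres returns list[str], but `Checkout` expects list[list[str]].
        checkout["git_commit_tags"] = [tags]


c = {"git_commit_tags": '["v6.12-rc1"]'}  # SQLite path (hypothetical value)
normalize_git_commit_tags(c)              # -> ["v6.12-rc1"]

c = {"git_commit_tags": ["v6.12-rc1"]}    # Postgres path (hypothetical value)
normalize_git_commit_tags(c)              # -> [["v6.12-rc1"]]
```

Note that the two branches still disagree on the final shape, which is exactly the inconsistency the workaround comment calls out.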
127 changes: 127 additions & 0 deletions backend/kernelCI_app/management/commands/helpers/denormal.py
@@ -0,0 +1,127 @@
from datetime import datetime
from django.db import connections
from kernelCI_app.models import Checkouts, TreeListing


def handle_checkout_denormalization(*, buffer: list[Checkouts]) -> None:
"""Deals with the operations related to the extra tables for denormalization.

In the case of checkouts, it will update TreeListing table, and consume from PendingCheckouts.
"""

if not buffer:
return

tuple_params = [
(c.origin, c.tree_name, c.git_repository_branch, c.git_repository_url)
for c in buffer
]
flattened_list = [value for params in tuple_params for value in params]

# Check which of these trees already exist in TreeListing
query = f"""
SELECT
checkout_id,
start_time
FROM
tree_listing t
JOIN
(VALUES {','.join(["(%s, %s, %s, %s)"] * len(tuple_params))})
AS v(origin, tree_name, git_repository_branch, git_repository_url)
ON (
t.origin = v.origin
AND t.tree_name = v.tree_name
AND t.git_repository_branch = v.git_repository_branch
AND t.git_repository_url = v.git_repository_url
)
"""

with connections["default"].cursor() as cursor:
cursor.execute(query, flattened_list)
results = cursor.fetchall()

existing_checkouts_map = {r[0]: r[1] for r in results}

checkouts_for_update: list[Checkouts] = []

# `results` now holds the checkout_ids that *are* already in TreeListing
for checkout in buffer:
# if the checkout is in TreeListing, compare the start_time
if checkout.id in existing_checkouts_map:
# if newer than existing, update
checkout_start_time = datetime.fromisoformat(checkout.start_time)
if checkout_start_time >= existing_checkouts_map[checkout.id]:
checkouts_for_update.append(checkout)
# if older than existing, ignore (no action)
# if it's not in TreeListing, add it
else:
checkouts_for_update.append(checkout)

if checkouts_for_update:
tree_listing_objects = [
TreeListing(
field_timestamp=checkout.field_timestamp,
checkout_id=checkout.id,
origin=checkout.origin,
tree_name=checkout.tree_name,
git_repository_url=checkout.git_repository_url,
git_repository_branch=checkout.git_repository_branch,
git_commit_hash=checkout.git_commit_hash,
git_commit_name=checkout.git_commit_name,
git_commit_tags=checkout.git_commit_tags,
start_time=checkout.start_time,
origin_builds_finish_time=checkout.origin_builds_finish_time,
origin_tests_finish_time=checkout.origin_tests_finish_time,
# Counts default to 0 when not provided
)
for checkout in checkouts_for_update
]

TreeListing.objects.bulk_create(
tree_listing_objects,
update_conflicts=True,
unique_fields=[
"origin",
"tree_name",
"git_repository_branch",
"git_repository_url",
],
update_fields=[
"field_timestamp",
"checkout_id",
"origin",
"tree_name",
"git_repository_url",
"git_repository_branch",
"git_commit_hash",
"git_commit_name",
"git_commit_tags",
"start_time",
"origin_builds_finish_time",
"origin_tests_finish_time",
"pass_builds",
"fail_builds",
"done_builds",
"miss_builds",
"skip_builds",
"error_builds",
"null_builds",
"pass_boots",
"fail_boots",
"done_boots",
"miss_boots",
"skip_boots",
"error_boots",
"null_boots",
"pass_tests",
"fail_tests",
"done_tests",
"miss_tests",
"skip_tests",
"error_tests",
"null_tests",
],
)
print(f"Updated {len(checkouts_for_update)} trees in TreeListing", flush=True)
@@ -14,11 +14,17 @@
import yaml
import kcidb_io
from django.db import transaction
from kernelCI_app.management.commands.helpers.denormal import (
handle_checkout_denormalization,
)
from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents

from kernelCI_app.management.commands.helpers.process_submissions import (
ProcessedSubmission,
TableNames,
build_instances_from_submission,
)
from kernelCI_app.typeModels.modelTypes import MODEL_MAP, TableModels

VERBOSE = 0
LOGEXCERPT_THRESHOLD = 256 # 256 bytes threshold for logexcerpt
@@ -293,6 +299,29 @@ def prepare_file_data(filename, trees_name, spool_dir):
}


def consume_buffer(buffer: list[TableModels], item_type: TableNames) -> None:
"""
Consume a buffer of items and insert them into the database.
This function is called by the db_worker thread.
"""
if not buffer:
return

if item_type == "checkouts":
handle_checkout_denormalization(buffer=buffer)

model = MODEL_MAP[item_type]

t0 = time.time()
model.objects.bulk_create(
buffer,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out("bulk_create %s: n=%d in %.3fs" % (item_type, len(buffer), time.time() - t0))


# TODO: lower the complexity of this function
def db_worker(stop_event: threading.Event): # noqa: C901
"""
Worker thread that processes the database queue.
@@ -303,11 +332,11 @@ def db_worker(stop_event: threading.Event):  # noqa: C901
"""

# Local buffers for batching
issues_buf = []
checkouts_buf = []
builds_buf = []
tests_buf = []
incidents_buf = []
issues_buf: list[Issues] = []
checkouts_buf: list[Checkouts] = []
builds_buf: list[Builds] = []
tests_buf: list[Tests] = []
incidents_buf: list[Incidents] = []

last_flush_ts = time.time()

@@ -331,55 +360,11 @@ def flush_buffers():
try:
# Single transaction for all tables in the flush
with transaction.atomic():
if issues_buf:
t0 = time.time()
Issues.objects.bulk_create(
issues_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create issues: n=%d in %.3fs"
% (len(issues_buf), time.time() - t0)
)
if checkouts_buf:
t0 = time.time()
Checkouts.objects.bulk_create(
checkouts_buf,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out(
"bulk_create checkouts: n=%d in %.3fs"
% (len(checkouts_buf), time.time() - t0)
)
if builds_buf:
t0 = time.time()
Builds.objects.bulk_create(
builds_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create builds: n=%d in %.3fs"
% (len(builds_buf), time.time() - t0)
)
if tests_buf:
t0 = time.time()
Tests.objects.bulk_create(
tests_buf, batch_size=INGEST_BATCH_SIZE, ignore_conflicts=True
)
_out(
"bulk_create tests: n=%d in %.3fs"
% (len(tests_buf), time.time() - t0)
)
if incidents_buf:
t0 = time.time()
Incidents.objects.bulk_create(
incidents_buf,
batch_size=INGEST_BATCH_SIZE,
ignore_conflicts=True,
)
_out(
"bulk_create incidents: n=%d in %.3fs"
% (len(incidents_buf), time.time() - t0)
)
consume_buffer(issues_buf, "issues")
consume_buffer(checkouts_buf, "checkouts")
consume_buffer(builds_buf, "builds")
consume_buffer(tests_buf, "tests")
consume_buffer(incidents_buf, "incidents")
except Exception as e:
logger.error("Error during bulk_create flush: %s", e)
finally:
@@ -415,7 +400,7 @@ def flush_buffers():
try:
data, metadata = item
if data is not None:
inst = build_instances_from_submission(data)
inst: ProcessedSubmission = build_instances_from_submission(data)
issues_buf.extend(inst["issues"])
checkouts_buf.extend(inst["checkouts"])
builds_buf.extend(inst["builds"])
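`MODEL_MAP` and `TableModels` come from `kernelCI_app.typeModels.modelTypes`, which is not part of this diff; presumably it is something along these lines (a sketch, not the actual definition):

```python
from typing import Literal, Union

from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests

TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"]
TableModels = Union[Issues, Checkouts, Builds, Tests, Incidents]

# One model class per table name, so consume_buffer can dispatch generically.
MODEL_MAP: dict[TableNames, type[TableModels]] = {
    "issues": Issues,
    "checkouts": Checkouts,
    "builds": Builds,
    "tests": Tests,
    "incidents": Incidents,
}
```

With the dispatch in one place, the five near-identical `bulk_create` blocks in `flush_buffers` collapse into a single `consume_buffer` call per table.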
backend/kernelCI_app/management/commands/helpers/process_submissions.py
@@ -1,14 +1,23 @@
import logging
from django.utils import timezone
from typing import Any, Literal
from typing import Any, TypedDict

from django.db import IntegrityError
from pydantic import ValidationError

from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests
from kernelCI_app.typeModels.modelTypes import TableNames


TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"]
class ProcessedSubmission(TypedDict):
"""Stores the list of items in a single submission.
Lists can't be None but can be empty."""

issues: list[Issues]
checkouts: list[Checkouts]
builds: list[Builds]
tests: list[Tests]
incidents: list[Incidents]


logger = logging.getLogger(__name__)
@@ -128,12 +137,12 @@ def make_incident_instance(incident) -> Incidents:
return obj


def build_instances_from_submission(data: dict[str, Any]) -> dict[TableNames, list]:
def build_instances_from_submission(data: dict[str, Any]) -> ProcessedSubmission:
"""
Convert raw submission dicts into unsaved Django model instances, grouped by type.
Per-item errors are logged and the item is skipped, matching the previous behavior.
"""
out: dict[TableNames, list] = {
out: ProcessedSubmission = {
"issues": [],
"checkouts": [],
"builds": [],
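The payoff of the `TypedDict` over the old `dict[TableNames, list]` is that type checkers now see both the exact keys and the element type behind each key. A tiny self-contained illustration (simplified class names, not the real models):

```python
from typing import TypedDict


class Build: ...
class Test: ...


class Processed(TypedDict):
    # Same shape as ProcessedSubmission, reduced to two tables.
    builds: list[Build]
    tests: list[Test]


out: Processed = {"builds": [], "tests": []}
out["builds"].append(Build())   # OK: mypy/pyright infer list[Build]
# out["bulids"] = []            # flagged: "bulids" is not a key of Processed
# out["tests"].append(Build())  # flagged: expected Test, got Build
```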