
Commit 68f6639

Valentijn Scholten (valentijnscholten) authored
Deduplicate findings in batches (#13491)
* initial batching code
* fix dedupe_inside_engagement
* all tests working incl sarif with internal dupes
* cleanup
* deduplication: add more importer unit tests
* deduplication: add more importer unit tests
* deduplication: log hash_code_fields_always
* view_finding: show unique_id_from_tool with hash_code
* view_finding: show unique_id_from_tool with hash_code
* uncomment tests
* add more assessments
* fix duplicate finding links
* split per algo, move into new file
* align logging
* better method name and param order
* ruff apps.py
* update task/query counts
* update comments, parameters names
* finetune uidorhash logic
* fix tests to import from deduplication.py
* ruff unit tests
* simplify base queryset building
* deduplication logic: add cross scanner unique_id tests
* hook old per finding dedupe to batch dedupe code
* fix and make uid_or_hash_code matching identical to old dedupe
* UNIQUE_ID_OR_HASH_CODE: dont stop after one candidate
* UNIQUE_ID_OR_HASH_CODE: dont stop after one candidate in Batch mode
* uid_or_hash_code: fix self/older check
* notifications test: replace hardcoded ids with references
* optimize prefetching
* update query counts in test
* complete merge
* add more logging is_older, dedupe_eng_mismatch
* support FINDING_DEDUPE_METHOD
* add support for FINDING_DEDUPE_BATCH_METHOD
* simplify
* update log line
* make batch size a setting
* add false positive history to new batch post process task
* commands: add command to clear celery queue
* update dedupe command to use batch mode
* default to batch_mode for dedupe command
* do not deduplicate duplicates
* improve logging
* prefetch better in dedupe command
* dedupe command: max batch size 1000
* remove leftover method
* reimport: support pro hash method
* finalize return statement
* ruff

---------

Co-authored-by: Valentijn Scholten <[email protected]>
1 parent 19dc283 commit 68f6639

File tree

12 files changed: +948 -464 lines changed


dojo/finding/deduplication.py

Lines changed: 564 additions & 0 deletions
Large diffs are not rendered by default.
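
The new dojo/finding/deduplication.py itself is not rendered in this view, so the following is only an illustrative sketch of the approach described by the commit message and by the comment in helper.py below ("Batch dedupe with single queries per algorithm"): group a batch of findings per deduplication algorithm and resolve each group together instead of one finding at a time. The dedupe_group callable and the grouping key are assumptions, not the actual contents of the file.

from collections import defaultdict

def dedupe_batch_sketch(findings, dedupe_group):
    # Group the batch by the deduplication algorithm configured for each finding's
    # test (hash_code, unique_id_from_tool, unique_id_or_hash_code, legacy).
    per_algorithm = defaultdict(list)
    for finding in findings:
        per_algorithm[finding.test.deduplication_algorithm].append(finding)

    # One pass per algorithm: the caller-supplied dedupe_group can fetch duplicate
    # candidates for the whole group in a single query instead of per finding.
    for algorithm, batch in per_algorithm.items():
        dedupe_group(algorithm, batch)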

dojo/finding/helper.py

Lines changed: 58 additions & 1 deletion
@@ -17,6 +17,11 @@
 from dojo.decorators import dojo_async_task, dojo_model_from_id, dojo_model_to_id
 from dojo.endpoint.utils import save_endpoints_to_add
 from dojo.file_uploads.helper import delete_related_files
+from dojo.finding.deduplication import (
+    dedupe_batch_of_findings,
+    do_dedupe_finding,
+    get_finding_models_for_deduplication,
+)
 from dojo.models import (
     Endpoint,
     Endpoint_Status,
@@ -35,7 +40,6 @@
 from dojo.utils import (
     calculate_grade,
     close_external_issue,
-    do_dedupe_finding,
     do_false_positive_history,
     get_current_user,
     mass_model_updater,
@@ -457,6 +461,59 @@ def post_process_finding_save_internal(finding, dedupe_option=True, rules_option
             jira_helper.push_to_jira(finding.finding_group)
 
 
+@dojo_async_task(signature=True)
+@app.task
+def post_process_findings_batch_signature(finding_ids, *args, dedupe_option=True, rules_option=True, product_grading_option=True,
+                                          issue_updater_option=True, push_to_jira=False, user=None, **kwargs):
+    return post_process_findings_batch(finding_ids, dedupe_option, rules_option, product_grading_option,
+                                       issue_updater_option, push_to_jira, user, **kwargs)
+
+
+@dojo_async_task
+@app.task
+def post_process_findings_batch(finding_ids, *args, dedupe_option=True, rules_option=True, product_grading_option=True,
+                                issue_updater_option=True, push_to_jira=False, user=None, **kwargs):
+
+    if not finding_ids:
+        return
+
+    system_settings = System_Settings.objects.get()
+
+    # use list() to force a complete query execution and related objects to be loaded once
+    findings = get_finding_models_for_deduplication(finding_ids)
+
+    if not findings:
+        logger.debug(f"no findings found for batch deduplication with IDs: {finding_ids}")
+        return
+
+    # Batch dedupe with single queries per algorithm; fallback to per-finding for anything else
+    if dedupe_option and system_settings.enable_deduplication:
+        dedupe_batch_of_findings(findings)
+
+    if system_settings.false_positive_history:
+        # Only perform false positive history if deduplication is disabled
+        if system_settings.enable_deduplication:
+            deduplicationLogger.warning("skipping false positive history because deduplication is also enabled")
+        else:
+            for finding in findings:
+                do_false_positive_history(finding, *args, **kwargs)
+
+    # Non-status changing tasks
+    if issue_updater_option:
+        for finding in findings:
+            tool_issue_updater.async_tool_issue_update(finding)
+
+    if product_grading_option and system_settings.enable_product_grade:
+        calculate_grade(findings[0].test.engagement.product)
+
+    if push_to_jira:
+        for finding in findings:
+            if finding.has_jira_issue or not finding.finding_group:
+                jira_helper.push_to_jira(finding)
+            else:
+                jira_helper.push_to_jira(finding.finding_group)
+
+
 @receiver(pre_delete, sender=Finding)
 def finding_pre_delete(sender, instance, **kwargs):
     logger.debug("finding pre_delete: %d", instance.id)

dojo/importers/default_importer.py

Lines changed: 31 additions & 29 deletions
@@ -1,5 +1,6 @@
 import logging
 
+from django.conf import settings
 from django.core.files.uploadedfile import TemporaryUploadedFile
 from django.core.serializers import serialize
 from django.db.models.query_utils import Q
@@ -157,10 +158,9 @@ def process_findings(
         parsed_findings: list[Finding],
         **kwargs: dict,
     ) -> list[Finding]:
-        # Progressive batching for chord execution
-        post_processing_task_signatures = []
-        current_batch_number = 1
-        max_batch_size = 1024
+        # Batched post-processing (no chord): dispatch a task per 1000 findings or on final finding
+        batch_finding_ids: list[int] = []
+        batch_max_size = getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000)
 
         """
         Saves findings in memory that were parsed from the scan report into the database.
@@ -237,32 +237,34 @@ def process_findings(
                 finding = self.process_vulnerability_ids(finding)
                 # Categorize this finding as a new one
                 new_findings.append(finding)
-                # all data is already saved on the finding, we only need to trigger post processing
-
-                # We create a signature for the post processing task so we can decide to apply it async or sync
+                # all data is already saved on the finding, we only need to trigger post processing in batches
                 push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)
-                post_processing_task_signature = finding_helper.post_process_finding_save_signature(
-                    finding,
-                    dedupe_option=True,
-                    rules_option=True,
-                    product_grading_option=False,
-                    issue_updater_option=True,
-                    push_to_jira=push_to_jira,
-                )
-
-                post_processing_task_signatures.append(post_processing_task_signature)
-
-                # Check if we should launch a chord (batch full or end of findings)
-                if we_want_async(async_user=self.user) and post_processing_task_signatures:
-                    post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
-                        post_processing_task_signatures,
-                        current_batch_number,
-                        max_batch_size,
-                        is_final_finding,
-                    )
-                else:
-                    # Execute task immediately for synchronous processing
-                    post_processing_task_signature()
+                batch_finding_ids.append(finding.id)
+
+                # If batch is full or we're at the end, dispatch one batched task
+                if len(batch_finding_ids) >= batch_max_size or is_final_finding:
+                    finding_ids_batch = list(batch_finding_ids)
+                    batch_finding_ids.clear()
+                    if we_want_async(async_user=self.user):
+                        finding_helper.post_process_findings_batch_signature(
+                            finding_ids_batch,
+                            dedupe_option=True,
+                            rules_option=True,
+                            product_grading_option=True,
+                            issue_updater_option=True,
+                            push_to_jira=push_to_jira,
+                        )()
+                    else:
+                        finding_helper.post_process_findings_batch(
+                            finding_ids_batch,
+                            dedupe_option=True,
+                            rules_option=True,
+                            product_grading_option=True,
+                            issue_updater_option=True,
+                            push_to_jira=push_to_jira,
+                        )
+
+                # No chord: tasks are dispatched immediately above per batch
 
         for (group_name, findings) in group_names_to_findings_dict.items():
             finding_helper.add_findings_to_auto_group(
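
Because the importer resolves the batch size with getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000), the value can be tuned per deployment. A sketch of such an override in a local settings module (the file name and the value 500 are assumptions):

# local_settings.py (sketch): override the import/reimport dedupe batch size.
# The importer falls back to 1000 when this setting is absent; smaller batches
# mean more, but lighter, post-processing tasks.
IMPORT_REIMPORT_DEDUPE_BATCH_SIZE = 500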

dojo/importers/default_reimporter.py

Lines changed: 33 additions & 27 deletions
@@ -183,9 +183,7 @@ def process_findings(
         self.unchanged_items = []
         self.group_names_to_findings_dict = {}
         # Progressive batching for chord execution
-        post_processing_task_signatures = []
-        current_batch_number = 1
-        max_batch_size = 1024
+        # No chord: we dispatch per 1000 findings or on the final finding
 
         logger.debug(f"starting reimport of {len(parsed_findings) if parsed_findings else 0} items.")
         logger.debug("STEP 1: looping over findings from the reimported report and trying to match them to existing findings")
@@ -205,6 +203,9 @@
                 continue
             cleaned_findings.append(sanitized)
 
+        batch_finding_ids: list[int] = []
+        batch_max_size = 1000
+
         for idx, unsaved_finding in enumerate(cleaned_findings):
             is_final = idx == len(cleaned_findings) - 1
             # Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report)
@@ -255,31 +256,34 @@
                     finding,
                     unsaved_finding,
                 )
-                # all data is already saved on the finding, we only need to trigger post processing
-
-                # Execute post-processing task immediately if async, otherwise execute synchronously
+                # all data is already saved on the finding, we only need to trigger post processing in batches
                 push_to_jira = self.push_to_jira and (not self.findings_groups_enabled or not self.group_by)
-
-                post_processing_task_signature = finding_helper.post_process_finding_save_signature(
-                    finding,
-                    dedupe_option=True,
-                    rules_option=True,
-                    product_grading_option=False,
-                    issue_updater_option=True,
-                    push_to_jira=push_to_jira,
-                )
-                post_processing_task_signatures.append(post_processing_task_signature)
-
-                # Check if we should launch a chord (batch full or end of findings)
-                if we_want_async(async_user=self.user) and post_processing_task_signatures:
-                    post_processing_task_signatures, current_batch_number, _ = self.maybe_launch_post_processing_chord(
-                        post_processing_task_signatures,
-                        current_batch_number,
-                        max_batch_size,
-                        is_final,
-                    )
-                else:
-                    post_processing_task_signature()
+                batch_finding_ids.append(finding.id)
+
+                # If batch is full or we're at the end, dispatch one batched task
+                if len(batch_finding_ids) >= batch_max_size or is_final:
+                    finding_ids_batch = list(batch_finding_ids)
+                    batch_finding_ids.clear()
+                    if we_want_async(async_user=self.user):
+                        finding_helper.post_process_findings_batch_signature(
+                            finding_ids_batch,
+                            dedupe_option=True,
+                            rules_option=True,
+                            product_grading_option=True,
+                            issue_updater_option=True,
+                            push_to_jira=push_to_jira,
+                        )()
+                    else:
+                        finding_helper.post_process_findings_batch(
+                            finding_ids_batch,
+                            dedupe_option=True,
+                            rules_option=True,
+                            product_grading_option=True,
+                            issue_updater_option=True,
+                            push_to_jira=push_to_jira,
+                        )
+
+                # No chord: tasks are dispatched immediately above per batch
 
         self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items))
         # due to #3958 we can have duplicates inside the same report
@@ -779,4 +783,6 @@ def calculate_unsaved_finding_hash_code(
         self,
         unsaved_finding: Finding,
     ) -> str:
+        # this is overridden in Pro, but will still call this via super()
+        deduplicationLogger.debug("Calculating hash code for unsaved finding")
        return unsaved_finding.compute_hash_code()
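
The new comment on calculate_unsaved_finding_hash_code notes that the method is overridden in Pro but still reached via super(). A hypothetical sketch of that override pattern, assuming the base class in this file is DefaultReImporter (the subclass name and any extra behavior are illustrative):

class ProStyleReImporter(DefaultReImporter):
    def calculate_unsaved_finding_hash_code(self, unsaved_finding):
        # A subclass can add its own hashing behavior and still fall back to the
        # base implementation shown above via super().
        return super().calculate_unsaved_finding_hash_code(unsaved_finding)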
Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+import logging
+
+from django.core.management.base import BaseCommand
+
+from dojo.celery import app
+
+logger = logging.getLogger(__name__)
+
+
+class Command(BaseCommand):
+    help = "Clear (purge) all tasks from Celery queues"
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--queue",
+            type=str,
+            help="Specific queue name to clear (default: all queues)",
+        )
+        parser.add_argument(
+            "--dry-run",
+            action="store_true",
+            help="Show what would be cleared without actually clearing",
+        )
+        parser.add_argument(
+            "--force",
+            action="store_true",
+            help="Skip confirmation prompt (use with caution)",
+        )
+
+    def handle(self, *args, **options):
+        queue_name = options["queue"]
+        dry_run = options["dry_run"]
+        force = options["force"]
+
+        # Get connection to broker
+        with app.connection() as conn:
+            # Get all queues or specific queue
+            if queue_name:
+                queues = [queue_name]
+                self.stdout.write(f"Targeting queue: {queue_name}")
+            else:
+                # Get all active queues from the broker
+                inspector = app.control.inspect()
+                active_queues = inspector.active_queues()
+                if active_queues:
+                    # Extract unique queue names from all workers
+                    queues = set()
+                    for worker_queues in active_queues.values():
+                        queues.update(queue_info["name"] for queue_info in worker_queues)
+                    queues = list(queues)
+                else:
+                    # Fallback: try common default queue
+                    queues = ["celery"]
+                self.stdout.write(f"Found {len(queues)} queue(s) to process")
+
+            if not queues:
+                self.stdout.write(self.style.WARNING("No queues found to clear"))
+                return
+
+            # Show what will be cleared
+            total_purged = 0
+            for queue in queues:
+                try:
+                    # Get queue length using channel
+                    with conn.channel() as channel:
+                        _, message_count, _ = channel.queue_declare(queue=queue, passive=True)
+                except Exception as e:
+                    logger.debug(f"Could not get message count for queue {queue}: {e}")
+                    message_count = "unknown"
+
+                if dry_run:
+                    self.stdout.write(
+                        self.style.WARNING(f"  Would purge {message_count} messages from queue: {queue}"),
+                    )
+                else:
+                    self.stdout.write(f"  Queue '{queue}': {message_count} messages")
+
+            if dry_run:
+                self.stdout.write(self.style.SUCCESS("\nDry run complete. Use without --dry-run to actually purge."))
+                return
+
+            # Confirmation prompt
+            if not force:
+                self.stdout.write(
+                    self.style.WARNING(
+                        f"\nThis will permanently delete all messages from {len(queues)} queue(s).",
+                    ),
+                )
+                confirm = input("Are you sure you want to continue? (yes/no): ")
+                if confirm.lower() not in {"yes", "y"}:
+                    self.stdout.write(self.style.ERROR("Operation cancelled."))
+                    return
+
+            # Purge queues using direct channel purge
+            self.stdout.write("\nPurging queues...")
+            for queue in queues:
+                try:
+                    with conn.channel() as channel:
+                        purged_count = channel.queue_purge(queue=queue)
+                        total_purged += purged_count
+                    self.stdout.write(
+                        self.style.SUCCESS(f"  ✓ Purged {purged_count} messages from queue: {queue}"),
+                    )
+                except Exception as e:
+                    self.stdout.write(
+                        self.style.ERROR(f"  ✗ Failed to purge queue '{queue}': {e}"),
+                    )
+                    logger.error(f"Error purging queue {queue}: {e}")
+
+        if total_purged > 0:
+            self.stdout.write(
+                self.style.SUCCESS(f"\nSuccessfully purged {total_purged} message(s) from {len(queues)} queue(s)."),
+            )
+        else:
+            self.stdout.write(self.style.WARNING("\nNo messages were purged (queues may have been empty)."))
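
A usage sketch for the command above via Django's call_command. The command name clear_celery_queue is assumed from the commit message ("commands: add command to clear celery queue"), since the file path is not rendered here; the queue, dry_run and force options map to the arguments defined in add_arguments.

from django.core.management import call_command

# Preview what would be purged from all discovered queues (assumed command name).
call_command("clear_celery_queue", dry_run=True)

# Purge a single queue without the confirmation prompt.
call_command("clear_celery_queue", queue="celery", force=True)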
