Skip to content

Commit 9299d65

Browse files
JadeCara, Jade Wibbels, gilluminate, Vagoasdf, galvana
authored
[ENG-1404] Duplicate DSR - runner integration (#6860)
Co-authored-by: Jade Wibbels <[email protected]> Co-authored-by: Jason Gill <[email protected]> Co-authored-by: Bruno Gutierrez Rios <[email protected]> Co-authored-by: Adrian Galvan <[email protected]> Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: erosselli <[email protected]>
1 parent 5d7a296 commit 9299d65

File tree

5 files changed

+468
-149
lines changed

5 files changed

+468
-149
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o
2727
- Added keyboard shortcuts for bulk select/deselect in Action Center fields [#6911](https://github.com/ethyca/fides/pull/6911)
2828
- Added endpoint to bulk cancel requests [#6916](https://github.com/ethyca/fides/pull/6916)
2929
- Added a memory dump output to the logs when the memory watchdog hits the 90% memory limit [#6916](https://github.com/ethyca/fides/pull/6916)
30+
- Added duplicate DSR checking to request runner [#6860](https://github.com/ethyca/fides/pull/6860/)
3031

3132
### Changed
3233
- Restricted monitor tree selection to only classifiable resource types [#6900](https://github.com/ethyca/fides/pull/6900)

src/fides/api/service/privacy_request/duplication_detection.py

Lines changed: 126 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from datetime import datetime, timedelta, timezone
22
from typing import Optional
3+
from uuid import UUID
34

45
from loguru import logger
56
from sqlalchemy.orm import Session
@@ -21,12 +22,12 @@
2122
from fides.api.task.conditional_dependencies.sql_translator import (
2223
SQLConditionTranslator,
2324
)
25+
from fides.config.config_proxy import ConfigProxy
2426
from fides.config.duplicate_detection_settings import DuplicateDetectionSettings
2527

2628
ACTIONED_REQUEST_STATUSES = [
2729
PrivacyRequestStatus.approved,
2830
PrivacyRequestStatus.in_processing,
29-
PrivacyRequestStatus.complete,
3031
PrivacyRequestStatus.requires_manual_finalization,
3132
PrivacyRequestStatus.requires_input,
3233
PrivacyRequestStatus.paused,
@@ -38,9 +39,13 @@
3839
class DuplicateDetectionService:
3940
    def __init__(self, db: Session):
        """Bind the duplicate-detection service to a database session.

        Args:
            db: SQLAlchemy session used for queries and execution-log writes.
        """
        self.db = db
        # Duplicate-detection settings (enabled flag, match_identity_fields,
        # time_window_days) resolved through the application's ConfigProxy.
        self._config = ConfigProxy(db).privacy_request_duplicate_detection

    def is_enabled(self) -> bool:
        """Return True when duplicate request detection is enabled in config."""
        return self._config.enabled
4146

4247
def _create_identity_conditions(
43-
self, current_request: PrivacyRequest, config: DuplicateDetectionSettings
48+
self, current_request: PrivacyRequest
4449
) -> list[Condition]:
4550
"""Creates conditions for matching identity fields.
4651
@@ -52,12 +57,12 @@ def _create_identity_conditions(
5257
current_identities: dict[str, str] = {
5358
pi.field_name: pi.hashed_value
5459
for pi in current_request.provided_identities # type: ignore [attr-defined]
55-
if pi.field_name in config.match_identity_fields
60+
if pi.field_name in self._config.match_identity_fields
5661
}
57-
if len(current_identities) != len(config.match_identity_fields):
62+
if len(current_identities) != len(self._config.match_identity_fields):
5863
missing_fields = [
5964
field
60-
for field in config.match_identity_fields
65+
for field in self._config.match_identity_fields
6166
if field not in current_identities.keys()
6267
]
6368
logger.debug(
@@ -103,7 +108,6 @@ def _create_time_window_condition(self, time_window_days: int) -> Condition:
103108
def create_duplicate_detection_conditions(
104109
self,
105110
current_request: PrivacyRequest,
106-
config: DuplicateDetectionSettings,
107111
) -> Optional[ConditionGroup]:
108112
"""
109113
Create conditions for duplicate detection based on configuration.
@@ -115,15 +119,15 @@ def create_duplicate_detection_conditions(
115119
Returns:
116120
A ConditionGroup with AND operator, or None if no conditions can be created
117121
"""
118-
if len(config.match_identity_fields) == 0:
122+
if len(self._config.match_identity_fields) == 0:
119123
return None
120124

121-
identity_conditions = self._create_identity_conditions(current_request, config)
125+
identity_conditions = self._create_identity_conditions(current_request)
122126
if not identity_conditions:
123127
return None # Only proceed if we have identity conditions
124128

125129
time_window_condition = self._create_time_window_condition(
126-
config.time_window_days
130+
self._config.time_window_days
127131
)
128132

129133
# Combine all conditions with AND operator
@@ -135,7 +139,6 @@ def create_duplicate_detection_conditions(
135139
def find_duplicate_privacy_requests(
136140
self,
137141
current_request: PrivacyRequest,
138-
config: DuplicateDetectionSettings,
139142
) -> list[PrivacyRequest]:
140143
"""
141144
Find potential duplicate privacy requests based on duplicate detection configuration.
@@ -151,7 +154,7 @@ def find_duplicate_privacy_requests(
151154
List of PrivacyRequest objects that match the duplicate criteria,
152155
does not include the current request
153156
"""
154-
condition = self.create_duplicate_detection_conditions(current_request, config)
157+
condition = self.create_duplicate_detection_conditions(current_request)
155158

156159
if condition is None:
157160
return []
@@ -162,28 +165,75 @@ def find_duplicate_privacy_requests(
162165
query = query.filter(PrivacyRequest.id != current_request.id)
163166
return query.all()
164167

165-
    def generate_dedup_key(self, request: PrivacyRequest) -> str:
        """
        Generate a dedup key for a request based on the duplicate detection settings.

        The key is built from the hashed values of the configured identity
        fields, sorted by field name and joined with "|", so two requests with
        the same identities always yield the same key.

        Raises:
            ValueError: if the request is missing any configured identity field.
        """
        # Hashed identity values restricted to the fields configured for matching.
        current_identities: dict[str, str] = {
            pi.field_name: pi.hashed_value
            for pi in request.provided_identities  # type: ignore [attr-defined]
            if pi.field_name in self._config.match_identity_fields
        }
        # A dedup key is only meaningful when every configured field is present.
        if len(current_identities) != len(self._config.match_identity_fields):
            raise ValueError(
                "This request does not contain the required identity fields for duplicate detection."
            )
        # Sort field names so the key is deterministic regardless of the order
        # identities were provided in.
        return "|".join(
            [
                current_identities[field]
                for field in sorted(self._config.match_identity_fields)
            ]
        )
186187

188+
def update_duplicate_group_ids(
189+
self,
190+
request: PrivacyRequest,
191+
duplicates: list[PrivacyRequest],
192+
duplicate_group_id: UUID,
193+
) -> None:
194+
"""
195+
Update the duplicate request group ids for a request and its duplicates.
196+
Args:
197+
request: The privacy request to update
198+
duplicates: The list of duplicate requests to update
199+
duplicate_group_id: The duplicate request group id to update
200+
"""
201+
update_all = [request] + duplicates
202+
try:
203+
for privacy_request in update_all:
204+
privacy_request.duplicate_request_group_id = duplicate_group_id # type: ignore [assignment]
205+
except Exception as e:
206+
logger.error(f"Failed to update duplicate request group ids: {e}")
207+
raise e
208+
209+
def add_error_execution_log(self, request: PrivacyRequest, message: str) -> None:
210+
request.add_error_execution_log(
211+
db=self.db,
212+
connection_key=None,
213+
dataset_name="Duplicate Request Detection",
214+
collection_name=None,
215+
message=message,
216+
action_type=(
217+
request.policy.get_action_type() # type: ignore [arg-type]
218+
if request.policy
219+
else ActionType.access
220+
),
221+
)
222+
223+
def add_success_execution_log(self, request: PrivacyRequest, message: str) -> None:
224+
request.add_success_execution_log(
225+
db=self.db,
226+
connection_key=None,
227+
dataset_name="Duplicate Request Detection",
228+
collection_name=None,
229+
message=message,
230+
action_type=(
231+
request.policy.get_action_type() # type: ignore [arg-type]
232+
if request.policy
233+
else ActionType.access
234+
),
235+
)
236+
187237
def verified_identity_cases(
188238
self, request: PrivacyRequest, duplicates: list[PrivacyRequest]
189239
) -> bool:
@@ -206,60 +256,52 @@ def verified_identity_cases(
206256
# The request identity is not verified.
207257
if not request.identity_verified_at:
208258
if len(verified_in_group) > 0:
209-
logger.debug(
210-
f"Request {request.id} is a duplicate: it is not verified and duplicating verified request(s) {verified_in_group}."
211-
)
259+
message = f"Request {request.id} is a duplicate: it is duplicating request(s) {[duplicate.id for duplicate in verified_in_group]}."
260+
logger.debug(message)
261+
self.add_error_execution_log(request, message)
212262
return True
213263

214-
min_created_at = min(
215-
(d.created_at for d in duplicates if d.created_at), default=None
216-
) or datetime.now(timezone.utc)
217-
request_created_at = (
218-
request.created_at
219-
if request.created_at is not None
220-
else datetime.now(timezone.utc)
264+
canonical_request = min(duplicates, key=lambda x: x.created_at) # type: ignore [arg-type, return-value]
265+
canonical_request_created_at = canonical_request.created_at or datetime.now(
266+
timezone.utc
221267
)
222-
if request_created_at < min_created_at:
223-
logger.debug(
224-
f"Request {request.id} is not a duplicate: it is the first request to be created in the group."
225-
)
268+
request_created_at = request.created_at or datetime.now(timezone.utc)
269+
if request_created_at < canonical_request_created_at:
270+
message = f"Request {request.id} is not a duplicate: it is the first request to be created in the group."
271+
logger.debug(message)
272+
self.add_success_execution_log(request, message)
226273
return False
227-
logger.debug(
228-
f"Request {request.id} is a duplicate: it is not verified and is not the first request to be created in the group."
229-
)
274+
275+
message = f"Request {request.id} is a duplicate: it is duplicating request(s) ['{canonical_request.id}']."
276+
logger.debug(message)
277+
self.add_error_execution_log(request, message)
230278
return True
231279

232280
# The request identity is verified.
233281
if not verified_in_group:
234-
logger.debug(
235-
f"Request {request.id} is not a duplicate: it is verified and no other requests in the group are verified."
236-
)
282+
message = f"Request {request.id} is not a duplicate: it is the first request to be verified in the group."
283+
logger.debug(message)
284+
self.add_success_execution_log(request, message)
237285
return False
238286

239287
# If this request is the first with a verified identity, it is not a duplicate.
240-
min_verified_at = min(
241-
(d.identity_verified_at for d in duplicates if d.identity_verified_at),
242-
default=None,
243-
) or datetime.now(timezone.utc)
244-
request_verified_at = (
245-
request.identity_verified_at
246-
if request.identity_verified_at is not None
247-
else datetime.now(timezone.utc)
288+
canonical_request = min(verified_in_group, key=lambda x: x.identity_verified_at) # type: ignore [arg-type, return-value]
289+
canonical_request_verified_at = (
290+
canonical_request.identity_verified_at or datetime.now(timezone.utc)
248291
)
249-
if request_verified_at < min_verified_at:
250-
logger.debug(
251-
f"Request {request.id} is not a duplicate: it is the first request to be verified in the group."
252-
)
292+
request_verified_at = request.identity_verified_at or datetime.now(timezone.utc)
293+
if request_verified_at < canonical_request_verified_at:
294+
message = f"Request {request.id} is not a duplicate: it is the first request to be verified in the group."
295+
logger.debug(message)
296+
self.add_success_execution_log(request, message)
253297
return False
254-
logger.debug(
255-
f"Request {request.id} is a duplicate: it is verified but not the first request to be verified in the group."
256-
)
298+
message = f"Request {request.id} is a duplicate: it is duplicating request(s) ['{canonical_request.id}']."
299+
logger.debug(message)
300+
self.add_error_execution_log(request, message)
257301
return True
258302

259303
# pylint: disable=too-many-return-statements
260-
def is_duplicate_request(
261-
self, request: PrivacyRequest, config: DuplicateDetectionSettings
262-
) -> bool:
304+
def is_duplicate_request(self, request: PrivacyRequest) -> bool:
263305
"""
264306
Determine if a request is a duplicate request and assigns a duplicate request group id.
265307
@@ -281,52 +323,44 @@ def is_duplicate_request(
281323
Returns:
282324
True if the request is a duplicate request, False otherwise
283325
"""
284-
duplicates = self.find_duplicate_privacy_requests(request, config)
285-
rule_version = generate_rule_version(config)
326+
duplicates = self.find_duplicate_privacy_requests(request)
327+
rule_version = generate_rule_version(
328+
DuplicateDetectionSettings(
329+
enabled=self._config.enabled,
330+
time_window_days=self._config.time_window_days,
331+
match_identity_fields=self._config.match_identity_fields,
332+
)
333+
)
286334
try:
287-
dedup_key = self.generate_dedup_key(request, config)
335+
dedup_key = self.generate_dedup_key(request)
288336
except ValueError as e:
289-
logger.debug(f"Request {request.id} is not a duplicate: {e}")
337+
message = f"Request {request.id} is not a duplicate: {e}"
338+
logger.debug(message)
339+
self.add_success_execution_log(request, message)
290340
return False
291341

292342
_, duplicate_group = DuplicateGroup.get_or_create(
293343
db=self.db, data={"rule_version": rule_version, "dedup_key": dedup_key}
294344
)
295345
if duplicate_group is None:
296-
logger.error(
297-
f"Failed to create duplicate group for request {request.id} with dedup key {dedup_key}"
298-
)
346+
message = f"Failed to create duplicate group for request {request.id} with dedup key {dedup_key}"
347+
logger.error(message)
348+
self.add_error_execution_log(request, message)
299349
return False
300-
logger.info(
301-
f"Duplicate group {duplicate_group.id} created for request {request.id} with dedup key {dedup_key}"
302-
)
303-
request.update(
304-
db=self.db, data={"duplicate_request_group_id": duplicate_group.id}
305-
)
350+
351+
self.update_duplicate_group_ids(request, duplicates, duplicate_group.id) # type: ignore [arg-type]
306352

307353
# if this is the only request in the group, it is not a duplicate
308354
if len(duplicates) == 0:
309-
logger.debug(
310-
f"Request {request.id} is not a duplicate: no matching requests were found."
311-
)
355+
message = f"Request {request.id} is not a duplicate."
356+
logger.debug(message)
357+
self.add_success_execution_log(request, message)
312358
return False
313359

314360
if request.status == PrivacyRequestStatus.duplicate:
315-
logger.warning(
316-
f"Request {request.id} is a duplicate request that was requeued. This should not happen."
317-
)
318-
request.add_error_execution_log(
319-
db=self.db,
320-
connection_key=None,
321-
dataset_name="Duplicate Request Detection",
322-
collection_name=None,
323-
message=f"Request {request.id} is a duplicate request that was requeued. This should not happen.",
324-
action_type=(
325-
request.policy.get_action_type() # type: ignore [arg-type]
326-
if request.policy
327-
else ActionType.access
328-
),
329-
)
361+
message = f"Request {request.id} is a duplicate request that was requeued. This should not happen."
362+
logger.warning(message)
363+
self.add_error_execution_log(request, message)
330364
return True
331365

332366
# only compare to non-duplicate requests for the following cases
@@ -337,9 +371,9 @@ def is_duplicate_request(
337371
]
338372
# If no non-duplicate requests are found, this request is not a duplicate.
339373
if len(canonical_requests) == 0:
340-
logger.debug(
341-
f"Request {request.id} is not a duplicate: all matching requests have been marked as duplicate requests."
342-
)
374+
message = f"Request {request.id} is not a duplicate."
375+
logger.debug(message)
376+
self.add_success_execution_log(request, message)
343377
return False
344378

345379
# If any requests in group are actioned, this request is a duplicate.
@@ -349,9 +383,9 @@ def is_duplicate_request(
349383
if duplicate.status in ACTIONED_REQUEST_STATUSES
350384
]
351385
if len(actioned_in_group) > 0:
352-
logger.debug(
353-
f"Request {request.id} is a duplicate: it is duplicating actioned request(s) {actioned_in_group}."
354-
)
386+
message = f"Request {request.id} is a duplicate: it is duplicating actioned request(s) {[duplicate.id for duplicate in actioned_in_group]}."
387+
logger.debug(message)
388+
self.add_error_execution_log(request, message)
355389
return True
356390
# Check against verified identity rules.
357391
return self.verified_identity_cases(request, canonical_requests)

0 commit comments

Comments
 (0)