Skip to content

Commit 25aabc1

Browse files
authored
Merge pull request #47 from rockythorn/use-last-indexed-date
2 parents 2a6d786 + 5278032 commit 25aabc1

2 files changed

Lines changed: 47 additions & 15 deletions

File tree

apollo/rhworker/poll_rh_activities.py

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,26 @@ def parse_datetime(dt_str: str) -> datetime:
5858
continue
5959
raise ValueError(f"Unable to parse datetime string: {dt_str}")
6060

61+
async def upsert_last_indexed_at(new_date: datetime) -> None:
    """
    Create or update the ``last_indexed_at`` field in red_hat_index_state.

    The stored value is a high-water mark: it is only advanced, i.e. written
    when *new_date* is after the current ``last_indexed_at`` or when no value
    (or no state row) exists yet.

    :param new_date: candidate timestamp; a ``str`` is also accepted and is
        parsed with ``parse_datetime`` before comparison.
    """
    logger = Logger()
    state = await RedHatIndexState.first()
    if isinstance(new_date, str):
        logger.debug("new_date is a string, converting to datetime")
        new_date = parse_datetime(new_date)
    # NOTE(review): the previous `isinstance(state, str)` branch was removed —
    # RedHatIndexState.first() returns a model instance or None, never a str,
    # and had it ever fired, the `state.last_indexed_at` access below would
    # have raised AttributeError on the resulting datetime.
    logger.debug(f"Current state: {state}, new_date: {new_date}")
    if state:
        # Only move the watermark forward; never rewind it.
        # NOTE(review): assumes new_date and last_indexed_at are both tz-aware
        # (or both naive) — comparing a mixed pair raises TypeError. Confirm
        # against parse_datetime / parse_red_hat_date output.
        if not state.last_indexed_at or new_date > state.last_indexed_at:
            state.last_indexed_at = new_date
            await state.save()
    else:
        await RedHatIndexState.create(last_indexed_at=new_date)
6181

6282
@activity.defn
6383
async def get_last_indexed_date() -> Optional[str]:
@@ -111,12 +131,7 @@ async def get_rh_advisories(from_timestamp: str = None) -> None:
111131
advisory_last_indexed_at = parse_red_hat_date(
112132
advisory.portal_publication_date
113133
)
114-
state = await RedHatIndexState.first()
115-
if state:
116-
state.last_indexed_at = advisory_last_indexed_at
117-
await state.save()
118-
else:
119-
await RedHatIndexState().create(last_index_at=advisory_last_indexed_at)
134+
await upsert_last_indexed_at(advisory_last_indexed_at)
120135

121136
logger.info("Processing advisory %s", advisory.id)
122137

@@ -419,17 +434,20 @@ async def process_csaf_file(json_data: dict, filepath: str) -> Optional[RedHatAd
419434
logger.error(f"Error in transaction: {str(e)}")
420435
raise
421436

437+
# Update RedHatIndexState with the latest indexed date
438+
latest_date_str = data.get("red_hat_updated_at") or data.get("red_hat_issued_at")
439+
logger.debug(f"Latest date string from {advisory.name} CSAF data: {latest_date_str}")
440+
await upsert_last_indexed_at(latest_date_str)
441+
422442
return advisory
423443

424444

425445
@activity.defn
426-
async def process_csaf_files() -> dict:
446+
async def process_csaf_files(from_timestamp: str = None) -> dict:
427447
logger = Logger()
428448
logger.info("Starting CSAF file processing (streaming from Red Hat)")
429449

430450
base_url = "https://security.access.redhat.com/data/csaf/v2/advisories/"
431-
now = datetime.now(timezone.utc)
432-
cutoff = now - timedelta(days=30)
433451

434452
async def fetch_csv_with_dates(session, url):
435453
async with session.get(url) as resp:
@@ -458,13 +476,22 @@ async def fetch_csv_with_dates(session, url):
458476
for advisory_id in deletions:
459477
all_advisories.pop(advisory_id, None)
460478

461-
# Filter advisories to only those within the last 30 days
462-
filtered_advisory_ids = [
463-
advisory_id for advisory_id, timestamp in all_advisories.items()
464-
if datetime.fromisoformat(timestamp.replace("Z", "+00:00")) >= cutoff
465-
]
479+
if from_timestamp:
480+
from_timestamp_dt = datetime.fromisoformat(from_timestamp.replace("Z", "+00:00"))
481+
else:
482+
from_timestamp_dt = None
483+
filtered_advisory_ids = []
484+
for advisory_id, timestamp in all_advisories.items():
485+
# If from_timestamp_dt is not set, include all advisories.
486+
# Otherwise, only include advisories with a timestamp >= from_timestamp_dt.
487+
if not from_timestamp_dt:
488+
filtered_advisory_ids.append(advisory_id)
489+
else:
490+
advisory_time = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
491+
if advisory_time >= from_timestamp_dt:
492+
filtered_advisory_ids.append(advisory_id)
466493

467-
logger.info(f"Found {len(filtered_advisory_ids)} advisories to process from the last 30 days")
494+
logger.info(f"Found {len(filtered_advisory_ids)} advisories to process since last indexed date ({from_timestamp})")
468495

469496
for advisory_id in filtered_advisory_ids: #TODO: parallelize this for faster processing
470497
json_url = base_url + advisory_id

apollo/rhworker/poll_rh_workflow.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,13 @@ class PollRHCSAFAdvisoriesWorkflow:
3131
"""
3232
@workflow.run
async def run(self) -> None:
    """Poll Red Hat CSAF advisories, resuming from the last indexed date."""
    # Fetch the stored watermark first so the (long-running) processing
    # activity only has to consider advisories published since the last run.
    last_indexed = await workflow.execute_activity(
        "get_last_indexed_date",
        start_to_close_timeout=datetime.timedelta(seconds=20),
    )
    await workflow.execute_activity(
        "process_csaf_files",
        last_indexed,
        start_to_close_timeout=datetime.timedelta(hours=2),
    )
3843

0 commit comments

Comments
 (0)