88
99
1010import itertools
11+ import json
1112import shutil
13+ from datetime import datetime
1214from operator import attrgetter
1315from pathlib import Path
1416
1517import saneyaml
1618from aboutcode .pipeline import LoopProgress
1719from django .conf import settings
1820from django .db .models import Prefetch
21+ from django .utils import timezone
1922
2023from aboutcode .federated import DataFederation
2124from vulnerabilities .models import AdvisoryV2
2225from vulnerabilities .models import ImpactedPackage
26+ from vulnerabilities .models import ImpactedPackageAffecting
27+ from vulnerabilities .models import ImpactedPackageFixedBy
2328from vulnerabilities .models import PackageV2
2429from vulnerabilities .pipelines import VulnerableCodePipeline
2530from vulnerabilities .pipes import federatedcode
31+ from vulnerabilities .utils import load_json
2632
2733
2834class FederatePackageVulnerabilities (VulnerableCodePipeline ):
29- """Export package vulnerabilities and advisory to FederatedCode."""
35+ """
36+ Export package vulnerabilities and advisories to FederatedCode.
37+
38+ - Export all packages and advisories to FederatedCode.
39+ - On subsequent runs, export incremental updates.
40+ - Remove `checkpoint.json` file from FederatedCode git repository to
41+ force a full re-export of all packages and advisories.
42+ """
3043
3144 pipeline_id = "federate_vulnerabilities_v2"
3245
@@ -37,8 +50,10 @@ def steps(cls):
3750 cls .create_federatedcode_working_dir ,
3851 cls .fetch_federation_config ,
3952 cls .clone_federation_repository ,
53+ cls .load_checkpoint ,
4054 cls .publish_package_related_advisories ,
4155 cls .publish_advisories ,
56+ cls .save_checkpoint ,
4257 cls .delete_working_dir ,
4358 )
4459
@@ -64,29 +79,35 @@ def clone_federation_repository(self):
6479 clone_path = self .working_path / "advisories-data" ,
6580 logger = self .log ,
6681 )
82+ self .repo_path = Path (self .repo .working_dir )
83+
84+ def load_checkpoint (self ):
85+ checkpoint_file = self .repo_path / "checkpoint.json"
86+ data = {}
87+ self .start_time = str (timezone .now ())
88+ self .checkpoint = None
89+ if checkpoint_file .exists ():
90+ data = load_json (checkpoint_file )
91+
92+ if last_run := data .get ("last_run" ):
93+ self .checkpoint = datetime .fromisoformat (last_run )
6794
6895 def publish_package_related_advisories (self ):
6996 """Publish package advisories relations to FederatedCode"""
70- repo_path = Path (self .repo .working_dir )
7197 commit_count = 1
72- batch_size = 2000
98+ batch_size = 4000
7399 chunk_size = 500
74100 files_to_commit = set ()
75101
76- distinct_packages_count = (
77- PackageV2 .objects .values ("type" , "namespace" , "name" , "version" )
78- .distinct ("type" , "namespace" , "name" , "version" )
79- .count ()
80- )
81- package_qs = package_prefetched_qs ()
102+ packages_count , package_qs = package_prefetched_qs (self .checkpoint )
82103 grouped_packages = itertools .groupby (
83104 package_qs .iterator (chunk_size = chunk_size ),
84105 key = attrgetter ("type" , "namespace" , "name" , "version" ),
85106 )
86107
87- self .log (f"Exporting advisory relation for { distinct_packages_count } packages." )
108+ self .log (f"Exporting advisory relation for { packages_count } packages." )
88109 progress = LoopProgress (
89- total_iterations = distinct_packages_count ,
110+ total_iterations = packages_count ,
90111 progress_step = 5 ,
91112 logger = self .log ,
92113 )
@@ -96,15 +117,18 @@ def publish_package_related_advisories(self):
96117 package_vulnerability_path = f"packages/{ package_repo } /{ datafile_path } "
97118
98119 write_file (
99- repo_path = repo_path ,
120+ repo_path = self . repo_path ,
100121 file_path = package_vulnerability_path ,
101122 data = package_vulnerabilities ,
102123 )
103124 files_to_commit .add (package_vulnerability_path )
104125
105126 if len (files_to_commit ) > batch_size :
106127 if federatedcode .commit_and_push_changes (
107- commit_message = self .commit_message ("package advisory relations" , commit_count ),
128+ commit_message = self .commit_message (
129+ "Add new package advisory relations" ,
130+ commit_count ,
131+ ),
108132 repo = self .repo ,
109133 files_to_commit = files_to_commit ,
110134 logger = self .log ,
@@ -115,7 +139,7 @@ def publish_package_related_advisories(self):
115139 if files_to_commit :
116140 federatedcode .commit_and_push_changes (
117141 commit_message = self .commit_message (
118- "package advisory relations" ,
142+ "Add new package advisory relations" ,
119143 commit_count ,
120144 commit_count ,
121145 ),
@@ -124,16 +148,15 @@ def publish_package_related_advisories(self):
124148 logger = self .log ,
125149 )
126150
127- self .log (f"Federated { distinct_packages_count } package advisories." )
151+ self .log (f"Federated { packages_count } package advisories." )
128152
129153 def publish_advisories (self ):
130154 """Publish advisory to FederatedCode"""
131- repo_path = Path (self .repo .working_dir )
132155 commit_count = 1
133- batch_size = 2000
156+ batch_size = 4000
134157 chunk_size = 1000
135158 files_to_commit = set ()
136- advisory_qs = advisory_prefetched_qs ()
159+ advisory_qs = advisory_prefetched_qs (self . checkpoint )
137160 advisory_count = advisory_qs .count ()
138161
139162 self .log (f"Exporting { advisory_count } advisory." )
@@ -146,15 +169,15 @@ def publish_advisories(self):
146169 advisory_data = serialize_advisory (advisory )
147170 adv_file = f"advisories/{ advisory .avid } .yml"
148171 write_file (
149- repo_path = repo_path ,
172+ repo_path = self . repo_path ,
150173 file_path = adv_file ,
151174 data = advisory_data ,
152175 )
153176 files_to_commit .add (adv_file )
154177
155178 if len (files_to_commit ) > batch_size :
156179 if federatedcode .commit_and_push_changes (
157- commit_message = self .commit_message ("advisories" , commit_count ),
180+ commit_message = self .commit_message ("Add new advisories" , commit_count ),
158181 repo = self .repo ,
159182 files_to_commit = files_to_commit ,
160183 logger = self .log ,
@@ -165,7 +188,7 @@ def publish_advisories(self):
165188 if files_to_commit :
166189 federatedcode .commit_and_push_changes (
167190 commit_message = self .commit_message (
168- "advisories" ,
191+ "Add new advisories" ,
169192 commit_count ,
170193 commit_count ,
171194 ),
@@ -176,6 +199,19 @@ def publish_advisories(self):
176199
177200 self .log (f"Successfully federated { advisory_count } advisories." )
178201
202+ def save_checkpoint (self ):
203+ checkpoint_file = self .repo_path / "checkpoint.json"
204+ checkpoint = {"last_run" : self .start_time }
205+ with open (checkpoint_file , "w" ) as f :
206+ json .dump (checkpoint , f , indent = 2 )
207+
208+ federatedcode .commit_and_push_changes (
209+ commit_message = self .commit_message ("Update checkpoint" , 1 , 1 ),
210+ repo = self .repo ,
211+ files_to_commit = [checkpoint_file ],
212+ logger = self .log ,
213+ )
214+
179215 def delete_working_dir (self ):
180216 """Remove temporary working dir."""
181217 if hasattr (self , "working_path" ) and self .working_path :
@@ -186,20 +222,21 @@ def on_failure(self):
186222
187223 def commit_message (
188224 self ,
189- item_type ,
225+ heading ,
190226 commit_count ,
191227 total_commit_count = "many" ,
192228 ):
193229 """Commit message for pushing package vulnerability."""
194230 return federatedcode .commit_message (
195- item_type = item_type ,
231+ heading = heading ,
196232 commit_count = commit_count ,
197233 total_commit_count = total_commit_count ,
198234 )
199235
200236
201- def package_prefetched_qs ():
202- return (
237+ def package_prefetched_qs (checkpoint ):
238+ count = None
239+ qs = (
203240 PackageV2 .objects .order_by ("type" , "namespace" , "name" , "version" )
204241 .only ("package_url" , "type" , "namespace" , "name" , "version" )
205242 .prefetch_related (
@@ -224,6 +261,26 @@ def package_prefetched_qs():
224261 )
225262 )
226263
264+ if checkpoint :
265+ affected_package_ids_qs = (
266+ ImpactedPackageAffecting .objects .filter (created_at__gte = checkpoint )
267+ .values_list ("package_id" , flat = True )
268+ .distinct ()
269+ )
270+ fixing_package_ids_qs = (
271+ ImpactedPackageFixedBy .objects .filter (created_at__gte = checkpoint )
272+ .values_list ("package_id" , flat = True )
273+ .distinct ()
274+ )
275+
276+ updated_packages = affected_package_ids_qs .union (fixing_package_ids_qs )
277+ count = updated_packages .count ()
278+ qs = qs .filter (id__in = updated_packages )
279+
280+ count = qs .count () if not count else count
281+
282+ return count , qs
283+
227284
228285def get_package_related_advisory (packages ):
229286 package_vulnerabilities = []
@@ -243,15 +300,17 @@ def get_package_related_advisory(packages):
243300 return package .package_url , package_vulnerabilities
244301
245302
246- def advisory_prefetched_qs ():
247- return AdvisoryV2 .objects .prefetch_related (
303+ def advisory_prefetched_qs (checkpoint ):
304+ qs = AdvisoryV2 .objects . order_by ( "date_collected" ) .prefetch_related (
248305 "impacted_packages" ,
249306 "aliases" ,
250307 "references" ,
251308 "severities" ,
252309 "weaknesses" ,
253310 )
254311
312+ return qs .filter (date_collected__gte = checkpoint ) if checkpoint else qs
313+
255314
256315def serialize_severity (sev ):
257316 return {
0 commit comments