Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ stix2==3.0.1
# stix2extensions
stix2-patterns==2.0.0
# via stix2
stix2arango==1.1.12
stix2arango==1.3.4
# via
# -r requirements.in
# arango-cve-processor
Expand Down
17 changes: 14 additions & 3 deletions utilities/s2a_importer/insert_archive_cve.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import requests
import time
import calendar
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from stix2arango.stix2arango.stix2arango import Stix2Arango
from manager import VersionManager
from pathlib import Path
Expand Down Expand Up @@ -51,17 +51,26 @@ def parse_arguments():
parser = argparse.ArgumentParser(description="Process NVD CVE versions.")
parser.add_argument('--min_date', type=str, help='Start date in yyyy-mm-dd format.')
parser.add_argument('--max_date', type=str, help='End date in yyyy-mm-dd format.')
parser.add_argument('--days', type=lambda x: datetime.fromisoformat(x).date(), nargs='+', help='manually select days to run.')
parser.add_argument('--ignore_embedded_relationships', action='store_true', help='Flag to ignore embedded relationships. Default is false.')
parser.add_argument('--database', type=str, default="cti_knowledge_base_store", help='Name of the database to use. Default is "cti_knowledge_base_store".')
parser.add_argument('--start_over', action='store_true', help='Delete database holding previous attempts.')
parser.add_argument('--download_path', type=parse_path, help='Path where the database is to be downloaded to. Default is `./cti_knowledge_base_store`', default='./cti_knowledge_base_store/')
return parser.parse_args()

def filter_versions_by_date(min_date, max_date):
def filter_versions_by_date_range(min_date, max_date):
min_dt = datetime.strptime(min_date, "%Y-%m-%d")
max_dt = datetime.strptime(max_date, "%Y-%m-%d")
return [item for item in all_versions if min_dt <= datetime(item[0], item[1], item[2]) <= max_dt]

def filter_versions_by_date_list(dates):
versions = []
for version in all_versions:
d = date(version[0], version[1], version[2])
if d in dates:
versions.append(version)
return versions

def create_directory(path):
if not os.path.exists(path):
os.makedirs(path)
Expand Down Expand Up @@ -104,10 +113,12 @@ def main():
print(f"Database to be used: {args.database}")

if args.min_date and args.max_date:
versions = filter_versions_by_date(args.min_date, args.max_date)
versions = filter_versions_by_date_range(args.min_date, args.max_date)
elif args.min_date or args.max_date:
print("Both --min_date and --max_date must be provided.")
return
elif args.days:
versions = filter_versions_by_date_list(args.days)
else:
versions = all_versions # If no filter is applied, process all versions.

Expand Down
Loading