-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmodels.py
More file actions
120 lines (106 loc) · 4.57 KB
/
models.py
File metadata and controls
120 lines (106 loc) · 4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python3
from datetime import datetime
from sqlalchemy import Column, Integer, String, Text, DateTime, Float, Boolean, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
# Shared declarative base class; the model classes defined below subclass it.
Base = declarative_base()
class Dataset(Base):
    """ORM model for rows of the ``datasets`` table.

    Columns hold the file identity (``filename``, ``name``), a free-text
    ``description``, a JSON ``sources`` value, three numeric summary fields
    and creation/update timestamps.
    """

    __tablename__ = 'datasets'

    id = Column(Integer, primary_key=True)
    # Required, and unique across the table.
    filename = Column(String(255), unique=True, nullable=False)
    name = Column(String(255), nullable=False)
    description = Column(Text)
    sources = Column(JSON)
    # Numeric summary fields default to zero when no value is supplied.
    total_entries = Column(Integer, default=0)
    file_size = Column(Integer, default=0)
    quality_score = Column(Float, default=0.0)
    # Both timestamps default to datetime.utcnow; updated_at also uses it
    # as its on-update value.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow, default=datetime.utcnow)

    def to_dict(self):
        """Return the column values as a plain dict.

        ``created_at`` and ``updated_at`` are rendered with ``isoformat()``,
        or left as ``None`` when unset; every other value is passed through
        unchanged.
        """
        plain_fields = (
            'id',
            'filename',
            'name',
            'description',
            'sources',
            'total_entries',
            'file_size',
            'quality_score',
        )
        payload = {field: getattr(self, field) for field in plain_fields}

        # Timestamps come last so the key order matches the column order.
        for stamp_field in ('created_at', 'updated_at'):
            moment = getattr(self, stamp_field)
            payload[stamp_field] = None if moment is None else moment.isoformat()
        return payload
class VulnerabilityEntry(Base):
    """ORM model for rows of the ``vulnerability_entries`` table.

    Each row carries an instruction/input/output text triple together with
    its ``source`` and optional classification fields (type, severity, CVSS
    score, original URL, content hash).
    """

    __tablename__ = 'vulnerability_entries'

    id = Column(Integer, primary_key=True)
    # Plain required integer; no ForeignKey constraint is declared on it here.
    dataset_id = Column(Integer, nullable=False)
    source = Column(String(50), nullable=False)

    # The three text columns are all mandatory.
    instruction = Column(Text, nullable=False)
    input_text = Column(Text, nullable=False)
    output_text = Column(Text, nullable=False)

    # Optional classification fields.
    vulnerability_type = Column(String(100))
    severity = Column(String(20))
    cvss_score = Column(Float)
    original_url = Column(String(500))
    # Unique across the table; at most 64 characters.
    content_hash = Column(String(64), unique=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Return the column values as a plain dict.

        ``input_text`` and ``output_text`` are exposed under the shorter keys
        ``'input'`` and ``'output'``. ``created_at`` is rendered with
        ``isoformat()``, or ``None`` when unset.
        """
        created = self.created_at
        if created is not None:
            created = created.isoformat()

        return {
            'id': self.id,
            'dataset_id': self.dataset_id,
            'source': self.source,
            'instruction': self.instruction,
            'input': self.input_text,
            'output': self.output_text,
            'vulnerability_type': self.vulnerability_type,
            'severity': self.severity,
            'cvss_score': self.cvss_score,
            'original_url': self.original_url,
            'content_hash': self.content_hash,
            'created_at': created,
        }
class ScrapingJob(Base):
    """ORM model for rows of the ``scraping_jobs`` table.

    Stores the job's requested ``sources`` and per-source entry limit, its
    ``status``/``progress`` fields, a JSON ``errors`` value and the
    started/completed/created timestamps.
    """

    __tablename__ = 'scraping_jobs'

    id = Column(Integer, primary_key=True)
    # Optional plain integer; no ForeignKey constraint is declared on it here.
    dataset_id = Column(Integer, nullable=True)

    # Requested inputs.
    sources = Column(JSON)
    max_entries_per_source = Column(Integer, default=100)

    # Status fields; a row defaults to status 'pending' with zero progress.
    status = Column(String(20), default='pending')
    progress = Column(Integer, default=0)
    current_source = Column(String(50))
    total_entries = Column(Integer, default=0)
    errors = Column(JSON)

    # started_at / completed_at have no default; created_at defaults to
    # datetime.utcnow.
    started_at = Column(DateTime)
    completed_at = Column(DateTime)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Return the column values as a plain dict.

        The three timestamp fields are rendered with ``isoformat()``, or
        ``None`` when unset; every other value is passed through unchanged.
        """

        def _iso(moment):
            # None-safe ISO rendering shared by the three timestamp keys.
            return None if moment is None else moment.isoformat()

        return {
            'id': self.id,
            'dataset_id': self.dataset_id,
            'sources': self.sources,
            'max_entries_per_source': self.max_entries_per_source,
            'status': self.status,
            'progress': self.progress,
            'current_source': self.current_source,
            'total_entries': self.total_entries,
            'errors': self.errors,
            'started_at': _iso(self.started_at),
            'completed_at': _iso(self.completed_at),
            'created_at': _iso(self.created_at),
        }
class UserPreferences(Base):
    """ORM model for rows of the ``user_preferences`` table.

    Each row pairs a unique, required ``key`` with a JSON ``value`` and an
    optional free-text ``description``, plus creation/update timestamps.
    """

    __tablename__ = 'user_preferences'

    id = Column(Integer, primary_key=True)
    # Required, and unique across the table.
    key = Column(String(100), unique=True, nullable=False)
    value = Column(JSON)
    description = Column(Text)
    # Both timestamps default to datetime.utcnow; updated_at also uses it
    # as its on-update value.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, onupdate=datetime.utcnow, default=datetime.utcnow)

    def to_dict(self):
        """Return the column values as a plain dict.

        ``created_at`` and ``updated_at`` are rendered with ``isoformat()``,
        or ``None`` when unset.
        """
        payload = {
            'id': self.id,
            'key': self.key,
            'value': self.value,
            'description': self.description,
        }

        created, updated = self.created_at, self.updated_at
        payload['created_at'] = None if created is None else created.isoformat()
        payload['updated_at'] = None if updated is None else updated.isoformat()
        return payload