-
-
Notifications
You must be signed in to change notification settings - Fork 452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Abuse Whois Analyzer (Closes #2308) #2686
Open
pranjalg1331
wants to merge
10
commits into
intelowlproject:develop
Choose a base branch
from
pranjalg1331:abuse_whois
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+325
−2
Open
Changes from 3 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
5ae73fe
initial
pranjalg1331 31e2316
abuse analyzer completed
pranjalg1331 625a2a2
migration
pranjalg1331 4d47a44
add to playbook
pranjalg1331 1f3dba2
dependency resolve
pranjalg1331 6d72bd6
requirements complete
pranjalg1331 0252b81
cyclomatic funtion removed
pranjalg1331 90ef484
deep source python correction
pranjalg1331 f91cdca
deep source python correction 2
pranjalg1331 ed93e2e
deep source python correction 3
pranjalg1331 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
125 changes: 125 additions & 0 deletions
125
api_app/analyzers_manager/migrations/0147_analyzer_config_abusewhois.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
from django.db import migrations | ||
from django.db.models.fields.related_descriptors import ( | ||
ForwardManyToOneDescriptor, | ||
ForwardOneToOneDescriptor, | ||
ManyToManyDescriptor, | ||
ReverseManyToOneDescriptor, | ||
ReverseOneToOneDescriptor, | ||
) | ||
|
||
plugin = { | ||
"python_module": { | ||
"health_check_schedule": None, | ||
"update_schedule": None, | ||
"module": "abusewhois.AbuseWHOIS", | ||
"base_path": "api_app.analyzers_manager.observable_analyzers", | ||
}, | ||
"name": "AbuseWhois", | ||
"description": "A Sigma and RDAP/Whois-based abuse contacts finder.", | ||
"disabled": False, | ||
"soft_time_limit": 30, | ||
"routing_key": "default", | ||
"health_check_status": True, | ||
"type": "observable", | ||
"docker_based": False, | ||
"maximum_tlp": "AMBER", | ||
"observable_supported": ["ip", "url", "domain"], | ||
"supported_filetypes": [], | ||
"run_hash": False, | ||
"run_hash_type": "", | ||
"not_supported_filetypes": [], | ||
"mapping_data_model": {}, | ||
"model": "analyzers_manager.AnalyzerConfig", | ||
} | ||
|
||
params = [] | ||
|
||
values = [] | ||
|
||
|
||
def _get_real_obj(Model, field, value): | ||
def _get_obj(Model, other_model, value): | ||
if isinstance(value, dict): | ||
real_vals = {} | ||
for key, real_val in value.items(): | ||
real_vals[key] = _get_real_obj(other_model, key, real_val) | ||
value = other_model.objects.get_or_create(**real_vals)[0] | ||
# it is just the primary key serialized | ||
else: | ||
if isinstance(value, int): | ||
if Model.__name__ == "PluginConfig": | ||
value = other_model.objects.get(name=plugin["name"]) | ||
else: | ||
value = other_model.objects.get(pk=value) | ||
else: | ||
value = other_model.objects.get(name=value) | ||
return value | ||
|
||
if ( | ||
type(getattr(Model, field)) | ||
in [ | ||
ForwardManyToOneDescriptor, | ||
ReverseManyToOneDescriptor, | ||
ReverseOneToOneDescriptor, | ||
ForwardOneToOneDescriptor, | ||
] | ||
and value | ||
): | ||
other_model = getattr(Model, field).get_queryset().model | ||
value = _get_obj(Model, other_model, value) | ||
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: | ||
other_model = getattr(Model, field).rel.model | ||
value = [_get_obj(Model, other_model, val) for val in value] | ||
return value | ||
|
||
|
||
def _create_object(Model, data): | ||
mtm, no_mtm = {}, {} | ||
for field, value in data.items(): | ||
value = _get_real_obj(Model, field, value) | ||
if type(getattr(Model, field)) is ManyToManyDescriptor: | ||
mtm[field] = value | ||
else: | ||
no_mtm[field] = value | ||
try: | ||
o = Model.objects.get(**no_mtm) | ||
except Model.DoesNotExist: | ||
o = Model(**no_mtm) | ||
o.full_clean() | ||
o.save() | ||
for field, value in mtm.items(): | ||
attribute = getattr(o, field) | ||
if value is not None: | ||
attribute.set(value) | ||
return False | ||
return True | ||
|
||
|
||
def migrate(apps, schema_editor): | ||
Parameter = apps.get_model("api_app", "Parameter") | ||
PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
python_path = plugin.pop("model") | ||
Model = apps.get_model(*python_path.split(".")) | ||
if not Model.objects.filter(name=plugin["name"]).exists(): | ||
exists = _create_object(Model, plugin) | ||
if not exists: | ||
for param in params: | ||
_create_object(Parameter, param) | ||
for value in values: | ||
_create_object(PluginConfig, value) | ||
|
||
|
||
def reverse_migrate(apps, schema_editor): | ||
python_path = plugin.pop("model") | ||
Model = apps.get_model(*python_path.split(".")) | ||
Model.objects.get(name=plugin["name"]).delete() | ||
|
||
|
||
class Migration(migrations.Migration): | ||
atomic = False | ||
dependencies = [ | ||
("api_app", "0065_job_mpnodesearch"), | ||
("analyzers_manager", "0146_analyzer_config_wad"), | ||
] | ||
|
||
operations = [migrations.RunPython(migrate, reverse_migrate)] |
199 changes: 199 additions & 0 deletions
199
api_app/analyzers_manager/observable_analyzers/abusewhois.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl | ||
# See the file 'LICENSE' for copying permission. | ||
|
||
|
||
import asyncio | ||
|
||
from abuse_whois import get_abuse_contacts | ||
|
||
from api_app.analyzers_manager import classes | ||
|
||
|
||
class AbuseWHOIS(classes.ObservableAnalyzer): | ||
|
||
@classmethod | ||
def update(cls) -> bool: | ||
pass | ||
|
||
def _clean_contact_info(self, contact): | ||
"""Remove null values and replace with REDACTED if appropriate""" | ||
if not any(contact.values()): | ||
return {"status": "REDACTED FOR PRIVACY"} | ||
return {k: v for k, v in contact.items() if v is not None} | ||
|
||
def _parse_raw_whois_text(self, raw_text): | ||
"""Extract network information from raw WHOIS text""" | ||
info = {} | ||
for line in raw_text.split("\n"): | ||
line = line.strip() | ||
if not line or line.startswith("#"): | ||
continue | ||
|
||
if ":" in line: | ||
key, value = line.split(":", 1) | ||
key = key.strip() | ||
value = value.strip() | ||
if value: | ||
info[key] = value | ||
|
||
return info | ||
|
||
def _format_ip_data(self, result): | ||
"""Format IP address WHOIS data""" | ||
raw_info = self._parse_raw_whois_text(result.records.ip_address.raw_text) | ||
|
||
return { | ||
"network": { | ||
"address": result.address, | ||
"hostname": result.hostname, | ||
"ip_address": result.ip_address, | ||
"range": raw_info.get("NetRange"), | ||
"cidr": raw_info.get("CIDR"), | ||
"name": raw_info.get("NetName"), | ||
"type": raw_info.get("NetType"), | ||
"origin_as": raw_info.get("OriginAS"), | ||
}, | ||
"organization": { | ||
"name": raw_info.get("OrgName"), | ||
"id": raw_info.get("OrgId"), | ||
"address": raw_info.get("Address"), | ||
"city": raw_info.get("City"), | ||
"state": raw_info.get("StateProv"), | ||
"postal_code": raw_info.get("PostalCode"), | ||
"country": raw_info.get("Country"), | ||
"registration_date": raw_info.get("RegDate"), | ||
"last_updated": raw_info.get("Updated"), | ||
}, | ||
"contacts": { | ||
"abuse": { | ||
"email": raw_info.get("OrgAbuseEmail"), | ||
"phone": raw_info.get("OrgAbusePhone"), | ||
"name": raw_info.get("OrgAbuseName"), | ||
}, | ||
"technical": { | ||
"email": raw_info.get("OrgTechEmail"), | ||
"phone": raw_info.get("OrgTechPhone"), | ||
"name": raw_info.get("OrgTechName"), | ||
}, | ||
}, | ||
} | ||
|
||
def _format_domain_data(self, result): | ||
"""Format domain WHOIS data""" | ||
return { | ||
"domain": { | ||
"name": result.address, | ||
"ip_address": result.ip_address, | ||
"registrar": { | ||
"provider": result.registrar.provider if result.registrar else None, | ||
"email": result.registrar.address if result.registrar else None, | ||
"type": result.registrar.type if result.registrar else None, | ||
}, | ||
}, | ||
"domain_info": { | ||
"nameservers": ( | ||
result.records.domain.name_servers if result.records.domain else [] | ||
), | ||
"statuses": ( | ||
result.records.domain.statuses if result.records.domain else [] | ||
), | ||
"expires_at": ( | ||
result.records.domain.expires_at.isoformat() | ||
if result.records.domain and result.records.domain.expires_at | ||
else None | ||
), | ||
"updated_at": ( | ||
result.records.domain.updated_at.isoformat() | ||
if result.records.domain and result.records.domain.updated_at | ||
else None | ||
), | ||
}, | ||
"contacts": { | ||
"registrant": self._clean_contact_info( | ||
{ | ||
"organization": ( | ||
result.records.domain.registrant.organization | ||
if result.records.domain | ||
else None | ||
), | ||
"email": ( | ||
result.records.domain.registrant.email | ||
if result.records.domain | ||
else None | ||
), | ||
"name": ( | ||
result.records.domain.registrant.name | ||
if result.records.domain | ||
else None | ||
), | ||
"telephone": ( | ||
result.records.domain.registrant.telephone | ||
if result.records.domain | ||
else None | ||
), | ||
} | ||
), | ||
"abuse": self._clean_contact_info( | ||
{ | ||
"email": ( | ||
result.records.domain.abuse.email | ||
if result.records.domain | ||
else None | ||
), | ||
"telephone": ( | ||
result.records.domain.abuse.telephone | ||
if result.records.domain | ||
else None | ||
), | ||
} | ||
), | ||
"technical": self._clean_contact_info( | ||
{ | ||
"organization": ( | ||
result.records.domain.tech.organization | ||
if result.records.domain | ||
else None | ||
), | ||
"email": ( | ||
result.records.domain.tech.email | ||
if result.records.domain | ||
else None | ||
), | ||
"name": ( | ||
result.records.domain.tech.name | ||
if result.records.domain | ||
else None | ||
), | ||
"telephone": ( | ||
result.records.domain.tech.telephone | ||
if result.records.domain | ||
else None | ||
), | ||
} | ||
), | ||
}, | ||
} | ||
|
||
async def _get_whois_data(self): | ||
"""Get and format WHOIS data""" | ||
result = await get_abuse_contacts(self.observable_name) | ||
|
||
# Determine if this is an IP address or domain lookup and format accordingly | ||
formatted_data = ( | ||
self._format_ip_data(result) | ||
if result.records.domain is None | ||
else self._format_domain_data(result) | ||
) | ||
|
||
# Remove any remaining null values at the top level | ||
return {k: v for k, v in formatted_data.items() if v is not None} | ||
|
||
def run(self): | ||
"""Run the analyzer""" | ||
report = asyncio.run(self._get_whois_data()) | ||
return report | ||
|
||
@classmethod | ||
def _monkeypatch(cls): | ||
patches = [] | ||
return super()._monkeypatch(patches=patches) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fgibertoni, I am trying to install the abuse_whois package through
project-requirements.txt
, but it always fails, so I have installed it through Dockerfile. Please suggest what can be done here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please post also the error you're getting while installing the package so I can help you better
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the abuse_whois dependency is causing conflicts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@fgibertoni Creating a separate requirements file for abuse_whois does not give any error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll think about a solution. I don't really like having two different requirements files but it seems the only way to go at the moment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i agree with Federico: split the dependencies into multiple files makes more difficult manage them. Did you check what is the last version compatible of the new package ?