-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblacklistlookup.py
139 lines (126 loc) · 3.87 KB
/
blacklistlookup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import argparse
import urllib.request
import urllib.parse
import urllib.error
import ssl
import base64
import sqlite3
import json
# Command-line interface: both MaxMind credentials are mandatory.
# NOTE(review): a password passed on the command line is visible in the
# process list and shell history; keep that in mind when scheduling the script.
parser = argparse.ArgumentParser(description="Lookup coordinates of IP addresses.")
parser.add_argument(
    "-u",
    "--username",
    dest="username",
    required=True,
    type=str,
    help="Maxmind username",
)
parser.add_argument(
    "-p",
    "--password",
    dest="password",
    required=True,
    type=str,
    help="Maxmind password",
)
args = parser.parse_args()

# TLS context used for every lookup request.
# The default context verifies the server certificate and its hostname. This
# must stay enabled: each request carries the account credentials in a Basic
# Authorization header, and an unverified connection would let a
# man-in-the-middle read them. If the local CA store is incomplete, pass
# `cafile=`/`capath=` here instead of switching verification off.
ctx = ssl.create_default_context()

# Sign-up at https://www.maxmind.com/en/geolite2/signup
# Then generate username & password at https://www.maxmind.com/en/accounts/current/license-key
# HTTP Basic credentials: base64("username:password"), decoded to str so it can
# be concatenated into the Authorization header value.
b64auth = base64.standard_b64encode(
    (args.username + ":" + args.password).encode("utf-8")
).decode()
def retrieve_latlongorgiso(ip_address):
    """Look up location details for *ip_address* via the geolite.info city endpoint.

    Sends an authenticated GET request (Basic auth taken from the module-level
    ``b64auth``, TLS settings from the module-level ``ctx``) and parses the
    JSON response.

    Args:
        ip_address: The IP address, as a string, appended to the lookup URL.

    Returns:
        A list ``[latitude, longitude, organization, iso_code]``. Any field
        missing from the response keeps its default: ``0.0`` for the
        coordinates and ``""`` for the organization and ISO code.

    Raises:
        Whatever ``urllib.request.urlopen`` raises for a failed request (the
        calling script handles ``urllib.error.HTTPError``), and whatever
        ``json.loads`` raises for a body that is not valid JSON.
    """
    url = "https://geolite.info/geoip/v2.1/city/" + ip_address
    request = urllib.request.Request(url)
    request.add_header("Authorization", "Basic " + b64auth)

    # Use the response as a context manager so the connection is released
    # even when reading or decoding fails.
    with urllib.request.urlopen(request, context=ctx) as response:
        geoip_json = json.loads(response.read().decode())

    # (section, key, default) for each returned field, in return order.
    wanted_fields = (
        ("location", "latitude", 0.0),
        ("location", "longitude", 0.0),
        ("traits", "autonomous_system_organization", ""),
        ("country", "iso_code", ""),
    )

    # Look every field up on its own: one missing key must not discard the
    # fields that are present in the response.
    values = []
    any_missing = False
    for section, key, default in wanted_fields:
        try:
            values.append(geoip_json[section][key])
        except KeyError as err:
            print("KeyError", err)
            any_missing = True
            values.append(default)

    if any_missing:
        # Dump the whole response once so incomplete records can be inspected.
        print(ip_address, geoip_json)

    return values
# Rows that still lack at least one of the looked-up fields.
_SELECT_INCOMPLETE_SQL = """SELECT ip_address
FROM Blacklist
WHERE latitude is NULL
OR longitude is NULL
OR organization is NULL
OR iso_code is NULL"""

# Rows whose updated_on is more than six months old, oldest first.
_SELECT_STALE_SQL = """SELECT ip_address
FROM Blacklist
WHERE updated_on < DATETIME('now', '-6 month')
ORDER BY updated_on ASC"""

# NOTE(review): this UPDATE does not set updated_on; presumably a trigger or
# another process maintains that column - confirm, otherwise the stale-row
# query keeps returning the same rows on every run.
_UPDATE_LOCATION_SQL = """UPDATE Blacklist
SET latitude = ?,
longitude = ?,
organization = ?,
iso_code = ?
WHERE ip_address = ?"""


def _refresh_rows(db, cursor, select_sql, error_label):
    """Look up and store location data for every IP selected by *select_sql*.

    Args:
        db: Open sqlite3 connection to the blacklist database.
        cursor: Cursor on *db* used for the UPDATE statements.
        select_sql: Query whose first column is the IP address to refresh.
        error_label: Prefix printed in front of an HTTP error.

    Raises:
        SystemExit: On the first ``urllib.error.HTTPError``; the work done so
            far is committed before exiting.
    """
    # Fetch the complete list before touching the table: SQLite leaves it
    # undefined whether a running SELECT sees rows modified on the same
    # connection, so iterating a live cursor while updating is unsafe.
    rows = db.execute(select_sql).fetchall()
    for row in rows:
        ip_address = str(row[0].strip())
        try:
            latitude, longitude, organization, iso_code = retrieve_latlongorgiso(
                ip_address
            )
        except urllib.error.HTTPError as err:
            print(error_label, err)
            db.commit()
            # Same exit status as the interactive-only quit() helper, but
            # without depending on the site module having installed it.
            raise SystemExit
        cursor.execute(
            _UPDATE_LOCATION_SQL,
            (latitude, longitude, organization, iso_code, ip_address),
        )
        # Commit per row so an aborted run keeps everything already looked up.
        db.commit()


blacklist_db = sqlite3.connect("blacklist.sqlite")
try:
    cur_single = blacklist_db.cursor()
    # First try to lookup the IPs without latitude and longitude
    _refresh_rows(blacklist_db, cur_single, _SELECT_INCOMPLETE_SQL, "Lookup incomplete")
    # Then try to update the ones which were updated the last
    _refresh_rows(blacklist_db, cur_single, _SELECT_STALE_SQL, "Lookup latest")
    blacklist_db.commit()
finally:
    # Release the database file on normal completion, on early exit and on
    # unexpected errors alike.
    blacklist_db.close()