-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathdcnow.py
138 lines (109 loc) · 3.98 KB
/
dcnow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
from __future__ import absolute_import
import threading
import os
import json
import time
import logging
import logging.handlers
import urllib.request
import urllib.parse
import sh # type: ignore - sh module is dynamic
from typing import List, Optional
from hashlib import sha256
from uuid import getnode as get_mac
logger = logging.getLogger("dcnow")
API_ROOT = "https://dcnow-2016.appspot.com"
UPDATE_END_POINT = "/api/update/{mac_address}/"
UPDATE_INTERVAL = 15
CONFIGURATION_FILE = os.path.expanduser("~/.dreampi.json")
def hash_mac_address():
mac = get_mac()
return sha256(
":".join(("%012X" % mac)[i : i + 2] for i in range(0, 12, 2)).encode()
).hexdigest()
class DreamcastNowThread(threading.Thread):
def __init__(self, service: "DreamcastNowService"):
self._service = service
self._running = True
super(DreamcastNowThread, self).__init__()
def run(self):
def post_update():
if not self._service.enabled:
return
lines: List[str] = list(
sh.tail( # type: ignore - sh has dynamic members
"/var/log/syslog", "-n", "10", _iter=True
)
)
dns_query = None
for line in lines[::-1]:
line: str = line
if "CONNECT" in line and "dreampi" in line:
# Don't seek back past connection
break
if "query[A]" in line:
# We did a DNS lookup, what was it?
remainder = line[line.find("query[A]") + len("query[A]") :].strip()
domain = remainder.split(" ", 1)[0].strip()
dns_query = sha256(domain.encode()).hexdigest()
break
user_agent = "Mozilla/4.0 (compatible; MSIE 5.5; Windows NT), Dreamcast Now"
header = {"User-Agent": user_agent}
mac_address = self._service.mac_address_hash
data = {}
if dns_query:
data["dns_query"] = dns_query
data = urllib.parse.urlencode(data).encode()
req = urllib.request.Request(
API_ROOT + UPDATE_END_POINT.format(mac_address=mac_address),
data,
header,
)
urllib.request.urlopen(req) # Send POST update
while self._running:
try:
post_update()
except:
logger.exception("Couldn't update Dreamcast Now!")
time.sleep(UPDATE_INTERVAL)
def stop(self):
self._running = False
self.join()
class DreamcastNowService(object):
def __init__(self):
self._thread = None
self._mac_address_hash: Optional[str] = None
self._enabled: bool = True
self.reload_settings()
logger.setLevel(logging.INFO)
syslog_handler = logging.handlers.SysLogHandler(address="/dev/log")
syslog_handler.setFormatter(
logging.Formatter("%(name)s[%(process)d]: %(levelname)s %(message)s")
)
logger.addHandler(syslog_handler)
def update_mac_address(self):
self._mac_address_hash = hash_mac_address()
logger.info("MAC address: {}".format(self._mac_address_hash))
def reload_settings(self):
settings_file = CONFIGURATION_FILE
if os.path.exists(settings_file):
with open(settings_file, "r") as settings:
content = json.loads(settings.read())
self._enabled = content["enabled"]
def go_online(self):
if not self._enabled:
return
self.update_mac_address()
self._thread = DreamcastNowThread(self)
self._thread.start()
def go_offline(self):
if self._thread is not None:
self._thread.stop()
self._thread = None
@property
def enabled(self):
return self._enabled
@property
def mac_address_hash(self):
return self._mac_address_hash