-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver_tlslite_once.py
119 lines (93 loc) · 3.2 KB
/
server_tlslite_once.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# echo server - tlslite - serve once
from tlslite import TLSConnection
from tlslite.recordlayer import RecordLayer, RecordSocket, RecordHeader3, Parser
from tlslite import X509, X509CertChain, parsePEMKey
import socket
import argparse
from hexdump import hexdump
argsParser = argparse.ArgumentParser(description="Experiment using tlslite-ng module as wrapper and pure TLS")
argsParser.add_argument("--host", type=str, help="listen interface", default="localhost")
argsParser.add_argument("--port", type=int, help="listen port", default=9999)
argsParser.add_argument("--cert", type=str, help="server certificate", default="cert/my.crt")
argsParser.add_argument("--key", type=str, help="server private key", default="cert/my.key")
args = argsParser.parse_args()
HOST = args.host
PORT = args.port
CERT = args.cert
KEY = args.key
BUFFER_SIZE = 4096
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(1)
print("Server started at {}:{}".format(HOST, PORT))
conn, client_addr = server.accept()
print("New client:", client_addr)
# TLSConnection -> _recordLayer (RecordLayer) -> _recordSocket (RecordSocket)
class MyRecordSocket(RecordSocket):
def __init__(self, sock, data):
super().__init__(sock)
self._amazingData = data
self._startOfSomethingAmazing = True
def recv(self):
if self._startOfSomethingAmazing:
self._startOfSomethingAmazing = False
# assume it is SSL3 header, who cares about SSL2 ?
record = RecordHeader3().parse(Parser(self._amazingData))
return (record, self._amazingData[5:]),
else:
return super().recv()
class MyRecordLayer(RecordLayer):
def __init__(self, sock, data):
super().__init__(sock)
self._recordSocket = MyRecordSocket(sock, data)
class MyTLSConnection(TLSConnection):
def __init__(self, sock, data):
super().__init__(sock)
self._recordLayer = MyRecordLayer(sock, data)
# recv first packet
data = conn.recv(BUFFER_SIZE)
hexdump(data)
isTLS = ""
isRunning = True
if data.startswith(b"\x16\x03"):
print(">> Detect ssl request")
# ssl context
x509 = X509()
x509.parse(open(CERT).read())
certChain = X509CertChain([x509])
privateKey = parsePEMKey(open(KEY).read(), private=True)
# wrap it with modified class
connection = MyTLSConnection(conn, data)
# resume the handshake
try:
connection.handshakeServer(certChain=certChain, privateKey=privateKey, reqCert=True)
except:
print(">> Failed to switch ssl")
else:
print(">> Upgraded")
conn = connection
isTLS = "[TLS] "
else:
# its not ssl context, so we send back the first packet / or quit
if data.startswith(b"quit"):
isRunning = False
else:
conn.send(data)
# loop
while isRunning:
try:
data = conn.recv(BUFFER_SIZE)
except:
# tlslite socket error
break
# client closed
if not data:
break
print("> {}Recv: {} bytes".format(isTLS, len(data)))
hexdump(data)
if data.startswith(b"quit"):
break
conn.send(data)
conn.close()
server.close()