1
+ import asyncio
2
+ import json
3
+ import time
4
+ import aiohttp
5
+ from aiohttp import web
6
+ import re
7
+ from typing import Any
8
+ import base64
9
+
10
+ ws_connections = {}
11
+ ws_url_b = "ws://127.0.0.1:20004"
12
+
13
+ async def main ():
14
+ app = web .Application ()
15
+ app .router .add_route ("GET" , "/ws" , setup_connections )
16
+ runner = web .AppRunner (app )
17
+ await runner .setup ()
18
+ site = web .TCPSite (runner , "127.0.0.1" , 30004 )
19
+ print (f"WebSocket server is listening on 127.0.0.1:30004" )
20
+ await site .start ()
21
+ await asyncio .Future () # Keep the server running indefinitely
22
+
23
+ async def setup_connections (request ):
24
+ ws = web .WebSocketResponse ()
25
+ await ws .prepare (request )
26
+
27
+ async for msg in ws :
28
+ if msg .type == aiohttp .WSMsgType .TEXT :
29
+ # 如果 bot_id 不存在,就获取一下存起来
30
+ if "a" not in ws_connections :
31
+ print (ws )
32
+ ws_connections ["a" ] = ws
33
+ bot_id = extract_bot_id_from_message_a (msg .data )
34
+ if bot_id :
35
+ ws_connections ["a.bot_id" ] = bot_id
36
+ asyncio .create_task (_setup_b ( bot_id ))
37
+ # 如果 bot_id 已存在,就处理消息
38
+ else :
39
+ asyncio .create_task (recv_message_a (msg .data ))
40
+ elif msg .type == aiohttp .WSMsgType .ERROR :
41
+ print (f"WebSocket A connection closed: { ws .exception ()} " )
42
+ break
43
+ return ws
44
+
45
+ async def _setup_b (bot_id ):
46
+ async with aiohttp .ClientSession () as session :
47
+ try :
48
+ headers = {
49
+ "User-Agent" : "CQHttp/4.15.0" ,
50
+ "X-Client-Role" : "Universal" ,
51
+ "X-Self-ID" : str (bot_id )
52
+ }
53
+ async with session .ws_connect (ws_url_b , headers = headers ) as ws_b :
54
+ ws_connections ["b" ] = ws_b
55
+ ws_connections ["b.bot_id" ] = bot_id
56
+ message = {
57
+ "meta_event_type" : "lifecycle" ,
58
+ "post_type" : "meta_event" ,
59
+ "self_id" : bot_id ,
60
+ "sub_type" : "connect" ,
61
+ "time" : int (time .time ())
62
+ }
63
+ await ws_b .send_str (json .dumps (message ))
64
+ async for msg in ws_b :
65
+ asyncio .create_task (recv_message_b (msg ))
66
+ except aiohttp .ClientError as e :
67
+ print (f"Failed to connect websocket B: { e } " )
68
+
69
+ def extract_bot_id_from_message_a (msg ):
70
+ message = json .loads (msg )
71
+ if "CurrentQQ" in message :
72
+ return message ["CurrentQQ" ]
73
+ return None
74
+
75
+ async def recv_message_a (msg ):
76
+ message = json .loads (msg )
77
+ print ("从a收到了" , message )
78
+ transformed_message = transform_message_a_to_b (message )
79
+ await send_to_ws_b (transformed_message )
80
+
81
+ async def recv_message_b (msg ):
82
+ message = json .loads (msg .data )
83
+ print ("从b收到了" , str (message )[:200 ])
84
+ await call_api_from_dict (message )
85
+
86
+ async def call_api_from_dict (message ):
87
+ async with aiohttp .ClientSession () as session :
88
+ action = message ['action' ]
89
+ params = message ['params' ]
90
+ botqq = params ['botqq' ]
91
+ url = f'http://127.0.0.1:8086/v1/LuaApiCaller?funcname=MagicCgiCmd&timeout=10&qq={ botqq } '
92
+ upload_url = f'http://127.0.0.1:8086/v1/upload?qq={ botqq } '
93
+ headers = {'Content-Type' : 'application/json' }
94
+
95
+ at_pattern = r'\[CQ:at,qq=(\d+)\]'
96
+ local_image_pattern = r'\[CQ:image,file=file:///(.+)\]'
97
+ url_image_pattern = r'\[CQ:image,file=http://(.+)\]'
98
+ base64_image_pattern = r'\[CQ:image,file=base64://(.+)\]'
99
+ base64_record_pattern = r'\[CQ:record,file=base64://(.+)\]'
100
+
101
+ at_uin_lists = re .findall (at_pattern , params ['message' ])
102
+ at_uin_list_dicts = [{"Uin" : int (uin )} for uin in at_uin_lists ]
103
+
104
+ local_images = re .findall (local_image_pattern , params ['message' ])
105
+ url_images = re .findall (url_image_pattern , params ['message' ])
106
+ base64_images = re .findall (base64_image_pattern , params ['message' ])
107
+ base64_records = re .findall (base64_record_pattern , params ['message' ])
108
+
109
+ images_dicts = []
110
+ voice_dict = []
111
+
112
+ for img_path in local_images :
113
+ with open (img_path , "rb" ) as f :
114
+ base64_encoded_str = base64 .b64encode (f .read ()).decode ('utf-8' )
115
+ upload_payload = {
116
+ "CgiCmd" : "PicUp.DataUp" ,
117
+ "CgiRequest" : {
118
+ "CommandId" : 1 if action == "send_private_msg" else 2 ,
119
+ "Base64Buf" : base64_encoded_str
120
+ }
121
+ }
122
+ async with session .post (upload_url , headers = headers , json = upload_payload ) as response :
123
+ response_json = await response .json ()
124
+ response_json = {
125
+ "FileId" : response_json ["ResponseData" ]["FileId" ],
126
+ "FileMd5" : response_json ["ResponseData" ]["FileMd5" ],
127
+ "FileSize" : response_json ["ResponseData" ]["FileSize" ]
128
+ }
129
+ images_dicts .append (response_json )
130
+
131
+ for img_url in url_images :
132
+ upload_payload = {
133
+ "CgiCmd" : "PicUp.DataUp" ,
134
+ "CgiRequest" : {
135
+ "CommandId" : 1 if action == "send_private_msg" else 2 ,
136
+ "FileUrl" : f"http://{ img_url } "
137
+ }
138
+ }
139
+ async with session .post (upload_url , headers = headers , json = upload_payload ) as response :
140
+ response_json = await response .json ()
141
+ response_json = {
142
+ "FileId" : response_json ["ResponseData" ]["FileId" ],
143
+ "FileMd5" : response_json ["ResponseData" ]["FileMd5" ],
144
+ "FileSize" : response_json ["ResponseData" ]["FileSize" ]
145
+ }
146
+ images_dicts .append (response_json )
147
+
148
+ for base64_str in base64_images + base64_records :
149
+ is_record = base64_str in base64_records
150
+ upload_payload = {
151
+ "CgiCmd" : "PicUp.DataUp" ,
152
+ "CgiRequest" : {
153
+ "CommandId" : 26 if is_record and action == "send_private_msg" else 29 if is_record else 1 if action == "send_private_msg" else 2 ,
154
+ "Base64Buf" : base64_str
155
+ }
156
+ }
157
+ async with session .post (upload_url , headers = headers , json = upload_payload ) as response :
158
+ response_json = await response .json ()
159
+ response_data = {
160
+ "FileToken" : response_json ["ResponseData" ]["FileToken" ],
161
+ "FileMd5" : response_json ["ResponseData" ]["FileMd5" ],
162
+ "FileSize" : response_json ["ResponseData" ]["FileSize" ]
163
+ }
164
+ if is_record :
165
+ voice_dict = response_data
166
+ else :
167
+ images_dicts .append (response_data )
168
+
169
+ content = re .sub (at_pattern , '' , params ['message' ])
170
+ content = re .sub (local_image_pattern , '' , content )
171
+ content = re .sub (url_image_pattern , '' , content ).strip ()
172
+
173
+ to_type = 1 if action == 'send_private_msg' else 2 if action == 'send_group_msg' else None
174
+ if to_type is not None :
175
+ to_uin_key = 'user_id' if action == 'send_private_msg' else 'group_id'
176
+ to_uin = params [to_uin_key ]
177
+ payload = {
178
+ "CgiCmd" : "MessageSvc.PbSendMsg" ,
179
+ "CgiRequest" : {
180
+ "ToUin" : int (to_uin ),
181
+ "ToType" : to_type ,
182
+ "Content" : content ,
183
+ "AtUinLists" : at_uin_list_dicts ,
184
+ "Images" : images_dicts ,
185
+ ** ({"Voice" : voice_dict } if voice_dict else {})
186
+ }
187
+ }
188
+ else :
189
+ print (f"Unsupported action: { action } " )
190
+ return
191
+
192
+ print ("提交的请求" , payload )
193
+ async with session .post (url , headers = headers , json = payload ) as response :
194
+ response_text = await response .text ()
195
+ print (response_text )
196
+
197
+ def transform_message_a_to_b (message : dict ) -> dict :
198
+ msg_head = message ["CurrentPacket" ]["EventData" ]["MsgHead" ]
199
+ msg_body = message ["CurrentPacket" ]["EventData" ]["MsgBody" ]
200
+
201
+ if msg_body is None :
202
+ return None
203
+ if msg_head ["FromType" ] == 1 :
204
+ message_type = "private"
205
+ elif msg_head ["FromType" ] == 2 :
206
+ message_type = "group"
207
+ else :
208
+ return None
209
+
210
+ sender = {
211
+ "user_id" : msg_head ["SenderUin" ],
212
+ "nickname" : msg_head ["SenderNick" ]
213
+ }
214
+
215
+ if message_type == "group" :
216
+ group_info = msg_head ["GroupInfo" ]
217
+ sender .update ({"card" : group_info ["GroupCard" ]})
218
+ at_uin_list = msg_body .get ("AtUinLists" , [])
219
+ if at_uin_list is None :
220
+ at_uin_list = []
221
+ at_str = "" .join ([f"[CQ:at,qq={ at_uin ['Uin' ]} ]" for at_uin in at_uin_list ])
222
+ images = msg_body .get ("Images" , [])
223
+ if images is None :
224
+ images = []
225
+ image_str = "" .join ([f"[CQ:image,file={ image ['FileMd5' ]} .image,subType=0,url={ image ['Url' ]} ]" for image in images ])
226
+ raw_message = msg_body ["Content" ]
227
+ for at_uin in at_uin_list :
228
+ raw_message = raw_message .replace (f"@{ at_uin ['Nick' ]} " , f"[CQ:at,qq={ at_uin ['Uin' ]} ]" )
229
+ message_content = f"{ at_str } { raw_message } { image_str } " .strip ()
230
+ return {
231
+ "post_type" : "message" ,
232
+ "message_type" : message_type ,
233
+ "group_id" : msg_head ["FromUin" ],
234
+ "user_id" : msg_head ["SenderUin" ],
235
+ "self_id" : msg_head ["ToUin" ],
236
+ "sender" : sender ,
237
+ "message_seq" : msg_head ["MsgSeq" ],
238
+ "time" : msg_head ["MsgTime" ],
239
+ "message_id" : str (msg_head ["MsgUid" ]),
240
+ "raw_message" : message_content ,
241
+ "message" : message_content
242
+ }
243
+ elif message_type == "private" :
244
+ images = msg_body .get ("Images" , [])
245
+ if images is None :
246
+ images = []
247
+ image_str = "" .join ([f"[CQ:image,file={ image ['FileMd5' ]} .image,subType=0,url={ image ['Url' ]} ]" for image in images ])
248
+ raw_message = msg_body ["Content" ]
249
+ message_content = f"{ raw_message } { image_str } " .strip ()
250
+ return {
251
+ "post_type" : "message" ,
252
+ "message_type" : message_type ,
253
+ "sub_type" : "friend" ,
254
+ "user_id" : msg_head ["SenderUin" ],
255
+ "self_id" : msg_head ["ToUin" ],
256
+ "sender" : sender ,
257
+ "time" : msg_head ["MsgTime" ],
258
+ "message_id" : str (msg_head ["MsgUid" ]),
259
+ "raw_message" : message_content ,
260
+ "message" : message_content
261
+ }
262
+ else :
263
+ return None
264
+
265
+ async def send_to_ws_b (message ):
266
+ if message == None :
267
+ return
268
+ bot_id = message ["self_id" ]
269
+ ws_conn = ws_connections ["b" ]
270
+ print ("给b发送了" ,message )
271
+ if ws_conn :
272
+ try :
273
+ await ws_conn .send_str (json .dumps (message ))
274
+ except ConnectionResetError :
275
+ await ws_conn .close ()
276
+ print ("重连" )
277
+ await _setup_b (bot_id )
278
+ ws_conn = ws_connections ["b" ]
279
+ await ws_conn .send_str (json .dumps (message ))
280
+
281
+ if __name__ == "__main__" :
282
+ asyncio .run (main ())
0 commit comments