Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions akagi_backend/akagi_ng/bridge/majsoul/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from akagi_ng.bridge.logger import logger
from akagi_ng.bridge.majsoul.consts import OperationAnGangAddGang, OperationChiPengGang
from akagi_ng.bridge.majsoul.liqi import LiqiProto, MsgType
from akagi_ng.bridge.majsoul.modifier import MajsoulModifier, PacketMutation
from akagi_ng.bridge.majsoul.tile_mapping import MS_TILE_2_MJAI_TILE, compare_pai
from akagi_ng.schema.constants import MahjongConstants
from akagi_ng.schema.notifications import NotificationCode
Expand All @@ -16,6 +17,10 @@
def __init__(self):
super().__init__()
self.liqi_proto = LiqiProto()
self.modifier = MajsoulModifier()
self._logged_first_frame = False
self._logged_first_parsed_message = False
self._logged_first_unparsed_frame = False
self._init_state()

def _init_state(self):
Expand Down Expand Up @@ -62,6 +67,39 @@
logger.error(f"Error parsing Majsoul message: {e}")
return [self.make_system_event(NotificationCode.PARSE_ERROR)]

def process_message(self, content: bytes, *, from_client: bool) -> tuple[list[AkagiEvent], PacketMutation]:
"""Parse MJAI events and optionally mutate the original websocket packet."""
try:
if not self._logged_first_frame:
self._logged_first_frame = True
logger.info(f"[MajsoulBridge] First websocket frame received from_client={from_client}")
liqi_message = self.liqi_proto.parse(content)
if liqi_message and not self._logged_first_parsed_message:
self._logged_first_parsed_message = True
logger.info(

Check warning on line 79 in akagi_backend/akagi_ng/bridge/majsoul/bridge.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/bridge.py#L78-L79

Added lines #L78 - L79 were not covered by tests
f"[MajsoulBridge] First parsed liqi message: "
f"type={liqi_message.get('type')} method={liqi_message.get('method')}"
)
if not liqi_message and not self._logged_first_unparsed_frame:
self._logged_first_unparsed_frame = True
logger.warning("[MajsoulBridge] Received websocket frame but failed to parse liqi message")
mutation = self.modifier.process(

Check warning on line 86 in akagi_backend/akagi_ng/bridge/majsoul/bridge.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/bridge.py#L84-L86

Added lines #L84 - L86 were not covered by tests
liqi_message,
from_client=from_client,
raw_content=content,
liqi_proto=self.liqi_proto,
)
parsed = self.parse_liqi(liqi_message)

Check warning on line 92 in akagi_backend/akagi_ng/bridge/majsoul/bridge.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/bridge.py#L92

Added line #L92 was not covered by tests

if parsed:
logger.trace(f"<- {liqi_message}")
logger.trace(f"-> {parsed}")

Check warning on line 96 in akagi_backend/akagi_ng/bridge/majsoul/bridge.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/bridge.py#L95-L96

Added lines #L95 - L96 were not covered by tests

return parsed, mutation

Check warning on line 98 in akagi_backend/akagi_ng/bridge/majsoul/bridge.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/bridge.py#L98

Added line #L98 was not covered by tests
except Exception as e:
logger.error(f"Error processing Majsoul message: {e}")
return [self.make_system_event(NotificationCode.PARSE_ERROR)], PacketMutation()

def _parse_sync_game(self, liqi_message: dict) -> list[AkagiEvent]:
"""处理游戏同步消息(重连后的同步)"""
self._pre_scan_mode_from_sync_msg(liqi_message)
Expand Down
117 changes: 116 additions & 1 deletion akagi_backend/akagi_ng/bridge/majsoul/liqi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import message_factory as _message_factory
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToDict, ParseDict

from akagi_ng.bridge.logger import logger
from akagi_ng.bridge.majsoul.consts import LiqiProtocolConstants
Expand All @@ -21,6 +21,9 @@
Res = 3


RPC_METHOD_PARTS = 4


keys = [0x84, 0x5E, 0x4E, 0x42, 0x39, 0xA2, 0x1F, 0x60, 0x1C]


Expand Down Expand Up @@ -170,6 +173,78 @@
logger.warning(f"Message type {name} not found in protocol")
return None

def get_rpc_message_classes(
self, method_name: str
) -> tuple[type[_message.Message] | None, type[_message.Message] | None]:
"""Return request/response protobuf classes for an RPC method."""
parts = method_name.split(".")
if len(parts) < RPC_METHOD_PARTS:
return None, None

lq, service, rpc = parts[1:4]
try:
rpc_info = self.jsonProto["nested"][lq]["nested"][service]["methods"][rpc]
except KeyError:
logger.warning(f"RPC method {method_name} not found in protocol")
return None, None

Check warning on line 189 in akagi_backend/akagi_ng/bridge/majsoul/liqi.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/liqi.py#L184-L189

Added lines #L184 - L189 were not covered by tests

req_cls = self.get_message_class(rpc_info["requestType"])
res_cls = self.get_message_class(rpc_info["responseType"])
return req_cls, res_cls

Check warning on line 193 in akagi_backend/akagi_ng/bridge/majsoul/liqi.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/liqi.py#L191-L193

Added lines #L191 - L193 were not covered by tests

def set_pending_response(self, msg_id: int, method_name: str) -> None:
"""Override the response type associated with a pending request."""
_, res_cls = self.get_rpc_message_classes(method_name)
self.res_type[msg_id] = (method_name, res_cls)

def drop_pending_response(self, msg_id: int) -> None:
self.res_type.pop(msg_id, None)

def build_message(self, message_name: str, data: dict | None) -> bytes:
"""Serialize a protobuf message from a dict using dynamic descriptors."""
msg_cls = self.get_message_class(message_name)
if not msg_cls:
raise AttributeError(f"Unknown Message: {message_name}")

proto_obj = msg_cls()
ParseDict(data or {}, proto_obj, ignore_unknown_fields=False)
return proto_obj.SerializeToString()

def build_packet(self, msg_type: MsgType, method_name: str, data: dict | None, msg_id: int = -1) -> bytes:
"""Serialize a Liqi packet back into its websocket binary form."""
if msg_type == MsgType.Notify:
message_name = method_name.split(".")[-1]
else:
req_cls, res_cls = self.get_rpc_message_classes(method_name)
target_cls = req_cls if msg_type == MsgType.Req else res_cls
if not target_cls:
raise AttributeError(f"Unknown RPC message class for {method_name}")
message_name = target_cls.DESCRIPTOR.name
payload = self.build_message(message_name, data)

if msg_type == MsgType.Notify:
blocks = [
{"id": 1, "type": "string", "data": method_name.encode()},
{"id": 2, "type": "string", "data": payload},
]
return bytes([int(msg_type)]) + to_protobuf(blocks)

if msg_type == MsgType.Req:
blocks = [
{"id": 1, "type": "string", "data": method_name.encode()},
{"id": 2, "type": "string", "data": payload},
]
return bytes([int(msg_type)]) + struct.pack("<H", msg_id) + to_protobuf(blocks)

if msg_type == MsgType.Res:
blocks = [
{"id": 1, "type": "string", "data": b""},
{"id": 2, "type": "string", "data": payload},
]
return bytes([int(msg_type)]) + struct.pack("<H", msg_id) + to_protobuf(blocks)

raise ValueError(f"Unsupported message type: {msg_type}")

def init(self):
self.msg_id = 1
self.res_type.clear()
Expand Down Expand Up @@ -332,3 +407,43 @@
raise ValueError(f"unknown pb block type: {block_type}")
result.append({"id": block_id, "type": block_type, "data": data, "begin": block_begin})
return result


def encode_varint(value: int) -> bytes:
"""Encode an integer using protobuf varint format."""
if value < 0:
raise ValueError("varint only supports non-negative integers")

Check warning on line 415 in akagi_backend/akagi_ng/bridge/majsoul/liqi.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/liqi.py#L415

Added line #L415 was not covered by tests

out = bytearray()
while True:
to_write = value & 0x7F
value >>= 7
if value:
out.append(to_write | 0x80)
else:
out.append(to_write)
break
return bytes(out)


def to_protobuf(blocks: list[dict]) -> bytes:
"""Serialize the simplified block structure used by Liqi packets."""
out = bytearray()
type_map = {"varint": LiqiProtocolConstants.BLOCK_TYPE_VARINT, "string": LiqiProtocolConstants.BLOCK_TYPE_STRING}

for block in blocks:
field_id = int(block["id"])
block_type = block["type"]
wire_type = type_map[block_type]
out.append((field_id << 3) | wire_type)

if block_type == "varint":
out.extend(encode_varint(int(block["data"])))
elif block_type == "string":
data = block["data"]
out.extend(encode_varint(len(data)))
out.extend(data)
else:
raise ValueError(f"unknown pb block type: {block_type}")

Check warning on line 447 in akagi_backend/akagi_ng/bridge/majsoul/liqi.py

View check run for this annotation

Codecov / codecov/patch

akagi_backend/akagi_ng/bridge/majsoul/liqi.py#L447

Added line #L447 was not covered by tests

return bytes(out)
3 changes: 3 additions & 0 deletions akagi_backend/akagi_ng/bridge/majsoul/mod_proto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from akagi_ng.bridge.majsoul.mod_proto.loader import config_pb2, sheets_pb2

__all__ = ["config_pb2", "sheets_pb2"]
27 changes: 27 additions & 0 deletions akagi_backend/akagi_ng/bridge/majsoul/mod_proto/config.desc
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

config.proto lq.config"T
Field

field_name ( 
array_length (
pb_type ( 
pb_index ("*
SheetMeta
category ( 
key ( "a
SheetSchema
name ( "
meta ( 2.lq.config.SheetMeta
fields ( 2.lq.config.Field"C
TableSchema
name ( &
sheets ( 2.lq.config.SheetSchema"7
SheetData
table ( 
sheet ( 
data ( "‚
ConfigTables
version ( 
header_hash ( '
schemas ( 2.lq.config.TableSchema#
datas ( 2.lq.config.SheetDatabproto3
Expand Down
38 changes: 38 additions & 0 deletions akagi_backend/akagi_ng/bridge/majsoul/mod_proto/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

from importlib import resources
from types import ModuleType

from google.protobuf import descriptor_pool, message_factory
from google.protobuf.descriptor import FileDescriptor

type ProtoModuleName = str
type DescriptorFileName = str


def _iter_message_names(descriptor: FileDescriptor) -> list[str]:
return list(descriptor.message_types_by_name)


def _load_descriptor_bytes(file_name: DescriptorFileName) -> bytes:
return resources.files(__package__).joinpath(file_name).read_bytes()


def _build_module(module_name: ProtoModuleName, descriptor_file: DescriptorFileName) -> ModuleType:
pool = descriptor_pool.DescriptorPool()
descriptor = pool.AddSerializedFile(_load_descriptor_bytes(descriptor_file))
module = ModuleType(module_name)
module.DESCRIPTOR = descriptor

for message_name in _iter_message_names(descriptor):
full_name = f"{descriptor.package}.{message_name}" if descriptor.package else message_name
message_descriptor = pool.FindMessageTypeByName(full_name)
setattr(module, message_name, message_factory.GetMessageClass(message_descriptor))

return module


config_pb2 = _build_module("config_pb2", "config.desc")
sheets_pb2 = _build_module("sheets_pb2", "sheets.desc")

__all__ = ["config_pb2", "sheets_pb2"]
Loading