-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbinder.py
487 lines (384 loc) · 15.7 KB
/
binder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
from __future__ import annotations
import argparse
import logging
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
import gdb
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(current_dir, 'lib'))
from rbtree import for_each_rb_entry, init as rbtree_init
from macros import for_each_entry, container_of
log = logging.getLogger(__name__)
NORMAL = "\x1b[0m"
BLACK = "\x1b[30m"
RED = "\x1b[31m"
GREEN = "\x1b[32m"
YELLOW = "\x1b[33m"
BLUE = "\x1b[97m"
def terminateWith(x: str, color: str) -> str:
return x.replace("\x1b[0m", NORMAL + color)
def colorize(x: str, color: str) -> str:
return color + terminateWith(str(x), color) + NORMAL
def red(x: str) -> str:
return colorize(x, RED)
def green(x: str) -> str:
return colorize(x, GREEN)
def yellow(x: str) -> str:
return colorize(x, YELLOW)
def blue(x: str) -> str:
return colorize(x, BLUE)
addrc = green
fieldnamec = blue
fieldvaluec = yellow
typenamec = red
import traceback
def exception_handler(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
print("Exception in GDB Python plugin:")
traceback.print_exc()
return wrapper
def read_ushort_from_address(address):
"""
从指定的 64 位地址读取一个 ushort (16-bit 无符号整型)
:param address: 目标地址 (int)
:return: ushort 数据 (int)
"""
try:
# 将地址表示为 GDB 的 Value 对象
addr_value = gdb.Value(address).cast(gdb.lookup_type("unsigned short").pointer())
# 解引用指针获取值
ushort_value = addr_value.dereference()
return int(ushort_value) # 转换为 Python 整型
except gdb.error as e:
print(f"Error reading ushort from address {hex(address)}: {e}")
return None
def for_each_transaction(addr: gdb.Value, field: str) -> Iterator[gdb.Value]:
typename = "struct binder_transaction"
addr_int = int(addr)
while addr_int != 0:
transaction = gdb.Value(addr).cast(gdb.lookup_type(typename).pointer())
yield transaction
addr = transaction[field]
addr_int = int(addr)
# TODO: pull this out from the slab command so we can reuse it
class IndentContextManager:
def __init__(self):
self.indent = 0
def __enter__(self):
self.indent += 1
def __exit__(self, exc_type, exc_value, exc_tb):
self.indent -= 1
assert self.indent >= 0
node_types = {
"waiting_threads": "struct binder_thread",
"todo": "struct binder_work",
"refs": "struct binder_ref",
"threads": "struct binder_thread",
"nodes": "struct binder_node",
"refs_by_node": "struct binder_ref",
}
entry_field_names = {
"waiting_threads": "waiting_thread_node",
"todo": "entry",
"refs": "node_entry",
}
rb_node_field_names = {
"threads": "rb_node",
"nodes": "rb_node",
"refs_by_node": "rb_node_node",
}
codenames = [x for x in dir(gdb) if "TYPE_CODE_" in x]
code_to_name = {getattr(gdb, name): name for name in codenames}
name_to_code = {v: k for k, v in code_to_name.items()}
# TODO: merge with for_each_entry?
def for_each_hlist_entry(head , typename, field) -> Iterator[gdb.Value]:
addr = head["first"]
addr_int = int(addr)
while addr_int != 0:
yield container_of(addr_int, typename, field)
addr = addr.dereference()["next"]
addr_int = int(addr)
class BinderVisitor:
def __init__(self, procs_addr):
self.indent = IndentContextManager()
self.addr = procs_addr # pwndbg.aglib.memory.get_typed_pointer_value("struct hlist_head", procs_addr)
def _format_indent(self, text: str) -> str:
return " " * self.indent.indent + text
def _format_heading(self, typename: str, fields: str, addr: int) -> str:
hex_addr = hex(addr)
return self._format_indent(
f"{fieldnamec(typename)} {fieldvaluec(fields)} ({addrc(hex_addr)})"
)
# TODO: do this in a cleaner, object-oriented way
@exception_handler
def _format_field(
self, field: Optional[str] = None, value: gdb.Value | str = "", only_heading: bool = True
) -> str:
if isinstance(value, gdb.Value):
t = value.type
if t.code == name_to_code["TYPE_CODE_TYPEDEF"]:
real_type = t.strip_typedefs()
# We only want to replace the typedef with the real type if the
# real type is not an anonymous struct
if real_type.name is not None:
t = real_type
if t.code == name_to_code["TYPE_CODE_INT"]:
value = int(value)
elif t.code == name_to_code["TYPE_CODE_BOOL"]:
value = True if value else False
elif t.code == name_to_code["TYPE_CODE_PTR"]:
typename = t.target().name
if int(value) == 0:
value = "NULL"
elif typename == "binder_proc":
value = self.format_proc(value, only_heading=only_heading).strip()
elif typename == "binder_thread":
value = self.format_thread(value, only_heading=only_heading).strip()
elif typename == "binder_node":
value = self.format_node(value).strip()
elif typename == "binder_ref":
value = self.format_ref(value, only_heading=only_heading).strip()
elif typename == "binder_work":
value = self.format_work(value.dereference()).strip()
elif typename == "binder_transaction":
value = self.format_transaction(value, only_heading=only_heading)
if only_heading:
value = value.strip()
else:
value = "\n" + "\n".join([" " + line for line in value.split("\n")])
else:
print(f"Warning: no formatter for pointer type {typename}")
elif t.code in [name_to_code["TYPE_CODE_STRUCT"], name_to_code["TYPE_CODE_TYPEDEF"]]:
typename = t.name
if typename == "spinlock":
value = self.format_spinlock(value).strip()
elif typename == "atomic_t":
value = value["counter"]
elif typename == "rb_root":
assert field is not None
value, num_elts = self.format_rb_tree(field, value)
field += f" [{num_elts}]"
elif typename in ["list_head", "hlist_head"]:
assert field is not None
value, num_elts = self.format_list(field, value, typename)
field += f" [{num_elts}]"
else:
print(f"Warning: no formatter for type {typename}")
print(t)
else:
print(f"Warning: no formatter for type code {t.code}")
output = ""
if field:
output += fieldnamec(field) + ": "
output += fieldvaluec(str(value))
return self._format_indent(output)
def format_rb_tree(self, field: str, value: gdb.Value) -> Tuple[str, int]:
res = []
node_type = node_types[field]
entry_field_name = rb_node_field_names[field]
with self.indent:
for entry in for_each_rb_entry(
value,
node_type,
entry_field_name,
):
s = self._format_field(value=entry, only_heading=False)
num_space = len(s) - len(s.lstrip())
res.append(" " * num_space + "* " + s.lstrip())
if len(res) == 0:
return "EMPTY", 0
# Prepend a newline so the list starts on the line after the field name
return "\n" + "\n".join(res), len(res)
@exception_handler
def format_list(self, field: str, value: gdb.Value, typename: str) -> Tuple[str, int]:
res = []
node_type = node_types[field]
entry_field_name = entry_field_names[field]
if typename == "list_head":
each_entry = for_each_entry
elif typename == "hlist_head":
each_entry = for_each_hlist_entry
else:
assert False
with self.indent:
for entry in each_entry(
value,
node_type,
entry_field_name,
):
s = self._format_field(value=entry)
num_space = len(s) - len(s.lstrip())
res.append(" " * num_space + "* " + s.lstrip())
if len(res) == 0:
return "EMPTY", 0
# Prepend a newline so the list starts on the line after the field name
return "\n" + "\n".join(res), len(res)
def _format_fields(self, obj: gdb.Value, fields: List[str], only_heading: bool = True) -> str:
res = []
for field in fields:
res.append(self._format_field(field, obj[field], only_heading=only_heading))
return "\n".join(res)
def visit(self):
for proc in for_each_hlist_entry(self.addr, "struct binder_proc", "proc_node"):
print(self.format_proc(proc))
print()
def format_proc(self, proc: gdb.Value, only_heading=False):
res = []
res.append(self._format_heading("binder_proc", "PID %s" % proc["pid"], int(proc)))
if only_heading:
return "\n".join(res)
with self.indent:
fields = [
"is_dead",
"tmp_ref",
"inner_lock",
"outer_lock",
"waiting_threads",
"todo",
"threads",
"nodes",
"refs_by_node",
]
res.append(self._format_fields(proc, fields))
return "\n".join(res)
def format_thread(self, thread: gdb.Value, only_heading: bool = False) -> str:
res = []
res.append(self._format_heading("binder_thread", "PID %s" % thread["pid"], int(thread)))
if only_heading:
return "\n".join(res)
with self.indent:
fields = ["tmp_ref", "looper_need_return", "process_todo", "is_dead", "todo"]
res.append(self._format_fields(thread, fields))
# We need to print this separately since we wanted print the entire
# object and not just the heading
res.append(self._format_fields(thread, ["transaction_stack"], only_heading=False))
return "\n".join(res)
def format_transaction(self, transaction: gdb.Value, only_heading: bool = False) -> str:
res = []
res.append(
self._format_heading(
"binder_transaction", "ID %s" % str(transaction["debug_id"]), int(transaction)
)
)
if only_heading:
return "\n".join(res)
with self.indent:
res.append(self._format_fields(transaction, ["lock", "to_proc", "from", "to_thread"]))
if int(transaction["from_parent"]) == 0:
res.append(self._format_field("from_parent", "NULL"))
else:
res.append(self._format_field("from_parent"))
with self.indent:
for transaction in for_each_transaction(
transaction["from_parent"], "from_parent"
):
res.append(self.format_transaction(transaction))
if int(transaction["to_parent"]) == 0:
res.append(self._format_field("to_parent", "NULL"))
else:
res.append(self._format_field("to_parent"))
with self.indent:
for transaction in for_each_transaction(transaction["to_parent"], "to_parent"):
res.append(self.format_transaction(transaction))
return "\n".join(res)
def format_node(self, node: gdb.Value) -> str:
res = []
res.append(self._format_heading("binder_node", "", int(node)))
with self.indent:
fields = [
"lock",
"internal_strong_refs",
"local_weak_refs",
"local_strong_refs",
"ptr",
"cookie",
"tmp_refs",
"refs",
]
res.append(self._format_fields(node, fields))
return "\n".join(res)
def format_ref(self, ref: gdb.Value, only_heading: bool = False):
res = []
res.append(self._format_heading("binder_ref", "HANDLE %s" % ref["data"]["desc"], int(ref)))
if only_heading:
return "\n".join(res)
with self.indent:
fields = ["strong", "weak"]
res.append(self._format_fields(ref["data"], fields))
return "\n".join(res)
def format_work(self, work: gdb.Value):
res = []
res.append(self._format_heading("binder_work", str(work["type"]), int(work.address)))
t = int(work["type"])
# TODO: Create enum
if t == 1:
obj = container_of(int(work.address), "struct binder_transaction", "work")
elif t in [2, 3]:
return "\n".join(res) # These are just binder_work objects
elif t == 4:
obj = container_of(int(work.address), "struct binder_error", "work")
elif t == 5:
obj = container_of(int(work.address), "struct binder_node", "work")
elif t in [6, 7, 8]:
obj = container_of(int(work.address), "struct binder_ref_death", "work")
else:
assert False
with self.indent:
res.append(self._format_field(value=obj))
return "\n".join(res)
def print_object(self, obj: gdb.Value):
# TODO: type
print(obj)
def format_spinlock(self, lock: gdb.Value) -> str:
raw_lock = lock["rlock"]["raw_lock"]
# val = pwndbg.aglib.memory.ushort(int(raw_lock.address))
val = read_ushort_from_address(int(raw_lock.address))
locked = val & 0xFF
pending = val >> 8
return self._format_heading("", f"LOCKED: {locked} PENDING: {pending}", int(lock.address))
parser = argparse.ArgumentParser(description="Show Android Binder information")
class Binder(gdb.Command):
def __init__(self):
super(Binder, self).__init__("binder2", gdb.COMMAND_DATA)
@exception_handler
def invoke(self, arg, from_tty):
# log.warning("This command is a work in progress and may not work as expected.")
procs_addr = gdb.parse_and_eval("binder_procs")
bv = BinderVisitor(procs_addr)
bv.visit()
class BinderProc(gdb.Command):
def __init__(self):
super(BinderProc, self).__init__("binder_proc", gdb.COMMAND_DATA)
@exception_handler
def invoke(self, arg, from_tty):
argv = gdb.string_to_argv(arg)
if not argv:
raise gdb.GdbError("proc addr provided")
procs_addr = gdb.parse_and_eval("binder_procs")
proc_struct = gdb.lookup_type("struct binder_proc")
bv = BinderVisitor(procs_addr)
proc = gdb.Value(int(argv[0], 16)).cast(proc_struct.pointer())
print(bv.format_proc(proc))
class BinderNode(gdb.Command):
def __init__(self):
super(BinderNode, self).__init__("binder_node", gdb.COMMAND_DATA)
@exception_handler
def invoke(self, arg, from_tty):
argv = gdb.string_to_argv(arg)
if not argv:
raise gdb.GdbError("proc addr provided")
procs_addr = gdb.parse_and_eval("binder_procs")
node_struct = gdb.lookup_type("struct binder_node")
bv = BinderVisitor(procs_addr)
node = gdb.Value(int(argv[0], 16)).cast(node_struct.pointer())
print(bv.format_node(node))
# rb tree init
rbtree_init()
Binder()
BinderProc()
BinderNode()