-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstock_flash.py
executable file
·183 lines (149 loc) · 5.04 KB
/
stock_flash.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
convert servicefile xml to list of shell commands and run them
"""
import argparse
import hashlib
import logging
import os
import stat
import time
import xml.etree.ElementTree as ET
import sh
# Root-logger configuration: every record at INFO or above is written through
# a single stream handler, prefixed with its timestamp and severity.
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
ch = logging.StreamHandler()
ch.setFormatter(formatter)

LOG = logging.getLogger()
LOG.setLevel(logging.INFO)
LOG.addHandler(ch)
def parse_args(argv=None):
    """Parse the command-line options of the flasher.

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) lets argparse read ``sys.argv[1:]``, which is what
            the script entry point relies on.

    Returns:
        An ``argparse.Namespace`` with the attributes ``debug``, ``dry_run``,
        ``max_retries`` and ``SRC_ROM_DIR``.
    """
    parser = argparse.ArgumentParser(
        # The raw formatter only affects the description/epilog, so give it
        # the module docstring to show (``None`` when there is no docstring).
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-d",
        "--debug",
        action="store_true",
        help="enable debug logging information",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="don't do anything, just print what would be done",
    )
    parser.add_argument(
        "--max-retries",
        default=3,
        type=int,
        # Make the usage line agree with the "NUM" used in the help text.
        metavar="NUM",
        help="retry the shell command NUM times in case of failure",
    )
    parser.add_argument(
        "SRC_ROM_DIR",
        nargs="?",
        default=".",
        help="use this directory as source instead of default (current directory)",
    )
    return parser.parse_args(argv)
def ensure_device_connected(dry_run=False):
    """Abort unless ``fastboot devices`` reports at least one device.

    Args:
        dry_run: When true the check is skipped entirely and a notice is
            printed instead.

    Raises:
        AssertionError: ``fastboot devices`` produced no output.
    """
    if dry_run:
        print("skipping check for device as dry-run mode")
        return

    listing = fastboot("devices", dry_run=dry_run, timeout=5)
    # Raise explicitly instead of using `assert`: under `python -O` an assert
    # statement is removed, which would drop both the check and the fastboot
    # call. Whitespace-only output is treated as "nothing listed" too.
    if not str(listing).strip():
        raise AssertionError("no device found")
def ensure_servicefile_exists(servicefile):
    """Abort unless *servicefile* is an existing regular file.

    Args:
        servicefile: Path of the ``servicefile.xml`` to check.

    Raises:
        OSError: ``os.stat`` failed, e.g. ``FileNotFoundError`` when the path
            does not exist at all.
        AssertionError: the path exists but is not a regular file.
    """
    mode = os.stat(servicefile).st_mode
    # Explicit raise rather than `assert`, so the check still runs when
    # Python is started with -O (which strips assert statements).
    if not stat.S_ISREG(mode):
        raise AssertionError("servicefile.xml doesn't exist")
def fastboot(*args, num_retries=3, timeout=None, dry_run=False):
    """Run ``fastboot`` with *args*, retrying when an attempt times out.

    Only ``sh.TimeoutException`` triggers a retry; any other exception raised
    by the command propagates from the attempt that produced it.

    Args:
        *args: Command-line arguments for ``fastboot`` (strings).
        num_retries: Maximum number of attempts.
        timeout: Per-attempt timeout, handed to ``sh`` as ``_timeout``.
        dry_run: When true the command is only logged, never executed.

    Returns:
        The result of ``sh.fastboot(...)``, or ``0`` in dry-run mode.

    Raises:
        sh.TimeoutException: every attempt timed out (also raised
            immediately when ``num_retries`` is 0).
    """
    command = " ".join(args)
    # Count attempts from 1 so the warning reads "try 1/3", not "try 0/3".
    for attempt in range(1, num_retries + 1):
        LOG.info(command)
        if dry_run:
            return 0
        try:
            return sh.fastboot(*args, _timeout=timeout)
        except sh.TimeoutException as e:
            LOG.warning(
                "fastboot timeout: {}. try {}/{}".format(e, attempt, num_retries)
            )
    # `critical` is the supported spelling of the deprecated `fatal` alias.
    LOG.critical(
        "FASTBOOT FAILURE; max_retries = {} exceeded. Bailing now".format(num_retries)
    )
    raise sh.TimeoutException("max retries exceeded", full_cmd=command)
def reboot_device(dry_run=False):
    """Issue ``fastboot reboot bootloader`` and pause before continuing.

    Args:
        dry_run: Forwarded to ``fastboot``; when true the five-second wait
            after the command is skipped as well.
    """
    LOG.info("rebooting; waiting for device")
    fastboot("reboot", "bootloader", dry_run=dry_run, timeout=20)
    if dry_run:
        # No command was actually sent, so there is nothing to wait for.
        return
    time.sleep(5)
def verify_file_md5(filepath, ref_md5sum):
    """Check whether the MD5 digest of *filepath* equals *ref_md5sum*.

    The file is hashed in fixed-size chunks so that large images do not have
    to be held in memory in one piece.

    Args:
        filepath: Path of the file to hash.
        ref_md5sum: Expected digest as a hexadecimal string. Letter case and
            surrounding whitespace are ignored.

    Returns:
        True when the digests match, False otherwise.

    Raises:
        OSError: the file cannot be opened or read.
    """
    chunk_size = 1024 * 1024
    digest = hashlib.md5()
    with open(filepath, "rb") as f:
        # iter() with a sentinel stops at the first empty read, i.e. at EOF.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    # hexdigest() is lowercase; normalise the reference so an uppercase
    # digest is not rejected as a mismatch.
    return str(ref_md5sum).strip().lower() == digest.hexdigest()
def flash_device(
    src_rom_dir, preprocessing=None, postprocessing=None, retries=3, dry_run=False
):
    """Replay the steps of ``<src_rom_dir>/servicefile.xml`` through fastboot.

    Each child of the single ``steps`` element is dispatched on its
    ``operation`` attribute: ``getvar`` and ``oem`` read ``var``; ``flash``
    reads ``filename``, ``partition`` and ``MD5``; ``erase`` reads
    ``partition``. Erasing ``userdata`` asks for interactive confirmation.

    Args:
        src_rom_dir: Directory holding ``servicefile.xml`` and the images.
        preprocessing: Optional mapping of image filename to a callable that
            is invoked as ``callable(dry_run)`` right before that image is
            flashed.
        postprocessing: Same as *preprocessing*, but invoked right after the
            image has been flashed.
        retries: Maximum number of attempts for every fastboot command.
        dry_run: When true, fastboot commands are logged but not executed.

    Raises:
        AssertionError: no device found, the servicefile is not a regular
            file, it does not contain exactly one ``steps`` section, or an
            image fails its MD5 check.
        KeyError: a step lacks a required attribute or names an operation
            other than ``getvar``, ``oem``, ``flash`` or ``erase``.
    """
    preprocessing = preprocessing or {}
    postprocessing = postprocessing or {}

    ensure_device_connected(dry_run)
    servicefile = "{}/servicefile.xml".format(src_rom_dir)
    ensure_servicefile_exists(servicefile)

    tree = ET.parse(servicefile)
    step_sections = tree.findall("./steps")
    # Explicit raise instead of `assert` so the check survives `python -O`;
    # the message also covers the "no section at all" case.
    if len(step_sections) != 1:
        raise AssertionError(
            "expected exactly 1 `steps` section, found {}".format(len(step_sections))
        )
    steps = step_sections[0]

    def run(*cmd, timeout):
        # Single place that forwards the retry budget and dry-run flag, so
        # the `retries` argument is honoured by every command.
        return fastboot(*cmd, num_retries=retries, timeout=timeout, dry_run=dry_run)

    def getvar(step):
        run("getvar", step.attrib["var"], timeout=5)

    def oem(step):
        run("oem", step.attrib["var"], timeout=5)

    def flash(step):
        filename = step.attrib["filename"]
        partition = step.attrib["partition"]
        md5sum = step.attrib["MD5"]
        filepath = os.path.join(src_rom_dir, filename)
        # Must not be an `assert`: with -O a corrupt image would otherwise be
        # flashed without any verification.
        if not verify_file_md5(filepath, md5sum):
            raise AssertionError("bad md5sum for file {}".format(filepath))
        if filename in preprocessing:
            preprocessing[filename](dry_run)
        run("flash", partition, filepath, timeout=60)
        if filename in postprocessing:
            postprocessing[filename](dry_run)

    def erase(step):
        partition = step.attrib["partition"]
        if partition == "userdata":
            ret = input("WARNING - REQUESTED ERASING USERDATA. THIS WILL ERASE ALL DATA ON PHONE. Are you sure about this? Enter \"YES\" TO CONTINUE: ")
            if ret != "YES":
                print("SKIPPING ERASING USERDATA")
                return
        run("erase", partition, timeout=20)

    operation_dispatch = {
        "getvar": getvar,
        "oem": oem,
        "flash": flash,
        "erase": erase,
    }
    for step in steps:
        op = step.attrib["operation"]
        operation_dispatch[op](step)
def main():
    """Script entry point: parse the command line and flash the device.

    Applies ``--debug`` to the root logger and forwards ``--max-retries`` and
    ``--dry-run`` to ``flash_device``.
    """
    args = parse_args()
    if args.debug:
        # Honour -d/--debug, which was parsed but never acted upon.
        LOG.setLevel(logging.DEBUG)

    # system partition is large; reboot to make sure we're starting with a
    # clean slate
    preprocessing = {
        "system.img": reboot_device,
        "system.img_sparsechunk.0": reboot_device,
    }
    postprocessing = {
        # reload bootloader after flashing
        "bootloader.img": reboot_device,
    }
    flash_device(
        args.SRC_ROM_DIR,
        preprocessing=preprocessing,
        postprocessing=postprocessing,
        # Pass the user's retry budget through instead of dropping it.
        retries=args.max_retries,
        dry_run=args.dry_run,
    )
# Run the flasher only when this file is executed as a script; importing the
# module has no flashing side effects.
if __name__ == "__main__":
    main()