-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathMagiaTimeline.py
149 lines (127 loc) · 6.04 KB
/
MagiaTimeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import cv2 as cv
import av.container
import av.container.input
import av.video
import av.video.stream
import argparse
import json
import jsonschema
import yaml
import fractions
import time
import typing
from Rectangle import *
from IR import *
from Util import *
from Strategies.MagirecoStrategy import *
from Strategies.MagirecoScene0Strategy import *
from Strategies.LimbusCompanyStrategy import *
from Strategies.LimbusCompanyMechanicsStrategy import *
from Strategies.PokemonEmeraldStrategy import *
from Strategies.ParakoStrategy import *
from Strategies.BanGDreamStrategy import *
from Strategies.OutlineStrategy import *
from Strategies.BoxColourStatStrategy import *
from Engines.SpeculativeEngine import *
from Engines.FramewiseEngine import *
from ExtraJobs import *
def _loadAndValidateConfig(configPath: str, schemaPath: str) -> typing.Dict[str, typing.Any]:
    """Load the YAML config file and validate it before any task is started.

    The config is checked against the JSON schema, then cross-checked:
    source/destination lists must have equal length, every source file must
    be openable, and the selected strategy, preset and engine must each have
    a section in the config.

    Args:
        configPath: path of the YAML config file.
        schemaPath: path of the JSON schema file describing the config format.

    Returns:
        The parsed config object.

    Raises:
        Exception: when one of the cross-checks fails.
        Whatever ``open``, ``json.load``, ``yaml.load`` or
        ``jsonschema.validate`` raise on failure.
    """
    with open(schemaPath, "r") as schemaFile:
        schema = json.load(schemaFile)
    with open(configPath, "r") as configFile:
        # NOTE(review): FullLoader is kept as in the original; if config files
        # can ever come from an untrusted source, consider yaml.safe_load.
        config = yaml.load(configFile.read(), Loader=yaml.FullLoader)

    jsonschema.validate(config, schema=schema)  # raises exception on failure
    if len(config["source"]) != len(config["destination"]):
        raise Exception("Source and destination have different length")
    for src in config["source"]:
        # Probe every source up front so an unreadable file fails before any
        # task has run.
        with open(src, "rb"):  # raises exception on failure
            pass
    if config["strategy"] not in config:
        raise Exception("No config found for strategy \"" + config["strategy"] + "\"")
    if config["preset"] not in config[config["strategy"]]:
        raise Exception("No preset \"" + config["preset"] + "\" found for strategy \"" + config["strategy"] + "\"")
    if config["engine"] not in config:
        raise Exception("No config found for engine \"" + config["engine"] + "\"")
    return config


def _createStrategy(name: str, strategyConfig, contentRect) -> "AbstractStrategy":
    """Instantiate the strategy registered under the config key ``name``.

    Raises:
        Exception: when ``name`` is not a known strategy key.
    """
    strategyClasses = {
        "mr": MagirecoStrategy,
        "mr-s0": MagirecoScene0Strategy,
        "lcb": LimbusCompanyStrategy,
        "lcb-mech": LimbusCompanyMechanicsStrategy,
        "pkm": PokemonEmeraldStrategy,
        "prk": ParakoStrategy,
        "bdr": BanGDreamStrategy,
        "otl": OutlineStrategy,
        "bcs": BoxColourStatStrategy,
    }
    if name not in strategyClasses:
        raise Exception("Unknown strategy! ")
    return strategyClasses[name](strategyConfig, contentRect)


def _createEngine(name: str, engineConfig) -> "AbstractEngine":
    """Instantiate the engine registered under the config key ``name``.

    Raises:
        Exception: when ``name`` is not a known engine key.
    """
    engineClasses = {
        "speculative": SpeculativeEngine,
        "framewise": FramewiseEngine,
    }
    if name not in engineClasses:
        raise Exception("Unknown engine! ")
    return engineClasses[name](engineConfig)


def _runTask(config, nTask: int, src: str, dst: str, strategyConfig, engineConfig) -> None:
    """Process one source video: write ``dst + ".ass"`` and run extra jobs.

    The source container is closed on every exit path, including failures
    and the case where the OCR extra job is skipped.

    Args:
        config: the validated config object.
        nTask: index of this task, used only for the progress message.
        src: path of the source video.
        dst: destination path prefix; ".ass" is appended for the subtitle file.
        strategyConfig: config section of the selected strategy preset.
        engineConfig: config section of the selected engine.
    """
    timeStart = time.time()
    print("")
    print("Task {}: {} -> {}".format(nTask, src, dst))

    srcContainer: "av.container.InputContainer" = av.open(src, mode='r')
    try:
        srcStream: "av.video.stream.VideoStream" = srcContainer.streams.video[0]
        srcStream.thread_type = 'FRAME'
        size: typing.Tuple[int, int] = (srcStream.codec_context.width, srcStream.codec_context.height)
        timeBase: fractions.Fraction = srcStream.time_base
        fps: fractions.Fraction = srcStream.average_rate

        with open(config["assTemplate"], "r") as templateAsst:
            asstStr: str = templateAsst.read()

        # The destination is opened before the engine runs (as in the original
        # flow), so an unwritable destination fails before the engine starts.
        with open(dst + ".ass", "w") as dstAss:
            contentRect = RatioRectangle(SrcRectangle(*size), *config["contentRect"])

            print("Strategy:", config["strategy"])
            print("Preset:", config["preset"])
            strategy = _createStrategy(config["strategy"], strategyConfig, contentRect)

            print("Engine:", config["engine"])
            engine = _createEngine(config["engine"], engineConfig)

            print("==== Running Engine ====")
            iir: "IIR" = engine.checkAndRun(strategy, srcContainer, srcStream)

            print("==== IIR to ASS ====")
            asstStr = asstStr.format(styles = "".join(strategy.getStyles()), events = iir.toAss(timeBase))
            dstAss.write(asstStr)

        timeTimelineElapsed = time.time() - timeStart
        print("Timeline Elapsed", timeTimelineElapsed, "s")
        print("Timeline Speed", float(srcStream.frames / fps) / timeTimelineElapsed, "x")

        if "ocr" in config["extraJobs"]:
            print("Extra job: ocr")
            if isinstance(strategy, AbstractOcrStrategy):
                iirOcrPass = IIROcrPass(config["ocr"], dst, strategy)
                iirOcrPass.apply(iir)
            else:
                # Skip only the OCR pass; the task still finishes normally so
                # the container below is released.
                print("Error: Strategy does not support OCR. Skipping. ")

        timeOverallElapsed = time.time() - timeStart
        print("Overall Elapsed", timeOverallElapsed, "s")
        print("Overall Speed", float(srcStream.frames / fps) / timeOverallElapsed, "x")
    finally:
        srcContainer.close()


def main():
    """Command-line entry point: load the config and process every source.

    Reads ``--config`` (YAML) and ``--schema`` (JSON schema), validates the
    config, then for each source/destination pair runs the selected engine
    with the selected strategy and writes the resulting ASS file.

    Raises:
        Exception: on invalid configuration or an unknown strategy/engine;
        errors from file access and from the engine propagate unchanged.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument("--config", type=str, default="config.yml", help="config file stating parameters to run with")
    parser.add_argument("--schema", type=str, default="ConfigSchema.json", help="schema file specifying the format of config file")
    args = parser.parse_args()

    config = _loadAndValidateConfig(args.config, args.schema)

    strategyConfig = config[config["strategy"]][config["preset"]]
    engineConfig = config[config["engine"]]

    cv.ocl.setUseOpenCL(config["enableOpenCL"])

    # Source and destination lists were validated to have equal length.
    for nTask, (src, dst) in enumerate(zip(config["source"], config["destination"])):
        _runTask(config, nTask, src, dst, strategyConfig, engineConfig)
# Script entry point: run main() only when this file is executed directly.
# Any Exception raised by main() is caught and reported on the console.
# NOTE(review): indentation was lost in this copy of the file, so it cannot be
# determined here whether the input() pause belongs inside the except handler
# (pause only on failure) or after the try/except (pause always) - confirm
# against the upstream file before re-indenting.
if __name__ == "__main__":
try:
main()
except Exception as e:
# Only the exception message is printed; no traceback is shown.
print("Exception caught: ", e)
# Blocks until the user presses Enter.
input("Press Enter to continue...")