-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdetector.py
More file actions
211 lines (161 loc) · 7.4 KB
/
detector.py
File metadata and controls
211 lines (161 loc) · 7.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import cv2
import numpy as np
class AvatarDetector:
    """Detect per-speaker avatar activity in a video from brightness and motion.

    Each entry of ``speakers`` is a dict with:
      - ``name``: unique speaker identifier (used as the metrics key).
      - ``rect``: ``(x, y, w, h)`` pixel region of the frame holding the avatar.
      - ``idle_time`` (optional, seconds, default 0.0): a timestamp where the
        avatar is known to be idle; the frame there provides the brightness
        baseline (idle avatars are assumed darker than speaking ones).

    ``config`` keys (all optional):
      - ``threshold_brightness`` (default 50): brightness delta above the idle
        baseline that counts as active.
      - ``threshold_motion`` (default 1000): summed absolute pixel difference
        between consecutive frames that counts as active.
      - ``min_duration_seconds`` / ``min_duration_frames`` (default 5 frames):
        shortest segment kept; the seconds form takes precedence when present.
      - ``merge_gap_seconds`` / ``merge_gap_frames`` (default 10 frames):
        inactive gaps up to this length are merged into one segment.

    Typical use: construct, call :meth:`process_video`, then
    :meth:`analyze_segments`.
    """

    def __init__(self, video_path, speakers, config):
        """Open the video, probe its properties, and sample idle baselines.

        Raises:
            ValueError: if the video cannot be opened.
        """
        self.video_path = video_path
        self.speakers = speakers
        self.config = config
        self.cap = cv2.VideoCapture(video_path)
        if not self.cap.isOpened():
            raise ValueError(f"Could not open video: {video_path}")
        self.fps = self.cap.get(cv2.CAP_PROP_FPS)
        self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Sample each speaker's idle brightness before any processing;
        # this seeks around in the capture, so rewind afterwards.
        self.baselines = self._calculate_baselines()
        self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        # Per-speaker raw signals, one value appended per processed frame:
        # { speaker_name: { 'brightness': [...], 'motion': [...] } }
        self.metrics = {sp['name']: {'brightness': [], 'motion': []} for sp in speakers}

    def _clip_rect(self, rect):
        """Clamp an ``(x, y, w, h)`` rect to frame bounds.

        Returns:
            ``(x1, y1, x2, y2)`` slice coordinates; the span may be empty
            (``x1 >= x2`` or ``y1 >= y2``) when the rect lies outside the frame.
        """
        x, y, w, h = rect
        x1 = max(0, x)
        y1 = max(0, y)
        x2 = min(self.width, x + w)
        y2 = min(self.height, y + h)
        return x1, y1, x2, y2

    def _calculate_baselines(self):
        """Return ``{speaker_name: idle brightness}`` sampled at each idle_time.

        Falls back to 0.0 when the idle frame cannot be read or the rect is
        degenerate after clamping.
        """
        baselines = {}
        for sp in self.speakers:
            idle_time = sp.get('idle_time', 0.0)
            frame_idx = int(idle_time * self.fps)
            frame_idx = max(0, min(frame_idx, self.frame_count - 1))
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = self.cap.read()
            if not ret:
                print(f"Warning: Could not read frame at {idle_time}s for speaker {sp['name']}. Using default baseline 0.")
                baselines[sp['name']] = 0.0
                continue
            x1, y1, x2, y2 = self._clip_rect(sp['rect'])
            if x1 >= x2 or y1 >= y2:
                baselines[sp['name']] = 0.0
                continue
            roi = frame[y1:y2, x1:x2]
            avg_brightness = np.mean(roi) if roi.size > 0 else 0
            baselines[sp['name']] = avg_brightness
        return baselines

    def process_video(self):
        """Scan the whole video, filling ``self.metrics``; releases the capture.

        For every frame and speaker, records the mean brightness of the
        speaker's rect and a motion score (summed absolute gray-level
        difference vs. the previous frame; 0 for the first frame).

        Returns:
            The populated ``self.metrics`` dict.
        """
        ret, prev_frame = self.cap.read()
        if not ret:
            # Empty/unreadable video: nothing to measure.
            self.cap.release()
            return self.metrics
        # First frame has no predecessor, so its motion is defined as 0.
        for sp in self.speakers:
            x1, y1, x2, y2 = self._clip_rect(sp['rect'])
            roi = prev_frame[y1:y2, x1:x2]
            avg_brightness = np.mean(roi) if roi.size > 0 else 0
            self.metrics[sp['name']]['brightness'].append(avg_brightness)
            self.metrics[sp['name']]['motion'].append(0.0)
        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        while True:
            ret, frame = self.cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            for sp in self.speakers:
                x1, y1, x2, y2 = self._clip_rect(sp['rect'])
                if x1 >= x2 or y1 >= y2:
                    # Rect entirely outside the frame: record neutral values.
                    self.metrics[sp['name']]['brightness'].append(0)
                    self.metrics[sp['name']]['motion'].append(0)
                    continue
                roi = frame[y1:y2, x1:x2]
                avg_brightness = np.mean(roi) if roi.size > 0 else 0
                # Motion: total absolute gray-level change inside the rect.
                roi_gray = gray[y1:y2, x1:x2]
                prev_roi_gray = prev_gray[y1:y2, x1:x2]
                diff = cv2.absdiff(roi_gray, prev_roi_gray)
                motion_score = np.sum(diff)
                self.metrics[sp['name']]['brightness'].append(avg_brightness)
                self.metrics[sp['name']]['motion'].append(motion_score)
            prev_gray = gray
        self.cap.release()
        return self.metrics

    def analyze_segments(self):
        """Convert raw metrics into active time segments per speaker.

        A frame is active when its brightness exceeds the idle baseline by
        ``threshold_brightness`` OR its motion exceeds ``threshold_motion``.
        (Idle avatars are assumed darker, so only the upward brightness
        deviation is tested.)

        Returns:
            ``{speaker_name: [(start_time_s, end_time_s), ...]}`` with end
            times exclusive.
        """
        segments = {}
        for sp_name, data in self.metrics.items():
            brightness = np.array(data['brightness'])
            motion = np.array(data['motion'])
            baseline = self.baselines.get(sp_name, 0.0)
            t_bright_delta = self.config.get('threshold_brightness', 50)
            t_motion = self.config.get('threshold_motion', 1000)
            is_active_brightness = brightness > (baseline + t_bright_delta)
            is_active_motion = motion > t_motion
            is_active = np.logical_or(is_active_brightness, is_active_motion)
            # Seconds-based config takes precedence over frame-based config.
            min_dur_seconds = self.config.get('min_duration_seconds')
            merge_gap_seconds = self.config.get('merge_gap_seconds')
            if min_dur_seconds is not None:
                min_dur = int(min_dur_seconds * self.fps)
            else:
                min_dur = self.config.get('min_duration_frames', 5)
            if merge_gap_seconds is not None:
                merge_gap = int(merge_gap_seconds * self.fps)
            else:
                merge_gap = self.config.get('merge_gap_frames', 10)
            sp_segments = self._get_segments_from_bool(is_active, min_dur, merge_gap)
            segments[sp_name] = [(start_f / self.fps, end_f / self.fps)
                                 for start_f, end_f in sp_segments]
        return segments

    def _get_segments_from_bool(self, bool_array, min_dur, merge_gap):
        """Turn a per-frame boolean activity array into frame segments.

        Consecutive True runs become ``[start, end)`` segments; segments
        separated by at most ``merge_gap`` frames are merged; merged segments
        shorter than ``min_dur`` frames are dropped.

        Returns:
            List of ``(start_frame, end_frame)`` tuples, end exclusive.
        """
        frames = len(bool_array)
        raw_segments = []
        in_segment = False
        start = 0
        for i in range(frames):
            if bool_array[i]:
                if not in_segment:
                    start = i
                    in_segment = True
            else:
                if in_segment:
                    raw_segments.append([start, i])
                    in_segment = False
        if in_segment:
            raw_segments.append([start, frames])
        if not raw_segments:
            return []
        # Merge runs whose separating gap is within merge_gap frames.
        merged_segments = []
        curr_start, curr_end = raw_segments[0]
        for next_start, next_end in raw_segments[1:]:
            if next_start - curr_end <= merge_gap:
                curr_end = next_end
            else:
                merged_segments.append((curr_start, curr_end))
                curr_start, curr_end = next_start, next_end
        merged_segments.append((curr_start, curr_end))
        # Drop segments still shorter than the minimum duration.
        return [(s, e) for s, e in merged_segments if (e - s) >= min_dur]