|
6 | 6 |
|
7 | 7 | # import the Queue class from Python 3
|
8 | 8 | if sys.version_info >= (3, 0):
|
9 |
| - from queue import Queue |
| 9 | + from queue import Queue, Full, Empty |
10 | 10 |
|
11 | 11 | # otherwise, import the Queue class for Python 2.7
|
12 | 12 | else:
|
13 |
| - from Queue import Queue |
| 13 | + from Queue import Queue |
14 | 14 |
|
15 | 15 |
|
16 | 16 | class FileVideoStream:
|
17 |
| - def __init__(self, path, transform=None, queue_size=128): |
18 |
| - # initialize the file video stream along with the boolean |
19 |
| - # used to indicate if the thread should be stopped or not |
20 |
| - self.stream = cv2.VideoCapture(path) |
21 |
| - self.stopped = False |
22 |
| - self.transform = transform |
23 |
| - |
24 |
| - # initialize the queue used to store frames read from |
25 |
| - # the video file |
26 |
| - self.Q = Queue(maxsize=queue_size) |
27 |
| - # intialize thread |
28 |
| - self.thread = Thread(target=self.update, args=()) |
29 |
| - self.thread.daemon = True |
30 |
| - |
31 |
| - def start(self): |
32 |
| - # start a thread to read frames from the file video stream |
33 |
| - self.thread.start() |
34 |
| - return self |
35 |
| - |
36 |
| - def update(self): |
37 |
| - # keep looping infinitely |
38 |
| - while True: |
39 |
| - # if the thread indicator variable is set, stop the |
40 |
| - # thread |
41 |
| - if self.stopped: |
42 |
| - break |
43 |
| - |
44 |
| - # otherwise, ensure the queue has room in it |
45 |
| - if not self.Q.full(): |
46 |
| - # read the next frame from the file |
47 |
| - (grabbed, frame) = self.stream.read() |
48 |
| - |
49 |
| - # if the `grabbed` boolean is `False`, then we have |
50 |
| - # reached the end of the video file |
51 |
| - if not grabbed: |
52 |
| - self.stopped = True |
53 |
| - |
54 |
| - # if there are transforms to be done, might as well |
55 |
| - # do them on producer thread before handing back to |
56 |
| - # consumer thread. ie. Usually the producer is so far |
57 |
| - # ahead of consumer that we have time to spare. |
58 |
| - # |
59 |
| - # Python is not parallel but the transform operations |
60 |
| - # are usually OpenCV native so release the GIL. |
61 |
| - # |
62 |
| - # Really just trying to avoid spinning up additional |
63 |
| - # native threads and overheads of additional |
64 |
| - # producer/consumer queues since this one was generally |
65 |
| - # idle grabbing frames. |
66 |
| - if self.transform: |
67 |
| - frame = self.transform(frame) |
68 |
| - |
69 |
| - # add the frame to the queue |
70 |
| - self.Q.put(frame) |
71 |
| - else: |
72 |
| - time.sleep(0.1) # Rest for 10ms, we have a full queue |
73 |
| - |
74 |
| - self.stream.release() |
75 |
| - |
76 |
| - def read(self): |
77 |
| - # return next frame in the queue |
78 |
| - return self.Q.get() |
79 |
| - |
80 |
| - # Insufficient to have consumer use while(more()) which does |
81 |
| - # not take into account if the producer has reached end of |
82 |
| - # file stream. |
83 |
| - def running(self): |
84 |
| - return self.more() or not self.stopped |
85 |
| - |
86 |
| - def more(self): |
87 |
| - # return True if there are still frames in the queue. If stream is not stopped, try to wait a moment |
88 |
| - tries = 0 |
89 |
| - while self.Q.qsize() == 0 and not self.stopped and tries < 5: |
90 |
| - time.sleep(0.1) |
91 |
| - tries += 1 |
92 |
| - |
93 |
| - return self.Q.qsize() > 0 |
94 |
| - |
95 |
| - def stop(self): |
96 |
| - # indicate that the thread should be stopped |
97 |
| - self.stopped = True |
98 |
| - # wait until stream resources are released (producer thread might be still grabbing frame) |
99 |
| - self.thread.join() |
| 17 | + |
| 18 | + def __init__(self, path, transform=None, queue_size=128, skip_frames=True, read_timeout=2): |
| 19 | + # initialize the file video stream along with the boolean |
| 20 | + # used to indicate if the thread should be stopped or not |
| 21 | + self.queue_get_timeout = read_timeout |
| 22 | + self.skip_frames = skip_frames |
| 23 | + self.stream = cv2.VideoCapture(path) |
| 24 | + self.total_frames = self.stream.get(cv2.CAP_PROP_FRAME_COUNT) |
| 25 | + self.skipped_frames = 0 |
| 26 | + self.stopped = False |
| 27 | + self.transform = transform |
| 28 | + |
| 29 | + # initialize the queue used to store frames read from |
| 30 | + # the video file |
| 31 | + self.Q = Queue(maxsize=queue_size) |
| 32 | + # initialize thread |
| 33 | + self.thread = Thread(target=self.update, args=(), daemon=True) |
| 34 | + |
| 35 | + def is_check_eos(self): |
| 36 | + return self.stream.get(cv2.CAP_PROP_POS_FRAMES) >= self.total_frames |
| 37 | + |
| 38 | + def start(self): |
| 39 | + # start a thread to read frames from the file video stream |
| 40 | + self.thread.start() |
| 41 | + return self |
| 42 | + |
| 43 | + def get_frame(self): |
| 44 | + # grab the current frame |
| 45 | + flag, frame = self.stream.read() |
| 46 | + |
| 47 | + while not flag and self.skip_frames: |
| 48 | + if self.is_check_eos(): |
| 49 | + return None, None |
| 50 | + if self.skipped_frames == 0: |
| 51 | + print(f"Skipping frame(s)") |
| 52 | + self.skipped_frames += 1 |
| 53 | + flag, frame = self.stream.read() |
| 54 | + if self.skipped_frames > 0: |
| 55 | + print(f"Resuming video...") |
| 56 | + self.skipped_frames = 0 |
| 57 | + |
| 58 | + return flag, frame |
| 59 | + |
| 60 | + def update(self): |
| 61 | + while not self.stopped: |
| 62 | + # read the next frame from the file |
| 63 | + grabbed, frame = self.get_frame() |
| 64 | + # if the `grabbed` boolean is `False`, then we have |
| 65 | + # reached the end of the video file |
| 66 | + if grabbed is None or not grabbed: |
| 67 | + print("Could not grab") |
| 68 | + self.stopped = True |
| 69 | + break |
| 70 | + |
| 71 | + # if there are transforms to be done, might as well |
| 72 | + # do them on producer thread before handing back to |
| 73 | + # consumer thread. i.e. Usually the producer is so far |
| 74 | + # ahead of consumer that we have time to spare. |
| 75 | + # |
| 76 | + # Python is not parallel but the transform operations |
| 77 | + # are typically OpenCV native so release the GIL. |
| 78 | + # |
| 79 | + # Really just trying to avoid spinning up additional |
| 80 | + # native threads and overheads of additional |
| 81 | + # producer/consumer queues since this one was generally |
| 82 | + # idle grabbing frames. |
| 83 | + if self.transform: |
| 84 | + frame = self.transform(frame) |
| 85 | + while True: |
| 86 | + try: |
| 87 | + # try to put it into the queue |
| 88 | + self.Q.put(frame, True, 0.5) |
| 89 | + break |
| 90 | + except Full: |
| 91 | + print("Queue is full") |
| 92 | + if self.stopped: |
| 93 | + break |
| 94 | + print("Release") |
| 95 | + self.stream.release() |
| 96 | + |
| 97 | + def dim(self): |
| 98 | + width = int(self.stream.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| 99 | + height = int(self.stream.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| 100 | + |
| 101 | + return [width, height] |
| 102 | + |
| 103 | + def read(self): |
| 104 | + # return next frame in the queue |
| 105 | + try: |
| 106 | + return self.Q.get(timeout=self.queue_get_timeout) |
| 107 | + except Empty: |
| 108 | + return None |
| 109 | + |
| 110 | + # Insufficient to have consumer use while(more()) which does |
| 111 | + # not take into account if the producer has reached end of |
| 112 | + # file stream. |
| 113 | + def running(self): |
| 114 | + return self.more() or not self.stopped |
| 115 | + |
| 116 | + def more(self): |
| 117 | + # return True if there are still frames in the queue. If stream is not stopped, try to wait a moment |
| 118 | + tries = 0 |
| 119 | + while self.Q.qsize() == 0 and not self.stopped and tries < 5: |
| 120 | + time.sleep(0.1) |
| 121 | + tries += 1 |
| 122 | + |
| 123 | + return self.Q.qsize() > 0 |
| 124 | + |
| 125 | + def stop(self): |
| 126 | + # indicate that the thread should be stopped |
| 127 | + self.stopped = True |
| 128 | + return self |
0 commit comments