-
Notifications
You must be signed in to change notification settings - Fork 0
/
capture.py
78 lines (67 loc) · 2.24 KB
/
capture.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import cv2
import queue
import threading
# Unbuffered video-stream reader class
# class VideoCapture:
# def __init__(self, addr):
# self.cap = cv2.VideoCapture(addr) # open the video stream
# self.q = queue.Queue() # create the queue
# self.t = threading.Thread(target=self._reader)
# self.t.daemon = True
# self.t.start()
# # Read frames as soon as they are available, keeping only the latest one
# def _reader(self):
# while True:
# ret, frame = self.cap.read()
# if not ret:
# break
# if not self.q.empty():
# try:
# self.q.get_nowait() # discard the previous (unprocessed) frame
# except queue.Empty:
# pass
# self.q.put(frame)
# def read(self):
# return self.q.get()
# def release(self):
# self.cap.release()
# # stop thread self.t
# self.t.stop()
# self.t.join()
class RTSCapture(cv2.VideoCapture):
    """Real Time Streaming capture that hands out only the newest frame.

    A background daemon thread keeps calling ``read()`` on the underlying
    ``cv2.VideoCapture`` and replaces any frame the consumer has not yet
    picked up, so a slow consumer never works through a backlog of stale
    frames.

    Attributes:
        q: Queue used to pass the latest frame to the consumer. Once the
            reader thread has finished it holds a ``None`` end-of-stream
            marker.
        t: The daemon reader thread.
    """

    # Upper bound, in seconds, that release() waits for the reader thread.
    _JOIN_TIMEOUT = 2.0

    def __init__(self, index, apiPreference=cv2.CAP_DSHOW):
        """Open the capture and start the background reader thread.

        Args:
            index: Forwarded to ``cv2.VideoCapture`` as ``index``.
            apiPreference: Forwarded to ``cv2.VideoCapture`` as
                ``apiPreference``; defaults to ``cv2.CAP_DSHOW``.
        """
        super().__init__(index=index, apiPreference=apiPreference)
        self.q = queue.Queue()
        # Must exist before the thread starts, because _reader polls it.
        self._stop_event = threading.Event()
        self.t = threading.Thread(target=self._reader)
        self.t.daemon = True
        self.t.start()

    def _reader(self):
        """Read frames as soon as they are available, keeping only the latest.

        Runs until ``read()`` reports failure or ``release()`` requests a
        stop. On exit a ``None`` marker is queued so that a consumer blocked
        in ``read_latest_frame()`` wakes up instead of waiting forever.
        """
        try:
            while not self._stop_event.is_set():
                ret, frame = self.read()
                if not ret:
                    break
                # Discard the previous (unprocessed) frame, if any.
                try:
                    self.q.get_nowait()
                except queue.Empty:
                    pass
                self.q.put(frame)
        finally:
            self.q.put(None)

    def read_latest_frame(self):
        """Block until a frame is available and return it.

        Returns:
            The most recent frame, or ``None`` once the reader thread has
            stopped (read failure, capture not opened, or ``release()``).
        """
        frame = self.q.get()
        if frame is None:
            # Keep the end-of-stream marker queued so later calls also
            # return None instead of blocking.
            self.q.put(None)
        return frame

    def release(self):
        """Stop the reader thread, then release the underlying capture."""
        # Ask the reader to stop first, so the capture is not released
        # while the thread may still be inside read().
        self._stop_event.set()
        if self.t.is_alive() and threading.current_thread() is not self.t:
            # Bounded wait: a read() that never returns must not hang the
            # caller; the thread is a daemon and will not block exit.
            self.t.join(timeout=self._JOIN_TIMEOUT)
        super().release()
if __name__ == '__main__':
    # Demo: show the live stream from device 0 until 'q' is pressed.
    # cap = VideoCapture(0)
    cap = RTSCapture(0, cv2.CAP_DSHOW)
    try:
        while True:
            frame = cap.read_latest_frame()
            # Guard against a missing frame so imshow is never handed None.
            if frame is None:
                break
            cv2.imshow("frame", frame)
            # waitKey returns -1 when no key is pressed; mask to the low byte
            # before comparing with the key code.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the capture device and close the preview window,
        # including when the loop exits through an exception.
        cap.release()
        cv2.destroyAllWindows()