Skip to content

Commit

Permalink
Refactor StreamCapture to handle long lines and improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
yihong1120 committed Jul 11, 2024
1 parent a70c042 commit 5bca273
Showing 1 changed file with 73 additions and 55 deletions.
128 changes: 73 additions & 55 deletions src/stream_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@


class InputData(TypedDict):
stream_url:str
capture_interval:int
stream_url: str
capture_interval: int


class ResultData(TypedDict):
Expand All @@ -38,16 +38,19 @@ def __init__(self, stream_url: str, capture_interval: int = 15):
self.cap: cv2.VideoCapture | None = None
self.capture_interval = capture_interval

def initialise_stream(self) -> None:
def initialise_stream(self, stream_url: str) -> None:
"""
Initialises the video stream and sets up the H264 codec.
Args:
stream_url (str): The URL of the stream to initialise.
"""
self.cap = cv2.VideoCapture(self.stream_url)
self.cap = cv2.VideoCapture(stream_url)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
if not self.cap.isOpened():
time.sleep(5)
self.cap.open(self.stream_url)
self.cap.open(stream_url)

def release_resources(self) -> None:
"""
Expand All @@ -58,28 +61,43 @@ def release_resources(self) -> None:
self.cap = None
gc.collect()

def capture_frames(self) -> Generator[tuple[cv2.Mat, float], None, None]:
def execute_capture(self) -> Generator[tuple[cv2.Mat, float], None, None]:
"""
Captures frames from the stream and yields them with timestamps.
Yields:
Tuple[cv2.Mat, float]: The captured frame and the timestamp.
"""
self.initialise_stream()
self.initialise_stream(self.stream_url)
last_process_time = datetime.datetime.now() - datetime.timedelta(
seconds=self.capture_interval,
)
fail_count = 0 # Counter for consecutive failures

while True:
if self.cap is None:
self.initialise_stream()
self.initialise_stream(self.stream_url)

ret, frame = self.cap.read() if self.cap else (False, None)
ret, frame = (
self.cap.read() if self.cap is not None else (False, None)
)

if not ret:
print('Failed to read frame, trying to reinitialise stream.')
fail_count += 1
print(
'Failed to read frame, trying to reinitialise stream. '
f"Fail count: {fail_count}",
)
self.release_resources()
self.initialise_stream()
self.initialise_stream(self.stream_url)
# After 5 consecutive failures, switch to generic capture
if fail_count >= 5:
print('Switching to generic frame capture method.')
yield from self.capture_generic_frames()
return
continue
else:
fail_count = 0 # Reset fail count on successful read

# Process the frame if the capture interval has elapsed
current_time = datetime.datetime.now()
Expand Down Expand Up @@ -126,7 +144,7 @@ def select_quality_based_on_speed(self) -> str | None:
try:
streams = streamlink.streams(self.stream_url)
available_qualities = list(streams.keys())
print('Available qualities:', available_qualities)
print(f"Available qualities: {available_qualities}")

if download_speed > 10:
preferred_qualities = [
Expand Down Expand Up @@ -154,11 +172,11 @@ def select_quality_based_on_speed(self) -> str | None:
print(f"Error selecting quality based on speed: {e}")
return None

def capture_youtube_frames(
def capture_generic_frames(
self,
) -> Generator[tuple[cv2.Mat, float], None, None]:
"""
Captures frames from a YouTube stream.
Captures frames from a generic stream.
Yields:
Tuple[cv2.Mat, float]: The captured frame and the timestamp.
Expand All @@ -168,49 +186,49 @@ def capture_youtube_frames(
print('Failed to get suitable stream quality.')
return

try:
self.cap = cv2.VideoCapture(stream_url)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264'))
last_process_time = datetime.datetime.now()
while True:
ret, frame = self.cap.read()
if not ret:
print('Failed to read frame from YouTube stream.')
continue

current_time = datetime.datetime.now()
if (
current_time - last_process_time
).total_seconds() >= self.capture_interval:
last_process_time = current_time
timestamp = current_time.timestamp()
yield frame, timestamp

# 清理內存
del frame, timestamp
gc.collect()

time.sleep(0.01) # Adjust the sleep time as needed
except Exception as e:
print(f"Error: {e}")
finally:
self.release_resources()
self.initialise_stream(stream_url)

def execute_capture(self) -> Generator[tuple[cv2.Mat, float], None, None]:
"""
Returns capture generator for stream type.
last_process_time = datetime.datetime.now()
fail_count = 0 # Counter for consecutive failures

Returns:
Generator[Tuple[cv2.Mat, float]]: Yields frames and timestamps.
"""
if (
'youtube.com' in self.stream_url.lower()
or 'youtu.be' in self.stream_url.lower()
):
return self.capture_youtube_frames()
else:
return self.capture_frames()
while True:
ret, frame = (
self.cap.read() if self.cap is not None else (False, None)
)
if not ret:
fail_count += 1
print(
'Failed to read frame from generic stream. '
f"Fail count: {fail_count}",
)
# After 5 consecutive failures, reinitialise the stream
if fail_count >= 5:
print('Reinitialising the generic stream.')
self.release_resources()
time.sleep(5)
stream_url = self.select_quality_based_on_speed()
if not stream_url:
print('Failed to get suitable stream quality.')
continue
self.initialise_stream(stream_url)
fail_count = 0
continue
else:
fail_count = 0 # Reset fail count on successful read

current_time = datetime.datetime.now()
elapsed_time = (current_time - last_process_time).total_seconds()

if elapsed_time >= self.capture_interval:
last_process_time = current_time
timestamp = current_time.timestamp()
yield frame, timestamp

# Clear memory
del frame, timestamp
gc.collect()

time.sleep(0.01) # Adjust the sleep time as needed

def update_capture_interval(self, new_interval: int) -> None:
"""
Expand Down

0 comments on commit 5bca273

Please sign in to comment.