diff --git a/src/stream_capture.py b/src/stream_capture.py index ea2a596..1278eb7 100644 --- a/src/stream_capture.py +++ b/src/stream_capture.py @@ -13,8 +13,8 @@ class InputData(TypedDict): - stream_url:str - capture_interval:int + stream_url: str + capture_interval: int class ResultData(TypedDict): @@ -38,16 +38,19 @@ def __init__(self, stream_url: str, capture_interval: int = 15): self.cap: cv2.VideoCapture | None = None self.capture_interval = capture_interval - def initialise_stream(self) -> None: + def initialise_stream(self, stream_url: str) -> None: """ Initialises the video stream and sets up the H264 codec. + + Args: + stream_url (str): The URL of the stream to initialise. """ - self.cap = cv2.VideoCapture(self.stream_url) + self.cap = cv2.VideoCapture(stream_url) self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) if not self.cap.isOpened(): time.sleep(5) - self.cap.open(self.stream_url) + self.cap.open(stream_url) def release_resources(self) -> None: """ @@ -58,28 +61,43 @@ def release_resources(self) -> None: self.cap = None gc.collect() - def capture_frames(self) -> Generator[tuple[cv2.Mat, float], None, None]: + def execute_capture(self) -> Generator[tuple[cv2.Mat, float], None, None]: """ Captures frames from the stream and yields them with timestamps. Yields: Tuple[cv2.Mat, float]: The captured frame and the timestamp. """ - self.initialise_stream() + self.initialise_stream(self.stream_url) last_process_time = datetime.datetime.now() - datetime.timedelta( seconds=self.capture_interval, ) + fail_count = 0 # Counter for consecutive failures + while True: if self.cap is None: - self.initialise_stream() + self.initialise_stream(self.stream_url) - ret, frame = self.cap.read() if self.cap else (False, None) + ret, frame = ( + self.cap.read() if self.cap is not None else (False, None) + ) if not ret: - print('Failed to read frame, trying to reinitialise stream.') + fail_count += 1 + print( + 'Failed to read frame, trying to reinitialise stream. ' + f"Fail count: {fail_count}", + ) self.release_resources() - self.initialise_stream() + self.initialise_stream(self.stream_url) + # After 5 consecutive failures, switch to generic capture + if fail_count >= 5: + print('Switching to generic frame capture method.') + yield from self.capture_generic_frames() + return continue + else: + fail_count = 0 # Reset fail count on successful read # Process the frame if the capture interval has elapsed current_time = datetime.datetime.now() @@ -126,7 +144,7 @@ def select_quality_based_on_speed(self) -> str | None: try: streams = streamlink.streams(self.stream_url) available_qualities = list(streams.keys()) - print('Available qualities:', available_qualities) + print(f"Available qualities: {available_qualities}") if download_speed > 10: preferred_qualities = [ @@ -154,11 +172,11 @@ def select_quality_based_on_speed(self) -> str | None: print(f"Error selecting quality based on speed: {e}") return None - def capture_youtube_frames( + def capture_generic_frames( self, ) -> Generator[tuple[cv2.Mat, float], None, None]: """ - Captures frames from a YouTube stream. + Captures frames from a generic stream. Yields: Tuple[cv2.Mat, float]: The captured frame and the timestamp. @@ -168,49 +186,49 @@ def capture_youtube_frames( print('Failed to get suitable stream quality.') return - try: - self.cap = cv2.VideoCapture(stream_url) - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) - self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'H264')) - last_process_time = datetime.datetime.now() - while True: - ret, frame = self.cap.read() - if not ret: - print('Failed to read frame from YouTube stream.') - continue - - current_time = datetime.datetime.now() - if ( - current_time - last_process_time - ).total_seconds() >= self.capture_interval: - last_process_time = current_time - timestamp = current_time.timestamp() - yield frame, timestamp - - # 清理內存 - del frame, timestamp - gc.collect() - - time.sleep(0.01) # Adjust the sleep time as needed - except Exception as e: - print(f"Error: {e}") - finally: - self.release_resources() + self.initialise_stream(stream_url) - def execute_capture(self) -> Generator[tuple[cv2.Mat, float], None, None]: - """ - Returns capture generator for stream type. + last_process_time = datetime.datetime.now() + fail_count = 0 # Counter for consecutive failures - Returns: - Generator[Tuple[cv2.Mat, float]]: Yields frames and timestamps. - """ - if ( - 'youtube.com' in self.stream_url.lower() - or 'youtu.be' in self.stream_url.lower() - ): - return self.capture_youtube_frames() - else: - return self.capture_frames() + while True: + ret, frame = ( + self.cap.read() if self.cap is not None else (False, None) + ) + if not ret: + fail_count += 1 + print( + 'Failed to read frame from generic stream. ' + f"Fail count: {fail_count}", + ) + # After 5 consecutive failures, reinitialise the stream + if fail_count >= 5: + print('Reinitialising the generic stream.') + self.release_resources() + time.sleep(5) + stream_url = self.select_quality_based_on_speed() + if not stream_url: + print('Failed to get suitable stream quality.') + continue + self.initialise_stream(stream_url) + fail_count = 0 + continue + else: + fail_count = 0 # Reset fail count on successful read + + current_time = datetime.datetime.now() + elapsed_time = (current_time - last_process_time).total_seconds() + + if elapsed_time >= self.capture_interval: + last_process_time = current_time + timestamp = current_time.timestamp() + yield frame, timestamp + + # Clear memory + del frame, timestamp + gc.collect() + + time.sleep(0.01) # Adjust the sleep time as needed def update_capture_interval(self, new_interval: int) -> None: """