From a34afcf5a6dce6c4fe250fb76d0d16753174de8a Mon Sep 17 00:00:00 2001 From: yihong1120 Date: Thu, 25 Jul 2024 17:17:14 +0800 Subject: [PATCH] Add controlled zone monitoring func --- main.py | 18 ++- src/danger_detector.py | 62 ++++++++- src/drawing_manager.py | 243 +++++++++++++++++++++++++++++++++++ src/live_stream_detection.py | 148 +-------------------- 4 files changed, 314 insertions(+), 157 deletions(-) create mode 100644 src/drawing_manager.py diff --git a/main.py b/main.py index e28ac47..973951d 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,7 @@ from dotenv import load_dotenv from src.danger_detector import DangerDetector +from src.drawing_manager import DrawingManager from src.line_notifier import LineNotifier from src.live_stream_detection import LiveStreamDetector from src.monitor_logger import LoggerConfig @@ -62,11 +63,13 @@ def main( """ # Load environment variables load_dotenv() - api_url = os.getenv('API_URL', 'http://localhost:5000') # Initialise the stream capture object streaming_capture = StreamCapture(stream_url=video_url) + # Get the API URL from environment variables + api_url = os.getenv('API_URL', 'http://localhost:5000') + # Initialise the live stream detector live_stream_detector = LiveStreamDetector( api_url=api_url, @@ -75,6 +78,9 @@ def main( run_local=run_local, ) + # Initialise the drawing manager + drawing_manager = DrawingManager() + # Initialise the LINE notifier line_notifier = LineNotifier(line_token) @@ -96,8 +102,8 @@ def main( datas, _ = live_stream_detector.generate_detections(frame) # Draw the detections on the frame - frame_with_detections = live_stream_detector.draw_detections_on_frame( - frame, datas, + frame_with_detections, controlled_zone_polygon = ( + drawing_manager.draw_detections_on_frame(frame, datas) ) # Convert the frame to a byte array @@ -106,7 +112,7 @@ def main( # Save the frame with detections # save_file_name = f'{label}_{image_name}_{detection_time}' - # live_stream_detector.save_frame( + # drawing_manager.save_frame( # frame_with_detections, # save_file_name # ) @@ -125,7 +131,9 @@ def main( (7 <= current_hour < 18) ): # Check for warnings and send notifications if necessary - warnings = danger_detector.detect_danger(datas) + warnings = danger_detector.detect_danger( + datas, controlled_zone_polygon, + ) # Check if there are any warnings if warnings: diff --git a/src/danger_detector.py b/src/danger_detector.py index d67f41e..a8e188c 100644 --- a/src/danger_detector.py +++ b/src/danger_detector.py @@ -2,6 +2,10 @@ from typing import TypedDict +from shapely.geometry import MultiPoint +from shapely.geometry import Point +from shapely.geometry import Polygon + class InputData(TypedDict): x1: float @@ -27,23 +31,68 @@ def __init__(self): """ pass - def detect_danger(self, datas: list[list[float]]) -> set[str]: + def calculate_people_in_controlled_area( + self, + datas: list[list[float]], + polygon: Polygon | None, + ) -> int: + """ + Calculates the number of people within the safety cone area. + + Args: + datas (List[List[float]]): The detection data. + polygon (Optional[Polygon]): Polygon formed by the safety cones. + + Returns: + int: Number of people detected within the controlled area. + """ + # Check if there are any detections + if not datas: + return 0 + + # Check if there is a valid polygon + if not polygon or not isinstance(polygon, Polygon): + return 0 + + # Count the number of people within the controlled area + people_count = 0 + for data in datas: + if data[5] == 5: # Check if it's a person + x_center = (data[0] + data[2]) / 2 + y_center = (data[1] + data[3]) / 2 + if polygon.contains(Point(x_center, y_center)): + people_count += 1 + + return people_count + + def detect_danger( + self, + datas: list[list[float]], + polygon: Polygon | None, + ) -> set[str]: """ Detects potential safety violations in a construction site. This function checks for two types of safety violations: - 1. Workers not wearing hardhats or safety vests. - 2. Workers dangerously close to machinery or vehicles. + 1. Workers entering the controlled area. + 2. Workers not wearing hardhats or safety vests. + 3. Workers dangerously close to machinery or vehicles. Args: datas (List[List[float]]): A list of detections which includes bounding box coordinates, confidence score, and class label. + polygon (Optional[Polygon]): Polygon formed by the safety cones. Returns: - List[str]: A list of warning messages for safety violations. + Set[str]: A set of warning messages for safety violations. """ warnings = set() # Initialise the list to store warning messages + # Check if people are entering the controlled area + people_count = self.calculate_people_in_controlled_area(datas, polygon) + if people_count > 0: + warnings.add(f'警告: 有{people_count}個人進入受控區域!') + # Classify detected objects into different categories persons = [d for d in datas if d[5] == 5] # Persons hardhat_violations = [d for d in datas if d[5] == 2] # No hardhat @@ -238,6 +287,9 @@ def is_dangerously_close( [0.45513, 471.77, 662.03, 1071.4, 12, 0.75853], [1042.7, 638.5, 1077.5, 731.98, 18, 0.56060], ] - warnings = detector.detect_danger(data) + + polygon = MultiPoint([(0, 0), (0, 1000), (1000, 0)]) + + warnings = detector.detect_danger(data, polygon) for warning in warnings: print(warning) diff --git a/src/drawing_manager.py b/src/drawing_manager.py new file mode 100644 index 0000000..05a014f --- /dev/null +++ b/src/drawing_manager.py @@ -0,0 +1,243 @@ +from __future__ import annotations + +import gc +from pathlib import Path + +import cv2 +import numpy as np +from PIL import Image +from PIL import ImageDraw +from PIL import ImageFont +from shapely.geometry import MultiPoint +from shapely.geometry import Polygon + + +class DrawingManager: + def __init__(self): + """ + Initialise the DrawingManager class. + """ + # Load the font used for drawing labels on the image + self.font = ImageFont.truetype( + 'assets/fonts/NotoSansTC-VariableFont_wght.ttf', 20, + ) + + # Mapping of category IDs to their corresponding names + self.category_id_to_name = { + 0: '安全帽', + 1: '口罩', + 2: '無安全帽', + 3: '無口罩', + 4: '無安全背心', + 5: '人員', + 6: '安全錐', + 7: '安全背心', + 8: '機具', + 9: '車輛', + } + + # Define colours for each category + self.colors = { + '安全帽': (0, 255, 0), + '安全背心': (0, 255, 0), + '機具': (255, 225, 0), + '車輛': (255, 255, 0), + '無安全帽': (255, 0, 0), + '無安全背心': (255, 0, 0), + '人員': (255, 165, 0), + } + + # Generate exclude_labels automatically + self.exclude_labels = [ + label for label in self.category_id_to_name.values() + if label not in self.colors + ] + + def draw_safety_cones_polygon( + self, + frame: np.ndarray, + datas: list[list[float]], + ) -> tuple[np.ndarray, Polygon | None]: + """ + Draws safety cones on the given frame and forms a polygon from them. + + Args: + frame (np.ndarray): The frame on which to draw safety cones. + datas (List[List[float]]): The detection data. + + Returns: + Tuple[np.ndarray, Optional[Polygon]]: The frame with + safety cones drawn, and the polygon formed by the safety cones. + """ + if not datas: + return frame, None + + # Get positions of safety cones + cone_positions = np.array([ + ( + (float(data[0]) + float(data[2])) / 2, + (float(data[1]) + float(data[3])) / 2, + ) + for data in datas if data[5] == 6 + ]) + + # Check if there are at least three safety cones to form a polygon + if len(cone_positions) < 3: + return frame, None + + # Create a polygon from the cone positions + polygon = MultiPoint(cone_positions).convex_hull + + if isinstance(polygon, Polygon): + polygon_points = np.array( + polygon.exterior.coords, dtype=np.float32, + ) + + # Convert frame to PIL image + frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) + + # Create a transparent layer + overlay = Image.new('RGBA', frame_pil.size, (255, 0, 0, 0)) + overlay_draw = ImageDraw.Draw(overlay) + + # Draw the polygon with semi-transparent pink colour + overlay_draw.polygon( + [tuple(point) for point in polygon_points], + fill=(255, 105, 180, 128), + ) + + # Composite the overlay with the original image + frame_pil = Image.alpha_composite( + frame_pil.convert('RGBA'), overlay, + ) + + # Convert back to OpenCV image + frame = cv2.cvtColor( + np.array(frame_pil.convert('RGB')), cv2.COLOR_RGB2BGR, + ) + + # Draw the polygon border + cv2.polylines( + frame, [ + polygon_points.astype( + np.int32, + ), + ], isClosed=True, color=(255, 0, 255), thickness=2, + ) + else: + print('Warning: Convex hull is not a polygon.') + + return frame, polygon + + def draw_detections_on_frame( + self, + frame: np.ndarray, + datas: list[list[float]], + ) -> tuple[np.ndarray, Polygon | None]: + """ + Draws detections on the given frame. + + Args: + frame (np.ndarray): The frame on which to draw detections. + datas (List[List[float]]): The detection data. + + Returns: + Tuple[np.ndarray, Optional[Polygon]]: The frame with + detections drawn, and the polygon formed by the safety cones. + """ + # Draw safety cones first + frame, polygon = self.draw_safety_cones_polygon(frame, datas) + + # Convert the frame to RGB and create a PIL image + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + pil_image = Image.fromarray(frame_rgb) + draw = ImageDraw.Draw(pil_image) + + for data in datas: + x1, y1, x2, y2, _, label_id = data + label_id = int(label_id) + if label_id in self.category_id_to_name: + label = self.category_id_to_name[label_id] + else: + continue + + x1, y1, x2, y2 = map(int, [x1, y1, x2, y2]) + if label not in self.exclude_labels: + color = self.colors.get(label, (255, 255, 255)) + draw.rectangle((x1, y1, x2, y2), outline=color, width=2) + text = f"{label}" + text_bbox = draw.textbbox((x1, y1), text, font=self.font) + text_width, text_height = text_bbox[2] - \ + text_bbox[0], text_bbox[3] - text_bbox[1] + text_background = ( + x1, y1 - text_height - + 5, x1 + text_width, y1, + ) + draw.rectangle(text_background, fill=color) + draw.text( + (x1, y1 - text_height - 5), text, + fill=(0, 0, 0), font=self.font, + ) + + # Convert the PIL image back to OpenCV format + frame_with_detections = cv2.cvtColor( + np.array(pil_image), cv2.COLOR_RGB2BGR, + ) + return frame_with_detections, polygon + + def save_frame(self, frame_bytes: bytearray, output_filename: str) -> None: + """ + Saves detected frame to given output folder and filename. + + Args: + frame_bytes (bytearray): The byte stream of the frame. + output_filename (str): The output filename. + """ + # Create the output directory if it does not exist + output_dir = Path('detected_frames') + output_dir.mkdir(parents=True, exist_ok=True) + + # Define the output path + output_path = output_dir / f"{output_filename}.png" + + # Save the byte stream to the output path + with open(output_path, 'wb') as f: + f.write(frame_bytes) + + # Clean up + del output_dir, output_path, frame_bytes + gc.collect() + + +if __name__ == '__main__': + # Example usage (replace with actual usage) + # Load frame and detection data (example) + frame = np.zeros((480, 640, 3), dtype=np.uint8) + + # Example data including objects and safety cones + datas = [ + # Example objects (安全帽, 人員, 車輛) + [50, 50, 150, 150, 0.95, 0], # 安全帽 + [200, 200, 300, 300, 0.85, 5], # 人員 + [400, 400, 500, 500, 0.75, 9], # 車輛 + + # Example safety cones (安全錐) + [100, 100, 120, 120, 0.9, 6], + [250, 250, 270, 270, 0.8, 6], + [450, 450, 470, 470, 0.7, 6], + [500, 200, 520, 220, 0.7, 6], + [150, 400, 170, 420, 0.7, 6], + ] + + # Initialise DrawingManager class + drawer_saver = DrawingManager() + + # Draw detections on frame (including safety cones) + frame_with_detections, polygon = drawer_saver.draw_detections_on_frame( + frame, datas, + ) + + # Save the frame with detections + output_filename = 'frame_001' + frame_bytes = cv2.imencode('.png', frame_with_detections)[1].tobytes() + drawer_saver.save_frame(frame_bytes, output_filename) diff --git a/src/live_stream_detection.py b/src/live_stream_detection.py index a40adab..c07d77b 100644 --- a/src/live_stream_detection.py +++ b/src/live_stream_detection.py @@ -9,12 +9,8 @@ from typing import TypedDict import cv2 -import numpy as np import requests from dotenv import load_dotenv -from PIL import Image -from PIL import ImageDraw -from PIL import ImageFont from requests.adapters import HTTPAdapter from sahi import AutoDetectionModel from sahi.predict import get_sliced_prediction @@ -72,145 +68,6 @@ def __init__( self.access_token = None self.token_expiry = 0.0 - # Load the font used for drawing labels on the image - self.font = ImageFont.truetype( - 'assets/fonts/NotoSansTC-VariableFont_wght.ttf', - 20, - ) - - # Mapping of category IDs to their corresponding names - self.category_id_to_name = { - 0: '安全帽', - 1: '口罩', - 2: '無安全帽', - 3: '無口罩', - 4: '無安全背心', - 5: '人員', - 6: '安全錐', - 7: '安全背心', - 8: '機具', - 9: '車輛', - } - - # Define colours for each category - self.colors = { - '安全帽': (0, 255, 0), - '安全背心': (0, 255, 0), - '機具': (255, 225, 0), - '車輛': (255, 255, 0), - '無安全帽': (255, 0, 0), - '無安全背心': (255, 0, 0), - '人員': (255, 165, 0), - } - - # Generate exclude_labels automatically - self.exclude_labels = [ - label - for label in self.category_id_to_name.values() - if label not in self.colors - ] - - def draw_detections_on_frame( - self, - frame: cv2.Mat, - datas: list[list[float]], - ) -> cv2.Mat: - """ - Draws detections on the given frame. - - Args: - frame (cv2.Mat): The frame on which to draw detections. - datas (List[List[float]]): The detection data. - - Returns: - frame_with_detections(cv2.Mat): The frame with detections drawn. - """ - # Convert the frame to RGB and create a PIL image - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - pil_image = Image.fromarray(frame_rgb) - draw = ImageDraw.Draw(pil_image) - - for data in datas: - x1, y1, x2, y2, _, label_id = data - label_id = int(label_id) # Ensure label_id is an integer - if label_id in self.category_id_to_name: - label = self.category_id_to_name[label_id] - else: - continue - - x1, y1, x2, y2 = map(int, [x1, y1, x2, y2]) - if label not in self.exclude_labels: - color = self.colors.get(label, (255, 255, 255)) - draw.rectangle((x1, y1, x2, y2), outline=color, width=2) - text = f"{label}" - text_bbox = draw.textbbox((x1, y1), text, font=self.font) - text_width, text_height = ( - text_bbox[2] - text_bbox[0], - text_bbox[3] - text_bbox[1], - ) - text_background = ( - x1, - y1 - text_height - 5, - x1 + text_width, - y1, - ) - draw.rectangle(text_background, fill=color) - - text_y = y1 - text_height - 5 / 2 - text_height / 2 - for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]: - draw.text( - (x1 + dx, text_y + dy), - text, - fill=(0, 0, 0), - font=self.font, - ) - draw.text( - (x1, text_y), - text, - fill=( - 255, - 255, - 255, - ), - font=self.font, - ) - - # Convert the PIL image back to OpenCV format - frame_with_detections = cv2.cvtColor( - np.array(pil_image), - cv2.COLOR_RGB2BGR, - ) - - return frame_with_detections - - def save_frame(self, frame_bytes: bytearray, output_filename: str) -> None: - """ - Saves detected frame to given output folder and filename. - - Args: - frame_bytes (bytearray): The byte stream of the frame. - output_filename (str): The output filename. - """ - # Create the output directory if it does not exist - base_output_dir = Path('detected_frames') - output_dir = ( - base_output_dir / self.output_folder - if self.output_folder - else base_output_dir - ) - output_dir.mkdir(parents=True, exist_ok=True) - - # Define the output path - output_path = output_dir / f"{output_filename}.png" - - # Save the byte stream to the output path - with open(output_path, 'wb') as f: - f.write(frame_bytes) - - # Clean up - del output_dir, output_path, frame_bytes - gc.collect() - def requests_retry_session( self, retries: int = 7, @@ -568,10 +425,7 @@ def run_detection(self, stream_url: str) -> None: try: datas, _ = self.generate_detections(frame) - frame_with_detections = self.draw_detections_on_frame( - frame, datas, - ) - cv2.imshow('Live Stream Detection', frame_with_detections) + print(f"datas: {datas}") except Exception as e: print(f"Detection error: {e}")