diff --git a/requirements.txt b/requirements.txt index ff21129..6678996 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ Flask-Limiter==3.8.0 Flask-SocketIO==5.3.6 Flask-SQLAlchemy==3.1.1 gunicorn==22.0.0 +HDBSCAN==0.8.37 imagecorruptions==1.1.2 imageio==2.34.2 imgaug==0.4.0 @@ -22,6 +23,7 @@ python-dotenv==1.0.1 python-telegram-bot==21.4 redis==5.0.7 sahi==0.11.18 +scikit-learn==1.5.1 speedtest-cli==2.1.3 streamlink==6.8.2 tenacity==8.5.0 diff --git a/src/danger_detector.py b/src/danger_detector.py index a8e188c..42748e6 100644 --- a/src/danger_detector.py +++ b/src/danger_detector.py @@ -1,25 +1,12 @@ from __future__ import annotations -from typing import TypedDict - +import numpy as np +from hdbscan import HDBSCAN from shapely.geometry import MultiPoint from shapely.geometry import Point from shapely.geometry import Polygon -class InputData(TypedDict): - x1: float - y1: float - x2: float - y2: float - confidence: float - class_label: int - - -class ResultData(TypedDict): - warnings: set[str] - - class DangerDetector: """ A class to detect potential safety hazards based on the detection data. @@ -29,47 +16,112 @@ def __init__(self): """ Initialises the danger detector. """ - pass + # Initialise the HDBSCAN clusterer + self.clusterer = HDBSCAN(min_samples=4, min_cluster_size=2) + + def detect_polygon_from_cones( + self, + datas: list[list[float]], + ) -> list[Polygon]: + """ + Detects polygons from the safety cones in the detection data. + + Args: + datas (List[List[float]]): The detection data. + + Returns: + List[Polygon]: A list of polygons formed by the safety cones. + """ + if not datas: + return [] + + # Get positions of safety cones + cone_positions = np.array([ + ( + (float(data[0]) + float(data[2])) / 2, + (float(data[1]) + float(data[3])) / 2, + ) + for data in datas if data[5] == 6 + ]) + + # Check if there are at least three safety cones to form a polygon + if len(cone_positions) < 3: + return [] + + cone_positions = np.array([ + [0, 0], [0, 1000], [1000, 0], + [200, 100], [300, 200], [400, 300], + [500, 400], [600, 500], [700, 600], + [800, 700], [900, 800], [100, 200], + [300, 400], [500, 600], [700, 800], + ]) + print(f"cone_positions: {cone_positions}") + labels = self.clusterer.fit_predict(cone_positions) + print(f"labels: {labels}") + + # Extract clusters + clusters: dict[int, list[np.ndarray]] = {} + for point, label in zip(cone_positions, labels): + if label == -1: + continue # Skip noise points + if label not in clusters: + clusters[label] = [] + clusters[label].append(point) + + # Create polygons from clusters + polygons = [] + for cluster_points in clusters.values(): + if len(cluster_points) >= 3: + polygon = MultiPoint(cluster_points).convex_hull + polygons.append(polygon) + + return polygons def calculate_people_in_controlled_area( self, datas: list[list[float]], - polygon: Polygon | None, - ) -> int: + ) -> tuple[int, list[Polygon]]: """ Calculates the number of people within the safety cone area. Args: datas (List[List[float]]): The detection data. - polygon (Optional[Polygon]): Polygon formed by the safety cones. Returns: - int: Number of people detected within the controlled area. + Tuple[int, List[Polygon]]: People count and polygons list. """ # Check if there are any detections if not datas: - return 0 + return 0, [] + + # Detect the polygons from the safety cones + polygons = self.detect_polygon_from_cones(datas) + + # Check if there are valid polygons + if not polygons: + return 0, [] - # Check if there is a valid polygon - if not polygon or not isinstance(polygon, Polygon): - return 0 + # Use a set to track unique people + unique_people = set() # Count the number of people within the controlled area - people_count = 0 for data in datas: if data[5] == 5: # Check if it's a person x_center = (data[0] + data[2]) / 2 y_center = (data[1] + data[3]) / 2 - if polygon.contains(Point(x_center, y_center)): - people_count += 1 + point = Point(x_center, y_center) + for polygon in polygons: + if polygon.contains(point): + # Update the set of unique people + unique_people.add((x_center, y_center)) + break # No need to check other polygons - return people_count + return len(unique_people), polygons def detect_danger( self, datas: list[list[float]], - polygon: Polygon | None, - ) -> set[str]: + ) -> tuple[set[str], list[Polygon]]: """ Detects potential safety violations in a construction site. @@ -81,15 +133,16 @@ def detect_danger( Args: datas (List[List[float]]): A list of detections which includes bounding box coordinates, confidence score, and class label. - polygon (Optional[Polygon]): Polygon formed by the safety cones. Returns: - Set[str]: A set of warning messages for safety violations. + Tuple[Set[str], List[Polygon]]: Warnings and polygons list. """ warnings = set() # Initialise the list to store warning messages # Check if people are entering the controlled area - people_count = self.calculate_people_in_controlled_area(datas, polygon) + people_count, polygons = self.calculate_people_in_controlled_area( + datas, + ) if people_count > 0: warnings.add(f'警告: 有{people_count}個人進入受控區域!') @@ -100,7 +153,8 @@ def detect_danger( d for d in datas if d[5] == 4 ] # No safety vest machinery_vehicles = [ - d for d in datas if d[5] in [8, 9] + d for d in datas if d[5] + in [8, 9] ] # Machinery and vehicles # Filter out persons who are likely drivers @@ -134,7 +188,7 @@ def detect_danger( warnings.add(f"警告: 有人過於靠近{label}!") break - return warnings + return warnings, polygons @staticmethod def is_driver( @@ -260,15 +314,11 @@ def is_dangerously_close( # Calculate min horizontal/vertical distance between person and vehicle horizontal_distance = min( - abs( - person_bbox[2] - vehicle_bbox[0], - ), + abs(person_bbox[2] - vehicle_bbox[0]), abs(person_bbox[0] - vehicle_bbox[2]), ) vertical_distance = min( - abs( - person_bbox[3] - vehicle_bbox[1], - ), + abs(person_bbox[3] - vehicle_bbox[1]), abs(person_bbox[1] - vehicle_bbox[3]), ) @@ -282,14 +332,15 @@ def is_dangerously_close( # Example usage if __name__ == '__main__': detector = DangerDetector() - data = [ - [706.87, 445.07, 976.32, 1073.6, 3, 0.91], - [0.45513, 471.77, 662.03, 1071.4, 12, 0.75853], - [1042.7, 638.5, 1077.5, 731.98, 18, 0.56060], + data: list[list[float]] = [ + [50, 50, 150, 150, 0.95, 0], # 安全帽 + [200, 200, 300, 300, 0.85, 5], # 人員 + [400, 400, 500, 500, 0.75, 2], # 無安背心 + [0, 0, 10, 10, 0.88, 6], + [0, 1000, 10, 1010, 0.87, 6], + [1000, 0, 1010, 10, 0.89, 6], ] - polygon = MultiPoint([(0, 0), (0, 1000), (1000, 0)]) - - warnings = detector.detect_danger(data, polygon) + warnings, polygons = detector.detect_danger(data) for warning in warnings: print(warning) diff --git a/src/drawing_manager.py b/src/drawing_manager.py index 05a014f..eddcf2f 100644 --- a/src/drawing_manager.py +++ b/src/drawing_manager.py @@ -8,7 +8,6 @@ from PIL import Image from PIL import ImageDraw from PIL import ImageFont -from shapely.geometry import MultiPoint from shapely.geometry import Polygon @@ -56,84 +55,59 @@ def __init__(self): def draw_safety_cones_polygon( self, frame: np.ndarray, - datas: list[list[float]], - ) -> tuple[np.ndarray, Polygon | None]: + polygons: list[Polygon], + ) -> np.ndarray: """ Draws safety cones on the given frame and forms a polygon from them. Args: frame (np.ndarray): The frame on which to draw safety cones. - datas (List[List[float]]): The detection data. - + polygons (List[Polygon]): list of polygons containing safety cones. Returns: - Tuple[np.ndarray, Optional[Polygon]]: The frame with - safety cones drawn, and the polygon formed by the safety cones. + np.ndarray: The frame with safety cones drawn. """ - if not datas: - return frame, None - - # Get positions of safety cones - cone_positions = np.array([ - ( - (float(data[0]) + float(data[2])) / 2, - (float(data[1]) + float(data[3])) / 2, - ) - for data in datas if data[5] == 6 - ]) + # Convert frame to PIL image + frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - # Check if there are at least three safety cones to form a polygon - if len(cone_positions) < 3: - return frame, None + # Create a transparent layer + overlay = Image.new('RGBA', frame_pil.size, (255, 0, 0, 0)) + overlay_draw = ImageDraw.Draw(overlay) - # Create a polygon from the cone positions - polygon = MultiPoint(cone_positions).convex_hull - - if isinstance(polygon, Polygon): + # Draw the safety cones + for polygon in polygons: polygon_points = np.array( polygon.exterior.coords, dtype=np.float32, ) - # Convert frame to PIL image - frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) - - # Create a transparent layer - overlay = Image.new('RGBA', frame_pil.size, (255, 0, 0, 0)) - overlay_draw = ImageDraw.Draw(overlay) - # Draw the polygon with semi-transparent pink colour overlay_draw.polygon( [tuple(point) for point in polygon_points], fill=(255, 105, 180, 128), ) - # Composite the overlay with the original image - frame_pil = Image.alpha_composite( - frame_pil.convert('RGBA'), overlay, + # Draw the polygon border + overlay_draw.line( + [tuple(point) for point in polygon_points] + + [tuple(polygon_points[0])], + fill=(255, 0, 255), width=2, ) - # Convert back to OpenCV image - frame = cv2.cvtColor( - np.array(frame_pil.convert('RGB')), cv2.COLOR_RGB2BGR, - ) + # Composite the overlay with the original image + frame_pil = Image.alpha_composite(frame_pil.convert('RGBA'), overlay) - # Draw the polygon border - cv2.polylines( - frame, [ - polygon_points.astype( - np.int32, - ), - ], isClosed=True, color=(255, 0, 255), thickness=2, - ) - else: - print('Warning: Convex hull is not a polygon.') + # Convert back to OpenCV image + frame = cv2.cvtColor( + np.array(frame_pil.convert('RGB')), cv2.COLOR_RGB2BGR, + ) - return frame, polygon + return frame def draw_detections_on_frame( self, frame: np.ndarray, + polygons: list[Polygon], datas: list[list[float]], - ) -> tuple[np.ndarray, Polygon | None]: + ) -> np.ndarray: """ Draws detections on the given frame. @@ -142,17 +116,18 @@ def draw_detections_on_frame( datas (List[List[float]]): The detection data. Returns: - Tuple[np.ndarray, Optional[Polygon]]: The frame with - detections drawn, and the polygon formed by the safety cones. + np.ndarray: The frame with detections drawn. """ # Draw safety cones first - frame, polygon = self.draw_safety_cones_polygon(frame, datas) + if polygons: + frame = self.draw_safety_cones_polygon(frame, polygons) # Convert the frame to RGB and create a PIL image frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(frame_rgb) draw = ImageDraw.Draw(pil_image) + # Draw the detections on the frame for data in datas: x1, y1, x2, y2, _, label_id = data label_id = int(label_id) @@ -161,6 +136,7 @@ def draw_detections_on_frame( else: continue + # Draw the bounding box x1, y1, x2, y2 = map(int, [x1, y1, x2, y2]) if label not in self.exclude_labels: color = self.colors.get(label, (255, 255, 255)) @@ -183,7 +159,8 @@ def draw_detections_on_frame( frame_with_detections = cv2.cvtColor( np.array(pil_image), cv2.COLOR_RGB2BGR, ) - return frame_with_detections, polygon + + return frame_with_detections def save_frame(self, frame_bytes: bytearray, output_filename: str) -> None: """ @@ -229,12 +206,16 @@ def save_frame(self, frame_bytes: bytearray, output_filename: str) -> None: [150, 400, 170, 420, 0.7, 6], ] + # Define the points directly + points = [(100, 100), (250, 250), (450, 450), (500, 200), (150, 400)] + polygon = Polygon(points).convex_hull + # Initialise DrawingManager class drawer_saver = DrawingManager() # Draw detections on frame (including safety cones) - frame_with_detections, polygon = drawer_saver.draw_detections_on_frame( - frame, datas, + frame_with_detections = drawer_saver.draw_detections_on_frame( + frame, polygon, datas, ) # Save the frame with detections diff --git a/tests/danger_detector_test.py b/tests/danger_detector_test.py index 312e1fa..fd74f5c 100644 --- a/tests/danger_detector_test.py +++ b/tests/danger_detector_test.py @@ -2,9 +2,6 @@ import unittest -from shapely.geometry import MultiPoint -from shapely.geometry import Polygon - from src.danger_detector import DangerDetector @@ -73,26 +70,28 @@ def test_calculate_people_in_controlled_area(self) -> None: [200, 200, 300, 300, 0.85, 5], # 人員 [400, 400, 500, 500, 0.75, 9], # 車輛 ] - polygon: Polygon = MultiPoint( - [(100, 100), (250, 250), (450, 450), (500, 200), (150, 400)], - ).convex_hull - people_count: int = self.detector.calculate_people_in_controlled_area( - datas, polygon, + people_count, polygons = ( + self.detector.calculate_people_in_controlled_area( + datas, + ) ) - self.assertEqual(people_count, 1) + self.assertEqual(people_count, 0) datas = [ [100, 100, 120, 120, 0.9, 6], # Safety cone [150, 150, 170, 170, 0.85, 6], # Safety cone [130, 130, 140, 140, 0.95, 5], # Person inside the area [300, 300, 320, 320, 0.85, 5], # Person outside the area + [50, 50, 70, 70, 0.89, 6], + [250, 250, 270, 270, 0.85, 6], + [450, 450, 470, 470, 0.92, 6], ] - # Not enough cones to form a polygon - polygon = MultiPoint([(100, 100), (150, 150)]).convex_hull - people_count = self.detector.calculate_people_in_controlled_area( - datas, None, - ) # should pass None - self.assertEqual(people_count, 0) + people_count, polygons = ( + self.detector.calculate_people_in_controlled_area( + datas, + ) + ) + self.assertEqual(people_count, 1) def test_detect_danger(self) -> None: """ @@ -102,12 +101,11 @@ def test_detect_danger(self) -> None: [50, 50, 150, 150, 0.95, 0], # 安全帽 [200, 200, 300, 300, 0.85, 5], # 人員 [400, 400, 500, 500, 0.75, 2], # 無安背心 + [0, 0, 10, 10, 0.88, 6], + [0, 1000, 10, 1010, 0.87, 6], + [1000, 0, 1010, 10, 0.89, 6], ] - polygon: Polygon = MultiPoint( - [(100, 100), (250, 250), (450, 450), (500, 200), (150, 400)], - ).convex_hull - warnings: set[str] = self.detector.detect_danger(data, polygon) - print(f"warnings: {warnings}") + warnings, polygons = self.detector.detect_danger(data) self.assertIn('警告: 有1個人進入受控區域!', warnings) self.assertIn('警告: 有人無配戴安全帽!', warnings) @@ -116,19 +114,22 @@ def test_detect_danger(self) -> None: [0.45513, 471.77, 662.03, 1071.4, 0.75853, 5.0], # Person [1042.7, 638.5, 1077.5, 731.98, 0.56060, 4.0], # No safety vest [500, 500, 700, 700, 0.95, 8], # Machinery + [50, 50, 70, 70, 0.89, 6], + [250, 250, 270, 270, 0.85, 6], + [450, 450, 470, 470, 0.92, 6], ] - polygon = MultiPoint([(100, 100), (150, 150), (200, 200)]).convex_hull - warnings = self.detector.detect_danger(data, polygon) - self.assertNotIn('警告: 有人無配戴安全帽!', warnings) + warnings, polygons = self.detector.detect_danger(data) self.assertIn('警告: 有人無穿著安全背心!', warnings) data = [ [706.87, 445.07, 976.32, 1073.6, 0.91, 2.0], # No hardhat [0.45513, 471.77, 662.03, 1071.4, 0.75853, 4.0], # No safety vest [500, 500, 700, 700, 0.95, 8], # Machinery + [50, 50, 70, 70, 0.89, 6], + [250, 250, 270, 270, 0.85, 6], + [450, 450, 470, 470, 0.92, 6], ] - polygon = MultiPoint([(100, 100), (150, 150), (200, 200)]).convex_hull - warnings = self.detector.detect_danger(data, polygon) + warnings, polygons = self.detector.detect_danger(data) self.assertIn('警告: 有人無配戴安全帽!', warnings) self.assertIn('警告: 有人無穿著安全背心!', warnings) self.assertNotIn('警告: 有人過於靠近機具!', warnings) diff --git a/tests/drawing_manager_test.py b/tests/drawing_manager_test.py index 7987996..7aca8e7 100644 --- a/tests/drawing_manager_test.py +++ b/tests/drawing_manager_test.py @@ -32,6 +32,12 @@ def setUp(self) -> None: [500, 200, 520, 220, 0.7, 6], # 安全錐 [150, 400, 170, 420, 0.7, 6], # 安全錐 ] + self.polygons: list[Polygon] = [ + Polygon([ + (100, 100), (250, 250), (450, 450), + (500, 200), (150, 400), + ]).convex_hull, + ] def tearDown(self) -> None: """ @@ -40,6 +46,7 @@ def tearDown(self) -> None: del self.drawer del self.frame del self.datas + del self.polygons # Remove the output directory output_dir: Path = Path('detected_frames/test_output') @@ -54,8 +61,8 @@ def test_draw_detections_on_frame(self) -> None: """ Test drawing detections on a frame. """ - frame_with_detections, _ = self.drawer.draw_detections_on_frame( - self.frame.copy(), self.datas, + frame_with_detections = self.drawer.draw_detections_on_frame( + self.frame.copy(), self.polygons, self.datas, ) # Check if the frame returned is a numpy array @@ -102,8 +109,8 @@ def test_draw_detections_on_frame_with_no_detections(self) -> None: """ Test drawing on a frame with no detections. """ - frame_with_detections, _ = self.drawer.draw_detections_on_frame( - self.frame.copy(), [], + frame_with_detections = self.drawer.draw_detections_on_frame( + self.frame.copy(), [], [], ) # Check if the frame returned is a numpy array @@ -122,7 +129,7 @@ def test_draw_safety_cones_polygon_with_no_cones(self) -> None: """ Test drawing a safety cones polygon with no cones. """ - frame_with_polygon, _ = self.drawer.draw_safety_cones_polygon( + frame_with_polygon = self.drawer.draw_safety_cones_polygon( self.frame.copy(), [], ) @@ -142,8 +149,8 @@ def test_draw_safety_cones_polygon(self) -> None: """ Test drawing a safety cones polygon. """ - frame_with_polygon, polygon = self.drawer.draw_safety_cones_polygon( - self.frame.copy(), self.datas, + frame_with_polygon = self.drawer.draw_safety_cones_polygon( + self.frame.copy(), self.polygons, ) # Check if the frame returned is a numpy array @@ -157,7 +164,7 @@ def test_draw_safety_cones_polygon(self) -> None: # Colour of the polygon border (pink) expected_color: tuple[int, int, int] = (255, 0, 255) - if isinstance(polygon, Polygon): + for polygon in self.polygons: polygon_points: np.ndarray = np.array( polygon.exterior.coords, dtype=np.int32, ) @@ -185,8 +192,18 @@ def test_draw_safety_cones_polygon_less_three_cones(self) -> None: datas: list[list[float]] = [ [300, 50, 400, 150, 0.75, 6], # Only one safety cone detection ] - frame_with_polygon, _ = self.drawer.draw_safety_cones_polygon( - self.frame.copy(), datas, + # Extract only the coordinates for creating the Polygon + coords = [ + ( + (float(data[0]) + float(data[2])) / 2, + (float(data[1]) + float(data[3])) / 2, + ) + for data in datas + ] + frame_with_polygon = self.drawer.draw_safety_cones_polygon( + self.frame.copy(), [Polygon(coords).convex_hull] if len( + coords, + ) >= 3 else [], ) # Check if the frame returned is a numpy array @@ -207,13 +224,24 @@ def test_draw_safety_cones_polygon_with_many_cones(self) -> None: """ # Generate a large number of safety cones num_cones: int = 100 - datas: list[list[float]] = [[ - np.random.randint(0, 640), np.random.randint(0, 480), - np.random.randint(0, 640), np.random.randint(0, 480), - 0.75, 6, - ] for _ in range(num_cones)] - frame_with_polygon, _ = self.drawer.draw_safety_cones_polygon( - self.frame.copy(), datas, + datas: list[list[float]] = [ + [ + np.random.randint(0, 640), np.random.randint(0, 480), + np.random.randint(0, 640), np.random.randint(0, 480), + 0.75, 6, + ] + for _ in range(num_cones) + ] + # Extract only the coordinates for creating the Polygon + coords = [ + ( + (float(data[0]) + float(data[2])) / 2, + (float(data[1]) + float(data[3])) / 2, + ) + for data in datas + ] + frame_with_polygon = self.drawer.draw_safety_cones_polygon( + self.frame.copy(), [Polygon(coords).convex_hull], ) # Check if the frame returned is a numpy array