Skip to content

Commit

Permalink
Utilise HDBSCAN to cluster the controlled zone with safety zone
Browse files Browse the repository at this point in the history
  • Loading branch information
yihong1120 committed Aug 1, 2024
1 parent a29c2f7 commit 3a112a8
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 147 deletions.
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Flask-Limiter==3.8.0
Flask-SocketIO==5.3.6
Flask-SQLAlchemy==3.1.1
gunicorn==22.0.0
HDBSCAN==0.8.37
imagecorruptions==1.1.2
imageio==2.34.2
imgaug==0.4.0
Expand All @@ -22,6 +23,7 @@ python-dotenv==1.0.1
python-telegram-bot==21.4
redis==5.0.7
sahi==0.11.18
scikit-learn==1.5.1
speedtest-cli==2.1.3
streamlink==6.8.2
tenacity==8.5.0
Expand Down
147 changes: 99 additions & 48 deletions src/danger_detector.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
from __future__ import annotations

from typing import TypedDict

import numpy as np
from hdbscan import HDBSCAN
from shapely.geometry import MultiPoint
from shapely.geometry import Point
from shapely.geometry import Polygon


class InputData(TypedDict):
x1: float
y1: float
x2: float
y2: float
confidence: float
class_label: int


class ResultData(TypedDict):
warnings: set[str]


class DangerDetector:
"""
A class to detect potential safety hazards based on the detection data.
Expand All @@ -29,47 +16,112 @@ def __init__(self):
"""
Initialises the danger detector.
"""
pass
# Initialise the HDBSCAN clusterer
self.clusterer = HDBSCAN(min_samples=4, min_cluster_size=2)

def detect_polygon_from_cones(
self,
datas: list[list[float]],
) -> list[Polygon]:
"""
Detects polygons from the safety cones in the detection data.
Args:
datas (List[List[float]]): The detection data.
Returns:
List[Polygon]: A list of polygons formed by the safety cones.
"""
if not datas:
return []

# Get positions of safety cones
cone_positions = np.array([
(
(float(data[0]) + float(data[2])) / 2,
(float(data[1]) + float(data[3])) / 2,
)
for data in datas if data[5] == 6
])

# Check if there are at least three safety cones to form a polygon
if len(cone_positions) < 3:
return []

cone_positions = np.array([
[0, 0], [0, 1000], [1000, 0],
[200, 100], [300, 200], [400, 300],
[500, 400], [600, 500], [700, 600],
[800, 700], [900, 800], [100, 200],
[300, 400], [500, 600], [700, 800],
])
print(f"cone_positions: {cone_positions}")
labels = self.clusterer.fit_predict(cone_positions)
print(f"labels: {labels}")

# Extract clusters
clusters: dict[int, list[np.ndarray]] = {}
for point, label in zip(cone_positions, labels):
if label == -1:
continue # Skip noise points
if label not in clusters:
clusters[label] = []
clusters[label].append(point)

# Create polygons from clusters
polygons = []
for cluster_points in clusters.values():
if len(cluster_points) >= 3:
polygon = MultiPoint(cluster_points).convex_hull
polygons.append(polygon)

return polygons

def calculate_people_in_controlled_area(
self,
datas: list[list[float]],
polygon: Polygon | None,
) -> int:
) -> tuple[int, list[Polygon]]:
"""
Calculates the number of people within the safety cone area.
Args:
datas (List[List[float]]): The detection data.
polygon (Optional[Polygon]): Polygon formed by the safety cones.
Returns:
int: Number of people detected within the controlled area.
Tuple[int, List[Polygon]]: People count and polygons list.
"""
# Check if there are any detections
if not datas:
return 0
return 0, []

# Detect the polygons from the safety cones
polygons = self.detect_polygon_from_cones(datas)

# Check if there are valid polygons
if not polygons:
return 0, []

# Check if there is a valid polygon
if not polygon or not isinstance(polygon, Polygon):
return 0
# Use a set to track unique people
unique_people = set()

# Count the number of people within the controlled area
people_count = 0
for data in datas:
if data[5] == 5: # Check if it's a person
x_center = (data[0] + data[2]) / 2
y_center = (data[1] + data[3]) / 2
if polygon.contains(Point(x_center, y_center)):
people_count += 1
point = Point(x_center, y_center)
for polygon in polygons:
if polygon.contains(point):
# Update the set of unique people
unique_people.add((x_center, y_center))
break # No need to check other polygons

return people_count
return len(unique_people), polygons

def detect_danger(
self,
datas: list[list[float]],
polygon: Polygon | None,
) -> set[str]:
) -> tuple[set[str], list[Polygon]]:
"""
Detects potential safety violations in a construction site.
Expand All @@ -81,15 +133,16 @@ def detect_danger(
Args:
datas (List[List[float]]): A list of detections which includes
bounding box coordinates, confidence score, and class label.
polygon (Optional[Polygon]): Polygon formed by the safety cones.
Returns:
Set[str]: A set of warning messages for safety violations.
Tuple[Set[str], List[Polygon]]: Warnings and polygons list.
"""
warnings = set() # Initialise the list to store warning messages

# Check if people are entering the controlled area
people_count = self.calculate_people_in_controlled_area(datas, polygon)
people_count, polygons = self.calculate_people_in_controlled_area(
datas,
)
if people_count > 0:
warnings.add(f'警告: 有{people_count}個人進入受控區域!')

Expand All @@ -100,7 +153,8 @@ def detect_danger(
d for d in datas if d[5] == 4
] # No safety vest
machinery_vehicles = [
d for d in datas if d[5] in [8, 9]
d for d in datas if d[5]
in [8, 9]
] # Machinery and vehicles

# Filter out persons who are likely drivers
Expand Down Expand Up @@ -134,7 +188,7 @@ def detect_danger(
warnings.add(f"警告: 有人過於靠近{label}!")
break

return warnings
return warnings, polygons

@staticmethod
def is_driver(
Expand Down Expand Up @@ -260,15 +314,11 @@ def is_dangerously_close(

# Calculate min horizontal/vertical distance between person and vehicle
horizontal_distance = min(
abs(
person_bbox[2] - vehicle_bbox[0],
),
abs(person_bbox[2] - vehicle_bbox[0]),
abs(person_bbox[0] - vehicle_bbox[2]),
)
vertical_distance = min(
abs(
person_bbox[3] - vehicle_bbox[1],
),
abs(person_bbox[3] - vehicle_bbox[1]),
abs(person_bbox[1] - vehicle_bbox[3]),
)

Expand All @@ -282,14 +332,15 @@ def is_dangerously_close(
# Example usage
if __name__ == '__main__':
detector = DangerDetector()
data = [
[706.87, 445.07, 976.32, 1073.6, 3, 0.91],
[0.45513, 471.77, 662.03, 1071.4, 12, 0.75853],
[1042.7, 638.5, 1077.5, 731.98, 18, 0.56060],
data: list[list[float]] = [
[50, 50, 150, 150, 0.95, 0], # 安全帽
[200, 200, 300, 300, 0.85, 5], # 人員
[400, 400, 500, 500, 0.75, 2], # 無安背心
[0, 0, 10, 10, 0.88, 6],
[0, 1000, 10, 1010, 0.87, 6],
[1000, 0, 1010, 10, 0.89, 6],
]

polygon = MultiPoint([(0, 0), (0, 1000), (1000, 0)])

warnings = detector.detect_danger(data, polygon)
warnings, polygons = detector.detect_danger(data)
for warning in warnings:
print(warning)
Loading

0 comments on commit 3a112a8

Please sign in to comment.