Skip to content

Commit

Permalink
Add controlled zone monitoring func
Browse files Browse the repository at this point in the history
  • Loading branch information
yihong1120 committed Jul 25, 2024
1 parent c68fb05 commit a34afcf
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 157 deletions.
18 changes: 13 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dotenv import load_dotenv

from src.danger_detector import DangerDetector
from src.drawing_manager import DrawingManager
from src.line_notifier import LineNotifier
from src.live_stream_detection import LiveStreamDetector
from src.monitor_logger import LoggerConfig
Expand Down Expand Up @@ -62,11 +63,13 @@ def main(
"""
# Load environment variables
load_dotenv()
api_url = os.getenv('API_URL', 'http://localhost:5000')

# Initialise the stream capture object
streaming_capture = StreamCapture(stream_url=video_url)

# Get the API URL from environment variables
api_url = os.getenv('API_URL', 'http://localhost:5000')

# Initialise the live stream detector
live_stream_detector = LiveStreamDetector(
api_url=api_url,
Expand All @@ -75,6 +78,9 @@ def main(
run_local=run_local,
)

# Initialise the drawing manager
drawing_manager = DrawingManager()

# Initialise the LINE notifier
line_notifier = LineNotifier(line_token)

Expand All @@ -96,8 +102,8 @@ def main(
datas, _ = live_stream_detector.generate_detections(frame)

# Draw the detections on the frame
frame_with_detections = live_stream_detector.draw_detections_on_frame(
frame, datas,
frame_with_detections, controlled_zone_polygon = (
drawing_manager.draw_detections_on_frame(frame, datas)
)

# Convert the frame to a byte array
Expand All @@ -106,7 +112,7 @@ def main(

# Save the frame with detections
# save_file_name = f'{label}_{image_name}_{detection_time}'
# live_stream_detector.save_frame(
# drawing_manager.save_frame(
# frame_with_detections,
# save_file_name
# )
Expand All @@ -125,7 +131,9 @@ def main(
(7 <= current_hour < 18)
):
# Check for warnings and send notifications if necessary
warnings = danger_detector.detect_danger(datas)
warnings = danger_detector.detect_danger(
datas, controlled_zone_polygon,
)

# Check if there are any warnings
if warnings:
Expand Down
62 changes: 57 additions & 5 deletions src/danger_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

from typing import TypedDict

from shapely.geometry import MultiPoint
from shapely.geometry import Point
from shapely.geometry import Polygon


class InputData(TypedDict):
x1: float
Expand All @@ -27,23 +31,68 @@ def __init__(self):
"""
pass

def detect_danger(self, datas: list[list[float]]) -> set[str]:
def calculate_people_in_controlled_area(
self,
datas: list[list[float]],
polygon: Polygon | None,
) -> int:
"""
Calculates the number of people within the safety cone area.
Args:
datas (List[List[float]]): The detection data.
polygon (Optional[Polygon]): Polygon formed by the safety cones.
Returns:
int: Number of people detected within the controlled area.
"""
# Check if there are any detections
if not datas:
return 0

# Check if there is a valid polygon
if not polygon or not isinstance(polygon, Polygon):
return 0

# Count the number of people within the controlled area
people_count = 0
for data in datas:
if data[5] == 5: # Check if it's a person
x_center = (data[0] + data[2]) / 2
y_center = (data[1] + data[3]) / 2
if polygon.contains(Point(x_center, y_center)):
people_count += 1

return people_count

def detect_danger(
self,
datas: list[list[float]],
polygon: Polygon | None,
) -> set[str]:
"""
Detects potential safety violations in a construction site.
This function checks for two types of safety violations:
1. Workers not wearing hardhats or safety vests.
2. Workers dangerously close to machinery or vehicles.
1. Workers entering the controlled area.
2. Workers not wearing hardhats or safety vests.
3. Workers dangerously close to machinery or vehicles.
Args:
datas (List[List[float]]): A list of detections which includes
bounding box coordinates, confidence score, and class label.
polygon (Optional[Polygon]): Polygon formed by the safety cones.
Returns:
List[str]: A list of warning messages for safety violations.
Set[str]: A set of warning messages for safety violations.
"""
warnings = set() # Initialise the list to store warning messages

# Check if people are entering the controlled area
people_count = self.calculate_people_in_controlled_area(datas, polygon)
if people_count > 0:
warnings.add(f'警告: 有{people_count}個人進入受控區域!')

# Classify detected objects into different categories
persons = [d for d in datas if d[5] == 5] # Persons
hardhat_violations = [d for d in datas if d[5] == 2] # No hardhat
Expand Down Expand Up @@ -238,6 +287,9 @@ def is_dangerously_close(
[0.45513, 471.77, 662.03, 1071.4, 12, 0.75853],
[1042.7, 638.5, 1077.5, 731.98, 18, 0.56060],
]
warnings = detector.detect_danger(data)

polygon = MultiPoint([(0, 0), (0, 1000), (1000, 0)])

warnings = detector.detect_danger(data, polygon)
for warning in warnings:
print(warning)
Loading

0 comments on commit a34afcf

Please sign in to comment.