StopSign and Cone Detection #1131

Open
wants to merge 3 commits into base: tf_2_9
100 changes: 100 additions & 0 deletions donkeycar/parts/object_detector/detector.py
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
# Object detection with TensorFlow Lite
# Finds an object in a camera image
# https://github.com/tensorflow/examples/tree/master/lite/examples/object_detection/raspberry_pi
# Installation:
#   pip install tflite-support
#
# Models
# https://www.tensorflow.org/lite/api_docs/python/tflite_model_maker
#
# Stop sign: TFLite model trained on the complete COCO image set, from TensorFlow Model Maker
#   efficientdet_lite0.tflite - https://github.com/tensorflow/examples/tree/master/tensorflow_examples/lite/model_maker
# Traffic cone: custom model created by Craig Fox using images from Roboflow
#   conemodel.tflite - https://universe.roboflow.com/robotica-xftin/traffic-cones-4laxg
#
# This is a general-purpose detection class that uses a model to recognize an object.
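#
# A minimal usage sketch (assuming the .tflite model files sit next to this module and
# `rgb_image` is an RGB numpy array from the camera part):
#
#   detector = Object_Detector(category='stop sign', score_threshold=0.4)
#   bbox, score, name = detector.detect(rgb_image)
#   if bbox is not None:
#       print(f'{name} detected with score {score:.2f} at x={bbox.origin_x}')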


import os
import time

from tflite_support.task import core
from tflite_support.task import processor
from tflite_support.task import vision

import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

class Object_Detector:
    # Trained TFLite object detection models
    coco_object = 'efficientdet_lite0.tflite'
    coco_object_edgetpu = 'efficientdet_lite0_edgetpu.tflite'
    cone_object = 'conemodel.tflite'
    cone_object_edgetpu = 'conemodel_edgetpu.tflite'

    def __init__(self,
                 category=None,         # 'person', 'stop sign', or 'cone'; if None, use category_id to specify the object to detect
                 category_id=0,         # person
                 enable_edgetpu=False,  # Coral Edge TPU (USB Accelerator, Dev Board)
                 max_results=3,
                 score_threshold=0.3,
                 num_threads=4,
                 ):

        # Initialize the object detection model and category_id
        model = self.coco_object_edgetpu if enable_edgetpu else self.coco_object
        self.category_id = category_id

        if category is None or category == 'person':
            pass
        elif category == 'cone':
            model = self.cone_object_edgetpu if enable_edgetpu else self.cone_object
        elif category == 'stop sign':
            self.category_id = 12
        else:
            raise Exception(f'Category value is invalid: {category}')
        logger.debug(f'Detecting category: {category}')

        model = os.path.join(os.path.dirname(__file__), model)
        base_options = core.BaseOptions(file_name=model, use_coral=enable_edgetpu, num_threads=num_threads)
        detection_options = processor.DetectionOptions(max_results=max_results, score_threshold=score_threshold)
        options = vision.ObjectDetectorOptions(base_options=base_options, detection_options=detection_options)

        self.detector = vision.ObjectDetector.create_from_options(options)

        # Performance timer
        self.loops = 0
        self.total_time = 0

    def average_perf(self):
        # Average detection time in seconds per call
        p = 0 if self.loops == 0 else self.total_time / self.loops
        return p

    def detect(self, image):
        # Create a TensorImage object from the RGB image.
        input_tensor = vision.TensorImage.create_from_array(image)

        # Detect objects
        start = time.time()
        detection_result = self.detector.detect(input_tensor)
        self.loops += 1
        self.total_time += (time.time() - start)

        # Return the best-scoring detection that matches the requested category
        score = 0
        name = None
        bbox = None
        for detection in detection_result.detections:
            category = detection.categories[0]
            if category.index == self.category_id:
                if category.score > score:
                    # logger.info(f'{category.category_name} : {category.score}')
                    score = category.score
                    name = category.category_name
                    bbox = detection.bounding_box

        return bbox, score, name

251 changes: 251 additions & 0 deletions donkeycar/parts/object_detector/od_protocol.py
@@ -0,0 +1,251 @@
"""
od_protocol.py
Donkeycar Parts to manage a sequence of events based upon object detection
author: cfox570 March 2023
The class Detection_Protocol is a base class that provides functions to manages calling the
Object Detector class. Run in pilot mode only.
Four protocols are defined:
1. Stop_and_Go - First stops the car, then pauses for delay, then passes the Stop Sign. Mimics
the behavior of a human driver on a road. Then the pilot takes over
2. Pass_Object - Swerves around a Traffic Cone and then resumes driving around the track

Threaded mode is likely only usable when running with the coGPUprocessor Google Coral Edge TPU. Without the
coprocessor, two Tensorflow Lite processes interfere with each other and the main Keras detection slows down.

In non threaded mode, the run_hz parameter throttles the objection detection. It is only necessary to set to
run 1 or 2 times a vehicle loop. The object detection algorithm takes about 80m secs to run. So if nothing is detected,
very little delay is added to the vehicle loop over one second. When an object is detected, the detection method
is run for each cycle until the protocol ends. Examples: Stop: The Stop Sign is not seen.
Stop and Go: The Stop Sign is passed; Pass Object: The cone is not detected. The method self.reset_run_counter() is
called to restart the run counter to only run the algorithm run_hz times a second.

"""
import time
import logging
from donkeycar.parts.object_detector.detector import Object_Detector

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

class Detection_Protocol:

    def __init__(self,
                 category=0,
                 use_edgetpu=False,
                 score=0.5,
                 image_width=160,
                 run_hz=1,          # times per second to run detection
                 vehicle_hz=20):    # drive loop rate

        self.on = True

        self.width = image_width
        self.img_center = self.width / 2

        self.run_trigger = int(vehicle_hz / run_hz)
        self.run_counter = 0
        self.run_inprogress = False

        self.newimageready = False
        self.detector_ready = True

        self.found = False
        self.not_found = True

        self.image = None
        self.detector_image = None
        self.bbox = None
        self.score = 0
        self.name = None
        self.position = 0.0

        self.detector = Object_Detector(category=category, enable_edgetpu=use_edgetpu, score_threshold=score)
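
        # For example (illustrative numbers): with vehicle_hz = 20 and run_hz = 1,
        # run_trigger = 20, so run() calls detect() only once every 20 vehicle loops
        # (about once a second) until an object is found and the protocol takes over.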

    def mark(self, image, bbox):
        import cv2
        # top left corner of rectangle
        start_point = (bbox.origin_x, bbox.origin_y)
        # bottom right corner of rectangle
        end_point = (bbox.origin_x + bbox.width, bbox.origin_y + bbox.height)
        color = (255, 0, 0)  # red
        thickness = 1
        image = cv2.rectangle(image, start_point, end_point, color, thickness)

    def ready(self):
        self.found = False
        self.not_found = False
        self.detector_image = self.image
        if self.detector_ready:
            if self.bbox is not None:
                self.found = True
                self.mark(self.detector_image, self.bbox)
            else:
                self.not_found = True
    def next_image(self, image):
        if self.detector_ready:
            self.image = image
            self.newimageready = True

    def reset_run_counter(self):
        self.run_counter = 0

    def detect(self):
        self.newimageready = False
        self.detector_ready = False
        self.bbox = None
        self.score = 0
        self.name = None
        self.position = 0.0
        if self.image is not None:
            self.bbox, self.score, self.name = self.detector.detect(self.image)
            if self.bbox is not None:
                # horizontal position of the bounding box center, normalized to [-1, 1]
                # relative to the image center
                self.position = ((self.bbox.origin_x + (self.bbox.width / 2)) - self.img_center) / self.img_center
        self.detector_ready = True

    def update(self):
        # thread run loop
        while self.on:
            if self.newimageready:
                self.detect()

    def manage(self, angle, throttle):
        # template routine; subclasses override this
        return angle, throttle

    def run_threaded(self, angle, throttle, image):
        self.ready()
        angle, throttle = self.manage(angle, throttle)
        self.next_image(image)
        if self.detector_image is None:
            self.detector_image = image
        return angle, throttle, self.detector_image

    def run(self, angle, throttle, image):
        self.run_counter += 1
        if self.run_counter >= self.run_trigger:
            self.image = image
            self.detect()
            self.ready()
            angle, throttle = self.manage(angle, throttle)
            return angle, throttle, self.detector_image
        else:
            return angle, throttle, image

    def shutdown(self):
        logger.info(f'Detector - average detection time {self.detector.average_perf():5.3f}')
        self.on = False

class Stop_Manager():
    # Stop states
    IDLE = 0
    INITIATE = 1
    POS_ONE = 3
    NEG_ONE = 2
    NEG_TWO = 4
    THROTTLE_INC = 0.2

    def __init__(self):
        self.stop_state = self.IDLE
        self.last_throttle = 0.0

    def stop(self):
        if self.stop_state == self.IDLE:
            self.stop_state = self.INITIATE

    def is_idle(self):
        return self.stop_state == self.IDLE

    def throttle(self):
        # Step through the stop sequence: reverse, neutral, reverse again,
        # then ramp the throttle back up to zero in THROTTLE_INC steps.
        throttle = 0.0
        if self.stop_state == self.INITIATE:
            self.stop_state = self.NEG_ONE
            throttle = -1.0
        elif self.stop_state == self.NEG_ONE:
            self.stop_state = self.POS_ONE
            throttle = 0.0
        elif self.stop_state == self.POS_ONE:
            self.stop_state = self.NEG_TWO
            throttle = -1.0
        elif self.stop_state == self.NEG_TWO:
            throttle = self.last_throttle + self.THROTTLE_INC
            if throttle >= 0.0:
                throttle = 0.0
                self.stop_state = self.IDLE
        self.last_throttle = throttle
        return throttle
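
    # Example throttle sequence produced by this state machine (with THROTTLE_INC = 0.2):
    #   -1.0, 0.0, -1.0, -0.8, -0.6, -0.4, -0.2, 0.0, after which is_idle() is True again.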

class Stop_And_Go(Detection_Protocol):
    # Stop and Go protocol states
    RUNNING = 0
    STOPPING = 1
    PAUSING = 2
    PASSING = 3

    def __init__(self, pause_time=2.0, **kwargs):
        super().__init__(category='stop sign', **kwargs)
        self.pause = pause_time
        self.state = self.RUNNING
        self.timeout = 0.0
        self.stopper = Stop_Manager()

    def manage(self, angle, throttle):
        if self.state == self.RUNNING:
            if self.found:
                self.state = self.STOPPING
                self.stopper.stop()
            else:
                self.reset_run_counter()
        if self.state == self.STOPPING:
            throttle = self.stopper.throttle()
            if self.stopper.is_idle():
                self.state = self.PAUSING
                self.timeout = time.time() + self.pause
        elif self.state == self.PAUSING:
            if time.time() < self.timeout:
                throttle = 0.0
            else:
                self.state = self.PASSING
        elif self.state == self.PASSING:
            if self.not_found:
                self.state = self.RUNNING
                self.reset_run_counter()

        return angle, throttle

class Pass_Object(Detection_Protocol):

    def __init__(self, speedup_multiplier=1.0, tolerance=0.25, max_angle=0.75, **kwargs):
        super().__init__(category='cone', **kwargs)
        self.speedup = speedup_multiplier
        self.max_angle = max_angle
        self.tolerance = tolerance

    def avoid(self, angle):
        # steer at least `tolerance` away from the cone position, staying within +/- max_angle
        cone_tol = self.position + self.tolerance
        cone_tol_neg = self.position - self.tolerance

        if angle >= self.position:
            if cone_tol <= self.max_angle:
                new_angle = max(angle, cone_tol)
            else:
                new_angle = angle if cone_tol <= angle else cone_tol_neg
        else:  # pilot angle < cone position
            if cone_tol_neg >= -self.max_angle:
                new_angle = min(angle, cone_tol_neg)
            else:
                new_angle = angle if cone_tol_neg >= angle else cone_tol

        return new_angle
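
    # Worked example (illustrative numbers): with position = 0.2 (cone slightly right of center),
    # tolerance = 0.25 and max_angle = 0.75, cone_tol = 0.45 and cone_tol_neg = -0.05.
    # A pilot angle of 0.3 becomes max(0.3, 0.45) = 0.45, steering further to the right of the
    # cone; a pilot angle of 0.0 becomes min(0.0, -0.05) = -0.05, steering to the left of it.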

    def manage(self, angle, throttle):
        new_angle = angle
        if self.detector_ready:
            if not self.found:
                self.reset_run_counter()
            else:  # found
                throttle *= self.speedup
                new_angle = self.avoid(angle)
        return new_angle, throttle

19 changes: 19 additions & 0 deletions donkeycar/templates/cfg_complete.py
@@ -750,13 +750,32 @@
REALSENSE_D435_IMU = False # True to capture IMU data (D435i only)
REALSENSE_D435_ID = None # serial number of camera or None if you only have one camera (it will autodetect)

# Only enable one of the following object detector protocols:
# 1) STOP_SIGN_DETECTOR  2) OD_STOPANDGO  3) OD_PASS_CONE

# Stop Sign Detector
STOP_SIGN_DETECTOR = False
STOP_SIGN_MIN_SCORE = 0.2
STOP_SIGN_SHOW_BOUNDING_BOX = True
STOP_SIGN_MAX_REVERSE_COUNT = 10    # How many times the car should reverse when a stop sign is detected; set to 0 to disable reversing
STOP_SIGN_REVERSE_THROTTLE = -0.5   # Throttle used while reversing after a stop sign is detected

# Object Detector - Common
# Unlike the Stop Sign Detector, the object detector works with or without a Coral Edge TPU. This part
# leverages the TensorFlow Lite object detection model.
OD_USE_EDGETPU = False          # If True, a Coral Edge TPU is connected; use the Edge TPU model
OD_SCORE = 0.5                  # Score threshold for detection
OD_RUN_HZ = 1                   # Run the detection algorithm n times per second, e.g. 1 = once every 20 loops at DRIVE_LOOP_HZ = 20.
                                # Recommend 1 for the Stop And Go protocol and 3 or 4 for the Pass Cone protocol
# Object Detector - Stop and Go protocol
OD_STOPANDGO = False            # Enable stop and go: stop and pause, then proceed past the sign before looking again
OD_PAUSE_TIME = 2.0             # After the stop sequence completes, pause for n seconds
# Object Detector - Pass a Cone protocol
OD_PASS_CONE = False            # Enable detection to swerve around a cone in the roadway
OD_SPEEDUP_MULTIPLIER = 1.0     # Scale the car's throttle as it passes the object (cone)
OD_MAX_ANGLE = 0.75             # Maximum and minimum drive angle target
OD_TOLERANCE = 0.45             # Drive angle is set to be larger than CONE ANGLE + TOLERANCE or smaller than CONE ANGLE - TOLERANCE
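
# Example myconfig.py overrides to enable the cone-passing protocol (illustrative values only):
#   OD_PASS_CONE = True
#   OD_RUN_HZ = 3
#   OD_SCORE = 0.5
#   OD_MAX_ANGLE = 0.75
#   OD_TOLERANCE = 0.45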

# FPS counter
SHOW_FPS = False
FPS_DEBUG_INTERVAL = 10 # the interval in seconds for printing the frequency info into the shell