Skip to content

Commit

Permalink
feat: manage the object detection and actions with mediapipe
Browse files Browse the repository at this point in the history
  • Loading branch information
crazy-luke committed Sep 2, 2024
1 parent 21df551 commit 45a446b
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 0 deletions.
22 changes: 22 additions & 0 deletions donkeycar/parts/object_detector/action_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import time
import logging
from donkeycar.parts.object_detector.detector_manager import ActionProtocol

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


ACTION_DEMO_TRIGGER_TIMES = 10

class ActionDemo(ActionProtocol):
def __init__(self, **kwargs):
self.__run_trigger = 0
super().__init__(**kwargs)

def manage(self, angle, throttle, found: bool, position):
reset_action = False
self.__run_trigger += 1
if not found or self.__run_trigger >= ACTION_DEMO_TRIGGER_TIMES:
self.__run_trigger = 0
reset_action = True
return angle, throttle, reset_action
89 changes: 89 additions & 0 deletions donkeycar/parts/object_detector/action_stop_and_go.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import time
import logging
from donkeycar.parts.object_detector.detector_manager import ActionProtocol

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


class StopManager():
# Stop states
IDLE = 0
INITIATE = 1
POS_ONE = 3
NEG_ONE = 2
NEG_TWO = 4
THROTTLE_INC = 0.2

def __init__(self):
self.stop_state = self.IDLE
self.last_throttle = 0.0

def stop(self):
if self.stop_state == self.IDLE:
self.stop_state = self.INITIATE

def is_idle(self):
return self.stop_state == self.IDLE

def throttle(self):
# if self.stop_state == self.IDLE:
# pass
throttle = 0.0
if self.stop_state == self.INITIATE:
self.stop_state = self.NEG_ONE
throttle = -1.0
elif self.stop_state == self.NEG_ONE:
self.stop_state = self.POS_ONE
throttle = 0.0
elif self.stop_state == self.POS_ONE:
self.stop_state = self.NEG_TWO
throttle = -1.0
elif self.stop_state == self.NEG_TWO:
throttle = self.last_throttle + self.THROTTLE_INC
if throttle >= 0.0:
throttle = 0.0
self.stop_state = self.IDLE
self.last_throttle = throttle
return throttle


class ActionStopAndGo(ActionProtocol):
# Stop and Go protocol States
RUNNING = 0
STOPPING = 1
PAUSING = 2
PASSING = 3

def __init__(self, pause_time=2.0, **kwargs):
super().__init__(**kwargs)
self.pause = pause_time
self.state = self.RUNNING
self.timeout = 0.0
self.stopper = StopManager()

def manage(self, angle, throttle, found: bool, position):
reset_action = False
logger.debug(f'self.state: {self.state}')
if self.state == self.RUNNING:
if found:
self.state = self.STOPPING
self.stopper.stop()
else:
reset_action = True
if self.state == self.STOPPING:
throttle = self.stopper.throttle()
if self.stopper.is_idle():
self.state = self.PAUSING
self.timeout = time.time() + self.pause
elif self.state == self.PAUSING:
if time.time() < self.timeout:
throttle = 0.0
else:
self.state = self.PASSING
elif self.state == self.PASSING:
if not found:
self.state = self.RUNNING
reset_action = True

return angle, throttle, reset_action
155 changes: 155 additions & 0 deletions donkeycar/parts/object_detector/detector_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""
detector_manager.py
Donkeycar Parts to manage a sequence of events based upon object detection
DetectorManager is a Donkeycar part that manages the object detection and the actions.
ActionProtocol is a base class for the actions that can be managed by the DetectorManager.
* action_demo.py: An example that shows how to create an action.
* action_pass_object.py: Pass the target object if it is detected.
Mediapipe_Object_Detector: detects objects in the image using the mediapipe object detection model.
How to use:
1. Download the object detection model to the Car directory
https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite
2. configure myconfig.py
2.1 Set OBJECT_DETECTOR = True to enable object detection.
2.2 OD_ACTION_DEMO = True, which allows recognition of OD_ACTION_DEMO_LABEL, default value is person.
2.3 OD_ACTION_STOP_AND_GO = True to enable the stop sign feature.
"""

import time
import logging
from donkeycar.parts.object_detector.mediapipe_object_detetor import MediapipeObjectDetector

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

MARK_TEXT_MARGIN = 10 # pixels
MARK_TEXT_ROW_SIZE = 30
MARK_TEXT_SIZE = 1
MARK_TEXT_THICKNESS = 1
MARK_TEXT_COLOR = (0, 255, 0)

class ActionProtocol:
def __init__(self, od_label: str):
self.od_label = od_label

def manage(self, angle, throttle, found: bool, position):
reset_action = True
return angle, throttle, reset_action

class DetectorManager:

def __init__(self,
od_model_path,
score=0.5,
image_width=160,
run_hz=1, # 1 per second
vehicle_hz=20,
show_bounding_box = True):

self.on = True
self.width = image_width
self.img_center = self.width / 2

self.running_action = None

self.run_counter = 0
self.run_trigger = int(vehicle_hz / run_hz)
self.run_inprogress = False

self.show_bounding_box = show_bounding_box

self.image = None
self.bbox = None
self.score = 0
self.label = None
self.position = 0.0

self.__actions = {}
self._od_labels =[]

self.detector = MediapipeObjectDetector(
od_model_path=od_model_path,
max_results=3,
score_threshold=score)

def run(self, angle, throttle, image):
self.run_counter += 1
start = time.time()
if self.run_counter >= self.run_trigger:
logger.debug(f'self.run_counter: {self.run_counter}')
self.image = image
self._detect()
if self.show_bounding_box and self.bbox is not None:
self._mark(self.image, self.bbox, self.label)

angle, throttle = self._dispatch_action(self.label,angle, throttle)
logger.debug(f'run_time_cost:{(time.time() - start):5.3f}')
return angle, throttle, image

def shutdown(self):
logger.info(
f'Detector - average detection time {self.detector.average_perf():5.3f}')
self.on = False

def addAction(self,action: ActionProtocol):
logger.info(f'addAction label:{action.od_label}')
self._od_labels.append(action.od_label)
self.__actions[action.od_label] = action

def _mark(self, image, bbox, label):
import cv2
# top left corner of rectangle
start_point = (bbox.origin_x, bbox.origin_y)
# bottom right corner of rectangle
end_point = (bbox.origin_x + bbox.width, bbox.origin_y + bbox.height)
color = (255, 0, 0) # Red color
thickness = 1
image = cv2.rectangle(image, start_point, end_point, color, thickness)

text_location = (MARK_TEXT_MARGIN + bbox.origin_x,
MARK_TEXT_MARGIN + MARK_TEXT_ROW_SIZE + bbox.origin_y)
cv2.putText(image, label, text_location, cv2.FONT_HERSHEY_DUPLEX,
MARK_TEXT_SIZE, MARK_TEXT_COLOR, MARK_TEXT_THICKNESS, cv2.LINE_AA)

def _detect(self):
self.bbox = None
self.score = 0
self.label = None
self.position = 0.0
if self.image is not None:
results = self.detector.detect(self.image)
for label, bbox, score in results:
if label in self._od_labels:
self.bbox = bbox
self.score = score
self.label = label
self.position = ((self.bbox.origin_x + (self.bbox.width / 2)) - self.img_center) / self.img_center
logger.debug(f'object label:{self.label }, bbox:{self.bbox}, score:{self.score}, position:{self.position }')
break

def _dispatch_action(self, label, angle, throttle):
action_label = self.running_action

if action_label == None: # if no action is running then check if there is an action for the label
if label in self.__actions:
self.running_action = label
action_label = label

if action_label != None: # if there is an action running then manage it
# if the label is the same as the action label then found is True
found = True if label == action_label else False

angle, throttle, reset_action = self.__actions[action_label].manage(angle, throttle, found, self.position)
if reset_action:
self.run_counter = 0
self.running_action = None
logger.info(f'dispatch action_label:{action_label}, reset_action:{reset_action}, angle:{angle}, throttle:{throttle}')
else:
self.run_counter = 0 # reset the run counter if no action is running

return angle, throttle
15 changes: 15 additions & 0 deletions donkeycar/templates/cfg_complete.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,18 @@
# PI connection
PI_USERNAME = "pi"
PI_HOSTNAME = "donkeypi.local"


# # Object Detector
OBJECT_DETECTOR = False # enable Detector lab
OBJECT_DETECTOR_SHOW_BOUNDING_BOX = True # show bounding box on the web control
OD_MODEL_NAME ='efficientdet_lite0.tflite' # object detection model name, file path is CAR_PATH/DETECTOR_LAB_MODEL_NAME
OD_SCORE = 0.5 # Set the score threshold for detection.
OD_RUN_HZ = 1 # Run detection algorithm n times per drive_loop_hz ex. 1 time every 20 drive loop

OD_ACTION_DEMO = False # enable detection to trigger a demo action
OD_ACTION_DEMO_LABEL = "person" # label to trigger demo action

OD_ACTION_STOP_AND_GO = False # enable detection to stop and go
OD_ACTION_STOP_AND_GO_LABEL = "stop sign" # label to trigger stop and go
OD_ACTION_STOP_AND_GO_PAUSE_TIME = 2.0 # after stop sequence completes, pause for n seconds
28 changes: 28 additions & 0 deletions donkeycar/templates/complete.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,34 @@ def run(self, *components):
V.add(ThrottleFilter(),
inputs=['pilot/throttle'],
outputs=['pilot/throttle'])


# Stop at a stop sign and pause for n seconds then proceed
elif cfg.OBJECT_DETECTOR:
logging.info(f"OBJECT_DETECTOR = {cfg.OBJECT_DETECTOR}")

from donkeycar.parts.object_detector.detector_manager import DetectorManager
dm = DetectorManager(od_model_path = os.path.join(cfg.CAR_PATH, cfg.OD_MODEL_NAME),
score = cfg.OD_SCORE,
image_width = cfg.IMAGE_W,
run_hz = cfg.OD_RUN_HZ,
vehicle_hz = cfg.DRIVE_LOOP_HZ,
show_bounding_box = cfg.OBJECT_DETECTOR_SHOW_BOUNDING_BOX)
if cfg.OD_ACTION_DEMO:
from donkeycar.parts.object_detector.action_demo import ActionDemo
action = ActionDemo(od_label=cfg.OD_ACTION_DEMO_LABEL)
dm.addAction(action)

if cfg.OD_ACTION_STOP_AND_GO:
from donkeycar.parts.object_detector.action_stop_and_go import ActionStopAndGo
action = ActionStopAndGo(od_label=cfg.OD_ACTION_STOP_AND_GO_LABEL,
pause_time = cfg.OD_ACTION_STOP_AND_GO_PAUSE_TIME)
dm.addAction(action)

V.add(dm,
inputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'],
outputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'],
run_condition="run_pilot")

#
# to give the car a boost when starting ai mode in a race.
Expand Down

0 comments on commit 45a446b

Please sign in to comment.