From 45a446bf0e233a09855303c70f868ba8d0ecf207 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E8=B1=86=20Luke?= Date: Tue, 3 Sep 2024 00:50:07 +0800 Subject: [PATCH] feat: manage the object detection and actions with mediapipe --- .../parts/object_detector/action_demo.py | 22 +++ .../object_detector/action_stop_and_go.py | 89 ++++++++++ .../parts/object_detector/detector_manager.py | 155 ++++++++++++++++++ donkeycar/templates/cfg_complete.py | 15 ++ donkeycar/templates/complete.py | 28 ++++ 5 files changed, 309 insertions(+) create mode 100755 donkeycar/parts/object_detector/action_demo.py create mode 100755 donkeycar/parts/object_detector/action_stop_and_go.py create mode 100755 donkeycar/parts/object_detector/detector_manager.py mode change 100644 => 100755 donkeycar/templates/cfg_complete.py mode change 100644 => 100755 donkeycar/templates/complete.py diff --git a/donkeycar/parts/object_detector/action_demo.py b/donkeycar/parts/object_detector/action_demo.py new file mode 100755 index 000000000..1cd7cc8d5 --- /dev/null +++ b/donkeycar/parts/object_detector/action_demo.py @@ -0,0 +1,22 @@ +import time +import logging +from donkeycar.parts.object_detector.detector_manager import ActionProtocol + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +ACTION_DEMO_TRIGGER_TIMES = 10 + +class ActionDemo(ActionProtocol): + def __init__(self, **kwargs): + self.__run_trigger = 0 + super().__init__(**kwargs) + + def manage(self, angle, throttle, found: bool, position): + reset_action = False + self.__run_trigger += 1 + if not found or self.__run_trigger >= ACTION_DEMO_TRIGGER_TIMES: + self.__run_trigger = 0 + reset_action = True + return angle, throttle, reset_action diff --git a/donkeycar/parts/object_detector/action_stop_and_go.py b/donkeycar/parts/object_detector/action_stop_and_go.py new file mode 100755 index 000000000..1c7db5a0b --- /dev/null +++ b/donkeycar/parts/object_detector/action_stop_and_go.py @@ -0,0 +1,89 @@ +import time +import logging +from donkeycar.parts.object_detector.detector_manager import ActionProtocol + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class StopManager(): + # Stop states + IDLE = 0 + INITIATE = 1 + POS_ONE = 3 + NEG_ONE = 2 + NEG_TWO = 4 + THROTTLE_INC = 0.2 + + def __init__(self): + self.stop_state = self.IDLE + self.last_throttle = 0.0 + + def stop(self): + if self.stop_state == self.IDLE: + self.stop_state = self.INITIATE + + def is_idle(self): + return self.stop_state == self.IDLE + + def throttle(self): + # if self.stop_state == self.IDLE: + # pass + throttle = 0.0 + if self.stop_state == self.INITIATE: + self.stop_state = self.NEG_ONE + throttle = -1.0 + elif self.stop_state == self.NEG_ONE: + self.stop_state = self.POS_ONE + throttle = 0.0 + elif self.stop_state == self.POS_ONE: + self.stop_state = self.NEG_TWO + throttle = -1.0 + elif self.stop_state == self.NEG_TWO: + throttle = self.last_throttle + self.THROTTLE_INC + if throttle >= 0.0: + throttle = 0.0 + self.stop_state = self.IDLE + self.last_throttle = throttle + return throttle + + +class ActionStopAndGo(ActionProtocol): + # Stop and Go protocol States + RUNNING = 0 + STOPPING = 1 + PAUSING = 2 + PASSING = 3 + + def __init__(self, pause_time=2.0, **kwargs): + super().__init__(**kwargs) + self.pause = pause_time + self.state = self.RUNNING + self.timeout = 0.0 + self.stopper = StopManager() + + def manage(self, angle, throttle, found: bool, position): + reset_action = False + logger.debug(f'self.state: {self.state}') + if self.state == self.RUNNING: + if found: + self.state = self.STOPPING + self.stopper.stop() + else: + reset_action = True + if self.state == self.STOPPING: + throttle = self.stopper.throttle() + if self.stopper.is_idle(): + self.state = self.PAUSING + self.timeout = time.time() + self.pause + elif self.state == self.PAUSING: + if time.time() < self.timeout: + throttle = 0.0 + else: + self.state = self.PASSING + elif self.state == self.PASSING: + if not found: + self.state = self.RUNNING + reset_action = True + + return angle, throttle, reset_action diff --git a/donkeycar/parts/object_detector/detector_manager.py b/donkeycar/parts/object_detector/detector_manager.py new file mode 100755 index 000000000..9d12e76d7 --- /dev/null +++ b/donkeycar/parts/object_detector/detector_manager.py @@ -0,0 +1,155 @@ +""" +detector_manager.py +Donkeycar Parts to manage a sequence of events based upon object detection + + DetectorManager is a Donkeycar part that manages the object detection and the actions. + + ActionProtocol is a base class for the actions that can be managed by the DetectorManager. + * action_demo.py: An example that shows how to create an action. + * action_pass_object.py: Pass the target object if it is detected. + + Mediapipe_Object_Detector: detects objects in the image using the mediapipe object detection model. + +How to use: + 1. Download the object detection model to the Car directory + https://storage.googleapis.com/mediapipe-models/object_detector/efficientdet_lite0/int8/1/efficientdet_lite0.tflite + + 2. configure myconfig.py + 2.1 Set OBJECT_DETECTOR = True to enable object detection. + 2.2 OD_ACTION_DEMO = True, which allows recognition of OD_ACTION_DEMO_LABEL, default value is person. + 2.3 OD_ACTION_STOP_AND_GO = True to enable the stop sign feature. +""" + +import time +import logging +from donkeycar.parts.object_detector.mediapipe_object_detetor import MediapipeObjectDetector + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +MARK_TEXT_MARGIN = 10 # pixels +MARK_TEXT_ROW_SIZE = 30 +MARK_TEXT_SIZE = 1 +MARK_TEXT_THICKNESS = 1 +MARK_TEXT_COLOR = (0, 255, 0) + +class ActionProtocol: + def __init__(self, od_label: str): + self.od_label = od_label + + def manage(self, angle, throttle, found: bool, position): + reset_action = True + return angle, throttle, reset_action + +class DetectorManager: + + def __init__(self, + od_model_path, + score=0.5, + image_width=160, + run_hz=1, # 1 per second + vehicle_hz=20, + show_bounding_box = True): + + self.on = True + self.width = image_width + self.img_center = self.width / 2 + + self.running_action = None + + self.run_counter = 0 + self.run_trigger = int(vehicle_hz / run_hz) + self.run_inprogress = False + + self.show_bounding_box = show_bounding_box + + self.image = None + self.bbox = None + self.score = 0 + self.label = None + self.position = 0.0 + + self.__actions = {} + self._od_labels =[] + + self.detector = MediapipeObjectDetector( + od_model_path=od_model_path, + max_results=3, + score_threshold=score) + + def run(self, angle, throttle, image): + self.run_counter += 1 + start = time.time() + if self.run_counter >= self.run_trigger: + logger.debug(f'self.run_counter: {self.run_counter}') + self.image = image + self._detect() + if self.show_bounding_box and self.bbox is not None: + self._mark(self.image, self.bbox, self.label) + + angle, throttle = self._dispatch_action(self.label,angle, throttle) + logger.debug(f'run_time_cost:{(time.time() - start):5.3f}') + return angle, throttle, image + + def shutdown(self): + logger.info( + f'Detector - average detection time {self.detector.average_perf():5.3f}') + self.on = False + + def addAction(self,action: ActionProtocol): + logger.info(f'addAction label:{action.od_label}') + self._od_labels.append(action.od_label) + self.__actions[action.od_label] = action + + def _mark(self, image, bbox, label): + import cv2 + # top left corner of rectangle + start_point = (bbox.origin_x, bbox.origin_y) + # bottom right corner of rectangle + end_point = (bbox.origin_x + bbox.width, bbox.origin_y + bbox.height) + color = (255, 0, 0) # Red color + thickness = 1 + image = cv2.rectangle(image, start_point, end_point, color, thickness) + + text_location = (MARK_TEXT_MARGIN + bbox.origin_x, + MARK_TEXT_MARGIN + MARK_TEXT_ROW_SIZE + bbox.origin_y) + cv2.putText(image, label, text_location, cv2.FONT_HERSHEY_DUPLEX, + MARK_TEXT_SIZE, MARK_TEXT_COLOR, MARK_TEXT_THICKNESS, cv2.LINE_AA) + + def _detect(self): + self.bbox = None + self.score = 0 + self.label = None + self.position = 0.0 + if self.image is not None: + results = self.detector.detect(self.image) + for label, bbox, score in results: + if label in self._od_labels: + self.bbox = bbox + self.score = score + self.label = label + self.position = ((self.bbox.origin_x + (self.bbox.width / 2)) - self.img_center) / self.img_center + logger.debug(f'object label:{self.label }, bbox:{self.bbox}, score:{self.score}, position:{self.position }') + break + + def _dispatch_action(self, label, angle, throttle): + action_label = self.running_action + + if action_label == None: # if no action is running then check if there is an action for the label + if label in self.__actions: + self.running_action = label + action_label = label + + if action_label != None: # if there is an action running then manage it + # if the label is the same as the action label then found is True + found = True if label == action_label else False + + angle, throttle, reset_action = self.__actions[action_label].manage(angle, throttle, found, self.position) + if reset_action: + self.run_counter = 0 + self.running_action = None + logger.info(f'dispatch action_label:{action_label}, reset_action:{reset_action}, angle:{angle}, throttle:{throttle}') + else: + self.run_counter = 0 # reset the run counter if no action is running + + return angle, throttle diff --git a/donkeycar/templates/cfg_complete.py b/donkeycar/templates/cfg_complete.py old mode 100644 new mode 100755 index d468f6b3c..8ca8ad747 --- a/donkeycar/templates/cfg_complete.py +++ b/donkeycar/templates/cfg_complete.py @@ -764,3 +764,18 @@ # PI connection PI_USERNAME = "pi" PI_HOSTNAME = "donkeypi.local" + + +# # Object Detector +OBJECT_DETECTOR = False # enable Detector lab +OBJECT_DETECTOR_SHOW_BOUNDING_BOX = True # show bounding box on the web control +OD_MODEL_NAME ='efficientdet_lite0.tflite' # object detection model name, file path is CAR_PATH/DETECTOR_LAB_MODEL_NAME +OD_SCORE = 0.5 # Set the score threshold for detection. +OD_RUN_HZ = 1 # Run detection algorithm n times per drive_loop_hz ex. 1 time every 20 drive loop + +OD_ACTION_DEMO = False # enable detection to trigger a demo action +OD_ACTION_DEMO_LABEL = "person" # label to trigger demo action + +OD_ACTION_STOP_AND_GO = False # enable detection to stop and go +OD_ACTION_STOP_AND_GO_LABEL = "stop sign" # label to trigger stop and go +OD_ACTION_STOP_AND_GO_PAUSE_TIME = 2.0 # after stop sequence completes, pause for n seconds \ No newline at end of file diff --git a/donkeycar/templates/complete.py b/donkeycar/templates/complete.py old mode 100644 new mode 100755 index 32c4a7168..f964a9b09 --- a/donkeycar/templates/complete.py +++ b/donkeycar/templates/complete.py @@ -429,6 +429,34 @@ def run(self, *components): V.add(ThrottleFilter(), inputs=['pilot/throttle'], outputs=['pilot/throttle']) + + + # Stop at a stop sign and pause for n seconds then proceed + elif cfg.OBJECT_DETECTOR: + logging.info(f"OBJECT_DETECTOR = {cfg.OBJECT_DETECTOR}") + + from donkeycar.parts.object_detector.detector_manager import DetectorManager + dm = DetectorManager(od_model_path = os.path.join(cfg.CAR_PATH, cfg.OD_MODEL_NAME), + score = cfg.OD_SCORE, + image_width = cfg.IMAGE_W, + run_hz = cfg.OD_RUN_HZ, + vehicle_hz = cfg.DRIVE_LOOP_HZ, + show_bounding_box = cfg.OBJECT_DETECTOR_SHOW_BOUNDING_BOX) + if cfg.OD_ACTION_DEMO: + from donkeycar.parts.object_detector.action_demo import ActionDemo + action = ActionDemo(od_label=cfg.OD_ACTION_DEMO_LABEL) + dm.addAction(action) + + if cfg.OD_ACTION_STOP_AND_GO: + from donkeycar.parts.object_detector.action_stop_and_go import ActionStopAndGo + action = ActionStopAndGo(od_label=cfg.OD_ACTION_STOP_AND_GO_LABEL, + pause_time = cfg.OD_ACTION_STOP_AND_GO_PAUSE_TIME) + dm.addAction(action) + + V.add(dm, + inputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'], + outputs=['pilot/angle', 'pilot/throttle', 'cam/image_array'], + run_condition="run_pilot") # # to give the car a boost when starting ai mode in a race.