Skip to content

Commit

Permalink
core: autopilot-manager: Add flight_controller module
Browse files Browse the repository at this point in the history
* Move flight controller / flight controller detector to its own module
  as preparation for platform spliting
  • Loading branch information
JoaoMario109 committed Oct 3, 2024
1 parent 77856fb commit 15b60a1
Show file tree
Hide file tree
Showing 18 changed files with 128 additions and 117 deletions.
3 changes: 2 additions & 1 deletion core/services/ardupilot_manager/api/v1/routers/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from autopilot_manager import AutoPilotManager
from exceptions import InvalidFirmwareFile
from typedefs import Firmware, FlightController, Parameters, Serial, SITLFrame, Vehicle
from flight_controller import FlightController
from typedefs import Firmware, Parameters, Serial, SITLFrame, Vehicle

index_router_v1 = APIRouter(
tags=["index_v1"],
Expand Down
24 changes: 10 additions & 14 deletions core/services/ardupilot_manager/autopilot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,19 @@
NoPreferredBoardSet,
)
from firmware.FirmwareManagement import FirmwareManager
from flight_controller_detector.Detector import Detector as BoardDetector
from flight_controller_detector.linux.linux_boards import LinuxFlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType
from mavlink_proxy.exceptions import EndpointAlreadyExists
from mavlink_proxy.Manager import Manager as MavlinkManager
from settings import Settings
from typedefs import (
Firmware,
from flight_controller import (
FlightController,
FlightControllerFlags,
Parameters,
Platform,
PlatformType,
Serial,
SITLFrame,
Vehicle,
)
from flight_controller.detector import FlightControllerDetector
from flight_controller.detector.linux.linux_boards import LinuxFlightController
from mavlink_proxy.Endpoint import Endpoint, EndpointType
from mavlink_proxy.exceptions import EndpointAlreadyExists
from mavlink_proxy.Manager import Manager as MavlinkManager
from settings import Settings
from typedefs import Firmware, Parameters, Serial, SITLFrame, Vehicle


class AutoPilotManager(metaclass=Singleton):
Expand Down Expand Up @@ -294,7 +290,7 @@ def get_available_routers(self) -> List[str]:
return [router.name() for router in self.mavlink_manager.available_interfaces()]

async def start_sitl(self) -> None:
self._current_board = BoardDetector.detect_sitl()
self._current_board = FlightControllerDetector.detect_sitl()
if not self.firmware_manager.is_firmware_installed(self._current_board):
self.firmware_manager.install_firmware_from_params(Vehicle.Sub, self._current_board)
frame = self.load_sitl_frame()
Expand Down Expand Up @@ -404,7 +400,7 @@ async def start_mavlink_manager(self, device: Endpoint) -> None:

@staticmethod
async def available_boards(include_bootloaders: bool = False) -> List[FlightController]:
all_boards = await BoardDetector.detect(True)
all_boards = await FlightControllerDetector.detect(True)
if include_bootloaders:
return all_boards
return [board for board in all_boards if FlightControllerFlags.is_bootloader not in board.flags]
Expand Down
3 changes: 2 additions & 1 deletion core/services/ardupilot_manager/firmware/FirmwareInstall.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from exceptions import FirmwareInstallFail, InvalidFirmwareFile, UnsupportedPlatform
from firmware.FirmwareDownload import FirmwareDownloader
from firmware.FirmwareUpload import FirmwareUploader
from typedefs import FirmwareFormat, FlightController, Platform, PlatformType
from flight_controller import FlightController, Platform, PlatformType
from typedefs import FirmwareFormat


def get_board_id(platform: Platform) -> int:
Expand Down
11 changes: 2 additions & 9 deletions core/services/ardupilot_manager/firmware/FirmwareManagement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@
)
from firmware.FirmwareDownload import FirmwareDownloader
from firmware.FirmwareInstall import FirmwareInstaller
from typedefs import (
Firmware,
FirmwareFormat,
FlightController,
Parameters,
Platform,
PlatformType,
Vehicle,
)
from flight_controller import FlightController, Platform, PlatformType
from typedefs import Firmware, FirmwareFormat, Parameters, Vehicle


class FirmwareManager:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from exceptions import InvalidFirmwareFile
from firmware.FirmwareDownload import FirmwareDownloader
from firmware.FirmwareInstall import FirmwareInstaller
from typedefs import FlightController, Platform, Vehicle
from flight_controller import FlightController, Platform
from typedefs import Vehicle


def test_firmware_validation() -> None:
Expand Down
8 changes: 8 additions & 0 deletions core/services/ardupilot_manager/flight_controller/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from flight_controller.flight_controller import (
FlightController,
FlightControllerFlags,
Platform,
PlatformType,
)

__all__ = ["FlightController", "FlightControllerFlags", "Platform", "PlatformType"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from flight_controller.detector.detector import FlightControllerDetector

__all__ = ["FlightControllerDetector"]
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from commonwealth.utils.general import is_running_as_root
from serial.tools.list_ports_linux import SysFS, comports

from flight_controller_detector.board_identification import identifiers
from flight_controller_detector.linux.detector import LinuxFlightControllerDetector
from typedefs import FlightController, FlightControllerFlags, Platform
from flight_controller import FlightController, FlightControllerFlags, Platform
from flight_controller.detector.board_identification import identifiers
from flight_controller.detector.linux.detector import LinuxFlightControllerDetector


class Detector:
class FlightControllerDetector:
@classmethod
async def detect_linux_board(cls) -> Optional[FlightController]:
for _i in range(5):
Expand Down Expand Up @@ -59,16 +59,16 @@ def detect_serial_flight_controllers() -> List[FlightController]:
FlightController(
name=port.product or port.name,
manufacturer=port.manufacturer,
platform=Detector.detect_serial_platform(port)
platform=FlightControllerDetector.detect_serial_platform(port)
or Platform(), # this is just to make CI happy. check line 82
path=port.device,
)
for port in unique_serial_devices
if Detector.detect_serial_platform(port) is not None
if FlightControllerDetector.detect_serial_platform(port) is not None
]
for port in unique_serial_devices:
for board in boards:
if board.path == port.device and Detector.is_serial_bootloader(port):
if board.path == port.device and FlightControllerDetector.is_serial_bootloader(port):
board.flags.append(FlightControllerFlags.is_bootloader)
return boards

Expand Down Expand Up @@ -97,6 +97,6 @@ async def detect(cls, include_sitl: bool = True) -> List[FlightController]:
available.extend(cls().detect_serial_flight_controllers())

if include_sitl:
available.append(Detector.detect_sitl())
available.append(FlightControllerDetector.detect_sitl())

return available
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flight_controller_detector.linux.navigator import NavigatorPi4
from flight_controller.detector.linux.navigator import NavigatorPi4
from typedefs import Platform


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

from loguru import logger

from flight_controller_detector.linux.argonot import Argonot
from flight_controller_detector.linux.linux_boards import LinuxFlightController
from flight_controller_detector.linux.navigator import NavigatorPi4, NavigatorPi5
from flight_controller.detector.linux.argonot import Argonot
from flight_controller.detector.linux.linux_boards import LinuxFlightController
from flight_controller.detector.linux.navigator import NavigatorPi4, NavigatorPi5


class LinuxFlightControllerDetector:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from smbus2 import SMBus

from typedefs import FlightController, PlatformType, Serial
from flight_controller import FlightController, PlatformType
from typedefs import Serial


class LinuxFlightController(FlightController):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

from commonwealth.utils.commands import load_file

from flight_controller_detector.linux.linux_boards import LinuxFlightController
from typedefs import Platform, Serial
from flight_controller import Platform
from flight_controller.detector.linux.linux_boards import LinuxFlightController
from typedefs import Serial


class Navigator(LinuxFlightController):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from enum import Enum, auto
from platform import machine
from typing import List, Optional

from pydantic import BaseModel


def get_sitl_platform_name(machine_arch: str) -> str:
"""Get SITL platform name based on machine architecture."""

if "arm" not in machine_arch.lower() and "aarch" not in machine_arch.lower():
return "SITL_x86_64_linux_gnu"
return "SITL_arm_linux_gnueabihf"


# TODO: This class can be deprecated once we move to Python 3.11, which introduces the equivalent StrEnum
class LowerStringEnum(str, Enum):
def __str__(self) -> str:
return self.name.lower()


class PlatformType(LowerStringEnum):
Serial = auto()
Linux = auto()
SITL = auto()
Unknown = auto()


class Platform(str, Enum):
"""Valid Ardupilot platform types.
The Enum values are 1:1 representations of the platforms available on the ArduPilot manifest."""

Pixhawk1 = "Pixhawk1"
Pixhawk4 = "Pixhawk4"
Pixhawk6X = "Pixhawk6X"
Pixhawk6C = "Pixhawk6C"
CubeOrange = "CubeOrange"
GenericSerial = "GenericSerial"
Navigator = "navigator"
Argonot = "argonot"
SITL = get_sitl_platform_name(machine())

@property
def type(self) -> PlatformType:
platform_types = {
Platform.Pixhawk1: PlatformType.Serial,
Platform.Pixhawk4: PlatformType.Serial,
Platform.Pixhawk6X: PlatformType.Serial,
Platform.Pixhawk6C: PlatformType.Serial,
Platform.CubeOrange: PlatformType.Serial,
Platform.GenericSerial: PlatformType.Serial,
Platform.Navigator: PlatformType.Linux,
Platform.Argonot: PlatformType.Linux,
Platform.SITL: PlatformType.SITL,
}
return platform_types.get(self, PlatformType.Unknown)


class FlightControllerFlags(str, Enum):
"""Flags for the Flight-controller class."""

is_bootloader = "is_bootloader"


class FlightController(BaseModel):
"""Flight-controller board."""

name: str
manufacturer: Optional[str]
platform: Platform
path: Optional[str]
flags: List[FlightControllerFlags] = []

@property
def type(self) -> PlatformType:
return self.platform.type
Empty file.
4 changes: 2 additions & 2 deletions core/services/ardupilot_manager/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from args import CommandLineArgs
from autopilot_manager import AutoPilotManager
from flight_controller_detector.Detector import Detector as BoardDetector
from flight_controller.detector.detector import FlightControllerDetector
from settings import SERVICE_NAME

logging.basicConfig(handlers=[InterceptHandler()], level=0)
Expand All @@ -36,7 +36,7 @@
server = Server(config)

if args.sitl:
autopilot.set_preferred_board(BoardDetector.detect_sitl())
autopilot.set_preferred_board(FlightControllerDetector.detect_sitl())
try:
loop.run_until_complete(autopilot.start_ardupilot())
except Exception as start_error:
Expand Down
78 changes: 4 additions & 74 deletions core/services/ardupilot_manager/typedefs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import ipaddress
import re
from enum import Enum, auto
from enum import Enum
from pathlib import Path
from platform import machine
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List

from pydantic import BaseModel, validator

from flight_controller import FlightController


class SITLFrame(str, Enum):
"""Valid SITL frame types"""
Expand Down Expand Up @@ -61,14 +62,6 @@ class SITLFrame(str, Enum):
UNDEFINED = " undefined"


def get_sitl_platform_name(machine_arch: str) -> str:
"""Get SITL platform name based on machine architecture."""

if "arm" not in machine_arch.lower() and "aarch" not in machine_arch.lower():
return "SITL_x86_64_linux_gnu"
return "SITL_arm_linux_gnueabihf"


class Firmware(BaseModel):
"""Simplified representation of a firmware, as available on Ardupilot's manifest."""

Expand All @@ -86,73 +79,10 @@ class Vehicle(str, Enum):
Copter = "Copter"


# TODO: This class can be deprecated once we move to Python 3.11, which introduces the equivalent StrEnum
class LowerStringEnum(str, Enum):
def __str__(self) -> str:
return self.name.lower()


class PlatformType(LowerStringEnum):
Serial = auto()
Linux = auto()
SITL = auto()
Unknown = auto()


class Platform(str, Enum):
"""Valid Ardupilot platform types.
The Enum values are 1:1 representations of the platforms available on the ArduPilot manifest."""

Pixhawk1 = "Pixhawk1"
Pixhawk4 = "Pixhawk4"
Pixhawk6X = "Pixhawk6X"
Pixhawk6C = "Pixhawk6C"
CubeOrange = "CubeOrange"
GenericSerial = "GenericSerial"
Navigator = "navigator"
Argonot = "argonot"
SITL = get_sitl_platform_name(machine())

@property
def type(self) -> PlatformType:
platform_types = {
Platform.Pixhawk1: PlatformType.Serial,
Platform.Pixhawk4: PlatformType.Serial,
Platform.Pixhawk6X: PlatformType.Serial,
Platform.Pixhawk6C: PlatformType.Serial,
Platform.CubeOrange: PlatformType.Serial,
Platform.GenericSerial: PlatformType.Serial,
Platform.Navigator: PlatformType.Linux,
Platform.Argonot: PlatformType.Linux,
Platform.SITL: PlatformType.SITL,
}
return platform_types.get(self, PlatformType.Unknown)


class FlightControllerFlags(str, Enum):
"""Flags for the Flight-controller class."""

is_bootloader = "is_bootloader"


class Parameters(BaseModel):
params: Dict[str, float]


class FlightController(BaseModel):
"""Flight-controller board."""

name: str
manufacturer: Optional[str]
platform: Platform
path: Optional[str]
flags: List[FlightControllerFlags] = []

@property
def type(self) -> PlatformType:
return self.platform.type


class AvailableBoards(BaseModel):
regular: List[FlightController]
bootloaders: List[FlightController]
Expand Down

0 comments on commit 15b60a1

Please sign in to comment.