Skip to content

Commit

Permalink
Adjust package for new version of isar
Browse files Browse the repository at this point in the history
  • Loading branch information
mrica-equinor committed Oct 10, 2024
1 parent b4b6df4 commit 5a44c04
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 88 deletions.
30 changes: 15 additions & 15 deletions src/isar_robot/inspections.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
Video,
VideoMetadata,
)
from robot_interface.models.mission.step import (
from robot_interface.models.mission.task import (
RecordAudio,
TakeImage,
TakeThermalImage,
Expand All @@ -40,16 +40,16 @@
)


def create_image(step: Union[TakeImage, TakeThermalImage]):
def create_image(task_actions: Union[TakeImage, TakeThermalImage]):
now: datetime = datetime.utcnow()
image_metadata: ImageMetadata = ImageMetadata(
start_time=now,
pose=telemetry.get_pose(),
file_type="jpg",
)
image_metadata.tag_id = step.tag_id
image_metadata.tag_id = task_actions.tag_id
image_metadata.analysis_type = ["test1", "test2"]
image_metadata.additional = step.metadata
image_metadata.additional = task_actions.metadata

image: Image = Image(metadata=image_metadata)

Expand All @@ -59,17 +59,17 @@ def create_image(step: Union[TakeImage, TakeThermalImage]):
return [image]


def create_video(step: TakeVideo):
def create_video(task_actions: TakeVideo):
now: datetime = datetime.utcnow()
video_metadata: VideoMetadata = VideoMetadata(
start_time=now,
pose=telemetry.get_pose(),
file_type="mp4",
duration=11,
)
video_metadata.tag_id = step.tag_id
video_metadata.tag_id = task_actions.tag_id
video_metadata.analysis_type = ["test1", "test2"]
video_metadata.additional = step.metadata
video_metadata.additional = task_actions.metadata

video: Video = Video(metadata=video_metadata)

Expand All @@ -79,17 +79,17 @@ def create_video(step: TakeVideo):
return [video]


def create_thermal_video(step: TakeThermalVideo):
def create_thermal_video(task_actions: TakeThermalVideo):
now: datetime = datetime.utcnow()
thermal_video_metadata: ThermalVideoMetadata = ThermalVideoMetadata(
start_time=now,
pose=telemetry.get_pose(),
file_type="mp4",
duration=step.duration,
duration=task_actions.duration,
)
thermal_video_metadata.tag_id = step.tag_id
thermal_video_metadata.tag_id = task_actions.tag_id
thermal_video_metadata.analysis_type = ["test1", "test2"]
thermal_video_metadata.additional = step.metadata
thermal_video_metadata.additional = task_actions.metadata

thermal_video: ThermalVideo = ThermalVideo(metadata=thermal_video_metadata)

Expand All @@ -99,17 +99,17 @@ def create_thermal_video(step: TakeThermalVideo):
return [thermal_video]


def create_audio(step: RecordAudio):
def create_audio(task_actions: RecordAudio):
now: datetime = datetime.utcnow()
audio_metadata: AudioMetadata = AudioMetadata(
start_time=now,
pose=telemetry.get_pose(),
file_type="wav",
duration=step.duration,
duration=task_actions.duration,
)
audio_metadata.tag_id = step.tag_id
audio_metadata.tag_id = task_actions.tag_id
audio_metadata.analysis_type = ["test1", "test2"]
audio_metadata.additional = step.metadata
audio_metadata.additional = task_actions.metadata

audio: Audio = Audio(metadata=audio_metadata)

Expand Down
96 changes: 44 additions & 52 deletions src/isar_robot/robotinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
from robot_interface.models.initialize import InitializeParams
from robot_interface.models.inspection.inspection import Inspection
from robot_interface.models.mission.mission import Mission
from robot_interface.models.mission.status import MissionStatus, RobotStatus, StepStatus
from robot_interface.models.mission.step import (
InspectionStep,
from robot_interface.models.mission.task import Task
from robot_interface.models.mission.status import MissionStatus, RobotStatus, TaskStatus
from robot_interface.models.mission.task import (
InspectionTask,
RecordAudio,
Step,
TakeImage,
TakeThermalImage,
TakeThermalVideo,
Expand All @@ -25,80 +25,72 @@
from isar_robot.config.settings import settings
from isar_robot.utilities import (
is_localization_mission,
is_localization_step,
is_localization_task,
is_return_to_home_mission,
is_return_to_home_step,
is_return_to_home_task,
)


class Robot(RobotInterface):
def __init__(self) -> None:
self.logger: Logger = logging.getLogger("isar_robot")
self.current_mission: Optional[Mission] = None
self.current_step: Optional[Step] = None
self.current_task: Optional[Task] = None

def initiate_mission(self, mission: Mission) -> None:
time.sleep(settings.MISSION_DURATION_IN_SECONDS)
self.current_mission = mission
self.current_task_ix = 0
self.current_task = mission.tasks[self.current_task_ix]
self.task_len = len(mission.tasks)

def mission_status(self) -> MissionStatus:
if is_localization_mission(self.current_mission):
self.current_mission = None
if settings.SHOULD_FAIL_LOCALIZATION_MISSION:
return MissionStatus.Failed
return MissionStatus.Successful

if is_return_to_home_mission(self.current_mission):
self.current_step = None
if settings.SHOULD_FAIL_RETURN_TO_HOME_MISSION:
return MissionStatus.Failed
return MissionStatus.Successful

self.current_mission = None
if settings.SHOULD_FAIL_NORMAL_MISSION:
return MissionStatus.Failed
return MissionStatus.Successful

def initiate_step(self, step: Step) -> None:
self.logger.info(f"Initiated step of type {step.__class__.__name__}")
self.current_step = step
def initiate_task(self, task: Task) -> None:
self.logger.info(f"Initiated task of type {task.__class__.__name__}")
self.current_task = task
time.sleep(settings.STEP_DURATION_IN_SECONDS)

def step_status(self) -> StepStatus:
def task_status(self, task_id: str) -> TaskStatus:

next_task: Task = None
if self.current_mission:
if is_localization_mission(self.current_mission):
if settings.SHOULD_FAIL_LOCALIZATION_MISSION:
return StepStatus.Failed
if self.current_task_ix < self.task_len - 1:
self.current_task_ix = self.current_task_ix + 1
next_task = self.current_mission.tasks[self.current_task_ix]

if is_localization_step(self.current_step):
self.current_step = None
# This only happens for spetwise
if is_localization_task(self.current_task):
self.current_task = None
if settings.SHOULD_FAIL_LOCALIZATION_STEP:
return StepStatus.Failed
return StepStatus.Successful
return TaskStatus.Failed
return TaskStatus.Successful

if is_return_to_home_step(self.current_step):
self.current_step = None
# This only happens for last task in mission
if is_return_to_home_task(self.current_task):
self.current_task = None
if settings.SHOULD_FAIL_RETURN_TO_HOME_STEP:
return StepStatus.Failed
return StepStatus.Successful
return TaskStatus.Failed
return TaskStatus.Successful

self.current_step = None
if next_task:
self.current_task = next_task
else:
self.current_task = None
if settings.SHOULD_FAIL_NORMAL_STEP:
return StepStatus.Failed
return StepStatus.Successful
return TaskStatus.Failed
return TaskStatus.Successful

def stop(self) -> None:
return

def get_inspections(self, step: InspectionStep) -> Sequence[Inspection]:
if type(step) in [TakeImage, TakeThermalImage]:
return inspections.create_image(step)
elif type(step) is TakeVideo:
return inspections.create_video(step)
elif type(step) is TakeThermalVideo:
return inspections.create_thermal_video(step)
elif type(step) is RecordAudio:
return inspections.create_audio(step)
def get_inspection(self, task: InspectionTask) -> Inspection:
if type(task) in [TakeImage, TakeThermalImage]:
return inspections.create_image(task)
elif type(task) is TakeVideo:
return inspections.create_video(task)
elif type(task) is TakeThermalVideo:
return inspections.create_thermal_video(task)
elif type(task) is RecordAudio:
return inspections.create_audio(task)
else:
return None

Expand Down
24 changes: 12 additions & 12 deletions src/isar_robot/utilities.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
from robot_interface.models.mission.mission import Mission
from robot_interface.models.mission.step import Localize, ReturnToHome, Step
from robot_interface.models.mission.task import Task
from robot_interface.models.mission.task import Localize, ReturnToHome


def is_localization_mission(mission: Mission):
if mission.start_pose is not None:
if len(mission.tasks) != 1:
return False
if isinstance(mission.tasks[0], Localize):
return True

return False


def is_localization_step(step: Step):
return isinstance(step, Localize)
def is_localization_task(task: Task):
return isinstance(task, Localize)


def is_return_to_home_mission(mission: Mission):
if len(mission.tasks) != 1:
return False
if len(mission.tasks[0].steps) != 1:
return False
if not isinstance(mission.tasks[0].steps[0], ReturnToHome):
return False
return True
if isinstance(mission.tasks[0], ReturnToHome):
return True
return False


def is_return_to_home_step(step: Step):
return isinstance(step, ReturnToHome)
def is_return_to_home_task(task: Task):
return isinstance(task, ReturnToHome)
22 changes: 13 additions & 9 deletions tests/test_inspections.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
from alitra import Frame, Position
from robot_interface.models.mission.step import RecordAudio, TakeImage, TakeThermalVideo
from robot_interface.models.mission.task_action import (
RecordAudio,
TakeImage,
TakeThermalVideo,
)

from isar_robot import inspections

target = Position(x=0, y=0, z=0, frame=Frame("robot"))


def test_create_image():
step = TakeImage(target=target)
task_actions = TakeImage(target=target)

list_of_images = inspections.create_image(step)
list_of_images = inspections.create_image(task_actions)

assert len(list_of_images) == 1

Expand All @@ -18,9 +22,9 @@ def test_create_image():


def test_create_video():
step = TakeImage(target=target)
task_actions = TakeImage(target=target)

list_of_videos = inspections.create_video(step)
list_of_videos = inspections.create_video(task_actions)

assert len(list_of_videos) == 1

Expand All @@ -29,9 +33,9 @@ def test_create_video():


def test_create_thermal_video():
step = TakeThermalVideo(target=target, duration=10)
task_actions = TakeThermalVideo(target=target, duration=10)

list_of_thermal_videos = inspections.create_thermal_video(step)
list_of_thermal_videos = inspections.create_thermal_video(task_actions)

assert len(list_of_thermal_videos) == 1

Expand All @@ -41,9 +45,9 @@ def test_create_thermal_video():


def test_create_audio():
step = RecordAudio(target=target, duration=10)
task_actions = RecordAudio(target=target, duration=10)

list_of_audio_recordings = inspections.create_audio(step)
list_of_audio_recordings = inspections.create_audio(task_actions)

assert len(list_of_audio_recordings) == 1

Expand Down

0 comments on commit 5a44c04

Please sign in to comment.