diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml
new file mode 100644
index 0000000..5a9259e
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/bug_report.yml
@@ -0,0 +1,100 @@
+name: Report an issue with Xiaomi Cloud Map Extractor
+description: Report an issue with Xiaomi Cloud Map Extractor.
+labels: bug
+assignees: 'PiotrMachowski'
+body:
+ - type: markdown
+ attributes:
+ value: |
+ This issue form is for reporting bugs only!
+
+ If you have a question, feature or enhancement request, please use the dedicated form.
+ - type: checkboxes
+ id: checklist
+ attributes:
+ label: Checklist
+ options:
+ - label: I have updated the integration to the latest version available
+ required: true
+ - label: I have checked if the problem is already reported
+ required: true
+ - type: textarea
+ validations:
+ required: true
+ attributes:
+ label: The problem
+ description: >-
+ Describe the issue you are experiencing here.
+ - type: markdown
+ attributes:
+ value: |
+ ## Environment
+ - type: input
+ id: bug-version
+ validations:
+ required: true
+ attributes:
+ label: What version of am integration has described problem?
+ placeholder: vX.X.X
+ - type: input
+ id: last-working-version
+ validations:
+ required: false
+ attributes:
+ label: What was the last working version of an integration?
+ placeholder: vX.X.X
+ description: >
+ If known, otherwise leave blank.
+ - type: input
+ id: vacuum-model
+ validations:
+ required: true
+ attributes:
+ label: What vacuum model do you have problems with?
+ placeholder: roborock.vacuum.s5e
+ - type: input
+ id: ha-version
+ validations:
+ required: true
+ attributes:
+ label: What version of Home Assistant do you use?
+ placeholder: core-
+ description: >
+ Can be found in: [Configuration panel -> Info](https://my.home-assistant.io/redirect/info/).
+
+ [![Open your Home Assistant instance and show your Home Assistant version information.](https://my.home-assistant.io/badges/info.svg)](https://my.home-assistant.io/redirect/info/)
+ - type: dropdown
+ validations:
+ required: true
+ attributes:
+ label: What type of installation are you running?
+ description: >
+ Can be found in: [Configuration panel -> Info](https://my.home-assistant.io/redirect/info/).
+
+ [![Open your Home Assistant instance and show your Home Assistant version information.](https://my.home-assistant.io/badges/info.svg)](https://my.home-assistant.io/redirect/info/)
+ options:
+ - Home Assistant OS
+ - Home Assistant Container
+ - Home Assistant Supervised
+ - Home Assistant Core
+ - type: markdown
+ attributes:
+ value: |
+ # Details
+ - type: textarea
+ validations:
+ required: true
+ attributes:
+ label: Camera's configuration
+ description: |
+ Please provide camera's YAML configuration to help with problem reproduction.
+ render: yaml
+ - type: textarea
+ attributes:
+ label: Errors shown in the HA logs (if applicable)
+ render: shell
+ - type: textarea
+ attributes:
+ label: Additional information
+ description: >
+ If you have any additional information, use the field below.
diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml
new file mode 100644
index 0000000..57ad01b
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/config.yml
@@ -0,0 +1,8 @@
+blank_issues_enabled: false
+contact_links:
+ - name: Ask a question/open a discussion
+ url: https://github.com/PiotrMachowski/Home-Assistant-custom-components-Xiaomi-Cloud-Map-Extractor/discussions
+ about: To ask a question or open a discussion please use a dedicated section.
+ - name: Report a bug in the map card (Lovelace Vacuum Map card)
+ url: https://github.com/PiotrMachowski/lovelace-xiaomi-vacuum-map-card/issues
+ about: This is the issue tracker for the Map Extractor. Please report issues with the card in its repository.
diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml
new file mode 100644
index 0000000..bbf9981
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/feature_request.yml
@@ -0,0 +1,39 @@
+name: Request a feature in Xiaomi Cloud Map Extractor
+description: Request a feature in Xiaomi Cloud Map Extractor
+labels: enhancement
+assignees: 'PiotrMachowski'
+body:
+ - type: markdown
+ attributes:
+ value: |
+ This issue form is for feature requests or enhancements only!
+
+ If you have a bug or a question, please use the dedicated form.
+ - type: textarea
+ validations:
+ required: true
+ attributes:
+ label: Description
+ description: >-
+ A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
+ - type: textarea
+ validations:
+ required: true
+ attributes:
+ label: Solution
+ description: >-
+ A clear and concise description of what you want to happen.
+ - type: textarea
+ validations:
+ required: false
+ attributes:
+ label: Alternatives
+ description: >-
+ A clear and concise description of any alternative solutions or features you've considered.
+ - type: textarea
+ validations:
+ required: false
+ attributes:
+ label: Context
+ description: >-
+ Add any other context about the feature request here.
diff --git a/.github/ISSUE_TEMPLATE/new_platform_request.yml b/.github/ISSUE_TEMPLATE/new_platform_request.yml
new file mode 100644
index 0000000..b6fcce9
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/new_platform_request.yml
@@ -0,0 +1,54 @@
+name: Request support for a new vacuum/platform
+description: Request support for a new vacuum/platform
+labels:
+ - new platform
+ - enhancement
+assignees: 'PiotrMachowski'
+body:
+ - type: markdown
+ attributes:
+ value: |
+ Using this form you can create a request for a new supported vacuum/platform.
+ - type: checkboxes
+ id: checklist
+ attributes:
+ label: Checklist
+ options:
+ - label: I have updated the integration to the latest version available
+ required: true
+ - label: I have checked if the vacuum/platform is already requested
+ required: true
+ - label: I have sent raw map file to `piotr.machowski.dev [at] gmail.com` ([Retrieving map](https://github.com/PiotrMachowski/Home-Assistant-custom-components-Xiaomi-Cloud-Map-Extractor#retrieving-map); please provide your GitHub username in the email)
+ required: false
+ - type: input
+ id: vacuum-model
+ validations:
+ required: true
+ attributes:
+ label: What vacuum model do you want to be supported?
+ placeholder: roborock.vacuum.s5e
+ - type: input
+ id: vacuum-model
+ validations:
+ required: true
+ attributes:
+ label: What is its name?
+ placeholder: Roborock S5 Max
+ - type: checkboxes
+ id: available-apis
+ attributes:
+ label: Available APIs
+ description: >-
+ Please check if map appears when you use following APIs in `force_api` config
+ options:
+ - label: `xiaomi`
+ - label: `viomi`
+ - label: `roidmi`
+ - label: `dreame`
+ - type: textarea
+ attributes:
+ label: Errors shown in the HA logs (if applicable)
+ render: shell
+ - type: textarea
+ attributes:
+ label: Other info
\ No newline at end of file
diff --git a/.github/workflows/lint_python.yml b/.github/workflows/lint_python.yml
index 3ac6408..7faf0eb 100644
--- a/.github/workflows/lint_python.yml
+++ b/.github/workflows/lint_python.yml
@@ -9,15 +9,15 @@ jobs:
- run: pip install --upgrade pip wheel
- run: pip install bandit black codespell flake8 flake8-2020 flake8-bugbear
flake8-comprehensions isort mypy pytest pyupgrade safety
- - run: bandit --recursive --skip B105,B108,B303,B304,B311,B413 .
+ - run: bandit --recursive --skip B105,B108,B303,B304,B324,B311,B413,B506 .
- run: black --check . || true
- run: codespell --ignore-words-list="hass"
- - run: flake8 . --count --ignore=B001,E241,E265,E302,E722,E731,F403,F405,F841,W504
+ - run: flake8 custom_components --count --ignore=B001,E241,E265,E302,E722,E731,F403,F405,F841,W504
--max-complexity=21 --max-line-length=184 --show-source --statistics
- - run: isort --check-only --profile black . || true
+ - run: isort --check-only --profile black custom_components || true
- run: pip install -r requirements.txt || pip install --editable . || true
- run: mkdir --parents --verbose .mypy_cache
- - run: mypy --ignore-missing-imports --install-types --non-interactive . || true
+ - run: mypy --ignore-missing-imports --install-types --non-interactive custom_components || true
- run: pytest . || true
- run: pytest --doctest-modules . || true
- run: shopt -s globstar && pyupgrade --py36-plus **/*.py || true
diff --git a/README.md b/README.md
index 0add843..63c4f81 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@
# Xiaomi Cloud Map Extractor
-This custom integration provides a way to present a live view of a map for Xiaomi, Roborock, Viomi and Roidmi vacuums.
+This custom integration provides a way to present a live view of a map for Xiaomi, Roborock, Viomi, Roidmi and Dreame vacuums.
([Supported devices](#supported-devices))
@@ -191,6 +191,7 @@ camera:
sizes:
charger_radius: 4
vacuum_radius: 6.5
+ path_width: 1
obstacle_radius: 3
ignored_obstacle_radius: 3
obstacle_with_photo_radius: 3
@@ -253,7 +254,7 @@ camera:
| `store_map_raw` | boolean | false | default: `false` | Enables storing raw map data in `store_map_path` directory ([more info](#retrieving-map)). Xiaomi map can be opened with [RoboMapViewer](https://github.com/marcelrv/XiaomiRobotVacuumProtocol/tree/master/RRMapFile). |
| `store_map_image` | boolean | false | default: `false` | Enables storing map image in `store_map_path` path with name `map_image_.png` |
| `store_map_path` | string | false | default: `/tmp` | Storing map data directory |
-| `force_api` | string | false | One of: `xiaomi`, `viomi`, `roidmi` | Forces usage of specific API. |
+| `force_api` | string | false | One of: `xiaomi`, `viomi`, `roidmi`, `dreame` | Forces usage of specific API. |
#### Colors configuration
@@ -264,8 +265,9 @@ camera:
| Color name | Description |
| --- | --- |
- | `color_charger` | Charger position |
- | `color_cleaned_area` | Fill of area that already has been cleaned (Viomi) |
+ | `color_charger` | Charger fill |
+ | `color_charger_outline` | Charger outline |
+ | `color_cleaned_area` | Fill of area that already has been cleaned |
| `color_goto_path` | Path for goto mode |
| `color_grey_wall` | Obstacles (e.g. chairs, table legs) |
| `color_ignored_obstacle_with_photo` | Ignored obstacle with photo mark on a map |
@@ -274,7 +276,7 @@ camera:
| `color_map_outside` | Map outside |
| `color_map_wall_v2` | Walls (for software with rooms support) |
| `color_map_wall` | Walls (for software without rooms support) |
- | `color_new_discovered_area` | Newly discovered areas (Viomi) |
+ | `color_new_discovered_area` | Newly discovered areas |
| `color_no_go_zones_outline` | Outline of no-go zones |
| `color_no_go_zones` | Fill of no-go zones |
| `color_no_mop_zones_outline` | Outline of no-mopping zones |
@@ -283,7 +285,8 @@ camera:
| `color_obstacle` | Obstacle mark on a map |
| `color_path` | Path of a vacuum |
| `color_predicted_path` | Predicted path to a point in goto mode |
- | `color_robo` | Vacuum position |
+ | `color_robo` | Vacuum fill |
+ | `color_robo_outline` | Vacuum outline |
| `color_room_names` | Room names (if available) |
| `color_scan` | Areas not assigned to any room (for software with rooms support) |
| `color_unknown` | Other areas |
@@ -349,12 +352,13 @@ fc-list | grep ttf | sed "s/.*\///"| sed "s/ttf.*/ttf/"
| Parameter | Type | Required | Default value | Description |
|---|---|---|---|---|
- | `charger_radius` | float | false | 4 | Radius of a charger circle. |
- | `vacuum_radius` | float | false | 4 | Radius of a vacuum circle. |
+ | `charger_radius` | float | false | 6 | Radius of a charger circle. |
+ | `vacuum_radius` | float | false | 6 | Radius of a vacuum semi-circle. |
| `obstacle_radius` | float | false | 3 | Radius of an obstacle circle. |
| `ignored_obstacle_radius` | float | false | 3 | Radius of an ignored obstacle circle circle. |
| `obstacle_with_photo_radius` | float | false | 3 | Radius of an obstacle with photo circle. |
| `ignored_obstacle_with_photo_radius` | float | false | 3 | Radius of an ignored obstacle with photo circle. |
+ | `path_width` | float | false | 1 | Width of path line. |
#### Attributes configuration
@@ -414,6 +418,7 @@ This integration was tested on following vacuums:
- `roborock.vacuum.a08` (Roborock S6 Pure)
- `roborock.vacuum.a10` (Roborock S6 MaxV)
- `roborock.vacuum.a15` (Roborock S7)
+ - `roborock.vacuum.a27` (Roborock S7 MaxV)
- Viomi map format:
- `viomi.vacuum.v6` (Viomi Vacuum V2 Pro, Xiaomi Mijia STYJ02YM, Mi Robot Vacuum Mop Pro)
- `viomi.vacuum.v7` (Mi Robot Vacuum-Mop Pro)
@@ -422,6 +427,18 @@ This integration was tested on following vacuums:
- Roidmi map format:
- `roidmi.vacuum.v60` (Roidmi EVE Plus)
- `viomi.vacuum.v18` (Viomi S9)
+ - `zhimi.vacuum.xa1` (Lydsto R1)
+ - Dreame map format:
+ - `dreame.vacuum.mc1808` (Xiaomi Mi Mop/Xiaomi Mijia 1C)
+ - `dreame.vacuum.p2008` (Dreame F9)
+ - `dreame.vacuum.p2009` (Dreame D9)
+ - `dreame.vacuum.p2028` (Dreame Z10 Pro)
+ - `dreame.vacuum.p2029` (Dreame L10 Pro)
+ - `dreame.vacuum.p2036` (Trouver LDS Cleaner)
+ - `dreame.vacuum.p2041o` (Xiaomi Mop 2 Pro+)
+ - `dreame.vacuum.p2140` (Mijia Robot Vacuum-Mop 2C)
+ - `dreame.vacuum.p2157` (MOVA L600)
+ - `dreame.vacuum.p2259` (Dreame D9 Max)
## Unsupported devices
diff --git a/custom_components/xiaomi_cloud_map_extractor/camera.py b/custom_components/xiaomi_cloud_map_extractor/camera.py
index 01fb64e..0568631 100644
--- a/custom_components/xiaomi_cloud_map_extractor/camera.py
+++ b/custom_components/xiaomi_cloud_map_extractor/camera.py
@@ -3,7 +3,11 @@
import time
from datetime import timedelta
from enum import Enum
-from typing import Optional
+from typing import Any, Dict, List, Optional
+
+from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData
+from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts
try:
from miio import RoborockVacuum, DeviceException
@@ -22,6 +26,7 @@
from custom_components.xiaomi_cloud_map_extractor.const import *
from custom_components.xiaomi_cloud_map_extractor.dreame.vacuum import DreameVacuum
from custom_components.xiaomi_cloud_map_extractor.roidmi.vacuum import RoidmiVacuum
+from custom_components.xiaomi_cloud_map_extractor.unsupported.vacuum import UnsupportedVacuum
from custom_components.xiaomi_cloud_map_extractor.viomi.vacuum import ViomiVacuum
from custom_components.xiaomi_cloud_map_extractor.xiaomi.vacuum import XiaomiVacuum
@@ -37,12 +42,13 @@
}
DEFAULT_SIZES = {
- CONF_SIZE_VACUUM_RADIUS: 4,
+ CONF_SIZE_VACUUM_RADIUS: 6,
+ CONF_SIZE_PATH_WIDTH: 1,
CONF_SIZE_IGNORED_OBSTACLE_RADIUS: 3,
CONF_SIZE_IGNORED_OBSTACLE_WITH_PHOTO_RADIUS: 3,
CONF_SIZE_OBSTACLE_RADIUS: 3,
CONF_SIZE_OBSTACLE_WITH_PHOTO_RADIUS: 3,
- CONF_SIZE_CHARGER_RADIUS: 4
+ CONF_SIZE_CHARGER_RADIUS: 6
}
COLOR_SCHEMA = vol.Or(
@@ -94,6 +100,8 @@
vol.Optional(CONF_SIZES, default=DEFAULT_SIZES): vol.Schema({
vol.Optional(CONF_SIZE_VACUUM_RADIUS,
default=DEFAULT_SIZES[CONF_SIZE_VACUUM_RADIUS]): POSITIVE_FLOAT_SCHEMA,
+ vol.Optional(CONF_SIZE_PATH_WIDTH,
+ default=DEFAULT_SIZES[CONF_SIZE_PATH_WIDTH]): POSITIVE_FLOAT_SCHEMA,
vol.Optional(CONF_SIZE_IGNORED_OBSTACLE_RADIUS,
default=DEFAULT_SIZES[CONF_SIZE_IGNORED_OBSTACLE_RADIUS]): POSITIVE_FLOAT_SCHEMA,
vol.Optional(CONF_SIZE_IGNORED_OBSTACLE_WITH_PHOTO_RADIUS,
@@ -144,8 +152,10 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
class VacuumCamera(Camera):
- def __init__(self, entity_id, host, token, username, password, country, name, should_poll, image_config, colors,
- drawables, sizes, texts, attributes, store_map_raw, store_map_image, store_map_path, force_api):
+ def __init__(self, entity_id: str, host: str, token: str, username: str, password: str, country: str, name: str,
+ should_poll: bool, image_config: ImageConfig, colors: Colors, drawables: Drawables, sizes: Sizes,
+ texts: Texts, attributes: List[str], store_map_raw: bool, store_map_image: bool, store_map_path: str,
+ force_api: str):
super().__init__()
self.entity_id = entity_id
self.content_type = CONTENT_TYPE
@@ -178,14 +188,14 @@ async def async_added_to_hass(self) -> None:
self.async_schedule_update_ha_state(True)
@property
- def frame_interval(self):
+ def frame_interval(self) -> float:
return 1
def camera_image(self, width: Optional[int] = None, height: Optional[int] = None) -> Optional[bytes]:
return self._image
@property
- def name(self):
+ def name(self) -> str:
return self._name
def turn_on(self):
@@ -195,58 +205,65 @@ def turn_off(self):
self._should_poll = False
@property
- def supported_features(self):
+ def supported_features(self) -> int:
return SUPPORT_ON_OFF
@property
- def extra_state_attributes(self):
+ def extra_state_attributes(self) -> Dict[str, Any]:
attributes = {}
if self._map_data is not None:
- rooms = []
- if self._map_data.rooms is not None:
- rooms = dict(
- filter(lambda x: x[0] is not None, map(lambda x: (x[0], x[1].name), self._map_data.rooms.items())))
- if len(rooms) == 0:
- rooms = list(self._map_data.rooms.keys())
- for name, value in {
- ATTRIBUTE_CALIBRATION: self._map_data.calibration(),
- ATTRIBUTE_CHARGER: self._map_data.charger,
- ATTRIBUTE_CLEANED_ROOMS: self._map_data.cleaned_rooms,
- ATTRIBUTE_COUNTRY: self._country,
- ATTRIBUTE_GOTO: self._map_data.goto,
- ATTRIBUTE_GOTO_PATH: self._map_data.goto_path,
- ATTRIBUTE_GOTO_PREDICTED_PATH: self._map_data.predicted_path,
- ATTRIBUTE_IGNORED_OBSTACLES: self._map_data.ignored_obstacles,
- ATTRIBUTE_IGNORED_OBSTACLES_WITH_PHOTO: self._map_data.ignored_obstacles_with_photo,
- ATTRIBUTE_IMAGE: self._map_data.image,
- ATTRIBUTE_IS_EMPTY: self._map_data.image.is_empty,
- ATTRIBUTE_MAP_NAME: self._map_data.map_name,
- ATTRIBUTE_NO_GO_AREAS: self._map_data.no_go_areas,
- ATTRIBUTE_NO_MOPPING_AREAS: self._map_data.no_mopping_areas,
- ATTRIBUTE_OBSTACLES: self._map_data.obstacles,
- ATTRIBUTE_OBSTACLES_WITH_PHOTO: self._map_data.obstacles_with_photo,
- ATTRIBUTE_PATH: self._map_data.path,
- ATTRIBUTE_ROOM_NUMBERS: rooms,
- ATTRIBUTE_ROOMS: self._map_data.rooms,
- ATTRIBUTE_VACUUM_POSITION: self._map_data.vacuum_position,
- ATTRIBUTE_VACUUM_ROOM: self._map_data.vacuum_room,
- ATTRIBUTE_VACUUM_ROOM_NAME: self._map_data.vacuum_room_name,
- ATTRIBUTE_WALLS: self._map_data.walls,
- ATTRIBUTE_ZONES: self._map_data.zones
- }.items():
- if name in self._attributes:
- attributes[name] = value
+ attributes.update(self.extract_attributes(self._map_data, self._attributes, self._country))
if self._store_map_raw:
attributes[ATTRIBUTE_MAP_SAVED] = self._map_saved
if self._device is not None:
attributes[ATTR_MODEL] = self._device.model
attributes[ATTR_USED_API] = self._used_api
+ if self._connector.two_factor_auth_url is not None:
+ attributes[ATTR_TWO_FACTOR_AUTH] = self._connector.two_factor_auth_url
return attributes
@property
- def should_poll(self):
+ def should_poll(self) -> bool:
return self._should_poll
+ @staticmethod
+ def extract_attributes(map_data: MapData, attributes_to_return: List[str], country) -> Dict[str, Any]:
+ attributes = {}
+ rooms = []
+ if map_data.rooms is not None:
+ rooms = dict(filter(lambda x: x[0] is not None, ((x[0], x[1].name) for x in map_data.rooms.items())))
+ if len(rooms) == 0:
+ rooms = list(map_data.rooms.keys())
+ for name, value in {
+ ATTRIBUTE_CALIBRATION: map_data.calibration(),
+ ATTRIBUTE_CHARGER: map_data.charger,
+ ATTRIBUTE_CLEANED_ROOMS: map_data.cleaned_rooms,
+ ATTRIBUTE_COUNTRY: country,
+ ATTRIBUTE_GOTO: map_data.goto,
+ ATTRIBUTE_GOTO_PATH: map_data.goto_path,
+ ATTRIBUTE_GOTO_PREDICTED_PATH: map_data.predicted_path,
+ ATTRIBUTE_IGNORED_OBSTACLES: map_data.ignored_obstacles,
+ ATTRIBUTE_IGNORED_OBSTACLES_WITH_PHOTO: map_data.ignored_obstacles_with_photo,
+ ATTRIBUTE_IMAGE: map_data.image,
+ ATTRIBUTE_IS_EMPTY: map_data.image.is_empty,
+ ATTRIBUTE_MAP_NAME: map_data.map_name,
+ ATTRIBUTE_NO_GO_AREAS: map_data.no_go_areas,
+ ATTRIBUTE_NO_MOPPING_AREAS: map_data.no_mopping_areas,
+ ATTRIBUTE_OBSTACLES: map_data.obstacles,
+ ATTRIBUTE_OBSTACLES_WITH_PHOTO: map_data.obstacles_with_photo,
+ ATTRIBUTE_PATH: map_data.path,
+ ATTRIBUTE_ROOM_NUMBERS: rooms,
+ ATTRIBUTE_ROOMS: map_data.rooms,
+ ATTRIBUTE_VACUUM_POSITION: map_data.vacuum_position,
+ ATTRIBUTE_VACUUM_ROOM: map_data.vacuum_room,
+ ATTRIBUTE_VACUUM_ROOM_NAME: map_data.vacuum_room_name,
+ ATTRIBUTE_WALLS: map_data.walls,
+ ATTRIBUTE_ZONES: map_data.zones
+ }.items():
+ if name in attributes_to_return:
+ attributes[name] = value
+ return attributes
+
def update(self):
counter = 10
if self._status != CameraStatus.TWO_FACTOR_AUTH_REQUIRED and not self._logged_in:
@@ -292,7 +309,7 @@ def _handle_device(self):
_LOGGER.error("Failed to retrieve model")
self._status = CameraStatus.FAILED_TO_RETRIEVE_DEVICE
- def _handle_map_name(self, counter):
+ def _handle_map_name(self, counter: int) -> str:
map_name = "retry"
if self._device is not None and not self._device.should_get_map_from_vacuum():
map_name = "0"
@@ -312,7 +329,7 @@ def _handle_map_name(self, counter):
counter = counter - 1
return map_name
- def _handle_map_data(self, map_name):
+ def _handle_map_data(self, map_name: str):
_LOGGER.debug("Retrieving map from Xiaomi cloud")
store_map_path = self._store_map_path if self._store_map_raw else None
map_data, map_stored = self._device.get_map(map_name, self._colors, self._drawables, self._texts,
@@ -321,13 +338,15 @@ def _handle_map_data(self, map_name):
# noinspection PyBroadException
try:
_LOGGER.debug("Map data retrieved")
- self._set_map_data(map_data)
self._map_saved = map_stored
- if self._map_data.image.is_empty:
+ if map_data.image.is_empty:
_LOGGER.debug("Map is empty")
self._status = CameraStatus.EMPTY_MAP
+ if self._map_data is None or self._map_data.image.is_empty:
+ self._set_map_data(map_data)
else:
_LOGGER.debug("Map is ok")
+ self._set_map_data(map_data)
self._status = CameraStatus.OK
except:
_LOGGER.warning("Unable to parse map data")
@@ -337,14 +356,14 @@ def _handle_map_data(self, map_name):
_LOGGER.warning("Unable to retrieve map data")
self._status = CameraStatus.UNABLE_TO_RETRIEVE_MAP
- def _set_map_data(self, map_data):
+ def _set_map_data(self, map_data: MapData):
img_byte_arr = io.BytesIO()
map_data.image.data.save(img_byte_arr, format='PNG')
self._image = img_byte_arr.getvalue()
self._map_data = map_data
self._store_image()
- def _create_device(self, user_id, device_id, model):
+ def _create_device(self, user_id: str, device_id: str, model: str) -> XiaomiCloudVacuum:
self._used_api = self._detect_api(model)
if self._used_api == CONF_AVAILABLE_API_XIAOMI:
return XiaomiVacuum(self._connector, self._country, user_id, device_id, model)
@@ -354,21 +373,21 @@ def _create_device(self, user_id, device_id, model):
return RoidmiVacuum(self._connector, self._country, user_id, device_id, model)
if self._used_api == CONF_AVAILABLE_API_DREAME:
return DreameVacuum(self._connector, self._country, user_id, device_id, model)
- return XiaomiVacuum(self._connector, self._country, user_id, device_id, model)
+ return UnsupportedVacuum(self._connector, self._country, user_id, device_id, model)
- def _detect_api(self, model: str):
+ def _detect_api(self, model: str) -> Optional[str]:
if self._forced_api is not None:
return self._forced_api
if model in API_EXCEPTIONS:
return API_EXCEPTIONS[model]
- def list_contains_model(prefixes):
- return len(list(filter(lambda x: model.startswith(x), prefixes))) > 0
+ def list_contains_model(prefixes, model_to_check):
+ return len(list(filter(lambda x: model_to_check.startswith(x), prefixes))) > 0
- filtered = list(filter(lambda x: list_contains_model(x[1]), AVAILABLE_APIS.items()))
+ filtered = list(filter(lambda x: list_contains_model(x[1], model), AVAILABLE_APIS.items()))
if len(filtered) > 0:
return filtered[0][0]
- return CONF_AVAILABLE_API_XIAOMI
+ return None
def _store_image(self):
if self._store_map_image:
diff --git a/custom_components/xiaomi_cloud_map_extractor/common/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/common/image_handler.py
index 9580157..ab9414a 100644
--- a/custom_components/xiaomi_cloud_map_extractor/common/image_handler.py
+++ b/custom_components/xiaomi_cloud_map_extractor/common/image_handler.py
@@ -1,11 +1,14 @@
import logging
-from typing import Callable
+import math
+from typing import Callable, Dict, List
from PIL import Image, ImageDraw, ImageFont
from PIL.Image import Image as ImageType
-from custom_components.xiaomi_cloud_map_extractor.common.map_data import ImageData
+from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, Obstacle, Path, Point, Room, \
+ Wall, Zone
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.types import Color, Colors, Sizes, Texts
_LOGGER = logging.getLogger(__name__)
@@ -30,7 +33,9 @@ class ImageHandler:
COLOR_NO_MOPPING_ZONES: (163, 130, 211, 127),
COLOR_NO_MOPPING_ZONES_OUTLINE: (163, 130, 211),
COLOR_CHARGER: (0x66, 0xfe, 0xda, 0x7f),
- COLOR_ROBO: (75, 235, 149),
+ COLOR_CHARGER_OUTLINE: (0x66, 0xfe, 0xda, 0x7f),
+ COLOR_ROBO: (0xff, 0xff, 0xff),
+ COLOR_ROBO_OUTLINE: (0, 0, 0),
COLOR_ROOM_NAMES: (0, 0, 0),
COLOR_OBSTACLE: (0, 0, 0, 128),
COLOR_IGNORED_OBSTACLE: (0, 0, 0, 128),
@@ -60,7 +65,7 @@ class ImageHandler:
COLOR_ROOM_14, COLOR_ROOM_15, COLOR_ROOM_16]
@staticmethod
- def create_empty_map_image(colors, text="NO MAP") -> ImageType:
+ def create_empty_map_image(colors: Colors, text: str = "NO MAP") -> ImageType:
color = ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors)
image = Image.new('RGBA', (300, 200), color=color)
if sum(color[0:3]) > 382:
@@ -73,86 +78,88 @@ def create_empty_map_image(colors, text="NO MAP") -> ImageType:
return image
@staticmethod
- def draw_path(image: ImageData, path, colors, scale):
- ImageHandler.__draw_path__(image, path, ImageHandler.__get_color__(COLOR_PATH, colors), scale)
+ def draw_path(image: ImageData, path: Path, sizes: Sizes, colors: Colors, scale: float):
+ ImageHandler.__draw_path__(image, path, sizes, ImageHandler.__get_color__(COLOR_PATH, colors), scale)
@staticmethod
- def draw_goto_path(image: ImageData, path, colors, scale):
- ImageHandler.__draw_path__(image, path, ImageHandler.__get_color__(COLOR_GOTO_PATH, colors), scale)
+ def draw_goto_path(image: ImageData, path: Path, sizes: Sizes, colors: Colors, scale: float):
+ ImageHandler.__draw_path__(image, path, sizes, ImageHandler.__get_color__(COLOR_GOTO_PATH, colors), scale)
@staticmethod
- def draw_predicted_path(image: ImageData, path, colors, scale):
- ImageHandler.__draw_path__(image, path, ImageHandler.__get_color__(COLOR_PREDICTED_PATH, colors), scale)
+ def draw_predicted_path(image: ImageData, path: Path, sizes: Sizes, colors: Colors, scale: float):
+ ImageHandler.__draw_path__(image, path, sizes, ImageHandler.__get_color__(COLOR_PREDICTED_PATH, colors), scale)
@staticmethod
- def draw_no_go_areas(image: ImageData, areas, colors):
+ def draw_no_go_areas(image: ImageData, areas: List[Area], colors: Colors):
ImageHandler.__draw_areas__(image, areas,
ImageHandler.__get_color__(COLOR_NO_GO_ZONES, colors),
ImageHandler.__get_color__(COLOR_NO_GO_ZONES_OUTLINE, colors))
@staticmethod
- def draw_no_mopping_areas(image: ImageData, areas, colors):
+ def draw_no_mopping_areas(image: ImageData, areas: List[Area], colors: Colors):
ImageHandler.__draw_areas__(image, areas,
ImageHandler.__get_color__(COLOR_NO_MOPPING_ZONES, colors),
ImageHandler.__get_color__(COLOR_NO_MOPPING_ZONES_OUTLINE, colors))
@staticmethod
- def draw_walls(image: ImageData, walls, colors):
+ def draw_walls(image: ImageData, walls: List[Wall], colors: Colors):
draw = ImageDraw.Draw(image.data, 'RGBA')
for wall in walls:
draw.line(wall.to_img(image.dimensions).as_list(),
ImageHandler.__get_color__(COLOR_VIRTUAL_WALLS, colors), width=2)
@staticmethod
- def draw_zones(image: ImageData, zones, colors):
- areas = list(map(lambda z: z.as_area(), zones))
+ def draw_zones(image: ImageData, zones: List[Zone], colors: Colors):
+ areas = [z.as_area() for z in zones]
ImageHandler.__draw_areas__(image, areas,
ImageHandler.__get_color__(COLOR_ZONES, colors),
ImageHandler.__get_color__(COLOR_ZONES_OUTLINE, colors))
@staticmethod
- def draw_charger(image: ImageData, charger, sizes, colors):
+ def draw_charger(image: ImageData, charger: Point, sizes: Sizes, colors: Colors):
color = ImageHandler.__get_color__(COLOR_CHARGER, colors)
+ outline = ImageHandler.__get_color__(COLOR_CHARGER_OUTLINE, colors)
radius = sizes[CONF_SIZE_CHARGER_RADIUS]
- ImageHandler.__draw_circle__(image, charger, radius, color, color)
+ ImageHandler.__draw_pieslice__(image, charger, radius, outline, color)
@staticmethod
- def draw_obstacles(image: ImageData, obstacles, sizes, colors):
+ def draw_obstacles(image: ImageData, obstacles, sizes: Sizes, colors: Colors):
color = ImageHandler.__get_color__(COLOR_OBSTACLE, colors)
radius = sizes[CONF_SIZE_OBSTACLE_RADIUS]
ImageHandler.draw_all_obstacles(image, obstacles, radius, color)
@staticmethod
- def draw_ignored_obstacles(image: ImageData, obstacles, sizes, colors):
+ def draw_ignored_obstacles(image: ImageData, obstacles: List[Obstacle], sizes: Sizes, colors: Colors):
color = ImageHandler.__get_color__(COLOR_IGNORED_OBSTACLE, colors)
radius = sizes[CONF_SIZE_IGNORED_OBSTACLE_RADIUS]
ImageHandler.draw_all_obstacles(image, obstacles, radius, color)
@staticmethod
- def draw_obstacles_with_photo(image: ImageData, obstacles, sizes, colors):
+ def draw_obstacles_with_photo(image: ImageData, obstacles: List[Obstacle], sizes: Sizes, colors: Colors):
color = ImageHandler.__get_color__(COLOR_OBSTACLE_WITH_PHOTO, colors)
radius = sizes[CONF_SIZE_OBSTACLE_WITH_PHOTO_RADIUS]
ImageHandler.draw_all_obstacles(image, obstacles, radius, color)
@staticmethod
- def draw_ignored_obstacles_with_photo(image: ImageData, obstacles, sizes, colors):
+ def draw_ignored_obstacles_with_photo(image: ImageData, obstacles: List[Obstacle], sizes: Sizes, colors: Colors):
color = ImageHandler.__get_color__(COLOR_IGNORED_OBSTACLE_WITH_PHOTO, colors)
radius = sizes[CONF_SIZE_IGNORED_OBSTACLE_WITH_PHOTO_RADIUS]
ImageHandler.draw_all_obstacles(image, obstacles, radius, color)
@staticmethod
- def draw_all_obstacles(image: ImageData, obstacles, radius, color):
+ def draw_all_obstacles(image: ImageData, obstacles: List[Obstacle], radius: float, color: Color):
for obstacle in obstacles:
ImageHandler.__draw_circle__(image, obstacle, radius, color, color)
@staticmethod
- def draw_vacuum_position(image: ImageData, vacuum_position, sizes, colors):
+ def draw_vacuum_position(image: ImageData, vacuum_position: Point, sizes: Sizes, colors: Colors):
color = ImageHandler.__get_color__(COLOR_ROBO, colors)
+ outline = ImageHandler.__get_color__(COLOR_ROBO_OUTLINE, colors)
radius = sizes[CONF_SIZE_VACUUM_RADIUS]
- ImageHandler.__draw_circle__(image, vacuum_position, radius, color, color)
+ ImageHandler.__draw_vacuum__(image, vacuum_position, radius, outline, color)
@staticmethod
- def draw_room_names(image: ImageData, rooms, colors):
+ def draw_room_names(image: ImageData, rooms: Dict[int, Room], colors: Colors):
color = ImageHandler.__get_color__(COLOR_ROOM_NAMES, colors)
for room in rooms.values():
p = room.point()
@@ -170,7 +177,7 @@ def rotate(image: ImageData):
image.data = image.data.transpose(Image.ROTATE_270)
@staticmethod
- def draw_texts(image: ImageData, texts):
+ def draw_texts(image: ImageData, texts: Texts):
for text_config in texts:
x = text_config[CONF_X] * image.data.size[0] / 100
y = text_config[CONF_Y] * image.data.size[1] / 100
@@ -178,44 +185,114 @@ def draw_texts(image: ImageData, texts):
text_config[CONF_FONT], text_config[CONF_FONT_SIZE])
@staticmethod
- def draw_layer(image: ImageData, layer_name):
+ def draw_layer(image: ImageData, layer_name: str):
ImageHandler.__draw_layer__(image, image.additional_layers[layer_name])
@staticmethod
- def __draw_circle__(image: ImageData, center, r, outline, fill):
+ def __use_transparency__(*colors):
+ return any(len(color) > 3 for color in colors)
+
+ @staticmethod
+ def __draw_vacuum__(image: ImageData, vacuum_pos, r, outline, fill):
+ def draw_func(draw: ImageDraw):
+ if vacuum_pos.a is None:
+ vacuum_pos.a = 0
+ point = vacuum_pos.to_img(image.dimensions)
+ r_scaled = r / 16
+ # main outline
+ coords = [point.x - r, point.y - r, point.x + r, point.y + r]
+ draw.ellipse(coords, outline=outline, fill=fill)
+ if r >= 8:
+ # secondary outline
+ r2 = r_scaled * 14
+ x = point.x
+ y = point.y
+ coords = [x - r2, y - r2, x + r2, y + r2]
+ draw.ellipse(coords, outline=outline, fill=None)
+ # bin cover
+ a1 = (vacuum_pos.a + 104) / 180 * math.pi
+ a2 = (vacuum_pos.a - 104) / 180 * math.pi
+ r2 = r_scaled * 13
+ x1 = point.x - r2 * math.cos(a1)
+ y1 = point.y + r2 * math.sin(a1)
+ x2 = point.x - r2 * math.cos(a2)
+ y2 = point.y + r2 * math.sin(a2)
+ draw.line([x1, y1, x2, y2], width=1, fill=outline)
+ # lidar
+ angle = vacuum_pos.a / 180 * math.pi
+ r2 = r_scaled * 3
+ x = point.x + r2 * math.cos(angle)
+ y = point.y - r2 * math.sin(angle)
+ r2 = r_scaled * 4
+ coords = [x - r2, y - r2, x + r2, y + r2]
+ draw.ellipse(coords, outline=outline, fill=fill)
+ # button
+ half_color = (
+ (outline[0] + fill[0]) // 2,
+ (outline[1] + fill[1]) // 2,
+ (outline[2] + fill[2]) // 2
+ )
+ r2 = r_scaled * 10
+ x = point.x + r2 * math.cos(angle)
+ y = point.y - r2 * math.sin(angle)
+ r2 = r_scaled * 2
+ coords = [x - r2, y - r2, x + r2, y + r2]
+ draw.ellipse(coords, outline=half_color, fill=half_color)
+
+ ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(outline, fill))
+
+ @staticmethod
+ def __draw_circle__(image: ImageData, center: Point, r: float, outline: Color, fill: Color):
def draw_func(draw: ImageDraw):
point = center.to_img(image.dimensions)
coords = [point.x - r, point.y - r, point.x + r, point.y + r]
draw.ellipse(coords, outline=outline, fill=fill)
- ImageHandler.__draw_on_new_layer__(image, draw_func)
+ ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(outline, fill))
@staticmethod
- def __draw_areas__(image: ImageData, areas, fill, outline):
+ def __draw_pieslice__(image: ImageData, position, r, outline, fill):
+ def draw_func(draw: ImageDraw):
+ point = position.to_img(image.dimensions)
+ angle = -position.a if position.a is not None else 0
+ coords = [point.x - r, point.y - r, point.x + r, point.y + r]
+ draw.pieslice(coords, angle + 90, angle - 90, outline="black", fill=fill)
+
+ ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(outline, fill))
+
+ @staticmethod
+ def __draw_areas__(image: ImageData, areas: List[Area], fill: Color, outline: Color):
if len(areas) == 0:
return
+
+ use_transparency = ImageHandler.__use_transparency__(outline, fill)
for area in areas:
def draw_func(draw: ImageDraw):
draw.polygon(area.to_img(image.dimensions).as_list(), fill, outline)
- ImageHandler.__draw_on_new_layer__(image, draw_func)
+ ImageHandler.__draw_on_new_layer__(image, draw_func, 1, use_transparency)
@staticmethod
- def __draw_path__(image: ImageData, path, color, scale):
- if len(path.path) < 2:
+ def __draw_path__(image: ImageData, path: Path, sizes: Sizes, color: Color, scale: float):
+ if len(path.path) < 1:
return
+ path_width = sizes[CONF_SIZE_PATH_WIDTH]
+
def draw_func(draw: ImageDraw):
- s = path.path[0].to_img(image.dimensions)
- for point in path.path[1:]:
- e = point.to_img(image.dimensions)
- draw.line([s.x * scale, s.y * scale, e.x * scale, e.y * scale], width=int(scale), fill=color)
- s = e
+ for current_path in path.path:
+ if len(current_path) > 1:
+ s = current_path[0].to_img(image.dimensions)
+ for point in current_path[1:]:
+ e = point.to_img(image.dimensions)
+ draw.line([s.x * scale, s.y * scale, e.x * scale, e.y * scale],
+ width=int(scale * path_width), fill=color)
+ s = e
- ImageHandler.__draw_on_new_layer__(image, draw_func, scale)
+ ImageHandler.__draw_on_new_layer__(image, draw_func, scale, ImageHandler.__use_transparency__(color))
@staticmethod
- def __draw_text__(image: ImageData, text, x, y, color, font_file=None, font_size=None):
+ def __draw_text__(image: ImageData, text: str, x: float, y: float, color: Color, font_file=None, font_size=None):
def draw_func(draw: ImageDraw):
font = ImageFont.load_default()
try:
@@ -229,10 +306,10 @@ def draw_func(draw: ImageDraw):
w, h = draw.textsize(text, font)
draw.text((x - w / 2, y - h / 2), text, font=font, fill=color)
- ImageHandler.__draw_on_new_layer__(image, draw_func)
+ ImageHandler.__draw_on_new_layer__(image, draw_func, 1, ImageHandler.__use_transparency__(color))
@staticmethod
- def __get_color__(name, colors, default_name=None):
+ def __get_color__(name, colors: Colors, default_name: str = None) -> Color:
if name in colors:
return colors[name]
if default_name is None:
@@ -240,17 +317,18 @@ def __get_color__(name, colors, default_name=None):
return ImageHandler.COLORS[default_name]
@staticmethod
- def __draw_on_new_layer__(image: ImageData, draw_function: Callable, scale=1):
- if scale == 1:
- size = image.data.size
+ def __draw_on_new_layer__(image: ImageData, draw_function: Callable, scale: float = 1, use_transparency=False):
+ if scale == 1 and not use_transparency:
+ draw = ImageDraw.Draw(image.data, "RGBA")
+ draw_function(draw)
else:
size = [int(image.data.size[0] * scale), int(image.data.size[1] * scale)]
- layer = Image.new("RGBA", size, (255, 255, 255, 0))
- draw = ImageDraw.Draw(layer, "RGBA")
- draw_function(draw)
- if scale != 1:
- layer = layer.resize(image.data.size, resample=Image.BOX)
- ImageHandler.__draw_layer__(image, layer)
+ layer = Image.new("RGBA", size, (255, 255, 255, 0))
+ draw = ImageDraw.Draw(layer, "RGBA")
+ draw_function(draw)
+ if scale != 1:
+ layer = layer.resize(image.data.size, resample=Image.BOX)
+ ImageHandler.__draw_layer__(image, layer)
@staticmethod
def __draw_layer__(image: ImageData, layer: ImageType):
diff --git a/custom_components/xiaomi_cloud_map_extractor/common/map_data.py b/custom_components/xiaomi_cloud_map_extractor/common/map_data.py
index 8a36a36..e5227c9 100644
--- a/custom_components/xiaomi_cloud_map_extractor/common/map_data.py
+++ b/custom_components/xiaomi_cloud_map_extractor/common/map_data.py
@@ -1,27 +1,31 @@
from __future__ import annotations
-from typing import Dict, List, Optional, Set
+from typing import Any, Callable, Dict, List, Optional, Set
from PIL.Image import Image as ImageType
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.types import CalibrationPoints, ImageConfig
class Point:
- def __init__(self, x, y, a=None):
+ def __init__(self, x: float, y: float, a=None):
self.x = x
self.y = y
self.a = a
- def __str__(self):
+ def __str__(self) -> str:
if self.a is None:
return f"({self.x}, {self.y})"
return f"({self.x}, {self.y}, a = {self.a})"
- def __eq__(self, other):
+ def __repr__(self) -> str:
+ return self.__str__()
+
+ def __eq__(self, other: Point) -> bool:
return other is not None and self.x == other.x and self.y == other.y and self.a == other.a
- def as_dict(self):
+ def as_dict(self) -> Dict[str, Any]:
if self.a is None:
return {
ATTR_X: self.x,
@@ -33,10 +37,10 @@ def as_dict(self):
ATTR_A: self.a
}
- def to_img(self, image_dimensions):
+ def to_img(self, image_dimensions) -> Point:
return image_dimensions.to_img(self)
- def rotated(self, image_dimensions):
+ def rotated(self, image_dimensions) -> Point:
alpha = image_dimensions.rotation
w = int(image_dimensions.width * image_dimensions.scale)
h = int(image_dimensions.height * image_dimensions.scale)
@@ -52,27 +56,28 @@ def rotated(self, image_dimensions):
alpha = alpha - 90
return Point(x, y)
- def __mul__(self, other):
+ def __mul__(self, other) -> Point:
return Point(self.x * other, self.y * other, self.a)
- def __truediv__(self, other):
+ def __truediv__(self, other) -> Point:
return Point(self.x / other, self.y / other, self.a)
class Obstacle(Point):
- def __init__(self, x, y, details):
+ def __init__(self, x: float, y: float, details: Dict[str, Any]):
super().__init__(x, y)
self.details = details
- def as_dict(self):
+ def as_dict(self) -> Dict[str, Any]:
return {**super(Obstacle, self).as_dict(), **self.details}
- def __str__(self):
+ def __str__(self) -> str:
return f"({self.x}, {self.y}, details = {self.details})"
class ImageDimensions:
- def __init__(self, top, left, height, width, scale, rotation, img_transformation):
+ def __init__(self, top: int, left: int, height: int, width: int, scale: float, rotation: int,
+ img_transformation: Callable[[Point], Point]):
self.top = top
self.left = left
self.height = height
@@ -81,14 +86,14 @@ def __init__(self, top, left, height, width, scale, rotation, img_transformation
self.rotation = rotation
self.img_transformation = img_transformation
- def to_img(self, point: Point):
+ def to_img(self, point: Point) -> Point:
p = self.img_transformation(point)
return Point((p.x - self.left) * self.scale, (self.height - (p.y - self.top) - 1) * self.scale)
class ImageData:
- def __init__(self, size, top, left, height, width, image_config, data, img_transformation,
- additional_layers: Dict = None):
+ def __init__(self, size: int, top: int, left: int, height: int, width: int, image_config: ImageConfig,
+ data: ImageType, img_transformation: Callable[[Point], Point], additional_layers: dict = None):
trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100)
trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100)
trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100)
@@ -109,7 +114,7 @@ def __init__(self, size, top, left, height, width, image_config, data, img_trans
else:
self.additional_layers = dict(filter(lambda l: l[1] is not None, additional_layers.items()))
- def as_dict(self):
+ def as_dict(self) -> Dict[str, Any]:
return {
ATTR_SIZE: self.size,
ATTR_OFFSET_Y: self.dimensions.top,
@@ -136,13 +141,14 @@ def create_empty(data: ImageType) -> ImageData:
class Path:
- def __init__(self, point_length, point_size, angle, path: list):
+ def __init__(self, point_length: Optional[int], point_size: Optional[int], angle: Optional[int],
+ path: List[List[Point]]):
self.point_length = point_length
self.point_size = point_size
self.angle = angle
self.path = path
- def as_dict(self):
+ def as_dict(self) -> Dict[str, Any]:
return {
ATTR_POINT_LENGTH: self.point_length,
ATTR_POINT_SIZE: self.point_size,
@@ -152,16 +158,19 @@ def as_dict(self):
class Zone:
- def __init__(self, x0, y0, x1, y1):
+ def __init__(self, x0: float, y0: float, x1: float, y1: float):
self.x0 = x0
self.y0 = y0
self.x1 = x1
self.y1 = y1
- def __str__(self):
+ def __str__(self) -> str:
return f"[{self.x0}, {self.y0}, {self.x1}, {self.y1}]"
- def as_dict(self):
+ def __repr__(self) -> str:
+ return self.__str__()
+
+ def as_dict(self) -> Dict[str, Any]:
return {
ATTR_X0: self.x0,
ATTR_Y0: self.y0,
@@ -169,47 +178,55 @@ def as_dict(self):
ATTR_Y1: self.y1
}
- def as_area(self):
+ def as_area(self) -> Area:
return Area(self.x0, self.y0, self.x0, self.y1, self.x1, self.y1, self.x1, self.y0)
class Room(Zone):
- def __init__(self, number, x0, y0, x1, y1, name=None, pos_x=None, pos_y=None):
+ def __init__(self, number: int, x0: Optional[float], y0: Optional[float], x1: Optional[float], y1: Optional[float],
+ name: str = None, pos_x: float = None, pos_y: float = None):
super().__init__(x0, y0, x1, y1)
self.number = number
self.name = name
self.pos_x = pos_x
self.pos_y = pos_y
- def as_dict(self):
+ def as_dict(self) -> Dict[str, Any]:
super_dict = {**super(Room, self).as_dict()}
if self.name is not None:
super_dict[ATTR_NAME] = self.name
if self.pos_x is not None:
super_dict[ATTR_X] = self.pos_x
- if self.name is not None:
+ if self.pos_y is not None:
super_dict[ATTR_Y] = self.pos_y
return super_dict
- def __str__(self):
+ def __str__(self) -> str:
return f"[number: {self.number}, name: {self.name}, {self.x0}, {self.y0}, {self.x1}, {self.y1}]"
+ def __repr__(self) -> str:
+ return self.__str__()
+
def point(self) -> Optional[Point]:
if self.pos_x is not None and self.pos_y is not None and self.name is not None:
return Point(self.pos_x, self.pos_y)
return None
+
class Wall:
- def __init__(self, x0, y0, x1, y1):
+ def __init__(self, x0: float, y0: float, x1: float, y1: float):
self.x0 = x0
self.y0 = y0
self.x1 = x1
self.y1 = y1
- def __str__(self):
+ def __str__(self) -> str:
return f"[{self.x0}, {self.y0}, {self.x1}, {self.y1}]"
- def as_dict(self):
+ def __repr__(self) -> str:
+ return self.__str__()
+
+ def as_dict(self) -> Dict[str, Any]:
return {
ATTR_X0: self.x0,
ATTR_Y0: self.y0,
@@ -217,17 +234,17 @@ def as_dict(self):
ATTR_Y1: self.y1
}
- def to_img(self, image_dimensions):
+ def to_img(self, image_dimensions) -> Wall:
p0 = Point(self.x0, self.y0).to_img(image_dimensions)
p1 = Point(self.x1, self.y1).to_img(image_dimensions)
return Wall(p0.x, p0.y, p1.x, p1.y)
- def as_list(self):
+ def as_list(self) -> List[float]:
return [self.x0, self.y0, self.x1, self.y1]
class Area:
- def __init__(self, x0, y0, x1, y1, x2, y2, x3, y3):
+ def __init__(self, x0: float, y0: float, x1: float, y1: float, x2: float, y2: float, x3: float, y3: float):
self.x0 = x0
self.y0 = y0
self.x1 = x1
@@ -237,10 +254,13 @@ def __init__(self, x0, y0, x1, y1, x2, y2, x3, y3):
self.x3 = x3
self.y3 = y3
- def __str__(self):
+ def __str__(self) -> str:
return f"[{self.x0}, {self.y0}, {self.x1}, {self.y1}, {self.x2}, {self.y2}, {self.x3}, {self.y3}]"
- def as_dict(self):
+ def __repr__(self) -> str:
+ return self.__str__()
+
+ def as_dict(self) -> Dict[str, Any]:
return {
ATTR_X0: self.x0,
ATTR_Y0: self.y0,
@@ -252,10 +272,10 @@ def as_dict(self):
ATTR_Y3: self.y3
}
- def as_list(self):
+ def as_list(self) -> List[float]:
return [self.x0, self.y0, self.x1, self.y1, self.x2, self.y2, self.x3, self.y3]
- def to_img(self, image_dimensions):
+ def to_img(self, image_dimensions) -> Area:
p0 = Point(self.x0, self.y0).to_img(image_dimensions)
p1 = Point(self.x1, self.y1).to_img(image_dimensions)
p2 = Point(self.x2, self.y2).to_img(image_dimensions)
@@ -264,13 +284,13 @@ def to_img(self, image_dimensions):
class MapData:
- def __init__(self, calibration_center=0, calibration_diff=0):
+ def __init__(self, calibration_center: float = 0, calibration_diff: float = 0):
self._calibration_center = calibration_center
self._calibration_diff = calibration_diff
self.blocks = None
self.charger: Optional[Point] = None
self.goto: Optional[List[Point]] = None
- self.goto_path: Optional[List[Point]] = None
+ self.goto_path: Optional[Path] = None
self.image: Optional[ImageData] = None
self.no_go_areas: Optional[List[Area]] = None
self.no_mopping_areas: Optional[List[Area]] = None
@@ -278,8 +298,8 @@ def __init__(self, calibration_center=0, calibration_diff=0):
self.ignored_obstacles: Optional[List[Obstacle]] = None
self.obstacles_with_photo: Optional[List[Obstacle]] = None
self.ignored_obstacles_with_photo: Optional[List[Obstacle]] = None
- self.path: Optional[List[Point]] = None
- self.predicted_path: Optional[List[Point]] = None
+ self.path: Optional[Path] = None
+ self.predicted_path: Optional[Path] = None
self.rooms: Optional[Dict[int, Room]] = None
self.vacuum_position: Optional[Point] = None
self.vacuum_room: Optional[int] = None
@@ -289,7 +309,7 @@ def __init__(self, calibration_center=0, calibration_diff=0):
self.cleaned_rooms: Optional[Set[int]] = None
self.map_name: Optional[str] = None
- def calibration(self):
+ def calibration(self) -> Optional[CalibrationPoints]:
if self.image.is_empty:
return None
calibration_points = []
diff --git a/custom_components/xiaomi_cloud_map_extractor/common/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/common/map_data_parser.py
index 63a0878..7dc34f7 100644
--- a/custom_components/xiaomi_cloud_map_extractor/common/map_data_parser.py
+++ b/custom_components/xiaomi_cloud_map_extractor/common/map_data_parser.py
@@ -3,6 +3,7 @@
from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler
from custom_components.xiaomi_cloud_map_extractor.common.map_data import ImageData, MapData
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts
_LOGGER = logging.getLogger(__name__)
@@ -10,18 +11,19 @@
class MapDataParser:
@staticmethod
- def create_empty(colors, text) -> MapData:
+ def create_empty(colors: Colors, text: str) -> MapData:
map_data = MapData()
empty_map = ImageHandler.create_empty_map_image(colors, text)
map_data.image = ImageData.create_empty(empty_map)
return map_data
@staticmethod
- def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
+ def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes,
+ image_config: ImageConfig, *args, **kwargs) -> MapData:
pass
@staticmethod
- def draw_elements(colors, drawables, sizes, map_data: MapData, image_config):
+ def draw_elements(colors: Colors, drawables: Drawables, sizes: Sizes, map_data: MapData, image_config: ImageConfig):
scale = float(image_config[CONF_SCALE])
for drawable in drawables:
if DRAWABLE_CHARGER == drawable and map_data.charger is not None:
@@ -38,11 +40,11 @@ def draw_elements(colors, drawables, sizes, map_data: MapData, image_config):
ImageHandler.draw_ignored_obstacles_with_photo(map_data.image, map_data.ignored_obstacles_with_photo,
sizes, colors)
if DRAWABLE_PATH == drawable and map_data.path is not None:
- ImageHandler.draw_path(map_data.image, map_data.path, colors, scale)
+ ImageHandler.draw_path(map_data.image, map_data.path, sizes, colors, scale)
if DRAWABLE_GOTO_PATH == drawable and map_data.goto_path is not None:
- ImageHandler.draw_goto_path(map_data.image, map_data.goto_path, colors, scale)
+ ImageHandler.draw_goto_path(map_data.image, map_data.goto_path, sizes, colors, scale)
if DRAWABLE_PREDICTED_PATH == drawable and map_data.predicted_path is not None:
- ImageHandler.draw_predicted_path(map_data.image, map_data.predicted_path, colors, scale)
+ ImageHandler.draw_predicted_path(map_data.image, map_data.predicted_path, sizes, colors, scale)
if DRAWABLE_NO_GO_AREAS == drawable and map_data.no_go_areas is not None:
ImageHandler.draw_no_go_areas(map_data.image, map_data.no_go_areas, colors)
if DRAWABLE_NO_MOPPING_AREAS == drawable and map_data.no_mopping_areas is not None:
diff --git a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py
index aa44a15..6b46841 100644
--- a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py
+++ b/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py
@@ -3,19 +3,27 @@
from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData
from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser
+from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts
class XiaomiCloudVacuum:
- def __init__(self, connector, country, user_id, device_id, model):
+ def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str):
self._connector = connector
self._country = country
self._user_id = user_id
self._device_id = device_id
self.model = model
- def get_map(self, map_name, colors, drawables, texts, sizes, image_config, store_map_path=None) \
- -> Tuple[Optional[MapData], bool]:
+ def get_map(self,
+ map_name: str,
+ colors: Colors,
+ drawables: Drawables,
+ texts: Texts,
+ sizes: Sizes,
+ image_config: ImageConfig,
+ store_map_path: Optional[str] = None) -> Tuple[Optional[MapData], bool]:
response = self.get_raw_map_data(map_name)
if response is None:
return None, False
@@ -31,22 +39,28 @@ def get_map(self, map_name, colors, drawables, texts, sizes, image_config, store
map_data.map_name = map_name
return map_data, map_stored
- def get_raw_map_data(self, map_name) -> Optional[bytes]:
+ def get_raw_map_data(self, map_name: Optional[str]) -> Optional[bytes]:
if map_name is None:
return None
map_url = self.get_map_url(map_name)
return self._connector.get_raw_map_data(map_url)
- def decode_map(self, raw_map, colors, drawables, texts, sizes, image_config) -> Optional[MapData]:
+ def decode_map(self,
+ raw_map: bytes,
+ colors: Colors,
+ drawables: Drawables,
+ texts: Texts,
+ sizes: Sizes,
+ image_config: ImageConfig) -> Optional[MapData]:
return MapDataParser.create_empty(colors, f"Vacuum\n{self.model}\nis not supported")
@abstractmethod
- def get_map_url(self, map_name):
+ def get_map_url(self, map_name: str) -> Optional[str]:
pass
@abstractmethod
- def should_get_map_from_vacuum(self):
+ def should_get_map_from_vacuum(self) -> bool:
pass
- def get_map_archive_extension(self):
+ def get_map_archive_extension(self) -> str:
pass
diff --git a/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py b/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py
index 9ef4973..6375a53 100644
--- a/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py
+++ b/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py
@@ -1,12 +1,15 @@
+from typing import Optional
+
from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum
+from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector
class XiaomiCloudVacuumV2(XiaomiCloudVacuum):
- def __init__(self, connector, country, user_id, device_id, model):
+ def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str):
super().__init__(connector, country, user_id, device_id, model)
- def get_map_url(self, map_name):
+ def get_map_url(self, map_name: str) -> Optional[str]:
url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url'
params = {
"data": f'{{"obj_name":"{self._user_id}/{self._device_id}/{map_name}"}}'
@@ -16,5 +19,5 @@ def get_map_url(self, map_name):
return None
return api_response["result"]["url"]
- def should_get_map_from_vacuum(self):
+ def should_get_map_from_vacuum(self) -> bool:
return False
diff --git a/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py b/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py
index 1f2c92a..94fb641 100644
--- a/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py
+++ b/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py
@@ -6,7 +6,7 @@
import os
import random
import time
-from typing import Optional
+from typing import Any, Dict, Optional, Tuple
from Crypto.Cipher import ARC4
import requests
@@ -19,7 +19,8 @@
# noinspection PyBroadException
class XiaomiCloudConnector:
- def __init__(self, username, password):
+ def __init__(self, username: str, password: str):
+ self.two_factor_auth_url = None
self._username = username
self._password = password
self._agent = self.generate_agent()
@@ -34,7 +35,7 @@ def __init__(self, username, password):
self._code = None
self._serviceToken = None
- def login_step_1(self):
+ def login_step_1(self) -> bool:
url = "https://account.xiaomi.com/pass/serviceLogin?sid=xiaomiio&_json=true"
headers = {
"User-Agent": self._agent,
@@ -52,7 +53,7 @@ def login_step_1(self):
self._sign = self.to_json(response.text)["_sign"]
return successful
- def login_step_2(self):
+ def login_step_2(self) -> bool:
url = "https://account.xiaomi.com/pass/serviceLoginAuth2"
headers = {
"User-Agent": self._agent,
@@ -82,6 +83,7 @@ def login_step_2(self):
self._passToken = json_resp["passToken"]
self._location = json_resp["location"]
self._code = json_resp["code"]
+ self.two_factor_auth_url = None
else:
if "notificationUrl" in json_resp:
_LOGGER.error(
@@ -89,11 +91,12 @@ def login_step_2(self):
"Open following URL using device that has the same public IP, " +
"as your Home Assistant instance: %s ",
json_resp["notificationUrl"])
+ self.two_factor_auth_url = json_resp["notificationUrl"]
successful = None
return successful
- def login_step_3(self):
+ def login_step_3(self) -> bool:
headers = {
"User-Agent": self._agent,
"Content-Type": "application/x-www-form-urlencoded"
@@ -107,7 +110,7 @@ def login_step_3(self):
self._serviceToken = response.cookies.get("serviceToken")
return successful
- def login(self):
+ def login(self) -> bool:
self._session.close()
self._session = requests.session()
self._agent = self.generate_agent()
@@ -128,7 +131,8 @@ def get_raw_map_data(self, map_url) -> Optional[bytes]:
return response.content
return None
- def get_device_details(self, token, country):
+ def get_device_details(self, token: str,
+ country: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
countries_to_check = CONF_AVAILABLE_COUNTRIES
if country is not None:
countries_to_check = [country]
@@ -145,14 +149,14 @@ def get_device_details(self, token, country):
return country, user_id, device_id, model
return None, None, None, None
- def get_devices(self, country):
+ def get_devices(self, country: str) -> Any:
url = self.get_api_url(country) + "/home/device_list"
params = {
"data": '{"getVirtualModel":false,"getHuamiDevices":0}'
}
return self.execute_api_call_encrypted(url, params)
- def execute_api_call_encrypted(self, url, params):
+ def execute_api_call_encrypted(self, url: str, params: Dict[str, str]) -> Any:
headers = {
"Accept-Encoding": "identity",
"User-Agent": self._agent,
@@ -184,29 +188,29 @@ def execute_api_call_encrypted(self, url, params):
return json.loads(decoded)
return None
- def get_api_url(self, country):
+ def get_api_url(self, country: str) -> str:
return "https://" + ("" if country == "cn" else (country + ".")) + "api.io.mi.com/app"
- def signed_nonce(self, nonce):
+ def signed_nonce(self, nonce: str) -> str:
hash_object = hashlib.sha256(base64.b64decode(self._ssecurity) + base64.b64decode(nonce))
return base64.b64encode(hash_object.digest()).decode('utf-8')
@staticmethod
- def generate_nonce(millis):
+ def generate_nonce(millis: int):
nonce_bytes = os.urandom(8) + (int(millis / 60000)).to_bytes(4, byteorder='big')
return base64.b64encode(nonce_bytes).decode()
@staticmethod
- def generate_agent():
- agent_id = "".join(map(lambda i: chr(i), [random.randint(65, 69) for _ in range(13)]))
+ def generate_agent() -> str:
+ agent_id = "".join((chr(random.randint(65, 69)) for _ in range(13)))
return f"Android-7.1.1-1.0.0-ONEPLUS A3010-136-{agent_id} APP/xiaomi.smarthome APPV/62830"
@staticmethod
- def generate_device_id():
- return "".join(map(lambda i: chr(i), [random.randint(97, 122) for _ in range(6)]))
+ def generate_device_id() -> str:
+ return "".join((chr(random.randint(97, 122)) for _ in range(6)))
@staticmethod
- def generate_signature(url, signed_nonce, nonce, params):
+ def generate_signature(url, signed_nonce: str, nonce: str, params: Dict[str, str]) -> str:
signature_params = [url.split("com")[1], signed_nonce, nonce]
for k, v in params.items():
signature_params.append(f"{k}={v}")
@@ -215,7 +219,7 @@ def generate_signature(url, signed_nonce, nonce, params):
return base64.b64encode(signature.digest()).decode()
@staticmethod
- def generate_enc_signature(url, method, signed_nonce, params):
+ def generate_enc_signature(url, method: str, signed_nonce: str, params: Dict[str, str]) -> str:
signature_params = [str(method).upper(), url.split("com")[1].replace("/app/", "/")]
for k, v in params.items():
signature_params.append(f"{k}={v}")
@@ -224,7 +228,8 @@ def generate_enc_signature(url, method, signed_nonce, params):
return base64.b64encode(hashlib.sha1(signature_string.encode('utf-8')).digest()).decode()
@staticmethod
- def generate_enc_params(url, method, signed_nonce, nonce, params, ssecurity):
+ def generate_enc_params(url: str, method: str, signed_nonce: str, nonce: str, params: Dict[str, str],
+ ssecurity: str) -> Dict[str, str]:
params['rc4_hash__'] = XiaomiCloudConnector.generate_enc_signature(url, method, signed_nonce, params)
for k, v in params.items():
params[k] = XiaomiCloudConnector.encrypt_rc4(signed_nonce, v)
@@ -236,17 +241,17 @@ def generate_enc_params(url, method, signed_nonce, nonce, params, ssecurity):
return params
@staticmethod
- def to_json(response_text):
+ def to_json(response_text: str) -> Any:
return json.loads(response_text.replace("&&&START&&&", ""))
@staticmethod
- def encrypt_rc4(password, payload):
+ def encrypt_rc4(password: str, payload: str) -> str:
r = ARC4.new(base64.b64decode(password))
r.encrypt(bytes(1024))
return base64.b64encode(r.encrypt(payload.encode())).decode()
@staticmethod
- def decrypt_rc4(password, payload):
+ def decrypt_rc4(password: str, payload: str) -> bytes:
r = ARC4.new(base64.b64decode(password))
r.encrypt(bytes(1024))
return r.encrypt(base64.b64decode(payload))
diff --git a/custom_components/xiaomi_cloud_map_extractor/const.py b/custom_components/xiaomi_cloud_map_extractor/const.py
index d975347..5bdd5c6 100644
--- a/custom_components/xiaomi_cloud_map_extractor/const.py
+++ b/custom_components/xiaomi_cloud_map_extractor/const.py
@@ -29,6 +29,7 @@
CONF_SIZE_OBSTACLE_RADIUS = "obstacle_radius"
CONF_SIZE_OBSTACLE_WITH_PHOTO_RADIUS = "obstacle_with_photo_radius"
CONF_SIZE_VACUUM_RADIUS = "vacuum_radius"
+CONF_SIZE_PATH_WIDTH = "path_width"
CONF_STORE_MAP_RAW = "store_map_raw"
CONF_STORE_MAP_IMAGE = "store_map_image"
CONF_STORE_MAP_PATH = "store_map_path"
@@ -42,7 +43,7 @@
CONF_AVAILABLE_APIS = [CONF_AVAILABLE_API_XIAOMI, CONF_AVAILABLE_API_VIOMI, CONF_AVAILABLE_API_ROIDMI,
CONF_AVAILABLE_API_DREAME]
-CONF_AVAILABLE_SIZES = [CONF_SIZE_VACUUM_RADIUS, CONF_SIZE_IGNORED_OBSTACLE_RADIUS,
+CONF_AVAILABLE_SIZES = [CONF_SIZE_VACUUM_RADIUS, CONF_SIZE_PATH_WIDTH, CONF_SIZE_IGNORED_OBSTACLE_RADIUS,
CONF_SIZE_IGNORED_OBSTACLE_WITH_PHOTO_RADIUS, CONF_SIZE_OBSTACLE_RADIUS,
CONF_SIZE_OBSTACLE_WITH_PHOTO_RADIUS, CONF_SIZE_CHARGER_RADIUS]
@@ -101,6 +102,7 @@
ATTR_ROTATION = "rotation"
ATTR_SCALE = "scale"
ATTR_SIZE = "size"
+ATTR_TWO_FACTOR_AUTH = "url_2fa"
ATTR_TYPE = "type"
ATTR_USED_API = "used_api"
ATTR_WIDTH = "width"
@@ -141,6 +143,7 @@
COLOR_ROOM_PREFIX = "color_room_"
COLOR_CHARGER = "color_charger"
+COLOR_CHARGER_OUTLINE = "color_charger_outline"
COLOR_CLEANED_AREA = "color_cleaned_area"
COLOR_GOTO_PATH = "color_goto_path"
COLOR_GREY_WALL = "color_grey_wall"
@@ -160,6 +163,7 @@
COLOR_PATH = "color_path"
COLOR_PREDICTED_PATH = "color_predicted_path"
COLOR_ROBO = "color_robo"
+COLOR_ROBO_OUTLINE = "color_robo_outline"
COLOR_ROOM_NAMES = "color_room_names"
COLOR_SCAN = "color_scan"
COLOR_UNKNOWN = "color_unknown"
@@ -167,12 +171,13 @@
COLOR_ZONES = "color_zones"
COLOR_ZONES_OUTLINE = "color_zones_outline"
-CONF_AVAILABLE_COLORS = [COLOR_CHARGER, COLOR_CLEANED_AREA, COLOR_GOTO_PATH, COLOR_GREY_WALL, COLOR_IGNORED_OBSTACLE,
- COLOR_IGNORED_OBSTACLE_WITH_PHOTO, COLOR_MAP_INSIDE, COLOR_MAP_OUTSIDE, COLOR_MAP_WALL,
- COLOR_MAP_WALL_V2, COLOR_NEW_DISCOVERED_AREA, COLOR_NO_GO_ZONES, COLOR_NO_GO_ZONES_OUTLINE,
- COLOR_NO_MOPPING_ZONES, COLOR_NO_MOPPING_ZONES_OUTLINE, COLOR_OBSTACLE,
- COLOR_OBSTACLE_WITH_PHOTO, COLOR_PATH, COLOR_PREDICTED_PATH, COLOR_ROBO, COLOR_ROOM_NAMES,
- COLOR_SCAN, COLOR_UNKNOWN, COLOR_VIRTUAL_WALLS, COLOR_ZONES, COLOR_ZONES_OUTLINE]
+CONF_AVAILABLE_COLORS = [COLOR_CHARGER, COLOR_CHARGER_OUTLINE, COLOR_CLEANED_AREA, COLOR_GOTO_PATH, COLOR_GREY_WALL,
+ COLOR_IGNORED_OBSTACLE, COLOR_IGNORED_OBSTACLE_WITH_PHOTO, COLOR_MAP_INSIDE, COLOR_MAP_OUTSIDE,
+ COLOR_MAP_WALL, COLOR_MAP_WALL_V2, COLOR_NEW_DISCOVERED_AREA, COLOR_NO_GO_ZONES,
+ COLOR_NO_GO_ZONES_OUTLINE, COLOR_NO_MOPPING_ZONES, COLOR_NO_MOPPING_ZONES_OUTLINE,
+ COLOR_OBSTACLE, COLOR_OBSTACLE_WITH_PHOTO, COLOR_PATH, COLOR_PREDICTED_PATH, COLOR_ROBO,
+ COLOR_ROBO_OUTLINE, COLOR_ROOM_NAMES, COLOR_SCAN, COLOR_UNKNOWN, COLOR_VIRTUAL_WALLS,
+ COLOR_ZONES, COLOR_ZONES_OUTLINE]
COLOR_ROOM_1 = "color_room_1"
COLOR_ROOM_2 = "color_room_2"
@@ -197,8 +202,9 @@
AVAILABLE_APIS = {
CONF_AVAILABLE_API_DREAME: ["dreame.vacuum."],
- CONF_AVAILABLE_API_ROIDMI: ["roidmi.vacuum."],
- CONF_AVAILABLE_API_VIOMI: ["viomi.vacuum."]
+ CONF_AVAILABLE_API_ROIDMI: ["roidmi.vacuum.", "zhimi.vacuum."],
+ CONF_AVAILABLE_API_VIOMI: ["viomi.vacuum."],
+ CONF_AVAILABLE_API_XIAOMI: ["roborock.vacuum", "rockrobo.vacuum"]
}
API_EXCEPTIONS = {
diff --git a/custom_components/xiaomi_cloud_map_extractor/dreame/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/dreame/image_handler.py
new file mode 100644
index 0000000..4f79975
--- /dev/null
+++ b/custom_components/xiaomi_cloud_map_extractor/dreame/image_handler.py
@@ -0,0 +1,86 @@
+import logging
+from enum import IntEnum
+from typing import Dict, Tuple
+
+from PIL import Image
+from PIL.Image import Image as ImageType
+
+from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler
+from custom_components.xiaomi_cloud_map_extractor.common.map_data import Room
+from custom_components.xiaomi_cloud_map_extractor.const import \
+ CONF_SCALE, CONF_TRIM, CONF_LEFT, CONF_RIGHT, CONF_TOP, CONF_BOTTOM, \
+ COLOR_MAP_OUTSIDE, COLOR_MAP_INSIDE, COLOR_MAP_WALL, COLOR_ROOM_PREFIX
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class ImageHandlerDreame(ImageHandler):
+ class PixelTypes(IntEnum):
+ NONE = 0
+ FLOOR = 1
+ WALL = 2
+
+ @staticmethod
+ def parse(raw_data: bytes, header, colors, image_config, map_data_type: str) -> Tuple[ImageType, Dict[int, Room]]:
+ scale = image_config[CONF_SCALE]
+ trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * header.image_width / 100)
+ trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * header.image_width / 100)
+ trim_top = int(image_config[CONF_TRIM][CONF_TOP] * header.image_height / 100)
+ trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * header.image_height / 100)
+ trimmed_height = header.image_height - trim_top - trim_bottom
+ trimmed_width = header.image_width - trim_left - trim_right
+ image = Image.new('RGBA', (trimmed_width, trimmed_height))
+ if header.image_width == 0 or header.image_height == 0:
+ return ImageHandler.create_empty_map_image(colors), {}
+ pixels = image.load()
+ rooms = {}
+
+ for img_y in range(trimmed_height):
+ for img_x in range(trimmed_width):
+ x = img_x
+ y = trimmed_height - img_y - 1
+ room_x = img_x + trim_left
+ room_y = img_y + trim_bottom
+
+ # TODO : use MapDataParserDreame.MapDataTypes enum
+ if map_data_type == "regular":
+ px = raw_data[img_x + trim_left + header.image_width * (img_y + trim_bottom)]
+ segment_id = px >> 2
+ if 0 < segment_id < 62:
+ if segment_id not in rooms:
+ rooms[segment_id] = Room(segment_id, room_x, room_y, room_x, room_y)
+ rooms[segment_id] = Room(segment_id,
+ min(rooms[segment_id].x0, room_x), min(rooms[segment_id].y0, room_y),
+ max(rooms[segment_id].x1, room_x), max(rooms[segment_id].y1, room_y))
+ default = ImageHandler.ROOM_COLORS[segment_id >> 1]
+ pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{segment_id}", colors, default)
+ else:
+ masked_px = px & 0b00000011
+
+ if masked_px == ImageHandlerDreame.PixelTypes.NONE:
+ pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors)
+ elif masked_px == ImageHandlerDreame.PixelTypes.FLOOR:
+ pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_INSIDE, colors)
+ elif masked_px == ImageHandlerDreame.PixelTypes.WALL:
+ pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL, colors)
+ else:
+ _LOGGER.warning(f'unhandled pixel type: {px}')
+ elif map_data_type == "rism":
+ px = raw_data[img_x + trim_left + header.image_width * (img_y + trim_bottom)]
+ segment_id = px & 0b01111111
+ wall_flag = px >> 7
+
+ if wall_flag:
+ pixels[x, y] = ImageHandler.__get_color__(COLOR_MAP_WALL, colors)
+ elif segment_id > 0:
+ if segment_id not in rooms:
+ rooms[segment_id] = Room(segment_id, room_x, room_y, room_x, room_y)
+ rooms[segment_id] = Room(segment_id,
+ min(rooms[segment_id].x0, room_x), min(rooms[segment_id].y0, room_y),
+ max(rooms[segment_id].x1, room_x), max(rooms[segment_id].y1, room_y))
+ default = ImageHandler.ROOM_COLORS[segment_id >> 1]
+ pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{segment_id}", colors, default)
+
+ if image_config["scale"] != 1 and header.image_width != 0 and header.image_height != 0:
+ image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST)
+ return image, rooms
diff --git a/custom_components/xiaomi_cloud_map_extractor/dreame/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/dreame/map_data_parser.py
new file mode 100644
index 0000000..6a960cd
--- /dev/null
+++ b/custom_components/xiaomi_cloud_map_extractor/dreame/map_data_parser.py
@@ -0,0 +1,245 @@
+import base64
+import json
+import logging
+import re
+import zlib
+from enum import Enum, IntEnum
+from typing import Dict, List, Optional, Tuple
+
+from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \
+ Wall
+from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser
+from custom_components.xiaomi_cloud_map_extractor.dreame.image_handler import ImageHandlerDreame
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class MapDataHeader:
+ def __init__(self):
+ self.map_index: Optional[int] = None
+ self.frame_type: Optional[int] = None
+ self.vacuum_position: Optional[Point] = None
+ self.charger_position: Optional[Point] = None
+ self.image_pixel_size: Optional[int] = None
+ self.image_width: Optional[int] = None
+ self.image_height: Optional[int] = None
+ self.image_left: Optional[int] = None
+ self.image_top: Optional[int] = None
+
+
+class MapDataParserDreame(MapDataParser):
+ HEADER_SIZE = 27
+ PATH_REGEX = r'(?P[SL])(?P-?\d+),(?P-?\d+)'
+
+ class PathOperators(str, Enum):
+ START = "S"
+ RELATIVE_LINE = "L"
+
+ class FrameTypes(IntEnum):
+ I_FRAME = 73
+ P_FRAME = 80
+
+ class MapDataTypes(str, Enum):
+ REGULAR = "regular"
+ RISM = "rism" # Room - information
+
+ @staticmethod
+ def decode_map(raw_map: str, colors, drawables, texts, sizes, image_config,
+ map_data_type=MapDataTypes.REGULAR) -> MapData:
+ _LOGGER.debug(f'decoding {map_data_type} type map')
+ raw_map_string = raw_map.replace('_', '/').replace('-', '+')
+ unzipped = zlib.decompress(base64.decodebytes(raw_map_string.encode("utf8")))
+ return MapDataParserDreame.parse(unzipped, colors, drawables, texts, sizes, image_config, map_data_type)
+
+ @staticmethod
+ def parse(raw: bytes, colors, drawables, texts, sizes, image_config,
+ map_data_type: MapDataTypes = MapDataTypes.REGULAR, *args, **kwargs) -> Optional[MapData]:
+ map_data = MapData(0, 1000)
+
+ header = MapDataParserDreame.parse_header(raw)
+
+ if header.frame_type != MapDataParserDreame.FrameTypes.I_FRAME:
+ _LOGGER.error("unsupported map frame type")
+ return
+
+ if len(raw) >= MapDataParserDreame.HEADER_SIZE + header.image_width * header.image_height:
+ image_raw = raw[MapDataParserDreame.HEADER_SIZE:
+ MapDataParserDreame.HEADER_SIZE + header.image_width * header.image_height]
+ additional_data_raw = raw[MapDataParserDreame.HEADER_SIZE + header.image_width * header.image_height:]
+ additional_data_json = json.loads(additional_data_raw.decode("utf8"))
+ _LOGGER.debug(f'map additional_data: {additional_data_json}')
+
+ map_data.charger = header.charger_position
+ map_data.vacuum_position = header.vacuum_position
+
+ map_data.image, map_data.rooms = MapDataParserDreame.parse_image(image_raw, header, colors, image_config,
+ additional_data_json, map_data_type)
+
+ if additional_data_json.get("rism") and \
+ additional_data_json.get("ris") and additional_data_json["ris"] == 2:
+ rism_map_data = MapDataParserDreame.decode_map(
+ additional_data_json["rism"],
+ colors,
+ drawables,
+ texts,
+ sizes,
+ image_config,
+ MapDataParserDreame.MapDataTypes.RISM
+ )
+ map_data.no_go_areas = rism_map_data.no_go_areas
+ map_data.no_mopping_areas = rism_map_data.no_mopping_areas
+ map_data.walls = rism_map_data.walls
+ map_data.rooms = rism_map_data.rooms
+ _LOGGER.debug(f"rooms: {map_data.rooms}")
+
+ if not rism_map_data.image.is_empty:
+ map_data.image = rism_map_data.image
+
+ if additional_data_json.get("tr"):
+ map_data.path = MapDataParserDreame.parse_path(additional_data_json["tr"])
+
+ if additional_data_json.get("vw"):
+ if additional_data_json["vw"].get("rect"):
+ map_data.no_go_areas = MapDataParserDreame.parse_areas(additional_data_json["vw"]["rect"])
+ if additional_data_json["vw"].get("mop"):
+ map_data.no_mopping_areas = MapDataParserDreame.parse_areas(additional_data_json["vw"]["mop"])
+ if additional_data_json["vw"].get("line"):
+ map_data.walls = MapDataParserDreame.parse_virtual_walls(additional_data_json["vw"]["line"])
+
+ if additional_data_json.get("sa") and isinstance(additional_data_json["sa"], list):
+ active_segment_ids = [sa[0] for sa in additional_data_json["sa"]]
+
+ if not map_data.image.is_empty:
+ if map_data_type == MapDataParserDreame.MapDataTypes.REGULAR:
+ MapDataParserDreame.draw_elements(colors, drawables, sizes, map_data, image_config)
+ ImageHandlerDreame.rotate(map_data.image)
+
+ return map_data
+
+ @staticmethod
+ def parse_header(raw: bytes) -> Optional[MapDataHeader]:
+ header = MapDataHeader()
+
+ if not raw or len(raw) < MapDataParserDreame.HEADER_SIZE:
+ _LOGGER.error("wrong header size for map")
+ return
+
+ header.map_index = MapDataParserDreame.read_int_16_le(raw)
+ header.frame_type = MapDataParserDreame.read_int_8(raw, 4)
+ header.vacuum_position = Point(
+ MapDataParserDreame.read_int_16_le(raw, 5),
+ MapDataParserDreame.read_int_16_le(raw, 7),
+ MapDataParserDreame.read_int_16_le(raw, 9)
+ )
+ header.charger_position = Point(
+ MapDataParserDreame.read_int_16_le(raw, 11),
+ MapDataParserDreame.read_int_16_le(raw, 13),
+ MapDataParserDreame.read_int_16_le(raw, 15)
+ )
+ header.image_pixel_size = MapDataParserDreame.read_int_16_le(raw, 17)
+ header.image_width = MapDataParserDreame.read_int_16_le(raw, 19)
+ header.image_height = MapDataParserDreame.read_int_16_le(raw, 21)
+ header.image_left = round(MapDataParserDreame.read_int_16_le(raw, 23) / header.image_pixel_size)
+ header.image_top = round(MapDataParserDreame.read_int_16_le(raw, 25) / header.image_pixel_size)
+
+ _LOGGER.debug(f'decoded map header : {header.__dict__}')
+
+ return header
+
+ @staticmethod
+ def parse_image(image_raw: bytes, header: MapDataHeader, colors, image_config,
+ additional_data_json, map_data_type: MapDataTypes) -> Tuple[ImageData, Dict[int, Room]]:
+
+ _LOGGER.debug(f"parse image for map {map_data_type}")
+ image, image_rooms = ImageHandlerDreame.parse(image_raw, header, colors, image_config, map_data_type)
+
+ room_names = {}
+ if additional_data_json.get("seg_inf"):
+ room_names = {int(k): base64.b64decode(v.get("name")).decode('utf-8') for (k, v) in
+ additional_data_json["seg_inf"].items() if
+ v.get("name")}
+
+ rooms = {k: Room(
+ k,
+ (v.x0 + header.image_left) * header.image_pixel_size,
+ (v.y0 + header.image_top) * header.image_pixel_size,
+ (v.x1 + header.image_left) * header.image_pixel_size,
+ (v.y1 + header.image_top) * header.image_pixel_size,
+ room_names[k] if room_names.get(k) else str(k)
+ ) for (k, v) in image_rooms.items()}
+
+ return ImageData(
+ header.image_width * header.image_height,
+ header.image_top,
+ header.image_left,
+ header.image_height,
+ header.image_width,
+ image_config,
+ image,
+ lambda p: MapDataParserDreame.map_to_image(p, header.image_pixel_size)
+ ), rooms
+
+ @staticmethod
+ def map_to_image(p: Point, image_pixel_size: int) -> Point:
+ return Point(
+ p.x / image_pixel_size,
+ p.y / image_pixel_size
+ )
+
+ @staticmethod
+ def parse_path(path_string: str) -> Path:
+ r = re.compile(MapDataParserDreame.PATH_REGEX)
+ matches = [m.groupdict() for m in r.finditer(path_string)]
+
+ current_path = []
+ path_points = []
+ current_position = Point(0, 0)
+ for match in matches:
+ if match["operator"] == MapDataParserDreame.PathOperators.START:
+ current_path = []
+ path_points.append(current_path)
+ current_position = Point(int(match["x"]), int(match["y"]))
+ elif match["operator"] == MapDataParserDreame.PathOperators.RELATIVE_LINE:
+ current_position = Point(current_position.x + int(match["x"]), current_position.y + int(match["y"]))
+ else:
+ _LOGGER.error(f'invalid path operator {match["operator"]}')
+ current_path.append(current_position)
+
+ return Path(None, None, None, path_points)
+
+ @staticmethod
+ def parse_areas(areas: list) -> List[Area]:
+ parsed_areas = []
+ for area in areas:
+ x_coords = sorted([area[0], area[2]])
+ y_coords = sorted([area[1], area[3]])
+ parsed_areas.append(
+ Area(
+ x_coords[0], y_coords[0],
+ x_coords[1], y_coords[0],
+ x_coords[1], y_coords[1],
+ x_coords[0], y_coords[1]
+ )
+ )
+ return parsed_areas
+
+ @staticmethod
+ def parse_virtual_walls(virtual_walls: list) -> List[Wall]:
+ return [Wall(virtual_wall[0], virtual_wall[1], virtual_wall[2], virtual_wall[3])
+ for virtual_wall in virtual_walls]
+
+ @staticmethod
+ def read_int_8(data: bytes, offset: int = 0):
+ return int.from_bytes(data[offset:offset + 1], byteorder='big', signed=True)
+
+ @staticmethod
+ def read_int_8_le(data: bytes, offset: int = 0):
+ return int.from_bytes(data[offset:offset + 1], byteorder='little', signed=True)
+
+ @staticmethod
+ def read_int_16(data: bytes, offset: int = 0):
+ return int.from_bytes(data[offset:offset + 2], byteorder='big', signed=True)
+
+ @staticmethod
+ def read_int_16_le(data: bytes, offset: int = 0):
+ return int.from_bytes(data[offset:offset + 2], byteorder='little', signed=True)
diff --git a/custom_components/xiaomi_cloud_map_extractor/dreame/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/dreame/vacuum.py
index 7bdf65f..35d9ad7 100644
--- a/custom_components/xiaomi_cloud_map_extractor/dreame/vacuum.py
+++ b/custom_components/xiaomi_cloud_map_extractor/dreame/vacuum.py
@@ -1,10 +1,24 @@
+from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData
from custom_components.xiaomi_cloud_map_extractor.common.vacuum_v2 import XiaomiCloudVacuumV2
+from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector
+from custom_components.xiaomi_cloud_map_extractor.dreame.map_data_parser import MapDataParserDreame
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts
class DreameVacuum(XiaomiCloudVacuumV2):
- def __init__(self, connector, country, user_id, device_id, model):
+ def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str):
super().__init__(connector, country, user_id, device_id, model)
- def get_map_archive_extension(self):
+ def get_map_archive_extension(self) -> str:
return "b64"
+
+ def decode_map(self,
+ raw_map: bytes,
+ colors: Colors,
+ drawables: Drawables,
+ texts: Texts,
+ sizes: Sizes,
+ image_config: ImageConfig) -> MapData:
+ raw_map_string = raw_map.decode()
+ return MapDataParserDreame.decode_map(raw_map_string, colors, drawables, texts, sizes, image_config)
diff --git a/custom_components/xiaomi_cloud_map_extractor/manifest.json b/custom_components/xiaomi_cloud_map_extractor/manifest.json
index c5a9fe9..626c221 100644
--- a/custom_components/xiaomi_cloud_map_extractor/manifest.json
+++ b/custom_components/xiaomi_cloud_map_extractor/manifest.json
@@ -14,6 +14,6 @@
"requests",
"pycryptodome"
],
- "version": "v2.1.5",
+ "version": "v2.2.0",
"iot_class": "cloud_polling"
}
diff --git a/custom_components/xiaomi_cloud_map_extractor/roidmi/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/roidmi/image_handler.py
index cc326be..ff4d685 100644
--- a/custom_components/xiaomi_cloud_map_extractor/roidmi/image_handler.py
+++ b/custom_components/xiaomi_cloud_map_extractor/roidmi/image_handler.py
@@ -1,11 +1,12 @@
import logging
-from typing import Dict, Tuple
+from typing import Dict, List, Tuple
from PIL import Image
from PIL.Image import Image as ImageType
from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig
_LOGGER = logging.getLogger(__name__)
@@ -16,7 +17,8 @@ class ImageHandlerRoidmi(ImageHandler):
MAP_UNKNOWN = 255
@staticmethod
- def parse(raw_data, width, height, colors, image_config, room_numbers) \
+ def parse(raw_data: bytes, width: int, height: int, colors: Colors, image_config: ImageConfig,
+ room_numbers: List[int]) \
-> Tuple[ImageType, Dict[int, Tuple[int, int, int, int]]]:
rooms = {}
scale = image_config[CONF_SCALE]
diff --git a/custom_components/xiaomi_cloud_map_extractor/roidmi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/roidmi/map_data_parser.py
index 54ad77d..c50f077 100644
--- a/custom_components/xiaomi_cloud_map_extractor/roidmi/map_data_parser.py
+++ b/custom_components/xiaomi_cloud_map_extractor/roidmi/map_data_parser.py
@@ -1,5 +1,6 @@
import json
import logging
+import math
from typing import Dict, List, Optional, Tuple
from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \
@@ -14,7 +15,7 @@
class MapDataParserRoidmi(MapDataParser):
@staticmethod
- def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
+ def parse(raw: bytes, colors, drawables, texts, sizes, image_config, *args, **kwargs) -> MapData:
scale = float(image_config[CONF_SCALE])
map_image_size = raw.find(bytes([127, 123]))
map_image = raw[16:map_image_size + 1]
@@ -67,7 +68,7 @@ def parse_image(map_image: bytes, width: int, height: int, min_x: float, min_y:
colors: Dict, image_config: Dict, rooms: Dict[int, Room]) -> ImageData:
image_top = 0
image_left = 0
- room_numbers = rooms.keys()
+ room_numbers = list(rooms.keys())
image, rooms_raw = ImageHandlerRoidmi.parse(map_image, width, height, colors, image_config, room_numbers)
for number, room in rooms_raw.items():
pf = lambda p: MapDataParserRoidmi.image_to_map(p, resolution, min_x, min_y)
@@ -88,23 +89,34 @@ def parse_path(map_info: dict) -> Path:
for raw_point in raw_points:
point = Point(raw_point[0], raw_point[1])
path_points.append(point)
- return Path(None, None, None, path_points)
+ return Path(None, None, None, [path_points])
@staticmethod
def parse_vacuum_position(map_info: dict) -> Point:
- vacuum_position = None
- if "robotPos" in map_info and "robotPhi" in map_info:
- vacuum_position = Point(map_info["robotPos"][0], map_info["robotPos"][1], map_info["robotPhi"])
- elif "posX" in map_info and "posY" in map_info and "posPhi" in map_info:
- vacuum_position = Point(map_info["posX"], map_info["posY"], map_info["posPhi"])
+ vacuum_position = MapDataParserRoidmi.parse_position(map_info, "robotPos", "robotPos", "robotPhi")
+ if vacuum_position is None:
+ vacuum_position = MapDataParserRoidmi.parse_position(map_info, "posX", "posY", "posPhi")
return vacuum_position
@staticmethod
def parse_charger_position(map_info: dict) -> Point:
- charger_position = None
- if "chargeHandlePos" in map_info:
- charger_position = Point(map_info["chargeHandlePos"][0], map_info["chargeHandlePos"][1])
- return charger_position
+ return MapDataParserRoidmi.parse_position(map_info, "chargeHandlePos", "chargeHandlePos", "chargeHandlePhi")
+
+ @staticmethod
+ def parse_position(map_info: dict, x_label: str, y_label: str, a_label: str) -> Optional[Point]:
+ position = None
+ if x_label not in map_info or y_label not in map_info:
+ return position
+ x = map_info[x_label]
+ y = map_info[y_label]
+ a = None
+ if x_label == y_label:
+ x = x[0]
+ y = y[1]
+ if a_label in map_info:
+ a = map_info[a_label] / 1000 * 180 / math.pi
+ position = Point(x, y, a)
+ return position
@staticmethod
def parse_rooms(map_info: dict) -> Dict[int, Room]:
diff --git a/custom_components/xiaomi_cloud_map_extractor/roidmi/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/roidmi/vacuum.py
index 819fdaf..ce9b24d 100644
--- a/custom_components/xiaomi_cloud_map_extractor/roidmi/vacuum.py
+++ b/custom_components/xiaomi_cloud_map_extractor/roidmi/vacuum.py
@@ -2,17 +2,25 @@
from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData
from custom_components.xiaomi_cloud_map_extractor.common.vacuum_v2 import XiaomiCloudVacuumV2
+from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector
from custom_components.xiaomi_cloud_map_extractor.roidmi.map_data_parser import MapDataParserRoidmi
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts
class RoidmiVacuum(XiaomiCloudVacuumV2):
- def __init__(self, connector, country, user_id, device_id, model):
+ def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str):
super().__init__(connector, country, user_id, device_id, model)
- def decode_map(self, raw_map, colors, drawables, texts, sizes, image_config) -> MapData:
+ def decode_map(self,
+ raw_map: bytes,
+ colors: Colors,
+ drawables: Drawables,
+ texts: Texts,
+ sizes: Sizes,
+ image_config: ImageConfig) -> MapData:
unzipped = gzip.decompress(raw_map)
return MapDataParserRoidmi.parse(unzipped, colors, drawables, texts, sizes, image_config)
- def get_map_archive_extension(self):
+ def get_map_archive_extension(self) -> str:
return "gz"
diff --git a/custom_components/xiaomi_cloud_map_extractor/types.py b/custom_components/xiaomi_cloud_map_extractor/types.py
new file mode 100644
index 0000000..6c00ec2
--- /dev/null
+++ b/custom_components/xiaomi_cloud_map_extractor/types.py
@@ -0,0 +1,9 @@
+from typing import Any, Dict, List, Tuple, Union
+
+Color = Union[Tuple[int, int, int], Tuple[int, int, int, int]]
+Colors = Dict[str, Color]
+Drawables = List[str]
+Texts = List[Any]
+Sizes = Dict[str, float]
+ImageConfig = Dict[str, Any]
+CalibrationPoints = List[Dict[str, Dict[str, Union[float, int]]]]
diff --git a/custom_components/xiaomi_cloud_map_extractor/unsupported/__init__.py b/custom_components/xiaomi_cloud_map_extractor/unsupported/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/custom_components/xiaomi_cloud_map_extractor/unsupported/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/unsupported/vacuum.py
new file mode 100644
index 0000000..928a2ab
--- /dev/null
+++ b/custom_components/xiaomi_cloud_map_extractor/unsupported/vacuum.py
@@ -0,0 +1,10 @@
+from custom_components.xiaomi_cloud_map_extractor.common.vacuum_v2 import XiaomiCloudVacuumV2
+
+
+class UnsupportedVacuum(XiaomiCloudVacuumV2):
+
+ def __init__(self, connector, country, user_id, device_id, model):
+ super().__init__(connector, country, user_id, device_id, model)
+
+ def get_map_archive_extension(self):
+ return "unknown"
diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/viomi/image_handler.py
index f58fc1f..538d3a9 100644
--- a/custom_components/xiaomi_cloud_map_extractor/viomi/image_handler.py
+++ b/custom_components/xiaomi_cloud_map_extractor/viomi/image_handler.py
@@ -6,6 +6,8 @@
from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig
+from custom_components.xiaomi_cloud_map_extractor.viomi.parsing_buffer import ParsingBuffer
_LOGGER = logging.getLogger(__name__)
@@ -21,7 +23,8 @@ class ImageHandlerViomi(ImageHandler):
MAP_SELECTED_ROOM_MAX = 109
@staticmethod
- def parse(buf, width, height, colors, image_config, draw_cleaned_area) \
+ def parse(buf: ParsingBuffer, width: int, height: int, colors: Colors, image_config: ImageConfig,
+ draw_cleaned_area: bool) \
-> Tuple[ImageType, Dict[int, Tuple[int, int, int, int]], Set[int], Optional[ImageType]]:
rooms = {}
cleaned_areas = set()
diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py
index 0d3a7ee..49a62c0 100644
--- a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py
+++ b/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py
@@ -1,88 +1,18 @@
import logging
-from struct import unpack_from
+import math
from typing import Dict, List, Optional, Set, Tuple
from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \
Wall, Zone
from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts
from custom_components.xiaomi_cloud_map_extractor.viomi.image_handler import ImageHandlerViomi
+from custom_components.xiaomi_cloud_map_extractor.viomi.parsing_buffer import ParsingBuffer
_LOGGER = logging.getLogger(__name__)
-class ParsingBuffer:
- def __init__(self, name: str, data: bytes, start_offs: int, length: int):
- self._name = name
- self._data = data
- self._offs = start_offs
- self._length = length
- self._image_beginning = None
-
- def set_name(self, name: str):
- self._name = name
- _LOGGER.debug('SECTION %s: offset 0x%x', self._name, self._offs)
-
- def mark_as_image_beginning(self):
- self._image_beginning = self._offs
-
- def get_at_image(self, offset):
- return self._data[self._image_beginning + offset - 1]
-
- def skip(self, field, n):
- if self._length < n:
- raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
- self._offs += n
- self._length -= n
-
- def get_uint8(self, field):
- if self._length < 1:
- raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
- self._offs += 1
- self._length -= 1
- return self._data[self._offs - 1]
-
- def get_uint16(self, field):
- if self._length < 2:
- raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
- self._offs += 2
- self._length -= 2
- return unpack_from(' MapData:
+ def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes,
+ image_config: ImageConfig, *args, **kwargs) -> MapData:
map_data = MapData(0, 1)
buf = ParsingBuffer('header', raw, 0, len(raw))
feature_flags = buf.get_uint32('feature_flags')
@@ -119,9 +50,8 @@ def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
if feature_flags & MapDataParserViomi.FEATURE_CHARGE_STATION != 0:
MapDataParserViomi.parse_section(buf, 'charge_station', map_id)
- map_data.charger = MapDataParserViomi.parse_position(buf, 'pos')
- foo = buf.get_float32('foo')
- _LOGGER.debug('pos: %s, foo: %f', map_data.charger, foo)
+ map_data.charger = MapDataParserViomi.parse_position(buf, 'pos', with_angle=True)
+ _LOGGER.debug('pos: %s', map_data.charger)
if feature_flags & MapDataParserViomi.FEATURE_RESTRICTED_AREAS != 0:
MapDataParserViomi.parse_section(buf, 'restricted_areas', map_id)
@@ -141,9 +71,8 @@ def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
if feature_flags & MapDataParserViomi.FEATURE_REALTIME != 0:
MapDataParserViomi.parse_section(buf, 'realtime', map_id)
buf.skip('unknown1', 5)
- map_data.vacuum_position = MapDataParserViomi.parse_position(buf, 'pos')
- foo = buf.get_float32('foo')
- _LOGGER.debug('pos: %s, foo: %f', map_data.vacuum_position, foo)
+ map_data.vacuum_position = MapDataParserViomi.parse_position(buf, 'pos', with_angle=True)
+ _LOGGER.debug('pos: %s', map_data.vacuum_position)
if feature_flags & 0x00000800 != 0:
MapDataParserViomi.parse_section(buf, 'unknown1', map_id)
@@ -163,7 +92,8 @@ def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
buf.check_empty()
- _LOGGER.debug('rooms: %s', [str(room) for number, room in map_data.rooms.items()])
+ if map_data.rooms is not None:
+ _LOGGER.debug('rooms: %s', [str(room) for number, room in map_data.rooms.items()])
if not map_data.image.is_empty:
MapDataParserViomi.draw_elements(colors, drawables, sizes, map_data, image_config)
if len(map_data.rooms) > 0 and map_data.vacuum_position is not None:
@@ -176,11 +106,11 @@ def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
return map_data
@staticmethod
- def map_to_image(p: Point):
+ def map_to_image(p: Point) -> Point:
return Point(p.x * 20 + 400, p.y * 20 + 400)
@staticmethod
- def image_to_map(x):
+ def image_to_map(x: float) -> float:
return (x - 400) / 20
@staticmethod
@@ -194,7 +124,7 @@ def get_current_vacuum_room(buf: ParsingBuffer, vacuum_position: Point) -> Optio
return None
@staticmethod
- def parse_image(buf: ParsingBuffer, colors: Dict, image_config: Dict, draw_cleaned_area: bool) \
+ def parse_image(buf: ParsingBuffer, colors: Colors, image_config: ImageConfig, draw_cleaned_area: bool) \
-> Tuple[ImageData, Dict[int, Room], Set[int]]:
buf.skip('unknown1', 0x08)
image_top = 0
@@ -237,7 +167,7 @@ def parse_history(buf: ParsingBuffer) -> Path:
for _ in range(history_count):
mode = buf.get_uint8('mode') # 0: taxi, 1: working
path_points.append(MapDataParserViomi.parse_position(buf, 'path'))
- return Path(len(path_points), 1, 0, path_points)
+ return Path(len(path_points), 1, 0, [path_points])
@staticmethod
def parse_restricted_areas(buf: ParsingBuffer) -> Tuple[List[Wall], List[Area]]:
@@ -315,12 +245,15 @@ def parse_section(buf: ParsingBuffer, name: str, map_id: int):
f"Magic: {magic:#x}, Map ID: {map_id:#x}")
@staticmethod
- def parse_position(buf: ParsingBuffer, name: str) -> Optional[Point]:
+ def parse_position(buf: ParsingBuffer, name: str, with_angle: bool = False) -> Optional[Point]:
x = buf.get_float32(name + '.x')
y = buf.get_float32(name + '.y')
if x == MapDataParserViomi.POSITION_UNKNOWN or y == MapDataParserViomi.POSITION_UNKNOWN:
return None
- return Point(x, y)
+ a = None
+ if with_angle:
+ a = buf.get_float32(name + '.a') * 180 / math.pi
+ return Point(x, y, a)
@staticmethod
def parse_unknown_section(buf: ParsingBuffer) -> bool:
diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/parsing_buffer.py b/custom_components/xiaomi_cloud_map_extractor/viomi/parsing_buffer.py
new file mode 100644
index 0000000..5acc334
--- /dev/null
+++ b/custom_components/xiaomi_cloud_map_extractor/viomi/parsing_buffer.py
@@ -0,0 +1,77 @@
+import logging
+
+from struct import unpack_from
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class ParsingBuffer:
+ def __init__(self, name: str, data: bytes, start_offs: int, length: int):
+ self._name = name
+ self._data = data
+ self._offs = start_offs
+ self._length = length
+ self._image_beginning = None
+
+ def set_name(self, name: str):
+ self._name = name
+ _LOGGER.debug('SECTION %s: offset 0x%x', self._name, self._offs)
+
+ def mark_as_image_beginning(self):
+ self._image_beginning = self._offs
+
+ def get_at_image(self, offset) -> int:
+ return self._data[self._image_beginning + offset - 1]
+
+ def skip(self, field: str, n: int):
+ if self._length < n:
+ raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
+ self._offs += n
+ self._length -= n
+
+ def get_uint8(self, field: str) -> int:
+ if self._length < 1:
+ raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
+ self._offs += 1
+ self._length -= 1
+ return self._data[self._offs - 1]
+
+ def get_uint16(self, field: str) -> int:
+ if self._length < 2:
+ raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
+ self._offs += 2
+ self._length -= 2
+ return unpack_from(' int:
+ if self._length < 4:
+ raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
+ self._offs += 4
+ self._length -= 4
+ return unpack_from(' float:
+ if self._length < 4:
+ raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
+ self._offs += 4
+ self._length -= 4
+ return unpack_from(' str:
+ n = self.get_uint8(field + '.len')
+ if self._length < n:
+ raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
+ self._offs += n
+ self._length -= n
+ return self._data[self._offs - n:self._offs].decode('UTF-8')
+
+ def peek_uint32(self, field: str) -> int:
+ if self._length < 4:
+ raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun")
+ return unpack_from(' MapData:
+ def decode_map(self,
+ raw_map: bytes,
+ colors: Colors,
+ drawables: Drawables,
+ texts: Texts,
+ sizes: Sizes,
+ image_config: ImageConfig) -> MapData:
unzipped = zlib.decompress(raw_map)
return MapDataParserViomi.parse(unzipped, colors, drawables, texts, sizes, image_config)
- def get_map_archive_extension(self):
+ def get_map_archive_extension(self) -> str:
return "zlib"
diff --git a/custom_components/xiaomi_cloud_map_extractor/xiaomi/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/xiaomi/image_handler.py
index 4290ff3..2140cc7 100644
--- a/custom_components/xiaomi_cloud_map_extractor/xiaomi/image_handler.py
+++ b/custom_components/xiaomi_cloud_map_extractor/xiaomi/image_handler.py
@@ -6,6 +6,7 @@
from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig
_LOGGER = logging.getLogger(__name__)
@@ -17,7 +18,8 @@ class ImageHandlerXiaomi(ImageHandler):
MAP_SCAN = 0x07
@staticmethod
- def parse(raw_data: bytes, width, height, colors, image_config) -> Tuple[ImageType, dict]:
+ def parse(raw_data: bytes, width: int, height: int, colors: Colors,
+ image_config: ImageConfig) -> Tuple[ImageType, dict]:
rooms = {}
scale = image_config[CONF_SCALE]
trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100)
@@ -69,7 +71,7 @@ def parse(raw_data: bytes, width, height, colors, image_config) -> Tuple[ImageTy
return image, rooms
@staticmethod
- def get_room_at_pixel(raw_data: bytes, width, x, y):
+ def get_room_at_pixel(raw_data: bytes, width: int, x: int, y: int) -> int:
room_number = None
pixel_type = raw_data[x + width * y]
if pixel_type not in [ImageHandlerXiaomi.MAP_INSIDE, ImageHandlerXiaomi.MAP_SCAN]:
diff --git a/custom_components/xiaomi_cloud_map_extractor/xiaomi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/xiaomi/map_data_parser.py
index acf9b49..d4bc9b3 100644
--- a/custom_components/xiaomi_cloud_map_extractor/xiaomi/map_data_parser.py
+++ b/custom_components/xiaomi_cloud_map_extractor/xiaomi/map_data_parser.py
@@ -1,7 +1,9 @@
import logging
+from typing import Tuple
from custom_components.xiaomi_cloud_map_extractor.common.map_data import *
from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, Sizes, Texts
from custom_components.xiaomi_cloud_map_extractor.xiaomi.image_handler import ImageHandlerXiaomi
_LOGGER = logging.getLogger(__name__)
@@ -37,7 +39,8 @@ class MapDataParserXiaomi(MapDataParser):
}
@staticmethod
- def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
+ def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes,
+ image_config: ImageConfig, *args, **kwargs) -> MapData:
map_data = MapData(25500, 1000)
map_header_length = MapDataParserXiaomi.get_int16(raw, 0x02)
map_data.major_version = MapDataParserXiaomi.get_int16(raw, 0x08)
@@ -54,7 +57,7 @@ def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
block_data_start = block_start_position + block_header_length
data = MapDataParserXiaomi.get_bytes(raw, block_data_start, block_data_length)
if block_type == MapDataParserXiaomi.CHARGER:
- map_data.charger = MapDataParserXiaomi.parse_charger(block_start_position, raw)
+ map_data.charger = MapDataParserXiaomi.parse_object_position(block_data_length, data)
elif block_type == MapDataParserXiaomi.IMAGE:
img_start = block_start_position
image, rooms = MapDataParserXiaomi.parse_image(block_data_length, block_header_length, data, header,
@@ -62,7 +65,7 @@ def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
map_data.image = image
map_data.rooms = rooms
elif block_type == MapDataParserXiaomi.ROBOT_POSITION:
- map_data.vacuum_position = MapDataParserXiaomi.parse_vacuum_position(block_data_length, data)
+ map_data.vacuum_position = MapDataParserXiaomi.parse_object_position(block_data_length, data)
elif block_type == MapDataParserXiaomi.PATH:
map_data.path = MapDataParserXiaomi.parse_path(block_start_position, header, raw)
elif block_type == MapDataParserXiaomi.GOTO_PATH:
@@ -103,15 +106,15 @@ def parse(raw: bytes, colors, drawables, texts, sizes, image_config) -> MapData:
return map_data
@staticmethod
- def map_to_image(p: Point):
+ def map_to_image(p: Point) -> Point:
return Point(p.x / MM, p.y / MM)
@staticmethod
- def image_to_map(x):
+ def image_to_map(x: float) -> float:
return x * MM
@staticmethod
- def get_current_vacuum_room(block_start_position, raw, vacuum_position):
+ def get_current_vacuum_room(block_start_position: int, raw: bytes, vacuum_position: Point) -> int:
block_header_length = MapDataParserXiaomi.get_int16(raw, block_start_position + 0x02)
header = MapDataParserXiaomi.get_bytes(raw, block_start_position, block_header_length)
block_data_length = MapDataParserXiaomi.get_int32(header, 0x04)
@@ -125,7 +128,8 @@ def get_current_vacuum_room(block_start_position, raw, vacuum_position):
return room
@staticmethod
- def parse_image(block_data_length, block_header_length, data, header, colors, image_config):
+ def parse_image(block_data_length: int, block_header_length: int, data: bytes, header: bytes, colors: Colors,
+ image_config: ImageConfig) -> Tuple[ImageData, Dict[int, Room]]:
image_size = block_data_length
image_top = MapDataParserXiaomi.get_int32(header, block_header_length - 16)
image_left = MapDataParserXiaomi.get_int32(header, block_header_length - 12)
@@ -157,28 +161,24 @@ def parse_image(block_data_length, block_header_length, data, header, colors, im
image, MapDataParserXiaomi.map_to_image), rooms
@staticmethod
- def parse_goto_target(data):
+ def parse_goto_target(data: bytes) -> Point:
x = MapDataParserXiaomi.get_int16(data, 0x00)
y = MapDataParserXiaomi.get_int16(data, 0x02)
return Point(x, y)
@staticmethod
- def parse_vacuum_position(block_data_length, data):
+ def parse_object_position(block_data_length: int, data: bytes) -> Point:
x = MapDataParserXiaomi.get_int32(data, 0x00)
y = MapDataParserXiaomi.get_int32(data, 0x04)
a = None
if block_data_length > 8:
a = MapDataParserXiaomi.get_int32(data, 0x08)
+ if a > 0xFF:
+ a = (a & 0xFF) - 256
return Point(x, y, a)
@staticmethod
- def parse_charger(block_start_position, raw):
- x = MapDataParserXiaomi.get_int32(raw, block_start_position + 0x08)
- y = MapDataParserXiaomi.get_int32(raw, block_start_position + 0x0C)
- return Point(x, y)
-
- @staticmethod
- def parse_walls(data, header):
+ def parse_walls(data: bytes, header: bytes) -> List[Wall]:
wall_pairs = MapDataParserXiaomi.get_int16(header, 0x08)
walls = []
for wall_start in range(0, wall_pairs * 8, 8):
@@ -190,7 +190,7 @@ def parse_walls(data, header):
return walls
@staticmethod
- def parse_obstacles(data, header):
+ def parse_obstacles(data: bytes, header: bytes) -> List[Obstacle]:
obstacle_pairs = MapDataParserXiaomi.get_int16(header, 0x08)
obstacles = []
if obstacle_pairs == 0:
@@ -215,7 +215,7 @@ def parse_obstacles(data, header):
return obstacles
@staticmethod
- def parse_zones(data, header):
+ def parse_zones(data: bytes, header: bytes) -> List[Zone]:
zone_pairs = MapDataParserXiaomi.get_int16(header, 0x08)
zones = []
for zone_start in range(0, zone_pairs * 8, 8):
@@ -227,7 +227,7 @@ def parse_zones(data, header):
return zones
@staticmethod
- def parse_path(block_start_position, header, raw):
+ def parse_path(block_start_position: int, header: bytes, raw: bytes) -> Path:
path_points = []
end_pos = MapDataParserXiaomi.get_int32(header, 0x04)
point_length = MapDataParserXiaomi.get_int32(header, 0x08)
@@ -238,10 +238,10 @@ def parse_path(block_start_position, header, raw):
x = MapDataParserXiaomi.get_int16(raw, pos)
y = MapDataParserXiaomi.get_int16(raw, pos + 2)
path_points.append(Point(x, y))
- return Path(point_length, point_size, angle, path_points)
+ return Path(point_length, point_size, angle, [path_points])
@staticmethod
- def parse_area(header, data):
+ def parse_area(header: bytes, data: bytes) -> List[Area]:
area_pairs = MapDataParserXiaomi.get_int16(header, 0x08)
areas = []
for area_start in range(0, area_pairs * 16, 16):
@@ -257,21 +257,21 @@ def parse_area(header, data):
return areas
@staticmethod
- def get_bytes(data: bytes, start_index: int, size: int):
- return data[start_index: start_index + size]
+ def get_bytes(data: bytes, start_index: int, size: int) -> bytes:
+ return data[start_index: start_index + size]
@staticmethod
- def get_int8(data: bytes, address: int):
+ def get_int8(data: bytes, address: int) -> int:
return data[address] & 0xFF
@staticmethod
- def get_int16(data: bytes, address: int):
+ def get_int16(data: bytes, address: int) -> int:
return \
((data[address + 0] << 0) & 0xFF) | \
((data[address + 1] << 8) & 0xFFFF)
@staticmethod
- def get_int32(data: bytes, address: int):
+ def get_int32(data: bytes, address: int) -> int:
return \
((data[address + 0] << 0) & 0xFF) | \
((data[address + 1] << 8) & 0xFFFF) | \
diff --git a/custom_components/xiaomi_cloud_map_extractor/xiaomi/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/xiaomi/vacuum.py
index dc3128d..d3913d3 100644
--- a/custom_components/xiaomi_cloud_map_extractor/xiaomi/vacuum.py
+++ b/custom_components/xiaomi_cloud_map_extractor/xiaomi/vacuum.py
@@ -1,7 +1,9 @@
import gzip
+from typing import Optional
from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData
from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum
+from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts
from custom_components.xiaomi_cloud_map_extractor.xiaomi.map_data_parser import MapDataParserXiaomi
@@ -10,7 +12,7 @@ class XiaomiVacuum(XiaomiCloudVacuum):
def __init__(self, connector, country, user_id, device_id, model):
super().__init__(connector, country, user_id, device_id, model)
- def get_map_url(self, map_name):
+ def get_map_url(self, map_name: str) -> Optional[str]:
url = self._connector.get_api_url(self._country) + "/home/getmapfileurl"
params = {
"data": '{"obj_name":"' + map_name + '"}'
@@ -23,12 +25,18 @@ def get_map_url(self, map_name):
return None
return api_response["result"]["url"]
- def decode_map(self, raw_map, colors, drawables, texts, sizes, image_config) -> MapData:
+ def decode_map(self,
+ raw_map: bytes,
+ colors: Colors,
+ drawables: Drawables,
+ texts: Texts,
+ sizes: Sizes,
+ image_config: ImageConfig) -> MapData:
unzipped = gzip.decompress(raw_map)
return MapDataParserXiaomi.parse(unzipped, colors, drawables, texts, sizes, image_config)
- def should_get_map_from_vacuum(self):
+ def should_get_map_from_vacuum(self) -> bool:
return True
- def get_map_archive_extension(self):
+ def get_map_archive_extension(self) -> str:
return "gz"
diff --git a/scripts/map_processor.py b/scripts/map_processor.py
index c7d8222..3608fa1 100644
--- a/scripts/map_processor.py
+++ b/scripts/map_processor.py
@@ -3,12 +3,15 @@
import os
import yaml
+from homeassistant import config_entries # to fix circular imports
+from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME
+
from custom_components.xiaomi_cloud_map_extractor.camera import PLATFORM_SCHEMA, VacuumCamera
from custom_components.xiaomi_cloud_map_extractor.const import *
+from custom_components.xiaomi_cloud_map_extractor.dreame.vacuum import DreameVacuum
from custom_components.xiaomi_cloud_map_extractor.roidmi.vacuum import RoidmiVacuum
from custom_components.xiaomi_cloud_map_extractor.viomi.vacuum import ViomiVacuum
from custom_components.xiaomi_cloud_map_extractor.xiaomi.vacuum import XiaomiVacuum
-from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_TOKEN, CONF_USERNAME
logging.basicConfig()
logging.getLogger().setLevel(logging.WARNING)
@@ -86,13 +89,16 @@ def parse_map_file(map_config, map_filename, api, suffix=""):
map_data = ViomiVacuum.decode_map(None, map_file, colors, drawables, texts, sizes, transform)
elif api == CONF_AVAILABLE_API_ROIDMI:
map_data = RoidmiVacuum.decode_map(None, map_file, colors, drawables, texts, sizes, transform)
+ elif api == CONF_AVAILABLE_API_DREAME:
+ map_data = DreameVacuum.decode_map(None, map_file, colors, drawables, texts, sizes, transform)
except Exception as e:
print(f"Failed to parse map data! {e}")
if map_data is not None:
map_data.image.data.save(f"{map_filename}{suffix}.png")
print(f"Map image saved to \"{map_filename}{suffix}.png\"")
attributes_output_file = open(f"{map_filename}{suffix}.yaml", "w")
- yaml.dump(attributes_to_dict(map_data), attributes_output_file)
+ attributes = VacuumCamera.extract_attributes(map_data, CONF_AVAILABLE_ATTRIBUTES, "")
+ yaml.dump(attributes_to_dict(attributes), attributes_output_file)
attributes_output_file.close()
print(f"Map attributes saved to \"{map_filename}{suffix}.yaml\"")
else:
@@ -100,7 +106,7 @@ def parse_map_file(map_config, map_filename, api, suffix=""):
def run_download(map_config, data_output_dir):
- print(f"Downloading map data...")
+ print("Downloading map data...")
camera = create_camera(map_config, data_output_dir)
camera.update()
attributes = camera.extra_state_attributes
@@ -135,14 +141,14 @@ def run_test(map_config, test_dir):
args_parser_parse = args_subparsers.add_parser("parse", help="Parse already downloaded map file")
args_parser_parse.add_argument("--config", type=str, required=True, help="camera yaml config file")
args_parser_parse.add_argument("--map-file", type=str, required=True, help="raw map file")
- args_parser_parse.add_argument("--api", type=str, choices=["xiaomi", "viomi", "roidmi"], required=True,
+ args_parser_parse.add_argument("--api", type=str, choices=["xiaomi", "viomi", "roidmi", "dreame"], required=True,
help="used api")
args = args_parser.parse_args()
config_filename = args.config
print(f"Validating configuration file: {config_filename}")
config = open_and_validate_config(config_filename)
- print(f"Configuration validation successful")
+ print("Configuration validation successful")
output_dir = config_filename.replace(".yaml", "")
if not os.path.exists(output_dir):
os.mkdir(output_dir)