Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Tornado 16X SQ air conditioner (0x4E2A) #520

Merged
merged 15 commits into from
Apr 17, 2024
4 changes: 2 additions & 2 deletions broadlink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Generator, List, Union, Tuple

from .alarm import S1C
from .climate import hysen
from .climate import hysen, hvac
from .cover import dooya
from .device import device, ping, scan
from .exceptions import exception
Expand Down Expand Up @@ -105,12 +105,12 @@
0x60C8: (lb1, "LB1", "Broadlink"),
0x6112: (lb1, "LB1", "Broadlink"),
0x2722: (S1C, "S2KIT", "Broadlink"),
0X4E2A: (hvac, "HVAC", "Licensed manufacturer"),
0x4EAD: (hysen, "HY02B05H", "Hysen"),
0x4E4D: (dooya, "DT360E-45/20", "Dooya"),
0x51E3: (bg1, "BG800/BG900", "BG Electrical"),
}


def gendevice(
dev_type: int,
host: Tuple[str, int],
Expand Down
217 changes: 217 additions & 0 deletions broadlink/climate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""Support for climate control."""
from typing import List
import logging
from enum import IntEnum, unique
import struct

from .device import device
from .exceptions import check_error
Expand Down Expand Up @@ -235,3 +238,217 @@ def set_schedule(self, weekday: List[dict], weekend: List[dict]) -> None:
input_payload.append(int(weekend[i]["temp"] * 2))

self.send_request(input_payload)


class hvac(device):
"""Controls a HVAC.

Supported models:
- Tornado SMART X SQ series.
"""

@unique
class Mode(IntEnum):
"""Enumerates modes."""
AUTO = 0
COOL = 1
DRY = 2
HEAT = 3
FAN = 4

@unique
class Speed(IntEnum):
"""Enumerates fan speed."""
HIGH = 1
MID = 2
LOW = 3
AUTO = 5

@unique
class Preset(IntEnum):
"""Enumerates presets."""
NORMAL = 0
TURBO = 1
MUTE = 2

@unique
class SwHoriz(IntEnum):
"""Enumerates horizontal swing."""
ON = 0
OFF = 7

@unique
class SwVert(IntEnum):
"""Enumerates vertical swing."""
ON = 0
POS1 = 1
POS2 = 2
POS3 = 3
POS4 = 4
POS5 = 5
OFF = 7

def __init__(self, *args, **kwargs):
device.__init__(self, *args, **kwargs)
self.type = "HVAC"

def _crc(self, data: bytes) -> int:
"""Calculate CRC of a byte object."""
s = sum([v if i % 2 == 0 else v << 8 for i, v in enumerate(data)])
# trim the overflow and add it to smallest bit
s = (s & 0xffff) + (s >> 16)
return (0xffff - s) # TODO: fix: we can't return negative values
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misunderstanding what you mean.
Screen Shot 2021-05-02 at 14 23 06
But if you mean be negative like in the screenshot, I think it'd be okay just to check that the packet is smaller than 2^18B.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, overflow is unlikely as we are not dealing with packets of this size. & 0xFFFF should be enough as a safeguard:

     return (0xffff - s) & 0xffff

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just found the polynomial, now we can do:

crc = CRC16.calculate(payload[0x02:p_len], polynomial=0x9BE4)


def _encode(self, data: bytes) -> bytes:
"""Encode data for transport."""
packet = bytearray(10)
p_len = 8 + len(data)
struct.pack_into("<HHHHH", packet, 0, p_len, 0x00BB, 0x8006, 0, len(data))
packet += data
packet += self._crc(packet[2:]).to_bytes(2, "little")
return packet

def _decode(self, response: bytes) -> bytes:
"""Decode data from transport."""
# payload[0x2:0x8] == bytes([0xbb, 0x00, 0x07, 0x00, 0x00, 0x00])
payload = self.decrypt(response[0x38:])
p_len = int.from_bytes(payload[:0x2], "little")
checksum = int.from_bytes(payload[p_len:p_len+2], "little")

if checksum != self._crc(payload[0x2:p_len]):
logging.debug(
"Checksum incorrect (calculated %s actual %s).",
checksum.hex(), payload[p_len:p_len+2].hex()
)

d_len = int.from_bytes(payload[0x8:0xA], "little")
return payload[0xA:0xA+d_len]

def _send(self, command: int, data: bytes = b'') -> bytes:
"""Send a command to the unit."""
command = bytes([((command << 4) | 1), 1])
packet = self._encode(command + data)
logging.debug("Payload:\n%s", packet.hex(' '))
response = self.send_packet(0x6a, packet)
check_error(response[0x22:0x24])
return self._decode(response)[0x2:]

def get_state(self) -> dict:
"""Returns a dictionary with the unit's parameters.

Returns:
dict:
power (bool):
target_temp (float): temperature set point 16<n<32
mode (hvac.Mode):
speed (hvac.Speed):
preset (hvac.Preset):
swing_h (hvac.SwHoriz):
swing_v (hvac.SwVert):
sleep (bool):
display (bool):
health (bool):
clean (bool):
mildew (bool):
"""
resp = self._send(0x1)

if (len(resp) != 0xF):
raise ValueError(f"unexpected resp size: {len(resp)}")

logging.debug("Received resp:\n%s", resp.hex(' '))
logging.debug("0b[R] mask: %x, 0c[R] mask: %x, cmnd_16: %x",
resp[0x3] & 0xF, resp[0x4] & 0xF, resp[0x4])

state = {}
state['power'] = resp[0x8] & 1 << 5
state['target_temp'] = 8 + (resp[0x0] >> 3) + (resp[0x4] >> 7) * 0.5
state['swing_v'] = self.SwVert(resp[0x0] & 0b111)
state['swing_h'] = self.SwHoriz(resp[0x1] >> 5)
state['mode'] = self.Mode(resp[0x5] >> 5)
state['speed'] = self.Speed(resp[0x3] >> 5)
state['preset'] = self.Preset(resp[0x4] >> 6)
state['sleep'] = bool(resp[0x5] & 1 << 2)
state['health'] = bool(resp[0x8] & 1 << 1)
state['clean'] = bool(resp[0x8] & 1 << 2)
state['display'] = bool(resp[0xA] & 1 << 4)
state['mildew'] = bool(resp[0xA] & 1 << 3)

logging.debug("State: %s", state)

return state

def get_ac_info(self) -> dict:
"""Returns dictionary with AC info.

Returns:
dict:
power (bool): power
ambient_temp (float): ambient temperature
"""
resp = self._send(2)
if (len(resp) != 0x18):
raise ValueError(f"unexpected resp size: {len(resp)}")

logging.debug("Received resp:\n%s", resp.hex(' '))

ac_info = {}
ac_info["power"] = resp[0x1] & 1

ambient_temp = resp[0x5] & 0b11111, resp[0x15] & 0b11111
if any(ambient_temp):
ac_info["ambient_temp"] = ambient_temp[0] + ambient_temp[1] / 10.0

logging.debug("AC info: %s", ac_info)
return ac_info

def set_state(
self,
power: bool,
target_temp: float, # 16<=target_temp<=32
mode: int, # hvac.Mode
speed: int, # hvac.Speed
preset: int, # hvac.Preset
swing_h: int, # hvac.SwHoriz
swing_v: int, # hvac.SwVert
sleep: bool,
display: bool,
health: bool,
clean: bool,
mildew: bool,
) -> None:
"""Set the state of the device."""
# TODO: What does these values represent?
UNK0 = 0b100
UNK1 = 0b1101
UNK2 = 0b101

target_temp = round(target_temp * 2) / 2
if not (16 <= target_temp <= 32):
raise ValueError(f"target_temp out of range: {target_temp}")

if preset == self.Preset.MUTE:
if mode != self.Mode.FAN:
raise ValueError("mute is only available in fan mode")
speed = self.Speed.LOW

elif preset == self.Preset.TURBO:
if mode not in {self.Mode.COOL, self.Mode.HEAT}:
raise ValueError("turbo is only available in cooling/heating")
speed = self.Speed.HIGH

data = bytearray(0xD)
data[0x0] = (int(target_temp) - 8 << 3) | swing_v
data[0x1] = (swing_h << 5) | UNK0
data[0x2] = ((target_temp % 1 == 0.5) << 7) | UNK1
data[0x3] = speed << 5
data[0x4] = preset << 6
data[0x5] = mode << 5 | (sleep << 2)
data[0x8] = (power << 5 | clean << 2 | health * 0b11)
data[0xA] = display << 4 | mildew << 3
data[0xC] = UNK2

logging.debug("Constructed payload data:\n%s", data.hex(' '))

response_payload = self._send(0, data)
logging.debug("Response payload:\n%s", response_payload.hex(' '))