Skip to content

Commit

Permalink
Merge pull request #12 from denysdovhan/unofficial-yasno-api
Browse files Browse the repository at this point in the history
Add support for Dnipro city
  • Loading branch information
denysdovhan authored Jul 23, 2024
2 parents d422e72 + 6815a74 commit 81efa80
Show file tree
Hide file tree
Showing 14 changed files with 350 additions and 1,053 deletions.
1 change: 0 additions & 1 deletion contributing.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ Only native speaker can translate to specific language.

1. Copy `custom_components/yasno_outages/translations/en.json` file and name it with appropriate language code.
1. Translate only keys in this file, not values.
1. Mention your translation in `readme.md` file.
1. Open a PR.
1. Find someone to check and approve your PR.

Expand Down
21 changes: 21 additions & 0 deletions custom_components/yasno_outages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from homeassistant.const import Platform

from .const import CONF_CITY, DEFAULT_CITY
from .coordinator import YasnoOutagesCoordinator

if TYPE_CHECKING:
Expand All @@ -18,6 +19,26 @@
PLATFORMS = [Platform.CALENDAR, Platform.SENSOR]


async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Migrate old entry."""
LOGGER.debug(
"Migrating configuration from version %s.%s",
config_entry.version,
config_entry.minor_version,
)

version = config_entry.version

if version == 1:
LOGGER.debug("Migrating: city is set to default city (%s).", DEFAULT_CITY)
data = {**config_entry.data}
if CONF_CITY not in data:
data[CONF_CITY] = DEFAULT_CITY
hass.config_entries.async_update_entry(config_entry, data=data, version=2)

return True


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up a new entry."""
LOGGER.info("Setup entry: %s", entry)
Expand Down
132 changes: 103 additions & 29 deletions custom_components/yasno_outages/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,128 @@

import datetime
import logging
from pathlib import Path

import recurring_ical_events
from icalendar import Calendar

from .const import CALENDAR_PATH
import requests
from dateutil.rrule import WEEKLY, rrule

LOGGER = logging.getLogger(__name__)

API_ENDPOINT = (
"https://api.yasno.com.ua/api/v1/pages/home/schedule-turn-off-electricity"
)
START_OF_DAY = 0
END_OF_DAY = 24


class YasnoOutagesApi:
"""Class to interact with calendar files for Yasno outages."""
"""Class to interact with Yasno outages API."""

calendar: recurring_ical_events.UnfoldableCalendar | None
"""Group name format"""
group_name = "group_{group}"

def __init__(self, group: int) -> None:
def __init__(self, city: str | None = None, group: str | None = None) -> None:
"""Initialize the YasnoOutagesApi."""
self.group = group
self.calendar = None
self.city = city
self.api_url = API_ENDPOINT
self.schedule = None

def _extract_schedule(self, data: dict) -> dict:
"""Extract schedule from the API response."""
schedule_component = next(
(
item
for item in data["components"]
if item["template_name"] == "electricity-outages-schedule"
),
None,
)
if schedule_component:
return schedule_component["schedule"]
LOGGER.error("Schedule component not found in the API response.")
return None

def _build_event_hour(
self,
date: datetime.datetime,
start_hour: int,
) -> datetime.datetime:
return date.replace(hour=start_hour, minute=0, second=0, microsecond=0)

def fetch_schedule(self) -> None:
"""Fetch outages from the API."""
try:
response = requests.get(self.api_url, timeout=60)
response.raise_for_status()
self.schedule = self._extract_schedule(response.json())
except requests.RequestException as error:
LOGGER.exception("Error fetching schedule from Yasno API: %s", error) # noqa: TRY401
self.schedule = {}

@property
def calendar_path(self) -> Path:
"""Return the path to the ICS file."""
return Path(__file__).parent / CALENDAR_PATH.format(group=self.group)
def get_cities(self) -> list[str]:
"""Get a list of available cities."""
return list(self.schedule.keys())

def fetch_calendar(self) -> None:
"""Fetch outages from the ICS file."""
if not self.calendar:
with self.calendar_path.open() as file:
ical = Calendar.from_ical(file.read())
self.calendar = recurring_ical_events.of(ical)
return self.calendar
def get_city_groups(self, city: str) -> dict[str, list]:
"""Get all schedules for all of available groups for a city."""
return self.schedule.get(city, {})

def get_group_schedule(self, city: str, group: int) -> list:
"""Get the schedule for a specific group."""
city_groups = self.get_city_groups(city)
return city_groups.get(self.group_name.format(group=group), [])

def get_current_event(self, at: datetime.datetime) -> dict:
"""Get the current event."""
if not self.calendar:
return None
events_at = self.calendar.at(at)
if not events_at:
return None
return events_at[0] # return only the first event
for event in self.get_events(at, at + datetime.timedelta(hours=1)):
if event["start"] <= at < event["end"]:
return event
return None

def get_events(
self,
start_date: datetime.datetime,
end_date: datetime.datetime,
) -> list[dict]:
"""Get all events."""
if not self.calendar:
return []
return self.calendar.between(start_date, end_date)
group_schedule = self.get_group_schedule(self.city, self.group)
events = []

# For each day of the week in the schedule
for dow, day_events in enumerate(group_schedule):
# Build a recurrence rule the events between start and end dates
recurrance_rule = rrule(
WEEKLY,
dtstart=start_date,
until=end_date,
byweekday=dow,
)

# For each event in the day
for event in day_events:
event_start_hour = event["start"]
event_end_hour = event["end"]

if event_end_hour == END_OF_DAY:
event_end_hour = START_OF_DAY

# For each date in the recurrence rule
for dt in recurrance_rule:
event_start = self._build_event_hour(dt, event_start_hour)
event_end = self._build_event_hour(dt, event_end_hour)
if event_end_hour == START_OF_DAY:
event_end += datetime.timedelta(days=1)
if (
start_date <= event_start <= end_date
or start_date <= event_end <= end_date
):
events.append(
{
"summary": event["type"],
"start": event_start,
"end": event_end,
},
)

# Sort events by start time to ensure correct order
return sorted(events, key=lambda event: event["start"])
130 changes: 109 additions & 21 deletions custom_components/yasno_outages/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,22 @@
OptionsFlow,
)
from homeassistant.core import callback
from homeassistant.helpers.selector import selector
from homeassistant.helpers.selector import (
SelectSelector,
SelectSelectorConfig,
)

from .api import YasnoOutagesApi
from .const import CONF_CITY, CONF_GROUP, DEFAULT_CITY, DEFAULT_GROUP, DOMAIN, NAME

LOGGER = logging.getLogger(__name__)

GROUP_PREFIX = "group_"

from .const import CONF_GROUP, DEFAULT_GROUP, DOMAIN

_LOGGER = logging.getLogger(__name__)
def extract_group_index(group: str) -> str:
"""Extract the group index from the group name."""
return group[len(GROUP_PREFIX) :]


def get_config_value(
Expand All @@ -29,22 +40,45 @@ def get_config_value(
return default


def build_schema(config_entry: ConfigEntry) -> vol.Schema:
"""Build the schema for the config flow."""
def build_city_schema(api: YasnoOutagesApi, config_entry: ConfigEntry) -> vol.Schema:
"""Build the schema for the city selection step."""
cities = api.get_cities()
return vol.Schema(
{
vol.Required(
CONF_CITY,
default=get_config_value(config_entry, CONF_CITY, DEFAULT_CITY),
): SelectSelector(
SelectSelectorConfig(
options=cities,
translation_key="city",
),
),
},
)


def build_group_schema(
api: YasnoOutagesApi,
config_entry: ConfigEntry,
data: dict,
) -> vol.Schema:
"""Build the schema for the group selection step."""
city = data[CONF_CITY]
groups = api.get_city_groups(city).keys()
group_indexes = [extract_group_index(group) for group in groups]
LOGGER.debug("Getting %s groups: %s", city, groups)

return vol.Schema(
{
vol.Required(
CONF_GROUP,
default=get_config_value(config_entry, CONF_GROUP, DEFAULT_GROUP),
): selector(
{
"select": {
"options": [
{"value": str(i), "label": f"Group {i}"}
for i in range(1, 7)
],
},
},
): SelectSelector(
SelectSelectorConfig(
options=group_indexes,
translation_key="group",
),
),
},
)
Expand All @@ -56,16 +90,43 @@ class YasnoOutagesOptionsFlow(OptionsFlow):
def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize options flow."""
self.config_entry = config_entry
self.api = YasnoOutagesApi()
self.data: dict[str, Any] = {}

async def async_step_init(self, user_input: dict | None = None) -> ConfigFlowResult:
"""Manage the options."""
"""Handle the city change."""
if user_input is not None:
_LOGGER.debug("Updating options: %s", user_input)
return self.async_create_entry(title="", data=user_input)
LOGGER.debug("Updating options: %s", user_input)
self.data.update(user_input)
return await self.async_step_group()

await self.hass.async_add_executor_job(self.api.fetch_schedule)

LOGGER.debug("Options: %s", self.config_entry.options)
LOGGER.debug("Data: %s", self.config_entry.data)

return self.async_show_form(
step_id="init",
data_schema=build_schema(config_entry=self.config_entry),
data_schema=build_city_schema(api=self.api, config_entry=self.config_entry),
)

async def async_step_group(
self,
user_input: dict | None = None,
) -> ConfigFlowResult:
"""Handle the group change."""
if user_input is not None:
LOGGER.debug("User input: %s", user_input)
self.data.update(user_input)
return self.async_create_entry(title="", data=self.data)

return self.async_show_form(
step_id="group",
data_schema=build_group_schema(
api=self.api,
config_entry=self.config_entry,
data=self.data,
),
)


Expand All @@ -74,6 +135,11 @@ class YasnoOutagesConfigFlow(ConfigFlow, domain=DOMAIN):

VERSION = 1

def __init__(self) -> None:
"""Initialize config flow."""
self.api = YasnoOutagesApi()
self.data: dict[str, Any] = {}

@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> YasnoOutagesOptionsFlow:
Expand All @@ -83,10 +149,32 @@ def async_get_options_flow(config_entry: ConfigEntry) -> YasnoOutagesOptionsFlow
async def async_step_user(self, user_input: dict | None = None) -> ConfigFlowResult:
"""Handle the initial step."""
if user_input is not None:
_LOGGER.debug("User input: %s", user_input)
return self.async_create_entry(title="Yasno Outages", data=user_input)
LOGGER.debug("City selected: %s", user_input)
self.data.update(user_input)
return await self.async_step_group()

await self.hass.async_add_executor_job(self.api.fetch_schedule)

return self.async_show_form(
step_id="user",
data_schema=build_schema(config_entry=None),
data_schema=build_city_schema(api=self.api, config_entry=None),
)

async def async_step_group(
self,
user_input: dict | None = None,
) -> ConfigFlowResult:
"""Handle the group step."""
if user_input is not None:
LOGGER.debug("User input: %s", user_input)
self.data.update(user_input)
return self.async_create_entry(title=NAME, data=self.data)

return self.async_show_form(
step_id="group",
data_schema=build_group_schema(
api=self.api,
config_entry=None,
data=self.data,
),
)
Loading

0 comments on commit 81efa80

Please sign in to comment.