-
Notifications
You must be signed in to change notification settings - Fork 0
/
helen_el.py
87 lines (73 loc) · 3.23 KB
/
helen_el.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import datetime
import pendulum
from airflow.models import Variable
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.python import PythonOperator, ExternalPythonOperator, PythonVirtualenvOperator, is_venv_installed
@dag(
    schedule="5 */8 * * *",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    max_active_runs=1,
    default_args={
        "depends_on_past": False,
        "retries": 3,
        "retry_delay": datetime.timedelta(minutes=30),
    },
    tags=["electricity"],
)
def helen_el():
    """Fetch recent Helen electricity consumption and store it in Postgres.

    Task graph: ``create_electricity_tables`` -> ``extract`` -> ``load_consumption``.
    Scheduled at minute 5 of every 8th hour (cron ``5 */8 * * *``, start_date in
    UTC). Each run re-fetches the previous seven full UTC days; hours that are
    already stored are skipped by the ``ON CONFLICT (date) DO NOTHING`` insert.
    """
    create_electricity_tables = PostgresOperator(
        task_id="create_electricity_tables",
        postgres_conn_id="electricity",
        sql="sql/electricity_schema.sql",
    )

    @task.virtualenv(
        requirements=[
            "pandas==1.5.3",
            "Numpy==1.26.4",
            "PyYAML==6.0",
            "requests>=2.31.0",
            "psycopg2-binary==2.9.6",
            "pyroscope-io==0.8.5",
            "git+https://github.com/ruupert/python-helen-electricity-usage",
        ],
        system_site_packages=False,
    )
    def extract(username, password, delivery_site):
        """Download hourly consumption for the last seven full UTC days.

        Runs in an isolated virtualenv (``system_site_packages=False``), so every
        import must live inside this function and nothing from the enclosing
        DAG scope may be referenced.

        Returns a single-column DataFrame indexed by naive UTC hourly
        timestamps. Each cell holds one raw entry of the API's ``measurements``
        list. Downstream code reads its ``status`` and ``value`` keys.
        """
        from datetime import datetime, timedelta, timezone

        import helen_electricity_usage
        import pandas

        # Midnight today in UTC. Taking "now" in UTC keeps the calendar date
        # consistent with the UTC tzinfo, regardless of the worker's local zone.
        end_date = datetime.now(timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        start_date = end_date - timedelta(days=7)

        helen = helen_electricity_usage.Helen(username, password, delivery_site)
        helen.login()
        data = helen.get_electricity(start_date, end_date)
        data = data["intervals"]["electricity"][0]

        timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
        first_hour = datetime.strptime(data["start"], timestamp_format)
        # The index ends one hour before ``stop``, i.e. ``stop`` is treated as
        # an exclusive bound for hourly measurements.
        last_hour = datetime.strptime(data["stop"], timestamp_format) - timedelta(hours=1)
        idx = pandas.date_range(start=first_hour, end=last_hour, freq="H")
        return pandas.DataFrame(data=pandas.array(data["measurements"]), index=idx)

    @task()
    def load_consumption(df):
        """Insert the valid hourly readings from *df* into ``consumption``.

        Only measurements whose ``status`` is ``"valid"`` are written. The
        naive UTC index is converted to Europe/Helsinki time before insertion.
        All rows are written over one connection in one transaction, and the
        connection is always closed. Existing hours are skipped via
        ``ON CONFLICT (date) DO NOTHING``, so overlapping re-runs are harmless.
        """
        rows = [
            (hour.tz_localize("utc").tz_convert("Europe/Helsinki"), measurement["value"])
            for hour, measurement in df.itertuples()
            if measurement["status"] == "valid"
        ]
        if not rows:
            return

        # Build the hook at run time instead of at DAG-parse time, so it is not
        # carried through serialized task arguments.
        hook = PostgresHook(postgres_conn_id="electricity")
        conn = hook.get_conn()
        try:
            with conn.cursor() as cur:
                cur.executemany(
                    "INSERT INTO consumption (date, val) VALUES (%s, %s) "
                    "ON CONFLICT (date) DO NOTHING",
                    rows,
                )
            conn.commit()
        finally:
            conn.close()

    # op_args of TaskFlow tasks are templated. These Jinja expressions are
    # rendered when the task runs, so Variables are not read on every scheduler
    # parse of this file, and the password is not stored as a literal argument.
    consumption_data = extract(
        "{{ var.value.electricity_costs_helen_username }}",
        "{{ var.value.electricity_costs_helen_password }}",
        "{{ var.value.electricity_costs_delivery_site }}",
    )
    create_electricity_tables >> consumption_data
    load_consumption(consumption_data)


helen_el()