-
Notifications
You must be signed in to change notification settings - Fork 51
/
example.py
95 lines (72 loc) · 2.64 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env python
"""Basic usage example and testing of pysma."""
import argparse
import asyncio
import logging
import signal
import sys
import aiohttp
import pysma
# This example will work with Python 3.9+
_LOGGER = logging.getLogger(__name__)
VAR = {}
def print_table(sensors):
"""Print sensors formatted as table."""
for sen in sensors:
if sen.value is None:
print("{:>25}".format(sen.name))
else:
print("{:>25}{:>15} {}".format(sen.name, str(sen.value), sen.unit))
async def main_loop(password, user, url):
"""Run main loop."""
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False)
) as session:
VAR["sma"] = pysma.SMA(session, url, password=password, group=user)
try:
await VAR["sma"].new_session()
except pysma.exceptions.SmaAuthenticationException:
_LOGGER.warning("Authentication failed!")
return
except pysma.exceptions.SmaConnectionException:
_LOGGER.warning("Unable to connect to device at %s", url)
return
# We should not get any exceptions, but if we do we will close the session.
try:
VAR["running"] = True
cnt = 5
sensors = await VAR["sma"].get_sensors()
device_info = await VAR["sma"].device_info()
for name, value in device_info.items():
print("{:>15}{:>25}".format(name, value))
# enable all sensors
for sensor in sensors:
sensor.enabled = True
while VAR.get("running"):
await VAR["sma"].read(sensors)
print_table(sensors)
cnt -= 1
if cnt == 0:
break
await asyncio.sleep(2)
finally:
_LOGGER.info("Closing Session...")
await VAR["sma"].close_session()
async def main():
"""Run example."""
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
parser = argparse.ArgumentParser(description="Test the SMA webconnect library.")
parser.add_argument(
"url",
type=str,
help="Web address of the Webconnect module (http://ip-address or https://ip-address)",
)
parser.add_argument("user", choices=["user", "installer"], help="Login username")
parser.add_argument("password", help="Login password")
args = parser.parse_args()
def _shutdown(*_):
VAR["running"] = False
signal.signal(signal.SIGINT, _shutdown)
await main_loop(user=args.user, password=args.password, url=args.url)
if __name__ == "__main__":
asyncio.run(main())