-
Notifications
You must be signed in to change notification settings - Fork 7
/
weatherscene.py
executable file
·269 lines (246 loc) · 7.37 KB
/
weatherscene.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# Render the current weather forecast from SMHI.se
#
import time
import gc
try:
import urequests as requests
except ImportError:
import requests
# Local imports
from pixelfont import PixelFont
from icon import Icon
# Based on demoscene.py
class WeatherScene:
"""
This module displays a weather forecast from SMHI (Sweden)
"""
dir_prefix = 'weather/'
def __init__(self, display, config):
"""
Initialize the module.
`display` is saved as an instance variable because it is needed to
update the display via self.display.put_pixel() and .render()
"""
self.display = display
self.icon = None
self.debug = False
self.intensity = 16
self.lat = 59.3293
self.lon = 18.0686
self.api_url = 'https://opendata-download-metfcst.smhi.se'
self.headers = {
'User-Agent':'weatherscene.py/1.0 (+https://github.com/noahwilliamsson/lamatrix)',
'Accept-Encoding': 'identity',
}
self.temperature = 0
self.wind_speed = 0
self.last_refreshed_at = 0
# http://opendata.smhi.se/apidocs/metfcst/parameters.html#parameter-wsymb
self.symbol = None
self.symbol_to_icon = [
None,
['moon-stars.bin', 'sunny.bin'], # clear sky
['moon-stars.bin', 'sunny-with-clouds.bin'], # nearly clear sky
'cloud-partly.bin', # variable cloudiness
'sunny-with-clouds.bin', # halfclear sky
'cloudy.bin', # cloudy sky
'cloudy.bin', # overcast
'fog.bin', # fog
'rain.bin', # light rain showers
'rain.bin', # medium rain showers
'rain.bin', # heavy rain showers
'rain.bin', # thunderstorm
'rain-snow.bin', # light sleet showers
'rain-snow.bin', # medium sleet showers
'rain-snow.bin', # heavy sleet showers
'rain-snow.bin', # light snow showers
'rain-snow.bin', # medium snow showers
'rain-snow.bin', # heavy snow showers
'rain.bin', # light rain
'rain.bin', # medium rain
'rain.bin', # heavy rain
'thunderstorm.bin', # thunder
'rain-snowy.bin', # light sleet
'rain-snowy.bin', # medium sleet
'rain-snowy.bin', # heavy sleet
'snow-house.bin', # light snowfall
'snow-house.bin', # medium snowfall
'snow-house.bin', # heavy snowfall
]
self.num_frames = 1
self.remaining_frames = 1
self.next_frame_at = 0
if not config:
return
if 'debug' in config:
self.debug = config['debug']
if 'intensity' in config:
self.intensity = int(round(config['intensity']*255))
if 'lat' in config:
self.lat = config['lat']
if 'lon' in config:
self.lon = config['lon']
def reset(self):
"""
This method is called before transitioning to this scene.
Use it to (re-)initialize any state necessary for your scene.
"""
self.next_frame_at = 0
self.reset_icon()
t = time.time()
if t < self.last_refreshed_at + 1800:
return
# fetch a new forecast from SMHI
url = '{}/api/category/pmp3g/version/2/geotype/point/lon/{}/lat/{}/data.json'.format(self.api_url, self.lon, self.lat)
print('WeatherScene: reset called, requesting weather forecast from: {}'.format(url))
r = requests.get(url, headers=self.headers, stream=True)
if r.status_code != 200:
print('WeatherScene: failed to request {}: status {}'.format(url, r.status_code))
return
print('WeatherScene: parsing weather forecast')
next_hour = int(time.time())
next_hour = next_hour - next_hour%3600 + 3600
expected_timestamp = '{:04d}-{:02d}-{:02d}T{:02d}'.format(*time.gmtime(next_hour))
temp, ws, symb = self.get_forecast(r.raw, expected_timestamp)
# Close socket and free up RAM
r.close()
r = None
gc.collect()
if temp == None:
print('WeatherScene: failed to find forecast for timestamp prefix: {}'.format(expected_timestamp))
return
self.temperature = float(temp.decode())
self.wind_speed = float(ws.decode())
self.symbol = int(symb.decode())
self.last_refreshed_at = t
filename = self.symbol_to_icon[self.symbol]
if not filename:
return
if type(filename) == list:
lt = time.localtime(next_hour)
if lt[3] < 7 or lt[3] > 21:
# Assume night icon
filename = filename[0]
else:
filename = filename[1]
if self.icon:
# MicroPython does not support destructors so we need to manually
# close the file we have opened
self.icon.close() # Close icon file
self.icon = Icon(self.dir_prefix + filename)
self.icon.set_intensity(self.intensity)
self.reset_icon()
def input(self, button_state):
"""
Handle button inputs
"""
return 0 # signal that we did not handle the input
def set_intensity(self, value=None):
if value is not None:
self.intensity -= 1
if not self.intensity:
self.intensity = 16
if self.icon:
self.icon.set_intensity(self.intensity)
return self.intensity
def render(self, frame, dropped_frames, fps):
"""
Render the scene.
This method is called by the render loop with the current frame number,
the number of dropped frames since the previous invocation and the
requested frames per second (FPS).
"""
if frame < self.next_frame_at:
return True
self.remaining_frames -= 1
n = self.num_frames
index = n - (self.remaining_frames % n) - 1
# Calculate next frame number
self.next_frame_at = frame + int(fps * self.icon.frame_length()/1000)
# Render frame
display = self.display
intensity = self.intensity
self.icon.blit(display, 0 if display.columns == 32 else 4, 0)
# Render text
if self.remaining_frames >= n:
text = '{:.2g}\'c'.format(self.temperature)
else:
text = '{:.2g}m/s'.format(self.wind_speed)
if display.columns <= 16:
text = '{:.1g}m/s'.format(self.wind_speed)
display.render_text(PixelFont, text, 9 if display.columns == 32 else 0, 1 if display.columns == 32 else 10, intensity)
display.render()
if self.remaining_frames == 0:
return False
return True
def reset_icon(self):
if not self.icon:
return
self.icon.reset()
self.num_frames = self.icon.num_frames
self.remaining_frames = self.num_frames*2
t_icon = self.icon.length_total()
# Ensure a minimum display time
for i in range(1,6):
if t_icon*i >= 4000:
break
self.remaining_frames += self.num_frames
def get_forecast(self, f, validTime):
"""
Extract temperature, windspeed, weather symbol from JSON response
"""
timeStr = None
tempStr = None
wsStr = None
symbStr = None
while True:
v = f.read(1)
if v != b'"':
continue
s = self.next_string(f)
if not timeStr:
if not s.startswith(b'validTime'):
continue
timeStr = self.next_string(f, 2)
if not timeStr.startswith(validTime):
timeStr = None
continue
elif not tempStr and s == b't':
tempStr = self.next_array_entry(f)
elif not wsStr and s == b'ws':
wsStr = self.next_array_entry(f)
elif not symbStr and s == b'Wsymb2':
symbStr = self.next_array_entry(f)
elif timeStr and tempStr and wsStr and symbStr:
break
return (tempStr, wsStr, symbStr)
def next_string(self, f, start_at = 0):
"""
Extract string value from JSON
"""
if start_at:
f.read(start_at)
stash = bytearray()
while True:
c = f.read(1)
if c == b'"':
break
stash.append(c[0])
return bytes(stash)
def next_array_entry(self, f):
"""
Extract first array entry from JSON
"""
in_array = False
stash = bytearray()
while True:
c = f.read(1)
if not in_array:
if c != b'[':
continue
in_array = True
continue
if c == b']' or c == b' ' or c == b',':
break
stash.append(c[0])
return bytes(stash)