-
Notifications
You must be signed in to change notification settings - Fork 0
/
unifi_lte_stats.py
executable file
·332 lines (231 loc) · 11.5 KB
/
unifi_lte_stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
#!/usr/bin/env python3
'''
unifi_lte_stats.py -- Exporter for unifi U-LTE-Pro devices
unifi_lte_stats.py is a data exporter for Prometheus
@author: Brendan Bank
@copyright: 2023 Brendan Bank. All rights reserved.
@license: BSDv3
@contact: brendan.bank ... gmail.com
@deffield updated: Updated
'''
'''
Copyright 2023 Brendan Bank
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
__all__ = []
__version__ = 0.2
__date__ = '2023-01-10'
__updated__ = '2023-01-20'
import requests
import urllib3
import re
import time
import prometheus_client
from prometheus_client import Info, generate_latest, Gauge, start_http_server
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import os
from dotenv import load_dotenv
import pprint
import argparse
import pwd
import grp
ENV = '.env'
import logging, sys
log = logging.getLogger(os.path.basename(__file__))
logging.basicConfig(format='%(name)s.%(funcName)s(%(lineno)s): %(message)s', stream=sys.stderr, level=logging.WARN)
""" get authentication credentials from .env file"""
TIMEOUT = 5
POLL_INTERVAL = 20
EXPORTER_PORT = os.environ.get('PORT', 9013)
def main():
'''main function.'''
program_name = os.path.basename(sys.argv[0])
program_version = "v%s" % __version__
program_build_date = str(__updated__)
program_version_message = '%%(prog)s %s (%s)' % (program_version, program_build_date)
program_shortdesc = __import__('__main__').__doc__.split("\n")[1]
program_license = '''%s
Created by Brendan Bank on %s.
Copyright 2023 Brendan Bank. All rights reserved.
Licensed under the BSD-3-Clause
https://opensource.org/licenses/BSD-3-Clause
Distributed on an "AS IS" basis without warranties
or conditions of any kind, either express or implied.
USAGE''' % (program_shortdesc, str(__date__))
""" Set variables """
# Setup argument parser
parser = argparse.ArgumentParser(description=program_license, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", default=False,
help="set verbosity [default: %(default)s]")
parser.add_argument('-V', '--version', action='version', version=program_version_message)
parser.add_argument('-d', '--debug', action='store_true', dest="debug", default=False,
help="set debug [default: %(default)s]")
parser.add_argument('-E', '--exporter-port', type=int, dest="exporter_port", default=EXPORTER_PORT,
help="set TCP Port for the exporter server [default: %(default)s]")
parser.add_argument('-i', '--interval', type=int, dest="interval", default=POLL_INTERVAL,
help="Poll interval [default: %(default)s] seconds")
parser.add_argument('-e', '--environment', type=argparse.FileType('r'), dest="environment_file", default=ENV,
help="enviroment file to read from [default: %(default)s] seconds")
parser.add_argument('hostname', type=str, help='Hostname to poll')
parser.add_argument('-u', '--username', type=str, dest="priv_username",
default="nobody",
help="Run the exporter as a specific user drop. " +
"The exporter must be started as root to enable this. [default: %(default)s]")
# Process arguments
args = parser.parse_args()
if (args.debug):
log.setLevel(level=logging.DEBUG)
elif (args.verbose):
log.setLevel(level=logging.INFO)
basedir = os.path.abspath(os.path.dirname(__file__))
envpath = os.path.join(basedir, args.environment_file.name)
load_dotenv(envpath)
USERNAME = os.environ.get('USERNAME')
PASSWORD = os.environ.get('PASSWORD')
for i in ['USERNAME', 'PASSWORD']:
if not os.environ.get(i):
log.critical(f'{i} is not set ')
exit(1)
login_url = f"https://{args.hostname}/api/auth/login"
Headers = {'Content-type': 'application/json'}
device_url = f"https://{args.hostname}/proxy/network/api/s/default/stat/device"
""" Create a prometheus_client registry and http session"""
registry = prometheus_client.CollectorRegistry()
http_session = requests.Session()
""" create prometheus_client items """
lte_info = Info('unifi_lte', 'LTE info', registry=registry)
stats = ['lte_rx_chan', 'lte_tx_chan', 'lte_rssi', 'lte_rsrq', 'lte_rsrp', 'total_tx_bytes', 'total_rx_bytes', 'lte_signal','lte_failover', 'uptime']
lte_stats = {}
for i in stats:
lte_stats[i] = Gauge('unifi_' + i, i, labelnames=['id', 'name', 'model'], registry=registry)
stats_text = ['lte_connected', 'lte_imei', 'lte_iccid', 'lte_radio', 'lte_ip', 'lte_networkoperator', 'lte_pdptype', \
'lte_rat', 'lte_signal', 'lte_mode', 'lte_band', 'lte_cell_id', 'lte_radio_mode', \
'model', 'name', 'ip', 'mac', 'version', 'license_state', '_id', 'serial',
'displayable_version','lte_state','lte_ext_ant', 'lte_connected', 'ip' ]
lte_data = {}
""" create prometheus server """
drop_privileges(args.priv_username)
start_http_server(EXPORTER_PORT, registry=registry)
cookie = None
pp = pprint.PrettyPrinter(indent=4)
while True:
""" try to fetch data, this fill fail on the first run """
try:
r_data = http_session.get(device_url, cookies=cookie, verify=False, timeout=TIMEOUT)
except Exception as e:
log.critical(f'could not connect to {device_url}: {e}')
time.sleep(POLL_INTERVAL)
log.debug(f'sleep {POLL_INTERVAL}')
continue
if (r_data.status_code != 200):
""" try to authenticate """
log.critical(f'Could not fetch {device_url}: http status code: {r_data.status_code}')
data = {'username': USERNAME, 'password': PASSWORD }
log.info(f'try to fetch cookie from: {login_url}')
try:
r = http_session.post(login_url,
json=data,
headers=Headers,
verify=False,
timeout=TIMEOUT
)
except Exception as e:
log.critical(f'could not connect to {login_url}: f{e}')
time.sleep(POLL_INTERVAL)
log.debug(f'sleep {POLL_INTERVAL}')
continue
if (r.status_code == 200):
log.critical(f'login was successful on {login_url}')
cookie = r.cookies
else:
log.critical(f'Could not fetch: http status code: {r.status_code}, sleep for 60 sec')
time.sleep(60)
log.critical(f'sleep 60')
continue
j_data = r_data.json()
for data in j_data['data']:
"""Only pull data from ULTEPEU or ULTEUS device types"""
if data['model'] == "ULTEPEU" or data['model'] == "ULTEUS":
log.debug('ULTE found')
log.debug(pp.pformat(data))
lte_data['name'] = data['name']
lte_data['model'] = data['model']
lte_data['id'] = data['_id']
lte_data['stats'] = {}
for i in stats:
if not (i in data):
continue
lte_data['stats'][i] = data[i]
lte_data['info'] = {}
for i in stats_text:
if i in data:
lte_data['info'][i] = data[i]
"""Fill unifi_lte_info with details"""
lte_info.info(lte_data['info'])
log.debug(f'fill info with {lte_data["info"]}')
"""Fill unifi_* stats """
for k in lte_stats.keys():
if not (k in lte_data['stats']):
log.info (f'element {k} not in data set')
continue
if k == 'lte_signal':
signal = re.search(r'\d', lte_data['stats'][k])
lte_stats[k].labels(lte_data['id'], lte_data['name'], lte_data['model']).set(signal.group())
elif k == 'lte_failover':
if lte_data['stats'][k]:
lte_stats[k].labels(lte_data['id'], lte_data['name'], lte_data['model']).set(1)
else:
lte_stats[k].labels(lte_data['id'], lte_data['name'], lte_data['model']).set(0)
else:
lte_stats[k].labels(lte_data['id'], lte_data['name'], lte_data['model']).set(lte_data['stats'][k])
log.debug(f'set {k} to {lte_data["stats"][k]}')
log.debug(f'generate_latest')
log.debug(generate_latest(registry=registry).decode("ascii"))
log.debug(f'sleep {POLL_INTERVAL}')
if 'lte_signal' in lte_data['stats'] and 'lte_rssi' in lte_data['stats']:
log.info(f"Fetched data from {args.hostname} signal: '{lte_data['stats']['lte_signal']}' rssi: {lte_data['stats']['lte_rssi']} sleep: {POLL_INTERVAL}s")
time.sleep (POLL_INTERVAL)
def drop_privileges(uid_name='nobody', gid_name='nogroup'):
if os.getuid() != 0:
# We're not root so, like, whatever dude
return
log.warning(f'Dropping privileges to user {uid_name} and group {gid_name}')
# Get the uid/gid from the name
try:
running_uid = pwd.getpwnam(uid_name).pw_uid
except Exception as e:
log.critical(f'cannot find username {uid_name}: {e}')
exit()
try:
running_gid = grp.getgrnam(gid_name).gr_gid
except Exception as e:
log.critical(f'cannot find groupname {gid_name}: {e}')
exit()
# Remove group privileges
os.setgroups([])
# Try setting the new uid/gid
os.setgid(running_gid)
os.setuid(running_uid)
# Ensure a very conservative umask
old_umask = os.umask(0o077)
if __name__ == "__main__":
main()