From e9044fe1ee7d10d41bfe2073bec788af764b0432 Mon Sep 17 00:00:00 2001 From: Rossen Georgiev Date: Wed, 27 Jan 2016 13:48:18 +0000 Subject: [PATCH] added tests for parsing.common #15 --- aprslib/parsing/common.py | 89 +++++++----- tests/test_parse_common.py | 284 ++++++++++++++++++++++++++++++++++++- 2 files changed, 333 insertions(+), 40 deletions(-) diff --git a/aprslib/parsing/common.py b/aprslib/parsing/common.py index a4b2293..65d8795 100644 --- a/aprslib/parsing/common.py +++ b/aprslib/parsing/common.py @@ -11,6 +11,9 @@ 'parse_header', 'parse_timestamp', 'parse_comment', + 'parse_data_extentions', + 'parse_comment_altitude', + 'parse_dao', ] def validate_callsign(callsign, prefix=""): @@ -44,7 +47,7 @@ def parse_header(head): path = path.split(',') - if len(path) < 1 or len(path[0]) == 0: + if len(path[0]) == 0: raise ParseError("no tocallsign in header") tocall = path[0] @@ -99,83 +102,97 @@ def parse_timestamp(body, packet_type=''): timestamp = utc.strptime(timestamp, "%Y%m%d%H%M%S") timestamp = time.mktime(timestamp.timetuple()) - - parsed.update({'raw_timestamp': rawts}) except Exception as exp: timestamp = 0 logger.debug(exp) - parsed.update({'timestamp': int(timestamp)}) + parsed.update({ + 'raw_timestamp': rawts, + 'timestamp': int(timestamp), + }) return (body, parsed) def parse_comment(body, parsed): - match = re.findall(r"^([0-9]{3})/([0-9]{3})", body) + body, result = parse_data_extentions(body) + parsed.update(result) + + body, result = parse_comment_altitude(body) + parsed.update(result) + + body, result = parse_comment_telemetry(body) + parsed.update(result) + + body = parse_dao(body, parsed) + + if len(body) > 0 and body[0] == "/": + body = body[1:] + + parsed.update({'comment': body.strip(' ')}) + + +def parse_data_extentions(body): + parsed = {} + match = re.findall(r"^([0-9 \.]{3})/([0-9 \.]{3})", body) + if match: cse, spd = match[0] body = body[7:] parsed.update({ - 'course': int(cse), - 'speed': int(spd)*1.852 # knots to kms + 'course': int(cse) if cse.strip(' .') != '' else 0, + 'speed': int(spd)*1.852 if spd.strip(' .') != '' else 0, }) - # try BRG/NRQ/ - match = re.findall(r"^([0-9]{3})/([0-9]{3})", body) + match = re.findall(r"^/([0-9]{3})/([0-9]{3})", body) if match: brg, nrq = match[0] - body = body[7:] + body = body[8:] parsed.update({'bearing': int(brg), 'nrq': int(nrq)}) else: - match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d[0-9A-Z]?))\/", body) + match = re.findall(r"^(PHG(\d[\x30-\x7e]\d\d[0-9A-Z]?))", body) if match: ext, phg = match[0] body = body[len(ext):] parsed.update({'phg': phg}) else: - match = re.findall(r"^(RNG(\d{4}))\/", body) + match = re.findall(r"^RNG(\d{4})", body) if match: - ext, rng = match[0] - body = body[len(ext):] + rng = match[0] + body = body[7:] parsed.update({'rng': int(rng) * 1.609344}) # miles to km - # try find altitude in comment /A=dddddd - match = re.findall(r"^(.*?)/A=(\-\d{5}|\d{6})(.*)$", body) + return body, parsed +def parse_comment_altitude(body): + parsed = {} + match = re.findall(r"^(.*?)/A=(\-\d{5}|\d{6})(.*)$", body) if match: body, altitude, rest = match[0] body += rest - parsed.update({'altitude': int(altitude)*0.3048}) - body, telemetry = parse_comment_telemetry(body) - parsed.update(telemetry) - - # parse DAO extention - body = parse_dao(body, parsed) - - if len(body) > 0 and body[0] == "/": - body = body[1:] - - parsed.update({'comment': body.strip(' ')}) + return body, parsed def parse_dao(body, parsed): - match = re.findall("^(.*)\!([\x21-\x7b][\x20-\x7b]{2})\!(.*?)$", body) + match = re.findall("^(.*)\!([\x21-\x7b])([\x20-\x7b]{2})\!(.*?)$", body) if match: - body, dao, rest = match[0] + body, daobyte, dao, rest = match[0] body += rest - parsed.update({'daodatumbyte': dao[0].upper()}) + parsed.update({ + 'daodatumbyte': daobyte.upper(), + }) lat_offset = lon_offset = 0 - if re.match("^[A-Z]", dao): - lat_offset = int(dao[1]) * 0.001 / 60 - lon_offset = int(dao[2]) * 0.001 / 60 - elif re.match("^[a-z]", dao): - lat_offset = base91.to_decimal(dao[1]) / 91.0 * 0.01 / 60 - lon_offset = base91.to_decimal(dao[2]) / 91.0 * 0.01 / 60 + if daobyte == 'W' and dao.isdigit(): + lat_offset = int(dao[0]) * 0.001 / 60 + lon_offset = int(dao[1]) * 0.001 / 60 + elif daobyte == 'w' and ' ' not in dao: + lat_offset = (base91.to_decimal(dao[0]) / 91.0) * 0.01 / 60 + lon_offset = (base91.to_decimal(dao[1]) / 91.0) * 0.01 / 60 parsed['latitude'] += lat_offset if parsed['latitude'] >= 0 else -lat_offset parsed['longitude'] += lon_offset if parsed['longitude'] >= 0 else -lon_offset diff --git a/tests/test_parse_common.py b/tests/test_parse_common.py index 5ab8481..fca888c 100644 --- a/tests/test_parse_common.py +++ b/tests/test_parse_common.py @@ -1,9 +1,10 @@ import unittest2 as unittest import string from random import randint, randrange, sample +from datetime import datetime -from aprslib.parsing import parse_header -from aprslib.parsing import validate_callsign +from aprslib import base91 +from aprslib.parsing.common import * from aprslib.exceptions import ParseError @@ -47,6 +48,10 @@ def test_invalid_input(self): class ParseHeader(unittest.TestCase): + def test_no_tocall(self): + with self.assertRaises(ParseError): + parse_header("AAA>") + parse_header("AAA>,") def testvalid_input_and_format(self): # empty path header @@ -151,5 +156,276 @@ def test_invalid_format(self): continue -if __name__ == '__main__': - unittest.main() +class TimestampTC(unittest.TestCase): + def test_timestamp_invalid(self): + body = "000000ntext" + remaining, parsed = parse_timestamp(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'timestamp': 0, + 'raw_timestamp': '000000n', + }) + + def test_status_timestamp_invalid(self): + body = "000000htext" + remaining, parsed = parse_timestamp(body, '>') + + self.assertEqual(remaining, body) + self.assertEqual(parsed, { + 'timestamp': 0, + 'raw_timestamp': '000000h', + }) + + def test_timestamp_valid(self): + timestamp = 1453891611 + date = datetime.utcfromtimestamp(timestamp) + + # hhmmss format + body = date.strftime("%H%M%Shtext") + remaining, parsed = parse_timestamp(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'timestamp': timestamp, + 'raw_timestamp': body[:7], + }) + + # ddhhmm format + body = date.strftime("%d%H%Mztext") + remaining, parsed = parse_timestamp(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'timestamp': timestamp - date.second, + 'raw_timestamp': body[:7], + }) + + # ddhhmm format, local time, we parse as zulu + body = date.strftime("%d%H%M/text") + remaining, parsed = parse_timestamp(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'timestamp': timestamp - date.second, + 'raw_timestamp': body[:7], + }) + + def test_invalid_date(self): + body = "999999ztext" + + remaining, parsed = parse_timestamp(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'timestamp': 0, + 'raw_timestamp': '999999z', + }) + + +class CommentTC(unittest.TestCase): + def test_comment(self): + body = "test body" + parsed = {} + parse_comment(body, parsed) + + self.assertEqual(parsed, {'comment': body}) + + body = "/test body" + parsed = {} + parse_comment(body, parsed) + + self.assertEqual(parsed, {'comment': body[1:]}) + + body = " test body " + parsed = {} + parse_comment(body, parsed) + + self.assertEqual(parsed, {'comment': body.strip(' ')}) + + +class DataExtentionsTC(unittest.TestCase): + def test_course_speed(self): + body = "123/100/text" + remaining, parsed = parse_data_extentions(body) + + self.assertEqual(remaining, '/text') + self.assertEqual(parsed, { + 'course': 123, + 'speed': 100*1.852, + }) + + def test_empty_course_speed(self): + body = " / /text" + remaining, parsed = parse_data_extentions(body) + + self.assertEqual(remaining, '/text') + self.assertEqual(parsed, { + 'course': 0, + 'speed': 0, + }) + + body = ".../.../text" + remaining, parsed = parse_data_extentions(body) + + self.assertEqual(remaining, '/text') + self.assertEqual(parsed, { + 'course': 0, + 'speed': 0, + }) + + def test_course_speed_bearing_nrq(self): + body = "123/100/234/345text" + remaining, parsed = parse_data_extentions(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'course': 123, + 'speed': 100*1.852, + 'bearing': 234, + 'nrq': 345, + }) + + def test_PHG(self): + body = "PHG1234Atext" + remaining, parsed = parse_data_extentions(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'phg': '1234A', + }) + + body = "PHG1234text" + remaining, parsed = parse_data_extentions(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'phg': '1234', + }) + + def test_range(self): + body = "RNG1000text" + remaining, parsed = parse_data_extentions(body) + + self.assertEqual(remaining, 'text') + self.assertEqual(parsed, { + 'rng': 1000*1.609344, + }) + + + +class CommentAltitudeTC(unittest.TestCase): + def test_valid_inputs(self): + body = "asdasd/A=10000078zxc" + remaining, parsed = parse_comment_altitude(body) + + self.assertEqual(remaining, "asdasd78zxc") + self.assertEqual(parsed, {'altitude': 30480}) + + body = "asdasd/A=-1000078zxc" + remaining, parsed = parse_comment_altitude(body) + + self.assertEqual(remaining, "asdasd78zxc") + self.assertEqual(parsed, {'altitude': -3048}) + + def test_invalid(self): + tests = [ + "", + "aaaaaaaaaaaaaaaaaaaaa", + "sdfsdfsdf/A=00000aaaa", + "sdfsdfsdf/A=0000aaaaa", + "sdfsdfsdf/A=000aaaaaa", + "sdfsdfsdf/A=00aaaaaaa", + "sdfsdfsdf/A=0aaaaaaaa", + "sdfsdfsdf/A=aaaaaaaaa", + "sdfsdfsdf/Aa000000aaa", + "sdfsdfsdf/A=+00000aaa", + "sdfsdfsdf/A=00a000aaa", + ] + + for body in tests: + remaining, parsed = parse_comment_altitude(body) + self.assertEqual(remaining, body) + self.assertEqual(parsed, {}) + + +class DAO_TC(unittest.TestCase): + def test_wgs84_human_readable(self): + body = "!W36!" + parsed = {'latitude': 0, 'longitude': 0} + remaining = parse_dao(body, parsed) + + self.assertEqual(remaining, '') + self.assertEqual(parsed, { + 'daodatumbyte': 'W', + 'latitude': 0.00005, + 'longitude': 0.0001, + }) + + def test_wgs84_human_readable_nagarive(self): + body = "!W36!" + parsed = {'latitude': -1, 'longitude': -1} + remaining = parse_dao(body, parsed) + + self.assertEqual(remaining, '') + self.assertEqual(parsed, { + 'daodatumbyte': 'W', + 'latitude': -1.00005, + 'longitude': -1.0001, + }) + + def test_wgs84_human_readable_blank(self): + body = "!W !" + parsed = {'latitude': 1, 'longitude': 2} + remaining = parse_dao(body, parsed) + + self.assertEqual(remaining, '') + self.assertEqual(parsed, { + 'daodatumbyte': 'W', + 'latitude': 1, + 'longitude': 2, + }) + + def test_wgs84_base91(self): + body = "!w??!" + parsed = {'latitude': 0, 'longitude': 0} + remaining = parse_dao(body, parsed) + + self.assertEqual(remaining, '') + self.assertEqual(parsed, { + 'daodatumbyte': 'W', + 'latitude': 5.4945054945054945e-05, + 'longitude': 5.4945054945054945e-05, + }) + + def test_wgs84_base91_blank(self): + body = "!w !" + parsed = {'latitude': 1, 'longitude': 2} + remaining = parse_dao(body, parsed) + + self.assertEqual(remaining, '') + self.assertEqual(parsed, { + 'daodatumbyte': 'W', + 'latitude': 1, + 'longitude': 2, + }) + + def test_dao_matching(self): + body = "aaa!W12!bbb!W23!ccc!W45!ddd" + parsed = {'latitude': 0, 'longitude': 0} + remaining = parse_dao(body, parsed) + + self.assertEqual(remaining, 'aaa!W12!bbb!W23!cccddd') + + def test_other_datum_bytes(self): + for datum in [chr(x) for x in range(0x21,0x7c)]: + body = "!" + datum + " !" + parsed = {'latitude': 1, 'longitude': 2} + remaining = parse_dao(body, parsed) + + self.assertEqual(remaining, '') + self.assertEqual(parsed, { + 'daodatumbyte': datum.upper(), + 'latitude': 1, + 'longitude': 2, + })