Skip to content

Commit

Permalink
gps: publish nmea message (#998)
Browse files Browse the repository at this point in the history
* test_gps.py: add test of mnea parser

* gps: refactor line parser, adds publish of nmea msg

* test_gps: update tests

* gps: add ValueError and test for invalid msg
  • Loading branch information
tajgr authored Jun 19, 2024
1 parent 3737a2d commit b766e80
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 29 deletions.
69 changes: 55 additions & 14 deletions osgar/drivers/gps.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,64 @@ def checksum(s):
return b"%02X" % (sum)


def str2ms(s):
'convert DDMM.MMMMMM string to arc milliseconds(int)'
if s == b'': # unknown position
def str2deg(s):
'convert DDMM.MMMMMM string to deg'
assert s != ""
assert "." in s, s
dm, frac = ('0000' + s).split('.')
try:
return float(dm[:-2]) + float(dm[-2:] + '.' + frac) / 60
except ValueError as e:
print(e)
return None
dm, frac = (b'0000' + s).split(b'.')


def nmea2coord_ms(nmea_data):
lon = nmea_data["lon"]
lat = nmea_data["lat"]
if lon and lat:
return [int(round(lon*3_600_000)), int(round(lat*3_600_000))]
return [None, None]


def parse_nmea(line):
nmea_list = line.decode().split(",")
assert len(nmea_list) == 15
nmea_data = {}
# NMEA sentence $GPGGA or $GNGGA
# https://docs.novatel.com/OEM7/Content/Logs/GPGGA.htm
try:
return round((int(dm[:-2]) * 60 + float(dm[-2:] + b'.' + frac)) * 60000)
except Exception as e:
nmea_data["identifier"] = nmea_list[0]
nmea_data["lon"] = None if nmea_list[4] == "" else str2deg(nmea_list[4])
nmea_data["lon_dir"] = None if nmea_list[5] == "" else nmea_list[5]
nmea_data["lat"] = None if nmea_list[2] == "" else str2deg(nmea_list[2])
nmea_data["lat_dir"] = None if nmea_list[3] == "" else nmea_list[3]
nmea_data["utc_time"] = None if nmea_list[1] == "" else nmea_list[1] # format: "%H%M%S.%f"
nmea_data["quality"] = None if nmea_list[6] == "" else int(nmea_list[6])
nmea_data["sats"] = None if nmea_list[7] == "" else int(nmea_list[7])
nmea_data["hdop"] = None if nmea_list[8] == "" else float(nmea_list[8])
nmea_data["alt"] = None if nmea_list[9] == "" else float(nmea_list[9])
nmea_data["a_units"] = None if nmea_list[10] == "" else nmea_list[10]
nmea_data["undulation"] = None if nmea_list[11] == "" else float(nmea_list[11])
nmea_data["u_units"] = None if nmea_list[12] == "" else nmea_list[12]
nmea_data["age"] = None if nmea_list[13] == "" else float(nmea_list[13])
stn_id = nmea_list[14].split("*")[0]
nmea_data["stn_id"] = None if stn_id == "" else stn_id
except ValueError as e:
print(e)
return None

return nmea_data


def parse_line(line):
assert line.startswith(b'$GNGGA') or line.startswith(b'$GPGGA'), line
if checksum(line[1:-3]) != line[-2:]:
print('Checksum error!', line, checksum(line[1:-3]))
return [None, None] # TODO Probably it should not return a list..
s = line.split(b',')
coord = [str2ms(s[4]), str2ms(s[2])]
return coord
return None
nmea_data = parse_nmea(line)

return nmea_data


def parse_bin(data):
Expand Down Expand Up @@ -119,7 +157,7 @@ def split_buffer(data):

class GPS(Thread):
def __init__(self, config, bus):
bus.register('position', 'rel_position')
bus.register('position', 'rel_position', 'nmea_data')
Thread.__init__(self)
self.setDaemon(True)

Expand All @@ -128,8 +166,8 @@ def __init__(self, config, bus):

def process_packet(self, line):
if line.startswith(b'$GNGGA') or line.startswith(b'$GPGGA'):
coords = parse_line(line)
return {'position': coords}
nmea_data = parse_line(line)
return {'nmea_data': nmea_data}
elif line.startswith(BIN_PREAMBULE):
return parse_bin(line)
return None
Expand All @@ -140,7 +178,8 @@ def process_gen(self, data):
ret = self.process_packet(packet)
if ret is not None:
for k, v in ret.items():
yield k, v
if v is not None:
yield k, v
self.buf, packet = split_buffer(self.buf) # i.e. process only existing buffer now

def run(self):
Expand All @@ -151,6 +190,8 @@ def run(self):
for name, out in self.process_gen(data):
assert out is not None
self.bus.publish(name, out)
if name == "nmea_data":
self.bus.publish("position", nmea2coord_ms(out))
except BusShutdownException:
pass

Expand Down
61 changes: 46 additions & 15 deletions osgar/drivers/test_gps.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import unittest
from datetime import datetime

from osgar.drivers import gps

Expand All @@ -8,26 +9,18 @@ class GPSTest(unittest.TestCase):
def test_checksum(self):
self.assertEqual(gps.checksum(b'GNGGA,182433.10,5007.71882,N,01422.50467,E,1,05,6.09,305.1,M,44.3,M,,'), b'41')

def test_str2ms(self):
self.assertEqual(gps.str2ms(b'5007.71882'), 180463129)
def test_str2deg(self):
self.assertEqual(gps.str2deg('5007.71882'), 50.128647)

def test_str2ms_err_input(self):
self.assertEqual(gps.str2ms(b'01422.489915\x0052'), None) # the checksum is passing because a zero byte is added

def test_parse_line(self):
line = b'$GNGGA,182433.10,5007.71882,N,01422.50467,E,1,05,6.09,305.1,M,44.3,M,,*41'
self.assertEqual(gps.parse_line(line), [51750280, 180463129]) # arc milliseconds (x, y) = (lon, lat)
def test_str2deg_err_input(self):
self.assertEqual(gps.str2deg('01422.489915\x0052'), None) # the checksum is passing because a zero byte is added

def test_split(self):
buf = b'blabla$GPGGAsomething*12\r\n'
self.assertEqual(gps.split_buffer(buf), (b'\r\n', b'$GPGGAsomething*12'))

self.assertEqual(gps.split_buffer(b'$toofew*1'), (b'$toofew*1', b''))

def test_parsing_old_GPGGA(self):
line = b'$GPGGA,051852.000,5005.0244,N,01430.3360,E,1,06,3.8,253.1,M,45.4,M,,0000*58'
self.assertEqual(gps.parse_line(line), [52220160, 180301464]) # arc milliseconds (x, y) = (lon, lat)

def test_split_with_binary_data(self):
buf = b'\xb5b\x010\x04\x01\xf8n\x8a\x0e\x15\x04\x00\x00\x05\x02\r\x07&&@\x00S\xff\xff\xff\x02\x04\x10\x07$\xa5' \
b'\x00\x00\x00\x00\x00\x00\x13\x05\x04\x04\x15\x00h\x00\x00\x00\x00\x00\x03\x06\r\x07\x1b\r#\x00\xee\xfa' \
Expand All @@ -48,9 +41,6 @@ def test_split_with_binary_data(self):
buf, nmea = gps.split_buffer(buf)
self.assertEqual(nmea, b'$GNGGA,194535.40,5007.71276,N,01422.49206,E,1,12,0.80,284.6,M,44.3,M,,*42')

def test_invalid_coordinates(self):
self.assertEqual(gps.parse_line(b'$GPGGA,053446.426,,,,,0,00,,,M,0.0,M,,0000*56'), gps.INVALID_COORDINATES)

def test_parse_rel_position(self):
data = b'\xb5b\x01<(\x00\x00\x00\x00\x00\x00&\x08\x18\xd9\n\x00\x00N\xeb\xff\xff]\xff\xff\xffV\xd0\xd0\x00' \
b'\xf6\x00\x00\x00)\x01\x00\x00\xea\x01\x00\x00\x0f\x00\x00\x00/\xc6'
Expand All @@ -74,4 +64,45 @@ def test_parse_position_velocity_time_solution(self):
ret = gps.parse_bin(data)
self.assertIsNone(ret)


def test_parse_nmea(self):
line = b'$GNGGA,190615.40,5007.70786799,N,01422.49430110,E,2,09,1.9,290.1985,M,45.0552,M,01,0533*73'
# add old: b'$GPGGA,051852.000,5005.0244,N,01430.3360,E,1,06,3.8,253.1,M,45.4,M,,0000*58'
nmea_data = gps.parse_line(line)
expected_res = {
"identifier": "$GNGGA",
"lon": 14.374905018333333,
"lon_dir": "E",
"lat": 50.1284644665,
"lat_dir": "N",
"utc_time": "190615.40",
"quality": 2,
"sats": 9,
"hdop": 1.9,
"alt": 290.1985,
"a_units": "M",
"undulation": 45.0552,
"u_units": "M",
"age": 1,
"stn_id": "0533"
}
self.assertEqual(nmea_data, expected_res)

def test_parse_nmea_invalid(self):
line = b'$GNGGA,190615.40,,,,,2,09,1.9,290.1985,M,45.0552,M,,*73'
nmea_data = gps.parse_nmea(line)
self.assertIsNone(nmea_data["lon"])
self.assertIsNone(nmea_data["lon_dir"])
self.assertIsNone(nmea_data["stn_id"])

def test_parse_old_GPGGA(self):
line = b'$GPGGA,051852.000,5005.0244,N,01430.3360,E,1,06,3.8,253.1,M,45.4,M,,0000*58'
nmea_data = gps.parse_nmea(line)
self.assertEqual(nmea_data["lon"], 14.5056)
self.assertEqual(nmea_data["lat"], 50.08374)

def test_parse_err_line(self):
line = b'$GPGGA,051852.000,5005.0244,N,01430.3360,E,1,06,3.8,253\x00.1,M,45.4,M,,0000*58'
self.assertIsNone(gps.parse_nmea(line))

# vim: expandtab sw=4 ts=4

0 comments on commit b766e80

Please sign in to comment.