diff --git a/.gitignore b/.gitignore index 382f245..c1c3b08 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ src/pyunigps.egg* pylint_report.txt venv output.* +.VSCodeCounter diff --git a/README.md b/README.md index 15acf04..e79042d 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,8 @@ This is an independent project and we have no affiliation whatsoever with Unicor ![Contributors](https://img.shields.io/github/contributors/semuconsulting/pyunigps.svg) ![Open Issues](https://img.shields.io/github/issues-raw/semuconsulting/pyunigps) -This Alpha implements a comprehensive set of messages for Unicore "NebulasIV" High Precision GPS/GNSS devices, including the UM96n and UM98n series, but is readily [extensible](#extensibility). Refer to [UNI_MSGIDS in unitypes_core.py](https://github.com/semuconsulting/pyunigps/blob/main/src/pyunigps/unitypes_core.py#L86) for the complete list of message definitions currently defined. UNI protocol information sourced from public domain Unicore "NebulasIV" GNSS Protocol Specification © 2023, Unicore. +This Beta implements a comprehensive set of messages for Unicore "NebulasIV" High Precision GPS/GNSS devices, including the UM96n and UM98n series, but is readily [extensible](#extensibility). Refer to [UNI_MSGIDS in unitypes_core.py](https://github.com/semuconsulting/pyunigps/blob/main/src/pyunigps/unitypes_core.py#L86) for the complete list of message definitions currently defined. UNI protocol information sourced from public domain Unicore Reference Commands R1.13 © Dec 2025 Unicore +https://en.unicore.com/uploads/file/Unicore%20Reference%20Commands%20Manual%20For%20N4%20High%20Precision%20Products_V2_EN_R1.13.pdf **FYI:** diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d51fe53..b2e917e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,13 @@ # pyunigps Release Notes +### RELEASE 0.2.0 + +1. Update for Protocol Specification R1.13 Dec 2025. +1. Fix GALUTC `da0g`, `da1g` attribute lengths. +1. Fix GALEPH `fnavreceived`, `inavreceived` attribute lengths. +1. Add additional VERSIONB device type decodes. +1. Remove redundant routines. + ### RELEASE 0.1.5 1. Fix OBSVM, OBSVMCMP `psr` & `adr` attribute scaling. diff --git a/examples/benchmark.py b/examples/benchmark.py index e99e410..011931a 100644 --- a/examples/benchmark.py +++ b/examples/benchmark.py @@ -12,15 +12,26 @@ # pylint: disable=line-too-long +import os +import sys from io import BytesIO from platform import python_version from platform import version as osver from sys import argv from time import process_time_ns +# get path to site-packages (source) folder within venv +pypath = ( + f"{os.path.expanduser("~")}/pygpsclient/lib/python" + f"{sys.version_info.major}.{sys.version_info.minor}/site-packages" +) +sys.path.insert(0, os.path.abspath(pypath)) + from pyunigps._version import __version__ as univer from pyunigps.unireader import UNIReader +CYAN = "\033[1m\033[36m" +NORM = "\033[0m" UNIMESSAGES = [ b"\xaaD\xb5Y%\x004\x01\x00\xa0e\t\xe8[\x80\x04\x00\x00\x00\x00\x00\x12\x02\x00\x1f\x00\x00\x0014208\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00HRPT00-S10C-P\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002310415000015-O322A3233801305\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00ff3bbd949ceb95fc\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002024/05/23\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xb6\xc0\x95\x94", b"\xaaD\xb5Y\xfb\x03\x01\x00\x00\xa0e\t\xe8[\x80\x04\x00\x00\x00\x00\x00\x12\x03\x00\x00\x94#\x96'", @@ -113,11 +124,12 @@ def benchmark(**kwargs) -> float: txnt = txnc * cyc print( - f"\nOperating system: {osver()}", - f"\nPython version: {python_version()}", - f"\npyunigps version: {univer}", - f"\nTest cycles: {cyc:,}", - f"\nTxn per cycle: {txnc:,}", + f"\nAbsolute path: {CYAN}{pypath}{NORM}", + f"\nOperating system: {CYAN}{osver()}{NORM}", + f"\nPython version: {CYAN}{python_version()}{NORM}", + f"\npyunigps version: {CYAN}{univer}{NORM}", + f"\nTest cycles: {CYAN}{cyc:,}{NORM}", + f"\nTxn per cycle: {CYAN}{txnc:,}{NORM}", ) start = process_time_ns() @@ -136,7 +148,8 @@ def benchmark(**kwargs) -> float: kbs = round(msglen * 1e9 / duration / 2**10, 2) print( - f"\n{txnt:,} messages processed in {duration/1e9:,.3f} seconds = {txs:,.2f} txns/second, {kbs:,.2f} kB/second.\n" + f"\n{txnt:,} messages processed in {duration/1e9:,.3f} seconds = " + f"{CYAN}{txs:,.2f}{NORM} txns/second, {CYAN}{kbs:,.2f}{NORM} kB/second ({CYAN}{kbs*8000:,.0f}{NORM} baud).\n" ) return txs, kbs diff --git a/pyproject.toml b/pyproject.toml index 3dfafbb..80aed16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ readme = "README.md" requires-python = ">=3.10" classifiers = [ "Operating System :: OS Independent", - "Development Status :: 3 - Alpha", + "Development Status :: 4 - Beta", "Environment :: MacOS X", "Environment :: Win32 (MS Windows)", "Environment :: X11 Applications", @@ -22,7 +22,6 @@ classifiers = [ "Intended Audience :: Developers", "Intended Audience :: Science/Research", "Intended Audience :: End Users/Desktop", - "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", @@ -101,7 +100,7 @@ disable = """ [tool.pytest.ini_options] minversion = "7.0" -addopts = "--cov --cov-report html --cov-fail-under 85" +addopts = "--cov --cov-report html --cov-fail-under 98" pythonpath = ["src"] testpaths = ["tests"] @@ -112,7 +111,7 @@ source = ["src"] source = ["src"] [tool.coverage.report] -fail_under = 85 +fail_under = 98 [tool.coverage.html] directory = "htmlcov" diff --git a/src/pyunigps/_version.py b/src/pyunigps/_version.py index 0c1eeef..a2eca56 100644 --- a/src/pyunigps/_version.py +++ b/src/pyunigps/_version.py @@ -8,4 +8,4 @@ :license: BSD 3-Clause """ -__version__ = "0.1.5" +__version__ = "0.2.0" diff --git a/src/pyunigps/unihelpers.py b/src/pyunigps/unihelpers.py index 2551792..1b1c65b 100644 --- a/src/pyunigps/unihelpers.py +++ b/src/pyunigps/unihelpers.py @@ -17,8 +17,9 @@ from pynmeagps import leapsecond import pyunigps.exceptions as une -from pyunigps.unitypes_core import ATTTYPE, U4, UNI_MSGIDS +from pyunigps.unitypes_core import ATTTYPE, SCALROUND, U4, UNI_MSGIDS +HDRSTRUCT = " int | tuple[int]: - """ - Get integer indices corresponding to grouped attribute. - - e.g. svid_06 -> 6; gnssId_103 -> 103, gsid_03_04 -> (3,4), tow -> 0 - - :param str att: grouped attribute name e.g. svid_01 - :return: indices as integer(s), or 0 if not grouped - :rtype: int | tuple[int] - """ - - try: - att = att.split("_") - ln = len(att) - if ln == 2: # one group level - return int(att[1]) - if ln > 2: # nested group level(s) - return tuple(int(att[i]) for i in range(1, ln)) - return 0 # not grouped - except ValueError: - return 0 - - -def att2name(att: str) -> str: - """ - Get name of grouped attribute. - - e.g. svid_06 -> svid; gnssId_103 -> gnssId, tow -> tow - - :param str att: grouped attribute name e.g. svid_01 - :return: name without index e.g. svid - :rtype: str - """ - - return att.split("_")[0] - - def attsiz(att: str) -> int: """ Helper function to return attribute size in bytes. @@ -348,28 +312,38 @@ def atttyp(att: str) -> str: return att[0:1] -def bytes2val(valb: bytes, att: str) -> Any: +def bytes2val(valb: bytes, adef: str) -> Any: """ Convert bytes to value for given UNI attribute type. :param bytes valb: attribute value in byte format e.g. b'\\\\x19\\\\x00\\\\x00\\\\x00' - :param str att: attribute type e.g. 'U004' + :param str adef: attribute definition e.g. 'U004' :return: attribute value as int, float, str or bytes :rtype: Any :raises: UNITypeError """ - if atttyp(att) == "X": # bytes + if "*" in adef: + adef, scaling = adef.split("*", 1) + scaling = float(scaling) + else: + scaling = 1 + + if atttyp(adef) == "X": # bytes val = valb - elif atttyp(att) == "C": # string + elif atttyp(adef) == "C": # string val = valb.replace(b"\x00", b" ").decode("utf-8", errors="backslashreplace") - elif atttyp(att) in ("S", "U"): # integer - val = int.from_bytes(valb, byteorder="little", signed=atttyp(att) == "S") - elif atttyp(att) == "R": # floating point - val = struct.unpack(" str: return "b'{}'".format("".join(f"\\x{b:02x}" for b in val)) -def get_bits(bitfield: bytes, bitmask: int) -> int: - """ - Get integer value of specified (masked) bit(s) in a UNI bitfield (attribute type 'X') - - e.g. to get value of bits 6,7 in bitfield b'\\\\x89' (binary 0b10001001):: - - get_bits(b'\\x89', 0b11000000) = get_bits(b'\\x89', 192) = 2 - - :param bytes bitfield: bitfield byte(s) - :param int bitmask: bitmask as integer (= Σ(2**n), where n is the number of the bit) - :return: value of masked bit(s) - :rtype: int - """ - - i = 0 - val = int(bitfield.hex(), 16) - while bitmask & 1 == 0: - bitmask = bitmask >> 1 - i += 1 - return val >> i & bitmask - - -def get_parts(message: bytes | str) -> tuple[list, list, bytes]: - """ - Get header, payload and CRC of ASCII UNI message. - - :param bytes | str message: entire message as bytes or string - :return: tuple of (hdr, payload, crc) - :rtype: tuple[list, list, bytes] - :raises: UNIMessageError (if message is badly formed) - """ - - try: - if isinstance(message, bytes): - message = message.decode("ascii", errors="backslashreplace") - - content, crcstr = message.strip("#\r\n").split("*", 1) - # ASCII CRC is big-endian so reverse byte order - crcb = bytes.fromhex(crcstr)[::-1] - hdrt, payt = content.split(";", 1) - header = hdrt.split(",") - payload = payt.split(",") - return header, payload, crcb - except ValueError as err: - raise une.UNIMessageError(f"Badly formed ASCII message {message}") from err - - def msgname2id(msgname: str) -> int | NoneType: """ Get numeric message id for given message name @@ -497,7 +424,7 @@ def header2bytes( if wno is None or tow is None: wno, tow, leapsec = utc2wnotow() return struct.pack( - " tuple: tow, version, reserved, leapsecond, delay) """ - return struct.unpack(" bool: @@ -541,74 +468,27 @@ def isvalid_checksum(message: bytes) -> bool: return ckm == calc_crc(message[: lenm - 4]) -def key_from_val(dictionary: dict, value) -> str: - """ - Helper method - get dictionary key corresponding to (unique) value. - - :param dict dictionary: dictionary - :param object value: unique dictionary value - :return: dictionary key - :rtype: str - :raises: KeyError: if no key found for value - - """ - - val = None - for key, val in dictionary.items(): - if val == value: - return key - raise KeyError(f"No key found for value {value}") - - -def nomval(att: str) -> Any: +def nomval(adef: str) -> Any: """ Get nominal value for given UNI attribute type. - :param str att: attribute type e.g. 'U004' + :param str adef: attribute definition e.g. 'U004' :return: attribute value as int, float, str or bytes :rtype: Any :raises: UNITypeError """ - if atttyp(att) == "X": - val = b"\x00" * attsiz(att) - elif atttyp(att) == "C": - val = f"{' ':<{attsiz(att)}}" - elif atttyp(att) == "R": + if atttyp(adef) == "X": + val = b"\x00" * attsiz(adef) + elif atttyp(adef) == "C": + val = f"{' ':<{attsiz(adef)}}" + elif atttyp(adef) == "R": val = 0.0 - elif atttyp(att) in ("S", "U"): + elif atttyp(adef) in ("S", "U"): val = 0 else: - raise une.UNITypeError(f"Unknown attribute type {att}") - return val - - -def str2val(valstr: str, adef: str) -> object: - """ - Convert ASCII string to typed value - - :param str valstr: attribute value as string - :param str adef: attribute type e.g. 'R001' - :return: attribute value - :rtype: object - :raises: UNITypeError - """ - - att = atttyp(adef) - val = valstr - if att == "C": # char - pass - elif att == "X": # hex - val = int.from_bytes(bytes.fromhex(val), "little") - elif att == "R": # float - if valstr != "": - val = float(valstr) - elif att in ("S", "U"): # signed or unsigned integer - if valstr != "": - val = int(valstr) - else: - raise une.UNITypeError(f"Unknown attribute type {att}.") + raise une.UNITypeError(f"Unknown attribute type {adef}") return val @@ -633,33 +513,46 @@ def utc2wnotow(utc: datetime | NoneType = None) -> tuple[int, int, int]: return wno, tow, ls -def val2bytes(val: Any, att: str) -> bytes: +def val2bytes(val: Any, adef: str) -> bytes: """ Convert value to bytes for given UNI attribute type. :param Any val: attribute value e.g. 25 - :param str att: attribute type e.g. 'U004' + :param str adef: attribute definition e.g. 'U004' :return: attribute value as bytes :rtype: bytes :raises: UNITypeError """ + if "*" in adef: + adef, scaling = adef.split("*", 1) + scaling = float(scaling) + else: + scaling = 1 + try: - if not isinstance(val, ATTTYPE[atttyp(att)]): + if not isinstance(val, ATTTYPE[atttyp(adef)]): raise TypeError( - f"Attribute type {att} value {val} must be {ATTTYPE[atttyp(att)]}, not {type(val)}" + f"Attribute type {adef} value {val} must be " + f"{ATTTYPE[atttyp(adef)]}, not {type(val)}" ) except KeyError as err: - raise une.UNITypeError(f"Unknown attribute type {att}") from err + raise une.UNITypeError(f"Unknown attribute type {adef}") from err valb = val - if atttyp(att) == "X": # byte + if atttyp(adef) == "X": # byte valb = val - elif atttyp(att) == "C": # string - valb = f"{val:<{attsiz(att)}}".encode("utf-8", errors="backslashreplace") - elif atttyp(att) in ("S", "U"): # integer - valb = val.to_bytes(attsiz(att), byteorder="little", signed=atttyp(att) == "S") - elif atttyp(att) == "R": # floating point - valb = struct.pack(" 0: anami += f"_{i:02d}" - # determine attribute size (bytes) - some attributes have - # variable length, depending on - # - multiple of value of preceding attribute - # - payload length - offset asiz = attsiz(adef) - # if attribute is scaled - if "*" in adef: - adef, scaling = adef.split("*", 1) - scaling = float(scaling) - else: - scaling = 1 - # if payload keyword has been provided, # use the appropriate offset of the payload if "payload" in kwargs: valb = self._payload[self._offset : self._offset + asiz] - if scaling == 1: - val = bytes2val(valb, adef) - else: - val = round(bytes2val(valb, adef) / scaling, SCALROUND) + val = bytes2val(valb, adef) else: # if individual keyword has been provided, # set to provided value, else set to # nominal value val = kwargs.get(anami, nomval(adef)) - if scaling == 1: - valb = val2bytes(val, adef) - else: - valb = val2bytes(int(val * scaling), adef) + valb = val2bytes(val, adef) self._payload += valb setattr(self, anami, val) self._offset += asiz - def _set_attribute_bitfield(self, atyp: str, **kwargs): + def _set_attribute_bitfield(self, adef: tuple[str, dict], **kwargs): """ Parse bitfield attribute (type 'X'). - :param str atyp: attribute type e.g. 'X002' - :param int offset: payload offset in bytes - :param list index: repeating group index array + :param tuple[str, dict] adef: attribute definition and dictionary :param kwargs: optional payload key/value pairs :return: (offset, index[]) :rtype: tuple @@ -265,7 +245,7 @@ def _set_attribute_bitfield(self, atyp: str, **kwargs): """ # pylint: disable=no-member - btyp, bdict = atyp # type of bitfield, bitfield dictionary + btyp, bdict = adef # type of bitfield, bitfield dictionary bsiz = attsiz(btyp) # size of bitfield in bytes bfoffset = 0 @@ -294,61 +274,61 @@ def _set_attribute_bits( self, bitfield: int, bfoffset: int, - key: str, - keyt: str, + anam: str, + adef: str, index: list, **kwargs, - ) -> tuple: + ) -> tuple[int, int]: """ Set individual bit flag from bitfield. :param int bitfield: bitfield :param int bfoffset: bitfield offset in bits - :param str key: attribute key name - :param str keyt: key type e.g. 'U001' + :param str anam: attribute name + :param str adef: attribute definition e.g. 'U001' :param list index: repeating group index array :param kwargs: optional payload key/value pairs :return: (bitfield, bfoffset) - :rtype: tuple + :rtype: tuple[int,int] """ # pylint: disable=no-member # if attribute is part of a (nested) repeating group, suffix name with index - keyr = key + anami = anam for i in index: # one index for each nested level if i > 0: - keyr += f"_{i:02d}" + anami += f"_{i:02d}" # if attribute is scaled - if "*" in keyt: - keyt, scaling = keyt.split("*", 1) + if "*" in adef: + adef, scaling = adef.split("*", 1) scaling = float(scaling) else: scaling = 1 - atts = attsiz(keyt) # determine flag size in bits + asiz = attsiz(adef) # determine flag size in bits if "payload" in kwargs: - val = (bitfield >> bfoffset) & ((1 << atts) - 1) - if self.identity in ("OBSVMCMP", "OBSVHCMP") and key == "psrstd": + val = (bitfield >> bfoffset) & ((1 << asiz) - 1) + if self.identity in ("OBSVMCMP", "OBSVHCMP") and anam == "psrstd": val = PSRSTD[val] - elif self.identity in ("OBSVMCMP", "OBSVHCMP") and key == "adrstd": + elif self.identity in ("OBSVMCMP", "OBSVHCMP") and anam == "adrstd": val = round((val + 1) / 512, SCALROUND) - elif self.identity in ("OBSVMCMP", "OBSVHCMP") and key == "cno": + elif self.identity in ("OBSVMCMP", "OBSVHCMP") and anam == "cno": val += 20 elif scaling != 1: val = round(val / scaling, SCALROUND) else: if scaling == 1: - val = kwargs.get(keyr, 0) + val = kwargs.get(anami, 0) else: - val = int(kwargs.get(keyr, 0) * scaling) + val = int(kwargs.get(anami, 0) * scaling) bitfield = bitfield | (val << bfoffset) - if key[0:8] != "reserved": # don't bother to set reserved bits - setattr(self, keyr, val) - return (bitfield, bfoffset + atts) + if anam[0:8] != "reserved": # don't bother to set reserved bits + setattr(self, anami, val) + return (bitfield, bfoffset + asiz) def _do_len_checksum(self): """ diff --git a/src/pyunigps/unitypes_core.py b/src/pyunigps/unitypes_core.py index 4c0108c..418631d 100644 --- a/src/pyunigps/unitypes_core.py +++ b/src/pyunigps/unitypes_core.py @@ -3,8 +3,8 @@ Created on 26 Jan 2026 -Information sourced from public domain Unicore UM980 Interface Specifications © 2023, Unicore -https://www.ardusimple.com/wp-content/uploads/2023/04/Unicore-Reference-Commands-Manual-For-N4-High-Precision-Products_V2_EN_R1.4-1.pdf +Information sourced from public domain Unicore Reference Commands R1.13 © Dec 2025 Unicore +https://en.unicore.com/uploads/file/Unicore%20Reference%20Commands%20Manual%20For%20N4%20High%20Precision%20Products_V2_EN_R1.13.pdf :author: semuadmin (Steve Smith) """ diff --git a/src/pyunigps/unitypes_decodes.py b/src/pyunigps/unitypes_decodes.py index 5932c41..54b1c53 100644 --- a/src/pyunigps/unitypes_decodes.py +++ b/src/pyunigps/unitypes_decodes.py @@ -5,8 +5,8 @@ Created on 26 Jan 2026 -Information sourced from public domain Unicore UM980 Interface Specifications © 2023, Unicore -https://www.ardusimple.com/wp-content/uploads/2023/04/Unicore-Reference-Commands-Manual-For-N4-High-Precision-Products_V2_EN_R1.4-1.pdf +Information sourced from public domain Unicore Reference Commands R1.13 © Dec 2025 Unicore +https://en.unicore.com/uploads/file/Unicore%20Reference%20Commands%20Manual%20For%20N4%20High%20Precision%20Products_V2_EN_R1.13.pdf :author: semuadmin (Steve Smith) """ @@ -34,7 +34,19 @@ 23: "CLAP-C", 24: "UM960L", 26: "UM981", + 31: "UM981S", + 24: "UM960L", + 26: "UM981", + 40: "UMD982", + 41: "UMD981", + 42: "UMD981S", + 43: "UM981C", 52: "UB9A0", + 53: "UBD9A0", + 62: "UMD960", + 63: "UMD980", + 64: "UM980C", + 65: "UM982C", } """Hardware Device Code""" @@ -141,3 +153,17 @@ 4: "Psrdiff", } """EXTSOLSTAT Pseudorange Ionospheric Correction""" + +BD3EPH_FREQTYPE = { + 0: "B1C", + 1: "B2a", + 2: "B2b", +} +"""BD3EPH Frequency Type""" + +GLOEPH_SATTYPE = { + 0: "GLO_SAT", + 1: "GLO_SAT_M", + 2: "GLO_SAT_K", +} +"""GLOEPH Satellite Type""" diff --git a/src/pyunigps/unitypes_get.py b/src/pyunigps/unitypes_get.py index 3db0d0a..edd90d8 100644 --- a/src/pyunigps/unitypes_get.py +++ b/src/pyunigps/unitypes_get.py @@ -3,8 +3,8 @@ Created on 26 Jan 2026 -Information sourced from public domain Unicore UM980 Interface Specifications © 2023, Unicore -https://www.ardusimple.com/wp-content/uploads/2023/04/Unicore-Reference-Commands-Manual-For-N4-High-Precision-Products_V2_EN_R1.4-1.pdf +Information sourced from public domain Unicore Reference Commands R1.13 © Dec 2025 Unicore +https://en.unicore.com/uploads/file/Unicore%20Reference%20Commands%20Manual%20For%20N4%20High%20Precision%20Products_V2_EN_R1.13.pdf :author: semuadmin (Steve Smith) """ @@ -20,6 +20,7 @@ S1, S2, S4, + S8, U1, U2, U3, @@ -218,8 +219,8 @@ "utcwn": U4, "tot": U4, "clockbias": R8, - "clockrate": R8, "clockdrift": R8, + "clockdriftrate": R8, "wnlsf": U4, "daynum": U4, "deltatls": S4, @@ -248,8 +249,8 @@ "ulwnlsf": U4, "daynum": U4, "deltatlsf": S4, - "da0g": S4, - "da1g": U4, + "da0g": S8, + "da1g": U8, "utt0g": U4, "ulwn0g": U4, }, @@ -333,7 +334,7 @@ "sisaioc2": U1, "reserved1": S4, "reserved2": S4, - "freqtype": U4, + "freqtype": U4, # see BD3EPH_FREQTYPE decode }, "BDSEPH": { "prn": U4, @@ -373,7 +374,7 @@ "GLOEPH": { "sloto": U2, "freqo": U2, - "sattype": U1, + "sattype": U1, # see GLOEPH_SATTYPE decode "reserved1": U1, "eweek": U2, "etime": U4, @@ -403,8 +404,8 @@ }, "GALEPH": { "satid": U4, - "fnavreceived": U1, - "inavreceived": U1, + "fnavreceived": U4, + "inavreceived": U4, "e1bhealth": U1, "e5ahealth": U1, "e5bhealth": U1, @@ -484,7 +485,7 @@ "hour": U1, "minute": U1, "second": U1, - "postype": U1, + "postype": U1, # see POSTYPE decode "headingstat": U1, "numgpssta": U1, "numbdssta": U1, @@ -579,8 +580,8 @@ }, # "UNILOGLIST": {}, # ascii only "BESTNAV": { - "psolstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "lat": R8, "lon": R8, "hmsl": R8, @@ -611,16 +612,16 @@ "horspdstd": R4, }, "BESTNAVXYZ": { - "psolstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "px": R8, "py": R8, "pz": R8, "pxstd": R4, "pystd": R4, "pzstd": R4, - "vsolstatus": U4, - "veltype": U4, + "vsolstatus": U4, # see SOLSTATUS decode + "veltype": U4, # see POSTYPE decode "velx": R8, "vely": R8, "velz": R8, @@ -655,8 +656,8 @@ ), }, "ADRNAV": { - "solstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "lat": R8, "lon": R8, "hmsl": R8, @@ -676,8 +677,8 @@ **EXTSOLSTAT, **GALBDS3SIGMASK, **GPSGLOBDS2SIGMASK, - "solstatus2": U4, - "veltype": U4, + "solstatus2": U4, # see SOLSTATUS decode + "veltype": U4, # see POSTYPE decode "latency": R4, "agediff": R4, "horspd": R8, @@ -688,8 +689,8 @@ }, # "ADRNAVH": {}, # same as ADRNAV, see below "PPPNAV": { - "solstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "lat": R8, "lon": R8, "hmsl": R8, @@ -711,8 +712,8 @@ **GPSGLOBDS2SIGMASK, }, "SPPNAV": { - "solstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "lat": R8, "lon": R8, "hmsl": R8, @@ -732,8 +733,8 @@ **EXTSOLSTAT, **GALBDS3SIGMASK, **GPSGLOBDS2SIGMASK, - "solstatus2": U4, - "veltype": U4, + "solstatus2": U4, # see SOLSTATUS decode + "veltype": U4, # see POSTYPE decode "latency": R4, "agediff": R4, "horspd": R8, @@ -763,65 +764,11 @@ ), }, # "STADOPH": {}, # same as STADOP, see below - "ADRDOP": { - "reserved1": U4, - "gdop": R4, - "pdop": R4, - "tdop": R4, - "vdop": R4, - "hdop": R4, - "ndop": R4, - "edop": R4, - "cutoff": R4, - "reserved2": R4, - "numprn": U2, - "groupprn": ( - "numprn", - { - "prn": U2, - }, - ), - }, - # "ADRDOPH": {}, # same as ADRDOP, see below - "PPPDOP": { - "reserved1": U4, - "gdop": R4, - "pdop": R4, - "tdop": R4, - "vdop": R4, - "hdop": R4, - "ndop": R4, - "edop": R4, - "cutoff": R4, - "reserved2": R4, - "numprn": U2, - "groupprn": ( - "numprn", - { - "prn": U2, - }, - ), - }, - "SPPDOP": { - "reserved1": U4, - "gdop": R4, - "pdop": R4, - "tdop": R4, - "vdop": R4, - "hdop": R4, - "ndop": R4, - "edop": R4, - "cutoff": R4, - "reserved2": R4, - "numprn": U2, - "groupprn": ( - "numprn", - { - "prn": U2, - }, - ), - }, - # "SPPDOPH": {}, # same as SPPDOP, see below + # "ADRDOP": {}, # same as STADOP, see below + # "ADRDOPH": {}, # same as STADOP, see below + # "PPPDOP": {}, # same as STADOP, see below + # "SPPDOP": {}, # same as STADOP, see below + # "SPPDOPH": {}, # same as STADOP, see below "SATSINFO": { "numsat": U1, "version": U1, @@ -854,8 +801,8 @@ ), }, "BASEPOS": { - "psolstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "lat": R8, "lon": R8, "hmsl": R8, @@ -875,8 +822,8 @@ **EXTSOLSTAT, **GALBDS3SIGMASK, **GPSGLOBDS2SIGMASK, - "vsolstatus": U4, - "veltype": U4, + "vsolstatus": U4, # see SOLSTATUS decode + "veltype": U4, # see POSTYPE decode "latency": R4, "agediff": R4, "horspd": R8, @@ -886,7 +833,7 @@ "horspdstd": R4, }, "SATELLITE": { - "gnss": U4, + "gnss": U4, # see GNSS decode "satvis": U4, "compalm": U4, "numsat": U4, @@ -908,7 +855,7 @@ "groupsat": ( "numsat", { - "gnss": U4, + "gnss": U4, # see GNSS decode "prn": U4, "satcoordx": R4, "satcoordy": R4, @@ -934,8 +881,8 @@ "utcstatus": U4, }, "UNIHEADING": { - "solstat": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "length": R4, "heading": R4, "pitch": R4, @@ -953,8 +900,8 @@ **GPSGLOBDS2SIGMASK, }, "UNIHEADING2": { - "solstat": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "length": R4, "heading": R4, "pitch": R4, @@ -993,8 +940,8 @@ "galsource2": U4, "qzsssource": U4, "reserved4": U4, - "postype": U4, - "calcstatus": U4, + "postype": U4, # see POSTYPE decode + "calcstatus": U4, # see CALCSTATUS decode "iondetected": U1, "dualrtkflag": U1, "adrnumber": U1, @@ -1031,14 +978,14 @@ "reserved5": U4, }, "JAMSTATUS": { - "postype": U4, + "postype": U4, # see POSTYPE decode "cwratio": U1, "cwflag": U1, "reserved1": U1, "reserved2": U1, }, "FREQJAMSTATUS": { - "postype": U4, + "postype": U4, # see POSTYPE decode "l1cwratio": U1, "l1cwflag": U1, "l2cwratio": U1, @@ -1105,8 +1052,8 @@ }, # TODO double check this is groupdata within groupmsg or not? # "INFOPART2": {}, # same as INFOPART, see below "MSPOS": { - "mainpsoltatus": U4, - "mainpostype": U4, + "mainpsoltatus": U4, # see SOLSTATUS decode + "mainpostype": U4, # see POSTYPE decode "mainlat": R8, "mainlon": R8, "mainhmsl": R8, @@ -1116,8 +1063,8 @@ "mainobs": U1, "mainsatuse": U1, "reserved1": S2, - "secondarypsolstatus": U4, - "secondarypostype": U4, + "secondarypsolstatus": U4, # see SOLSTATUS decode + "secondarypostype": U4, # see POSTYPE decode "secondarylat": R8, "secondarylon": R8, "secondaryhmsl": R8, @@ -1329,8 +1276,8 @@ ), }, "BSLNENUHD2": { - "solstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "east": R8, "north": R8, "up": R8, @@ -1349,8 +1296,8 @@ **GPSGLOBDS2SIGMASK, }, "BSLNXYZHD2": { - "solstatus": U4, - "postype": U4, + "solstatus": U4, # see SOLSTATUS decode + "postype": U4, # see POSTYPE decode "dx": R8, "dy": R8, "dz": R8, @@ -1403,6 +1350,9 @@ UNI_PAYLOADS_GET["ADRNAVH"] = UNI_PAYLOADS_GET["ADRNAV"] UNI_PAYLOADS_GET["SPPNAVH"] = UNI_PAYLOADS_GET["SPPNAV"] UNI_PAYLOADS_GET["STADOPH"] = UNI_PAYLOADS_GET["STADOP"] -UNI_PAYLOADS_GET["ADRDOPH"] = UNI_PAYLOADS_GET["ADRDOP"] -UNI_PAYLOADS_GET["SPPDOPH"] = UNI_PAYLOADS_GET["SPPDOP"] +UNI_PAYLOADS_GET["ADRDOP"] = UNI_PAYLOADS_GET["STADOP"] +UNI_PAYLOADS_GET["ADRDOPH"] = UNI_PAYLOADS_GET["STADOP"] +UNI_PAYLOADS_GET["PPPDOP"] = UNI_PAYLOADS_GET["STADOP"] +UNI_PAYLOADS_GET["SPPDOP"] = UNI_PAYLOADS_GET["STADOP"] +UNI_PAYLOADS_GET["SPPDOPH"] = UNI_PAYLOADS_GET["STADOP"] UNI_PAYLOADS_GET["INFOPART2"] = UNI_PAYLOADS_GET["INFOPART1"] diff --git a/src/pyunigps/unitypes_poll.py b/src/pyunigps/unitypes_poll.py index b082bce..98c19d2 100644 --- a/src/pyunigps/unitypes_poll.py +++ b/src/pyunigps/unitypes_poll.py @@ -3,9 +3,6 @@ Created on 26 Jan 2026 -Information sourced from public domain Unicore UM980 Interface Specifications © 2023, Unicore -https://www.ardusimple.com/wp-content/uploads/2023/04/Unicore-Reference-Commands-Manual-For-N4-High-Precision-Products_V2_EN_R1.4-1.pdf - :author: semuadmin (Steve Smith) """ diff --git a/src/pyunigps/unitypes_set.py b/src/pyunigps/unitypes_set.py index 662259c..98f510f 100644 --- a/src/pyunigps/unitypes_set.py +++ b/src/pyunigps/unitypes_set.py @@ -3,9 +3,6 @@ Created on 26 Jan 2026 -Information sourced from public domain Unicore UM980 Interface Specifications © 2023, Unicore -https://www.ardusimple.com/wp-content/uploads/2023/04/Unicore-Reference-Commands-Manual-For-N4-High-Precision-Products_V2_EN_R1.4-1.pdf - :author: semuadmin (Steve Smith) """ diff --git a/tests/test_parse.py b/tests/test_parse.py index 9fa4b7f..84b17a7 100644 --- a/tests/test_parse.py +++ b/tests/test_parse.py @@ -28,6 +28,7 @@ UNIMessage, UNIReader, escapeall, + GNSS, ) from pyunigps.unihelpers import isvalid_checksum @@ -49,8 +50,8 @@ def testparseum981(self): "", "", "", - "", - "", + "", + "", "", "", "", @@ -67,9 +68,9 @@ def testparseum981(self): "", "", "", - "", + "", "", - "", + "", "", "", "", @@ -78,7 +79,7 @@ def testparseum981(self): "", "", "", - "", + "", "", "", "", @@ -88,6 +89,29 @@ def testparseum981(self): unr = UNIReader(stream) i = 0 for raw, parsed in unr: + if parsed.identity == "GALEPH": + # sanity check, should be ≈ √29,599,800 m + self.assertTrue(5440 < parsed.roota < 5441) + if parsed.identity == "OBSVM": + for n in range(parsed.numobs): + # sanity check for MEO or ISGO pseudoranges... + psr = getattr(parsed, f"psr_{n+1:02d}") + # gnss = getattr(parsed, f"gnss_{n+1:02d}") + # prn = getattr(parsed, f"prn_{n+1:02d}") + # print(n+1, GNSS[gnss], prn, psr) + self.assertTrue( + (19000000 < psr < 28000000) or (40000000 < psr < 41000000) + ) + if parsed.identity == "OBSVMCMP": + for n in range(parsed.numobs): + # sanity check for MEO or ISGO pseudoranges... + psr = getattr(parsed, f"psr_{n+1:02d}") + # gnss = getattr(parsed, f"gnss_{n+1:02d}") + # prn = getattr(parsed, f"prn_{n+1:02d}") + # print(n+1, GNSS[gnss], prn, psr) + self.assertTrue( + (19000000 < psr < 28000000) or (40000000 < psr < 41000000) + ) # print(f'"{parsed}",') # print(f"{raw},") # fout.write(f'"{str(parsed)}",\r\n') @@ -96,6 +120,7 @@ def testparseum981(self): self.assertEqual(isvalid_checksum(raw), True) self.assertIsInstance(unr.datastream, BufferedReader) i += 1 + self.assertEqual(i, len(EXPECTED_RESULTS)) def testparseSATSINFO(self): diff --git a/tests/test_static.py b/tests/test_static.py index 3192348..49279aa 100644 --- a/tests/test_static.py +++ b/tests/test_static.py @@ -17,20 +17,14 @@ import pyunigps.exceptions as une import pyunigps.unitypes_core as unt from pyunigps.unihelpers import ( - att2idx, - att2name, attsiz, bytes2val, calc_crc, escapeall, - get_bits, - get_parts, header2bytes, header2vals, - key_from_val, msgname2id, nomval, - str2val, utc2wnotow, val2bytes, ) @@ -71,6 +65,8 @@ def testVal2Bytes(self): # test conversion of value to bytes (-23.12345678912345, unt.R8), ("test1234", unt.C8), ("test1234", "C016"), + (23.12345678912345, "R008*100"), + (2345, "U002*20"), ] EXPECTED_RESULTS = [ b"\x29\x09", @@ -79,6 +75,8 @@ def testVal2Bytes(self): # test conversion of value to bytes b"\x1f\xc1\x37\xdd\x9a\x1f\x37\xc0", b"test1234", b"test1234 ", + b'\xe0\x8e\xd3\xfc\xb0\x10\xa2@', + b'4\xb7', ] for i, inp in enumerate(INPUTS): val, att = inp @@ -103,6 +101,7 @@ def testBytes2Val(self): # test conversion of bytes to value (b"\xd7\xfc\xb8\x41", unt.R4), (b"\x1f\xc1\x37\xdd\x9a\x1f\x37\xc0", unt.R8), (b"test1234", unt.C8), + (b'\xe0\x8e\xd3\xfc\xb0\x10\xa2@', "R008*100"), ] EXPECTED_RESULTS = [ 2345, @@ -110,6 +109,7 @@ def testBytes2Val(self): # test conversion of bytes to value 23.12345678, -23.12345678912345, "test1234", + 23.123456789123, ] for i, inp in enumerate(INPUTS): valb, att = inp @@ -150,40 +150,10 @@ def testNomValInvalid(self): with self.assertRaisesRegex(une.UNITypeError, "Unknown attribute type Y002"): res = nomval("Y002") - def testgetbits(self): - INPUTS = [ - (b"\x89", 192), - (b"\xc9", 3), - (b"\x89", 9), - (b"\xc9", 9), - (b"\x18\x18", 8), - (b"\x18\x20", 8), - ] - EXPECTED_RESULTS = [2, 1, 9, 9, 1, 0] - for i, (vb, mask) in enumerate(INPUTS): - vi = get_bits(vb, mask) - self.assertEqual(vi, EXPECTED_RESULTS[i]) - def testattsiz(self): # test attsiz self.assertEqual(attsiz(CV), -1) self.assertEqual(attsiz("C032"), 32) - def testatt2idx(self): # test att2idx - EXPECTED_RESULT = [4, 16, 101, 0, (3, 6), 0] - atts = ["svid_04", "gnssId_16", "cno_101", "gmsLon", "gnod_03_06", "dodgy_xx"] - for i, att in enumerate(atts): - res = att2idx(att) - # print(res) - self.assertEqual(res, EXPECTED_RESULT[i]) - - def testatt2name(self): # test att2name - EXPECTED_RESULT = ["svid", "gnssId", "cno", "gmsLon"] - atts = ["svid_04", "gnssId_16", "cno_101", "gmsLon"] - for i, att in enumerate(atts): - res = att2name(att) - # print(res) - self.assertEqual(res, EXPECTED_RESULT[i]) - def testescapeall(self): EXPECTED_RESULT = "b'\\x68\\x65\\x72\\x65\\x61\\x72\\x65\\x73\\x6f\\x6d\\x65\\x63\\x68\\x61\\x72\\x73'" val = b"herearesomechars" @@ -191,14 +161,6 @@ def testescapeall(self): print(res) self.assertEqual(res, EXPECTED_RESULT) - def testkeyfromval(self): - res = key_from_val(UNI_MSGIDS, "GLOEPH") - self.assertEqual(res, 107) - - def testkeyfromvalinvalid(self): - with self.assertRaisesRegex(KeyError, "No key found for value XXXX"): - res = key_from_val(UNI_MSGIDS, "XXXX") - def testutc2wnotow(self): dat = datetime(2026, 1, 28, 9, 34, 12, 234000, tzinfo=timezone.utc) wno, tow, ls = utc2wnotow(dat) @@ -240,25 +202,6 @@ def testheader2vals(self): # print(v) self.assertEqual(v, (0, 17, 308, 1, 1, 2402, 34675834, 1, 0, 18, 23)) - def testgetparts(self): - MESSAGE = b'#VERSIONA,94,GPS,FINE,2190,117325000,0,0,18,160;"UM982","R4.10Build5251","HRPT00-S10C-P","-","ffff48ffff0fffff","2021/11/26"*e195b254\r\n' - BADMESSAGE = b'#VERSIONA,94,GPS,FINE,2190,"HRPT00-S10C-P","-","ffff48ffff0fffff","2021/11/26"\r\n' - header, payload, crcb = get_parts(MESSAGE) - self.assertEqual(header, ["VERSIONA",'94',"GPS","FINE",'2190','117325000','0','0','18','160']) - self.assertEqual(payload, ['"UM982"','"R4.10Build5251"','"HRPT00-S10C-P"','"-"','"ffff48ffff0fffff"','"2021/11/26"']) - self.assertEqual(crcb, b'\x54\xb2\x95\xe1') - with self.assertRaisesRegex(une.UNIMessageError, 'Badly formed ASCII message #VERSIONA,94,GPS,FINE,2190,"HRPT00-S10C-P","-","ffff48ffff0fffff","2021/11/26"'): - get_parts(BADMESSAGE) - - def teststr2val(self): - self.assertEqual(str2val('23',"U001"), 23) - self.assertEqual(str2val('-128',"S001"), -128) - self.assertEqual(str2val('34.123',"R004"), 34.123) - self.assertEqual(str2val('UM980 ',"C006"), "UM980 ") - self.assertEqual(str2val('34e2b1a6',"X004"), 2796675636) - with self.assertRaisesRegex(une.UNITypeError, "Unknown attribute type Y."): - str2val('34e2b1a6',"Y004") - if __name__ == "__main__": # import sys;sys.argv = ['', 'Test.testName']