# Source code for gripspy.telemetry.inspection

"""Module for inspecting telemetry files."""
from __future__ import division, absolute_import, print_function

from io import open
from os.path import getsize

import numpy as np

from .generators import packet_generator
from .parsers import parse_packet_header

__all__ = ['inspect']

def inspect(filename):
    """Report top-level information about the contents of one or more telemetry files

    For each file, prints the file size, a per-(system ID, telemetry type)
    breakdown of packet counts and share of the file's bytes, the number of
    packet-counter gaps seen for each pair, and the elapsed gondola time
    between the first and last packet in the file.

    Parameters
    ----------
    filename : str or list of str
        One or more telemetry files.  A list (or tuple) is processed one
        entry at a time, with a blank line printed after each entry.

    Notes
    -----
    The packet generator is created with ``raise_exceptions=True``, so
    whatever it raises while reading the file propagates to the caller.
    """
    # If a list (or tuple) is provided as input, recurse on each entry
    if isinstance(filename, (list, tuple)):
        for entry in filename:
            inspect(entry)
            print()
        return

    # Sentinel stored in `previous_packet_counter` meaning "no packet of this
    # (systemid, tmtype) pair has been seen yet"
    no_previous = 0xFFFFFFFF

    # All per-pair statistics are indexed as [systemid, tmtype]
    packet_count = np.zeros((256, 256), dtype=np.uint64)
    byte_count = np.zeros((256, 256), dtype=np.uint64)
    previous_packet_counter = np.full((256, 256), no_previous, dtype=np.uint64)
    gap_count = np.zeros((256, 256), dtype=np.uint64)
    # Indexed by the low nibble of the system ID for 0x8X systems
    not_coincident = np.zeros(16, dtype=bool)
    systime_start = None
    systime_end = None

    print("Inspecting {0}".format(filename))
    total_bytes = getsize(filename)
    print("{0} bytes".format(total_bytes))

    with open(filename, 'rb') as f:
        pg = packet_generator(f, raise_exceptions=True)
        for p in pg:
            header = parse_packet_header(p)
            systemid = header['systemid']
            tmtype = header['tmtype']

            packet_count[systemid, tmtype] += 1
            # 16 is added on top of the header's length field -- presumably
            # the size of the packet header itself (TODO confirm)
            byte_count[systemid, tmtype] += 16 + header['length']

            if systime_start is None:
                systime_start = header['systime']
            systime_end = header['systime']

            # Subtract in plain Python ints: mixing a Python int with a
            # np.uint64 scalar gives float64 or a wrapping uint64 depending on
            # the NumPy version, which would turn a backwards-stepping counter
            # into a spurious huge "gap"
            previous = int(previous_packet_counter[systemid, tmtype])
            if previous != no_previous:
                packet_counter_difference = int(header['counter']) - previous
                if packet_counter_difference > 1:
                    # The counter works slightly differently for quicklook-spectrum packets
                    is_quicklook_step = (packet_counter_difference == 6 and
                                         systemid == 0x10 and
                                         (tmtype & 0xF0) == 0x10)
                    if not is_quicklook_step:
                        gap_count[systemid, tmtype] += 1

            # For 0x8X / 0xF3 packets, an all-zero 3-byte field at offset 19
            # or 22 marks the file as containing non-coincident data
            if (systemid & 0xF0) == 0x80 and tmtype == 0xF3:
                buf = bytearray(p)
                if buf[19:22] == b'\0\0\0' or buf[22:25] == b'\0\0\0':
                    not_coincident[systemid & 0x0F] = True

            previous_packet_counter[systemid, tmtype] = header['counter']

    # Without any packets there is nothing to break down and no time span
    if systime_start is None:
        print("No packets found")
        return

    print("Packet breakdown:")
    for systemid in range(256):
        for tmtype in range(256):
            if packet_count[systemid, tmtype] == 0:
                continue

            line = " 0x{0:02x} 0x{1:02x} : [{2:5.1%}] {3}".format(
                systemid, tmtype,
                byte_count[systemid, tmtype] / total_bytes,
                packet_count[systemid, tmtype])
            if ((systemid & 0xF0) == 0x80 and tmtype == 0xF3 and
                    not_coincident[systemid & 0x0F]):
                line += " (includes non-coincident data!)"
            if gap_count[systemid, tmtype] > 0:
                line += ", plus {0} gaps".format(gap_count[systemid, tmtype])
            print(line)

    # The minutes conversion treats systime as ticks of 1e-7 seconds
    elapsed = systime_end - systime_start
    print("Elapsed gondola time: {0} ({1} minutes)".format(elapsed, elapsed * 1e-7 / 60))