Source code for gripspy.telemetry.generators
"""
Module with generators for handling telemetry files
"""
from __future__ import division, absolute_import, print_function
from sys import stdout
from .parsers import parser
from ..util.checksum import crc16
__all__ = ['packet_generator', 'parser_generator']
[docs]def packet_generator(f, verbose=False, raise_exceptions=False):
"""Generator that yields valid packets from a telemetry-file object.
Packets that have invalid checksums are skipped past.
Parameters
----------
f : file object
e.g., an already open file object
verbose : boolean
If True, output the percentage of the file as a progress indicator
raise_exceptions : boolean
If True, raise exceptions (e.g., if an invalid checksum is encountered)
"""
current_location = f.tell()
f.seek(0, 2)
end_of_file = f.tell()
f.seek(current_location)
if verbose:
percent_completed = int(100 * current_location / end_of_file)
stdout.write("{0:3d}%\b\b\b\b".format(percent_completed))
while True:
first_byte = f.read(1)
if not first_byte:
if verbose:
print("No more bytes to read, stopping")
break
if first_byte == b'\x90':
second_byte = f.read(1)
if not second_byte:
if verbose:
print("Not enough bytes remaining for a sync word, stopping")
break
# If there is no sync word, move pointer appropriately for next read
if second_byte != b'\xeb':
if second_byte == b'\x90':
f.seek(-1, 1)
continue
sync_word_position = f.tell() - 2
remaining_bytes = end_of_file - sync_word_position
if remaining_bytes < 16:
if verbose:
print("Not enough bytes remaining for a header, stopping")
break
# Extract the payload length from the header
packet = bytearray(b'\x90\xeb' + f.read(14))
length = packet[6] | packet[7] << 8
if remaining_bytes >= 16 + length:
packet = packet + bytearray(f.read(length))
claimed_checksum = packet[2] | packet[3] << 8
packet[2:4] = [0, 0]
calculated_checksum = crc16(packet)
if claimed_checksum == calculated_checksum:
if verbose:
if (sync_word_position + length) >= ((percent_completed + 1) / 100.) * end_of_file:
percent_completed += 1
stdout.write("{0:3d}%\b\b\b\b".format(percent_completed))
yield packet
else:
f.seek(sync_word_position + 2)
print("Invalid checksum: {0} != {1}".format(claimed_checksum, calculated_checksum))
if raise_exceptions:
raise RuntimeError("Invalid checksum: {0} != {1}".format(claimed_checksum, calculated_checksum))
else:
f.seek(sync_word_position + 2)
if verbose:
print("Apparent payload length exceeds file length, skipping")
if raise_exceptions:
raise RuntimeError("Apparent payload length exceeds file length, skipping")
[docs]def parser_generator(f, filter_systemid=None, filter_tmtype=None, verbose=False):
"""Generator that yields parsed packet contents from a telemetry-file object.
Packets that have invalid checksums are skipped past.
Parameters
----------
f : file object
e.g., an already open file object
filter_systemid : int
If specified, only yield packets that have a matching SystemId
filter_tmtype : int
If specified, only yield packets that have a match TmType
verbose : boolean
If True, output the percentage of the file as a progress indicator
"""
pg = packet_generator(f, verbose=verbose)
for packet in pg:
out = parser(packet, filter_systemid=filter_systemid, filter_tmtype=filter_tmtype)
if out is not None:
yield out