"""I provide a specialized Ammos Reader for IF data."""

from ammosreader.AbstractAmmosReader import AbstractAmmosReader
from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
from ammosreader.AmmosIFDataBody import AmmosIFDataBody
from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader
from ammosreader import logger


class AmmosIFReader(AbstractAmmosReader):
    """I read the IF data embedded in an R&S AMMOS recording."""

    def __init__(self, file_name):
        """
        I initialize the reader by delegating to the abstract reader.

        :param file_name: the name of the file, passed on unchanged to the parent class
        """
        super().__init__(file_name)

    def read_next_global_frame_body_data_header(self):
        """
        I return the next global frame body standard data header from current position in file.

        :return: the next Ammos IF data header or None if incomplete
        :rtype: AmmosIFDataHeader
        """
        header_size = AmmosIFDataHeader.HEADER_SIZE

        # NOTE(review): self.ammos_file is not set in this class; presumably the
        # parent class provides it as a binary file object.
        header_bytes = self.ammos_file.read(header_size)

        logger.info("\nReading global frame body standard data header\n")
        # A short read means the file ended inside the header.
        if (not header_bytes) or (len(header_bytes) < header_size):
            logger.debug("Can not read all %s bytes of global frame body data header", header_size)
            return None
        return AmmosIFDataHeader.from_bytes(header_bytes)

    def read_next_global_frame_body_extended_data_header(self):
        """
        I return the next global frame body extended data header from current position in file.

        :return: the next Ammos Extended IF data header or None if incomplete
        :rtype: AmmosExtendedIFDataHeader
        """
        header_size = AmmosExtendedIFDataHeader.HEADER_SIZE

        header_bytes = self.ammos_file.read(header_size)

        logger.info("\nReading global frame body extended data header\n")
        # A short read means the file ended inside the header.
        if (not header_bytes) or (len(header_bytes) < header_size):
            logger.debug("Can not read all %s bytes of global frame extended data header", header_size)
            return None
        return AmmosExtendedIFDataHeader.from_bytes(header_bytes)

    def read_next_if_data_body(self, number_of_data_blocks, data_length):
        """
        I return the next data body read from current position in file.

        :param number_of_data_blocks: the number of data blocks inside the body
        :type number_of_data_blocks: int
        :param data_length: the length of the raw data inside a single block
        :type data_length: int
        :return: the data body holding all blocks or None if the body is incomplete
        :rtype: AmmosIFDataBody
        """
        header_size = AmmosIFDataBlockHeader.HEADER_SIZE

        data_body = AmmosIFDataBody()

        block_length = header_size + data_length

        total = number_of_data_blocks * block_length

        # Read the complete body (all blocks) at once. Reading only a single
        # block here would make the length check below fail for every body
        # that holds more than one block.
        byte_string = self.ammos_file.read(total)

        if len(byte_string) != total:
            logger.debug("Can not read all %s bytes of data body", total)
            return None

        # Each block is a fixed-size block header followed by its raw data.
        for i in range(number_of_data_blocks):
            start = i * block_length
            block_bytes = byte_string[start:start + block_length]
            block_header = AmmosIFDataBlockHeader.from_bytes(block_bytes[:header_size])
            data_body.add_data_block(AmmosIFDataBlock(block_header, block_bytes[header_size:]))

        return data_body

    def read_next_global_frame_body(self, data_header_length):
        """
        I return the next global frame body read from current position in file.

        :param data_header_length: the length of the data header
        :type data_header_length: int
        :return: the next global frame body or None if header or body could not be read
        :rtype: AmmosGlobalFrameBody
        """
        if_data_header = None

        # The header length decides which kind of data header follows.
        if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_data_header()
        if data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_extended_data_header()

        # Also reached when data_header_length matches neither known header size.
        if if_data_header is None:
            logger.debug("Data header missing")
            return None

        # NOTE(review): block_length from the header is used as the raw data length
        # of a single block (the block header size is added on top) - confirm.
        if_data_body = self.read_next_if_data_body(if_data_header.block_count, if_data_header.block_length)

        if if_data_body is None:
            logger.debug("Data body missing")
            return None

        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def payload(self):
        """
        I return the payloads of all global frames in the container joined into one byte string.

        :return: the concatenated data body payloads
        :rtype: bytes
        """
        # NOTE(review): self.container is not set in this class; presumably the
        # parent class fills it while reading the file.
        return b"".join(each.global_frame_body.data_body.payload
                        for each in self.container.global_frames)