Module ammosreader.AmmosIFReader

I provide a specialized Ammos Reader for IF data.

Expand source code
"""I provide a specialized Ammos Reader for IF data."""
import logging

from ammosreader.AbstractAmmosReader import AbstractAmmosReader
from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
from ammosreader.AmmosIFDataBody import AmmosIFDataBody
from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader


class AmmosIFReader(AbstractAmmosReader):
    """I read the IF data embedded in an R&S AMMOS recording."""

    def __init__(self, file_name):
        super().__init__(file_name)

    def read_next_global_frame_body_data_header(self):

        header_size = AmmosIFDataHeader.HEADER_SIZE

        bytes = self.ammos_file.read(header_size)

        logging.info("\nReading global frame body standard data header\n")
        if ((not bytes) or (len(bytes) < header_size)):
            logging.debug("Can not read all %s bytes of global frame body data header", header_size)
            return None
        return AmmosIFDataHeader.from_bytes(bytes)

    def read_next_global_frame_body_extended_data_header(self):
        """
        I return the next global frame body extended data header from current position in file.

        :return: the next Ammos Extended IF data header or None if incomplete
        :rtype: AmmosExtendedIFDataHeader
        """
        header_size = AmmosExtendedIFDataHeader.HEADER_SIZE

        bytes = self.ammos_file.read(header_size)

        logging.info("\nReading global frame body extended data header\n")
        if ((not bytes) or (len(bytes) < header_size)):
            logging.debug("Can not read all %s bytes of global frame extended data header", header_size)
            return None
        return AmmosExtendedIFDataHeader.from_bytes(bytes)

    def read_next_if_data_body(self, number_of_data_blocks, data_length):
        """
        I return the next data body read from current position in file.

        :param number_of_data_blocks: the number of data blocks inside the body
        :type number_of_data_blocks: int

        :param data_length: the length of the raw data inside a single block
        :type data_length: int
        """
        header_size = AmmosIFDataBlockHeader.HEADER_SIZE

        data_body = AmmosIFDataBody()

        block_length = header_size + data_length

        total = number_of_data_blocks*block_length

        byte_string = self.ammos_file.read(block_length)

        if len(byte_string) != total:
            logging.debug("Can not read all %s bytes of data body", total)
            return None

        for i in range(0, number_of_data_blocks):
            result = byte_string[i*block_length:(i*block_length+block_length)]
            data_body.add_data_block(AmmosIFDataBlock(AmmosIFDataBlockHeader.from_bytes(result[0:header_size]),
                                                      result[header_size:]))

        return data_body

    def read_next_global_frame_body(self, data_header_length):
        """
        I return the next global frame body read from current position in file.

        :param data_header_length: the length of the data header
        :type data_header_length: int
        """
        if_data_header = None

        if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_data_header()
        if data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_extended_data_header()

        if if_data_header is None:
            logging.debug("Data header missing")
            return None

        if_data_body = self.read_next_if_data_body(if_data_header.block_count, if_data_header.block_length)

        if if_data_body is None:
            logging.debug("Data body missing")
            return None

        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def data(self):
        return b"".join([each.global_frame_body.data_body.data for each in self.container.global_frames])

Classes

class AmmosIFReader (file_name)

I read the IF data embedded in an R&S AMMOS recording.

I am the standard constructor for Ammos Readers.

Additional information about the file can be added as key/value pairs in tags

:param file_name: The file to read Ammos data from :type file_name: str

Expand source code
class AmmosIFReader(AbstractAmmosReader):
    """I read the IF data embedded in an R&S AMMOS recording."""

    def __init__(self, file_name):
        super().__init__(file_name)

    def read_next_global_frame_body_data_header(self):

        header_size = AmmosIFDataHeader.HEADER_SIZE

        bytes = self.ammos_file.read(header_size)

        logging.info("\nReading global frame body standard data header\n")
        if ((not bytes) or (len(bytes) < header_size)):
            logging.debug("Can not read all %s bytes of global frame body data header", header_size)
            return None
        return AmmosIFDataHeader.from_bytes(bytes)

    def read_next_global_frame_body_extended_data_header(self):
        """
        I return the next global frame body extended data header from current position in file.

        :return: the next Ammos Extended IF data header or None if incomplete
        :rtype: AmmosExtendedIFDataHeader
        """
        header_size = AmmosExtendedIFDataHeader.HEADER_SIZE

        bytes = self.ammos_file.read(header_size)

        logging.info("\nReading global frame body extended data header\n")
        if ((not bytes) or (len(bytes) < header_size)):
            logging.debug("Can not read all %s bytes of global frame extended data header", header_size)
            return None
        return AmmosExtendedIFDataHeader.from_bytes(bytes)

    def read_next_if_data_body(self, number_of_data_blocks, data_length):
        """
        I return the next data body read from current position in file.

        :param number_of_data_blocks: the number of data blocks inside the body
        :type number_of_data_blocks: int

        :param data_length: the length of the raw data inside a single block
        :type data_length: int
        """
        header_size = AmmosIFDataBlockHeader.HEADER_SIZE

        data_body = AmmosIFDataBody()

        block_length = header_size + data_length

        total = number_of_data_blocks*block_length

        byte_string = self.ammos_file.read(block_length)

        if len(byte_string) != total:
            logging.debug("Can not read all %s bytes of data body", total)
            return None

        for i in range(0, number_of_data_blocks):
            result = byte_string[i*block_length:(i*block_length+block_length)]
            data_body.add_data_block(AmmosIFDataBlock(AmmosIFDataBlockHeader.from_bytes(result[0:header_size]),
                                                      result[header_size:]))

        return data_body

    def read_next_global_frame_body(self, data_header_length):
        """
        I return the next global frame body read from current position in file.

        :param data_header_length: the length of the data header
        :type data_header_length: int
        """
        if_data_header = None

        if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_data_header()
        if data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_extended_data_header()

        if if_data_header is None:
            logging.debug("Data header missing")
            return None

        if_data_body = self.read_next_if_data_body(if_data_header.block_count, if_data_header.block_length)

        if if_data_body is None:
            logging.debug("Data body missing")
            return None

        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def data(self):
        return b"".join([each.global_frame_body.data_body.data for each in self.container.global_frames])

Ancestors

Methods

def data(self)
Expand source code
def data(self):
    return b"".join([each.global_frame_body.data_body.data for each in self.container.global_frames])
def read_next_global_frame_body(self, data_header_length)

I return the next global frame body read from current position in file.

:param data_header_length: the length of the data header :type data_header_length: int

Expand source code
def read_next_global_frame_body(self, data_header_length):
    """
    I return the next global frame body read from current position in file.

    :param data_header_length: the length of the data header
    :type data_header_length: int
    """
    if_data_header = None

    if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
        if_data_header = self.read_next_global_frame_body_data_header()
    if data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
        if_data_header = self.read_next_global_frame_body_extended_data_header()

    if if_data_header is None:
        logging.debug("Data header missing")
        return None

    if_data_body = self.read_next_if_data_body(if_data_header.block_count, if_data_header.block_length)

    if if_data_body is None:
        logging.debug("Data body missing")
        return None

    return AmmosGlobalFrameBody(if_data_header, if_data_body)
def read_next_global_frame_body_data_header(self)
Expand source code
def read_next_global_frame_body_data_header(self):

    header_size = AmmosIFDataHeader.HEADER_SIZE

    bytes = self.ammos_file.read(header_size)

    logging.info("\nReading global frame body standard data header\n")
    if ((not bytes) or (len(bytes) < header_size)):
        logging.debug("Can not read all %s bytes of global frame body data header", header_size)
        return None
    return AmmosIFDataHeader.from_bytes(bytes)
def read_next_global_frame_body_extended_data_header(self)

I return the next global frame body extended data header from current position in file.

:return: the next Ammos Extended IF data header or None if incomplete :rtype: AmmosExtendedIFDataHeader

Expand source code
def read_next_global_frame_body_extended_data_header(self):
    """
    I return the next global frame body extended data header from current position in file.

    :return: the next Ammos Extended IF data header or None if incomplete
    :rtype: AmmosExtendedIFDataHeader
    """
    header_size = AmmosExtendedIFDataHeader.HEADER_SIZE

    bytes = self.ammos_file.read(header_size)

    logging.info("\nReading global frame body extended data header\n")
    if ((not bytes) or (len(bytes) < header_size)):
        logging.debug("Can not read all %s bytes of global frame extended data header", header_size)
        return None
    return AmmosExtendedIFDataHeader.from_bytes(bytes)
def read_next_if_data_body(self, number_of_data_blocks, data_length)

I return the next data body read from current position in file.

:param number_of_data_blocks: the number of data blocks inside the body :type number_of_data_blocks: int

:param data_length: the length of the raw data inside a single block :type data_length: int

Expand source code
def read_next_if_data_body(self, number_of_data_blocks, data_length):
    """
    I return the next data body read from current position in file.

    :param number_of_data_blocks: the number of data blocks inside the body
    :type number_of_data_blocks: int

    :param data_length: the length of the raw data inside a single block
    :type data_length: int
    """
    header_size = AmmosIFDataBlockHeader.HEADER_SIZE

    data_body = AmmosIFDataBody()

    block_length = header_size + data_length

    total = number_of_data_blocks*block_length

    byte_string = self.ammos_file.read(block_length)

    if len(byte_string) != total:
        logging.debug("Can not read all %s bytes of data body", total)
        return None

    for i in range(0, number_of_data_blocks):
        result = byte_string[i*block_length:(i*block_length+block_length)]
        data_body.add_data_block(AmmosIFDataBlock(AmmosIFDataBlockHeader.from_bytes(result[0:header_size]),
                                                  result[header_size:]))

    return data_body

Inherited members