Changeset 4455f2b in ammosreader


Ignore:
Timestamp:
06/29/22 15:35:25 (3 years ago)
Author:
Enrico Schwass <ennoausberlin@…>
Branches:
AmmosSource, guix
Children:
2a9faed
Parents:
83e6c89
Message:

another massive refactoring for IF frames

Location:
ammosreader
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • ammosreader/AmmosAudioDataHeader.py

    r83e6c89 r4455f2b  
    55
    66
    7 class AmmosAudioDataHeader():
     7class AmmosAudioDataHeader:
    88    """I implement an AMMOS data header for audio data frames."""
    99
  • ammosreader/AmmosIFDataBlockHeader.py

    r83e6c89 r4455f2b  
    11import struct
     2
    23
    34class AmmosIFDataBlockHeader:
  • ammosreader/AmmosIFDataBody.py

    r83e6c89 r4455f2b  
    77class AmmosIFDataBody:
    88    """I implement an AMMOS data body for IF data."""
    9     def __init__(self, data_blocks):
    10         self.__data_blocks = data_blocks
     9    def __init__(self, data_blocks=None):
     10        if data_blocks is None:
     11            self.__data_blocks = []
     12        else:
     13            self.__data_blocks = data_blocks
    1114
    1215    @property
  • ammosreader/AmmosIFDataHeader.py

    r83e6c89 r4455f2b  
    66
    77class AmmosIFDataHeader():
    8     """I implement a Ammos data header for IF data frames."""
     8    """I implement an Ammos data header for IF data frames."""
    99
    1010    HEADER_SIZE = 56
  • ammosreader/AmmosIFReader.py

    r83e6c89 r4455f2b  
    11"""I provide a specialized Ammos Reader for IF data."""
     2import logging
    23
    34from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
    45from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
    56from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
    6 from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
    7 from ammosreader.AmmosSingleFrame import AmmosSingleFrame
     7from ammosreader.AmmosIFDataBody import AmmosIFDataBody
    88from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
    9 from ammosreader.AmmosContainer import AmmosContainer
     9from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader
    1010
    1111
    1212class AmmosIFReader():
    13 
    14     STANDARD_IF_DATA_HEADER_SIZE = 56
    15     EXTENDED_IF_DATA_HEADER_SIZE = 76
     13    """I read the IF data embedded in an R&S AMMOS recording."""
    1614
    1715    def __init__(self, file_name):
     
    2018    def read_next_global_frame_body_data_header(self):
    2119
    22         bytes = self.ammos_file.read(AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE)
     20        header_size = AmmosIFDataHeader.HEADER_SIZE
    2321
    24         # print("\nReading global frame body standard data header\n")
    25         if ((not bytes) or (len(bytes) < AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE)):
    26             # print("Can not read all 56 bytes of global frame body data header")
     22        bytes = self.ammos_file.read(header_size)
     23
     24        logging.info("\nReading global frame body standard data header\n")
     25        if ((not bytes) or (len(bytes) < header_size)):
     26            logging.debug("Can not read all %s bytes of global frame body data header", header_size)
    2727            return None
    28 
    29         data_header = AmmosIFDataHeader.from_bytes(bytes)
    30         # print("Data header", data_header)
    31         return data_header
     28        return AmmosIFDataHeader.from_bytes(bytes)
    3229
    3330    def read_next_global_frame_body_extended_data_header(self):
     31        """
     32        I return the next global frame body extended data header from current position in file.
    3433
    35         bytes = self.ammos_file.read(AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE)
    36         # print("\nReading global frame body extended data header\n")
     34        :return: the next Ammos Extended IF data header or None if incomplete
     35        :rtype: AmmosExtendedIFDataHeader
     36        """
     37        header_size = AmmosExtendedIFDataHeader.HEADER_SIZE
    3738
    38         if ((not bytes) or (len(bytes) < AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE)):
    39             # print("Can not read all ", 76, "bytes of global frame extended data header")
     39        bytes = self.ammos_file.read(header_size)
     40
     41        logging.info("\nReading global frame body extended data header\n")
     42        if ((not bytes) or (len(bytes) < header_size)):
     43            logging.debug("Can not read all %s bytes of global frame extended data header", header_size)
    4044            return None
    41         extended_data_header = AmmosExtendedIFDataHeader.from_bytes(bytes)
    42         # print("Extended data header", extended_data_header)
    43         return extended_data_header
     45        return AmmosExtendedIFDataHeader.from_bytes(bytes)
    4446
    45     def read_next_if_data_blocks(self, n, length):
     47    def read_next_if_data_body(self, number_of_data_blocks, data_length):
     48        """
     49        I return the next data body read from current position in file.
    4650
    47         # FIXME: Describe the parameters better
     51        :param number_of_data_blocks: the number of data blocks inside the body
     52        :type number_of_data_blocks: int
    4853
    49         data_blocks = []
     54        :param data_length: the length of the raw data inside a single block
     55        :type data_length: int
     56        """
     57        header_size = AmmosIFDataBlockHeader.HEADER_SIZE
    5058
    51         block_length = 4 + length
     59        data_body = AmmosIFDataBody()
    5260
    53         total = n*block_length
     61        block_length = header_size + data_length
     62
     63        total = number_of_data_blocks*block_length
    5464
    5565        byte_string = self.ammos_file.read(block_length)
    5666
    5767        if len(byte_string) != total:
    58             # print("Can not read all", total, "bytes of data body")
     68            logging.debug("Can not read all %s bytes of data body", total)
    5969            return None
    6070
    61         for i in range(0, n):
     71        for i in range(0, number_of_data_blocks):
    6272            result = byte_string[i*block_length:(i*block_length+block_length)]
    63             data_blocks.append(AmmosIFDataBlock(result[0:4], result[4:]))
     73            data_body.add_data_block(AmmosIFDataBlock(AmmosIFDataBlockHeader.from_bytes(result[0:header_size]),
     74                                                      result[header_size:]))
    6475
    65         return data_blocks
     76        return data_body
    6677
    6778    def read_next_global_frame_body(self, data_header_length):
     79        """
     80        I return the next global frame body read from current position in file.
    6881
     82        :param data_header_length: the length of the data header
     83        :type data_header_length: int
     84        """
    6985        if_data_header = None
    7086
    71         if data_header_length == AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE:
     87        if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
    7288            if_data_header = self.read_next_global_frame_body_data_header()
    73         if data_header_length == AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE:
     89        if data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
    7490            if_data_header = self.read_next_global_frame_body_extended_data_header()
    7591
    7692        if if_data_header is None:
    77             # print("Data header missing")
     93            logging.debug("Data header missing")
    7894            return None
    7995
Note: See TracChangeset for help on using the changeset viewer.