Changeset 4455f2b in ammosreader
- Timestamp:
- 06/29/22 15:35:25 (3 years ago)
- Branches:
- AmmosSource, guix
- Children:
- 2a9faed
- Parents:
- 83e6c89
- Location:
- ammosreader
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
ammosreader/AmmosAudioDataHeader.py
r83e6c89 r4455f2b 5 5 6 6 7 class AmmosAudioDataHeader ():7 class AmmosAudioDataHeader: 8 8 """I implement an AMMOS data header for audio data frames.""" 9 9 -
ammosreader/AmmosIFDataBlockHeader.py
r83e6c89 r4455f2b 1 1 import struct 2 2 3 3 4 class AmmosIFDataBlockHeader: -
ammosreader/AmmosIFDataBody.py
r83e6c89 r4455f2b 7 7 class AmmosIFDataBody: 8 8 """I implement an AMMOS data body for IF data.""" 9 def __init__(self, data_blocks): 10 self.__data_blocks = data_blocks 9 def __init__(self, data_blocks=None): 10 if data_blocks is None: 11 self.__data_blocks = [] 12 else: 13 self.__data_blocks = data_blocks 11 14 12 15 @property -
ammosreader/AmmosIFDataHeader.py
r83e6c89 r4455f2b 6 6 7 7 class AmmosIFDataHeader(): 8 """I implement a Ammos data header for IF data frames."""8 """I implement an Ammos data header for IF data frames.""" 9 9 10 10 HEADER_SIZE = 56 -
ammosreader/AmmosIFReader.py
r83e6c89 r4455f2b 1 1 """I provide a specialized Ammos Reader for IF data.""" 2 import logging 2 3 3 4 from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody 4 5 from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader 5 6 from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader 6 from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader 7 from ammosreader.AmmosSingleFrame import AmmosSingleFrame 7 from ammosreader.AmmosIFDataBody import AmmosIFDataBody 8 8 from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock 9 from ammosreader.Ammos Container import AmmosContainer9 from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader 10 10 11 11 12 12 class AmmosIFReader(): 13 14 STANDARD_IF_DATA_HEADER_SIZE = 56 15 EXTENDED_IF_DATA_HEADER_SIZE = 76 13 """I read the IF data embedded in an R&S AMMOS recording.""" 16 14 17 15 def __init__(self, file_name): … … 20 18 def read_next_global_frame_body_data_header(self): 21 19 22 bytes = self.ammos_file.read(AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE)20 header_size = AmmosIFDataHeader.HEADER_SIZE 23 21 24 # print("\nReading global frame body standard data header\n") 25 if ((not bytes) or (len(bytes) < AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE)): 26 # print("Can not read all 56 bytes of global frame body data header") 22 bytes = self.ammos_file.read(header_size) 23 24 logging.info("\nReading global frame body standard data header\n") 25 if ((not bytes) or (len(bytes) < header_size)): 26 logging.debug("Can not read all %s bytes of global frame body data header", header_size) 27 27 return None 28 29 data_header = AmmosIFDataHeader.from_bytes(bytes) 30 # print("Data header", data_header) 31 return data_header 28 return AmmosIFDataHeader.from_bytes(bytes) 32 29 33 30 def read_next_global_frame_body_extended_data_header(self): 31 """ 32 I return the next global frame body extended data header from current position in file. 34 33 35 bytes = self.ammos_file.read(AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE) 36 # print("\nReading global frame body extended data header\n") 34 :return: the next Ammos Extended IF data header or None if incomplete 35 :rtype: AmmosExtendedIFDataHeader 36 """ 37 header_size = AmmosExtendedIFDataHeader.HEADER_SIZE 37 38 38 if ((not bytes) or (len(bytes) < AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE)): 39 # print("Can not read all ", 76, "bytes of global frame extended data header") 39 bytes = self.ammos_file.read(header_size) 40 41 logging.info("\nReading global frame body extended data header\n") 42 if ((not bytes) or (len(bytes) < header_size)): 43 logging.debug("Can not read all %s bytes of global frame extended data header", header_size) 40 44 return None 41 extended_data_header = AmmosExtendedIFDataHeader.from_bytes(bytes) 42 # print("Extended data header", extended_data_header) 43 return extended_data_header 45 return AmmosExtendedIFDataHeader.from_bytes(bytes) 44 46 45 def read_next_if_data_blocks(self, n, length): 47 def read_next_if_data_body(self, number_of_data_blocks, data_length): 48 """ 49 I return the next data body read from current position in file. 46 50 47 # FIXME: Describe the parameters better 51 :param number_of_data_blocks: the number of data blocks inside the body 52 :type number_of_data_blocks: int 48 53 49 data_blocks = [] 54 :param data_length: the length of the raw data inside a single block 55 :type data_length: int 56 """ 57 header_size = AmmosIFDataBlockHeader.HEADER_SIZE 50 58 51 block_length = 4 + length59 data_body = AmmosIFDataBody() 52 60 53 total = n*block_length 61 block_length = header_size + data_length 62 63 total = number_of_data_blocks*block_length 54 64 55 65 byte_string = self.ammos_file.read(block_length) 56 66 57 67 if len(byte_string) != total: 58 # print("Can not read all", total, "bytes of data body")68 logging.debug("Can not read all %s bytes of data body", total) 59 69 return None 60 70 61 for i in range(0, n ):71 for i in range(0, number_of_data_blocks): 62 72 result = byte_string[i*block_length:(i*block_length+block_length)] 63 data_blocks.append(AmmosIFDataBlock(result[0:4], result[4:])) 73 data_body.add_data_block(AmmosIFDataBlock(AmmosIFDataBlockHeader.from_bytes(result[0:header_size]), 74 result[header_size:])) 64 75 65 return data_b locks76 return data_body 66 77 67 78 def read_next_global_frame_body(self, data_header_length): 79 """ 80 I return the next global frame body read from current position in file. 68 81 82 :param data_header_length: the length of the data header 83 :type data_header_length: int 84 """ 69 85 if_data_header = None 70 86 71 if data_header_length == AmmosIF Reader.STANDARD_IF_DATA_HEADER_SIZE:87 if data_header_length == AmmosIFDataHeader.HEADER_SIZE: 72 88 if_data_header = self.read_next_global_frame_body_data_header() 73 if data_header_length == Ammos IFReader.EXTENDED_IF_DATA_HEADER_SIZE:89 if data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE: 74 90 if_data_header = self.read_next_global_frame_body_extended_data_header() 75 91 76 92 if if_data_header is None: 77 # print("Data header missing")93 logging.debug("Data header missing") 78 94 return None 79 95
Note:
See TracChangeset
for help on using the changeset viewer.