Changeset 38d2a8b in ammosreader
- Timestamp:
- 06/30/22 11:35:30 (3 years ago)
- Branches:
- AmmosSource, guix
- Children:
- 6ded698
- Parents:
- ca6263a
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
ammosreader/AbstractAmmosReader.py
rca6263a r38d2a8b 12 12 class AbstractAmmosReader(ABC): 13 13 """I implement a base class for specialized AmmosReaders.""" 14 15 GLOBAL_HEADER_SIZE = 24 # 8 words16 14 17 15 def __init__(self, file_name): … … 81 79 :rtype: AmmosGlobalFrameHeader 82 80 """ 83 bytes = self.ammos_file.read(AbstractAmmosReader.GLOBAL_HEADER_SIZE) 81 82 header_size = AmmosGlobalFrameHeader.HEADER_SIZE 83 84 bytes = self.ammos_file.read(header_size) 84 85 logging.info("Reading next global frame header") 85 if ((not bytes) or (len(bytes) < AbstractAmmosReader.GLOBAL_HEADER_SIZE)):86 logging.debug("Can not read all %s bytes of global frame header", AbstractAmmosReader.GLOBAL_HEADER_SIZE)86 if ((not bytes) or (len(bytes) < header_size)): 87 logging.debug("Can not read all %s bytes of global frame header", header_size) 87 88 return None 88 89 -
ammosreader/AmmosExtendedAudioDataHeader.py
rca6263a r38d2a8b 5 5 6 6 7 class AmmosExtendedAudioDataHeader ():7 class AmmosExtendedAudioDataHeader: 8 8 9 9 HEADER_SIZE = 44 # 11 words … … 14 14 assert len(bytes) == cls.HEADER_SIZE 15 15 standard_header = AmmosAudioDataHeader.from_bytes(bytes[0:AmmosAudioDataHeader.HEADER_SIZE]) 16 extended_header_elements = struct.unpack(' Q', bytes[AmmosAudioDataHeader.HEADER_SIZE:])16 extended_header_elements = struct.unpack('<Q', bytes[AmmosAudioDataHeader.HEADER_SIZE:]) 17 17 timestamp = extended_header_elements[0] 18 18 sample_rate = standard_header.sample_rate -
ammosreader/AmmosExtendedIFDataHeader.py
rca6263a r38d2a8b 13 13 """I return an AMMOS extended data header from given bytes.""" 14 14 standard_header = AmmosIFDataHeader.from_bytes(bytes[0:AmmosIFDataHeader.HEADER_SIZE]) 15 extended_header_elements = struct.unpack(' QQI', bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE])15 extended_header_elements = struct.unpack('<QQI', bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE]) 16 16 block_count = standard_header.block_count 17 17 block_length = standard_header.block_length -
ammosreader/AmmosGlobalFrameHeader.py
rca6263a r38d2a8b 5 5 6 6 MAGIC_WORD = "726574fb" 7 HEADER_SIZE = 24 7 8 8 9 @classmethod 9 10 def from_bytes(cls, bytes): 10 11 11 elements = struct.unpack(' 4s4s4s4s4s4s', bytes)12 elements = struct.unpack('<4s4s4s4s4s4s', bytes) 12 13 13 14 magic_word = elements[0].hex() -
ammosreader/AmmosIFDataBlockHeader.py
rca6263a r38d2a8b 13 13 elements = struct.unpack('<ccH', bytes) 14 14 header = cls() 15 first_entry = int.from_bytes(elements[0], byteorder=' big')15 first_entry = int.from_bytes(elements[0], byteorder='little') 16 16 header.invalidity = ((first_entry & 1) == 1) 17 17 header.blanking = ((first_entry & 2) == 1) 18 18 header.user_data = ((first_entry & 252)) 19 header.reserved = int.from_bytes(elements[1], byteorder=' big')19 header.reserved = int.from_bytes(elements[1], byteorder='little') 20 20 header.reciprocal_gain = elements[2] 21 21 return header -
ammosreader/PDW.py
rca6263a r38d2a8b 26 26 assert(len(byte_string) == 32) 27 27 28 parts = struct.unpack(' Q4s4s4s4s4s4s', byte_string)28 parts = struct.unpack('<Q4s4s4s4s4s4s', byte_string) 29 29 nanoseconds = (parts[0]) 30 30 time_of_arrival = np.datetime64(nanoseconds, 'ns') -
sample_scripts/iqdw_reader.py
rca6263a r38d2a8b 3 3 import sys 4 4 import os 5 sys.path.append('../src/')6 5 7 6 from ammosreader.AmmosIFReader import AmmosIFReader
Note:
See TracChangeset
for help on using the changeset viewer.