Changeset d7ea525 in ammosreader
- Timestamp:
- 06/29/22 13:11:15 (3 years ago)
- Branches:
- AmmosSource, guix
- Children:
- 4f0cd38
- Parents:
- 87b2e39
- Location:
- ammosreader
- Files:
-
- 1 added
- 8 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
ammosreader/AbstractAmmosReader.py
r87b2e39 rd7ea525 7 7 from ammosreader.AmmosSingleFrame import AmmosSingleFrame 8 8 from ammosreader.AmmosContainer import AmmosContainer 9 from ammosreader. CAConstants import FrameType9 from ammosreader.AmmosConstants import FrameType 10 10 11 11 -
ammosreader/AmmosAudioDataBody.py
r87b2e39 rd7ea525 1 class AmmosAudioDataBlock: 1 """I provide an AMMOS data block for audio data frames.""" 2 3 4 class AmmosAudioDataBody: 5 """I implement an AMMOS data block for audio data frames.""" 2 6 3 7 def __init__(self, pcm_data, number_of_channels=1, samples_per_channel=1, sample_size=1): 4 8 """I return a new AMMOS data block for audio data frames.""" 5 9 self.__number_of_channels = number_of_channels 6 10 self.__samples_per_channel = samples_per_channel … … 10 14 @property 11 15 def data(self): 16 """I return the raw pcm data with channels interweaved.""" 12 17 return self.__data 13 18 14 19 def pcm_for_channel(self, channel_number): 20 """I return the raw pcm data for a given channel.""" 15 21 start_offset = channel_number * self.__sample_size 16 22 step = self.__sample_size * self.__number_of_channels … … 22 28 23 29 def __str__(self): 30 """I return the string representation of myself.""" 24 31 return ("Number of channels:" + str(self.__number_of_channels) + 25 32 "\nSamples per channel:" + str(self.__samples_per_channel) + -
ammosreader/AmmosAudioDataHeader.py
r87b2e39 rd7ea525 1 """I provide an AMMOS data header for audio data frames.""" 2 1 3 import struct 2 import numpy 4 from ammosreader.AmmosConstants import AmmosAudioDemodType 3 5 4 class AmmosAudioDemodType():5 6 @classmethod7 @property8 def mapping(cls):9 return {0: 'FM', 1: 'AM', 5: 'ISB', 6: 'CW',10 7: 'USB', 8: 'LSB', 256: 'DIGITAL',11 0xFFFFFFFF: 'UNKNOWN'}12 13 def __init__(self, demod_type):14 self.demod_type = demod_type15 16 def __str__(self):17 return AmmosAudioDemodType.mapping[self.demod_type]18 6 19 7 class AmmosAudioDataHeader(): 8 """I implement an AMMOS data header for audio data frames.""" 9 10 HEADER_SIZE = 36 # 9 words 20 11 21 12 @classmethod 22 13 def from_bytes(cls, bytes): 14 """I return an AMMOS data header from given bytes.""" 15 assert len(bytes) == cls.HEADER_SIZE 23 16 elements = struct.unpack('<IIQIIIII', bytes) 24 17 sample_rate = elements[0] … … 26 19 frequency = elements[2] 27 20 demod_bandwidth = elements[3] 28 demod_type = elements[4]29 sample_count= elements[5]30 channel_count= elements[6]21 demod_type = AmmosAudioDemodType(elements[4]) 22 number_of_samples = elements[5] 23 number_of_channels = elements[6] 31 24 sample_size = elements[7] 32 25 return AmmosAudioDataHeader(sample_rate, status, frequency, demod_bandwidth, demod_type, 33 sample_count, channel_count, sample_size)26 number_of_samples, number_of_channels, sample_size) 34 27 35 def __init__(self, sample_rate, status, frequency, demod_bandwidth, demod_type, sample_count, channel_count, sample_size): 28 def __init__(self, sample_rate, status, frequency, demod_bandwidth, demod_type, number_of_samples, 29 number_of_channels, sample_size): 30 """I create a new instance of myself using the above parameters.""" 36 31 self.sample_rate = sample_rate 37 32 self.status = status … … 39 34 self.demod_bandwidth = demod_bandwidth 40 35 self.demod_type = AmmosAudioDemodType(demod_type) 41 self. sample_count = sample_count42 self. channel_count = channel_count36 self.number_of_samples = number_of_samples 37 self.number_of_channels = number_of_channels 43 38 self.sample_size = sample_size 44 39 45 40 def __str__(self): 41 """I return the string representation of myself.""" 46 42 return ("\nAmmosAudioDataHeader\n" + 47 43 "Sample rate:" + str(self.sample_rate) + "\n" + … … 50 46 "Demodulation bandwidth:" + str(self.demod_bandwidth) + "\n" + 51 47 "Demodulation type:" + str(self.demod_type) + "\n" + 52 "Sample count:" + str(self. sample_count) + "\n" +53 "Channel count:" + str(self. channel_count) + "\n" +48 "Sample count:" + str(self.number_of_samples) + "\n" + 49 "Channel count:" + str(self.number_of_channels) + "\n" + 54 50 "Sample size:" + str(self.sample_size) + "\n") -
ammosreader/AmmosAudioReader.py
r87b2e39 rd7ea525 1 1 """I provide a specialized Ammos Reader for audio data.""" 2 import logging 2 3 3 4 from ammosreader.AbstractAmmosReader import AbstractAmmosReader … … 10 11 class AmmosAudioReader(AbstractAmmosReader): 11 12 """I read the audio data embedded in an R&S AMMOS recording.""" 12 13 STANDARD_AUDIO_DATA_HEADER_SIZE = 36 # 9 words14 EXTENDED_AUDIO_DATA_HEADER_SIZE = 44 # 11 words15 13 16 14 def __init__(self, file_name): … … 32 30 :rtype: AmmosAudioDataHeader 33 31 """ 34 bytes = self.ammos_file.read(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)32 header_size = AmmosAudioDataHeader.HEADER_SIZE 35 33 36 # print("\nReading global frame body standard data header\n") 37 if ((not bytes) or (len(bytes) < AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)): 38 print("Can not read all", AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE, 39 "bytes of global frame body data header") 34 bytes = self.ammos_file.read(header_size) 35 36 logging.info("\nReading global frame body standard data header\n") 37 if ((not bytes) or (len(bytes) < header_size)): 38 logging.debug("Can not read all %s bytes of global frame body data header", header_size) 40 39 return None 41 40 return AmmosAudioDataHeader.from_bytes(bytes) … … 48 47 :rtype: AmmosExtendedAudioDataHeader 49 48 """ 50 bytes = self.ammos_file.read(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)49 header_size = AmmosExtendedAudioDataHeader.HEADER_SIZE 51 50 52 if ((not bytes) or (len(bytes) < AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)): 53 print("Can not read all ", AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE, 54 " bytes of global frame extended data header") 51 bytes = self.ammos_file.read(header_size) 52 53 if ((not bytes) or (len(bytes) < header_size)): 54 logging.debug("Can not read all %s bytes of global frame extended data header", header_size) 55 55 return None 56 56 return AmmosExtendedAudioDataHeader.from_bytes(bytes) 57 57 58 def read_next_audio_data_body(self, sample_count, channel_count, sample_size):58 def read_next_audio_data_body(self, number_of_samples, number_of_channels, sample_size): 59 59 """ 60 60 I return the next audio data read from current position in file. 61 61 62 :param sample_count: the number of samples per channel inside data body63 :type sample_count: int62 :param number_of_samples: the number of samples per channel inside data body 63 :type number_of_samples: int 64 64 65 :param channel_count: number of channels (e.g. mono, stereo or even more)66 :type channel_count: int65 :param number_of_channels: number of channels (e.g. mono, stereo or even more) 66 :type number_of_channels: int 67 67 68 68 :param sample_size: sample size in bytes (1, 2 or 4 bytes) … … 72 72 :rtype: bytes 73 73 """ 74 # FIXME: Describe the parameters better 75 76 total = sample_count*channel_count*sample_size 74 total = number_of_samples*number_of_channels*sample_size 77 75 78 76 byte_string = self.ammos_file.read(total) 79 77 80 78 if len(byte_string) != total: 81 print("Can not read all", total, "bytes of data body")79 logging.debug("Can not read all %s bytes of data body", total) 82 80 return None 83 # print([hex(c) for c in byte_string]) 84 return AmmosAudioDataBlock(byte_string, channel_count, sample_count, sample_size) 81 return AmmosAudioDataBlock(byte_string, number_of_channels, number_of_samples, sample_size) 85 82 86 83 def read_next_global_frame_body(self, data_header_length): … … 92 89 audio_data_header = None 93 90 94 if data_header_length == AmmosAudio Reader.STANDARD_AUDIO_DATA_HEADER_SIZE:95 print("Read standard data header")91 if data_header_length == AmmosAudioDataHeader.HEADER_SIZE: 92 logging.info("Read standard data header") 96 93 audio_data_header = self.read_next_global_frame_body_data_header() 97 94 98 print("Data header length", data_header_length)99 if data_header_length == Ammos AudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE:100 print("Read extended data header")95 logging.info("Data header length %s", data_header_length) 96 if data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE: 97 logging.info("Read extended data header") 101 98 audio_data_header = self.read_next_global_frame_body_extended_data_header() 102 99 103 100 if audio_data_header is None: 104 print("Data header missing or format unknown")101 logging.debug("Data header missing or format unknown") 105 102 return None 106 103 107 audio_data_body = self.read_next_audio_data_body(audio_data_header. sample_count,108 audio_data_header. channel_count,104 audio_data_body = self.read_next_audio_data_body(audio_data_header.number_of_samples, 105 audio_data_header.number_of_channels, 109 106 audio_data_header.sample_size) 110 107 111 108 if audio_data_body is None: 112 print("Data body missing")109 logging.debug("Data body missing") 113 110 return None 114 111 … … 124 121 :rtype: bytes 125 122 """ 126 for each in self.container.global_frames:127 print(each.global_frame_body)128 123 return b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) for each in self.container.global_frames]) -
ammosreader/AmmosConstants.py
r87b2e39 rd7ea525 42 42 DDF_RESERVED_START = 0x5000 43 43 DDF_RESERVED_END = 0x50FF 44 45 46 class AmmosAudioDemodType(int, Enum): 47 """I map numbers to human readable demodulation types.""" 48 49 FM = 0 50 AM = 1 51 ISB = 5 52 CW = 6 53 USB = 7 54 LSB = 8 55 DIGITAL = 256 56 UNKNOWN = 0xFFFFFFFF -
ammosreader/AmmosExtendedAudioDataHeader.py
r87b2e39 rd7ea525 1 """I provide an Ammos extended data header for audio data frames.""" 2 1 3 import struct 2 import numpy as np3 4 from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader 4 5 … … 6 7 class AmmosExtendedAudioDataHeader(): 7 8 9 HEADER_SIZE = 44 # 11 words 10 8 11 @classmethod 9 12 def from_bytes(cls, bytes): 10 standard_header = AmmosAudioDataHeader.from_bytes(bytes[0:36]) 11 extended_header_elements = struct.unpack('Q', bytes[36:]) 13 """I return a new AMMOS extended data header for audio frames built from given bytes.""" 14 assert len(bytes) == cls.HEADER_SIZE 15 standard_header = AmmosAudioDataHeader.from_bytes(bytes[0:AmmosAudioDataHeader.HEADER_SIZE]) 16 extended_header_elements = struct.unpack('Q', bytes[AmmosAudioDataHeader.HEADER_SIZE:]) 12 17 timestamp = extended_header_elements[0] 13 18 sample_rate = standard_header.sample_rate … … 16 21 demod_bandwidth = standard_header.demod_bandwidth 17 22 demod_type = standard_header.demod_type 18 sample_count = standard_header.sample_count19 channel_count = standard_header.channel_count23 number_of_samples = standard_header.number_of_samples 24 number_of_channels = standard_header.number_of_channels 20 25 sample_size = standard_header.sample_size 21 26 return AmmosExtendedAudioDataHeader(sample_rate, status, frequency, demod_bandwidth, demod_type, 22 sample_count, channel_count, sample_size, timestamp)27 number_of_samples, number_of_channels, sample_size, timestamp) 23 28 24 29 def __init__(self, sample_rate, status, frequency, demod_bandwidth, demod_type, 25 sample_count, channel_count, sample_size, timestamp): 30 number_of_samples, number_of_channels, sample_size, timestamp): 31 """I return a new AMMOS extended data header for audio frames built from given parameters.""" 26 32 self.sample_rate = sample_rate 27 33 self.status = status … … 29 35 self.demod_bandwidth = demod_bandwidth 30 36 self.demod_type = demod_type 31 self. sample_count = sample_count32 self. channel_count = channel_count37 self.number_of_samples = number_of_samples 38 self.number_of_channels = number_of_channels 33 39 self.sample_size = sample_size 34 40 self.timestamp = timestamp -
ammosreader/AmmosExtendedIFDataHeader.py
r87b2e39 rd7ea525 5 5 6 6 class AmmosExtendedIFDataHeader(): 7 """I implement an Ammos extended data header for IF data frames.""" 8 9 HEADER_SIZE = 76 7 10 8 11 @classmethod 9 12 def from_bytes(cls, bytes): 10 standard_header = AmmosIFDataHeader.from_bytes(bytes[0:56]) 11 extended_header_elements = struct.unpack('QQI', bytes[56:76]) 13 """I return an AMMOS extended data header from given bytes.""" 14 standard_header = AmmosIFDataHeader.from_bytes(bytes[0:AmmosIFDataHeader.HEADER_SIZE]) 15 extended_header_elements = struct.unpack('QQI', bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE]) 12 16 block_count = standard_header.block_count 13 17 block_length = standard_header.block_length … … 34 38 bandwidth, sample_rate, interpolation, decimation, voltage_ref, stream_start, sample_counter, 35 39 antenna_correction): 36 40 """I create a new instance of myself using the above parameters.""" 37 41 self.size = size 38 42 self.block_count = block_count … … 64 68 "Stream start:" + str(self.stream_start) + "\n" + 65 69 "Sample counter:" + str(self.sample_counter) + "\n" + 66 "Antenna correction:" + str(self.antenna_correction) + "\n" 67 ) 68 70 "Antenna correction:" + str(self.antenna_correction) + "\n") 69 71 return output -
ammosreader/AmmosIFDataBlock.py
r87b2e39 rd7ea525 1 """I provide an AMMOS data block for IF data frames.""" 2 3 1 4 class AmmosIFDataBlock(): 5 """I implement an AMMOS data block for IF data frames.""" 2 6 3 def __init__(self, if_datablock_header, if_data): 4 self.if_datablock_header = if_datablock_header 5 self.if_data = if_data 7 def __init__(self, if_datablock_header, if_data_body): 8 """I return a new AMMOS data block for IF data frames.""" 9 self.__header = if_datablock_header 10 self.__body = if_data_body 11 12 @property 13 def header(self): 14 """I return my data block header.""" 15 return self.__header 16 17 @property 18 def body(self): 19 """I return the raw pcm data with channels interweaved.""" 20 return self.__body -
ammosreader/AmmosIFDataHeader.py
r87b2e39 rd7ea525 1 """I provide a Ammos data header for IF data frames.""" 2 1 3 import struct 2 4 import numpy as np … … 4 6 5 7 class AmmosIFDataHeader(): 8 """I implement a Ammos data header for IF data frames.""" 9 10 HEADER_SIZE = 56 6 11 7 12 @classmethod 8 13 def from_bytes(cls, bytes): 14 """I return an AMMOS data header from given bytes.""" 15 assert len(bytes) == cls.HEADER_SIZE 9 16 elements = struct.unpack('<IIQIIIQIIIIi', bytes) 10 17 block_count = elements[0] … … 27 34 def __init__(self, block_count, block_length, timestamp, status, source_id, source_state, frequency, 28 35 bandwidth, sample_rate, interpolation, decimation, voltage_ref): 36 """I create a new instance of myself using the above parameters.""" 29 37 self.block_count = block_count 30 38 self.block_length = block_length … … 40 48 self.voltage_ref = voltage_ref 41 49 42 def header_size(self):43 return 5644 45 50 def __str_(self): 46 51 output = ("\nGlobal frame body data header\n" +
Note:
See TracChangeset
for help on using the changeset viewer.