Changeset ef16c0b in ammosreader
- Timestamp:
- 06/29/22 11:01:47 (3 years ago)
- Branches:
- AmmosSource, guix
- Children:
- 87b2e39
- Parents:
- b41e975
- Location:
- ammosreader
- Files:
-
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
ammosreader/AmmosAudioReader.py
rb41e975 ref16c0b 1 """I p arse an R&S AMMOS recording."""1 """I provide a specialized Ammos Reader for audio data.""" 2 2 3 import os 4 3 from ammosreader.AbstractAmmosReader import AbstractAmmosReader 5 4 from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody 6 5 from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader 7 6 from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader 8 from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader9 from ammosreader.AmmosSingleFrame import AmmosSingleFrame10 from ammosreader.AmmosContainer import AmmosContainer11 7 from ammosreader.AmmosAudioDataBlock import AmmosAudioDataBlock 12 8 13 9 14 class AmmosAudioReader( ):10 class AmmosAudioReader(AbstractAmmosReader): 15 11 """I read the audio data embedded in an R&S AMMOS recording.""" 16 12 17 GLOBAL_HEADER_SIZE = 24 # 8 words18 13 STANDARD_AUDIO_DATA_HEADER_SIZE = 36 # 9 words 19 14 EXTENDED_AUDIO_DATA_HEADER_SIZE = 44 # 11 words … … 26 21 :type file_name: str 27 22 """ 28 self.file_name = file_name 29 self.file = open(self.file_name, "rb") 30 self.file_size = os.path.getsize(self.file_name) 31 32 self.container = AmmosContainer(self.file_name, []) 33 34 self.tags = {} 35 36 def rewind_to_start(self): 37 """I set the file pointer to the beginning of the file for the next operation.""" 38 self.file.seek(0) 39 40 def add_tag(self, tag): 41 """ 42 I add a tag to my tag list. 43 44 :param tag: The tag to add to my tag list 45 :type tag: dict 46 """ 47 self.tags[tag.key] = tag.value 48 49 def read_all_frames_left(self): 50 """ 51 I read all remaining frames into my container until end of file is reached. 52 53 :return: a container containing all frames read 54 :rtype: AmmosContainer 55 """ 56 frames_read = 0 57 while True: 58 print("Reading single frame", frames_read, '...') 59 current_frame = self.read_next_single_frame() 60 if current_frame is not None: 61 frames_read += 1 62 self.container.add_frame(current_frame) 63 if frames_read % 10000 == 0: 64 print("#", end="") 65 else: 66 print("Frame:", frames_read+1, " incomplete") 67 break 68 69 print(len(self.container.global_frames), "frames read") 70 return self.container 71 72 def read_next_global_frame_header(self): 73 """ 74 I return the next global frame header read from current position in file. 75 76 :return: the next global frame header or None if incomplete 77 :rtype: AmmosGlobalFrameHeader 78 """ 79 bytes = self.file.read(AmmosAudioReader.GLOBAL_HEADER_SIZE) 80 print("Reading next global frame header") 81 if ((not bytes) or (len(bytes) < AmmosAudioReader.GLOBAL_HEADER_SIZE)): 82 print("Can not read all", AmmosAudioReader.GLOBAL_HEADER_SIZE, "bytes of global frame header") 83 return None 84 85 # FIXME: Catch exceptions and add some asserts 86 current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(bytes) 87 # print("Current global frame header", current_global_frame_header) 88 return current_global_frame_header 23 super().__init__(file_name) 89 24 90 25 def read_next_global_frame_body_data_header(self): … … 97 32 :rtype: AmmosAudioDataHeader 98 33 """ 99 bytes = self. file.read(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)34 bytes = self.ammos_file.read(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE) 100 35 101 36 # print("\nReading global frame body standard data header\n") … … 113 48 :rtype: AmmosExtendedAudioDataHeader 114 49 """ 115 bytes = self. file.read(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)50 bytes = self.ammos_file.read(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE) 116 51 117 52 if ((not bytes) or (len(bytes) < AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)): … … 141 76 total = sample_count*channel_count*sample_size 142 77 143 byte_string = self. file.read(total)78 byte_string = self.ammos_file.read(total) 144 79 145 80 if len(byte_string) != total: … … 149 84 return AmmosAudioDataBlock(byte_string, channel_count, sample_count, sample_size) 150 85 151 def read_next_global_frame_body(self, global_frame_header): 86 def read_next_global_frame_body(self, data_header_length): 87 """ 88 I return the next global frame body read from current position in file. 152 89 90 :param global_frame_header: 91 """ 153 92 audio_data_header = None 154 93 155 if global_frame_header.data_header_length == AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE:94 if data_header_length == AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE: 156 95 print("Read standard data header") 157 96 audio_data_header = self.read_next_global_frame_body_data_header() 158 97 159 if global_frame_header.data_header_length == AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE: 98 print("Data header length", data_header_length) 99 if data_header_length == AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE: 160 100 print("Read extended data header") 161 101 audio_data_header = self.read_next_global_frame_body_extended_data_header() … … 175 115 return AmmosGlobalFrameBody(audio_data_header, audio_data_body) 176 116 177 def read_next_single_frame(self): 117 def pcm_for_channel(self, a_channel): 118 """ 119 I return the raw pcm audio data for a given channel. 178 120 179 global_frame_header = self.read_next_global_frame_header() 121 :param a_channel: the channel I have to extract 122 :type a_channel: int 180 123 181 print(global_frame_header) 182 183 if global_frame_header is None: 184 print("Global frame header missing") 185 return None 186 187 if global_frame_header.data_header_length is None: 188 print("Data header length empty") 189 return None 190 191 if global_frame_header.frame_type == 256: 192 print("Audio Datastream found") 193 global_frame_body = self.read_next_global_frame_body(global_frame_header) 194 if global_frame_body is None: 195 return None 196 else: 197 print("Unsupported frame type", global_frame_header.frame_type, "found") 198 return None 199 200 ammos_single_frame = AmmosSingleFrame(global_frame_header, global_frame_body) 201 return ammos_single_frame 202 203 def pcm_for_channel(self, a_channel): 124 :rtype: bytes 125 """ 126 for each in self.container.global_frames: 127 print(each.global_frame_body) 204 128 return b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) for each in self.container.global_frames]) -
ammosreader/AmmosIFReader.py
rb41e975 ref16c0b 1 import math 2 import os 1 """I provide a specialized Ammos Reader for IF data.""" 3 2 4 3 from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody … … 13 12 class AmmosIFReader(): 14 13 14 STANDARD_IF_DATA_HEADER_SIZE = 56 15 EXTENDED_IF_DATA_HEADER_SIZE = 76 16 15 17 def __init__(self, file_name): 16 17 self.file_name = file_name 18 self.file = open(self.file_name, "rb") 19 20 self.container = AmmosContainer(self.file_name, []) 21 22 self.tags = [] 23 24 def add_tag(self, tag): 25 self.tags.append(tag) 26 27 def read_all_frames_left(self): 28 29 frames_read = 0 30 31 while True: 32 # print("Reading single frame", frames_read, '...') 33 current_frame = self.read_next_single_frame() 34 if current_frame is not None: 35 frames_read += 1 36 self.container.add_frame(current_frame) 37 # if frames_read % 10000 == 0: 38 # print("#", end="") 39 else: 40 # print("Frame:", frames_read+1, " incomplete") 41 break 42 43 # print(len(self.container.global_frames), "frames read") 44 45 def read_next_global_frame_header(self): 46 bytes = self.file.read(24) 47 # print("Reading next global frame header") 48 if ((not bytes) or (len(bytes) < 24)): 49 # print("Can not read all 24 bytes of global frame header") 50 return None 51 52 return AmmosGlobalFrameHeader.from_bytes(bytes) 18 super.__init__(file_name) 53 19 54 20 def read_next_global_frame_body_data_header(self): 55 21 56 bytes = self. file.read(56)22 bytes = self.ammos_file.read(AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE) 57 23 58 24 # print("\nReading global frame body standard data header\n") 59 if ((not bytes) or (len(bytes) < 56)):25 if ((not bytes) or (len(bytes) < AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE)): 60 26 # print("Can not read all 56 bytes of global frame body data header") 61 27 return None … … 67 33 def read_next_global_frame_body_extended_data_header(self): 68 34 69 bytes = self. file.read(76)35 bytes = self.ammos_file.read(AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE) 70 36 # print("\nReading global frame body extended data header\n") 71 37 72 if ((not bytes) or (len(bytes) < 76)):38 if ((not bytes) or (len(bytes) < AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE)): 73 39 # print("Can not read all ", 76, "bytes of global frame extended data header") 74 40 return None … … 87 53 total = n*block_length 88 54 89 byte_string = self. file.read(block_length)55 byte_string = self.ammos_file.read(block_length) 90 56 91 57 if len(byte_string) != total: … … 99 65 return data_blocks 100 66 101 def read_next_global_frame_body(self, global_frame_header):67 def read_next_global_frame_body(self, data_header_length): 102 68 103 69 if_data_header = None 104 70 105 if global_frame_header.data_header_length == 56:71 if data_header_length == AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE: 106 72 if_data_header = self.read_next_global_frame_body_data_header() 107 else:73 if data_header_length == AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE: 108 74 if_data_header = self.read_next_global_frame_body_extended_data_header() 109 75 … … 119 85 120 86 return AmmosGlobalFrameBody(if_data_header, if_data_body) 121 122 def read_next_single_frame(self):123 124 global_frame_header = self.read_next_global_frame_header()125 126 # print("\nReading next global frame header\n", global_frame_header)127 # print("File pointer", self.file.tell())128 129 if global_frame_header is None:130 # print("Global frame header missing")131 return None132 133 if global_frame_header.data_header_length is None:134 # print("Data header length empty")135 return None136 137 if global_frame_header.frame_type == 2:138 139 global_frame_body = self.read_next_global_frame_body(global_frame_header)140 if global_frame_body is None:141 return None142 143 else:144 # print("Unsupported frame type", global_frame_header.frame_type, "found")145 return None146 147 return AmmosSingleFrame(global_frame_header, global_frame_body) -
ammosreader/CAConstants.py
rb41e975 ref16c0b 1 1 """I provide several constants used in R&S software.""" 2 2 3 import Enum3 from enum import Enum 4 4 5 5 … … 40 40 SCAN_TUNING = 0x4002 41 41 SCAN_LEVEL_TUNING = 0x4003 42 DDF_RESERVED_RANGE = range(0x5000, 0x5100) 42 DDF_RESERVED_START = 0x5000 43 DDF_RESERVED_END = 0x50FF
Note:
See TracChangeset
for help on using the changeset viewer.