- Timestamp:
- 05/20/23 20:56:15 (2 years ago)
- Branches:
- AmmosSource, guix
- Children:
- f3b78e8
- Parents:
- 4eb8a2f
- Location:
- doc/ammosreader
- Files:
-
- 16 edited
Legend:
- Unmodified
- Added
- Removed
-
doc/ammosreader/AbstractAmmosReader.html
r4eb8a2f r7752d1f 37 37 from ammosreader.AmmosConstants import FrameType 38 38 from ammosreader import logger 39 from ammosreader import logger 39 40 40 41 41 class AbstractAmmosReader(ABC): … … 114 114 header_size = AmmosGlobalFrameHeader.HEADER_SIZE 115 115 116 bytes = self.ammos_file.read(header_size)116 in_bytes = self.ammos_file.read(header_size) 117 117 logger.info("Reading next global frame header") 118 if ((not bytes) or (len(bytes) < header_size)):118 if ((not in_bytes) or (len(in_bytes) < header_size)): 119 119 logger.debug("Can not read all %s bytes of global frame header", header_size) 120 120 return None 121 121 122 current_global_frame_header = AmmosGlobalFrameHeader.from_bytes( bytes)122 current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(in_bytes) 123 123 124 124 if current_global_frame_header is None: … … 286 286 header_size = AmmosGlobalFrameHeader.HEADER_SIZE 287 287 288 bytes = self.ammos_file.read(header_size)288 in_bytes = self.ammos_file.read(header_size) 289 289 logger.info("Reading next global frame header") 290 if ((not bytes) or (len(bytes) < header_size)):290 if ((not in_bytes) or (len(in_bytes) < header_size)): 291 291 logger.debug("Can not read all %s bytes of global frame header", header_size) 292 292 return None 293 293 294 current_global_frame_header = AmmosGlobalFrameHeader.from_bytes( bytes)294 current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(in_bytes) 295 295 296 296 if current_global_frame_header is None: … … 507 507 header_size = AmmosGlobalFrameHeader.HEADER_SIZE 508 508 509 bytes = self.ammos_file.read(header_size)509 in_bytes = self.ammos_file.read(header_size) 510 510 logger.info("Reading next global frame header") 511 if ((not bytes) or (len(bytes) < header_size)):511 if ((not in_bytes) or (len(in_bytes) < header_size)): 512 512 logger.debug("Can not read all %s bytes of global frame header", header_size) 513 513 return None 514 514 515 current_global_frame_header = AmmosGlobalFrameHeader.from_bytes( bytes)515 current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(in_bytes) 516 516 517 517 if current_global_frame_header is None: -
doc/ammosreader/AmmosAudioDataHeader.html
r4eb8a2f r7752d1f 40 40 41 41 @classmethod 42 def from_bytes(cls, bytes):42 def from_bytes(cls, in_bytes): 43 43 """I return an AMMOS data header from given bytes.""" 44 assert len( bytes) == cls.HEADER_SIZE45 elements = struct.unpack('<IIQIIIII', bytes)44 assert len(in_bytes) == cls.HEADER_SIZE 45 elements = struct.unpack('<IIQIIIII', in_bytes) 46 46 sample_rate = elements[0] 47 47 status = elements[1] … … 106 106 107 107 @classmethod 108 def from_bytes(cls, bytes):108 def from_bytes(cls, in_bytes): 109 109 """I return an AMMOS data header from given bytes.""" 110 assert len( bytes) == cls.HEADER_SIZE111 elements = struct.unpack('<IIQIIIII', bytes)110 assert len(in_bytes) == cls.HEADER_SIZE 111 elements = struct.unpack('<IIQIIIII', in_bytes) 112 112 sample_rate = elements[0] 113 113 status = elements[1] … … 155 155 <dl> 156 156 <dt id="ammosreader.AmmosAudioDataHeader.AmmosAudioDataHeader.from_bytes"><code class="name flex"> 157 <span>def <span class="ident">from_bytes</span></span>(<span> bytes)</span>157 <span>def <span class="ident">from_bytes</span></span>(<span>in_bytes)</span> 158 158 </code></dt> 159 159 <dd> … … 164 164 </summary> 165 165 <pre><code class="python">@classmethod 166 def from_bytes(cls, bytes):166 def from_bytes(cls, in_bytes): 167 167 """I return an AMMOS data header from given bytes.""" 168 assert len( bytes) == cls.HEADER_SIZE169 elements = struct.unpack('<IIQIIIII', bytes)168 assert len(in_bytes) == cls.HEADER_SIZE 169 elements = struct.unpack('<IIQIIIII', in_bytes) 170 170 sample_rate = elements[0] 171 171 status = elements[1] -
doc/ammosreader/AmmosAudioReader.html
r4eb8a2f r7752d1f 37 37 from ammosreader import logger 38 38 39 39 40 class AmmosAudioReader(AbstractAmmosReader): 40 41 """I read the audio data embedded in an R&S AMMOS recording.""" 41 42 def __init__(self, file_name):43 """44 I return an instance of AmmosAudioReader initialized with a given file name.45 46 :param file_name: the file to read from47 :type file_name: str48 """49 super().__init__(file_name)50 42 51 43 def read_next_global_frame_body_data_header(self): … … 60 52 header_size = AmmosAudioDataHeader.HEADER_SIZE 61 53 62 bytes = self.ammos_file.read(header_size)54 in_bytes = self.ammos_file.read(header_size) 63 55 64 56 logger.info("\nReading global frame body standard data header\n") 65 if ((not bytes) or (len(bytes) < header_size)):57 if ((not in_bytes) or (len(in_bytes) < header_size)): 66 58 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 67 59 return None 68 return AmmosAudioDataHeader.from_bytes( bytes)60 return AmmosAudioDataHeader.from_bytes(in_bytes) 69 61 70 62 def read_next_global_frame_body_extended_data_header(self): … … 77 69 header_size = AmmosExtendedAudioDataHeader.HEADER_SIZE 78 70 79 bytes = self.ammos_file.read(header_size)80 81 if ((not bytes) or (len(bytes) < header_size)):71 in_bytes = self.ammos_file.read(header_size) 72 73 if ((not in_bytes) or (len(in_bytes) < header_size)): 82 74 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 83 75 return None 84 return AmmosExtendedAudioDataHeader.from_bytes( bytes)76 return AmmosExtendedAudioDataHeader.from_bytes(in_bytes) 85 77 86 78 def read_next_audio_data_body(self, number_of_samples, number_of_channels, sample_size): … … 150 142 :rtype: bytes 151 143 """ 152 return b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) for each in self.container.global_frames])</code></pre> 144 return (b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) 145 for each in self.container.global_frames]))</code></pre> 153 146 </details> 154 147 </section> … … 168 161 <dd> 169 162 <div class="desc"><p>I read the audio data embedded in an R&S AMMOS recording.</p> 170 <p>I return an instance of AmmosAudioReader initialized with a given file name.</p> 171 <p>:param file_name: the file to read from 163 <p>I am the standard constructor for Ammos Readers.</p> 164 <p>Additional information about the file can be added as key/value pairs in tags</p> 165 <p>:param file_name: The file to read Ammos data from 172 166 :type file_name: str</p></div> 173 167 <details class="source"> … … 177 171 <pre><code class="python">class AmmosAudioReader(AbstractAmmosReader): 178 172 """I read the audio data embedded in an R&S AMMOS recording.""" 179 180 def __init__(self, file_name):181 """182 I return an instance of AmmosAudioReader initialized with a given file name.183 184 :param file_name: the file to read from185 :type file_name: str186 """187 super().__init__(file_name)188 173 189 174 def read_next_global_frame_body_data_header(self): … … 198 183 header_size = AmmosAudioDataHeader.HEADER_SIZE 199 184 200 bytes = self.ammos_file.read(header_size)185 in_bytes = self.ammos_file.read(header_size) 201 186 202 187 logger.info("\nReading global frame body standard data header\n") 203 if ((not bytes) or (len(bytes) < header_size)):188 if ((not in_bytes) or (len(in_bytes) < header_size)): 204 189 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 205 190 return None 206 return AmmosAudioDataHeader.from_bytes( bytes)191 return AmmosAudioDataHeader.from_bytes(in_bytes) 207 192 208 193 def read_next_global_frame_body_extended_data_header(self): … … 215 200 header_size = AmmosExtendedAudioDataHeader.HEADER_SIZE 216 201 217 bytes = self.ammos_file.read(header_size)218 219 if ((not bytes) or (len(bytes) < header_size)):202 in_bytes = self.ammos_file.read(header_size) 203 204 if ((not in_bytes) or (len(in_bytes) < header_size)): 220 205 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 221 206 return None 222 return AmmosExtendedAudioDataHeader.from_bytes( bytes)207 return AmmosExtendedAudioDataHeader.from_bytes(in_bytes) 223 208 224 209 def read_next_audio_data_body(self, number_of_samples, number_of_channels, sample_size): … … 288 273 :rtype: bytes 289 274 """ 290 return b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) for each in self.container.global_frames])</code></pre> 275 return (b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) 276 for each in self.container.global_frames]))</code></pre> 291 277 </details> 292 278 <h3>Ancestors</h3> … … 318 304 :rtype: bytes 319 305 """ 320 return b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) for each in self.container.global_frames])</code></pre> 306 return (b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) 307 for each in self.container.global_frames]))</code></pre> 321 308 </details> 322 309 </dd> … … 432 419 header_size = AmmosAudioDataHeader.HEADER_SIZE 433 420 434 bytes = self.ammos_file.read(header_size)421 in_bytes = self.ammos_file.read(header_size) 435 422 436 423 logger.info("\nReading global frame body standard data header\n") 437 if ((not bytes) or (len(bytes) < header_size)):424 if ((not in_bytes) or (len(in_bytes) < header_size)): 438 425 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 439 426 return None 440 return AmmosAudioDataHeader.from_bytes( bytes)</code></pre>427 return AmmosAudioDataHeader.from_bytes(in_bytes)</code></pre> 441 428 </details> 442 429 </dd> … … 461 448 header_size = AmmosExtendedAudioDataHeader.HEADER_SIZE 462 449 463 bytes = self.ammos_file.read(header_size)464 465 if ((not bytes) or (len(bytes) < header_size)):450 in_bytes = self.ammos_file.read(header_size) 451 452 if ((not in_bytes) or (len(in_bytes) < header_size)): 466 453 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 467 454 return None 468 return AmmosExtendedAudioDataHeader.from_bytes( bytes)</code></pre>455 return AmmosExtendedAudioDataHeader.from_bytes(in_bytes)</code></pre> 469 456 </details> 470 457 </dd> -
doc/ammosreader/AmmosAudioSocketReader.html
r4eb8a2f r7752d1f 30 30 <pre><code class="python">"""I read a Ammos datastream from a socket.""" 31 31 32 import select32 # import select 33 33 import socket 34 34 from collections import deque … … 37 37 from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader 38 38 from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader 39 from ammosreader.AmmosConstants import AmmosAudioDemodType 39 40 from ammosreader import logger 40 41 42 41 43 class AmmosAudioSocketReader: 42 def __init__(self, socket:socket.socket): 44 """I read Ammos Audio data from a socket""" 45 46 def __init__(self, in_socket: socket.socket): 43 47 """ 44 48 Initializes the AmmosAudioSocketReader … … 48 52 """ 49 53 50 # buffer for reading socket bytewise und check for the magic word54 # buffer for reading socket bytewise und check for the magic word 51 55 self.__magic_word_buffer = deque(maxlen=4) 52 56 53 # input socket to read from54 self.__socket = socket57 # input socket to read from 58 self.__socket = in_socket 55 59 56 60 def __get_next_data(self, byte_count: int) -> bytearray: … … 69 73 70 74 byte_array = [] 71 75 72 76 while len(b''.join(byte_array)) < byte_count: 73 logger.info(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") 77 logger.info("Remaining Bytes: %s", byte_count - len(b''.join(byte_array))) 78 # logger.info(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") 74 79 self.__socket.settimeout(5) 75 80 new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL) … … 77 82 if not new_bytes: 78 83 raise TimeoutError("Socket timed out while reading data") 79 80 logger.info(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining")84 logger.info("Got %s bytes of %s remaining", len(new_bytes), byte_count - len(b''.join(byte_array))) 85 # logger.info(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} remaining") 81 86 82 87 byte_array.append(new_bytes) … … 84 89 return b''.join(byte_array) 85 90 86 def __audio_data_body_to_numpy(self, audio_data_body: bytearray) -> np.ndarray:91 def __audio_data_body_to_numpy(self, audio_data_body: bytearray) -> np.ndarray: 87 92 """ 88 93 converts the audio data body to a numpy array … … 101 106 102 107 Raises: 103 TimeoutError: Raise es TimeoutError if the socket does not serve data anymore108 TimeoutError: Raises TimeoutError if the socket does not serve data anymore 104 109 105 110 Returns: … … 113 118 # raise Exception if socket does not return anything 114 119 if len(new_byte) < 1: 115 raise TimeoutError 116 117 # read loop120 raise TimeoutError 121 122 # read loop 118 123 while new_byte: 119 124 # … … 121 126 byte_array = b''.join(self.__magic_word_buffer) 122 127 123 if byte_array.hex() == '726574fb':124 # print(byte_array.hex())128 if byte_array.hex() == AmmosGlobalFrameHeader.MAGIC_WORD: 129 # print(byte_array.hex()) 125 130 126 131 ammos_global_header_buffer = list(self.__magic_word_buffer) 127 132 ammos_global_header_buffer.append(self.__get_next_data(20)) 128 # while len(b''.join(ammos_global_header_buffer)) < 24:133 # while len(b''.join(ammos_global_header_buffer)) < 24: 129 134 # ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) 130 135 131 136 ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) 132 137 logger.info(ammos_global_header) 133 138 134 if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256: 135 byte_array_header = self.__get_next_data(44) 136 #while len(b''.join(byte_array_header)) < 44: 139 if (ammos_global_header.data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): 140 141 byte_array_header = self.__get_next_data(AmmosExtendedAudioDataHeader.HEADER_SIZE) 142 # while len(b''.join(byte_array_header)) < 44: 137 143 # byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header)))) 138 144 139 145 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header) 140 logger.info(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size) 141 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 142 ammos_extended_audio_data_header.number_of_channels* 146 logger.debug(str(ammos_extended_audio_data_header.number_of_samples or ''), 147 str(ammos_extended_audio_data_header.number_of_channels or ''), 148 str(ammos_extended_audio_data_header.sample_size or '')) 149 150 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * 151 ammos_extended_audio_data_header.number_of_channels * 143 152 ammos_extended_audio_data_header.sample_size) 144 153 145 154 audio_array = self.__audio_data_body_to_numpy(audio_body) 146 logger.info(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 155 # print("44,256", len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 156 logger.debug("%s, %s", len(audio_array), len(audio_array) / ammos_extended_audio_data_header.sample_rate) 147 157 148 158 return [audio_array, ammos_extended_audio_data_header.sample_rate] 149 159 150 elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:151 byte_array_header = self.__get_next_data( 36)152 # while len(b''.join(byte_array_header)) < 36:160 elif (ammos_global_header.data_header_length == AmmosAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): 161 byte_array_header = self.__get_next_data(AmmosAudioDataHeader.HEADER_SIZE) 162 # while len(b''.join(byte_array_header)) < 36: 153 163 # byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header)))) 154 164 155 165 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header) 156 logger.info(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size) 157 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 158 ammos_extended_audio_data_header.number_of_channels* 166 logger.debug(str(ammos_audio_data_header.number_of_samples or ''), 167 str(ammos_audio_data_header.number_of_channels or ''), 168 str(ammos_audio_data_header.sample_size or '')) 169 170 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * 171 ammos_extended_audio_data_header.number_of_channels * 159 172 ammos_extended_audio_data_header.sample_size) 160 173 161 174 audio_array = self.__audio_data_body_to_numpy(audio_body) 162 logger. info(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)175 logger.debug("%s, %s", len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 163 176 164 177 return [audio_array, ammos_audio_data_header.sample_rate] … … 170 183 # raise Exception if socket does not return anything 171 184 if len(new_byte) < 1: 172 raise TimeoutError 185 raise TimeoutError 173 186 174 187 return None</code></pre> … … 186 199 <dt id="ammosreader.AmmosAudioSocketReader.AmmosAudioSocketReader"><code class="flex name class"> 187 200 <span>class <span class="ident">AmmosAudioSocketReader</span></span> 188 <span>(</span><span> socket: socket.socket)</span>201 <span>(</span><span>in_socket: socket.socket)</span> 189 202 </code></dt> 190 203 <dd> 191 <div class="desc"><p>Initializes the AmmosAudioSocketReader</p> 204 <div class="desc"><p>I read Ammos Audio data from a socket</p> 205 <p>Initializes the AmmosAudioSocketReader</p> 192 206 <h2 id="args">Args</h2> 193 207 <dl> … … 200 214 </summary> 201 215 <pre><code class="python">class AmmosAudioSocketReader: 202 def __init__(self, socket:socket.socket): 216 """I read Ammos Audio data from a socket""" 217 218 def __init__(self, in_socket: socket.socket): 203 219 """ 204 220 Initializes the AmmosAudioSocketReader … … 208 224 """ 209 225 210 # buffer for reading socket bytewise und check for the magic word226 # buffer for reading socket bytewise und check for the magic word 211 227 self.__magic_word_buffer = deque(maxlen=4) 212 228 213 # input socket to read from214 self.__socket = socket229 # input socket to read from 230 self.__socket = in_socket 215 231 216 232 def __get_next_data(self, byte_count: int) -> bytearray: … … 229 245 230 246 byte_array = [] 231 247 232 248 while len(b''.join(byte_array)) < byte_count: 233 logger.info(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") 249 logger.info("Remaining Bytes: %s", byte_count - len(b''.join(byte_array))) 250 # logger.info(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") 234 251 self.__socket.settimeout(5) 235 252 new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL) … … 237 254 if not new_bytes: 238 255 raise TimeoutError("Socket timed out while reading data") 239 240 logger.info(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining")256 logger.info("Got %s bytes of %s remaining", len(new_bytes), byte_count - len(b''.join(byte_array))) 257 # logger.info(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} remaining") 241 258 242 259 byte_array.append(new_bytes) … … 244 261 return b''.join(byte_array) 245 262 246 def __audio_data_body_to_numpy(self, audio_data_body: bytearray) -> np.ndarray:263 def __audio_data_body_to_numpy(self, audio_data_body: bytearray) -> np.ndarray: 247 264 """ 248 265 converts the audio data body to a numpy array … … 261 278 262 279 Raises: 263 TimeoutError: Raise es TimeoutError if the socket does not serve data anymore280 TimeoutError: Raises TimeoutError if the socket does not serve data anymore 264 281 265 282 Returns: … … 273 290 # raise Exception if socket does not return anything 274 291 if len(new_byte) < 1: 275 raise TimeoutError 276 277 # read loop292 raise TimeoutError 293 294 # read loop 278 295 while new_byte: 279 296 # … … 281 298 byte_array = b''.join(self.__magic_word_buffer) 282 299 283 if byte_array.hex() == '726574fb':284 # print(byte_array.hex())300 if byte_array.hex() == AmmosGlobalFrameHeader.MAGIC_WORD: 301 # print(byte_array.hex()) 285 302 286 303 ammos_global_header_buffer = list(self.__magic_word_buffer) 287 304 ammos_global_header_buffer.append(self.__get_next_data(20)) 288 # while len(b''.join(ammos_global_header_buffer)) < 24:305 # while len(b''.join(ammos_global_header_buffer)) < 24: 289 306 # ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) 290 307 291 308 ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) 292 309 logger.info(ammos_global_header) 293 310 294 if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256: 295 byte_array_header = self.__get_next_data(44) 296 #while len(b''.join(byte_array_header)) < 44: 311 if (ammos_global_header.data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): 312 313 byte_array_header = self.__get_next_data(AmmosExtendedAudioDataHeader.HEADER_SIZE) 314 # while len(b''.join(byte_array_header)) < 44: 297 315 # byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header)))) 298 316 299 317 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header) 300 logger.info(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size) 301 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 302 ammos_extended_audio_data_header.number_of_channels* 318 logger.debug(str(ammos_extended_audio_data_header.number_of_samples or ''), 319 str(ammos_extended_audio_data_header.number_of_channels or ''), 320 str(ammos_extended_audio_data_header.sample_size or '')) 321 322 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * 323 ammos_extended_audio_data_header.number_of_channels * 303 324 ammos_extended_audio_data_header.sample_size) 304 325 305 326 audio_array = self.__audio_data_body_to_numpy(audio_body) 306 logger.info(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 327 # print("44,256", len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 328 logger.debug("%s, %s", len(audio_array), len(audio_array) / ammos_extended_audio_data_header.sample_rate) 307 329 308 330 return [audio_array, ammos_extended_audio_data_header.sample_rate] 309 331 310 elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:311 byte_array_header = self.__get_next_data( 36)312 # while len(b''.join(byte_array_header)) < 36:332 elif (ammos_global_header.data_header_length == AmmosAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): 333 byte_array_header = self.__get_next_data(AmmosAudioDataHeader.HEADER_SIZE) 334 # while len(b''.join(byte_array_header)) < 36: 313 335 # byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header)))) 314 336 315 337 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header) 316 logger.info(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size) 317 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 318 ammos_extended_audio_data_header.number_of_channels* 338 logger.debug(str(ammos_audio_data_header.number_of_samples or ''), 339 str(ammos_audio_data_header.number_of_channels or ''), 340 str(ammos_audio_data_header.sample_size or '')) 341 342 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * 343 ammos_extended_audio_data_header.number_of_channels * 319 344 ammos_extended_audio_data_header.sample_size) 320 345 321 346 audio_array = self.__audio_data_body_to_numpy(audio_body) 322 logger. info(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)347 logger.debug("%s, %s", len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 323 348 324 349 return [audio_array, ammos_audio_data_header.sample_rate] … … 330 355 # raise Exception if socket does not return anything 331 356 if len(new_byte) < 1: 332 raise TimeoutError 357 raise TimeoutError 333 358 334 359 return None</code></pre> … … 344 369 <dl> 345 370 <dt><code>TimeoutError</code></dt> 346 <dd>Raise es TimeoutError if the socket does not serve data anymore</dd>371 <dd>Raises TimeoutError if the socket does not serve data anymore</dd> 347 372 </dl> 348 373 <h2 id="returns">Returns</h2> … … 359 384 360 385 Raises: 361 TimeoutError: Raise es TimeoutError if the socket does not serve data anymore386 TimeoutError: Raises TimeoutError if the socket does not serve data anymore 362 387 363 388 Returns: … … 371 396 # raise Exception if socket does not return anything 372 397 if len(new_byte) < 1: 373 raise TimeoutError 374 375 # read loop398 raise TimeoutError 399 400 # read loop 376 401 while new_byte: 377 402 # … … 379 404 byte_array = b''.join(self.__magic_word_buffer) 380 405 381 if byte_array.hex() == '726574fb':382 # print(byte_array.hex())406 if byte_array.hex() == AmmosGlobalFrameHeader.MAGIC_WORD: 407 # print(byte_array.hex()) 383 408 384 409 ammos_global_header_buffer = list(self.__magic_word_buffer) 385 410 ammos_global_header_buffer.append(self.__get_next_data(20)) 386 # while len(b''.join(ammos_global_header_buffer)) < 24:411 # while len(b''.join(ammos_global_header_buffer)) < 24: 387 412 # ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) 388 413 389 414 ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) 390 415 logger.info(ammos_global_header) 391 416 392 if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256: 393 byte_array_header = self.__get_next_data(44) 394 #while len(b''.join(byte_array_header)) < 44: 417 if (ammos_global_header.data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): 418 419 byte_array_header = self.__get_next_data(AmmosExtendedAudioDataHeader.HEADER_SIZE) 420 # while len(b''.join(byte_array_header)) < 44: 395 421 # byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header)))) 396 422 397 423 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header) 398 logger.info(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size) 399 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 400 ammos_extended_audio_data_header.number_of_channels* 424 logger.debug(str(ammos_extended_audio_data_header.number_of_samples or ''), 425 str(ammos_extended_audio_data_header.number_of_channels or ''), 426 str(ammos_extended_audio_data_header.sample_size or '')) 427 428 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * 429 ammos_extended_audio_data_header.number_of_channels * 401 430 ammos_extended_audio_data_header.sample_size) 402 431 403 432 audio_array = self.__audio_data_body_to_numpy(audio_body) 404 logger.info(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 433 # print("44,256", len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 434 logger.debug("%s, %s", len(audio_array), len(audio_array) / ammos_extended_audio_data_header.sample_rate) 405 435 406 436 return [audio_array, ammos_extended_audio_data_header.sample_rate] 407 437 408 elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:409 byte_array_header = self.__get_next_data( 36)410 # while len(b''.join(byte_array_header)) < 36:438 elif (ammos_global_header.data_header_length == AmmosAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): 439 byte_array_header = self.__get_next_data(AmmosAudioDataHeader.HEADER_SIZE) 440 # while len(b''.join(byte_array_header)) < 36: 411 441 # byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header)))) 412 442 413 443 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header) 414 logger.info(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size) 415 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 416 ammos_extended_audio_data_header.number_of_channels* 444 logger.debug(str(ammos_audio_data_header.number_of_samples or ''), 445 str(ammos_audio_data_header.number_of_channels or ''), 446 str(ammos_audio_data_header.sample_size or '')) 447 448 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * 449 ammos_extended_audio_data_header.number_of_channels * 417 450 ammos_extended_audio_data_header.sample_size) 418 451 419 452 audio_array = self.__audio_data_body_to_numpy(audio_body) 420 logger. info(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)453 logger.debug("%s, %s", len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 421 454 422 455 return [audio_array, ammos_audio_data_header.sample_rate] … … 428 461 # raise Exception if socket does not return anything 429 462 if len(new_byte) < 1: 430 raise TimeoutError 463 raise TimeoutError 431 464 432 465 return None</code></pre> -
doc/ammosreader/AmmosContainer.html
r4eb8a2f r7752d1f 6 6 <meta name="generator" content="pdoc 0.10.0" /> 7 7 <title>ammosreader.AmmosContainer API documentation</title> 8 <meta name="description" content="I store the content of an R&S Ammos file in a more accessible way." />8 <meta name="description" content="I provide an Ammos container." /> 9 9 <link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/sanitize.min.css" integrity="sha256-PK9q560IAAa6WVRRh76LtCaI8pjTJ2z11v0miyNNjrs=" crossorigin> 10 10 <link rel="preload stylesheet" as="style" href="https://cdnjs.cloudflare.com/ajax/libs/10up-sanitize.css/11.0.1/typography.min.css" integrity="sha256-7l/o7C8jubJiy74VsKTidCy1yBkRtiUGbVkYBylBqUg=" crossorigin> … … 23 23 </header> 24 24 <section id="section-intro"> 25 <p>I store the content of an R&S Ammos file in a more accessible way.</p> 26 <details class="source"> 27 <summary> 28 <span>Expand source code</span> 29 </summary> 30 <pre><code class="python">"""I store the content of an R&S Ammos file in a more accessible way.""" 25 <p>I provide an Ammos container.</p> 26 <details class="source"> 27 <summary> 28 <span>Expand source code</span> 29 </summary> 30 <pre><code class="python">"""I provide an Ammos container.""" 31 31 32 32 33 class AmmosContainer(): 33 34 """I store the content of an R&S Ammos file in a more accessible way.""" 34 35 def __init__(self, name, frames): 35 36 self.__name = name … … 56 57 57 58 def size(self): 58 return sum([each.global_frame_header.frame_length for each in self.__global_frames]) 59 # return sum([each.global_frame_header.frame_length for each in self.__global_frames]) 60 return sum(each.global_frame_header.frame_length for each in self.__global_frames) 59 61 60 62 def frequencies(self): … … 101 103 </code></dt> 102 104 <dd> 103 <div class="desc">< /div>105 <div class="desc"><p>I store the content of an R&S Ammos file in a more accessible way.</p></div> 104 106 <details class="source"> 105 107 <summary> … … 107 109 </summary> 108 110 <pre><code class="python">class AmmosContainer(): 109 111 """I store the content of an R&S Ammos file in a more accessible way.""" 110 112 def __init__(self, name, frames): 111 113 self.__name = name … … 132 134 133 135 def size(self): 134 return sum([each.global_frame_header.frame_length for each in self.__global_frames]) 136 # return sum([each.global_frame_header.frame_length for each in self.__global_frames]) 137 return sum(each.global_frame_header.frame_length for each in self.__global_frames) 135 138 136 139 def frequencies(self): … … 293 296 </summary> 294 297 <pre><code class="python">def size(self): 295 return sum([each.global_frame_header.frame_length for each in self.__global_frames])</code></pre> 298 # return sum([each.global_frame_header.frame_length for each in self.__global_frames]) 299 return sum(each.global_frame_header.frame_length for each in self.__global_frames)</code></pre> 296 300 </details> 297 301 </dd> -
doc/ammosreader/AmmosExtendedAudioDataHeader.html
r4eb8a2f r7752d1f 39 39 40 40 @classmethod 41 def from_bytes(cls, bytes):41 def from_bytes(cls, in_bytes): 42 42 """I return a new AMMOS extended data header for audio frames built from given bytes.""" 43 assert len( bytes) == cls.HEADER_SIZE44 standard_header = AmmosAudioDataHeader.from_bytes( bytes[0:AmmosAudioDataHeader.HEADER_SIZE])45 extended_header_elements = struct.unpack('<Q', bytes[AmmosAudioDataHeader.HEADER_SIZE:])43 assert len(in_bytes) == cls.HEADER_SIZE 44 standard_header = AmmosAudioDataHeader.from_bytes(in_bytes[0:AmmosAudioDataHeader.HEADER_SIZE]) 45 extended_header_elements = struct.unpack('<Q', in_bytes[AmmosAudioDataHeader.HEADER_SIZE:]) 46 46 timestamp = extended_header_elements[0] 47 47 sample_rate = standard_header.sample_rate … … 94 94 95 95 @classmethod 96 def from_bytes(cls, bytes):96 def from_bytes(cls, in_bytes): 97 97 """I return a new AMMOS extended data header for audio frames built from given bytes.""" 98 assert len( bytes) == cls.HEADER_SIZE99 standard_header = AmmosAudioDataHeader.from_bytes( bytes[0:AmmosAudioDataHeader.HEADER_SIZE])100 extended_header_elements = struct.unpack('<Q', bytes[AmmosAudioDataHeader.HEADER_SIZE:])98 assert len(in_bytes) == cls.HEADER_SIZE 99 standard_header = AmmosAudioDataHeader.from_bytes(in_bytes[0:AmmosAudioDataHeader.HEADER_SIZE]) 100 extended_header_elements = struct.unpack('<Q', in_bytes[AmmosAudioDataHeader.HEADER_SIZE:]) 101 101 timestamp = extended_header_elements[0] 102 102 sample_rate = standard_header.sample_rate … … 134 134 <dl> 135 135 <dt id="ammosreader.AmmosExtendedAudioDataHeader.AmmosExtendedAudioDataHeader.from_bytes"><code class="name flex"> 136 <span>def <span class="ident">from_bytes</span></span>(<span> bytes)</span>136 <span>def <span class="ident">from_bytes</span></span>(<span>in_bytes)</span> 137 137 </code></dt> 138 138 <dd> … … 143 143 </summary> 144 144 <pre><code class="python">@classmethod 145 def from_bytes(cls, bytes):145 def from_bytes(cls, in_bytes): 146 146 """I return a new AMMOS extended data header for audio frames built from given bytes.""" 147 assert len( bytes) == cls.HEADER_SIZE148 standard_header = AmmosAudioDataHeader.from_bytes( bytes[0:AmmosAudioDataHeader.HEADER_SIZE])149 extended_header_elements = struct.unpack('<Q', bytes[AmmosAudioDataHeader.HEADER_SIZE:])147 assert len(in_bytes) == cls.HEADER_SIZE 148 standard_header = AmmosAudioDataHeader.from_bytes(in_bytes[0:AmmosAudioDataHeader.HEADER_SIZE]) 149 extended_header_elements = struct.unpack('<Q', in_bytes[AmmosAudioDataHeader.HEADER_SIZE:]) 150 150 timestamp = extended_header_elements[0] 151 151 sample_rate = standard_header.sample_rate -
doc/ammosreader/AmmosExtendedIFDataHeader.html
r4eb8a2f r7752d1f 38 38 39 39 @classmethod 40 def from_bytes(cls, bytes):40 def from_bytes(cls, in_bytes): 41 41 """I return an AMMOS extended data header from given bytes.""" 42 standard_header = AmmosIFDataHeader.from_bytes( bytes[0:AmmosIFDataHeader.HEADER_SIZE])43 extended_header_elements = struct.unpack('<QQI', bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE])42 standard_header = AmmosIFDataHeader.from_bytes(in_bytes[0:AmmosIFDataHeader.HEADER_SIZE]) 43 extended_header_elements = struct.unpack('<QQI', in_bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE]) 44 44 block_count = standard_header.block_count 45 45 block_length = standard_header.block_length … … 57 57 sample_counter = extended_header_elements[1] 58 58 antenna_correction = extended_header_elements[2] 59 size = len( bytes)59 size = len(in_bytes) 60 60 return AmmosExtendedIFDataHeader(size, block_count, block_length, timestamp, status, source_id, 61 61 source_state, frequency, bandwidth, sample_rate, interpolation, … … 126 126 127 127 @classmethod 128 def from_bytes(cls, bytes):128 def from_bytes(cls, in_bytes): 129 129 """I return an AMMOS extended data header from given bytes.""" 130 standard_header = AmmosIFDataHeader.from_bytes( bytes[0:AmmosIFDataHeader.HEADER_SIZE])131 extended_header_elements = struct.unpack('<QQI', bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE])130 standard_header = AmmosIFDataHeader.from_bytes(in_bytes[0:AmmosIFDataHeader.HEADER_SIZE]) 131 extended_header_elements = struct.unpack('<QQI', in_bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE]) 132 132 block_count = standard_header.block_count 133 133 block_length = standard_header.block_length … … 145 145 sample_counter = extended_header_elements[1] 146 146 antenna_correction = extended_header_elements[2] 147 size = len( bytes)147 size = len(in_bytes) 148 148 return AmmosExtendedIFDataHeader(size, block_count, block_length, timestamp, status, source_id, 149 149 source_state, frequency, bandwidth, sample_rate, interpolation, … … 197 197 <dl> 198 198 <dt id="ammosreader.AmmosExtendedIFDataHeader.AmmosExtendedIFDataHeader.from_bytes"><code class="name flex"> 199 <span>def <span class="ident">from_bytes</span></span>(<span> bytes)</span>199 <span>def <span class="ident">from_bytes</span></span>(<span>in_bytes)</span> 200 200 </code></dt> 201 201 <dd> … … 206 206 </summary> 207 207 <pre><code class="python">@classmethod 208 def from_bytes(cls, bytes):208 def from_bytes(cls, in_bytes): 209 209 """I return an AMMOS extended data header from given bytes.""" 210 standard_header = AmmosIFDataHeader.from_bytes( bytes[0:AmmosIFDataHeader.HEADER_SIZE])211 extended_header_elements = struct.unpack('<QQI', bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE])210 standard_header = AmmosIFDataHeader.from_bytes(in_bytes[0:AmmosIFDataHeader.HEADER_SIZE]) 211 extended_header_elements = struct.unpack('<QQI', in_bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE]) 212 212 block_count = standard_header.block_count 213 213 block_length = standard_header.block_length … … 225 225 sample_counter = extended_header_elements[1] 226 226 antenna_correction = extended_header_elements[2] 227 size = len( bytes)227 size = len(in_bytes) 228 228 return AmmosExtendedIFDataHeader(size, block_count, block_length, timestamp, status, source_id, 229 229 source_state, frequency, bandwidth, sample_rate, interpolation, -
doc/ammosreader/AmmosGlobalFrameHeader.html
r4eb8a2f r7752d1f 39 39 40 40 @classmethod 41 def from_bytes(cls, bytes):41 def from_bytes(cls, in_bytes): 42 42 """I create a new AmmosGlobalFrameHeader from bytes.""" 43 assert len( bytes) == cls.HEADER_SIZE44 45 elements = struct.unpack('<4s4s4s4s4s4s', bytes)43 assert len(in_bytes) == cls.HEADER_SIZE 44 45 elements = struct.unpack('<4s4s4s4s4s4s', in_bytes) 46 46 47 47 magic_word = elements[0].hex() … … 116 116 117 117 @classmethod 118 def from_bytes(cls, bytes):118 def from_bytes(cls, in_bytes): 119 119 """I create a new AmmosGlobalFrameHeader from bytes.""" 120 assert len( bytes) == cls.HEADER_SIZE121 122 elements = struct.unpack('<4s4s4s4s4s4s', bytes)120 assert len(in_bytes) == cls.HEADER_SIZE 121 122 elements = struct.unpack('<4s4s4s4s4s4s', in_bytes) 123 123 124 124 magic_word = elements[0].hex() … … 179 179 <dl> 180 180 <dt id="ammosreader.AmmosGlobalFrameHeader.AmmosGlobalFrameHeader.from_bytes"><code class="name flex"> 181 <span>def <span class="ident">from_bytes</span></span>(<span> bytes)</span>181 <span>def <span class="ident">from_bytes</span></span>(<span>in_bytes)</span> 182 182 </code></dt> 183 183 <dd> … … 188 188 </summary> 189 189 <pre><code class="python">@classmethod 190 def from_bytes(cls, bytes):190 def from_bytes(cls, in_bytes): 191 191 """I create a new AmmosGlobalFrameHeader from bytes.""" 192 assert len( bytes) == cls.HEADER_SIZE193 194 elements = struct.unpack('<4s4s4s4s4s4s', bytes)192 assert len(in_bytes) == cls.HEADER_SIZE 193 194 elements = struct.unpack('<4s4s4s4s4s4s', in_bytes) 195 195 196 196 magic_word = elements[0].hex() -
doc/ammosreader/AmmosIFDataBlockHeader.html
r4eb8a2f r7752d1f 36 36 37 37 @classmethod 38 def from_bytes(cls, bytes):38 def from_bytes(cls, in_bytes): 39 39 """I return an AMMOS IF data block header built from bytes.""" 40 assert len( bytes) == cls.HEADER_SIZE41 elements = struct.unpack('<ccH', bytes)40 assert len(in_bytes) == cls.HEADER_SIZE 41 elements = struct.unpack('<ccH', in_bytes) 42 42 header = cls() 43 43 first_entry = int.from_bytes(elements[0], byteorder='little') … … 125 125 126 126 @classmethod 127 def from_bytes(cls, bytes):127 def from_bytes(cls, in_bytes): 128 128 """I return an AMMOS IF data block header built from bytes.""" 129 assert len( bytes) == cls.HEADER_SIZE130 elements = struct.unpack('<ccH', bytes)129 assert len(in_bytes) == cls.HEADER_SIZE 130 elements = struct.unpack('<ccH', in_bytes) 131 131 header = cls() 132 132 first_entry = int.from_bytes(elements[0], byteorder='little') … … 198 198 <dl> 199 199 <dt id="ammosreader.AmmosIFDataBlockHeader.AmmosIFDataBlockHeader.from_bytes"><code class="name flex"> 200 <span>def <span class="ident">from_bytes</span></span>(<span> bytes)</span>200 <span>def <span class="ident">from_bytes</span></span>(<span>in_bytes)</span> 201 201 </code></dt> 202 202 <dd> … … 207 207 </summary> 208 208 <pre><code class="python">@classmethod 209 def from_bytes(cls, bytes):209 def from_bytes(cls, in_bytes): 210 210 """I return an AMMOS IF data block header built from bytes.""" 211 assert len( bytes) == cls.HEADER_SIZE212 elements = struct.unpack('<ccH', bytes)211 assert len(in_bytes) == cls.HEADER_SIZE 212 elements = struct.unpack('<ccH', in_bytes) 213 213 header = cls() 214 214 first_entry = int.from_bytes(elements[0], byteorder='little') -
doc/ammosreader/AmmosIFDataHeader.html
r4eb8a2f r7752d1f 40 40 41 41 @classmethod 42 def from_bytes(cls, bytes):42 def from_bytes(cls, in_bytes): 43 43 """I return an AMMOS data header from given bytes.""" 44 assert len( bytes) == cls.HEADER_SIZE45 elements = struct.unpack('<IIQIIIQIIIIi', bytes)44 assert len(in_bytes) == cls.HEADER_SIZE 45 elements = struct.unpack('<IIQIIIQIIIIi', in_bytes) 46 46 block_count = elements[0] 47 47 block_length = int(elements[1])*4 … … 77 77 self.voltage_ref = voltage_ref 78 78 79 def __str_ (self):79 def __str__(self): 80 80 output = ("\nGlobal frame body data header\n" + 81 81 "-----------------------------\n" + … … 115 115 116 116 @classmethod 117 def from_bytes(cls, bytes):117 def from_bytes(cls, in_bytes): 118 118 """I return an AMMOS data header from given bytes.""" 119 assert len( bytes) == cls.HEADER_SIZE120 elements = struct.unpack('<IIQIIIQIIIIi', bytes)119 assert len(in_bytes) == cls.HEADER_SIZE 120 elements = struct.unpack('<IIQIIIQIIIIi', in_bytes) 121 121 block_count = elements[0] 122 122 block_length = int(elements[1])*4 … … 152 152 self.voltage_ref = voltage_ref 153 153 154 def __str_ (self):154 def __str__(self): 155 155 output = ("\nGlobal frame body data header\n" + 156 156 "-----------------------------\n" + … … 173 173 <dl> 174 174 <dt id="ammosreader.AmmosIFDataHeader.AmmosIFDataHeader.from_bytes"><code class="name flex"> 175 <span>def <span class="ident">from_bytes</span></span>(<span> bytes)</span>175 <span>def <span class="ident">from_bytes</span></span>(<span>in_bytes)</span> 176 176 </code></dt> 177 177 <dd> … … 182 182 </summary> 183 183 <pre><code class="python">@classmethod 184 def from_bytes(cls, bytes):184 def from_bytes(cls, in_bytes): 185 185 """I return an AMMOS data header from given bytes.""" 186 assert len( bytes) == cls.HEADER_SIZE187 elements = struct.unpack('<IIQIIIQIIIIi', bytes)186 assert len(in_bytes) == cls.HEADER_SIZE 187 elements = struct.unpack('<IIQIIIQIIIIi', in_bytes) 188 188 block_count = elements[0] 189 189 block_length = int(elements[1])*4 -
doc/ammosreader/AmmosIFReader.html
r4eb8a2f r7752d1f 39 39 from ammosreader import logger 40 40 41 41 42 class AmmosIFReader(AbstractAmmosReader): 42 43 """I read the IF data embedded in an R&S AMMOS recording.""" 43 44 44 def __init__(self, file_name):45 super().__init__(file_name)46 47 45 def read_next_global_frame_body_data_header(self): 48 46 """I read the next data header of a global frame body from current position in file.""" 49 47 header_size = AmmosIFDataHeader.HEADER_SIZE 50 48 51 bytes = self.ammos_file.read(header_size)49 in_bytes = self.ammos_file.read(header_size) 52 50 53 51 logger.info("\nReading global frame body standard data header\n") 54 if ((not bytes) or (len(bytes) < header_size)):52 if ((not in_bytes) or (len(in_bytes) < header_size)): 55 53 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 56 54 return None 57 return AmmosIFDataHeader.from_bytes( bytes)55 return AmmosIFDataHeader.from_bytes(in_bytes) 58 56 59 57 def read_next_global_frame_body_extended_data_header(self): … … 66 64 header_size = AmmosExtendedIFDataHeader.HEADER_SIZE 67 65 68 bytes = self.ammos_file.read(header_size)66 in_bytes = self.ammos_file.read(header_size) 69 67 70 68 logger.info("\nReading global frame body extended data header\n") 71 if ((not bytes) or (len(bytes) < header_size)):69 if ((not in_bytes) or (len(in_bytes) < header_size)): 72 70 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 73 71 return None 74 return AmmosExtendedIFDataHeader.from_bytes( bytes)72 return AmmosExtendedIFDataHeader.from_bytes(in_bytes) 75 73 76 74 def read_next_if_data_body(self, number_of_data_blocks, data_length): … … 132 130 133 131 def payload(self): 132 """I return just the pure date (payload) from my container.""" 134 133 return b"".join([each.global_frame_body.data_body.payload for each in self.container.global_frames])</code></pre> 135 134 </details> … … 161 160 """I read the IF data embedded in an R&S AMMOS recording.""" 162 161 163 def __init__(self, file_name):164 super().__init__(file_name)165 166 162 def read_next_global_frame_body_data_header(self): 167 163 """I read the next data header of a global frame body from current position in file.""" 168 164 header_size = AmmosIFDataHeader.HEADER_SIZE 169 165 170 bytes = self.ammos_file.read(header_size)166 in_bytes = self.ammos_file.read(header_size) 171 167 172 168 logger.info("\nReading global frame body standard data header\n") 173 if ((not bytes) or (len(bytes) < header_size)):169 if ((not in_bytes) or (len(in_bytes) < header_size)): 174 170 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 175 171 return None 176 return AmmosIFDataHeader.from_bytes( bytes)172 return AmmosIFDataHeader.from_bytes(in_bytes) 177 173 178 174 def read_next_global_frame_body_extended_data_header(self): … … 185 181 header_size = AmmosExtendedIFDataHeader.HEADER_SIZE 186 182 187 bytes = self.ammos_file.read(header_size)183 in_bytes = self.ammos_file.read(header_size) 188 184 189 185 logger.info("\nReading global frame body extended data header\n") 190 if ((not bytes) or (len(bytes) < header_size)):186 if ((not in_bytes) or (len(in_bytes) < header_size)): 191 187 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 192 188 return None 193 return AmmosExtendedIFDataHeader.from_bytes( bytes)189 return AmmosExtendedIFDataHeader.from_bytes(in_bytes) 194 190 195 191 def read_next_if_data_body(self, number_of_data_blocks, data_length): … … 251 247 252 248 def payload(self): 249 """I return just the pure date (payload) from my container.""" 253 250 return b"".join([each.global_frame_body.data_body.payload for each in self.container.global_frames])</code></pre> 254 251 </details> … … 264 261 </code></dt> 265 262 <dd> 266 <div class="desc">< /div>263 <div class="desc"><p>I return just the pure date (payload) from my container.</p></div> 267 264 <details class="source"> 268 265 <summary> … … 270 267 </summary> 271 268 <pre><code class="python">def payload(self): 269 """I return just the pure date (payload) from my container.""" 272 270 return b"".join([each.global_frame_body.data_body.payload for each in self.container.global_frames])</code></pre> 273 271 </details> … … 315 313 </code></dt> 316 314 <dd> 317 <div class="desc">< /div>315 <div class="desc"><p>I read the next data header of a global frame body from current position in file.</p></div> 318 316 <details class="source"> 319 317 <summary> … … 321 319 </summary> 322 320 <pre><code class="python">def read_next_global_frame_body_data_header(self): 323 321 """I read the next data header of a global frame body from current position in file.""" 324 322 header_size = AmmosIFDataHeader.HEADER_SIZE 325 323 326 bytes = self.ammos_file.read(header_size)324 in_bytes = self.ammos_file.read(header_size) 327 325 328 326 logger.info("\nReading global frame body standard data header\n") 329 if ((not bytes) or (len(bytes) < header_size)):327 if ((not in_bytes) or (len(in_bytes) < header_size)): 330 328 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 331 329 return None 332 return AmmosIFDataHeader.from_bytes( bytes)</code></pre>330 return AmmosIFDataHeader.from_bytes(in_bytes)</code></pre> 333 331 </details> 334 332 </dd> … … 353 351 header_size = AmmosExtendedIFDataHeader.HEADER_SIZE 354 352 355 bytes = self.ammos_file.read(header_size)353 in_bytes = self.ammos_file.read(header_size) 356 354 357 355 logger.info("\nReading global frame body extended data header\n") 358 if ((not bytes) or (len(bytes) < header_size)):356 if ((not in_bytes) or (len(in_bytes) < header_size)): 359 357 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 360 358 return None 361 return AmmosExtendedIFDataHeader.from_bytes( bytes)</code></pre>359 return AmmosExtendedIFDataHeader.from_bytes(in_bytes)</code></pre> 362 360 </details> 363 361 </dd> -
doc/ammosreader/AmmosSingleFrame.html
r4eb8a2f r7752d1f 48 48 (self.global_frame_body.data_header.block_count * 49 49 (self.global_frame_body.data_header.block_length + 4)))) 50 # FIXME: Use str method instead51 52 50 def __str__(self): 53 51 output = ( … … 102 100 (self.global_frame_body.data_header.block_count * 103 101 (self.global_frame_body.data_header.block_length + 4)))) 104 # FIXME: Use str method instead105 106 102 def __str__(self): 107 103 output = ( -
doc/ammosreader/IQDWTXBlock.html
r4eb8a2f r7752d1f 34 34 @classmethod 35 35 def from_bytes(cls, byte_string): 36 pdw = PDW.from_bytes(byte s[0:32])37 return IQDWTXBLOCK(pdw, byte s[32:])36 pdw = PDW.from_bytes(byte_string[0:32]) 37 return IQDWTXBLOCK(pdw, byte_string[32:]) 38 38 39 39 def __init__(self, pdw, iq): … … 66 66 @classmethod 67 67 def from_bytes(cls, byte_string): 68 pdw = PDW.from_bytes(byte s[0:32])69 return IQDWTXBLOCK(pdw, byte s[32:])68 pdw = PDW.from_bytes(byte_string[0:32]) 69 return IQDWTXBLOCK(pdw, byte_string[32:]) 70 70 71 71 def __init__(self, pdw, iq): … … 87 87 <pre><code class="python">@classmethod 88 88 def from_bytes(cls, byte_string): 89 pdw = PDW.from_bytes(byte s[0:32])90 return IQDWTXBLOCK(pdw, byte s[32:])</code></pre>89 pdw = PDW.from_bytes(byte_string[0:32]) 90 return IQDWTXBLOCK(pdw, byte_string[32:])</code></pre> 91 91 </details> 92 92 </dd> -
doc/ammosreader/PPDWContainer.html
r4eb8a2f r7752d1f 27 27 <span>Expand source code</span> 28 28 </summary> 29 <pre><code class="python">import numpy as np 30 from datetime import datetime 31 29 <pre><code class="python">from datetime import datetime 30 import numpy as np 32 31 33 32 class PPDWContainer(): … … 93 92 94 93 def start_time(self): 95 return min( [each.time_of_arrival for each in self.signals])94 return min(each.time_of_arrival for each in self.signals) 96 95 97 96 def end_time(self): 98 return max( [each.time_of_arrival for each in self.signals])97 return max(each.time_of_arrival for each in self.signals) 99 98 100 99 def to_json(self): … … 196 195 197 196 def start_time(self): 198 return min( [each.time_of_arrival for each in self.signals])197 return min(each.time_of_arrival for each in self.signals) 199 198 200 199 def end_time(self): 201 return max( [each.time_of_arrival for each in self.signals])200 return max(each.time_of_arrival for each in self.signals) 202 201 203 202 def to_json(self): … … 295 294 </summary> 296 295 <pre><code class="python">def end_time(self): 297 return max( [each.time_of_arrival for each in self.signals])</code></pre>296 return max(each.time_of_arrival for each in self.signals)</code></pre> 298 297 </details> 299 298 </dd> … … 362 361 </summary> 363 362 <pre><code class="python">def start_time(self): 364 return min( [each.time_of_arrival for each in self.signals])</code></pre>363 return min(each.time_of_arrival for each in self.signals)</code></pre> 365 364 </details> 366 365 </dd> -
doc/ammosreader/PPDWReader.html
r4eb8a2f r7752d1f 27 27 <span>Expand source code</span> 28 28 </summary> 29 <pre><code class="python">from pathlib import Path 29 <pre><code class="python">from ammosreader.PDW import PDW 30 from ammosreader.PPDWContainer import PPDWContainer 31 from ammosreader import logger 30 32 31 from ammosreader.PDW import PDW32 from ammosreader.PPDWContainer import PPDWContainer33 33 34 34 class PPDWReader(): 35 """I read data from a file and return a PPDWContainer.""" 35 36 36 37 def __init__(self, a_file): … … 48 49 assert len(current_bytes) == 32 49 50 if not current_bytes: 50 # print('End of file detected')51 logger.info('End of file detected') 51 52 break 52 53 if self.cursor + 32 >= len(self.content): 53 # print('Can not read all 32 bytes of next PDW. EOF')54 logger.info('Can not read all 32 bytes of next PDW. EOF') 54 55 break 55 56 self.container.add(PDW.from_bytes(current_bytes)) … … 74 75 </code></dt> 75 76 <dd> 76 <div class="desc">< /div>77 <div class="desc"><p>I read data from a file and return a PPDWContainer.</p></div> 77 78 <details class="source"> 78 79 <summary> … … 80 81 </summary> 81 82 <pre><code class="python">class PPDWReader(): 83 """I read data from a file and return a PPDWContainer.""" 82 84 83 85 def __init__(self, a_file): … … 95 97 assert len(current_bytes) == 32 96 98 if not current_bytes: 97 # print('End of file detected')99 logger.info('End of file detected') 98 100 break 99 101 if self.cursor + 32 >= len(self.content): 100 # print('Can not read all 32 bytes of next PDW. EOF')102 logger.info('Can not read all 32 bytes of next PDW. EOF') 101 103 break 102 104 self.container.add(PDW.from_bytes(current_bytes)) … … 123 125 assert len(current_bytes) == 32 124 126 if not current_bytes: 125 # print('End of file detected')127 logger.info('End of file detected') 126 128 break 127 129 if self.cursor + 32 >= len(self.content): 128 # print('Can not read all 32 bytes of next PDW. EOF')130 logger.info('Can not read all 32 bytes of next PDW. EOF') 129 131 break 130 132 self.container.add(PDW.from_bytes(current_bytes)) -
doc/ammosreader/index.html
r4eb8a2f r7752d1f 81 81 <dt><code class="name"><a title="ammosreader.AmmosContainer" href="AmmosContainer.html">ammosreader.AmmosContainer</a></code></dt> 82 82 <dd> 83 <div class="desc"><p>I store the content of an R&S Ammos file in a more accessible way.</p></div>83 <div class="desc"><p>I provide an Ammos container.</p></div> 84 84 </dd> 85 85 <dt><code class="name"><a title="ammosreader.AmmosExtendedAudioDataHeader" href="AmmosExtendedAudioDataHeader.html">ammosreader.AmmosExtendedAudioDataHeader</a></code></dt>
Note:
See TracChangeset
for help on using the changeset viewer.