Changeset 6914dbb in ammosreader
- Timestamp:
- 05/19/23 13:27:43 (2 years ago)
- Branches:
- AmmosSource, guix
- Children:
- 04097cd
- Parents:
- efb5900
- Files:
-
- 22 added
- 22 deleted
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
ammosreader/AbstractAmmosReader.py
refb5900 r6914dbb 1 1 """I provide a base class for specialized AmmosReaders.""" 2 import logging3 2 4 3 from abc import ABC, abstractmethod … … 8 7 from ammosreader.AmmosContainer import AmmosContainer 9 8 from ammosreader.AmmosConstants import FrameType 10 11 logging.basicConfig(filename='ammos.log', level=logging.DEBUG) 12 9 from ammosreader import logger 10 from ammosreader import logger 13 11 14 12 class AbstractAmmosReader(ABC): … … 67 65 frames_read = 0 68 66 while True: 69 logg ing.info("Reading single frame %s ...", frames_read)67 logger.info("Reading single frame %s ...", frames_read) 70 68 current_frame = self.read_next_single_frame() 71 69 if current_frame is not None: … … 73 71 self.container.add_frame(current_frame) 74 72 else: 75 logg ing.debug("Frame: %s incomplete", frames_read+1)73 logger.debug("Frame: %s incomplete", frames_read+1) 76 74 break 77 logg ing.info("%s frames read", len(self.container.global_frames))75 logger.info("%s frames read", len(self.container.global_frames)) 78 76 return self.container 79 77 … … 88 86 89 87 bytes = self.ammos_file.read(header_size) 90 logg ing.info("Reading next global frame header")88 logger.info("Reading next global frame header") 91 89 if ((not bytes) or (len(bytes) < header_size)): 92 logg ing.debug("Can not read all %s bytes of global frame header", header_size)90 logger.debug("Can not read all %s bytes of global frame header", header_size) 93 91 return None 94 92 … … 98 96 return None 99 97 100 # logg ing.info("Current global frame header %s", current_global_frame_header)98 # logger.info("Current global frame header %s", current_global_frame_header) 101 99 return current_global_frame_header 102 100 … … 119 117 120 118 if global_frame_header is None: 121 logg ing.debug("Global frame header missing")119 logger.debug("Global frame header missing") 122 120 return None 123 121 124 122 if global_frame_header.data_header_length is None: 125 logg ing.debug("Data header length empty")123 logger.debug("Data header length empty") 126 124 return None 127 125 128 126 if global_frame_header.frame_type not in list(FrameType): 129 logg ing.info("Unknown frame type %s found", global_frame_header.frame_type)127 logger.info("Unknown frame type %s found", global_frame_header.frame_type) 130 128 return None 131 129 … … 141 139 142 140 if global_frame_header.frame_type == FrameType.AUDIO_DATA.value: 143 logg ing.info("Audio Datastream found")141 logger.info("Audio Datastream found") 144 142 global_frame_body = self.read_next_global_frame_body(global_frame_header.data_header_length) 145 143 if global_frame_body is None: … … 148 146 149 147 if global_frame_header.frame_type in if_data_types: 150 logg ing.info("IF Datastream found")148 logger.info("IF Datastream found") 151 149 global_frame_body = self.read_next_global_frame_body(global_frame_header.data_header_length) 152 150 if global_frame_body is None: … … 157 155 return None 158 156 159 logg ing.info("Unsupported frame type %s found", global_frame_header.frame_type)157 logger.info("Unsupported frame type %s found", global_frame_header.frame_type) 160 158 return None -
ammosreader/AmmosAudioReader.py
refb5900 r6914dbb 1 1 """I provide a specialized Ammos Reader for audio data.""" 2 import logging3 2 4 3 from ammosreader.AbstractAmmosReader import AbstractAmmosReader … … 7 6 from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader 8 7 from ammosreader.AmmosAudioDataBody import AmmosAudioDataBody 9 8 from ammosreader import logger 10 9 11 10 class AmmosAudioReader(AbstractAmmosReader): … … 34 33 bytes = self.ammos_file.read(header_size) 35 34 36 logg ing.info("\nReading global frame body standard data header\n")35 logger.info("\nReading global frame body standard data header\n") 37 36 if ((not bytes) or (len(bytes) < header_size)): 38 logg ing.debug("Can not read all %s bytes of global frame body data header", header_size)37 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 39 38 return None 40 39 return AmmosAudioDataHeader.from_bytes(bytes) … … 52 51 53 52 if ((not bytes) or (len(bytes) < header_size)): 54 logg ing.debug("Can not read all %s bytes of global frame extended data header", header_size)53 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 55 54 return None 56 55 return AmmosExtendedAudioDataHeader.from_bytes(bytes) … … 77 76 78 77 if len(byte_string) != total: 79 logg ing.debug("Can not read all %s bytes of data body", total)78 logger.debug("Can not read all %s bytes of data body", total) 80 79 return None 81 80 return AmmosAudioDataBody(byte_string, number_of_channels, number_of_samples, sample_size) … … 91 90 92 91 if data_header_length == AmmosAudioDataHeader.HEADER_SIZE: 93 logg ing.info("Read standard data header")92 logger.info("Read standard data header") 94 93 audio_data_header = self.read_next_global_frame_body_data_header() 95 94 96 logg ing.info("Data header length %s", data_header_length)95 logger.info("Data header length %s", data_header_length) 97 96 if data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE: 98 logg ing.info("Read extended data header")97 logger.info("Read extended data header") 99 98 audio_data_header = self.read_next_global_frame_body_extended_data_header() 100 99 101 100 if audio_data_header is None: 102 logg ing.debug("Data header missing or format unknown")101 logger.debug("Data header missing or format unknown") 103 102 return None 104 103 … … 108 107 109 108 if audio_data_body is None: 110 logg ing.debug("Data body missing")109 logger.debug("Data body missing") 111 110 return None 112 111 -
ammosreader/AmmosAudioSocketReader.py
refb5900 r6914dbb 8 8 from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader 9 9 from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader 10 10 from ammosreader import logger 11 11 12 12 class AmmosAudioSocketReader: 13 def __init__(self, socket:socket.socket , debug=True):13 def __init__(self, socket:socket.socket): 14 14 """ 15 15 Initializes the AmmosAudioSocketReader … … 17 17 Args: 18 18 socket (socket.socket): socket to read from 19 debug (bool): if true, prints debug information20 19 """ 21 20 … … 25 24 #input socket to read from 26 25 self.__socket = socket 27 28 #29 self.DEBUG_MODE = debug30 26 31 27 def __get_next_data(self, byte_count: int) -> bytearray: … … 46 42 47 43 while len(b''.join(byte_array)) < byte_count: 48 if self.DEBUG_MODE: 49 print(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") 44 logger.info(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") 50 45 self.__socket.settimeout(5) 51 46 new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL) … … 54 49 raise TimeoutError("Socket timed out while reading data") 55 50 56 if self.DEBUG_MODE: 57 print(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining") 51 logger.info(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining") 58 52 59 53 byte_array.append(new_bytes) … … 107 101 108 102 ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) 109 if self.DEBUG_MODE: 110 print(ammos_global_header) 103 logger.info(ammos_global_header) 111 104 112 105 if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256: … … 116 109 117 110 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header) 118 if self.DEBUG_MODE: 119 print(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size) 111 logger.info(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size) 120 112 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 121 113 ammos_extended_audio_data_header.number_of_channels* … … 123 115 124 116 audio_array = self.__audio_data_body_to_numpy(audio_body) 125 if self.DEBUG_MODE: 126 print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 117 logger.info(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 127 118 128 119 return [audio_array, ammos_extended_audio_data_header.sample_rate] … … 134 125 135 126 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header) 136 if self.DEBUG_MODE: 137 print(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size) 127 logger.info(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size) 138 128 audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 139 129 ammos_extended_audio_data_header.number_of_channels* … … 141 131 142 132 audio_array = self.__audio_data_body_to_numpy(audio_body) 143 if self.DEBUG_MODE: 144 print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 133 logger.info(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 145 134 146 135 return [audio_array, ammos_audio_data_header.sample_rate] -
ammosreader/AmmosIFReader.py
refb5900 r6914dbb 1 1 """I provide a specialized Ammos Reader for IF data.""" 2 import logging3 2 4 3 from ammosreader.AbstractAmmosReader import AbstractAmmosReader … … 9 8 from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock 10 9 from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader 11 10 from ammosreader import logger 12 11 13 12 class AmmosIFReader(AbstractAmmosReader): … … 23 22 bytes = self.ammos_file.read(header_size) 24 23 25 logg ing.info("\nReading global frame body standard data header\n")24 logger.info("\nReading global frame body standard data header\n") 26 25 if ((not bytes) or (len(bytes) < header_size)): 27 logg ing.debug("Can not read all %s bytes of global frame body data header", header_size)26 logger.debug("Can not read all %s bytes of global frame body data header", header_size) 28 27 return None 29 28 return AmmosIFDataHeader.from_bytes(bytes) … … 40 39 bytes = self.ammos_file.read(header_size) 41 40 42 logg ing.info("\nReading global frame body extended data header\n")41 logger.info("\nReading global frame body extended data header\n") 43 42 if ((not bytes) or (len(bytes) < header_size)): 44 logg ing.debug("Can not read all %s bytes of global frame extended data header", header_size)43 logger.debug("Can not read all %s bytes of global frame extended data header", header_size) 45 44 return None 46 45 return AmmosExtendedIFDataHeader.from_bytes(bytes) … … 67 66 68 67 if len(byte_string) != total: 69 logg ing.debug("Can not read all %s bytes of data body", total)68 logger.debug("Can not read all %s bytes of data body", total) 70 69 return None 71 70 … … 92 91 93 92 if if_data_header is None: 94 logg ing.debug("Data header missing")93 logger.debug("Data header missing") 95 94 return None 96 95 … … 98 97 99 98 if if_data_body is None: 100 logg ing.debug("Data body missing")99 logger.debug("Data body missing") 101 100 return None 102 101
Note:
See TracChangeset
for help on using the changeset viewer.