Changeset a574a93 in ammosreader
- Timestamp:
- 05/09/22 06:15:46 (3 years ago)
- Branches:
- AmmosSource, guix
- Children:
- b67e7e5
- Parents:
- 4180d6a
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
src/ammosreader/AmmosAudioSocketReader.py
r4180d6a ra574a93 1 1 """I read a Ammos datastream from a socket.""" 2 2 3 import select 3 4 import socket 4 5 from collections import deque … … 10 11 11 12 class AmmosAudioSocketReader: 12 def __init__(self, socket:socket.socket ):13 def __init__(self, socket:socket.socket, debug=True): 13 14 """ 14 15 Initializes the AmmosAudioSocketReader … … 16 17 Args: 17 18 socket (socket.socket): socket to read from 19 debug (bool): if true, prints debug information 18 20 """ 19 21 … … 24 26 self.__socket = socket 25 27 26 def __read_next_audio_data_body(self, sample_count:int, channel_count:int, sample_size:int) -> bytearray: 28 # 29 self.DEBUG_MODE = debug 30 31 def __get_next_data(self, byte_count: int) -> bytearray: 27 32 """ 28 reads the next audio data body33 Gets the next bytes from the socket, for example headers and body data. 29 34 30 35 Args: 31 sample_count (int): amount of samples 32 channel_count (int): amount of channels 33 sample_size (int): size of a sample in bytes 36 byte_count (int): number of bytes to read 37 38 Raises: 39 TimeoutError: Raises TimeoutError if the socket does not serve data anymore 34 40 35 41 Returns: 36 bytearray: contains the audio data42 bytearray: data from socket as bytearray 37 43 """ 38 44 39 total = sample_count*channel_count*sample_size40 45 byte_array = [] 46 47 while len(b''.join(byte_array)) < byte_count: 48 if self.DEBUG_MODE: 49 print(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") 50 self.__socket.settimeout(5) 51 new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL) 41 52 42 while len(b''.join(byte_array)) < total:43 byte_array.append(self.__socket.recv(total - len(b''.join(byte_array))))53 if not new_bytes: 54 raise TimeoutError("Socket timed out while reading data") 44 55 45 if len(b''.join(byte_array)) != total: 46 print("Can not read all", total, "bytes of data body") 47 return None 56 if self.DEBUG_MODE: 57 print(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining") 58 59 byte_array.append(new_bytes) 60 48 61 return b''.join(byte_array) 49 62 … … 61 74 return np.frombuffer(audio_data_body, dtype=np.int16) 62 75 63 def read_next_frame(self) -> tuple[bytearray, int]: 64 """ 65 reads the next ammos audio frame 76 def read_next_frame(self) -> tuple[np.ndarray, int]: 77 """Reads the next ammos audio frame from socket 78 79 Raises: 80 TimeoutError: Raisees TimeoutError if the socket does not serve data anymore 66 81 67 82 Returns: 68 tuple[ bytearray, int]: contains the audio data and the sample rate83 tuple[np.ndarray, int]: Contains the audio data and the sample rate 69 84 """ 85 86 # get first byte of the day 87 self.__socket.settimeout(5) 88 89 new_byte = self.__socket.recv(1, socket.MSG_WAITALL) 90 # raise Exception if socket does not return anything 91 if len(new_byte) < 1: 92 raise TimeoutError 93 70 94 #read loop 71 byte = self.__socket.recv(1) 72 73 while byte: 95 while new_byte: 74 96 # 75 self.__magic_word_buffer.append( byte)97 self.__magic_word_buffer.append(new_byte) 76 98 byte_array = b''.join(self.__magic_word_buffer) 77 99 … … 80 102 81 103 ammos_global_header_buffer = list(self.__magic_word_buffer) 82 while len(b''.join(ammos_global_header_buffer)) < 24: 83 ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) 104 ammos_global_header_buffer.append(self.__get_next_data(20)) 105 #while len(b''.join(ammos_global_header_buffer)) < 24: 106 # ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) 84 107 85 108 ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) 86 print(ammos_global_header) 109 if self.DEBUG_MODE: 110 print(ammos_global_header) 87 111 88 112 if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256: 89 byte_array_header = []90 while len(b''.join(byte_array_header)) < 44:91 byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))113 byte_array_header = self.__get_next_data(44) 114 #while len(b''.join(byte_array_header)) < 44: 115 # byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header)))) 92 116 93 117 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(b''.join(byte_array_header)) 94 print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size) 95 audio_body = self.__read_next_audio_data_body(ammos_extended_audio_data_header.sample_count, 96 ammos_extended_audio_data_header.channel_count, 97 ammos_extended_audio_data_header.sample_size) 118 if self.DEBUG_MODE: 119 print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size) 120 audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count* 121 ammos_extended_audio_data_header.channel_count* 122 ammos_extended_audio_data_header.sample_size) 98 123 99 124 audio_array = self.__audio_data_body_to_numpy(audio_body) 100 print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 125 if self.DEBUG_MODE: 126 print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 101 127 102 128 return [audio_array, ammos_extended_audio_data_header.sample_rate] 103 129 104 130 elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256: 105 byte_array_header = []106 while len(b''.join(byte_array_header)) < 36:107 byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))131 byte_array_header = self.__get_next_data(36) 132 #while len(b''.join(byte_array_header)) < 36: 133 # byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header)))) 108 134 109 135 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(b''.join(byte_array_header)) 110 print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size) 111 audio_body = self.__read_next_audio_data_body(ammos_audio_data_header.sample_count, 112 ammos_audio_data_header.channel_count, 113 ammos_audio_data_header.sample_size) 136 if self.DEBUG_MODE: 137 print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size) 138 audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count* 139 ammos_extended_audio_data_header.channel_count* 140 ammos_extended_audio_data_header.sample_size) 114 141 115 142 audio_array = self.__audio_data_body_to_numpy(audio_body) 116 print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 143 if self.DEBUG_MODE: 144 print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 117 145 118 146 return [audio_array, ammos_audio_data_header.sample_rate] 119 147 120 byte = self.__socket.recv(1) 148 # get the next byte 149 self.__socket.settimeout(5) 150 151 new_byte = self.__socket.recv(1, socket.MSG_WAITALL) 152 # raise Exception if socket does not return anything 153 if len(new_byte) < 1: 154 raise TimeoutError 121 155 122 156 return None
Note:
See TracChangeset
for help on using the changeset viewer.