Changes in / [27e3114:1d0974d] in ammosreader
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
sample_scripts/audio_socket_reader_test_streamer.py
r27e3114 r1d0974d 25 25 print('Sending Bytes') 26 26 out_socket.sendall(total_bytes) 27 out_socket.close()28 out_socket.shutdown(socket.SHUT_RDWR) -
src/ammosreader/AmmosAudioSocketReader.py
r27e3114 r1d0974d 1 1 """I read a Ammos datastream from a socket.""" 2 2 3 import select4 3 import socket 5 4 from collections import deque … … 11 10 12 11 class AmmosAudioSocketReader: 13 def __init__(self, socket:socket.socket , debug=True):12 def __init__(self, socket:socket.socket): 14 13 """ 15 14 Initializes the AmmosAudioSocketReader … … 17 16 Args: 18 17 socket (socket.socket): socket to read from 19 debug (bool): if true, prints debug information20 18 """ 21 19 … … 26 24 self.__socket = socket 27 25 28 # 29 self.DEBUG_MODE = debug 30 31 def __get_next_data(self, byte_count: int) -> bytearray: 26 def __read_next_audio_data_body(self, sample_count:int, channel_count:int, sample_size:int) -> bytearray: 32 27 """ 33 Gets the next bytes from the socket, for example headers and body data.28 reads the next audio data body 34 29 35 30 Args: 36 byte_count (int): number of bytes to read 37 38 Raises: 39 TimeoutError: Raises TimeoutError if the socket does not serve data anymore 31 sample_count (int): amount of samples 32 channel_count (int): amount of channels 33 sample_size (int): size of a sample in bytes 40 34 41 35 Returns: 42 bytearray: data from socket as bytearray36 bytearray: contains the audio data 43 37 """ 44 38 39 total = sample_count*channel_count*sample_size 45 40 byte_array = [] 46 47 while len(b''.join(byte_array)) < byte_count:48 if self.DEBUG_MODE:49 print(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}")50 self.__socket.settimeout(5)51 new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL)52 41 53 if not new_bytes:54 raise TimeoutError("Socket timed out while reading data")42 while len(b''.join(byte_array)) < total: 43 byte_array.append(self.__socket.recv(total - len(b''.join(byte_array)))) 55 44 56 if self.DEBUG_MODE: 57 print(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining") 58 59 byte_array.append(new_bytes) 60 45 if len(b''.join(byte_array)) != total: 46 print("Can not read all", total, "bytes of data body") 47 return None 61 48 return b''.join(byte_array) 62 49 … … 74 61 return np.frombuffer(audio_data_body, dtype=np.int16) 75 62 76 def read_next_frame(self) -> tuple[np.ndarray, int]: 77 """Reads the next ammos audio frame from socket 78 79 Raises: 80 TimeoutError: Raisees TimeoutError if the socket does not serve data anymore 63 def read_next_frame(self) -> tuple[bytearray, int]: 64 """ 65 reads the next ammos audio frame 81 66 82 67 Returns: 83 tuple[ np.ndarray, int]: Contains the audio data and the sample rate68 tuple[bytearray, int]: contains the audio data and the sample rate 84 69 """ 70 #read loop 71 byte = self.__socket.recv(1) 85 72 86 # get first byte of the day 87 self.__socket.settimeout(5) 88 89 new_byte = self.__socket.recv(1, socket.MSG_WAITALL) 90 # raise Exception if socket does not return anything 91 if len(new_byte) < 1: 92 raise TimeoutError 93 94 #read loop 95 while new_byte: 73 while byte: 96 74 # 97 self.__magic_word_buffer.append( new_byte)75 self.__magic_word_buffer.append(byte) 98 76 byte_array = b''.join(self.__magic_word_buffer) 99 77 … … 102 80 103 81 ammos_global_header_buffer = list(self.__magic_word_buffer) 104 ammos_global_header_buffer.append(self.__get_next_data(20)) 105 #while len(b''.join(ammos_global_header_buffer)) < 24: 106 # ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) 82 while len(b''.join(ammos_global_header_buffer)) < 24: 83 ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) 107 84 108 85 ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) 109 if self.DEBUG_MODE: 110 print(ammos_global_header) 86 print(ammos_global_header) 111 87 112 88 if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256: 113 byte_array_header = self.__get_next_data(44)114 #while len(b''.join(byte_array_header)) < 44:115 #byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))89 byte_array_header = [] 90 while len(b''.join(byte_array_header)) < 44: 91 byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header)))) 116 92 117 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header) 118 if self.DEBUG_MODE: 119 print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size) 120 audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count* 121 ammos_extended_audio_data_header.channel_count* 122 ammos_extended_audio_data_header.sample_size) 93 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(b''.join(byte_array_header)) 94 print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size) 95 audio_body = self.__read_next_audio_data_body(ammos_extended_audio_data_header.sample_count, 96 ammos_extended_audio_data_header.channel_count, 97 ammos_extended_audio_data_header.sample_size) 123 98 124 99 audio_array = self.__audio_data_body_to_numpy(audio_body) 125 if self.DEBUG_MODE: 126 print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 100 print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) 127 101 128 102 return [audio_array, ammos_extended_audio_data_header.sample_rate] 129 103 130 104 elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256: 131 byte_array_header = self.__get_next_data(36)132 #while len(b''.join(byte_array_header)) < 36:133 #byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))105 byte_array_header = [] 106 while len(b''.join(byte_array_header)) < 36: 107 byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header)))) 134 108 135 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header) 136 if self.DEBUG_MODE: 137 print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size) 138 audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count* 139 ammos_extended_audio_data_header.channel_count* 140 ammos_extended_audio_data_header.sample_size) 109 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(b''.join(byte_array_header)) 110 print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size) 111 audio_body = self.__read_next_audio_data_body(ammos_audio_data_header.sample_count, 112 ammos_audio_data_header.channel_count, 113 ammos_audio_data_header.sample_size) 141 114 142 115 audio_array = self.__audio_data_body_to_numpy(audio_body) 143 if self.DEBUG_MODE: 144 print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 116 print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) 145 117 146 118 return [audio_array, ammos_audio_data_header.sample_rate] 147 119 148 # get the next byte 149 self.__socket.settimeout(5) 150 151 new_byte = self.__socket.recv(1, socket.MSG_WAITALL) 152 # raise Exception if socket does not return anything 153 if len(new_byte) < 1: 154 raise TimeoutError 120 byte = self.__socket.recv(1) 155 121 156 122 return None
Note:
See TracChangeset
for help on using the changeset viewer.