Changes in / [27e3114:1d0974d] in ammosreader


Ignore:
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • sample_scripts/audio_socket_reader_test_streamer.py

    r27e3114 r1d0974d  
    2525print('Sending Bytes')
    2626out_socket.sendall(total_bytes)
    27 out_socket.close()
    28 out_socket.shutdown(socket.SHUT_RDWR)
  • src/ammosreader/AmmosAudioSocketReader.py

    r27e3114 r1d0974d  
    11"""I read a Ammos datastream from a socket."""
    22
    3 import select
    43import socket
    54from collections import deque
     
    1110
    1211class AmmosAudioSocketReader:
    13     def __init__(self, socket:socket.socket, debug=True):
     12    def __init__(self, socket:socket.socket):
    1413        """
    1514        Initializes the AmmosAudioSocketReader
     
    1716        Args:
    1817            socket (socket.socket): socket to read from
    19             debug (bool): if true, prints debug information
    2018        """
    2119
     
    2624        self.__socket = socket
    2725
    28         #
    29         self.DEBUG_MODE = debug
    30 
    31     def __get_next_data(self, byte_count: int) -> bytearray:
     26    def __read_next_audio_data_body(self, sample_count:int, channel_count:int, sample_size:int) -> bytearray:
    3227        """
    33         Gets the next bytes from the socket, for example headers and body data.
     28        reads the next audio data body
    3429
    3530        Args:
    36             byte_count (int): number of bytes to read
    37 
    38         Raises:
    39             TimeoutError: Raises TimeoutError if the socket does not serve data anymore
     31            sample_count (int): amount of samples
     32            channel_count (int): amount of channels
     33            sample_size (int): size of a sample in bytes
    4034
    4135        Returns:
    42             bytearray: data from socket as bytearray
     36            bytearray: contains the audio data
    4337        """
    4438
     39        total = sample_count*channel_count*sample_size
    4540        byte_array = []
    46        
    47         while len(b''.join(byte_array)) < byte_count:
    48             if self.DEBUG_MODE:
    49                 print(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}")
    50             self.__socket.settimeout(5)
    51             new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL)
    5241
    53             if not new_bytes:
    54                 raise TimeoutError("Socket timed out while reading data")
     42        while len(b''.join(byte_array)) < total:
     43            byte_array.append(self.__socket.recv(total - len(b''.join(byte_array))))
    5544
    56             if self.DEBUG_MODE:
    57                 print(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining")
    58 
    59             byte_array.append(new_bytes)
    60 
     45        if len(b''.join(byte_array)) != total:
     46            print("Can not read all", total, "bytes of data body")
     47            return None
    6148        return b''.join(byte_array)
    6249
     
    7461        return np.frombuffer(audio_data_body, dtype=np.int16)
    7562
    76     def read_next_frame(self) -> tuple[np.ndarray, int]:
    77         """Reads the next ammos audio frame from socket
    78 
    79         Raises:
    80             TimeoutError: Raisees TimeoutError if the socket does not serve data anymore
     63    def read_next_frame(self) -> tuple[bytearray, int]:
     64        """
     65        reads the next ammos audio frame
    8166
    8267        Returns:
    83             tuple[np.ndarray, int]: Contains the audio data and the sample rate
     68            tuple[bytearray, int]: contains the audio data and the sample rate
    8469        """
     70        #read loop
     71        byte = self.__socket.recv(1)
    8572
    86         # get first byte of the day
    87         self.__socket.settimeout(5)
    88 
    89         new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
    90         # raise Exception if socket does not return anything
    91         if len(new_byte) < 1:
    92             raise TimeoutError     
    93 
    94         #read loop
    95         while new_byte:
     73        while byte:
    9674            #
    97             self.__magic_word_buffer.append(new_byte)
     75            self.__magic_word_buffer.append(byte)
    9876            byte_array = b''.join(self.__magic_word_buffer)
    9977
     
    10280
    10381                ammos_global_header_buffer = list(self.__magic_word_buffer)
    104                 ammos_global_header_buffer.append(self.__get_next_data(20))
    105                 #while len(b''.join(ammos_global_header_buffer)) < 24:
    106                 #    ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
     82                while len(b''.join(ammos_global_header_buffer)) < 24:
     83                    ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
    10784                   
    10885                ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer))
    109                 if self.DEBUG_MODE:
    110                     print(ammos_global_header)
     86                print(ammos_global_header)
    11187
    11288                if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256:
    113                     byte_array_header = self.__get_next_data(44)
    114                     #while len(b''.join(byte_array_header)) < 44:
    115                     #    byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))
     89                    byte_array_header = []
     90                    while len(b''.join(byte_array_header)) < 44:
     91                        byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))
    11692
    117                     ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header)
    118                     if self.DEBUG_MODE:
    119                         print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size)
    120                     audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count*
    121                                                       ammos_extended_audio_data_header.channel_count*
    122                                                       ammos_extended_audio_data_header.sample_size)
     93                    ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(b''.join(byte_array_header))
     94                    print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size)
     95                    audio_body = self.__read_next_audio_data_body(ammos_extended_audio_data_header.sample_count,
     96                                                                  ammos_extended_audio_data_header.channel_count,
     97                                                                  ammos_extended_audio_data_header.sample_size)
    12398
    12499                    audio_array = self.__audio_data_body_to_numpy(audio_body)
    125                     if self.DEBUG_MODE:
    126                         print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)
     100                    print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)
    127101
    128102                    return [audio_array, ammos_extended_audio_data_header.sample_rate]
    129103
    130104                elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:
    131                     byte_array_header = self.__get_next_data(36)
    132                     #while len(b''.join(byte_array_header)) < 36:
    133                     #    byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))
     105                    byte_array_header = []
     106                    while len(b''.join(byte_array_header)) < 36:
     107                        byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))
    134108
    135                     ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header)
    136                     if self.DEBUG_MODE:
    137                         print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size)
    138                     audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count*
    139                                                       ammos_extended_audio_data_header.channel_count*
    140                                                       ammos_extended_audio_data_header.sample_size)
     109                    ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(b''.join(byte_array_header))
     110                    print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size)
     111                    audio_body = self.__read_next_audio_data_body(ammos_audio_data_header.sample_count,
     112                                                                  ammos_audio_data_header.channel_count,
     113                                                                  ammos_audio_data_header.sample_size)
    141114
    142115                    audio_array = self.__audio_data_body_to_numpy(audio_body)
    143                     if self.DEBUG_MODE:
    144                         print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)
     116                    print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)
    145117
    146118                    return [audio_array, ammos_audio_data_header.sample_rate]
    147119
    148             # get the next byte
    149             self.__socket.settimeout(5)
    150 
    151             new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
    152             # raise Exception if socket does not return anything
    153             if len(new_byte) < 1:
    154                 raise TimeoutError   
     120            byte = self.__socket.recv(1)
    155121
    156122        return None
Note: See TracChangeset for help on using the changeset viewer.