Changeset a574a93 in ammosreader


Ignore:
Timestamp:
05/09/22 06:15:46 (3 years ago)
Author:
recknagel <recknagel@…>
Branches:
AmmosSource, guix
Children:
b67e7e5
Parents:
4180d6a
Message:

added error handling for timed out connections

File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/ammosreader/AmmosAudioSocketReader.py

    r4180d6a ra574a93  
    11"""I read a Ammos datastream from a socket."""
    22
     3import select
    34import socket
    45from collections import deque
     
    1011
    1112class AmmosAudioSocketReader:
    12     def __init__(self, socket:socket.socket):
     13    def __init__(self, socket:socket.socket, debug=True):
    1314        """
    1415        Initializes the AmmosAudioSocketReader
     
    1617        Args:
    1718            socket (socket.socket): socket to read from
     19            debug (bool): if true, prints debug information
    1820        """
    1921
     
    2426        self.__socket = socket
    2527
    26     def __read_next_audio_data_body(self, sample_count:int, channel_count:int, sample_size:int) -> bytearray:
     28        #
     29        self.DEBUG_MODE = debug
     30
     31    def __get_next_data(self, byte_count: int) -> bytearray:
    2732        """
    28         reads the next audio data body
     33        Gets the next bytes from the socket, for example headers and body data.
    2934
    3035        Args:
    31             sample_count (int): amount of samples
    32             channel_count (int): amount of channels
    33             sample_size (int): size of a sample in bytes
     36            byte_count (int): number of bytes to read
     37
     38        Raises:
     39            TimeoutError: Raises TimeoutError if the socket does not serve data anymore
    3440
    3541        Returns:
    36             bytearray: contains the audio data
     42            bytearray: data from socket as bytearray
    3743        """
    3844
    39         total = sample_count*channel_count*sample_size
    4045        byte_array = []
     46       
     47        while len(b''.join(byte_array)) < byte_count:
     48            if self.DEBUG_MODE:
     49                print(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}")
     50            self.__socket.settimeout(5)
     51            new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL)
    4152
    42         while len(b''.join(byte_array)) < total:
    43             byte_array.append(self.__socket.recv(total - len(b''.join(byte_array))))
     53            if not new_bytes:
     54                raise TimeoutError("Socket timed out while reading data")
    4455
    45         if len(b''.join(byte_array)) != total:
    46             print("Can not read all", total, "bytes of data body")
    47             return None
     56            if self.DEBUG_MODE:
     57                print(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining")
     58
     59            byte_array.append(new_bytes)
     60
    4861        return b''.join(byte_array)
    4962
     
    6174        return np.frombuffer(audio_data_body, dtype=np.int16)
    6275
    63     def read_next_frame(self) -> tuple[bytearray, int]:
    64         """
    65         reads the next ammos audio frame
     76    def read_next_frame(self) -> tuple[np.ndarray, int]:
     77        """Reads the next ammos audio frame from socket
     78
     79        Raises:
     80            TimeoutError: Raisees TimeoutError if the socket does not serve data anymore
    6681
    6782        Returns:
    68             tuple[bytearray, int]: contains the audio data and the sample rate
     83            tuple[np.ndarray, int]: Contains the audio data and the sample rate
    6984        """
     85
     86        # get first byte of the day
     87        self.__socket.settimeout(5)
     88
     89        new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
     90        # raise Exception if socket does not return anything
     91        if len(new_byte) < 1:
     92            raise TimeoutError     
     93
    7094        #read loop
    71         byte = self.__socket.recv(1)
    72 
    73         while byte:
     95        while new_byte:
    7496            #
    75             self.__magic_word_buffer.append(byte)
     97            self.__magic_word_buffer.append(new_byte)
    7698            byte_array = b''.join(self.__magic_word_buffer)
    7799
     
    80102
    81103                ammos_global_header_buffer = list(self.__magic_word_buffer)
    82                 while len(b''.join(ammos_global_header_buffer)) < 24:
    83                     ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
     104                ammos_global_header_buffer.append(self.__get_next_data(20))
     105                #while len(b''.join(ammos_global_header_buffer)) < 24:
     106                #    ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
    84107                   
    85108                ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer))
    86                 print(ammos_global_header)
     109                if self.DEBUG_MODE:
     110                    print(ammos_global_header)
    87111
    88112                if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256:
    89                     byte_array_header = []
    90                     while len(b''.join(byte_array_header)) < 44:
    91                         byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))
     113                    byte_array_header = self.__get_next_data(44)
     114                    #while len(b''.join(byte_array_header)) < 44:
     115                    #    byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))
    92116
    93117                    ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(b''.join(byte_array_header))
    94                     print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size)
    95                     audio_body = self.__read_next_audio_data_body(ammos_extended_audio_data_header.sample_count,
    96                                                                   ammos_extended_audio_data_header.channel_count,
    97                                                                   ammos_extended_audio_data_header.sample_size)
     118                    if self.DEBUG_MODE:
     119                        print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size)
     120                    audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count*
     121                                                      ammos_extended_audio_data_header.channel_count*
     122                                                      ammos_extended_audio_data_header.sample_size)
    98123
    99124                    audio_array = self.__audio_data_body_to_numpy(audio_body)
    100                     print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)
     125                    if self.DEBUG_MODE:
     126                        print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)
    101127
    102128                    return [audio_array, ammos_extended_audio_data_header.sample_rate]
    103129
    104130                elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:
    105                     byte_array_header = []
    106                     while len(b''.join(byte_array_header)) < 36:
    107                         byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))
     131                    byte_array_header = self.__get_next_data(36)
     132                    #while len(b''.join(byte_array_header)) < 36:
     133                    #    byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))
    108134
    109135                    ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(b''.join(byte_array_header))
    110                     print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size)
    111                     audio_body = self.__read_next_audio_data_body(ammos_audio_data_header.sample_count,
    112                                                                   ammos_audio_data_header.channel_count,
    113                                                                   ammos_audio_data_header.sample_size)
     136                    if self.DEBUG_MODE:
     137                        print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size)
     138                    audio_body = self.__get_next_data(ammos_extended_audio_data_header.sample_count*
     139                                                      ammos_extended_audio_data_header.channel_count*
     140                                                      ammos_extended_audio_data_header.sample_size)
    114141
    115142                    audio_array = self.__audio_data_body_to_numpy(audio_body)
    116                     print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)
     143                    if self.DEBUG_MODE:
     144                        print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)
    117145
    118146                    return [audio_array, ammos_audio_data_header.sample_rate]
    119147
    120             byte = self.__socket.recv(1)
     148            # get the next byte
     149            self.__socket.settimeout(5)
     150
     151            new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
     152            # raise Exception if socket does not return anything
     153            if len(new_byte) < 1:
     154                raise TimeoutError   
    121155
    122156        return None
Note: See TracChangeset for help on using the changeset viewer.