Module ammosreader.AmmosAudioSocketReader

I read a Ammos datastream from a socket.

Expand source code
"""I read a Ammos datastream from a socket."""

import select
import socket
from collections import deque
import numpy as np
from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader
from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader
from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader


class AmmosAudioSocketReader:
    def __init__(self, socket:socket.socket, debug=True):
        """
        Initializes the AmmosAudioSocketReader

        Args:
            socket (socket.socket): socket to read from
            debug (bool): if true, prints debug information
        """

        #buffer for reading socket bytewise und check for the magic word
        self.__magic_word_buffer = deque(maxlen=4)

        #input socket to read from
        self.__socket = socket

        #
        self.DEBUG_MODE = debug

    def __get_next_data(self, byte_count: int) -> bytearray:
        """
        Gets the next bytes from the socket, for example headers and body data.

        Args:
            byte_count (int): number of bytes to read

        Raises:
            TimeoutError: Raises TimeoutError if the socket does not serve data anymore

        Returns:
            bytearray: data from socket as bytearray
        """

        byte_array = []
        
        while len(b''.join(byte_array)) < byte_count:
            if self.DEBUG_MODE:
                print(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}")
            self.__socket.settimeout(5)
            new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL)

            if not new_bytes:
                raise TimeoutError("Socket timed out while reading data")

            if self.DEBUG_MODE:
                print(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining")

            byte_array.append(new_bytes)

        return b''.join(byte_array)

    def __audio_data_body_to_numpy(self, audio_data_body:bytearray) -> np.ndarray:
        """
        converts the audio data body to a numpy array

        Args:
            audio_data_body (bytearray): audio data from audio data body

        Returns:
            np.ndarray: audio data as numpy array
        """

        return np.frombuffer(audio_data_body, dtype=np.int16)

    def read_next_frame(self) -> tuple[np.ndarray, int]:
        """Reads the next ammos audio frame from socket

        Raises:
            TimeoutError: Raisees TimeoutError if the socket does not serve data anymore

        Returns:
            tuple[np.ndarray, int]: Contains the audio data and the sample rate
        """

        # get first byte of the day
        self.__socket.settimeout(5)

        new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
        # raise Exception if socket does not return anything
        if len(new_byte) < 1:
            raise TimeoutError      

        #read loop
        while new_byte:
            #
            self.__magic_word_buffer.append(new_byte)
            byte_array = b''.join(self.__magic_word_buffer)

            if byte_array.hex() == '726574fb':
                #print(byte_array.hex())

                ammos_global_header_buffer = list(self.__magic_word_buffer)
                ammos_global_header_buffer.append(self.__get_next_data(20))
                #while len(b''.join(ammos_global_header_buffer)) < 24:
                #    ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
                    
                ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer))
                if self.DEBUG_MODE:
                    print(ammos_global_header)

                if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256:
                    byte_array_header = self.__get_next_data(44)
                    #while len(b''.join(byte_array_header)) < 44:
                    #    byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))

                    ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header)
                    if self.DEBUG_MODE:
                        print(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size)
                    audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 
                                                      ammos_extended_audio_data_header.number_of_channels* 
                                                      ammos_extended_audio_data_header.sample_size)

                    audio_array = self.__audio_data_body_to_numpy(audio_body)
                    if self.DEBUG_MODE:
                        print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)

                    return [audio_array, ammos_extended_audio_data_header.sample_rate]

                elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:
                    byte_array_header = self.__get_next_data(36)
                    #while len(b''.join(byte_array_header)) < 36:
                    #    byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))

                    ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header)
                    if self.DEBUG_MODE:
                        print(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size)
                    audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 
                                                      ammos_extended_audio_data_header.number_of_channels* 
                                                      ammos_extended_audio_data_header.sample_size)

                    audio_array = self.__audio_data_body_to_numpy(audio_body)
                    if self.DEBUG_MODE:
                        print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)

                    return [audio_array, ammos_audio_data_header.sample_rate]

            # get the next byte
            self.__socket.settimeout(5)

            new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
            # raise Exception if socket does not return anything
            if len(new_byte) < 1:
                raise TimeoutError   

        return None

Classes

class AmmosAudioSocketReader (socket: socket.socket, debug=True)

Initializes the AmmosAudioSocketReader

Args

socket : socket.socket
socket to read from
debug : bool
if true, prints debug information
Expand source code
class AmmosAudioSocketReader:
    def __init__(self, socket:socket.socket, debug=True):
        """
        Initializes the AmmosAudioSocketReader

        Args:
            socket (socket.socket): socket to read from
            debug (bool): if true, prints debug information
        """

        #buffer for reading socket bytewise und check for the magic word
        self.__magic_word_buffer = deque(maxlen=4)

        #input socket to read from
        self.__socket = socket

        #
        self.DEBUG_MODE = debug

    def __get_next_data(self, byte_count: int) -> bytearray:
        """
        Gets the next bytes from the socket, for example headers and body data.

        Args:
            byte_count (int): number of bytes to read

        Raises:
            TimeoutError: Raises TimeoutError if the socket does not serve data anymore

        Returns:
            bytearray: data from socket as bytearray
        """

        byte_array = []
        
        while len(b''.join(byte_array)) < byte_count:
            if self.DEBUG_MODE:
                print(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}")
            self.__socket.settimeout(5)
            new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL)

            if not new_bytes:
                raise TimeoutError("Socket timed out while reading data")

            if self.DEBUG_MODE:
                print(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} ramining")

            byte_array.append(new_bytes)

        return b''.join(byte_array)

    def __audio_data_body_to_numpy(self, audio_data_body:bytearray) -> np.ndarray:
        """
        converts the audio data body to a numpy array

        Args:
            audio_data_body (bytearray): audio data from audio data body

        Returns:
            np.ndarray: audio data as numpy array
        """

        return np.frombuffer(audio_data_body, dtype=np.int16)

    def read_next_frame(self) -> tuple[np.ndarray, int]:
        """Reads the next ammos audio frame from socket

        Raises:
            TimeoutError: Raisees TimeoutError if the socket does not serve data anymore

        Returns:
            tuple[np.ndarray, int]: Contains the audio data and the sample rate
        """

        # get first byte of the day
        self.__socket.settimeout(5)

        new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
        # raise Exception if socket does not return anything
        if len(new_byte) < 1:
            raise TimeoutError      

        #read loop
        while new_byte:
            #
            self.__magic_word_buffer.append(new_byte)
            byte_array = b''.join(self.__magic_word_buffer)

            if byte_array.hex() == '726574fb':
                #print(byte_array.hex())

                ammos_global_header_buffer = list(self.__magic_word_buffer)
                ammos_global_header_buffer.append(self.__get_next_data(20))
                #while len(b''.join(ammos_global_header_buffer)) < 24:
                #    ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
                    
                ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer))
                if self.DEBUG_MODE:
                    print(ammos_global_header)

                if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256:
                    byte_array_header = self.__get_next_data(44)
                    #while len(b''.join(byte_array_header)) < 44:
                    #    byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))

                    ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header)
                    if self.DEBUG_MODE:
                        print(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size)
                    audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 
                                                      ammos_extended_audio_data_header.number_of_channels* 
                                                      ammos_extended_audio_data_header.sample_size)

                    audio_array = self.__audio_data_body_to_numpy(audio_body)
                    if self.DEBUG_MODE:
                        print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)

                    return [audio_array, ammos_extended_audio_data_header.sample_rate]

                elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:
                    byte_array_header = self.__get_next_data(36)
                    #while len(b''.join(byte_array_header)) < 36:
                    #    byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))

                    ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header)
                    if self.DEBUG_MODE:
                        print(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size)
                    audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 
                                                      ammos_extended_audio_data_header.number_of_channels* 
                                                      ammos_extended_audio_data_header.sample_size)

                    audio_array = self.__audio_data_body_to_numpy(audio_body)
                    if self.DEBUG_MODE:
                        print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)

                    return [audio_array, ammos_audio_data_header.sample_rate]

            # get the next byte
            self.__socket.settimeout(5)

            new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
            # raise Exception if socket does not return anything
            if len(new_byte) < 1:
                raise TimeoutError   

        return None

Methods

def read_next_frame(self) ‑> tuple[numpy.ndarray, int]

Reads the next ammos audio frame from socket

Raises

TimeoutError
Raisees TimeoutError if the socket does not serve data anymore

Returns

tuple[np.ndarray, int]
Contains the audio data and the sample rate
Expand source code
def read_next_frame(self) -> tuple[np.ndarray, int]:
    """Reads the next ammos audio frame from socket

    Raises:
        TimeoutError: Raisees TimeoutError if the socket does not serve data anymore

    Returns:
        tuple[np.ndarray, int]: Contains the audio data and the sample rate
    """

    # get first byte of the day
    self.__socket.settimeout(5)

    new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
    # raise Exception if socket does not return anything
    if len(new_byte) < 1:
        raise TimeoutError      

    #read loop
    while new_byte:
        #
        self.__magic_word_buffer.append(new_byte)
        byte_array = b''.join(self.__magic_word_buffer)

        if byte_array.hex() == '726574fb':
            #print(byte_array.hex())

            ammos_global_header_buffer = list(self.__magic_word_buffer)
            ammos_global_header_buffer.append(self.__get_next_data(20))
            #while len(b''.join(ammos_global_header_buffer)) < 24:
            #    ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
                
            ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer))
            if self.DEBUG_MODE:
                print(ammos_global_header)

            if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256:
                byte_array_header = self.__get_next_data(44)
                #while len(b''.join(byte_array_header)) < 44:
                #    byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))

                ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header)
                if self.DEBUG_MODE:
                    print(ammos_extended_audio_data_header.number_of_samples, ammos_extended_audio_data_header.number_of_channels, ammos_extended_audio_data_header.sample_size)
                audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 
                                                  ammos_extended_audio_data_header.number_of_channels* 
                                                  ammos_extended_audio_data_header.sample_size)

                audio_array = self.__audio_data_body_to_numpy(audio_body)
                if self.DEBUG_MODE:
                    print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)

                return [audio_array, ammos_extended_audio_data_header.sample_rate]

            elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:
                byte_array_header = self.__get_next_data(36)
                #while len(b''.join(byte_array_header)) < 36:
                #    byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))

                ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header)
                if self.DEBUG_MODE:
                    print(ammos_audio_data_header.number_of_samples, ammos_audio_data_header.number_of_channels, ammos_audio_data_header.sample_size)
                audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples* 
                                                  ammos_extended_audio_data_header.number_of_channels* 
                                                  ammos_extended_audio_data_header.sample_size)

                audio_array = self.__audio_data_body_to_numpy(audio_body)
                if self.DEBUG_MODE:
                    print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)

                return [audio_array, ammos_audio_data_header.sample_rate]

        # get the next byte
        self.__socket.settimeout(5)

        new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
        # raise Exception if socket does not return anything
        if len(new_byte) < 1:
            raise TimeoutError   

    return None