Module ammosreader.AmmosAudioSocketReader
I read a Ammos datastream from a socket.
Expand source code
"""I read a Ammos datastream from a socket."""
# import select
import socket
from collections import deque
import numpy as np
from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader
from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader
from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
from ammosreader.AmmosConstants import AmmosAudioDemodType
from ammosreader import logger
class AmmosAudioSocketReader:
"""I read Ammos Audio data from a socket"""
def __init__(self, in_socket: socket.socket):
"""
Initializes the AmmosAudioSocketReader
Args:
socket (socket.socket): socket to read from
"""
# buffer for reading socket bytewise und check for the magic word
self.__magic_word_buffer = deque(maxlen=4)
# input socket to read from
self.__socket = in_socket
def __get_next_data(self, byte_count: int) -> bytearray:
"""
Gets the next bytes from the socket, for example headers and body data.
Args:
byte_count (int): number of bytes to read
Raises:
TimeoutError: Raises TimeoutError if the socket does not serve data anymore
Returns:
bytearray: data from socket as bytearray
"""
byte_array = []
while len(b''.join(byte_array)) < byte_count:
logger.info("Remaining Bytes: %s", byte_count - len(b''.join(byte_array)))
# logger.info(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}")
self.__socket.settimeout(5)
new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL)
if not new_bytes:
raise TimeoutError("Socket timed out while reading data")
logger.info("Got %s bytes of %s remaining", len(new_bytes), byte_count - len(b''.join(byte_array)))
# logger.info(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} remaining")
byte_array.append(new_bytes)
return b''.join(byte_array)
def __audio_data_body_to_numpy(self, audio_data_body: bytearray) -> np.ndarray:
"""
converts the audio data body to a numpy array
Args:
audio_data_body (bytearray): audio data from audio data body
Returns:
np.ndarray: audio data as numpy array
"""
return np.frombuffer(audio_data_body, dtype=np.int16)
def read_next_frame(self) -> tuple[np.ndarray, int]:
"""Reads the next ammos audio frame from socket
Raises:
TimeoutError: Raises TimeoutError if the socket does not serve data anymore
Returns:
tuple[np.ndarray, int]: Contains the audio data and the sample rate
"""
# get first byte of the day
self.__socket.settimeout(5)
new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
# raise Exception if socket does not return anything
if len(new_byte) < 1:
raise TimeoutError
# read loop
while new_byte:
#
self.__magic_word_buffer.append(new_byte)
byte_array = b''.join(self.__magic_word_buffer)
if byte_array.hex() == AmmosGlobalFrameHeader.MAGIC_WORD:
# print(byte_array.hex())
ammos_global_header_buffer = list(self.__magic_word_buffer)
ammos_global_header_buffer.append(self.__get_next_data(20))
# while len(b''.join(ammos_global_header_buffer)) < 24:
# ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer))
logger.info(ammos_global_header)
if (ammos_global_header.data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL):
byte_array_header = self.__get_next_data(AmmosExtendedAudioDataHeader.HEADER_SIZE)
# while len(b''.join(byte_array_header)) < 44:
# byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))
ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header)
logger.debug(str(ammos_extended_audio_data_header.number_of_samples or ''),
str(ammos_extended_audio_data_header.number_of_channels or ''),
str(ammos_extended_audio_data_header.sample_size or ''))
audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples *
ammos_extended_audio_data_header.number_of_channels *
ammos_extended_audio_data_header.sample_size)
audio_array = self.__audio_data_body_to_numpy(audio_body)
# print("44,256", len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)
logger.debug("%s, %s", len(audio_array), len(audio_array) / ammos_extended_audio_data_header.sample_rate)
return [audio_array, ammos_extended_audio_data_header.sample_rate]
elif (ammos_global_header.data_header_length == AmmosAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL):
byte_array_header = self.__get_next_data(AmmosAudioDataHeader.HEADER_SIZE)
# while len(b''.join(byte_array_header)) < 36:
# byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))
ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header)
logger.debug(str(ammos_audio_data_header.number_of_samples or ''),
str(ammos_audio_data_header.number_of_channels or ''),
str(ammos_audio_data_header.sample_size or ''))
audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples *
ammos_extended_audio_data_header.number_of_channels *
ammos_extended_audio_data_header.sample_size)
audio_array = self.__audio_data_body_to_numpy(audio_body)
logger.debug("%s, %s", len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)
return [audio_array, ammos_audio_data_header.sample_rate]
# get the next byte
self.__socket.settimeout(5)
new_byte = self.__socket.recv(1, socket.MSG_WAITALL)
# raise Exception if socket does not return anything
if len(new_byte) < 1:
raise TimeoutError
return None
Classes
class AmmosAudioSocketReader (in_socket: socket.socket)
-
I read Ammos Audio data from a socket
Initializes the AmmosAudioSocketReader
Args
socket
:socket.socket
- socket to read from
Expand source code
class AmmosAudioSocketReader: """I read Ammos Audio data from a socket""" def __init__(self, in_socket: socket.socket): """ Initializes the AmmosAudioSocketReader Args: socket (socket.socket): socket to read from """ # buffer for reading socket bytewise und check for the magic word self.__magic_word_buffer = deque(maxlen=4) # input socket to read from self.__socket = in_socket def __get_next_data(self, byte_count: int) -> bytearray: """ Gets the next bytes from the socket, for example headers and body data. Args: byte_count (int): number of bytes to read Raises: TimeoutError: Raises TimeoutError if the socket does not serve data anymore Returns: bytearray: data from socket as bytearray """ byte_array = [] while len(b''.join(byte_array)) < byte_count: logger.info("Remaining Bytes: %s", byte_count - len(b''.join(byte_array))) # logger.info(f"Remaining Bytes: {byte_count - len(b''.join(byte_array))}") self.__socket.settimeout(5) new_bytes = self.__socket.recv(byte_count - len(b''.join(byte_array)), socket.MSG_WAITALL) if not new_bytes: raise TimeoutError("Socket timed out while reading data") logger.info("Got %s bytes of %s remaining", len(new_bytes), byte_count - len(b''.join(byte_array))) # logger.info(f"Got {len(new_bytes)} bytes of {byte_count - len(b''.join(byte_array))} remaining") byte_array.append(new_bytes) return b''.join(byte_array) def __audio_data_body_to_numpy(self, audio_data_body: bytearray) -> np.ndarray: """ converts the audio data body to a numpy array Args: audio_data_body (bytearray): audio data from audio data body Returns: np.ndarray: audio data as numpy array """ return np.frombuffer(audio_data_body, dtype=np.int16) def read_next_frame(self) -> tuple[np.ndarray, int]: """Reads the next ammos audio frame from socket Raises: TimeoutError: Raises TimeoutError if the socket does not serve data anymore Returns: tuple[np.ndarray, int]: Contains the audio data and the sample rate """ # get first byte of the day self.__socket.settimeout(5) new_byte = self.__socket.recv(1, socket.MSG_WAITALL) # raise Exception if socket does not return anything if len(new_byte) < 1: raise TimeoutError # read loop while new_byte: # self.__magic_word_buffer.append(new_byte) byte_array = b''.join(self.__magic_word_buffer) if byte_array.hex() == AmmosGlobalFrameHeader.MAGIC_WORD: # print(byte_array.hex()) ammos_global_header_buffer = list(self.__magic_word_buffer) ammos_global_header_buffer.append(self.__get_next_data(20)) # while len(b''.join(ammos_global_header_buffer)) < 24: # ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) logger.info(ammos_global_header) if (ammos_global_header.data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): byte_array_header = self.__get_next_data(AmmosExtendedAudioDataHeader.HEADER_SIZE) # while len(b''.join(byte_array_header)) < 44: # byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header)))) ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header) logger.debug(str(ammos_extended_audio_data_header.number_of_samples or ''), str(ammos_extended_audio_data_header.number_of_channels or ''), str(ammos_extended_audio_data_header.sample_size or '')) audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * ammos_extended_audio_data_header.number_of_channels * ammos_extended_audio_data_header.sample_size) audio_array = self.__audio_data_body_to_numpy(audio_body) # print("44,256", len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) logger.debug("%s, %s", len(audio_array), len(audio_array) / ammos_extended_audio_data_header.sample_rate) return [audio_array, ammos_extended_audio_data_header.sample_rate] elif (ammos_global_header.data_header_length == AmmosAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): byte_array_header = self.__get_next_data(AmmosAudioDataHeader.HEADER_SIZE) # while len(b''.join(byte_array_header)) < 36: # byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header)))) ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header) logger.debug(str(ammos_audio_data_header.number_of_samples or ''), str(ammos_audio_data_header.number_of_channels or ''), str(ammos_audio_data_header.sample_size or '')) audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * ammos_extended_audio_data_header.number_of_channels * ammos_extended_audio_data_header.sample_size) audio_array = self.__audio_data_body_to_numpy(audio_body) logger.debug("%s, %s", len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) return [audio_array, ammos_audio_data_header.sample_rate] # get the next byte self.__socket.settimeout(5) new_byte = self.__socket.recv(1, socket.MSG_WAITALL) # raise Exception if socket does not return anything if len(new_byte) < 1: raise TimeoutError return None
Methods
def read_next_frame(self) ‑> tuple[numpy.ndarray, int]
-
Reads the next ammos audio frame from socket
Raises
TimeoutError
- Raises TimeoutError if the socket does not serve data anymore
Returns
tuple[np.ndarray, int]
- Contains the audio data and the sample rate
Expand source code
def read_next_frame(self) -> tuple[np.ndarray, int]: """Reads the next ammos audio frame from socket Raises: TimeoutError: Raises TimeoutError if the socket does not serve data anymore Returns: tuple[np.ndarray, int]: Contains the audio data and the sample rate """ # get first byte of the day self.__socket.settimeout(5) new_byte = self.__socket.recv(1, socket.MSG_WAITALL) # raise Exception if socket does not return anything if len(new_byte) < 1: raise TimeoutError # read loop while new_byte: # self.__magic_word_buffer.append(new_byte) byte_array = b''.join(self.__magic_word_buffer) if byte_array.hex() == AmmosGlobalFrameHeader.MAGIC_WORD: # print(byte_array.hex()) ammos_global_header_buffer = list(self.__magic_word_buffer) ammos_global_header_buffer.append(self.__get_next_data(20)) # while len(b''.join(ammos_global_header_buffer)) < 24: # ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer)))) ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer)) logger.info(ammos_global_header) if (ammos_global_header.data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): byte_array_header = self.__get_next_data(AmmosExtendedAudioDataHeader.HEADER_SIZE) # while len(b''.join(byte_array_header)) < 44: # byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header)))) ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(byte_array_header) logger.debug(str(ammos_extended_audio_data_header.number_of_samples or ''), str(ammos_extended_audio_data_header.number_of_channels or ''), str(ammos_extended_audio_data_header.sample_size or '')) audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * ammos_extended_audio_data_header.number_of_channels * ammos_extended_audio_data_header.sample_size) audio_array = self.__audio_data_body_to_numpy(audio_body) # print("44,256", len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate) logger.debug("%s, %s", len(audio_array), len(audio_array) / ammos_extended_audio_data_header.sample_rate) return [audio_array, ammos_extended_audio_data_header.sample_rate] elif (ammos_global_header.data_header_length == AmmosAudioDataHeader.HEADER_SIZE and ammos_global_header.frame_type == AmmosAudioDemodType.DIGITAL): byte_array_header = self.__get_next_data(AmmosAudioDataHeader.HEADER_SIZE) # while len(b''.join(byte_array_header)) < 36: # byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header)))) ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(byte_array_header) logger.debug(str(ammos_audio_data_header.number_of_samples or ''), str(ammos_audio_data_header.number_of_channels or ''), str(ammos_audio_data_header.sample_size or '')) audio_body = self.__get_next_data(ammos_extended_audio_data_header.number_of_samples * ammos_extended_audio_data_header.number_of_channels * ammos_extended_audio_data_header.sample_size) audio_array = self.__audio_data_body_to_numpy(audio_body) logger.debug("%s, %s", len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate) return [audio_array, ammos_audio_data_header.sample_rate] # get the next byte self.__socket.settimeout(5) new_byte = self.__socket.recv(1, socket.MSG_WAITALL) # raise Exception if socket does not return anything if len(new_byte) < 1: raise TimeoutError return None