source: ammosreader/AmmosAudioSocketReader.py@ 6cb85c2

AmmosSource guix
Last change on this file since 6cb85c2 was 1e781ba, checked in by recknagel <recknagel@…>, 3 years ago

former radardex-project

  • Property mode set to 100644
File size: 5.1 KB
RevLine 
[1e781ba]1import socket
2from collections import deque
3
4import numpy as np
5from AmmosAudioDataHeader import AmmosAudioDataHeader
6from AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader
7from AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
8
9
10class AmmosAudioSocketReader:
11 def __init__(self, socket:socket.socket):
12 """
13 Initializes the AmmosAudioSocketReader
14
15 Args:
16 socket (socket.socket): socket to read from
17 """
18
19 self.__socket = socket
20
21 #buffer for reading socket bytewise und check for the magic word
22 self.__magic_word_buffer = deque(maxlen=4)
23
24 #input socket to read from
25 self.__socket = socket
26
27 def __read_next_audio_data_body(self, sample_count:int, channel_count:int, sample_size:int) -> bytearray:
28 """
29 reads the next audio data body
30
31 Args:
32 sample_count (int): amount of samples
33 channel_count (int): amount of channels
34 sample_size (int): size of a sample in bytes
35
36 Returns:
37 bytearray: contains the audio data
38 """
39
40 total = sample_count*channel_count*sample_size
41 byte_array = []
42
43 while len(b''.join(byte_array)) < total:
44 byte_array.append(self.__socket.recv(total - len(b''.join(byte_array))))
45
46 if len(b''.join(byte_array)) != total:
47 print("Can not read all", total, "bytes of data body")
48 return None
49 return b''.join(byte_array)
50
51 def __audio_data_body_to_numpy(self, audio_data_body:bytearray) -> np.ndarray:
52 """
53 converts the audio data body to a numpy array
54
55 Args:
56 audio_data_body (bytearray): audio data from audio data body
57
58 Returns:
59 np.ndarray: audio data as numpy array
60 """
61
62 return np.frombuffer(audio_data_body, dtype=np.int16)
63
64 def read_next_frame(self) -> tuple[bytearray, int]:
65 """
66 reads the next ammos audio frame
67
68 Returns:
69 tuple[bytearray, int]: contains the audio data and the sample rate
70 """
71 #read loop
72 byte = self.__socket.recv(1)
73
74 while byte:
75 #
76 self.__magic_word_buffer.append(byte)
77 byte_array = b''.join(self.__magic_word_buffer)
78
79 if byte_array.hex() == '726574fb':
80 #print(byte_array.hex())
81
82 ammos_global_header_buffer = list(self.__magic_word_buffer)
83 while len(b''.join(ammos_global_header_buffer)) < 24:
84 ammos_global_header_buffer.append(self.__socket.recv(24 - len(b''.join(ammos_global_header_buffer))))
85
86 ammos_global_header = AmmosGlobalFrameHeader.from_bytes(b''.join(ammos_global_header_buffer))
87 print(ammos_global_header)
88
89 if ammos_global_header.data_header_length == 44 and ammos_global_header.frame_type == 256:
90 byte_array_header = []
91 while len(b''.join(byte_array_header)) < 44:
92 byte_array_header.append(self.__socket.recv(44 - len(b''.join(byte_array_header))))
93
94 ammos_extended_audio_data_header = AmmosExtendedAudioDataHeader.from_bytes(b''.join(byte_array_header))
95 print(ammos_extended_audio_data_header.sample_count, ammos_extended_audio_data_header.channel_count, ammos_extended_audio_data_header.sample_size)
96 audio_body = self.__read_next_audio_data_body(ammos_extended_audio_data_header.sample_count,
97 ammos_extended_audio_data_header.channel_count,
98 ammos_extended_audio_data_header.sample_size)
99
100 audio_array = self.__audio_data_body_to_numpy(audio_body)
101 print(len(audio_array), len(audio_array)/ammos_extended_audio_data_header.sample_rate)
102
103 return [audio_array, ammos_extended_audio_data_header.sample_rate]
104
105 elif ammos_global_header.data_header_length == 36 and ammos_global_header.frame_type == 256:
106 byte_array_header = []
107 while len(b''.join(byte_array_header)) < 36:
108 byte_array_header.append(self.__socket.recv(36 - len(b''.join(byte_array_header))))
109
110 ammos_audio_data_header = AmmosAudioDataHeader.from_bytes(b''.join(byte_array_header))
111 print(ammos_audio_data_header.sample_count, ammos_audio_data_header.channel_count, ammos_audio_data_header.sample_size)
112 audio_body = self.__read_next_audio_data_body(ammos_audio_data_header.sample_count,
113 ammos_audio_data_header.channel_count,
114 ammos_audio_data_header.sample_size)
115
116 audio_array = self.__audio_data_body_to_numpy(audio_body)
117 print(len(audio_array), len(audio_array)/ammos_audio_data_header.sample_rate)
118
119 return [audio_array, ammos_audio_data_header.sample_rate]
120
121 byte = self.__socket.recv(1)
122
123 return None
Note: See TracBrowser for help on using the repository browser.