source: ammosreader/AmmosAudioReader.py@ 44aebd3

AmmosSource guix
Last change on this file since 44aebd3 was 1e781ba, checked in by recknagel <recknagel@…>, 3 years ago

former radardex-project

  • Property mode set to 100644
File size: 4.9 KB
Line 
1import math
2import os
3
4from AmmosGlobalFrameBody import AmmosGlobalFrameBody
5from AmmosAudioDataHeader import AmmosAudioDataHeader
6from AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader
7from AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
8from AmmosSingleFrame import AmmosSingleFrame
9from AmmosContainer import AmmosContainer
10
11
class AmmosAudioReader():
    """Sequential reader that parses audio frames from a binary file.

    The file is opened in binary mode on construction. Successfully parsed
    frames can be collected in ``self.container`` (an ``AmmosContainer``)
    via :meth:`read_all_frames_left`.

    The reader owns an open file handle. Release it with :meth:`close` or
    use the reader as a context manager::

        with AmmosAudioReader(path) as reader:
            reader.read_all_frames_left()
    """

    # Number of bytes read for each header kind by this reader.
    GLOBAL_FRAME_HEADER_SIZE = 24
    DATA_HEADER_SIZE = 56
    EXTENDED_DATA_HEADER_SIZE = 44

    # Value of ``data_header_length`` that selects the standard data header.
    STANDARD_DATA_HEADER_LENGTH = 36

    # Value of ``frame_type`` that is treated as an audio data stream.
    AUDIO_FRAME_TYPE = 256

    def __init__(self, file_name):
        """Open ``file_name`` for binary reading and set up an empty container.

        :param file_name: path of the file to read
        :raises OSError: if the file cannot be inspected or opened
        """
        self.file_name = file_name
        self.file_size = os.path.getsize(self.file_name)
        self.container = AmmosContainer(self.file_name, [])
        self.tags = []
        # Open last so that a failure in the steps above can not leak an
        # already opened file handle.
        self.file = open(self.file_name, "rb")

    def __enter__(self):
        """Return the reader itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying file; exceptions are never suppressed."""
        self.close()

    def close(self):
        """Close the underlying file. Calling this more than once is safe."""
        self.file.close()

    def rewind_to_start(self):
        """Move the read position back to the beginning of the file."""
        self.file.seek(0)

    def add_tag(self, tag):
        """Append ``tag`` to the list of tags kept by this reader."""
        self.tags.append(tag)

    def _read_exact(self, size):
        """Read ``size`` bytes; return them, or None if fewer were available."""
        data = self.file.read(size)
        if len(data) < size:
            return None
        return data

    def read_all_frames_left(self):
        """Read frames until one can not be read and add them to the container.

        Reading stops at the first frame for which
        :meth:`read_next_single_frame` returns None. Progress and the final
        number of frames held by the container are printed to stdout.
        """
        frames_read = 0

        while True:
            print("Reading single frame", frames_read, '...')
            current_frame = self.read_next_single_frame()
            if current_frame is None:
                print("Frame:", frames_read+1, " incomplete")
                break

            frames_read += 1
            self.container.add_frame(current_frame)
            if frames_read % 10000 == 0:
                print("#", end="")

        print(len(self.container.global_frames), "frames read")

    def read_next_global_frame_header(self):
        """Read and parse the next global frame header.

        :return: the object built by ``AmmosGlobalFrameHeader.from_bytes``,
                 or None if fewer than 24 bytes could be read
        """
        raw_header = self._read_exact(self.GLOBAL_FRAME_HEADER_SIZE)
        print("Reading next global frame header")
        if raw_header is None:
            print("Can not read all 24 bytes of global frame header")
            return None

        current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(raw_header)
        print("Current global frame header", current_global_frame_header)
        return current_global_frame_header

    def read_next_global_frame_body_data_header(self):
        """Read and parse the next standard audio data header.

        :return: the object built by ``AmmosAudioDataHeader.from_bytes``,
                 or None if fewer than 56 bytes could be read
        """
        raw_header = self._read_exact(self.DATA_HEADER_SIZE)
        if raw_header is None:
            print("Can not read all 56 bytes of global frame body data header")
            return None

        return AmmosAudioDataHeader.from_bytes(raw_header)

    def read_next_global_frame_body_extended_data_header(self):
        """Read and parse the next extended audio data header.

        :return: the object built by
                 ``AmmosExtendedAudioDataHeader.from_bytes``, or None if
                 fewer than 44 bytes could be read
        """
        raw_header = self._read_exact(self.EXTENDED_DATA_HEADER_SIZE)
        if raw_header is None:
            print("Can not read all ", 44, "bytes of global frame extended data header")
            return None

        return AmmosExtendedAudioDataHeader.from_bytes(raw_header)

    def read_next_audio_data_body(self, sample_count, channel_count, sample_size):
        """Read the raw audio payload that follows a data header.

        The payload length is ``sample_count * channel_count * sample_size``
        bytes.

        :param sample_count: first factor of the payload length
        :param channel_count: second factor of the payload length
        :param sample_size: third factor of the payload length
                            (presumably bytes per sample - TODO confirm)
        :return: the payload as ``bytes``, or None if the requested length
                 is negative or the file holds fewer bytes than requested
        """
        # FIXME: Describe the parameters better

        total = sample_count*channel_count*sample_size

        # A negative size passed to read() means "read until EOF", which
        # would pull the whole remaining file into memory just to fail the
        # length check below. Reject it before touching the file.
        if total < 0:
            print("Can not read all", total, "bytes of data body")
            return None

        byte_string = self.file.read(total)

        if len(byte_string) != total:
            print("Can not read all", total, "bytes of data body")
            return None
        print([hex(c) for c in byte_string])
        return byte_string

    def read_next_global_frame_body(self, global_frame_header):
        """Read the data header and audio payload belonging to a frame.

        A ``data_header_length`` of 36 selects the standard data header;
        every other value selects the extended data header.

        :param global_frame_header: header object providing
                                    ``data_header_length``
        :return: an ``AmmosGlobalFrameBody``, or None if the data header or
                 the payload could not be read completely
        """
        # NOTE(review): the standard branch is chosen for a declared length
        # of 36 but read_next_global_frame_body_data_header() consumes 56
        # bytes - confirm against AmmosAudioDataHeader.from_bytes.
        if global_frame_header.data_header_length == self.STANDARD_DATA_HEADER_LENGTH:
            print("Read standard data header")
            audio_data_header = self.read_next_global_frame_body_data_header()
        else:
            print("Read extended data header")
            audio_data_header = self.read_next_global_frame_body_extended_data_header()

        if audio_data_header is None:
            print("Data header missing")
            return None

        audio_data_body = self.read_next_audio_data_body(audio_data_header.sample_count,
                                                         audio_data_header.channel_count,
                                                         audio_data_header.sample_size)

        if audio_data_body is None:
            print("Data body missing")
            return None

        return AmmosGlobalFrameBody(audio_data_header, audio_data_body)

    def read_next_single_frame(self):
        """Read the next complete frame (global header plus body).

        Only frames whose ``frame_type`` equals 256 are supported.

        :return: an ``AmmosSingleFrame``, or None if the header or body is
                 missing or incomplete, the data header length is None, or
                 the frame type is unsupported
        """
        global_frame_header = self.read_next_global_frame_header()

        if global_frame_header is None:
            print("Global frame header missing")
            return None

        if global_frame_header.data_header_length is None:
            print("Data header length empty")
            return None

        if global_frame_header.frame_type != self.AUDIO_FRAME_TYPE:
            print("Unsupported frame type", global_frame_header.frame_type, "found")
            return None

        print("Audio Datastream found")
        global_frame_body = self.read_next_global_frame_body(global_frame_header)
        if global_frame_body is None:
            return None

        return AmmosSingleFrame(global_frame_header, global_frame_body)
Note: See TracBrowser for help on using the repository browser.