import math import os from AmmosGlobalFrameBody import AmmosGlobalFrameBody from AmmosAudioDataHeader import AmmosAudioDataHeader from AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader from AmmosGlobalFrameHeader import AmmosGlobalFrameHeader from AmmosSingleFrame import AmmosSingleFrame from AmmosContainer import AmmosContainer class AmmosAudioReader(): def __init__(self, file_name): self.file_name = file_name self.file = open(self.file_name, "rb") self.file_size = os.path.getsize(self.file_name) self.container = AmmosContainer(self.file_name, []) self.tags = [] def rewind_to_start(self): self.file.seek(0) def add_tag(self, tag): self.tags.append(tag) def read_all_frames_left(self): frames_read = 0 while True: print("Reading single frame", frames_read, '...') current_frame = self.read_next_single_frame() if current_frame is not None: frames_read += 1 self.container.add_frame(current_frame) if frames_read % 10000 == 0: print("#", end="") else: print("Frame:", frames_read+1, " incomplete") break print(len(self.container.global_frames), "frames read") def read_next_global_frame_header(self): bytes = self.file.read(24) print("Reading next global frame header") if ((not bytes) or (len(bytes) < 24)): print("Can not read all 24 bytes of global frame header") return None current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(bytes) print("Current global frame header", current_global_frame_header) return current_global_frame_header def read_next_global_frame_body_data_header(self): bytes = self.file.read(56) # print("\nReading global frame body standard data header\n") if ((not bytes) or (len(bytes) < 56)): print("Can not read all 56 bytes of global frame body data header") return None data_header = AmmosAudioDataHeader.from_bytes(bytes) # print("Data header", data_header) return data_header def read_next_global_frame_body_extended_data_header(self): bytes = self.file.read(44) # print("\nReading global frame body extended data header\n") if ((not bytes) or (len(bytes) < 44)): print("Can not read all ", 44, "bytes of global frame extended data header") return None extended_data_header = AmmosExtendedAudioDataHeader.from_bytes(bytes) # print("Extended data header", extended_data_header) return extended_data_header def read_next_audio_data_body(self, sample_count, channel_count, sample_size): # FIXME: Describe the parameters better total = sample_count*channel_count*sample_size byte_string = self.file.read(total) if len(byte_string) != total: print("Can not read all", total, "bytes of data body") return None print([hex(c) for c in byte_string]) return byte_string def read_next_global_frame_body(self, global_frame_header): audio_data_header = None if global_frame_header.data_header_length == 36: print("Read standard data header") audio_data_header = self.read_next_global_frame_body_data_header() else: print("Read extended data header") audio_data_header = self.read_next_global_frame_body_extended_data_header() if audio_data_header is None: print("Data header missing") return None audio_data_body = self.read_next_audio_data_body(audio_data_header.sample_count, audio_data_header.channel_count, audio_data_header.sample_size) if audio_data_body is None: print("Data body missing") return None return AmmosGlobalFrameBody(audio_data_header, audio_data_body) def read_next_single_frame(self): global_frame_header = self.read_next_global_frame_header() if global_frame_header is None: print("Global frame header missing") return None if global_frame_header.data_header_length is None: print("Data header length empty") return None if global_frame_header.frame_type == 256: print("Audio Datastream found") global_frame_body = self.read_next_global_frame_body(global_frame_header) if global_frame_body is None: return None else: print("Unsupported frame type", global_frame_header.frame_type, "found") return None ammos_single_frame = AmmosSingleFrame(global_frame_header, global_frame_body) return ammos_single_frame