AmmosReader 0.314 documentation

Source code for ammosreader.AmmosAudioReader

"""I parse an R&S AMMOS recording."""

import os

from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader
from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader
from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
from ammosreader.AmmosSingleFrame import AmmosSingleFrame
from ammosreader.AmmosContainer import AmmosContainer


[docs]class AmmosAudioReader(): """I read the audio data embedded in an R&S AMMOS recording.""" GLOBAL_HEADER_SIZE = 24 # 8 words STANDARD_AUDIO_DATA_HEADER_SIZE = 36 # 9 words EXTENDED_AUDIO_DATA_HEADER_SIZE = 44 # 11 words def __init__(self, file_name): """ I return an instance of AmmosAudioReader initialized with a given file name. :param file_name: the file to read from :type file_name: str """ self.file_name = file_name self.file = open(self.file_name, "rb") self.file_size = os.path.getsize(self.file_name) self.container = AmmosContainer(self.file_name, []) self.tags = {}
[docs] def rewind_to_start(self): """I set the file pointer to the beginning of the file for the next operation.""" self.file.seek(0)
[docs] def add_tag(self, tag): """ I add a tag to my tag list. :param tag: The tag to add to my tag list :type tag: dict """ self.tags[tag.key] = tag.value
[docs] def read_all_frames_left(self): """ I read all remaining frames into my container until end of file is reached. :return: a container containing all frames read :rtype: AmmosContainer """ frames_read = 0 while True: print("Reading single frame", frames_read, '...') current_frame = self.read_next_single_frame() if current_frame is not None: frames_read += 1 self.container.add_frame(current_frame) if frames_read % 10000 == 0: print("#", end="") else: print("Frame:", frames_read+1, " incomplete") break print(len(self.container.global_frames), "frames read") return self.container
[docs] def read_next_global_frame_header(self): """ I return the next global frame header read from current position in file. :return: the next global frame header or None if incomplete :rtype: AmmosGlobalFrameHeader """ bytes = self.file.read(AmmosAudioReader.GLOBAL_HEADER_SIZE) print("Reading next global frame header") if ((not bytes) or (len(bytes) < AmmosAudioReader.GLOBAL_HEADER_SIZE)): print("Can not read all", AmmosAudioReader.GLOBAL_HEADER_SIZE, "bytes of global frame header") return None # FIXME: Catch exceptions and add some asserts current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(bytes) # print("Current global frame header", current_global_frame_header) return current_global_frame_header
[docs] def read_next_global_frame_body_data_header(self): """ I return the next global frame body data header from current position in file. :param data_header_size: the number of bytes to read :type data_header_size: int :return: the next Ammos Audio Data header or None if incomplete :rtype: AmmosAudioDataHeader """ bytes = self.file.read(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE) # print("\nReading global frame body standard data header\n") if ((not bytes) or (len(bytes) < AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)): print("Can not read all", AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE, "bytes of global frame body data header") return None return AmmosAudioDataHeader.from_bytes(bytes)
[docs] def read_next_global_frame_body_extended_data_header(self): """ I return the next global frame body extended data header from current position in file. :return: the next Ammos Audio Extended Data header or None if incomplete :rtype: AmmosExtendedAudioDataHeader """ bytes = self.file.read(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE) if ((not bytes) or (len(bytes) < AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)): print("Can not read all ", AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE, " bytes of global frame extended data header") return None return AmmosExtendedAudioDataHeader.from_bytes(bytes)
[docs] def read_next_audio_data_body(self, sample_count, channel_count, sample_size): """ I return the next audio data read from current position in file. :param sample_count: the number of samples per channel inside data body :type sample_count: int :param channel_count: number of channels (e.g. mono, stereo or even more) :type channel_count: int :param sample_size: sample size in bytes (1, 2 or 4 bytes) :type sample_size: int :return: the next audio data or None if incomplete :rtype: bytes """ # FIXME: Describe the parameters better total = sample_count*channel_count*sample_size byte_string = self.file.read(total) if len(byte_string) != total: print("Can not read all", total, "bytes of data body") return None print([hex(c) for c in byte_string]) return byte_string
[docs] def read_next_global_frame_body(self, global_frame_header): audio_data_header = None if global_frame_header.data_header_length == AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE: print("Read standard data header") audio_data_header = self.read_next_global_frame_body_data_header() if global_frame_header.data_header_length == AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE: print("Read extended data header") audio_data_header = self.read_next_global_frame_body_extended_data_header() if audio_data_header is None: print("Data header missing or format unknown") return None audio_data_body = self.read_next_audio_data_body(audio_data_header.sample_count, audio_data_header.channel_count, audio_data_header.sample_size) if audio_data_body is None: print("Data body missing") return None return AmmosGlobalFrameBody(audio_data_header, audio_data_body)
[docs] def read_next_single_frame(self): global_frame_header = self.read_next_global_frame_header() print(global_frame_header) if global_frame_header is None: print("Global frame header missing") return None if global_frame_header.data_header_length is None: print("Data header length empty") return None if global_frame_header.frame_type == 256: print("Audio Datastream found") global_frame_body = self.read_next_global_frame_body(global_frame_header) if global_frame_body is None: return None else: print("Unsupported frame type", global_frame_header.frame_type, "found") return None ammos_single_frame = AmmosSingleFrame(global_frame_header, global_frame_body) return ammos_single_frame