# AmmosReader 0.314 documentation
"""I parse an R&S AMMOS recording."""
import os
from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader
from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader
from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
from ammosreader.AmmosSingleFrame import AmmosSingleFrame
from ammosreader.AmmosContainer import AmmosContainer
class AmmosAudioReader():
    """
    I read the audio data embedded in an R&S AMMOS recording.

    I keep the recording open as a binary file and read it frame by frame: a global
    frame header, then an audio data header (standard or extended), then the audio
    data body. Every ``read_next_*`` method continues at the current file position
    and returns None when the requested item can not be read completely.

    I can be used as a context manager, which closes the underlying file on exit;
    otherwise call :meth:`close` when you are done with me.
    """

    # Sizes are in bytes. The two audio header notes imply 4-byte words
    # (36 / 9 == 44 / 11 == 4), so 24 bytes are 6 such words.
    # NOTE(review): this constant used to be annotated "8 words", which does not
    # match 24 bytes -- confirm the header layout against the format description.
    GLOBAL_HEADER_SIZE = 24  # 6 words
    STANDARD_AUDIO_DATA_HEADER_SIZE = 36  # 9 words
    EXTENDED_AUDIO_DATA_HEADER_SIZE = 44  # 11 words

    # The only frame type read_next_single_frame accepts (reported as "Audio Datastream").
    AUDIO_FRAME_TYPE = 256

    def __init__(self, file_name):
        """
        I return an instance of AmmosAudioReader initialized with a given file name.

        The file is opened for binary reading right away and stays open until
        :meth:`close` is called or the surrounding ``with`` block is left.

        :param file_name: the file to read from
        :type file_name: str
        """
        self.file_name = file_name
        self.file = open(self.file_name, "rb")
        self.file_size = os.path.getsize(self.file_name)
        self.container = AmmosContainer(self.file_name, [])
        self.tags = {}

    def __enter__(self):
        """I return myself so that I can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """I close my file when the ``with`` block is left; exceptions are not suppressed."""
        self.close()

    def close(self):
        """I close the underlying file. Calling me more than once is harmless."""
        self.file.close()

    def rewind_to_start(self):
        """I set the file pointer to the beginning of the file for the next operation."""
        self.file.seek(0)

    def add_tag(self, tag):
        """
        I store a tag in my tag dictionary.

        The tag's ``key`` attribute is used as dictionary key and its ``value``
        attribute as the stored value; an existing entry with the same key is replaced.

        :param tag: the tag to store, an object providing ``key`` and ``value`` attributes
        """
        self.tags[tag.key] = tag.value

    def _read_exactly(self, size):
        """
        I read size bytes from the current position in file.

        :param size: the number of bytes to read
        :type size: int
        :return: the bytes read or None if fewer than size bytes were available
        :rtype: bytes
        """
        chunk = self.file.read(size)
        if len(chunk) < size:
            return None
        return chunk

    def read_all_frames_left(self):
        """
        I read all remaining frames into my container until end of file is reached.

        Reading stops at the first frame that can not be read completely.

        :return: a container containing all frames read
        :rtype: AmmosContainer
        """
        frames_read = 0
        while True:
            print("Reading single frame", frames_read, '...')
            current_frame = self.read_next_single_frame()
            if current_frame is None:
                print("Frame:", frames_read+1, " incomplete")
                break
            frames_read += 1
            self.container.add_frame(current_frame)
            # progress marker every 10000 frames
            if frames_read % 10000 == 0:
                print("#", end="")
        print(len(self.container.global_frames), "frames read")
        return self.container

    def read_next_global_frame_header(self):
        """
        I return the next global frame header read from current position in file.

        :return: the next global frame header or None if incomplete
        :rtype: AmmosGlobalFrameHeader
        """
        header_bytes = self._read_exactly(AmmosAudioReader.GLOBAL_HEADER_SIZE)
        print("Reading next global frame header")
        if header_bytes is None:
            print("Can not read all", AmmosAudioReader.GLOBAL_HEADER_SIZE, "bytes of global frame header")
            return None
        # FIXME: Catch exceptions and add some asserts
        return AmmosGlobalFrameHeader.from_bytes(header_bytes)

    def read_next_global_frame_body_data_header(self):
        """
        I return the next global frame body data header from current position in file.

        I always read STANDARD_AUDIO_DATA_HEADER_SIZE bytes.

        :return: the next Ammos Audio Data header or None if incomplete
        :rtype: AmmosAudioDataHeader
        """
        header_bytes = self._read_exactly(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)
        if header_bytes is None:
            print("Can not read all", AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE,
                  "bytes of global frame body data header")
            return None
        return AmmosAudioDataHeader.from_bytes(header_bytes)

    def read_next_global_frame_body_extended_data_header(self):
        """
        I return the next global frame body extended data header from current position in file.

        I always read EXTENDED_AUDIO_DATA_HEADER_SIZE bytes.

        :return: the next Ammos Audio Extended Data header or None if incomplete
        :rtype: AmmosExtendedAudioDataHeader
        """
        header_bytes = self._read_exactly(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)
        if header_bytes is None:
            print("Can not read all ", AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE,
                  " bytes of global frame extended data header")
            return None
        return AmmosExtendedAudioDataHeader.from_bytes(header_bytes)

    def read_next_audio_data_body(self, sample_count, channel_count, sample_size):
        """
        I return the next audio data read from current position in file.

        The number of bytes read is sample_count * channel_count * sample_size.

        :param sample_count: the number of samples per channel inside data body
        :type sample_count: int
        :param channel_count: number of channels (e.g. mono, stereo or even more)
        :type channel_count: int
        :param sample_size: sample size in bytes (1, 2 or 4 bytes)
        :type sample_size: int
        :return: the next audio data or None if incomplete
        :rtype: bytes
        """
        # FIXME: Describe the parameters better
        total = sample_count*channel_count*sample_size
        if total < 0:
            # A negative size would make file.read() consume the whole rest of the
            # file; treat it as an unreadable body and leave the position untouched.
            print("Can not read all", total, "bytes of data body")
            return None
        byte_string = self.file.read(total)
        if len(byte_string) != total:
            print("Can not read all", total, "bytes of data body")
            return None
        return byte_string

    def read_next_global_frame_body(self, global_frame_header):
        """
        I return the next global frame body read from current position in file.

        The data header length announced by the global frame header decides whether
        a standard or an extended audio data header is read; the audio data body
        described by that data header is read right after it.

        :param global_frame_header: the global frame header preceding the body
        :type global_frame_header: AmmosGlobalFrameHeader
        :return: the next global frame body or None if the data header length is
                 unknown or the data header or data body is incomplete
        :rtype: AmmosGlobalFrameBody
        """
        audio_data_header = None
        data_header_length = global_frame_header.data_header_length
        if data_header_length == AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE:
            print("Read standard data header")
            audio_data_header = self.read_next_global_frame_body_data_header()
        elif data_header_length == AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE:
            print("Read extended data header")
            audio_data_header = self.read_next_global_frame_body_extended_data_header()

        if audio_data_header is None:
            print("Data header missing or format unknown")
            return None

        audio_data_body = self.read_next_audio_data_body(audio_data_header.sample_count,
                                                         audio_data_header.channel_count,
                                                         audio_data_header.sample_size)
        if audio_data_body is None:
            print("Data body missing")
            return None

        return AmmosGlobalFrameBody(audio_data_header, audio_data_body)

    def read_next_single_frame(self):
        """
        I return the next single frame read from current position in file.

        Only frames of type AUDIO_FRAME_TYPE are supported.

        :return: the next frame or None if the global frame header or the frame body
                 is missing or incomplete, the data header length is empty or the
                 frame type is unsupported
        :rtype: AmmosSingleFrame
        """
        global_frame_header = self.read_next_global_frame_header()
        print(global_frame_header)

        if global_frame_header is None:
            print("Global frame header missing")
            return None

        if global_frame_header.data_header_length is None:
            print("Data header length empty")
            return None

        if global_frame_header.frame_type != AmmosAudioReader.AUDIO_FRAME_TYPE:
            print("Unsupported frame type", global_frame_header.frame_type, "found")
            return None

        print("Audio Datastream found")
        global_frame_body = self.read_next_global_frame_body(global_frame_header)
        if global_frame_body is None:
            return None

        return AmmosSingleFrame(global_frame_header, global_frame_body)