Changeset ef16c0b in ammosreader


Ignore:
Timestamp:
06/29/22 11:01:47 (3 years ago)
Author:
Enrico Schwass <ennoausberlin@…>
Branches:
AmmosSource, guix
Children:
87b2e39
Parents:
b41e975
Message:

Refactoring to use AbstractAmmosReader as base class to avoid code duplication

Location:
ammosreader
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • ammosreader/AmmosAudioReader.py

    rb41e975 ref16c0b  
    1 """I parse an R&S AMMOS recording."""
     1"""I provide a specialized Ammos Reader for audio data."""
    22
    3 import os
    4 
     3from ammosreader.AbstractAmmosReader import AbstractAmmosReader
    54from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
    65from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader
    76from ammosreader.AmmosExtendedAudioDataHeader import AmmosExtendedAudioDataHeader
    8 from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
    9 from ammosreader.AmmosSingleFrame import AmmosSingleFrame
    10 from ammosreader.AmmosContainer import AmmosContainer
    117from ammosreader.AmmosAudioDataBlock import AmmosAudioDataBlock
    128
    139
    14 class AmmosAudioReader():
     10class AmmosAudioReader(AbstractAmmosReader):
    1511    """I read the audio data embedded in an R&S AMMOS recording."""
    1612
    17     GLOBAL_HEADER_SIZE = 24  # 8 words
    1813    STANDARD_AUDIO_DATA_HEADER_SIZE = 36  # 9 words
    1914    EXTENDED_AUDIO_DATA_HEADER_SIZE = 44  # 11 words
     
    2621        :type file_name: str
    2722        """
    28         self.file_name = file_name
    29         self.file = open(self.file_name, "rb")
    30         self.file_size = os.path.getsize(self.file_name)
    31 
    32         self.container = AmmosContainer(self.file_name, [])
    33 
    34         self.tags = {}
    35 
    36     def rewind_to_start(self):
    37         """I set the file pointer to the beginning of the file for the next operation."""
    38         self.file.seek(0)
    39 
    40     def add_tag(self, tag):
    41         """
    42         I add a tag to my tag list.
    43 
    44         :param tag: The tag to add to my tag list
    45         :type tag: dict
    46         """
    47         self.tags[tag.key] = tag.value
    48 
    49     def read_all_frames_left(self):
    50         """
    51         I read all remaining frames into my container until end of file is reached.
    52 
    53         :return: a container containing all frames read
    54         :rtype: AmmosContainer
    55         """
    56         frames_read = 0
    57         while True:
    58             print("Reading single frame", frames_read, '...')
    59             current_frame = self.read_next_single_frame()
    60             if current_frame is not None:
    61                 frames_read += 1
    62                 self.container.add_frame(current_frame)
    63                 if frames_read % 10000 == 0:
    64                     print("#", end="")
    65             else:
    66                 print("Frame:", frames_read+1, " incomplete")
    67                 break
    68 
    69         print(len(self.container.global_frames), "frames read")
    70         return self.container
    71 
    72     def read_next_global_frame_header(self):
    73         """
    74         I return the next global frame header read from current position in file.
    75 
    76         :return: the next global frame header or None if incomplete
    77         :rtype: AmmosGlobalFrameHeader
    78         """
    79         bytes = self.file.read(AmmosAudioReader.GLOBAL_HEADER_SIZE)
    80         print("Reading next global frame header")
    81         if ((not bytes) or (len(bytes) < AmmosAudioReader.GLOBAL_HEADER_SIZE)):
    82             print("Can not read all", AmmosAudioReader.GLOBAL_HEADER_SIZE, "bytes of global frame header")
    83             return None
    84 
    85         # FIXME: Catch exceptions and add some asserts
    86         current_global_frame_header = AmmosGlobalFrameHeader.from_bytes(bytes)
    87         # print("Current global frame header", current_global_frame_header)
    88         return current_global_frame_header
     23        super().__init__(file_name)
    8924
    9025    def read_next_global_frame_body_data_header(self):
     
    9732        :rtype: AmmosAudioDataHeader
    9833        """
    99         bytes = self.file.read(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)
     34        bytes = self.ammos_file.read(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)
    10035
    10136        # print("\nReading global frame body standard data header\n")
     
    11348        :rtype: AmmosExtendedAudioDataHeader
    11449        """
    115         bytes = self.file.read(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)
     50        bytes = self.ammos_file.read(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)
    11651
    11752        if ((not bytes) or (len(bytes) < AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)):
     
    14176        total = sample_count*channel_count*sample_size
    14277
    143         byte_string = self.file.read(total)
     78        byte_string = self.ammos_file.read(total)
    14479
    14580        if len(byte_string) != total:
     
    14984        return AmmosAudioDataBlock(byte_string, channel_count, sample_count, sample_size)
    15085
    151     def read_next_global_frame_body(self, global_frame_header):
     86    def read_next_global_frame_body(self, data_header_length):
     87        """
     88        I return the next global frame body read from current position in file.
    15289
     90        :param global_frame_header:
     91        """
    15392        audio_data_header = None
    15493
    155         if global_frame_header.data_header_length == AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE:
     94        if data_header_length == AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE:
    15695            print("Read standard data header")
    15796            audio_data_header = self.read_next_global_frame_body_data_header()
    15897
    159         if global_frame_header.data_header_length == AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE:
     98        print("Data header length", data_header_length)
     99        if data_header_length == AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE:
    160100            print("Read extended data header")
    161101            audio_data_header = self.read_next_global_frame_body_extended_data_header()
     
    175115        return AmmosGlobalFrameBody(audio_data_header, audio_data_body)
    176116
    177     def read_next_single_frame(self):
     117    def pcm_for_channel(self, a_channel):
     118        """
     119        I return the raw pcm audio data for a given channel.
    178120
    179         global_frame_header = self.read_next_global_frame_header()
     121        :param a_channel: the channel I have to extract
     122        :type a_channel: int
    180123
    181         print(global_frame_header)
    182 
    183         if global_frame_header is None:
    184             print("Global frame header missing")
    185             return None
    186 
    187         if global_frame_header.data_header_length is None:
    188             print("Data header length empty")
    189             return None
    190 
    191         if global_frame_header.frame_type == 256:
    192             print("Audio Datastream found")
    193             global_frame_body = self.read_next_global_frame_body(global_frame_header)
    194             if global_frame_body is None:
    195                 return None
    196         else:
    197             print("Unsupported frame type", global_frame_header.frame_type, "found")
    198             return None
    199 
    200         ammos_single_frame = AmmosSingleFrame(global_frame_header, global_frame_body)
    201         return ammos_single_frame
    202 
    203     def pcm_for_channel(self, a_channel):
     124        :rtype: bytes
     125        """
     126        for each in self.container.global_frames:
     127            print(each.global_frame_body)
    204128        return b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) for each in self.container.global_frames])
  • ammosreader/AmmosIFReader.py

    rb41e975 ref16c0b  
    1 import math
    2 import os
     1"""I provide a specialized Ammos Reader for IF data."""
    32
    43from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
     
    1312class AmmosIFReader():
    1413
     14    STANDARD_IF_DATA_HEADER_SIZE = 56
     15    EXTENDED_IF_DATA_HEADER_SIZE = 76
     16
    1517    def __init__(self, file_name):
    16 
    17         self.file_name = file_name
    18         self.file = open(self.file_name, "rb")
    19 
    20         self.container = AmmosContainer(self.file_name, [])
    21 
    22         self.tags = []
    23 
    24     def add_tag(self, tag):
    25         self.tags.append(tag)
    26 
    27     def read_all_frames_left(self):
    28 
    29         frames_read = 0
    30 
    31         while True:
    32             # print("Reading single frame", frames_read, '...')
    33             current_frame = self.read_next_single_frame()
    34             if current_frame is not None:
    35                 frames_read += 1
    36                 self.container.add_frame(current_frame)
    37                 # if frames_read % 10000 == 0:
    38                 #    print("#", end="")
    39             else:
    40                 # print("Frame:", frames_read+1, " incomplete")
    41                 break
    42 
    43         # print(len(self.container.global_frames), "frames read")
    44 
    45     def read_next_global_frame_header(self):
    46         bytes = self.file.read(24)
    47         # print("Reading next global frame header")
    48         if ((not bytes) or (len(bytes) < 24)):
    49             # print("Can not read all 24 bytes of global frame header")
    50             return None
    51 
    52         return AmmosGlobalFrameHeader.from_bytes(bytes)
     18        super.__init__(file_name)
    5319
    5420    def read_next_global_frame_body_data_header(self):
    5521
    56         bytes = self.file.read(56)
     22        bytes = self.ammos_file.read(AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE)
    5723
    5824        # print("\nReading global frame body standard data header\n")
    59         if ((not bytes) or (len(bytes) < 56)):
     25        if ((not bytes) or (len(bytes) < AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE)):
    6026            # print("Can not read all 56 bytes of global frame body data header")
    6127            return None
     
    6733    def read_next_global_frame_body_extended_data_header(self):
    6834
    69         bytes = self.file.read(76)
     35        bytes = self.ammos_file.read(AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE)
    7036        # print("\nReading global frame body extended data header\n")
    7137
    72         if ((not bytes) or (len(bytes) < 76)):
     38        if ((not bytes) or (len(bytes) < AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE)):
    7339            # print("Can not read all ", 76, "bytes of global frame extended data header")
    7440            return None
     
    8753        total = n*block_length
    8854
    89         byte_string = self.file.read(block_length)
     55        byte_string = self.ammos_file.read(block_length)
    9056
    9157        if len(byte_string) != total:
     
    9965        return data_blocks
    10066
    101     def read_next_global_frame_body(self, global_frame_header):
     67    def read_next_global_frame_body(self, data_header_length):
    10268
    10369        if_data_header = None
    10470
    105         if global_frame_header.data_header_length == 56:
     71        if data_header_length == AmmosIFReader.STANDARD_IF_DATA_HEADER_SIZE:
    10672            if_data_header = self.read_next_global_frame_body_data_header()
    107         else:
     73        if data_header_length == AmmosIFReader.EXTENDED_IF_DATA_HEADER_SIZE:
    10874            if_data_header = self.read_next_global_frame_body_extended_data_header()
    10975
     
    11985
    12086        return AmmosGlobalFrameBody(if_data_header, if_data_body)
    121 
    122     def read_next_single_frame(self):
    123 
    124         global_frame_header = self.read_next_global_frame_header()
    125 
    126         # print("\nReading next global frame header\n", global_frame_header)
    127         # print("File pointer", self.file.tell())
    128 
    129         if global_frame_header is None:
    130             # print("Global frame header missing")
    131             return None
    132 
    133         if global_frame_header.data_header_length is None:
    134             # print("Data header length empty")
    135             return None
    136 
    137         if global_frame_header.frame_type == 2:
    138 
    139             global_frame_body = self.read_next_global_frame_body(global_frame_header)
    140             if global_frame_body is None:
    141                 return None
    142 
    143         else:
    144             # print("Unsupported frame type", global_frame_header.frame_type, "found")
    145             return None
    146 
    147         return AmmosSingleFrame(global_frame_header, global_frame_body)
  • ammosreader/CAConstants.py

    rb41e975 ref16c0b  
    11"""I provide several constants used in R&S software."""
    22
    3 import Enum
     3from enum import Enum
    44
    55
     
    4040    SCAN_TUNING = 0x4002
    4141    SCAN_LEVEL_TUNING = 0x4003
    42     DDF_RESERVED_RANGE = range(0x5000, 0x5100)
     42    DDF_RESERVED_START = 0x5000
     43    DDF_RESERVED_END = 0x50FF
Note: See TracChangeset for help on using the changeset viewer.