Changeset d7ea525 in ammosreader


Ignore:
Timestamp:
06/29/22 13:11:15 (3 years ago)
Author:
Enrico Schwass <ennoausberlin@…>
Branches:
AmmosSource, guix
Children:
4f0cd38
Parents:
87b2e39
Message:

massive refactoring

Location:
ammosreader
Files:
1 added
8 edited
1 moved

Legend:

Unmodified
Added
Removed
  • ammosreader/AbstractAmmosReader.py

    r87b2e39 rd7ea525  
    77from ammosreader.AmmosSingleFrame import AmmosSingleFrame
    88from ammosreader.AmmosContainer import AmmosContainer
    9 from ammosreader.CAConstants import FrameType
     9from ammosreader.AmmosConstants import FrameType
    1010
    1111
  • ammosreader/AmmosAudioDataBody.py

    r87b2e39 rd7ea525  
    1 class AmmosAudioDataBlock:
     1"""I provide an AMMOS data block for audio data frames."""
     2
     3
     4class AmmosAudioDataBody:
     5    """I implement an AMMOS data block for audio data frames."""
    26
    37    def __init__(self, pcm_data, number_of_channels=1, samples_per_channel=1, sample_size=1):
    4 
     8        """I return a new AMMOS data block for audio data frames."""
    59        self.__number_of_channels = number_of_channels
    610        self.__samples_per_channel = samples_per_channel
     
    1014    @property
    1115    def data(self):
     16        """I return the raw pcm data with channels interweaved."""
    1217        return self.__data
    1318
    1419    def pcm_for_channel(self, channel_number):
     20        """I return the raw pcm data for a given channel."""
    1521        start_offset = channel_number * self.__sample_size
    1622        step = self.__sample_size * self.__number_of_channels
     
    2228
    2329    def __str__(self):
     30        """I return the string representation of myself."""
    2431        return ("Number of channels:" + str(self.__number_of_channels) +
    2532                "\nSamples per channel:" + str(self.__samples_per_channel) +
  • ammosreader/AmmosAudioDataHeader.py

    r87b2e39 rd7ea525  
     1"""I provide an AMMOS data header for audio data frames."""
     2
    13import struct
    2 import numpy
     4from ammosreader.AmmosConstants import AmmosAudioDemodType
    35
    4 class AmmosAudioDemodType():
    5 
    6     @classmethod
    7     @property
    8     def mapping(cls):
    9         return {0: 'FM', 1: 'AM', 5: 'ISB', 6: 'CW',
    10                 7: 'USB', 8: 'LSB', 256: 'DIGITAL',
    11                 0xFFFFFFFF: 'UNKNOWN'}
    12 
    13     def __init__(self, demod_type):
    14         self.demod_type = demod_type
    15 
    16     def __str__(self):
    17         return AmmosAudioDemodType.mapping[self.demod_type]
    186
    197class AmmosAudioDataHeader():
     8    """I implement an AMMOS data header for audio data frames."""
     9
     10    HEADER_SIZE = 36  # 9 words
    2011
    2112    @classmethod
    2213    def from_bytes(cls, bytes):
     14        """I return an AMMOS data header from given bytes."""
     15        assert len(bytes) == cls.HEADER_SIZE
    2316        elements = struct.unpack('<IIQIIIII', bytes)
    2417        sample_rate = elements[0]
     
    2619        frequency = elements[2]
    2720        demod_bandwidth = elements[3]
    28         demod_type = elements[4]
    29         sample_count = elements[5]
    30         channel_count = elements[6]
     21        demod_type = AmmosAudioDemodType(elements[4])
     22        number_of_samples = elements[5]
     23        number_of_channels = elements[6]
    3124        sample_size = elements[7]
    3225        return AmmosAudioDataHeader(sample_rate, status, frequency, demod_bandwidth, demod_type,
    33                                     sample_count, channel_count, sample_size)
     26                                    number_of_samples, number_of_channels, sample_size)
    3427
    35     def __init__(self, sample_rate, status, frequency, demod_bandwidth, demod_type, sample_count, channel_count, sample_size):
     28    def __init__(self, sample_rate, status, frequency, demod_bandwidth, demod_type, number_of_samples,
     29                 number_of_channels, sample_size):
     30        """I create a new instance of myself using the above parameters."""
    3631        self.sample_rate = sample_rate
    3732        self.status = status
     
    3934        self.demod_bandwidth = demod_bandwidth
    4035        self.demod_type = AmmosAudioDemodType(demod_type)
    41         self.sample_count = sample_count
    42         self.channel_count = channel_count
     36        self.number_of_samples = number_of_samples
     37        self.number_of_channels = number_of_channels
    4338        self.sample_size = sample_size
    4439
    4540    def __str__(self):
     41        """I return the string representation of myself."""
    4642        return ("\nAmmosAudioDataHeader\n" +
    4743                "Sample rate:" + str(self.sample_rate) + "\n" +
     
    5046                "Demodulation bandwidth:" + str(self.demod_bandwidth) + "\n" +
    5147                "Demodulation type:" + str(self.demod_type) + "\n" +
    52                 "Sample count:" + str(self.sample_count) + "\n" +
    53                 "Channel count:" + str(self.channel_count) + "\n" +
     48                "Sample count:" + str(self.number_of_samples) + "\n" +
     49                "Channel count:" + str(self.number_of_channels) + "\n" +
    5450                "Sample size:" + str(self.sample_size) + "\n")
  • ammosreader/AmmosAudioReader.py

    r87b2e39 rd7ea525  
    11"""I provide a specialized Ammos Reader for audio data."""
     2import logging
    23
    34from ammosreader.AbstractAmmosReader import AbstractAmmosReader
     
    1011class AmmosAudioReader(AbstractAmmosReader):
    1112    """I read the audio data embedded in an R&S AMMOS recording."""
    12 
    13     STANDARD_AUDIO_DATA_HEADER_SIZE = 36  # 9 words
    14     EXTENDED_AUDIO_DATA_HEADER_SIZE = 44  # 11 words
    1513
    1614    def __init__(self, file_name):
     
    3230        :rtype: AmmosAudioDataHeader
    3331        """
    34         bytes = self.ammos_file.read(AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)
     32        header_size = AmmosAudioDataHeader.HEADER_SIZE
    3533
    36         # print("\nReading global frame body standard data header\n")
    37         if ((not bytes) or (len(bytes) < AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE)):
    38             print("Can not read all", AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE,
    39                   "bytes of global frame body data header")
     34        bytes = self.ammos_file.read(header_size)
     35
     36        logging.info("\nReading global frame body standard data header\n")
     37        if ((not bytes) or (len(bytes) < header_size)):
     38            logging.debug("Can not read all %s bytes of global frame body data header", header_size)
    4039            return None
    4140        return AmmosAudioDataHeader.from_bytes(bytes)
     
    4847        :rtype: AmmosExtendedAudioDataHeader
    4948        """
    50         bytes = self.ammos_file.read(AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)
     49        header_size = AmmosExtendedAudioDataHeader.HEADER_SIZE
    5150
    52         if ((not bytes) or (len(bytes) < AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE)):
    53             print("Can not read all ", AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE,
    54                   " bytes of global frame extended data header")
     51        bytes = self.ammos_file.read(header_size)
     52
     53        if ((not bytes) or (len(bytes) < header_size)):
     54            logging.debug("Can not read all %s bytes of global frame extended data header", header_size)
    5555            return None
    5656        return AmmosExtendedAudioDataHeader.from_bytes(bytes)
    5757
    58     def read_next_audio_data_body(self, sample_count, channel_count, sample_size):
     58    def read_next_audio_data_body(self, number_of_samples, number_of_channels, sample_size):
    5959        """
    6060        I return the next audio data read from current position in file.
    6161
    62         :param sample_count: the number of samples per channel inside data body
    63         :type sample_count: int
     62        :param number_of_samples: the number of samples per channel inside data body
     63        :type number_of_samples: int
    6464
    65         :param channel_count: number of channels (e.g. mono, stereo or even more)
    66         :type channel_count: int
     65        :param number_of_channels: number of channels (e.g. mono, stereo or even more)
     66        :type number_of_channels: int
    6767
    6868        :param sample_size: sample size in bytes (1, 2 or 4 bytes)
     
    7272        :rtype: bytes
    7373        """
    74         # FIXME: Describe the parameters better
    75 
    76         total = sample_count*channel_count*sample_size
     74        total = number_of_samples*number_of_channels*sample_size
    7775
    7876        byte_string = self.ammos_file.read(total)
    7977
    8078        if len(byte_string) != total:
    81             print("Can not read all", total, "bytes of data body")
     79            logging.debug("Can not read all %s bytes of data body", total)
    8280            return None
    83         # print([hex(c) for c in byte_string])
    84         return AmmosAudioDataBlock(byte_string, channel_count, sample_count, sample_size)
     81        return AmmosAudioDataBlock(byte_string, number_of_channels, number_of_samples, sample_size)
    8582
    8683    def read_next_global_frame_body(self, data_header_length):
     
    9289        audio_data_header = None
    9390
    94         if data_header_length == AmmosAudioReader.STANDARD_AUDIO_DATA_HEADER_SIZE:
    95             print("Read standard data header")
     91        if data_header_length == AmmosAudioDataHeader.HEADER_SIZE:
     92            logging.info("Read standard data header")
    9693            audio_data_header = self.read_next_global_frame_body_data_header()
    9794
    98         print("Data header length", data_header_length)
    99         if data_header_length == AmmosAudioReader.EXTENDED_AUDIO_DATA_HEADER_SIZE:
    100             print("Read extended data header")
     95        logging.info("Data header length %s", data_header_length)
     96        if data_header_length == AmmosExtendedAudioDataHeader.HEADER_SIZE:
     97            logging.info("Read extended data header")
    10198            audio_data_header = self.read_next_global_frame_body_extended_data_header()
    10299
    103100        if audio_data_header is None:
    104             print("Data header missing or format unknown")
     101            logging.debug("Data header missing or format unknown")
    105102            return None
    106103
    107         audio_data_body = self.read_next_audio_data_body(audio_data_header.sample_count,
    108                                                          audio_data_header.channel_count,
     104        audio_data_body = self.read_next_audio_data_body(audio_data_header.number_of_samples,
     105                                                         audio_data_header.number_of_channels,
    109106                                                         audio_data_header.sample_size)
    110107
    111108        if audio_data_body is None:
    112             print("Data body missing")
     109            logging.debug("Data body missing")
    113110            return None
    114111
     
    124121        :rtype: bytes
    125122        """
    126         for each in self.container.global_frames:
    127             print(each.global_frame_body)
    128123        return b"".join([each.global_frame_body.data_body.pcm_for_channel(a_channel) for each in self.container.global_frames])
  • ammosreader/AmmosConstants.py

    r87b2e39 rd7ea525  
    4242    DDF_RESERVED_START = 0x5000
    4343    DDF_RESERVED_END = 0x50FF
     44
     45
     46class AmmosAudioDemodType(int, Enum):
     47    """I map numbers to human readable demodulation types."""
     48
     49    FM = 0
     50    AM = 1
     51    ISB = 5
     52    CW = 6
     53    USB = 7
     54    LSB = 8
     55    DIGITAL = 256
     56    UNKNOWN = 0xFFFFFFFF
  • ammosreader/AmmosExtendedAudioDataHeader.py

    r87b2e39 rd7ea525  
     1"""I provide an Ammos extended data header for audio data frames."""
     2
    13import struct
    2 import numpy as np
    34from ammosreader.AmmosAudioDataHeader import AmmosAudioDataHeader
    45
     
    67class AmmosExtendedAudioDataHeader():
    78
     9    HEADER_SIZE = 44  # 11 words
     10
    811    @classmethod
    912    def from_bytes(cls, bytes):
    10         standard_header = AmmosAudioDataHeader.from_bytes(bytes[0:36])
    11         extended_header_elements = struct.unpack('Q', bytes[36:])
     13        """I return a new AMMOS extended data header for audio frames built from given bytes."""
     14        assert len(bytes) == cls.HEADER_SIZE
     15        standard_header = AmmosAudioDataHeader.from_bytes(bytes[0:AmmosAudioDataHeader.HEADER_SIZE])
     16        extended_header_elements = struct.unpack('Q', bytes[AmmosAudioDataHeader.HEADER_SIZE:])
    1217        timestamp = extended_header_elements[0]
    1318        sample_rate = standard_header.sample_rate
     
    1621        demod_bandwidth = standard_header.demod_bandwidth
    1722        demod_type = standard_header.demod_type
    18         sample_count = standard_header.sample_count
    19         channel_count = standard_header.channel_count
     23        number_of_samples = standard_header.number_of_samples
     24        number_of_channels = standard_header.number_of_channels
    2025        sample_size = standard_header.sample_size
    2126        return AmmosExtendedAudioDataHeader(sample_rate, status, frequency, demod_bandwidth, demod_type,
    22                                             sample_count, channel_count, sample_size, timestamp)
     27                                            number_of_samples, number_of_channels, sample_size, timestamp)
    2328
    2429    def __init__(self, sample_rate, status, frequency, demod_bandwidth, demod_type,
    25                  sample_count, channel_count, sample_size, timestamp):
     30                 number_of_samples, number_of_channels, sample_size, timestamp):
     31        """I return a new AMMOS extended data header for audio frames built from given parameters."""
    2632        self.sample_rate = sample_rate
    2733        self.status = status
     
    2935        self.demod_bandwidth = demod_bandwidth
    3036        self.demod_type = demod_type
    31         self.sample_count = sample_count
    32         self.channel_count = channel_count
     37        self.number_of_samples = number_of_samples
     38        self.number_of_channels = number_of_channels
    3339        self.sample_size = sample_size
    3440        self.timestamp = timestamp
  • ammosreader/AmmosExtendedIFDataHeader.py

    r87b2e39 rd7ea525  
    55
    66class AmmosExtendedIFDataHeader():
     7    """I implement an Ammos extended data header for IF data frames."""
     8
     9    HEADER_SIZE = 76
    710
    811    @classmethod
    912    def from_bytes(cls, bytes):
    10         standard_header = AmmosIFDataHeader.from_bytes(bytes[0:56])
    11         extended_header_elements = struct.unpack('QQI', bytes[56:76])
     13        """I return an AMMOS extended data header from given bytes."""
     14        standard_header = AmmosIFDataHeader.from_bytes(bytes[0:AmmosIFDataHeader.HEADER_SIZE])
     15        extended_header_elements = struct.unpack('QQI', bytes[AmmosIFDataHeader.HEADER_SIZE:cls.HEADER_SIZE])
    1216        block_count = standard_header.block_count
    1317        block_length = standard_header.block_length
     
    3438                 bandwidth, sample_rate, interpolation, decimation, voltage_ref, stream_start, sample_counter,
    3539                 antenna_correction):
    36 
     40        """I create a new instance of myself using the above parameters."""
    3741        self.size = size
    3842        self.block_count = block_count
     
    6468            "Stream start:" + str(self.stream_start) + "\n" +
    6569            "Sample counter:" + str(self.sample_counter) + "\n" +
    66             "Antenna correction:" + str(self.antenna_correction) + "\n"
    67         )
    68 
     70            "Antenna correction:" + str(self.antenna_correction) + "\n")
    6971        return output
  • ammosreader/AmmosIFDataBlock.py

    r87b2e39 rd7ea525  
     1"""I provide an AMMOS data block for IF data frames."""
     2
     3
    14class AmmosIFDataBlock():
     5    """I implement an AMMOS data block for IF data frames."""
    26
    3     def __init__(self, if_datablock_header, if_data):
    4         self.if_datablock_header = if_datablock_header
    5         self.if_data = if_data
     7    def __init__(self, if_datablock_header, if_data_body):
     8        """I return a new AMMOS data block for IF data frames."""
     9        self.__header = if_datablock_header
     10        self.__body = if_data_body
     11
     12    @property
     13    def header(self):
     14        """I return my data block header."""
     15        return self.__header
     16
     17    @property
     18    def body(self):
     19        """I return the raw pcm data with channels interweaved."""
     20        return self.__body
  • ammosreader/AmmosIFDataHeader.py

    r87b2e39 rd7ea525  
     1"""I provide a Ammos data header for IF data frames."""
     2
    13import struct
    24import numpy as np
     
    46
    57class AmmosIFDataHeader():
     8    """I implement a Ammos data header for IF data frames."""
     9
     10    HEADER_SIZE = 56
    611
    712    @classmethod
    813    def from_bytes(cls, bytes):
     14        """I return an AMMOS data header from given bytes."""
     15        assert len(bytes) == cls.HEADER_SIZE
    916        elements = struct.unpack('<IIQIIIQIIIIi', bytes)
    1017        block_count = elements[0]
     
    2734    def __init__(self, block_count, block_length, timestamp, status, source_id, source_state, frequency,
    2835                 bandwidth, sample_rate, interpolation, decimation, voltage_ref):
     36        """I create a new instance of myself using the above parameters."""
    2937        self.block_count = block_count
    3038        self.block_length = block_length
     
    4048        self.voltage_ref = voltage_ref
    4149
    42     def header_size(self):
    43         return 56
    44 
    4550    def __str_(self):
    4651        output = ("\nGlobal frame body data header\n" +
Note: See TracChangeset for help on using the changeset viewer.