Changeset 6d0f203 in ammosreader
- Timestamp:
- 06/30/22 16:18:57 (3 years ago)
- Branches:
- AmmosSource, guix
- Children:
- d62906b
- Parents:
- 809cb16
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
ammosreader/AbstractAmmosReader.py
r809cb16 r6d0f203 8 8 from ammosreader.AmmosContainer import AmmosContainer 9 9 from ammosreader.AmmosConstants import FrameType 10 11 logging.basicConfig(filename='ammos.log', level=logging.DEBUG) 10 12 11 13 … … 96 98 return None 97 99 98 logging.info("Current global frame header %s", current_global_frame_header)100 # logging.info("Current global frame header %s", current_global_frame_header) 99 101 return current_global_frame_header 100 102 -
ammosreader/AmmosAudioDataBody.py
r809cb16 r6d0f203 10 10 self.__samples_per_channel = samples_per_channel 11 11 self.__sample_size = sample_size 12 self.__ data= pcm_data12 self.__payload = pcm_data 13 13 14 14 @property 15 def data(self):15 def payload(self): 16 16 """I return the raw pcm data with channels interweaved.""" 17 return self.__ data17 return self.__payload 18 18 19 19 def pcm_for_channel(self, channel_number): … … 21 21 start_offset = channel_number * self.__sample_size 22 22 step = self.__sample_size * self.__number_of_channels 23 end = (len(self.__ data) // step) * step23 end = (len(self.__payload) // step) * step 24 24 channel_bytes = b"" 25 25 for each in range(start_offset, end, step): 26 channel_bytes += self.__ data[each:each+self.__sample_size]26 channel_bytes += self.__payload[each:each+self.__sample_size] 27 27 return channel_bytes 28 28 … … 32 32 "\nSamples per channel:" + str(self.__samples_per_channel) + 33 33 "\nSample size:" + str(self.__sample_size) + 34 "\nData size:" + str(len(self.__ data)))34 "\nData size:" + str(len(self.__payload))) -
ammosreader/AmmosContainer.py
r809cb16 r6d0f203 30 30 31 31 def frequencies(self): 32 return set(list(filter(lambda frame: frame.global_frame_body.data_header.frequency, self.__global_frames))) 32 """I return a list of unique frequencies inside this container.""" 33 return list({each.global_frame_body.data_header.frequency for each in self.__global_frames}) 34 35 def frame_types(self): 36 return list({each.global_frame_header.frame_type for each in self.__global_frames}) 37 38 def frame_sizes(self): 39 return list({each.global_frame_header.frame_length*4 for each in self.__global_frames}) 40 41 def is_homogenic(self): 42 return (len(self.frame_sizes()) == 1) and (len(self.frame_types()) == 1) 33 43 34 44 def __str__(self): -
ammosreader/AmmosGlobalFrameBody.py
r809cb16 r6d0f203 29 29 self.__data_body = data_bytes 30 30 31 def data_bytes_only(self): 32 33 byte_string = b"" 34 35 for each_block in self.data_body: 36 if not each_block: 37 print("Block is nil") 38 39 byte_string += each_block.data 40 41 return byte_string 31 def payload(self): 32 """I return the payload only.""" 33 return b"".join([each_block.data for each_block in self.data_body]) -
ammosreader/AmmosGlobalFrameHeader.py
r809cb16 r6d0f203 1 """I provide an AMMOS global frame header.""" 1 2 import struct 3 import logging 4 5 logging.basicConfig(filename='ammos.log', level=logging.DEBUG) 2 6 3 7 4 8 class AmmosGlobalFrameHeader(): 5 9 """I implement an AMMOS global frame header.""" 6 10 MAGIC_WORD = "726574fb" 7 11 HEADER_SIZE = 24 … … 9 13 @classmethod 10 14 def from_bytes(cls, bytes): 15 """I create a new AmmosGlobalFrameHeader from bytes.""" 16 assert len(bytes) == cls.HEADER_SIZE 11 17 12 18 elements = struct.unpack('<4s4s4s4s4s4s', bytes) 13 19 14 20 magic_word = elements[0].hex() 21 22 if magic_word != cls.MAGIC_WORD: 23 return None 15 24 16 25 frame_length = (int.from_bytes(elements[1], byteorder='little')*4) … … 28 37 29 38 def __init__(self, magic_word, frame_length, running_frame_number, frame_type, data_header_length, reserved): 30 39 """I return a new instance of myself initialized with above parameters.""" 31 40 if magic_word != type(self).MAGIC_WORD: 32 print("Wrong magic word")41 logging.error("Wrong magic word found") 33 42 self.magic_word = magic_word 34 43 else: … … 41 50 42 51 def __str__(self): 52 """I return the string representation of myself.""" 43 53 output = ("Global frame header info\n" + 44 54 "------------------------\n" + … … 49 59 "Data header length:" + str(self.data_header_length) + "\n") 50 60 return output 51 52 def size(self):53 return 24 -
ammosreader/AmmosIFDataBlock.py
r809cb16 r6d0f203 10 10 11 11 @property 12 def data(self):12 def payload(self): 13 13 return self.__data -
ammosreader/AmmosIFDataBody.py
r809cb16 r6d0f203 21 21 22 22 @property 23 def data(self):24 return b"".join([each. datafor each in self.data_blocks])23 def payload(self): 24 return b"".join([each.payload for each in self.data_blocks]) -
ammosreader/AmmosIFReader.py
r809cb16 r6d0f203 103 103 return AmmosGlobalFrameBody(if_data_header, if_data_body) 104 104 105 def data(self):106 return b"".join([each.global_frame_body.data_body. datafor each in self.container.global_frames])105 def payload(self): 106 return b"".join([each.global_frame_body.data_body.payload for each in self.container.global_frames]) -
sample_scripts/audio_reader.py
r809cb16 r6d0f203 1 1 import sys 2 import os3 2 import io 4 3 from pydub import AudioSegment 5 4 from pydub.playback import play 6 7 sys.path.append('../src/')8 5 9 6 from ammosreader.AmmosAudioReader import AmmosAudioReader … … 23 20 print("Sample rate:", dat_file.container.global_frames[0].global_frame_body.data_header.sample_rate) 24 21 print("Container size:", dat_file.container.size()) 22 print("Frequencies", dat_file.container.frequencies()) 23 print("Frame types:", dat_file.container.frame_types()) 24 print("Frame sizes:", dat_file.container.frame_sizes()) 25 print("Homogenic:", dat_file.container.is_homogenic()) 25 26 pcm_data = dat_file.pcm_for_channel(0) 26 27 print("PCM data size total:", len(pcm_data)) -
sample_scripts/iqdw_reader.py
r809cb16 r6d0f203 18 18 19 19 dat_file.read_all_frames_left() 20 print(len(dat_file.data())) 20 print("Frequencies", dat_file.container.frequencies()) 21 print("Frame types:", dat_file.container.frame_types()) 22 print("Frame sizes:", dat_file.container.frame_sizes()) 23 print("Homogenic:", dat_file.container.is_homogenic())
Note:
See TracChangeset
for help on using the changeset viewer.