source: ammosreader/ammosreader/AmmosIFReader.py@ 6914dbb

AmmosSource guix
Last change on this file since 6914dbb was 6914dbb, checked in by Enrico Schwass <ennoausberlin@…>, 2 years ago

remove DEBUG parameter and update package to new logger mechanism

  • Property mode set to 100644
File size: 4.1 KB
Line 
1"""I provide a specialized Ammos Reader for IF data."""
2
3from ammosreader.AbstractAmmosReader import AbstractAmmosReader
4from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
5from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
6from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
7from ammosreader.AmmosIFDataBody import AmmosIFDataBody
8from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
9from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader
10from ammosreader import logger
11
class AmmosIFReader(AbstractAmmosReader):
    """I read the IF data embedded in an R&S AMMOS recording."""

    def __init__(self, file_name):
        """
        I initialize myself by delegating to my superclass.

        :param file_name: the file name handed on to AbstractAmmosReader
        """
        super().__init__(file_name)

    def read_next_global_frame_body_data_header(self):
        """
        I return the next global frame body data header from current position in file.

        :return: the result of AmmosIFDataHeader.from_bytes or None if incomplete
        """
        header_size = AmmosIFDataHeader.HEADER_SIZE

        raw_header = self.ammos_file.read(header_size)

        logger.info("\nReading global frame body standard data header\n")
        # An empty or short read means the file ended inside the header.
        if (not raw_header) or (len(raw_header) < header_size):
            logger.debug("Can not read all %s bytes of global frame body data header", header_size)
            return None
        return AmmosIFDataHeader.from_bytes(raw_header)

    def read_next_global_frame_body_extended_data_header(self):
        """
        I return the next global frame body extended data header from current position in file.

        :return: the next Ammos Extended IF data header or None if incomplete
        :rtype: AmmosExtendedIFDataHeader
        """
        header_size = AmmosExtendedIFDataHeader.HEADER_SIZE

        raw_header = self.ammos_file.read(header_size)

        logger.info("\nReading global frame body extended data header\n")
        # An empty or short read means the file ended inside the header.
        if (not raw_header) or (len(raw_header) < header_size):
            logger.debug("Can not read all %s bytes of global frame extended data header", header_size)
            return None
        return AmmosExtendedIFDataHeader.from_bytes(raw_header)

    def read_next_if_data_body(self, number_of_data_blocks, data_length):
        """
        I return the next data body read from current position in file.

        :param number_of_data_blocks: the number of data blocks inside the body
        :type number_of_data_blocks: int

        :param data_length: the length of the raw data inside a single block
        :type data_length: int

        :return: the data body holding all data blocks or None if incomplete
        :rtype: AmmosIFDataBody
        """
        header_size = AmmosIFDataBlockHeader.HEADER_SIZE

        # Every block consists of a block header followed by its raw data.
        block_length = header_size + data_length
        total = number_of_data_blocks * block_length

        # Read the complete body (all blocks) in one go; reading only a single
        # block here would make the length check below fail for every body
        # that contains more than one block.
        byte_string = self.ammos_file.read(total)

        if len(byte_string) != total:
            logger.debug("Can not read all %s bytes of data body", total)
            return None

        data_body = AmmosIFDataBody()

        for index in range(number_of_data_blocks):
            start = index * block_length
            block_bytes = byte_string[start:start + block_length]
            block_header = AmmosIFDataBlockHeader.from_bytes(block_bytes[:header_size])
            data_body.add_data_block(AmmosIFDataBlock(block_header, block_bytes[header_size:]))

        return data_body

    def read_next_global_frame_body(self, data_header_length):
        """
        I return the next global frame body read from current position in file.

        :param data_header_length: the length of the data header
        :type data_header_length: int

        :return: the next global frame body or None if header or body is missing
        :rtype: AmmosGlobalFrameBody
        """
        if_data_header = None

        # The header length selects the header variant; elif guarantees that
        # at most one header is consumed from the file.
        if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_data_header()
        elif data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_extended_data_header()

        if if_data_header is None:
            logger.debug("Data header missing")
            return None

        if_data_body = self.read_next_if_data_body(if_data_header.block_count, if_data_header.block_length)

        if if_data_body is None:
            logger.debug("Data body missing")
            return None

        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def payload(self):
        """
        I return the joined payload of the data bodies of all global frames in my container.

        :return: the concatenated payload
        :rtype: bytes
        """
        return b"".join(each.global_frame_body.data_body.payload for each in self.container.global_frames)
Note: See TracBrowser for help on using the repository browser.