source: ammosreader/ammosreader/AmmosIFReader.py@ f3b78e8

AmmosSource guix
Last change on this file since f3b78e8 was 7752d1f, checked in by Enrico Schwass <ennoausberlin@…>, 2 years ago

mostly style changes after running pylint

  • Property mode set to 100644
File size: 4.2 KB
Line 
1"""I provide a specialized Ammos Reader for IF data."""
2
3from ammosreader.AbstractAmmosReader import AbstractAmmosReader
4from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
5from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
6from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
7from ammosreader.AmmosIFDataBody import AmmosIFDataBody
8from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
9from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader
10from ammosreader import logger
11
12
13class AmmosIFReader(AbstractAmmosReader):
14 """I read the IF data embedded in an R&S AMMOS recording."""
15
16 def read_next_global_frame_body_data_header(self):
17 """I read the next data header of a global frame body from current position in file."""
18 header_size = AmmosIFDataHeader.HEADER_SIZE
19
20 in_bytes = self.ammos_file.read(header_size)
21
22 logger.info("\nReading global frame body standard data header\n")
23 if ((not in_bytes) or (len(in_bytes) < header_size)):
24 logger.debug("Can not read all %s bytes of global frame body data header", header_size)
25 return None
26 return AmmosIFDataHeader.from_bytes(in_bytes)
27
28 def read_next_global_frame_body_extended_data_header(self):
29 """
30 I return the next global frame body extended data header from current position in file.
31
32 :return: the next Ammos Extended IF data header or None if incomplete
33 :rtype: AmmosExtendedIFDataHeader
34 """
35 header_size = AmmosExtendedIFDataHeader.HEADER_SIZE
36
37 in_bytes = self.ammos_file.read(header_size)
38
39 logger.info("\nReading global frame body extended data header\n")
40 if ((not in_bytes) or (len(in_bytes) < header_size)):
41 logger.debug("Can not read all %s bytes of global frame extended data header", header_size)
42 return None
43 return AmmosExtendedIFDataHeader.from_bytes(in_bytes)
44
45 def read_next_if_data_body(self, number_of_data_blocks, data_length):
46 """
47 I return the next data body read from current position in file.
48
49 :param number_of_data_blocks: the number of data blocks inside the body
50 :type number_of_data_blocks: int
51
52 :param data_length: the length of the raw data inside a single block
53 :type data_length: int
54 """
55 header_size = AmmosIFDataBlockHeader.HEADER_SIZE
56
57 data_body = AmmosIFDataBody()
58
59 block_length = header_size + data_length
60
61 total = number_of_data_blocks*block_length
62
63 byte_string = self.ammos_file.read(block_length)
64
65 if len(byte_string) != total:
66 logger.debug("Can not read all %s bytes of data body", total)
67 return None
68
69 for i in range(0, number_of_data_blocks):
70 result = byte_string[i*block_length:(i*block_length+block_length)]
71 data_body.add_data_block(AmmosIFDataBlock(AmmosIFDataBlockHeader.from_bytes(result[0:header_size]),
72 result[header_size:]))
73
74 return data_body
75
76 def read_next_global_frame_body(self, data_header_length):
77 """
78 I return the next global frame body read from current position in file.
79
80 :param data_header_length: the length of the data header
81 :type data_header_length: int
82 """
83 if_data_header = None
84
85 if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
86 if_data_header = self.read_next_global_frame_body_data_header()
87 if data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
88 if_data_header = self.read_next_global_frame_body_extended_data_header()
89
90 if if_data_header is None:
91 logger.debug("Data header missing")
92 return None
93
94 if_data_body = self.read_next_if_data_body(if_data_header.block_count, if_data_header.block_length)
95
96 if if_data_body is None:
97 logger.debug("Data body missing")
98 return None
99
100 return AmmosGlobalFrameBody(if_data_header, if_data_body)
101
102 def payload(self):
103 """I return just the pure date (payload) from my container."""
104 return b"".join([each.global_frame_body.data_body.payload for each in self.container.global_frames])
Note: See TracBrowser for help on using the repository browser.