source: ammosreader/ammosreader/AmmosIFReader.py@ efb5900

AmmosSource guix
Last change on this file since efb5900 was 6d0f203, checked in by Enrico Schwass <ennoausberlin@…>, 3 years ago

apply changes from Franks session

  • Property mode set to 100644
File size: 4.1 KB
Line 
1"""I provide a specialized Ammos Reader for IF data."""
2import logging
3
4from ammosreader.AbstractAmmosReader import AbstractAmmosReader
5from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
6from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
7from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
8from ammosreader.AmmosIFDataBody import AmmosIFDataBody
9from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
10from ammosreader.AmmosIFDataBlockHeader import AmmosIFDataBlockHeader
11
12
class AmmosIFReader(AbstractAmmosReader):
    """I read the IF data embedded in an R&S AMMOS recording."""

    def __init__(self, file_name):
        """
        I initialize the reader by delegating to AbstractAmmosReader.

        :param file_name: passed unchanged to the superclass constructor
        """
        super().__init__(file_name)

    def read_next_global_frame_body_data_header(self):
        """
        I return the next global frame body data header from current position in file.

        :return: the next Ammos IF data header or None if incomplete
        :rtype: AmmosIFDataHeader
        """
        header_size = AmmosIFDataHeader.HEADER_SIZE

        # NOTE(review): self.ammos_file is not set in this class; presumably
        # AbstractAmmosReader provides it as a binary file object - confirm.
        raw_header = self.ammos_file.read(header_size)

        logging.info("\nReading global frame body standard data header\n")
        if (not raw_header) or (len(raw_header) < header_size):
            logging.debug("Can not read all %s bytes of global frame body data header", header_size)
            return None
        return AmmosIFDataHeader.from_bytes(raw_header)

    def read_next_global_frame_body_extended_data_header(self):
        """
        I return the next global frame body extended data header from current position in file.

        :return: the next Ammos Extended IF data header or None if incomplete
        :rtype: AmmosExtendedIFDataHeader
        """
        header_size = AmmosExtendedIFDataHeader.HEADER_SIZE

        raw_header = self.ammos_file.read(header_size)

        logging.info("\nReading global frame body extended data header\n")
        if (not raw_header) or (len(raw_header) < header_size):
            logging.debug("Can not read all %s bytes of global frame extended data header", header_size)
            return None
        return AmmosExtendedIFDataHeader.from_bytes(raw_header)

    def read_next_if_data_body(self, number_of_data_blocks, data_length):
        """
        I return the next data body read from current position in file.

        :param number_of_data_blocks: the number of data blocks inside the body
        :type number_of_data_blocks: int

        :param data_length: the length of the raw data inside a single block
        :type data_length: int

        :return: the data body holding all blocks or None if the body is incomplete
        :rtype: AmmosIFDataBody
        """
        header_size = AmmosIFDataBlockHeader.HEADER_SIZE
        block_length = header_size + data_length
        total = number_of_data_blocks * block_length

        # A negative size would make read() consume the rest of the file,
        # so reject it before touching the file position.
        if total < 0:
            logging.debug("Can not read all %s bytes of data body", total)
            return None

        # Read the whole body (all blocks) at once. Reading only a single
        # block here would make the length check below fail for every body
        # that contains more than one block.
        byte_string = self.ammos_file.read(total)

        if len(byte_string) != total:
            logging.debug("Can not read all %s bytes of data body", total)
            return None

        data_body = AmmosIFDataBody()

        # Each block is a block header directly followed by its raw data.
        for start in range(0, total, block_length) if block_length else ():
            block = byte_string[start:start + block_length]
            block_header = AmmosIFDataBlockHeader.from_bytes(block[:header_size])
            data_body.add_data_block(AmmosIFDataBlock(block_header, block[header_size:]))

        return data_body

    def read_next_global_frame_body(self, data_header_length):
        """
        I return the next global frame body read from current position in file.

        :param data_header_length: the length of the data header
        :type data_header_length: int

        :return: the global frame body or None if the header length is not
                 supported or header/body can not be read completely
        :rtype: AmmosGlobalFrameBody
        """
        if_data_header = None

        # The header length decides which header variant follows; exactly one
        # variant is read so the file position stays in sync.
        if data_header_length == AmmosIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_data_header()
        elif data_header_length == AmmosExtendedIFDataHeader.HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_extended_data_header()

        if if_data_header is None:
            logging.debug("Data header missing")
            return None

        if_data_body = self.read_next_if_data_body(if_data_header.block_count, if_data_header.block_length)

        if if_data_body is None:
            logging.debug("Data body missing")
            return None

        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def payload(self):
        """
        I return the concatenated payload of all global frames in my container.

        :return: the joined payload of every global frame body's data body
        :rtype: bytes
        """
        # NOTE(review): self.container is not set in this class; presumably
        # AbstractAmmosReader fills it while reading the file - confirm.
        return b"".join(each.global_frame_body.data_body.payload for each in self.container.global_frames)
Note: See TracBrowser for help on using the repository browser.