source: ammosreader/AmmosIFReader.py@ 1e781ba

AmmosSource guix
Last change on this file since 1e781ba was 1e781ba, checked in by recknagel <recknagel@…>, 3 years ago

former radardex-project

  • Property mode set to 100644
File size: 4.7 KB
RevLine 
[1e781ba]1import math
2import os
3
4from AmmosGlobalFrameBody import AmmosGlobalFrameBody
5from AmmosIFDataHeader import AmmosIFDataHeader
6from AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
7from AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
8from AmmosSingleFrame import AmmosSingleFrame
9from AmmosIFDataBlock import AmmosIFDataBlock
10from AmmosContainer import AmmosContainer
11
12
class AmmosIFReader():
    """Sequential reader for the frames stored in a single binary file.

    The file is opened in binary mode on construction. Every successfully
    parsed frame is appended to ``self.container`` by
    ``read_all_frames_left``. All ``read_next_*`` methods consume bytes from
    the current file position and return ``None`` (after printing a
    diagnostic) when the file ends before the requested structure is
    complete.

    The reader can be used as a context manager so that the underlying file
    is closed deterministically::

        with AmmosIFReader(path) as reader:
            reader.read_all_frames_left()
    """

    def __init__(self, file_name):
        """Open ``file_name`` for binary reading and create an empty container.

        Args:
            file_name: Path of the file to read.
        """
        self.file_name = file_name
        self.file = open(self.file_name, "rb")
        self.file_size = os.path.getsize(self.file_name)

        self.container = AmmosContainer(self.file_name, [])

        self.tags = []

    def __enter__(self):
        """Return the reader itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying file when leaving a ``with`` block."""
        self.close()

    def close(self):
        """Close the underlying file. Calling it more than once is harmless."""
        self.file.close()

    def _read_exactly(self, size):
        """Read ``size`` bytes from the file.

        Returns:
            The bytes read, or ``None`` if ``size`` is negative or fewer
            than ``size`` bytes were available.
        """
        # A negative size would make file.read() consume the rest of the file.
        if size < 0:
            return None
        raw = self.file.read(size)
        if len(raw) < size:
            return None
        return raw

    def rewind_to_start(self):
        """Move the file position back to the first byte."""
        self.file.seek(0)

    def add_tag(self, tag):
        """Append ``tag`` to the list of tags kept by this reader."""
        self.tags.append(tag)

    def read_all_frames_left(self):
        """Read frames until one cannot be read and store them in the container.

        Reading stops at the first frame for which
        ``read_next_single_frame`` returns ``None`` (end of file, truncated
        frame or unsupported frame type).
        """
        frames_read = 0

        while True:
            print("Reading single frame", frames_read, '...')
            current_frame = self.read_next_single_frame()
            if current_frame is None:
                print("Frame:", frames_read+1, " incomplete")
                break

            frames_read += 1
            self.container.add_frame(current_frame)
            # Progress marker every 10000 frames.
            if frames_read % 10000 == 0:
                print("#", end="")

        print(len(self.container.global_frames), "frames read")

    def read_next_global_frame_header(self):
        """Read the next 24 bytes and parse them as a global frame header.

        Returns:
            The result of ``AmmosGlobalFrameHeader.from_bytes`` or ``None``
            if fewer than 24 bytes were left in the file.
        """
        header_bytes = self._read_exactly(24)
        print("Reading next global frame header")
        if header_bytes is None:
            print("Can not read all 24 bytes of global frame header")
            return None

        return AmmosGlobalFrameHeader.from_bytes(header_bytes)

    def read_next_global_frame_body_data_header(self):
        """Read the next 56 bytes and parse them as a standard data header.

        Returns:
            The result of ``AmmosIFDataHeader.from_bytes`` or ``None`` if
            fewer than 56 bytes were left in the file.
        """
        header_bytes = self._read_exactly(56)
        if header_bytes is None:
            print("Can not read all 56 bytes of global frame body data header")
            return None

        return AmmosIFDataHeader.from_bytes(header_bytes)

    def read_next_global_frame_body_extended_data_header(self):
        """Read the next 76 bytes and parse them as an extended data header.

        Returns:
            The result of ``AmmosExtendedIFDataHeader.from_bytes`` or
            ``None`` if fewer than 76 bytes were left in the file.
        """
        header_bytes = self._read_exactly(76)
        if header_bytes is None:
            print("Can not read all ", 76, "bytes of global frame extended data header")
            return None

        return AmmosExtendedIFDataHeader.from_bytes(header_bytes)

    def read_next_if_data_blocks(self, n, length):
        """Read ``n`` consecutive data blocks from the file.

        Each block occupies ``4 + length`` bytes: the first 4 bytes are
        passed to ``AmmosIFDataBlock`` as its first argument and the
        remaining ``length`` bytes as its second.

        Args:
            n: Number of blocks to read.
            length: Number of payload bytes per block, excluding the
                4 leading bytes.

        Returns:
            A list of ``n`` ``AmmosIFDataBlock`` objects (empty when ``n``
            is 0), or ``None`` if the file does not contain all
            ``n * (4 + length)`` bytes or the sizes are negative.
        """
        block_length = 4 + length
        total = n*block_length

        # Read the whole body at once. Reading only one block's worth of
        # bytes here would make every multi-block frame look truncated.
        byte_string = None
        if n >= 0 and length >= 0:
            byte_string = self._read_exactly(total)

        if byte_string is None:
            print("Can not read all", total, "bytes of data body")
            return None

        data_blocks = []
        for offset in range(0, total, block_length):
            chunk = byte_string[offset:offset + block_length]
            data_blocks.append(AmmosIFDataBlock(chunk[0:4], chunk[4:]))

        return data_blocks

    def read_next_global_frame_body(self, global_frame_header):
        """Read the data header and the data blocks that follow a frame header.

        A ``data_header_length`` of 56 selects the standard data header;
        any other value is read as an extended (76 byte) data header.

        Args:
            global_frame_header: Header object providing
                ``data_header_length``.

        Returns:
            An ``AmmosGlobalFrameBody`` or ``None`` if either the data
            header or the data blocks could not be read completely.
        """
        if global_frame_header.data_header_length == 56:
            if_data_header = self.read_next_global_frame_body_data_header()
        else:
            if_data_header = self.read_next_global_frame_body_extended_data_header()

        if if_data_header is None:
            print("Data header missing")
            return None

        if_data_body = self.read_next_if_data_blocks(if_data_header.block_count, if_data_header.block_length)

        if if_data_body is None:
            print("Data body missing")
            return None

        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def read_next_single_frame(self):
        """Read one complete frame (global header plus body) from the file.

        Only frames whose header reports ``frame_type == 2`` are supported.

        Returns:
            An ``AmmosSingleFrame`` or ``None`` if the header or body is
            missing or incomplete, or the frame type is unsupported.
        """
        global_frame_header = self.read_next_global_frame_header()

        print("\nReading next global frame header\n", global_frame_header)

        if global_frame_header is None:
            print("Global frame header missing")
            return None

        if global_frame_header.data_header_length is None:
            print("Data header length empty")
            return None

        if global_frame_header.frame_type != 2:
            print("Unsupported frame type", global_frame_header.frame_type, "found")
            return None

        global_frame_body = self.read_next_global_frame_body(global_frame_header)
        if global_frame_body is None:
            return None

        return AmmosSingleFrame(global_frame_header, global_frame_body)
Note: See TracBrowser for help on using the repository browser.