source: ammosreader/ammosreader/AmmosIFReader.py@ 752e2a9

AmmosSource guix
Last change on this file since 752e2a9 was 752e2a9, checked in by Enrico Schwass <ennoausberlin@…>, 3 years ago

remove src parent directory

  • Property mode set to 100644
File size: 4.7 KB
Line 
1import math
2import os
3
4from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
5from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
6from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
7from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
8from ammosreader.AmmosSingleFrame import AmmosSingleFrame
9from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
10from ammosreader.AmmosContainer import AmmosContainer
11
12
class AmmosIFReader():
    """Read frames sequentially from a binary file into an AmmosContainer.

    Each frame is expected to consist of a 24 byte global frame header,
    a data header (56 bytes for the standard form, 76 bytes for the
    extended form) and a run of data blocks. Only frames whose
    ``frame_type`` equals 2 are handled.

    The file is opened on construction. Call :meth:`close` when done, or
    use the reader as a context manager::

        with AmmosIFReader(path) as reader:
            reader.read_all_frames_left()
    """

    # Sizes (in bytes) of the fixed-length structures read from the file.
    GLOBAL_FRAME_HEADER_SIZE = 24
    STANDARD_DATA_HEADER_SIZE = 56
    EXTENDED_DATA_HEADER_SIZE = 76
    # Every data block starts with a 4 byte block header.
    BLOCK_HEADER_SIZE = 4
    # The only frame type this reader knows how to decode.
    SUPPORTED_FRAME_TYPE = 2

    def __init__(self, file_name):
        """Open ``file_name`` for binary reading and set up an empty container.

        Raises:
            OSError: if the file cannot be opened.
        """
        self.file_name = file_name
        self.file = open(self.file_name, "rb")

        self.container = AmmosContainer(self.file_name, [])

        self.tags = []

    def close(self):
        """Close the underlying file. Safe to call more than once."""
        self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def add_tag(self, tag):
        """Append ``tag`` to the list of tags kept by this reader."""
        self.tags.append(tag)

    def _read_exactly(self, size):
        """Read ``size`` bytes; return None if the file ends before that."""
        data = self.file.read(size)
        if len(data) < size:
            return None
        return data

    def read_all_frames_left(self):
        """Read frames until one cannot be read and add each to the container.

        Reading stops at the first incomplete or unsupported frame (that is,
        as soon as :meth:`read_next_single_frame` returns None).
        """
        # iter(callable, sentinel) keeps calling until None is returned.
        for frame in iter(self.read_next_single_frame, None):
            self.container.add_frame(frame)

    def read_next_global_frame_header(self):
        """Read the next 24 byte global frame header.

        Returns:
            The object built by ``AmmosGlobalFrameHeader.from_bytes``, or
            None if fewer than 24 bytes are left in the file.
        """
        raw = self._read_exactly(self.GLOBAL_FRAME_HEADER_SIZE)
        if raw is None:
            return None
        return AmmosGlobalFrameHeader.from_bytes(raw)

    def read_next_global_frame_body_data_header(self):
        """Read the next 56 byte standard data header.

        Returns:
            The object built by ``AmmosIFDataHeader.from_bytes``, or None
            if fewer than 56 bytes are left in the file.
        """
        raw = self._read_exactly(self.STANDARD_DATA_HEADER_SIZE)
        if raw is None:
            return None
        return AmmosIFDataHeader.from_bytes(raw)

    def read_next_global_frame_body_extended_data_header(self):
        """Read the next 76 byte extended data header.

        Returns:
            The object built by ``AmmosExtendedIFDataHeader.from_bytes``,
            or None if fewer than 76 bytes are left in the file.
        """
        raw = self._read_exactly(self.EXTENDED_DATA_HEADER_SIZE)
        if raw is None:
            return None
        return AmmosExtendedIFDataHeader.from_bytes(raw)

    def read_next_if_data_blocks(self, n, length):
        """Read ``n`` consecutive data blocks from the file.

        Args:
            n: number of blocks to read.
            length: payload size of one block in bytes, not counting the
                4 byte block header that precedes every payload.

        Returns:
            A list of ``n`` ``AmmosIFDataBlock`` objects, each built from
            the 4 header bytes and the ``length`` payload bytes of one
            block, or None if the sizes are negative or the file does not
            hold all ``n * (4 + length)`` bytes.
        """
        # A negative size would make file.read() slurp the rest of the file.
        if n < 0 or length < 0:
            return None

        block_length = self.BLOCK_HEADER_SIZE + length
        total = n * block_length

        # Read all blocks at once; reading only one block here would make
        # the length check fail for every frame with more than one block.
        byte_string = self._read_exactly(total)
        if byte_string is None:
            return None

        data_blocks = []
        for start in range(0, total, block_length):
            block = byte_string[start:start + block_length]
            data_blocks.append(
                AmmosIFDataBlock(block[:self.BLOCK_HEADER_SIZE],
                                 block[self.BLOCK_HEADER_SIZE:]))

        return data_blocks

    def read_next_global_frame_body(self, global_frame_header):
        """Read the data header and data blocks that follow a frame header.

        Args:
            global_frame_header: the header just read for this frame; its
                ``data_header_length`` selects the standard (56) or the
                extended data header layout.

        Returns:
            An ``AmmosGlobalFrameBody``, or None if the data header or
            the data blocks could not be read completely.
        """
        if global_frame_header.data_header_length == self.STANDARD_DATA_HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_data_header()
        else:
            # Any other length is treated as the extended header layout.
            if_data_header = self.read_next_global_frame_body_extended_data_header()

        if if_data_header is None:
            return None

        if_data_body = self.read_next_if_data_blocks(if_data_header.block_count,
                                                     if_data_header.block_length)
        if if_data_body is None:
            return None

        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def read_next_single_frame(self):
        """Read one complete frame (global header plus body) from the file.

        Returns:
            An ``AmmosSingleFrame``, or None if the header or body is
            incomplete, the header carries no data header length, or the
            frame type is not 2. The body of an unsupported frame is not
            skipped, so reading cannot continue past it.
        """
        global_frame_header = self.read_next_global_frame_header()
        if global_frame_header is None:
            return None

        if global_frame_header.data_header_length is None:
            return None

        if global_frame_header.frame_type != self.SUPPORTED_FRAME_TYPE:
            return None

        global_frame_body = self.read_next_global_frame_body(global_frame_header)
        if global_frame_body is None:
            return None

        return AmmosSingleFrame(global_frame_header, global_frame_body)
Note: See TracBrowser for help on using the repository browser.