# AmmosReader 0.314 documentation
import math
import os
from ammosreader.AmmosGlobalFrameBody import AmmosGlobalFrameBody
from ammosreader.AmmosIFDataHeader import AmmosIFDataHeader
from ammosreader.AmmosExtendedIFDataHeader import AmmosExtendedIFDataHeader
from ammosreader.AmmosGlobalFrameHeader import AmmosGlobalFrameHeader
from ammosreader.AmmosSingleFrame import AmmosSingleFrame
from ammosreader.AmmosIFDataBlock import AmmosIFDataBlock
from ammosreader.AmmosContainer import AmmosContainer
class AmmosIFReader:
    """Read frames sequentially from a binary file into an ``AmmosContainer``.

    The file is opened in binary mode on construction and consumed front to
    back: each frame is a 24-byte global frame header followed by a body
    (a 56- or 76-byte data header plus a run of fixed-size data blocks).

    The reader owns the open file handle. Call :meth:`close` when done, or
    use the reader as a context manager::

        with AmmosIFReader("capture.bin") as reader:
            reader.read_all_frames_left()

    Attributes:
        file_name: Path handed to the constructor.
        file_size: Size of the file in bytes at construction time.
        file: The open binary file object all reads go through.
        container: ``AmmosContainer`` collecting every frame that was read.
        tags: Initially empty list; not used by the methods of this class.
    """

    # Sizes in bytes of the fixed-length structures read from the file.
    GLOBAL_FRAME_HEADER_SIZE = 24
    STANDARD_DATA_HEADER_SIZE = 56
    EXTENDED_DATA_HEADER_SIZE = 76
    # Every data block starts with this many bytes, which are passed to
    # AmmosIFDataBlock separately from the rest of the block.
    BLOCK_PREFIX_SIZE = 4
    # The only frame_type value read_next_single_frame knows how to decode.
    # NOTE(review): presumably the IF-data frame type -- confirm against spec.
    SUPPORTED_FRAME_TYPE = 2

    def __init__(self, file_name):
        """Open ``file_name`` for binary reading and set up an empty container.

        Args:
            file_name: Path of the file to read.

        Raises:
            OSError: If the file cannot be stat'ed or opened.
        """
        self.file_name = file_name
        self.file_size = os.path.getsize(self.file_name)
        self.container = AmmosContainer(self.file_name, [])
        self.tags = []
        # Opened last so that a failure above cannot leak the handle.
        self.file = open(self.file_name, "rb")

    def __enter__(self):
        """Return the reader itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the underlying file; exceptions are never suppressed."""
        self.close()
        return False

    def close(self):
        """Close the underlying file. Safe to call more than once."""
        handle = getattr(self, "file", None)
        if handle is not None:
            handle.close()

    def _read_exact(self, size):
        """Read exactly ``size`` bytes, or return ``None`` on a short read."""
        data = self.file.read(size)
        return data if len(data) == size else None

    def read_all_frames_left(self):
        """Read frames until one cannot be read and add them to the container.

        Progress is reported on stdout. Reading stops at the first frame that
        is incomplete or unsupported (``read_next_single_frame`` returns
        ``None``).
        """
        frames_read = 0
        while True:
            print("Reading single frame", frames_read, '...')
            current_frame = self.read_next_single_frame()
            if current_frame is None:
                print("Frame:", frames_read+1, " incomplete")
                break
            frames_read += 1
            self.container.add_frame(current_frame)
            if frames_read % 10000 == 0:
                print("#", end="")
        print(len(self.container.global_frames), "frames read")

    def read_next_global_frame_header(self):
        """Read the next 24-byte global frame header.

        Returns:
            The result of ``AmmosGlobalFrameHeader.from_bytes``, or ``None``
            when fewer than 24 bytes are left in the file.
        """
        raw = self._read_exact(self.GLOBAL_FRAME_HEADER_SIZE)
        print("Reading next global frame header")
        if raw is None:
            print("Can not read all 24 bytes of global frame header")
            return None
        return AmmosGlobalFrameHeader.from_bytes(raw)

    def read_next_global_frame_body_data_header(self):
        """Read the next 56-byte standard data header.

        Returns:
            The result of ``AmmosIFDataHeader.from_bytes``, or ``None`` when
            fewer than 56 bytes are left in the file.
        """
        raw = self._read_exact(self.STANDARD_DATA_HEADER_SIZE)
        if raw is None:
            print("Can not read all 56 bytes of global frame body data header")
            return None
        return AmmosIFDataHeader.from_bytes(raw)

    def read_next_global_frame_body_extended_data_header(self):
        """Read the next 76-byte extended data header.

        Returns:
            The result of ``AmmosExtendedIFDataHeader.from_bytes``, or
            ``None`` when fewer than 76 bytes are left in the file.
        """
        raw = self._read_exact(self.EXTENDED_DATA_HEADER_SIZE)
        if raw is None:
            print("Can not read all ", 76, "bytes of global frame extended data header")
            return None
        return AmmosExtendedIFDataHeader.from_bytes(raw)

    def read_next_if_data_blocks(self, n, length):
        """Read ``n`` consecutive data blocks from the file.

        Each block occupies ``4 + length`` bytes: a 4-byte prefix followed by
        ``length`` bytes. Both parts are handed to ``AmmosIFDataBlock``.

        Args:
            n: Number of blocks to read.
            length: Number of bytes in each block after its 4-byte prefix.

        Returns:
            A list of ``n`` ``AmmosIFDataBlock`` objects (empty for
            ``n == 0``), or ``None`` when the file does not hold all
            ``n * (4 + length)`` bytes or a count is negative.
        """
        prefix = self.BLOCK_PREFIX_SIZE
        block_length = prefix + length
        total = n * block_length
        if n < 0 or length < 0:
            # A negative size handed to read() would slurp the rest of the
            # file; treat it as an unreadable body instead.
            print("Can not read all", total, "bytes of data body")
            return None
        # Read every block in one go. Reading only a single block here would
        # make the length check below fail for any n other than 1.
        byte_string = self.file.read(total)
        if len(byte_string) != total:
            print("Can not read all", total, "bytes of data body")
            return None
        data_blocks = []
        for start in range(0, total, block_length):
            chunk = byte_string[start:start + block_length]
            data_blocks.append(AmmosIFDataBlock(chunk[:prefix], chunk[prefix:]))
        return data_blocks

    def read_next_global_frame_body(self, global_frame_header):
        """Read the data header and data blocks that follow a frame header.

        A ``data_header_length`` of 56 selects the standard data header; any
        other value selects the extended one.

        Args:
            global_frame_header: Header of the frame whose body is read; only
                its ``data_header_length`` attribute is used here.

        Returns:
            An ``AmmosGlobalFrameBody``, or ``None`` when the data header or
            the data blocks could not be read completely.
        """
        if global_frame_header.data_header_length == self.STANDARD_DATA_HEADER_SIZE:
            if_data_header = self.read_next_global_frame_body_data_header()
        else:
            if_data_header = self.read_next_global_frame_body_extended_data_header()
        if if_data_header is None:
            print("Data header missing")
            return None
        if_data_body = self.read_next_if_data_blocks(if_data_header.block_count,
                                                     if_data_header.block_length)
        if if_data_body is None:
            print("Data body missing")
            return None
        return AmmosGlobalFrameBody(if_data_header, if_data_body)

    def read_next_single_frame(self):
        """Read the next complete frame (global header plus body).

        Returns:
            An ``AmmosSingleFrame``, or ``None`` when the header or body is
            incomplete, the header carries no data header length, or the
            frame type is not the supported one (2).
        """
        global_frame_header = self.read_next_global_frame_header()
        print("\nReading next global frame header\n", global_frame_header)
        if global_frame_header is None:
            print("Global frame header missing")
            return None
        if global_frame_header.data_header_length is None:
            print("Data header length empty")
            return None
        if global_frame_header.frame_type != self.SUPPORTED_FRAME_TYPE:
            print("Unsupported frame type", global_frame_header.frame_type, "found")
            return None
        global_frame_body = self.read_next_global_frame_body(global_frame_header)
        if global_frame_body is None:
            return None
        return AmmosSingleFrame(global_frame_header, global_frame_body)