# Source code for pyplanet.utils.gbxparser

import io
import struct
from asyncio import iscoroutinefunction

import aiofiles

class GbxException(Exception):
    """
    Exception with parsing the Gbx file.

    Derives from ``Exception`` (not ``BaseException``) so that regular
    ``except Exception`` handlers see parse failures; ``BaseException`` is
    reserved for interpreter-level exits such as ``KeyboardInterrupt``.
    """
class _LookBackUtils:
    """
    Reads plain and "lookback" strings from an async buffer.

    A lookback string is either read inline the first time it occurs (and
    remembered), referenced by index afterwards, or taken from a small table
    of predefined values.
    """

    PREDEFINED_STRINGS = {
        11: 'Valley',
        12: 'Canyon',
        13: 'Lagoon',
        17: 'TMCommon',
        202: 'Storm',
        299: 'SMCommon',
        10003: 'Common',
    }

    def __init__(self, buffer):
        """
        :param buffer: Object exposing awaitable ``read(size)`` and ``tell()``.
        """
        self.buffer = buffer
        # NOTE(review): attribute name reconstructed from the "local store"
        # comment below; it is only used inside this class.
        self.store = list()
        self.version = None

    async def read_string(self):
        """
        Read a string prefixed by its length as a little-endian uint32.

        :return: Decoded string.
        :raises struct.error: When the buffer returns fewer bytes than requested.
        """
        length, = struct.unpack('<L', await self.buffer.read(4))
        return struct.unpack('<{}s'.format(length), await self.buffer.read(length))[0].decode()

    async def read_lookback_string(self):
        """
        Read one lookback string.

        :return: The string, ``''`` for the empty marker, or ``None`` when the index is 0.
        :raises GbxException: When the index points past the strings seen so far.
        :raises KeyError: When a predefined index is not in ``PREDEFINED_STRINGS``.
        """
        if self.version is None:
            # We should see the lookback version right now.
            self.version, = struct.unpack('<L', await self.buffer.read(4))

        # Get the index.
        idx, = struct.unpack('<L', await self.buffer.read(4))
        if idx == 0:
            return None

        # Check if this will be the first occurrence: one of the two top bits
        # is set and the 30-bit index part is zero.
        if idx & 0xc0000000 != 0 and idx & 0x3fffffff == 0:
            value = await self.read_string()
            # NOTE(review): remembering the value is reconstructed; without it
            # the index lookup at the end could never succeed.
            self.store.append(value)
            return value

        # Check if idx is telling us that it's empty.
        if idx == 0xffffffff:
            return ''

        # Check if it's a predefined value (neither of the two top bits set).
        if idx & 0x3fffffff == idx:
            return self.PREDEFINED_STRINGS[idx]

        # Get from local store; stored indices are 1-based.
        idx &= 0x3fffffff
        if idx - 1 >= len(self.store):
            # tell() has to be called before it can be awaited.
            raise GbxException(
                'String not found in lookback list!. Offset: {}'.format(await self.buffer.tell())
            )
        return self.store[idx - 1]

    def reset(self):
        """
        Forget the lookback version; the stored strings are dropped only when
        a version had been read.
        """
        if self.version:
            self.store = list()
        self.version = None


class _AsyncBufferProxy:
    """
    Gives a synchronous buffer the awaitable ``read``/``seek``/``tell``
    methods the parser calls, by delegating to the wrapped object.
    """

    def __init__(self, buffer):
        """
        :param buffer: Buffer
        :type buffer: io.BufferedIOBase
        """
        self.buffer = buffer

    async def read(self, size=1):
        """Return the result of ``buffer.read(size)``."""
        return self.buffer.read(size)

    async def seek(self, offset, whence=io.SEEK_CUR):
        """
        Return the result of ``buffer.seek(offset, whence)``.

        Unlike ``io`` objects, ``whence`` defaults to ``io.SEEK_CUR``.
        """
        return self.buffer.seek(offset, whence)

    async def tell(self):
        """Return the result of ``buffer.tell()``."""
        return self.buffer.tell()
class GbxParser:
    """
    Async GBX Map Information Parser.

    Reads the header chunks of a map file and collects their fields in a dict.

    Author: Toffe.
    """

    def __init__(self, file=None, buffer=None):
        """
        Initiate a parser with either a file path or buffer.

        :param file: File path.
        :param buffer: Buffer
        :type file: str
        :raises Exception: When ``file`` is not a string, or neither argument is given.
        """
        super().__init__()
        if file and not isinstance(file, str):
            raise Exception('File should be a string, pointing to the file you want to load.')
        if not file and not buffer:
            raise Exception('File or buffer is required!')

        self.file = file
        # A buffer whose read() is already a coroutine function is used as-is;
        # anything else (including None when only a file path was given) is
        # wrapped so the parser can always await its calls.
        # NOTE(review): the probed attribute (``buffer.read``) is reconstructed.
        if buffer and iscoroutinefunction(buffer.read):
            self.buffer = buffer
        else:
            self.buffer = _AsyncBufferProxy(buffer)
        self.strings = _LookBackUtils(self.buffer)

        self.result = dict()
        self.header = None
        self.header_xml = None
        self.header_length = 0
        self.header_chunk_count = 0
        self.header_chunks = dict()

        self.parse_thumb = False
        self.parse_header_xml = False

    async def seek(self, offset):
        """
        We need to override the second param to move from the current position.

        :param offset: offset to move away.
        :type offset: int
        """
        return await self.buffer.seek(offset, io.SEEK_CUR)

    async def parse(self, thumb=False, header_xml=False):
        """
        Parse the map header from the file path or buffer given at init.

        :param thumb: Stored as ``parse_thumb``; when true the thumbnail payload is read instead of skipped.
        :param header_xml: Stored as ``parse_header_xml``.
        :return: Dict with the fields of all parsed header chunks.
        :raises GbxException: When the class id is not the supported map class.
        :raises Exception: When neither a file nor a buffer is available.
        """
        self.parse_thumb = thumb
        self.parse_header_xml = header_xml

        if self.file:
            # The opened file replaces self.buffer, so the string reader has
            # to be rebuilt on top of it.
            async with aiofiles.open(self.file, mode='rb') as self.buffer:
                self.strings = _LookBackUtils(self.buffer)
                return await self.__parse()
        elif self.buffer:
            return await self.__parse()
        raise Exception('No buffer or file given at init.')

    async def __parse(self):
        """Check the class id and parse the header into ``self.result``."""
        # Skip until class reference.
        # NOTE(review): width reconstructed from the GBX file prologue
        # (3-byte magic, uint16 version, 4 format bytes) — confirm.
        await self.seek(9)

        # Read class ID.
        class_id, = struct.unpack('<I', await self.buffer.read(4))
        if class_id != ((0x3 << 24) | (0x43 << 12)):
            raise GbxException('Gbx file has no valid parser, only maps are supported right now.')

        self.result.update(await self.__parse_header())
        return self.result

    async def __parse_header(self):
        """Read the header chunk table, then parse every chunk in table order."""
        self.header_length, = struct.unpack('<I', await self.buffer.read(4))
        self.header_chunk_count, = struct.unpack('<I', await self.buffer.read(4))
        self.header_chunks = dict()
        self.header = dict()

        # Save header data from binary.
        for _ in range(self.header_chunk_count):
            chunk_id, = struct.unpack('<I', await self.buffer.read(4))
            chunk_size, = struct.unpack('<I', await self.buffer.read(4))
            # The top bit of the size word is cleared; it is not part of the size.
            self.header_chunks[chunk_id] = chunk_size & ~0x80000000

        # Parse all header chunks. The lookback state is reset for each chunk.
        for chunk_id, chunk_size in self.header_chunks.items():
            self.strings.reset()
            self.header.update(await self.__parse_chunk(chunk_id, chunk_size))
        return self.header

    async def __parse_chunk(self, chunk_id, chunk_size):
        """
        Parse one header chunk starting at the current buffer position.

        :param chunk_id: Chunk identifier from the header table.
        :param chunk_size: Payload size in bytes from the header table.
        :return: Dict with the fields of the chunk, empty for chunks without fields.
        """
        # NOTE(review): every literal passed to self.seek() in this method was
        # reconstructed from the GBX header-chunk layout — confirm.
        if chunk_id == 0x03043002:
            # The version byte is read only to advance the buffer.
            version, = struct.unpack('<B', await self.buffer.read(1))
            await self.seek(4)
            time_bronze, time_silver, time_gold, time_author = struct.unpack(
                '<LLLL', await self.buffer.read(16)
            )
            price, is_multilap, map_type = struct.unpack('<LLL', await self.buffer.read(12))
            is_multilap = is_multilap == 1
            await self.seek(4)
            author_score, editor = struct.unpack('<LL', await self.buffer.read(8))
            editor = 'simple' if editor == 1 else 'advanced'
            await self.seek(4)
            checkpoints, laps = struct.unpack('<LL', await self.buffer.read(8))
            return dict(
                time_bronze=time_bronze, time_silver=time_silver, time_gold=time_gold, time_author=time_author,
                price=price, is_multilap=is_multilap, map_type=map_type, author_score=author_score, editor=editor,
                checkpoints=checkpoints, laps=laps
            )
        elif chunk_id == 0x03043003:
            version, = struct.unpack('<B', await self.buffer.read(1))
            uid = await self.strings.read_lookback_string()
            environment = await self.strings.read_lookback_string()
            author_login = await self.strings.read_lookback_string()
            name = await self.strings.read_string()
            await self.seek(5)
            await self.strings.read_string()  # Unknown, mostly empty.
            mood = await self.strings.read_lookback_string()
            decoration_env_id = await self.strings.read_lookback_string()
            decoration_env_author = await self.strings.read_lookback_string()
            await self.seek(4*4+16)
            map_type = await self.strings.read_string()
            map_style = await self.strings.read_string()
            await self.seek(9)
            title_id = await self.strings.read_lookback_string()
            return dict(
                uid=uid, environment=environment, author_login=author_login, name=name, mood=mood,
                decoration_env_id=decoration_env_id, decoration_env_author=decoration_env_author,
                map_type=map_type, map_style=map_style, title_id=title_id
            )
        elif chunk_id == 0x03043004:
            version, = struct.unpack('<B', await self.buffer.read(1))
            # Skip the rest of the chunk (one byte was consumed above).
            await self.seek(chunk_size - 1)
        elif chunk_id == 0x03043005:
            self.header_xml = await self.strings.read_string()
        elif chunk_id == 0x03043007:
            has_thumb = bool(struct.unpack('<L', await self.buffer.read(4))[0])
            comment = None
            thumb = None
            if has_thumb:
                thumb_size, = struct.unpack('<L', await self.buffer.read(4))
                await self.seek(15)  # Skip XML thumb tag.
                if self.parse_thumb:
                    # NOTE(review): decoding the thumbnail payload as text is
                    # kept from the original; presumably it is binary image
                    # data, in which case this raises UnicodeDecodeError.
                    thumb = struct.unpack(
                        '<{}s'.format(thumb_size), await self.buffer.read(thumb_size)
                    )[0].decode()
                else:
                    await self.seek(thumb_size)
                await self.seek(16 + 10)  # </Thumbnail.jpg></Comments>
                comment_size, = struct.unpack('<L', await self.buffer.read(4))
                if comment_size > 0:
                    comment = struct.unpack(
                        '<{}s'.format(comment_size), await self.buffer.read(comment_size)
                    )[0].decode()
                # NOTE(review): assumed unconditional, the closing tag follows
                # the comment text whatever its length.
                await self.seek(11)  # </Comments>
            else:
                # Skip the rest of the chunk (four bytes were consumed above).
                await self.seek(chunk_size - 4)
            return dict(has_thumb=has_thumb, thumb=thumb, comment=comment)
        elif chunk_id == 0x03043008:
            version, = struct.unpack('<L', await self.buffer.read(4))
            author_version, = struct.unpack('<L', await self.buffer.read(4))
            author_login = await self.strings.read_string()
            author_nickname = await self.strings.read_string()
            author_zone = await self.strings.read_string()
            author_extra = await self.strings.read_string()
            return dict(
                author_version=author_version, author_login=author_login, author_nickname=author_nickname,
                author_zone=author_zone, author_extra=author_extra
            )
        else:
            # Unknown chunk: step over its payload so that the chunks after it
            # are still read from the right offset.
            await self.seek(chunk_size)
        return dict()