Phasor 3.1.1
Stack VM based Programming Language
Loading...
Searching...
No Matches
Deserializer.py
Go to the documentation of this file.
1"""
2phasor.Deserializer
3===================
4Deserialises the binary ``.phsb`` format into a :class:`~phasor.Bytecode.Bytecode` object.
5"""
6
7from __future__ import annotations
8
9import struct
10import zlib
11from pathlib import Path
12
13from .Bytecode import Bytecode
14from .Instruction import Instruction
15from .Metadata import (
16 HEADER_SIZE, MAGIC, SEC_CONSTANTS, SEC_FUNCTIONS,
17 SEC_INSTRUCTIONS, SEC_VARIABLES, VERSION,
18)
19from .OpCode import OpCode
20from .Value import Value, ValueType
21
22
24 """Deserialises ``.phsb`` into :class:`~phasor.Bytecode.Bytecode`."""
25
def __init__(self) -> None:
    """Initialise the deserializer with an empty buffer and a zero read position.

    Both attributes are overwritten at the start of every ``deserialize``
    call; they are set here so the instance is in a defined state before use.
    """
    # Raw bytes currently being parsed.
    self._data: bytes = b""
    # Offset into ``_data`` of the next unread byte.
    self._pos: int = 0
30
def deserialize(self, data: bytes) -> Bytecode:
    """Parse a raw ``.phsb`` byte buffer into a :class:`~phasor.Bytecode.Bytecode` object.

    The header is read first. The CRC-32 stored in it is then compared with
    the CRC-32 of every byte following the header, and only if they match
    are the four sections parsed, in this fixed order: constants,
    variables, functions, instructions.

    Args:
        data: Raw bytes of a ``.phsb`` file.

    Returns:
        A fully populated :class:`~phasor.Bytecode.Bytecode` instance.

    Raises:
        ValueError: If the magic number, version, or CRC-32 checksum is invalid,
            if any section tag is unexpected, if a constant carries an
            unknown type tag, or if the buffer ends before a read completes.
    """
    # Reset the cursor state so the same instance can parse several buffers.
    self._data = data
    self._pos = 0
    bytecode = Bytecode()
    checksum = self._read_header()

    # The cursor now sits just past the header; the checksum covers the
    # remainder of the buffer.
    data_start = self._pos
    actual_crc = zlib.crc32(self._data[data_start:]) & 0xFFFFFFFF
    if actual_crc != checksum:
        raise ValueError(
            f"Bytecode checksum mismatch: "
            f"expected {checksum:#010x}, got {actual_crc:#010x}"
        )

    # Each reader validates its own section tag, so the call order below
    # is the section order required in the file.
    self._read_constant_pool(bytecode)
    self._read_variable_mapping(bytecode)
    self._read_function_entries(bytecode)
    self._read_instructions(bytecode)

    return bytecode
63
def load_from_file(self, path: Path) -> Bytecode:
    """Read a ``.phsb`` file from disk and deserialise it.

    The whole file is read into memory and handed to ``deserialize``.

    Args:
        path: Path to the ``.phsb`` file to load. It is passed through
            ``Path(...)``, so any value ``Path`` accepts will work.

    Returns:
        A fully populated :class:`~phasor.Bytecode.Bytecode` instance.

    Raises:
        OSError: If the file cannot be read.
        ValueError: If ``deserialize`` rejects the file contents.
    """
    data = Path(path).read_bytes()
    return self.deserialize(data)
75
def _read_header(self) -> int:
    """Read and validate the 16-byte file header.

    The header is four consecutive little-endian ``uint32`` words, read in
    this order: magic, version, flags, checksum.

    Returns:
        The CRC-32 checksum stored in the header, to be verified by the
        caller against the bytes that follow the header.

    Raises:
        ValueError: If the magic number does not equal :data:`~phasor.Metadata.MAGIC`
            or the version does not equal :data:`~phasor.Metadata.VERSION`,
            or if the buffer is shorter than the header.
    """
    magic = self._read_uint32()
    if magic != MAGIC:
        raise ValueError(
            f"Invalid magic number: expected {MAGIC:#010x}, got {magic:#010x}"
        )

    version = self._read_uint32()
    if version != VERSION:
        raise ValueError(
            f"Unsupported version: {version:#010x} (expected {VERSION:#010x})"
        )

    # The flags word is not interpreted here, but it must still be consumed
    # so the cursor lands on the checksum word.
    self._read_uint32()
    return self._read_uint32()
102
def _read_constant_pool(self, bytecode: Bytecode) -> None:
    """Read the :data:`~phasor.Metadata.SEC_CONSTANTS` section into ``bytecode.constants``.

    Layout as read: a ``uint8`` section tag, a ``uint32`` entry count, then
    that many type-tagged values (see ``_read_value``).

    Args:
        bytecode: Object whose ``constants`` list receives the entries, in
            file order.

    Raises:
        ValueError: If the section tag is not ``SEC_CONSTANTS``, a value
            has an unknown type tag, or the buffer ends early.
    """
    section_id = self._read_uint8()
    if section_id != SEC_CONSTANTS:
        raise ValueError(
            f"Expected constants section (0x{SEC_CONSTANTS:02x}), "
            f"got 0x{section_id:02x}"
        )
    count = self._read_uint32()
    for _ in range(count):
        bytecode.constants.append(self._read_value())
114
def _read_variable_mapping(self, bytecode: Bytecode) -> None:
    """Read the :data:`~phasor.Metadata.SEC_VARIABLES` section into ``bytecode``.

    Layout as read: a ``uint8`` section tag, a ``uint32`` entry count, an
    ``int32`` stored into ``bytecode.next_var_index``, then ``count`` pairs
    of (length-prefixed string name, ``int32`` index).

    Args:
        bytecode: Object whose ``variables`` mapping and ``next_var_index``
            are populated. A repeated name overwrites the earlier entry.

    Raises:
        ValueError: If the section tag is not ``SEC_VARIABLES`` or the
            buffer ends early.
    """
    section_id = self._read_uint8()
    if section_id != SEC_VARIABLES:
        raise ValueError(
            f"Expected variables section (0x{SEC_VARIABLES:02x}), "
            f"got 0x{section_id:02x}"
        )
    # The count precedes next_var_index in the file, so it must be read first.
    count = self._read_uint32()
    bytecode.next_var_index = self._read_int32()
    for _ in range(count):
        name = self._read_string()
        index = self._read_int32()
        bytecode.variables[name] = index
129
def _read_function_entries(self, bytecode: Bytecode) -> None:
    """Read the :data:`~phasor.Metadata.SEC_FUNCTIONS` section into ``bytecode.function_entries``.

    Layout as read: a ``uint8`` section tag, a ``uint32`` entry count, then
    ``count`` pairs of (length-prefixed string name, ``int32`` address).

    Args:
        bytecode: Object whose ``function_entries`` mapping is populated.
            A repeated name overwrites the earlier entry.

    Raises:
        ValueError: If the section tag is not ``SEC_FUNCTIONS`` or the
            buffer ends early.
    """
    section_id = self._read_uint8()
    if section_id != SEC_FUNCTIONS:
        raise ValueError(
            f"Expected functions section (0x{SEC_FUNCTIONS:02x}), "
            f"got 0x{section_id:02x}"
        )
    count = self._read_uint32()
    for _ in range(count):
        name = self._read_string()
        address = self._read_int32()
        bytecode.function_entries[name] = address
143
def _read_instructions(self, bytecode: Bytecode) -> None:
    """Read the :data:`~phasor.Metadata.SEC_INSTRUCTIONS` section into ``bytecode.instructions``.

    Layout as read: a ``uint8`` section tag, a ``uint32`` instruction count,
    then per instruction a ``uint8`` opcode followed by three ``int32``
    operands (13 bytes each).

    Args:
        bytecode: Object whose ``instructions`` list receives the decoded
            instructions, in file order.

    Raises:
        ValueError: If the section tag is not ``SEC_INSTRUCTIONS`` or the
            buffer ends early.
    """
    section_id = self._read_uint8()
    if section_id != SEC_INSTRUCTIONS:
        raise ValueError(
            f"Expected instructions section (0x{SEC_INSTRUCTIONS:02x}), "
            f"got 0x{section_id:02x}"
        )
    count = self._read_uint32()
    for _ in range(count):
        # NOTE(review): how an unrecognised opcode byte is reported depends
        # on OpCode, which is defined outside this file - confirm it raises.
        opcode = OpCode(self._read_uint8())
        op1 = self._read_int32()
        op2 = self._read_int32()
        op3 = self._read_int32()
        bytecode.instructions.append(Instruction(opcode, op1, op2, op3))
159
def _read_value(self) -> Value:
    """Read a type-tagged value and return the corresponding :class:`~phasor.Value.Value`.

    A ``uint8`` tag selects the payload that follows:

    * ``0`` - null, no payload
    * ``1`` - bool, one ``uint8`` (any non-zero byte is ``True``)
    * ``2`` - int, one signed 64-bit integer
    * ``3`` - float, one IEEE 754 double
    * ``4`` - string, length-prefixed UTF-8

    Returns:
        The decoded :class:`~phasor.Value.Value`.

    Raises:
        ValueError: If the tag is not one of the values above, or the
            buffer ends early.
    """
    # NOTE(review): these literals presumably mirror ValueType - confirm
    # against phasor.Value before replacing them with enum members.
    tag = self._read_uint8()
    if tag == 0:
        return Value.null()
    if tag == 1:
        return Value.from_bool(self._read_uint8() != 0)
    if tag == 2:
        return Value.from_int(self._read_int64())
    if tag == 3:
        return Value.from_float(self._read_double())
    if tag == 4:
        return Value.from_string(self._read_string())
    raise ValueError(f"Unknown value type tag: {tag}")
174
def _require(self, n: int) -> None:
    """Raise ``ValueError`` if fewer than *n* bytes remain in the buffer.

    Every ``_read_*`` primitive calls this before touching the buffer, so a
    truncated file surfaces as ``ValueError`` instead of ``struct.error``
    or ``IndexError``.

    Args:
        n: Number of bytes the caller is about to consume from the current
            read position.

    Raises:
        ValueError: If ``self._pos + n`` runs past the end of ``self._data``.
    """
    if self._pos + n > len(self._data):
        raise ValueError(
            f"Unexpected end of data at offset {self._pos} "
            f"(need {n} more bytes)"
        )
182
def _read_uint8(self) -> int:
    """Read and return the next unsigned byte from the buffer.

    Advances the read position by 1.

    Raises:
        ValueError: If no bytes remain (raised by ``_require``).
    """
    self._require(1)
    # Indexing a bytes object yields the byte as an int in 0..255.
    v = self._data[self._pos]
    self._pos += 1
    return v
189
def _read_uint16(self) -> int:
    """Read and return the next little-endian unsigned 16-bit integer from the buffer.

    Advances the read position by 2.

    Raises:
        ValueError: If fewer than 2 bytes remain (raised by ``_require``).
    """
    self._require(2)
    # unpack_from reads in place at the given offset; no slice copy is made.
    (v,) = struct.unpack_from("<H", self._data, self._pos)
    self._pos += 2
    return v
196
def _read_uint32(self) -> int:
    """Read and return the next little-endian unsigned 32-bit integer from the buffer.

    Advances the read position by 4.

    Raises:
        ValueError: If fewer than 4 bytes remain (raised by ``_require``).
    """
    self._require(4)
    (v,) = struct.unpack_from("<I", self._data, self._pos)
    self._pos += 4
    return v
203
def _read_int32(self) -> int:
    """Read and return the next little-endian signed 32-bit integer from the buffer.

    Advances the read position by 4.

    Raises:
        ValueError: If fewer than 4 bytes remain (raised by ``_require``).
    """
    self._require(4)
    (v,) = struct.unpack_from("<i", self._data, self._pos)
    self._pos += 4
    return v
210
def _read_int64(self) -> int:
    """Read and return the next little-endian signed 64-bit integer from the buffer.

    Advances the read position by 8.

    Raises:
        ValueError: If fewer than 8 bytes remain (raised by ``_require``).
    """
    self._require(8)
    (v,) = struct.unpack_from("<q", self._data, self._pos)
    self._pos += 8
    return v
217
def _read_double(self) -> float:
    """Read and return the next little-endian IEEE 754 double from the buffer.

    Advances the read position by 8.

    Raises:
        ValueError: If fewer than 8 bytes remain (raised by ``_require``).
    """
    self._require(8)
    (v,) = struct.unpack_from("<d", self._data, self._pos)
    self._pos += 8
    return v
224
def _read_string(self) -> str:
    """Read a length-prefixed UTF-8 string (uint16 length + bytes) and return it.

    The prefix is the byte length of the encoded string, not its character
    count; being a ``uint16`` it caps a string at 65535 bytes. Advances the
    read position past both the prefix and the payload.

    Raises:
        ValueError: If the buffer ends before the prefix or payload is
            complete, or if the payload is not valid UTF-8
            (``UnicodeDecodeError`` is a ``ValueError`` subclass).
    """
    length = self._read_uint16()
    # Check the payload size up front: slicing past the end would silently
    # return a shorter string instead of failing.
    self._require(length)
    s = self._data[self._pos : self._pos + length].decode("utf-8")
    self._pos += length
    return s
Bytecode deserialize(self, bytes data)
None _read_function_entries(self, Bytecode bytecode)
None _read_instructions(self, Bytecode bytecode)
Bytecode load_from_file(self, Path path)
None _read_variable_mapping(self, Bytecode bytecode)
None _read_constant_pool(self, Bytecode bytecode)