Jakub Wojciech Klama 134e17798c Import lib9p 7ddb1164407da19b9b1afb83df83ae65a71a9a66.
Approved by:	trasz
MFC after:	1 month
Sponsored by:	Conclusive Engineering (development), vStack.com (funding)
2020-05-14 19:57:52 +00:00

654 lines
22 KiB
Python

#! /usr/bin/env python
from __future__ import print_function
#__all__ = ['EncDec', 'EncDecSimple', 'EncDecTyped', 'EncDecA',
# 'SequenceError', 'Sequencer']
import abc
import struct
import sys
# Table of pre-compiled little-endian unsigned struct codecs, keyed by
# field width.  Each width is reachable both by its string spelling
# ('1', '2', '4', '8') and by the equivalent integer; both keys share
# one struct.Struct instance.  '_string_' maps to None because string
# fields are handled specially by the encoder rather than by a single
# struct format.
_ProtoStruct = dict(
    (str(_width), struct.Struct('<' + _code))
    for _width, _code in ((1, 'B'), (2, 'H'), (4, 'I'), (8, 'Q'))
)
_ProtoStruct['_string_'] = None     # handled specially
_ProtoStruct.update(
    (int(_key), _codec)
    for _key, _codec in list(_ProtoStruct.items())
    if _codec is not None
)
class EncDec(object):
    """
    Base class for en/de-coders, which are put into sequencers.

    All have a name and arbitrary user-supplied auxiliary data
    (the subclasses default it to None).

    All provide a pack() and unpack().  The pack() function
    returns a "bytes" value.  This is internally implemented as a
    function apack() that returns a list of struct.pack() bytes,
    and pack() just joins them up as needed.

    The pack/unpack functions take a dictionary of variable names
    and values, and a second dictionary for conditionals, but at
    this level conditionals don't apply: they are just being
    passed through.  Variable names do apply to array encoders,
    which look up their repeat count in the variable dictionary.

    EncDec also provides b2s() and s2b() static methods, which
    convert strings to bytes and vice versa, as reversibly as
    possible (using surrogateescape encoding).  In Python 2, b2s()
    is a no-op since the string type *is* the bytes type
    (<type 'unicode'> is the unicode-ized string type).

    EncDec also provides b2u() and u2b() to do conversion to/from
    Unicode.

    These are partly for internal use (all strings get converted
    to UTF-8 byte sequences when coding a _string_ type) and partly
    for doctests, where we just want some py2k/py3k compat hacks.
    """

    # The docstring above must stay the first statement in the class
    # body, otherwise it is just a discarded expression and __doc__
    # ends up as None.
    #
    # NOTE: the __metaclass__ attribute is only honored by Python 2.
    # Under Python 3 it is an ordinary class attribute, so the
    # abstract methods below are not enforced at instantiation there.
    __metaclass__ = abc.ABCMeta

    def __init__(self, name, aux):
        "remember the field name and the caller's auxiliary data"
        self.name = name
        self.aux = aux

    @staticmethod
    def b2u(byte_sequence):
        "transform bytes to unicode"
        return byte_sequence.decode('utf-8', 'surrogateescape')

    @staticmethod
    def u2b(unicode_sequence):
        "transform unicode to bytes"
        return unicode_sequence.encode('utf-8', 'surrogateescape')

    # Pick the str <-> bytes helpers that match the running interpreter.
    if sys.version_info[0] >= 3:
        # In py3k, "string" means unicode, so b2s is simply b2u.
        b2s = b2u

        @staticmethod
        def s2b(string):
            "transform string to bytes (leaves raw byte sequence unchanged)"
            if isinstance(string, bytes):
                return string
            return string.encode('utf-8', 'surrogateescape')
    else:
        @staticmethod
        def b2s(byte_sequence):
            "transform bytes to string - no-op in python2.7"
            return byte_sequence

        @staticmethod
        def s2b(string):
            "transform string or unicode to bytes"
            if isinstance(string, unicode):
                return string.encode('utf-8', 'surrogateescape')
            return string

    def pack(self, vdict, cdict, val):
        "encode value <val> into a byte-string"
        return b''.join(self.apack(vdict, cdict, val))

    @abc.abstractmethod
    def apack(self, vdict, cdict, val):
        "encode value <val> into [bytes1, b2, ..., bN]"

    @abc.abstractmethod
    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        "unpack bytes from <bstring> at <offset>"
class EncDecSimple(EncDec):
    r"""
    Encode/decode a simple (but named) field.  The field is not an
    array, which requires using EncDecA, nor a typed object
    like a qid or stat instance -- those require a Sequence and
    EncDecTyped.

    The format is one of '1'/1, '2'/2, '4'/4, '8'/8, or '_string_'.

    Note: using b2s here is purely a doctest/testmod python2/python3
    compat hack.  The output of e.pack is <type 'bytes'>; b2s
    converts it to a string, purely for display purposes.  (It might
    be better to map py2 output to bytes but they just print as a
    string anyway.)  In normal use, you should not call b2s here.

    >>> e = EncDecSimple('eggs', 2)
    >>> e.b2s(e.pack({}, {}, 0))
    '\x00\x00'
    >>> e.b2s(e.pack({}, {}, 256))
    '\x00\x01'

    Values that cannot be packed produce a SequenceError:

    >>> e.pack({}, {}, None)
    Traceback (most recent call last):
        ...
    SequenceError: failed while packing 'eggs'=None
    >>> e.pack({}, {}, -1)
    Traceback (most recent call last):
        ...
    SequenceError: failed while packing 'eggs'=-1

    Unpacking both returns a value, and tells how many bytes it
    used out of the bytestring or byte-array argument.  If there
    are not enough bytes remaining at the starting offset, it
    raises a SequenceError, unless noerror=True (then unset
    values are None)

    >>> e.unpack({}, {}, b'\x00\x01', 0)
    (256, 2)
    >>> e.unpack({}, {}, b'', 0)
    Traceback (most recent call last):
        ...
    SequenceError: out of data while unpacking 'eggs'
    >>> e.unpack({}, {}, b'', 0, noerror=True)
    (None, 2)

    Note that strings can be provided as regular strings, byte
    strings (same as regular strings in py2k), or Unicode strings
    (same as regular strings in py3k).  Unicode strings will be
    converted to UTF-8 before being packed.  Since this leaves
    7-bit characters alone, these examples work in both py2k and
    py3k.  (Note: the UTF-8 encoding of u'\u1234' is
    '\xe1\x88\xb4' or 225, 136, 180.  The b2i trick below is
    another py2k vs py3k special case just for doctests: py2k
    tries to display the utf-8 encoded data as a string.)

    >>> e = EncDecSimple('spam', '_string_')
    >>> e.b2s(e.pack({}, {}, 'p3=unicode,p2=bytes'))
    '\x13\x00p3=unicode,p2=bytes'
    >>> e.b2s(e.pack({}, {}, b'bytes'))
    '\x05\x00bytes'
    >>> import sys
    >>> ispy3k = sys.version_info[0] >= 3
    >>> b2i = lambda x: x if ispy3k else ord(x)
    >>> [b2i(x) for x in e.pack({}, {}, u'\u1234')]
    [3, 0, 225, 136, 180]

    The byte length of the utf-8 data cannot exceed 65535 since
    the encoding has the length as a 2-byte field (a la the
    encoding for 'eggs' here).  A too-long string produces
    a SequenceError as well.

    >>> e.pack({}, {}, 16384 * 'spam')
    Traceback (most recent call last):
        ...
    SequenceError: string too long (len=65536) while packing 'spam'

    Unpacking strings produces byte arrays.  (Of course,
    in py2k these are also known as <type 'str'>.)

    >>> unpacked = e.unpack({}, {}, b'\x04\x00data', 0)
    >>> etype = bytes if ispy3k else str
    >>> print(isinstance(unpacked[0], etype))
    True
    >>> e.b2s(unpacked[0])
    'data'
    >>> unpacked[1]
    6

    You may use e.b2s() to convert them to unicode strings in py3k,
    or you may set e.autob2s.  This still only really does
    anything in py3k, since py2k strings *are* bytes, so it's
    really just intended for doctest purposes (see EncDecA):

    >>> e.autob2s = True
    >>> e.unpack({}, {}, b'\x07\x00stringy', 0)
    ('stringy', 9)
    """

    def __init__(self, name, fmt, aux=None):
        """
        name -- field name, used in error messages
        fmt  -- one of '1'/1, '2'/2, '4'/4, '8'/8, or '_string_'
        aux  -- arbitrary caller data, stored untouched
        """
        super(EncDecSimple, self).__init__(name, aux)
        self.fmt = fmt
        # None for '_string_', otherwise the fixed-width codec.
        self.struct = _ProtoStruct[fmt]
        self.autob2s = False

    def __repr__(self):
        if self.aux is None:
            return '{0}({1!r}, {2!r})'.format(self.__class__.__name__,
                self.name, self.fmt)
        return '{0}({1!r}, {2!r}, {3!r})'.format(self.__class__.__name__,
            self.name, self.fmt, self.aux)

    __str__ = __repr__

    def apack(self, vdict, cdict, val):
        """
        Encode a value; returns a list of byte strings.

        Fixed-width fields yield one part.  Strings yield two: the
        2-byte length, then the UTF-8 data.  Raises SequenceError
        if the value cannot be packed or the string is too long.
        """
        try:
            if self.struct:
                return [self.struct.pack(val)]
            sval = self.s2b(val)
            if len(sval) > 65535:
                raise SequenceError('string too long (len={0:d}) '
                    'while packing {1!r}'.format(len(sval), self.name))
            return [EncDecSimple.string_len.pack(len(sval)), sval]
        # Include AttributeError in case someone tries to, e.g.,
        # pack name=None and self.s2b() tries to use .encode on it.
        except (struct.error, AttributeError):
            raise SequenceError('failed '
                'while packing {0!r}={1!r}'.format(self.name, val))

    def _unpack1(self, via, bstring, offset, noerror):
        """
        Internal function to unpack a single item using the
        struct.Struct <via>.  Returns (value, new offset); with
        noerror set, a short buffer yields (None, new offset).
        """
        try:
            tup = via.unpack_from(bstring, offset)
        except struct.error:
            # Decide whether we simply ran out of data by measuring
            # the buffer, rather than by matching the wording of the
            # struct.error message, which is not a stable interface.
            if len(bstring) - offset < via.size:
                if noerror:
                    return None, offset + via.size
                raise SequenceError('out of data '
                    'while unpacking {0!r}'.format(self.name))
            # not clear what to do here if noerror
            raise SequenceError('failed '
                'while unpacking {0!r}'.format(self.name))
        assert len(tup) == 1
        return tup[0], offset + via.size

    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        "decode a value; return the value and the new offset"
        if self.struct:
            return self._unpack1(self.struct, bstring, offset, noerror)
        # String: a 2-byte length followed by that many bytes.
        slen, offset = self._unpack1(EncDecSimple.string_len, bstring, offset,
            noerror)
        if slen is None:
            # Length itself was truncated (only possible with noerror).
            return None, offset
        nexto = offset + slen
        if len(bstring) < nexto:
            if noerror:
                val = None
            else:
                raise SequenceError('out of data '
                    'while unpacking {0!r}'.format(self.name))
        else:
            val = bstring[offset:nexto]
            if self.autob2s:
                val = self.b2s(val)
        return val, nexto

# string length: 2 byte unsigned field
EncDecSimple.string_len = _ProtoStruct[2]
class EncDecTyped(EncDec):
    r"""
    EncDec for typed objects (which are built from PFODs, which are
    a sneaky class variant of OrderedDict similar to namedtuple).

    Calling the klass() function with no arguments must create an
    instance with all-None members.

    We also require a Sequencer to pack and unpack the members of
    the underlying pfod.

    >>> qid_s = Sequencer('qid')
    >>> qid_s.append_encdec(None, EncDecSimple('type', 1))
    >>> qid_s.append_encdec(None, EncDecSimple('version', 4))
    >>> qid_s.append_encdec(None, EncDecSimple('path', 8))
    >>> len(qid_s)
    3

    >>> from pfod import pfod
    >>> qid = pfod('qid', ['type', 'version', 'path'])
    >>> len(qid._fields)
    3
    >>> qid_inst = qid(1, 2, 3)
    >>> qid_inst
    qid(type=1, version=2, path=3)

    >>> e = EncDecTyped(qid, 'aqid', qid_s)
    >>> e.b2s(e.pack({}, {}, qid_inst))
    '\x01\x02\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00'
    >>> e.unpack({}, {},
    ... b'\x01\x02\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00', 0)
    (qid(type=1, version=2, path=3), 13)

    If an EncDecTyped instance has a conditional sequencer, note
    that unpacking will leave un-selected items set to None (see
    the Sequencer example below):

    >>> breakfast = pfod('breakfast', 'eggs spam ham')
    >>> breakfast()
    breakfast(eggs=None, spam=None, ham=None)
    >>> bfseq = Sequencer('breakfast')
    >>> bfseq.append_encdec(None, EncDecSimple('eggs', 1))
    >>> bfseq.append_encdec('yuck', EncDecSimple('spam', 1))
    >>> bfseq.append_encdec(None, EncDecSimple('ham', 1))
    >>> e = EncDecTyped(breakfast, 'bfname', bfseq)
    >>> e.unpack({}, {'yuck': False}, b'\x02\x01\x04', 0)
    (breakfast(eggs=2, spam=None, ham=1), 2)

    This used just two of the three bytes: eggs=2, ham=1.

    >>> e.unpack({}, {'yuck': True}, b'\x02\x01\x04', 0)
    (breakfast(eggs=2, spam=1, ham=4), 3)

    This used the third byte, so ham=4.
    """

    def __init__(self, klass, name, sequence, aux=None):
        """
        klass    -- callable producing an empty instance (has _fields)
        name     -- field name
        sequence -- sequencer with one en/de-coder per klass field
        aux      -- arbitrary caller data, stored untouched
        """
        field_count = len(klass()._fields)
        assert len(sequence) == field_count     # temporary
        super(EncDecTyped, self).__init__(name, aux)
        self.sequence = sequence
        self.name = name
        self.klass = klass

    def __repr__(self):
        # Show aux only when the caller actually supplied one.
        shown = [self.klass, self.name, self.sequence]
        if self.aux is not None:
            shown.append(self.aux)
        return '{0}({1})'.format(self.__class__.__name__,
            ', '.join(repr(item) for item in shown))

    __str__ = __repr__

    def apack(self, vdict, cdict, val):
        """
        Pack the members of <val> by handing it to our sequencer
        as the variable dictionary.  Members whose condition is
        false in <cdict> are left out.
        """
        return self.sequence.apack(val, cdict)

    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        """
        Build a fresh self.klass instance and let our sequencer
        fill in its members from <bstring>, starting at <offset>.
        Members whose condition is false in <cdict> are not
        decoded.  Returns (instance, new offset).
        """
        instance = self.klass()
        end = self.sequence.unpack_from(instance, cdict, bstring, offset,
            noerror)
        return instance, end
class EncDecA(EncDec):
    r"""
    EncDec for arrays (repeated objects).

    We take the name of repeat count variable, and a sub-coder
    (Sequencer instance).  For instance, we can en/de-code
    repeat='nwname' copies of name='wname', or nwname of
    name='wqid', in a Twalk en/de-code.

    Note that we don't pack or unpack the repeat count itself --
    that must be done by higher level code.  We just get its value
    from vdict.

    >>> subcode = EncDecSimple('wname', '_string_')
    >>> e = EncDecA('nwname', 'wname', subcode)
    >>> e.b2s(e.pack({'nwname': 2}, {}, ['A', 'BC']))
    '\x01\x00A\x02\x00BC'

    >>> subcode.autob2s = True # so that A and BC decode to py3k str
    >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02\x00BC', 0)
    (['A', 'BC'], 7)

    When using noerror, the first sub-item that fails to decode
    completely starts the None-s.  Strings whose length fails to
    decode are assumed to be zero bytes long as well, for the
    purpose of showing the expected packet length:

    >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02\x00', 0, noerror=True)
    (['A', None], 7)
    >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02', 0, noerror=True)
    (['A', None], 5)
    >>> e.unpack({'nwname': 3}, {}, b'\x01\x00A\x02', 0, noerror=True)
    (['A', None, None], 7)

    As a special case, supplying None for the sub-coder
    makes the repeated item pack or unpack a simple byte
    string.  (Note that autob2s is not supported here.)
    A too-short byte string is simply truncated!

    >>> e = EncDecA('count', 'data', None)
    >>> e.b2s(e.pack({'count': 5}, {}, b'12345'))
    '12345'
    >>> x = list(e.unpack({'count': 3}, {}, b'123', 0))
    >>> x[0] = e.b2s(x[0])
    >>> x
    ['123', 3]
    >>> x = list(e.unpack({'count': 3}, {}, b'12', 0, noerror=True))
    >>> x[0] = e.b2s(x[0])
    >>> x
    ['12', 3]

    A repeat count that disagrees with the data is a SequenceError:

    >>> e.pack({'count': 4}, {}, b'12345')
    Traceback (most recent call last):
        ...
    SequenceError: repeat count 'count'=4 does not match length 5 while packing 'data'
    """

    def __init__(self, repeat, name, sub, aux=None):
        """
        repeat -- name of the vdict entry holding the item count
        name   -- field name, used in error messages
        sub    -- en/de-coder for one item, or None for raw bytes
        aux    -- arbitrary caller data, stored untouched
        """
        # The base class stores name and aux.
        super(EncDecA, self).__init__(name, aux)
        self.repeat = repeat
        self.sub = sub

    def __repr__(self):
        if self.aux is None:
            return '{0}({1!r}, {2!r}, {3!r})'.format(self.__class__.__name__,
                self.repeat, self.name, self.sub)
        return '{0}({1!r}, {2!r}, {3!r}, {4!r})'.format(self.__class__.__name__,
            self.repeat, self.name, self.sub, self.aux)

    __str__ = __repr__

    def apack(self, vdict, cdict, val):
        """
        Pack each val[i], for i in range(vdict[self.repeat]).

        Raises SequenceError if the repeat count does not equal
        len(val), or if there is no sub-coder and val is not bytes.
        These are explicit checks (not asserts) so that they still
        run under "python -O".
        """
        num = vdict[self.repeat]
        if num != len(val):
            raise SequenceError('repeat count {0!r}={1!r} does not match '
                'length {2:d} while packing {3!r}'.format(self.repeat, num,
                    len(val), self.name))
        if self.sub is None:
            if not isinstance(val, bytes):
                raise SequenceError('failed '
                    'while packing {0!r}={1!r}'.format(self.name, val))
            return [val]
        parts = []
        for i in val:
            parts.extend(self.sub.apack(vdict, cdict, i))
        return parts

    def unpack(self, vdict, cdict, bstring, offset, noerror=False):
        """
        Unpack repeatedly, per self.repeat, into new array.
        Returns (array or byte string, new offset).

        A repeat count of None (which happens when the count field
        itself failed to decode) is treated as zero when noerror is
        set; otherwise a None or negative count raises
        SequenceError.
        """
        num = vdict[self.repeat]
        if num is None:
            if not noerror:
                raise SequenceError('missing repeat count {0!r} '
                    'while unpacking {1!r}'.format(self.repeat, self.name))
            num = 0
        elif num < 0:
            raise SequenceError('negative repeat count {0!r}={1!r} '
                'while unpacking {2!r}'.format(self.repeat, num, self.name))
        if self.sub is None:
            nexto = offset + num
            if len(bstring) < nexto and not noerror:
                raise SequenceError('out of data '
                    'while unpacking {0!r}'.format(self.name))
            # With noerror, a short buffer just yields a truncated slice.
            return bstring[offset:nexto], nexto
        array = []
        for i in range(num):
            obj, offset = self.sub.unpack(vdict, cdict, bstring, offset,
                noerror)
            array.append(obj)
        return array, offset
class SequenceError(Exception):
    """
    Packing or unpacking a sequence failed: an item was too big or
    could not be encoded, the data ran out, or bytes were left over.
    """
class Sequencer(object):
    r"""
    A sequencer is an object that packs (marshals) or unpacks
    (unmarshals) a series of objects, according to their EncDec
    instances.

    The objects themselves (and their values) come from, or
    go into, a dictionary: <vdict>, the first argument to
    pack/unpack.

    Some fields may be conditional.  The conditions are in a
    separate dictionary (the second or <cdict> argument).

    Some objects may be dictionaries or PFODs, e.g., they may
    be a Plan9 qid or stat structure.  These have their own
    sub-encoding.

    As with each encoder, we have both an apack() function
    (returns a list of parts) and a plain pack().  Users should
    mostly stick with plain pack().

    >>> s = Sequencer('monty')
    >>> s
    Sequencer('monty')
    >>> e = EncDecSimple('eggs', 2)
    >>> s.append_encdec(None, e)
    >>> s.append_encdec(None, EncDecSimple('spam', 1))
    >>> s[0]
    (None, EncDecSimple('eggs', 2))
    >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {}))
    '\x01\x02A'

    When particular fields are conditional, they appear in
    packed output, or are taken from the byte-string during
    unpacking, only if their condition is true.

    As with struct, use unpack_from to start at an arbitrary
    offset and/or omit verification that the entire byte-string
    is consumed.

    >>> s = Sequencer('python')
    >>> s.append_encdec(None, e)
    >>> s.append_encdec('.u', EncDecSimple('spam', 1))
    >>> s[1]
    ('.u', EncDecSimple('spam', 1))
    >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {'.u': True}))
    '\x01\x02A'
    >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {'.u': False}))
    '\x01\x02'

    >>> d = {}
    >>> s.unpack(d, {'.u': True}, b'\x01\x02A')
    >>> print(d['eggs'], d['spam'])
    513 65
    >>> d = {}
    >>> s.unpack(d, {'.u': False}, b'\x01\x02A', 0)
    Traceback (most recent call last):
        ...
    SequenceError: 1 byte(s) unconsumed
    >>> s.unpack_from(d, {'.u': False}, b'\x01\x02A', 0)
    2
    >>> print(d)
    {'eggs': 513}

    The incoming dictionary-like object may be pre-initialized
    if you like; only sequences that decode are filled-in:

    >>> d = {'eggs': None, 'spam': None}
    >>> s.unpack_from(d, {'.u': False}, b'\x01\x02A', 0)
    2
    >>> print(d['eggs'], d['spam'])
    513 None

    Some objects may be arrays; if so their EncDec is actually
    an EncDecA, the repeat count must be in the dictionary, and
    the object itself must have a len() and be index-able:

    >>> s = Sequencer('arr')
    >>> s.append_encdec(None, EncDecSimple('n', 1))
    >>> ae = EncDecSimple('array', 2)
    >>> s.append_encdec(None, EncDecA('n', 'array', ae))
    >>> ae.b2s(s.pack({'n': 2, 'array': [257, 514]}, {}))
    '\x02\x01\x01\x02\x02'

    Unpacking an array creates a list of the number of items.
    The EncDec encoder that decodes the number of items needs to
    occur first in the sequencer, so that the dictionary will have
    acquired the repeat-count variable's value by the time we hit
    the array's encdec:

    >>> d = {}
    >>> s.unpack(d, {}, b'\x01\x04\x00')
    >>> d['n'], d['array']
    (1, [4])
    """

    def __init__(self, name):
        self.name = name
        # List of (condition-name-or-None, en/de-coder) pairs, in order.
        self._codes = []
        # False: silent.  True: trace to sys.stdout.  Anything else
        # truthy is used as the file object to trace to.
        self.debug = False  # or sys.stderr

    def __repr__(self):
        return '{0}({1!r})'.format(self.__class__.__name__, self.name)

    __str__ = __repr__

    def __len__(self):
        return len(self._codes)

    def __iter__(self):
        return iter(self._codes)

    def __getitem__(self, index):
        return self._codes[index]

    def dprint(self, *args, **kwargs):
        "print debug trace, if self.debug is set (see __init__)"
        if not self.debug:
            return
        if isinstance(self.debug, bool):
            dest = sys.stdout
        else:
            dest = self.debug
        print(*args, file=dest, **kwargs)

    def append_encdec(self, cond, code):
        "add EncDec en/de-coder, conditional on cond"
        self._codes.append((cond, code))

    def _active_codes(self, cdict, verb):
        """
        Yield, in order, each en/de-coder that is unconditional or
        whose condition is true in <cdict>.  <verb> ('pack' or
        'unpack') only labels the debug trace.

        This is a generator, so each condition is looked up just
        before its coder is used.  The trace text is only built
        when debugging is on, since it calls repr() on the coder.
        """
        for cond, code in self._codes:
            # Skip this item if it's conditional on a false thing.
            if cond is not None and not cdict[cond]:
                if self.debug:
                    self.dprint('skip %r - %r is False' % (code, cond))
                continue
            if self.debug:
                self.dprint('%s %r - no cond or %r is True'
                    % (verb, code, cond))
            yield code

    def apack(self, vdict, cdict):
        """
        Produce packed representation of each field, as a list
        of byte strings.  Each value is vdict[<field name>].
        """
        packed_data = []
        for code in self._active_codes(cdict, 'pack'):
            packed_data.extend(code.apack(vdict, cdict, vdict[code.name]))
        return packed_data

    def pack(self, vdict, cdict):
        """
        Flatten packed data.
        """
        return b''.join(self.apack(vdict, cdict))

    def unpack_from(self, vdict, cdict, bstring, offset=0, noerror=False):
        """
        Unpack from byte string.  Returns the new offset.

        The values are unpacked into a dictionary vdict;
        some of its entries may themselves be ordered
        dictionaries created by typedefed codes.

        Raises SequenceError if the string is too short,
        unless you set noerror, in which case we assume
        you want to see what you can get out of the data.
        """
        for code in self._active_codes(cdict, 'unpack'):
            obj, offset = code.unpack(vdict, cdict, bstring, offset, noerror)
            vdict[code.name] = obj
        return offset

    def unpack(self, vdict, cdict, bstring, noerror=False):
        """
        Like unpack_from but unless noerror=True, requires that
        we completely use up the given byte string.
        """
        offset = self.unpack_from(vdict, cdict, bstring, 0, noerror)
        if not noerror and offset != len(bstring):
            raise SequenceError('{0} byte(s) unconsumed'.format(
                len(bstring) - offset))
if __name__ == '__main__':
    # Self-test: run the doctests embedded in this module's docstrings.
    from doctest import testmod
    testmod()