Source code for quickrpc.terse_codec

import logging
import base64
import binascii
import re
from .codecs import Codec, Message, Reply, ErrorReply, DecodeError, EncodeError, RemoteError, _fmt_exc
L = lambda: logging.getLogger(__name__)

 
[docs]class TerseCodec(Codec): '''Terse codec: encodes with minimum puncutation. encodes to: method[id] param1:1, param2:"foo"<NL> values: * int/float: 1.0 * bytes: '(base64-string' * str: "python-escaped str" * list: [val1 val2 val3 ...] * dict: {key1:val1 key2:val2 ...} Reply is encoded to: [id]:value Error is encoded to: [id]! message:"string" details:"string" * Commands must be terminated by newline. * Newlines, double quote and backslash in strings are escaped as usual * Allowed dtypes: int, float, str, bytes (content base64-encoded), list, dict ''' shorthand = 'terse'
[docs] @classmethod def fromstring(cls, expression): '''terse:''' return cls()
[docs] def encode(self, method, kwargs, id=0, sec_out=None): '''encodes the call, including trailing newline''' if sec_out: raise EncodeError('Security is not supported by TerseCodec.') return _encode_method(method, id, kwargs)
[docs] def encode_reply(self, in_reply_to, result, sec_out=None): if sec_out: raise EncodeError('Security is not supported by TerseCodec.') return b'[%d]:%s'%(in_reply_to.id, _encode_value(result))
[docs] def encode_error(self, in_reply_to, exception, errorcode=0, sec_out=None): if sec_out: raise EncodeError('Security is not supported by TerseCodec.') text = _encode_value(str(exception)) details = _encode_value(_fmt_exc(exception)) return b'[%d]! message:%s details:%s'%(in_reply_to.id, text, details)
[docs] def decode(self, data, sec_in=None): if sec_in: raise DecodeError('Security is not supported by TerseCodec') lines = data.split(b'\n') leftover = lines.pop() messages = [] for line in lines: try: obj, idx = _decode(line + b'\n') except DecodeError as e: L().warning(e) continue else: if idx <= len(line): L().warning('_decode left something over: %r'%line[idx:]) messages.append(obj) if leftover: L().debug('leftover data: %r'%(leftover[:50]+b" ... "+leftover[-50:])) return messages, leftover
def _encode_method(method, id, params): return b'%s%s %s\n'%( method.encode('utf8'), b'/%d'%id if id>0 else b'', b' '.join( name.encode('utf8') + b':' + _encode_value(value) for name, value in params.items() ) ) def _encode_value(value): if isinstance(value, (int, float)): return b'%g'%value elif isinstance(value, str): value = value.replace('\\', '\\\\').replace('"', '\\"').replace('\n', '\\n') return ('"'+value+'"').encode('utf8') elif isinstance(value, bytes): return b"'" + base64.b64encode(value) + b"'" elif isinstance(value, (list, tuple, set)): return _encode_iterable(value) elif isinstance(value, dict): return _encode_dict(value) def _encode_iterable(l): return b'[' + b' '.join(_encode_value(value) for value in l) + b']' def _encode_dict(d): return b'{' + ( b' '.join( str(key).encode('utf8') + b':' + _encode_value(value) for key, value in d.items() ) ) + b'}' def _decode(data): '''returns object, idx OR raises DecodeError. identifier params <NL> ''' idx = 0 params = {} # skip empty telegrams while data[idx:idx+1] in b'\r\n': idx += 1 if b'\n' not in data: raise DecodeError('Incomplete data') if data[idx] == b'[': idnum, idx = _decode_idnum(data, idx) if data[idx] == b':': return _decode_reply(data, idx+1, id=idnum) elif data[idx] == b'!': return _decode_error(data, idx+1, id=idnum) else: raise DecodeError('Received invalid reply') return _decode_message(data, idx) def _decode_message(data, idx): method, idx = _decode_identifier(data, idx) if data[idx] == b'[': id = _decode_idnum(data, idx) else: id = 0 idx = _skipws(data, idx) params, idx = _decode_pairs(data, idx) idx = _expect(data, idx, b'\n') return Message(method, params, id=id), idx def _decode_reply(data, idx, id): idx = _skipws(data, idx) val, idx = _decode_value(data, idx) idx = _expect(data, idx, b'\n') return Reply(val, id), idx def _decode_error(data, idx, id): idx = _skipws(data, idx) data, idx = _decode_pairs(data, idx) idx = _expect(data, idx, b'\n') msg = data.get('message', '') details = data.get('details', '') return ErrorReply(message, details), idx def _decode_pairs(data, idx, assignchar = b':'): pairs = dict() while True: idx = _skipws(data, idx) # XXX: sentinel chars from other grammar terms! if data[idx:idx+1] in b'}\n': return pairs, idx key, value, idx = _decode_pair(data, idx, assignchar) pairs[key] = value def _decode_pair(data, idx, assignchar): key, idx = _decode_identifier(data, idx) # this may throw DecodeError idx = _expect(data, idx, assignchar) value, idx = _decode_value(data, idx) return key, value, idx def _decode_value(data, idx): idx = _skipws(data, idx) ch = data[idx:idx+1] if ch in b'-.0123456789': return _decode_num(data, idx) func = { b'[': _decode_list, b'{': _decode_dict, b'"': _decode_str, b"'": _decode_bytes, }.get(ch, None) if func is None: raise DecodeError('Unsupported Value at position %s'%idx) return func(data, idx) _num_re = re.compile(br'-?\d*(\.\d*)?([eE][+-?]\d+)?') def _decode_num(data, idx): m = _num_re.match(data, idx) if not m: raise DecodeError('Expected number at position %d'%idx) if b'.' not in m.group() and b'e' not in m.group().lower(): return int(m.group()), m.end() return float(m.group()), m.end() def _decode_bytes(data, idx): try: end = data.index(b"'", idx+1) except ValueError: raise DecodeError('unterminated bytes value at %d'%idx) try: value = base64.b64decode(data[idx+1:end]) except binascii.Error: raise DecodeError('invalid base64 string') return value, end+1 _str_re = re.compile(br'(\\"|[^"])*["]') def _decode_str(data, idx): idx = _expect(data, idx, b'"') m = _str_re.match(data, idx) if not m.group(): raise DecodeError('Expected quoted value at position %d'%idx) value = m.group()[:-1].decode('utf8').replace('\\n', '\n').replace('\\"', '\"').replace('\\\\', '\\') return value, m.end() def _decode_dict(data, idx): idx = _expect(data, idx, b'{') contents, idx = _decode_pairs(data, idx, assignchar=b':') idx = _expect(data, idx, b'}') return contents, idx def _decode_list(data, idx): idx = _expect(data, idx, b'[') l = [] while True: idx = _skipws(data, idx) if data[idx:idx+1] == b']': break value, idx = _decode_value(data, idx) l.append(value) idx = _expect(data, idx, b']') return l, idx _id_re = re.compile(br'.*?(?=[ :\n])') def _decode_identifier(data, idx): m = _id_re.match(data, idx) if not m: raise DecodeError('Expected identifier at position %d'%idx) return m.group().strip().decode('utf8'), m.end() _idnum_re = re.compile(br'\[\d+\]') def _decode_idnum(data, idx): m = _idnum_re.match(data, idx) if not m: raise DecodeError('Expected [id] at position %d'%idx) return int(m.group()[1:-1].decode('ascii')), m.end() def _skipws(data, idx): while data[idx:idx+1] in b' \r': idx += 1 return idx def _expect(data, idx, chars): while data[idx:idx+1] == b' ': idx += 1 if data[idx:idx+len(chars)] != chars: raise DecodeError('Expected characters "%s" at position %d'%(chars.decode('ascii'), idx)) return idx + len(chars)