'''A `RemoteAPI` is an interface-like class whose methods correspond to remote calls.
Let us start with an example::
from quickrpc.remote_api import RemoteAPI, incoming, outgoing
class MyAPI(RemoteAPI):
@incoming
def notify(self, sender, arg1=val1, arg2=val2):
"""notification that something happened"""
@outgoing
def helloworld(self, receivers, arg1=val1):
"""Tell everybody that I am here."""
@incoming(has_reply=true)
def echo(self, sender, text="test"):
"""returns the text that was sent."""
RemoteAPI is used by subclassing it. Remote methods are defined by the
``@incoming`` and ``@outgoing`` decorators.
Important:
The method body of remote methods must be empty.
This is because by caling ``@outgoing`` methods, you actually issue a call
over the :class:`~.transports.Transport` that is bound to the
``RemoteAPI`` at runtime. Since the API is meant to be used by both sides (by
means of inverting it), ``@incoming`` methods should be empty, too. The side
effect of this is that the class definition is more or less a printable
specification of your interface.
``@incoming`` methods have a ``.connect()`` method to attach an implementation
to that message. The connected handler has the same signature as the
``@incoming`` method, except for the ``self`` argument.
By default, all defined calls are resultless (i.e. notifications). To define
calls with return value, decorate with ``has_reply=True`` kwarg.
When handling such a call on the incoming side, your handler's return value
is returned to the sender. Exceptions are caught and sent as error reply.
On the ``outgoing`` side, the call immediately returns a
:class:`~.Promise` object.
You then use :meth:`~.Promise.result` to get at the actual
result. This will block
until the result arrived.
(TODO: make blocking call by default, add block=False param for Promises)
'''
import logging
from .promise import Promise
from .action_queue import ActionQueue
import itertools as it
import inspect
from functools import wraps
from .codecs import Codec, Message, Reply, ErrorReply
from .transports import Transport
from .security import Security
L = lambda: logging.getLogger(__name__)
__all__ = [
'RemoteAPI',
'incoming',
'outgoing',
]
[docs]class RemoteAPI(object):
'''Describes an API i.e. a set of allowed outgoing and incoming calls.
Subclass and add your calls.
:attr:`.codec` holds the Codec for (de)serializing data.
:attr:`.transport` holds the underlying transport.
:attr:`.security` holds the security provider.
Both can also be strings, then :meth:`.Transport.fromstring` / :meth:`.Codec.fromstring` are used
to acquire the respective objects. In this case, transport still needs to be started
via ``myapi.transport.start()``.
Methods marked as ``@outgoing`` are automatically turned into
messages when called. The method body is executed before sending. (use e.g.
for validation of outgoing data).
They must accept a special `receivers` argument, which is passed to the
Transport.
Methods marked as ``@incoming`` are called by the transport when
messages arrive. They work like signals - you can connect your
own handler(s) to them. Connected handlers must have the same
signature as the incoming call. All @incoming methods MUST support
a `senders` argument.
Connect like this:
>>> def handler(self, foo=None): pass
>>> remote_api.some_method.connect(handler)
>>> # later
>>> remote_api.some_method.disconnect(handler)
Execution order: the method of remote_api is executed first,
then the connected handlers in the order of registering.
Incoming messages with unknown method will not be processed. If the message
has ``.id != 0``, it will automatically be replied with an error.
Threading:
* outgoing messages are sent on the calling thread.
* If async_processing = False, incoming messages are handled on the thread
which handles Transport receive events. I.e. the Transport
implementation defines the behaviour.
* If async_processing = True, an extra Thread is used to handle messages.
The latter allows the receive handler to run concurrently to message
handling, allowing further requests to be sent out and to await the result.
However it means one extra thread. In any case, only one incoming message
is handled at a time.
Recommendation is to set ``async_processing=True`` if there are any outgoing
calls that have a reply, ``False`` if not.
Inverting:
You can :meth:`.invert` the whole api,
swapping incoming and outgoing methods. When inverted, the ``sender`` and
``receiver`` arguments of each method swap their roles. This is also possible
upon initialization by giving ``invert=True`` kwarg.
'''
def __init__(self, codec='jrpc', transport=None, security='null', invert=False, async_processing=False):
if isinstance(codec, str):
codec = Codec.fromstring(codec)
if isinstance(transport, str):
transport = Transport.fromstring(transport)
if isinstance(security, str):
security = Security.fromstring(security)
self.codec = codec
self.transport = transport
self.security = security
# FIXME: limit size of _pending_replies somehow
self._pending_replies = {}
self._id_dispenser = it.count()
# pull the 0
next(self._id_dispenser)
if invert:
self.invert()
# just use the presence of _action_queue as flag.
if async_processing:
self._action_queue = ActionQueue()
else:
self._action_queue = None
@property
def transport(self):
'''Gets/sets the transport used to send and receive messages.
You can change the transport at runtime.'''
return self._transport
@transport.setter
def transport(self, value):
self._transport = value
if self._transport:
self._transport.set_on_received(self._handle_received)
[docs] def invert(self):
'''Swaps ``@incoming`` and ``@outgoing`` decoration
on all methods of this INSTANCE.
I.e. generates the opposite-side API.
Do this before connecting any handlers to incoming calls.
You can achieve the same effect by instantiating with ``invert=True`` kwarg.
'''
for attr in dir(self):
field = getattr(self, attr)
if hasattr(field, '_remote_api_incoming') or hasattr(field, '_remote_api_outgoing'):
# The decorators add a "method" .inverted() to the field,
# which will yield the inverse-decorated field.
setattr(self, attr, field.inverted().__get__(self))
# ---- handling of incoming messages ----
def _handle_received(self, sender, data):
'''called by the Transport when data comes in.'''
messages, remainder = self.codec.decode(data, sec_in=self.security.sec_in)
for message in messages:
if isinstance(message, Exception):
self.message_error(sender, message)
continue
elif isinstance(message, Reply) or isinstance(message, ErrorReply):
self._deliver_reply(message)
else:
self._handle_method(sender, message)
return remainder
def _handle_method(self, sender, message):
try:
method = getattr(self, message.method)
except AttributeError:
self.message_error(sender, AttributeError("Incoming call of %s not defined on the api"%message.method), message)
return
if not hasattr(method, "_remote_api_incoming"):
self.message_error(sender, AttributeError("Incoming call of %s not marked as @incoming on the api"%message.method), message)
return
def action():
has_reply = method._remote_api_incoming['has_reply']
try:
result = method(sender, message)
except Exception as e:
if has_reply:
L().debug('Exception in message handler, returning as result: '+str(e), exc_info=True)
self.message_error(sender, e, message)
else:
# Complain and continue, since the user cannot install sensible handling above from here.
L().error('Exception in message handler caught: '+str(e), exc_info = True)
else:
if has_reply:
try:
data = self.codec.encode_reply(message, result, sec_out=self.security.sec_out)
self.transport.send(data, receivers=[sender])
except Exception as e:
L().error('Exception in message handler while sending response: '+str(e), exc_info=True)
if self._action_queue:
# message processed in extra thread, we return instantly after .put
self._action_queue.put(action)
else:
# message processed in this thread, return when done.
action()
[docs] def message_error(self, sender, exception, in_reply_to=None):
'''Called each time that an incoming message causes problems.
By default, it logs the error as warning. in_reply_to is the message that
triggered the error, None if decoding failed. If the requested method can be
identified and has a reply, an error reply is returned to the sender.
'''
L().warning(exception)
if in_reply_to.id:
data = self.codec.encode_error(in_reply_to, exception, errorcode=0, sec_out=self.security.sec_out)
self.transport.send(data, receivers=[sender])
def _deliver_reply(self, reply):
id = reply.id
try:
promise = self._pending_replies.pop(id)
except KeyError:
# do not raise, since it cannot be caught by user.
L().warning('Received reply that was never requested: %r'%(reply,))
return
#FIXME: secinfo is discarded
if isinstance(reply, Reply):
promise.set_result(reply.result)
else:
# Put the ErrorReply in the result queue.
promise.set_exception(reply.exception)
# ---- handling of outgoing messages ----
def _new_request(self):
call_id = next(self._id_dispenser)
promise = Promise(setter_thread=self.transport.receiver_thread)
self._pending_replies[call_id] = promise
return call_id, promise
# ---- stuff ----
[docs] def unhandled_calls(self):
'''Generator, returns the names of all *incoming*, unconnected methods.
If no results are returned, all incoming messages are connected. Use
this to check for missed ``.connect`` calls.
'''
result = []
for attr in dir(self):
field = getattr(self, attr)
if hasattr(field, '_remote_api_incoming') and not field._listeners:
yield attr
[docs]def incoming(unbound_method=None, has_reply=False, allow_positional_args=False):
'''Marks a method as possible incoming message.
``@incoming(has_reply=False, allow_positional_args=False)``
Incoming methods keep list of connected listeners, which are called with the
signature of the incoming method (excluding ``self``). The first argument
will be passed positional and is a string describing the sender of the message.
The remaining arguments can be chosen freely and will usually be passed as named
args.
Optionally, you can receive security info (the ``secinfo`` dict extracted
from the message). For this, call ``myapi.<method>.pass_secinfo(True)``.
Listener calls then receive an additional kwarg called ``secinfo``, containing
the received dictionary. I.e. your handler(s) must add a ``secinfo=``
parameter in addition to the signature specified in the ``RemoteAPI``.
Listeners can be added with ``myapi.<method>.connect(handler)`` and
disconnected with ``.disconnect(handler)``. They are called in the order that
they were added.
If ``has_reply=True``, the handler should return a value that is sent back
to the sender. If multiple handlers are connected, at most one of them must
return something.
Notice:
Processing of incoming messages does not resume until all listeners returned.
This means that if you issue a followup remote call in a listener, the
result can not arrive while the listener is executing. If you want to
do this, use promise.then() to resume when the result is there.
You can also spawn a new thread in your listener, to do the processing.
However, be aware that this makes you vulnerable against DOS attacks,
since an attacker can make you open arbitrary many threads this way.
If ``allow_positional_args=True``, messages with positional (unnamed)
arguments are accepted. Otherwise such arguments throw an error message without
executing the handler(s). Note that the :class:`.Codec` must support positional
and/or mixed args as well. It is strongly recommended to use named args only.
Lastly, the incoming method has a ``myapi.<method>.inverted()`` method, which
will return the ``@outgoing`` variant of it.
'''
if not unbound_method:
# when called as @decorator(...)
return lambda unbound_method: incoming(unbound_method=unbound_method, has_reply=has_reply, allow_positional_args=allow_positional_args)
# when called as @decorator or explicitly
pass_secinfo = [False]
@wraps(unbound_method)
def fn(self, sender, message):
if isinstance(message.kwargs, dict):
args, kwargs = [], message.kwargs
else:
if not allow_positional_args:
raise ValueError('Please call with named parameters only!')
if isinstance(message.kwargs, list):
args, kwargs = message.kwargs, {}
else:
args, kwargs = [message.kwargs], {}
L().debug('incoming call of %s, args=%r, kwargs=%r'%(message.method, args, kwargs))
try:
replies = [unbound_method(self, sender, *args, **kwargs)]
except TypeError:
# signature is wrong
raise TypeError('incoming call with wrong signature')
if pass_secinfo[0]:
kwargs['secinfo'] = message.secinfo
for listener in fn._listeners:
replies.append(listener(sender, *args, **kwargs))
if has_reply:
replies = [r for r in replies if r is not None]
if len(replies) > 1:
raise ValueError('Incoming call produced more than one reply!')
replies.append(None) # If there is no result, reply with None
return replies[0]
# Presence of this attribute indicates that this method is a valid incoming target
fn._remote_api_incoming = {'has_reply': has_reply}
fn._listeners = []
fn._unbound_method = unbound_method
fn.pass_secinfo = lambda val: pass_secinfo.__setitem__(0, val)
fn.connect = lambda listener: fn._listeners.append(listener)
fn.disconnect = lambda listener: fn._listeners.remove(listener)
fn.inverted = lambda: outgoing(unbound_method, has_reply=has_reply, allow_positional_args=allow_positional_args)
return fn
[docs]def outgoing(unbound_method=None, has_reply=False, allow_positional_args=False):
'''Marks a method as possible outgoing message.
``@outgoing(has_reply=False, allow_position_args=False)``
Invocation of outgoing methods leads to a message being sent over the
:class:`.Transport` of the :class:`RemoteAPI`.
The first argument must be the list of receivers of the message, as a list
of strings. When calling the method, usually you will use the sender name(s)
received via an incoming call. Set receivers=None to send to all connected
peers.
The remaining arguments can be choosen freely. The argument values can be
anything supported by the :class:`.Codec` that you use. The builtin Codecs
support all the "atomic" builtin types, as well as dicts and lists.
If ``has_reply=True``, the other side is expected to return a result value. In this case,
calling the outgoing method returns a :class:`.Promise` immediately.
If ``allow_positional_args=True``, calls with positional (unnamed)
arguments are accepted. Otherwise such arguments raise :class:`ValueError`.
**For sending, they will be converted into named arguments.**
It is strongly recommended to use named args only.
Lastly, the outgoing method has a ``myapi.<method>.inverted()`` method, which
will return the ``@incoming`` variant of it.
'''
if not unbound_method:
# when called as @decorator(...)
return lambda unbound_method: outgoing(unbound_method=unbound_method, has_reply=has_reply, allow_positional_args=allow_positional_args)
# when called as @decorator or explicitly
if allow_positional_args:
sig = inspect.signature(unbound_method)
# cut off self and sender/receiver arg
argnames = [p.name for p in sig.parameters.values()][2:]
else:
argnames = []
@wraps(unbound_method)
def fn(self, receivers=None, *args, **kwargs):
if args and not allow_positional_args:
raise ValueError('Please call with named parameters only!')
else:
# map positional to named args
for name, arg in zip(argnames, args):
if name in kwargs:
raise ValueError('argument %s given twice!'%name)
kwargs[name] = arg
# this ensures that all args and kwargs are valid
unbound_method(self, receivers, **kwargs)
if has_reply:
call_id, promise = self._new_request()
else:
call_id = 0
data = self.codec.encode(unbound_method.__name__, kwargs=kwargs, id=call_id, sec_out=self.security.sec_out)
self.transport.send(data, receivers=receivers)
if has_reply:
return promise
fn._remote_api_outgoing = {'has_reply': has_reply}
fn.inverted = lambda: incoming(unbound_method, has_reply=has_reply, allow_positional_args=allow_positional_args)
return fn