# coding: utf8
'''A Transport abstracts a two-way bytestream interface.
It can be started and stopped, and send and receive byte sequences
to one or more receivers.
Classes defined here:
* :any:`Transport`: abstract base
* :any:`MuxTransport`: a transport that multiplexes several sub-transports.
* :any:`RestartingTransport`: a transport that automatically restarts its child.
* :any:`StdioTransport`: reads from stdin, writes to stdout.
* :any:`TcpServerTransport`: a transport that accepts tcp connections and muxes
them into one transport. Actually a forward to quickrpc.network_transports.
* :any:`TcpClientTransport`: connects to a TCP server. This is a forward to
quickrpc.network_transports.
* :any:`RestartingTcpClientTransport`: a TCP Client that reconnects automatically.
'''
__all__ = [
'Transport',
'MuxTransport',
'RestartingTransport',
'StdioTransport',
'TcpServerTransport',
'TcpClientTransport',
'RestartingTcpClientTransport',
]
from collections import namedtuple
import logging
import queue
import sys
import select
import threading
import time
from .util import subclasses, paren_partition
from .promise import Promise, PromiseDoneError
L = lambda: logging.getLogger(__name__)
class TransportError(Exception):
'''Generic Transport-related error.'''
[docs]class Transport(object):
'''A transport abstracts a two-way bytestream interface.
This is the base class, which provides multithreading logic
but no actual communication channel.
In a subclass, the following methods must be implemented:
* :meth:`send` to send outgoing messages
* :meth:`open` to initialize the channel (if necessary)
* :meth:`run` to receive messages until it is time to stop.
Incoming messages are passed to a callback. It must be set before the first
message arrives via :meth:`set_on_received`.
Provided threading functionality:
- :meth:`start` opens and runs the channel in a new thread
- :meth:`stop` signals :meth:`run` to stop, by setting :attr:`running` to False.
The classmethod :meth:`fromstring` can be used to create a
:class:`Transport` instance from a string (for enhanced configurability).
'''
# The shorthand to use for string creation.
shorthand = ''
def __init__(self):
self._on_received = None
self.running = False
# This lock guards calls to .start() and .stop().
# E.g. someone might try to stop while we are still starting.
self._transition_lock = threading.Lock()
self._thread = None
[docs] @classmethod
def fromstring(cls, expression):
'''Creates a transport from a given string expression.
The expression must be "<shorthand>:<specific parameters>",
with shorthand being the wanted transport's .shorthand property.
For the specific parameters, see the respective transport's .fromstring
method.
The base class implementation searches among all known subclasses
for the Transport matching the given shorthand, and returns
``Subclass.fromstring(expression)``.
'''
shorthand, _, expr = expression.partition(':')
for subclass in subclasses(cls):
if subclass.shorthand == shorthand:
return subclass.fromstring(expression)
raise ValueError('Could not find a transport class with shorthand %s'%shorthand)
@property
def receiver_thread(self):
'''The thread on which on_received will be called.'''
return self._thread
[docs] def open(self):
'''Open the communication channel. e.g. bind and activate a socket.
Override me.
``open`` is called on the new thread opened by `start`. I.e. the same thread in
which the Transport will ``run``.
When ``open()`` returns, the communication channel should be ready for send and receive.
'''
[docs] def run(self):
'''Runs the transport, blocking.
Override me.
This contains the transport's mainloop, which must:
- receive bytes from the channel (usually blocking)
- pass the bytes to ``self.received``
- check periodically (e.g. each second) if ``self.running`` has been cleared
- if so, close the channel and return.
'''
self.running = True
[docs] def start(self, block=True, timeout=10):
'''Run in a new thread.
If ``block`` is True, waits until startup is complete i.e. :meth:`open`
returns. Then returns True.
if nonblocking, returns a promise.
If something goes wrong during start, the Exception, like e.g. a
``socket.error``, is passed through to the caller.
'''
def starter():
with self._transition_lock:
try:
self.open()
except Exception as e:
p.set_exception(e)
return
else:
p.set_result(True)
try:
self.run()
finally:
self.running = False
self._thread = threading.Thread(target=starter, name=self.__class__.__name__)
p = Promise(setter_thread=self._thread)
self._thread.start()
return p.result(timeout=timeout) if block else p
[docs] def stop(self, block=True):
'''Stop running transport (possibly from another thread).
Resets :attr:`running` to signal to :meth:`run` that it should stop.
Actual stopping can take a moment. If ``block`` is True, :meth:`.stop`
waits until :meth:`run` returns.
'''
with self._transition_lock:
self.running = False
thread = self._thread
# If cross-thread stop, wait until actually stopped.
# conditions:
# - block is set
# - thread is not None
# - thread is not the current_thread() (catches "stop-from-within" deadlock)
if block and thread and thread is not threading.current_thread():
self._thread.join()
self._thread = None
[docs] def set_on_received(self, on_received):
'''Sets the function to call upon receiving data.
The callback's signature is ``on_received(sender, data)``, where ``sender`` is a
string describing the origin; ``data`` is the received bytes.
If decoding leaves trailing bytes, they should be returned. The Transport
stores them and prepends them to the next received bytes.
'''
self._on_received = on_received
[docs] def send(self, data, receivers=None):
'''Sends the given data to the specified receiver(s).
``receivers`` is an iterable yielding strings. ``receivers=None`` sends
the data to all connected peers.
TODO: specify behaviour when sending on a stopped or failed Transport.
'''
raise NotImplementedError("Override me")
[docs] def received(self, sender, data):
'''To be called by :meth:`run` when the subclass received data.
``sender`` is a unique string identifying the source. e.g. IP address and port.
If the given data has an undecodable "tail", it is returned.
In this case :meth:`run` must prepend the tail to the next received bytes from this channel,
because it is probably an incomplete message.
'''
if not self._on_received:
raise AttributeError("Transport received a message but has no handler set.")
return self._on_received(sender, data)
[docs]class StdioTransport(Transport):
shorthand = 'stdio'
[docs] @classmethod
def fromstring(cls, expression):
'''No configuration options, just use "stdio:".'''
return cls()
[docs] def stop(self):
L().debug('StdioTransport.stop() called')
Transport.stop(self)
[docs] def send(self, data, receivers=None):
if receivers is not None and 'stdio' not in receivers:
return
L().debug('StdioTransport.send %r'%data)
sys.stdout.buffer.write(data)
sys.stdout.buffer.flush()
[docs] def run(self):
'''run, blocking.'''
L().debug('StdioTransport.run() called')
self.running = True
leftover = b''
while self.running:
# FIXME: This loses bytes on startup.
data = self._input()
#data = input().encode('utf8') + b'\n'
if data is None:
continue
L().debug("received: %r"%data)
leftover = self.received(sender='stdio', data=leftover + data)
L().debug('StdioTransport has finished')
def _input(self, timeout=0.1):
'''Input with 0.1s timeout. Return None on timeout.'''
i, o, e = select.select([sys.stdin.buffer], [], [], timeout)
if i:
return sys.stdin.buffer.read1(65536)
else:
return None
InData = namedtuple('InData', 'sender data')
[docs]class MuxTransport(Transport):
'''A transport that muxes several transports.
Incoming data is serialized into the thread of MuxTransport.run().
Add Transports via mux_transport += transport.
Remove via mux_transport -= transport.
Adding a transport changes its on_received binding to the mux transport.
If MuxTransport is already running, the added transport is start()ed by default.
Removing a transport stop()s it by default.
Running/Stopping the MuxTransport also runs/stops all muxed transports.
'''
shorthand='mux'
[docs] @classmethod
def fromstring(cls, expression):
'''mux:(<transport1>)(<transport2>)...
where <transport1>, .. are again valid transport expressions.
'''
_, _, params = expression.partition(':')
t = cls()
while params != '':
expr, _, params = paren_partition(params)
t.add_transport(Transport.fromstring(expr))
return t
def __init__(self):
Transport.__init__(self)
self.in_queue = queue.Queue()
self.transports = []
self.running = False
# sender --> leftover bytes
self.leftovers = {}
[docs] def send(self, data, receivers=None):
# Let everyone decide for himself.
for transport in self.transports:
transport.send(data, receivers=receivers)
[docs] def handle_received(self, sender, data):
'''handles INCOMING data from any of the muxed transports.
b'' is returned as leftover ALWAYS; MuxTransport keeps
internal remainder buffers for all senders, since the
leftover is only available after the message was processed.
'''
self.in_queue.put(InData(sender, data))
return b''
[docs] def add_transport(self, transport, start=True):
'''add and start the transport (if running).'''
self.transports.append(transport)
transport.set_on_received(self.handle_received)
if start and self.running:
transport.start()
return self
[docs] def remove_transport(self, transport, stop=True):
'''remove and stop the transport.'''
self.transports.remove(transport)
transport.set_on_received(None)
if stop:
transport.stop()
return self
__iadd__ = add_transport
__isub__ = remove_transport
[docs] def stop(self):
L().debug('MuxTransport.stop() called')
Transport.stop(self)
[docs] def open(self):
'''Start all transports that were added so far.
The subtransports are started in parallel, then we wait until all of
them are up.
If any transport fails to start, all transports are stopped again,
and TransportError is raised. It will have a .exceptions attribute being
a list of all failures.
'''
L().debug('MuxTransport.run() called')
promises = []
exceptions = []
running = []
for transport in self.transports:
promises.append(transport.start(block=False))
# wait on all the promises
for transport, promise in zip(self.transports, promises):
try:
if promise.result():
running.append(transport)
except Exception as e:
exceptions.append(e)
if exceptions:
# Oh my. Stop everything again.
L().error('Some transports failed to start. Aborting.')
for transport in running:
transport.stop()
e = TransportError()
e.exceptions = exceptions
raise e
L().debug('Thread overview: %s'%([t.name for t in threading.enumerate()],))
[docs] def run(self):
self.running = True
while self.running:
try:
indata = self.in_queue.get(timeout=0.5)
except queue.Empty:
# timeout passed, check self.running and try again.
continue
L().debug('MuxTransport: received %r'%(indata,))
leftover = self.leftovers.get(indata.sender, b'')
leftover = self.received(indata.sender, leftover + indata.data)
self.leftovers[indata.sender] = leftover
# stop all transports
for transport in self.transports:
transport.stop()
L().debug('MuxTransport has finished')
[docs]class RestartingTransport(Transport):
'''A transport that wraps another transport and keeps restarting it.
E.g. you can wrap a TcpClientTransport to try reconnecting it.
>>> tr = RestartingTransport(TcpClientTransport(*address), check_interval=10)
check_interval gives the Restart interval in seconds. It may not be kept exactly.
It cannot be lower than 1 second. Restarting is attempted as long as the transport is running.
Adding a transport changes its on_received handler to the RestartingTransport.
'''
shorthand='restart'
[docs] @classmethod
def fromstring(cls, expression):
'''restart:10:<subtransport>
10 (seconds) is the restart interval.
<subtransport> is any valid transport string.
'''
_, _, expr = expression.partition(':')
interval, _, expr = expr.partition(':')
if interval:
interval = int(interval)
else:
interval=10
return cls(
transport=Transport.fromstring(expr),
check_interval=interval,
name=expression
)
def __init__(self, transport, check_interval=10, name=''):
Transport.__init__(self)
self.check_interval = check_interval
self.transport = transport
self.transport.set_on_received(self.received)
self._poll_interval = 1
self.name = name
self._start_promise = None
@property
def receiver_thread(self):
'''Thread on which receive() is called - in this case, receiver_thread of the child.'''
return self.transport.receiver_thread
@property
def subtransport_running(self):
'''True if the child transport is currently running.'''
return self.transport.running
[docs] def stop(self):
# First stop self!
Transport.stop(self)
[docs] def open(self):
self._start_promise = self.transport.start(block=False)
[docs] def run(self):
self.running = True
restart_timer = self.check_interval
while self.running:
time.sleep(self._poll_interval)
if self._start_promise:
# still starting, wait for result
try:
self._start_promise.result(timeout=1.0)
except TimeoutError:
pass
except Exception as e:
L().info('Start of (%s) failed. Traceback follows. Retry in %g seconds'%(self.transport.name, self.check_interval), exc_info=True)
self._start_promise = None
else:
# started
self._start_promise = None
elif not self.transport.running:
restart_timer -= self._poll_interval
if restart_timer <= 0:
L().info("trying to restart (%s)"%self.name)
self._start_promise = self.transport.start(block=False)
restart_timer = self.check_interval
self.transport.stop()
[docs] def send(self, data, receivers=None):
self.transport.send(data, receivers)
[docs]def RestartingTcpClientTransport(host, port, check_interval=10):
'''Convenience wrapper for the most common use case. Returns TcpClientTransport wrapped in a RestartingTransport.'''
t = TcpClientTransport(host, port)
return RestartingTransport(t, check_interval=check_interval, name=t.name)
[docs]def TcpServerTransport(port, interface='', announcer=None):
from .network_transports import TcpServerTransport
return TcpServerTransport(port, interface, announcer)
[docs]def TcpClientTransport(host, port):
from .network_transports import TcpClientTransport
return TcpClientTransport(host, port)