Source code for quickrpc.network_transports

__all__ = ['UdpTransport', 'TcpServerTransport', 'TcpClientTransport']

import logging

import socket as sk
from select import select
from socketserver import ThreadingTCPServer, BaseRequestHandler
from threading import Thread, Event
from .transports import Transport, MuxTransport

L = lambda: logging.getLogger(__name__)

[docs]class UdpTransport(Transport): '''transport that communicates over UDP datagrams. Connectionless - sender/receiver are IP addresses. Sending and receiving is done on the same port. Sending with receiver=None makes a broadcast. Use messages > 500 Bytes at your own peril. ''' shorthand = 'udp'
[docs] @classmethod def fromstring(cls, expression): '''udp:1234 - the number being the port for send/receive.''' _, _, rest = expression.partition(':') return cls(port=int(rest))
def __init__(self, port): Transport.__init__(self) self.port = port
[docs] def open(self): self.socket = sk.socket(sk.AF_INET, sk.SOCK_DGRAM) self.socket.settimeout(0.5) self.socket.setsockopt(sk.SOL_SOCKET, sk.SO_BROADCAST, 1) self.socket.setsockopt(sk.SOL_SOCKET, sk.SO_REUSEADDR, 1) self.socket.setsockopt(sk.SOL_SOCKET, sk.IP_MULTICAST_LOOP, 1) try: self.socket.setsockopt(sk.SOL_SOCKET, sk.SO_REUSEPORT, 1) except AttributeError: # SO_REUSEPORT not available. pass self.socket.bind(('', self.port))
[docs] def run(self): self.running = True while self.running: try: data, addr = self.socket.recvfrom(2048) except sk.timeout: continue host, port = addr # not using leftover data here, since udp packets are # not guaranteed to arrive in order. L().debug('message from udp %s: %s'%(host, data)) self.received(data=data, sender=host) self.socket.close()
[docs] def send(self, data, receivers=None): L().debug('message to udp %r: %s'%(receivers, data)) if receivers: for receiver in receivers: self.socket.sendto(data, (receiver, self.port)) else: self.socket.sendto(data, ('<broadcast>', self.port))
[docs]class TcpClientTransport(Transport): '''Transport that connects to a TCP server. Optionally, a keepalive message can be configured. ``keepalive_msg`` is sent verbatim every ``keepalive_interval`` seconds while the connection is idle. Any sending or receiving resets the timer. You can change the attributes anytime. ''' shorthand = 'tcp' _CHECK_INTERVAL = 0.5
[docs] @classmethod def fromstring(cls, expression): '''tcp:<host>:<port>''' _, host, port = expression.split(':') # uses default connect timeout return cls(host=host, port=int(port))
def __init__(self, host, port, connect_timeout=10, keepalive_msg=b'', keepalive_interval=10, buffersize=1024): Transport.__init__(self) self.address = (host, port) self.name = '%s:%s'%self.address self.connect_timeout = connect_timeout self.keepalive_msg = keepalive_msg self.keepalive_interval = keepalive_interval self._keepalive_countdown = keepalive_interval self.buffersize = buffersize
[docs] def send(self, data, receivers=None): if receivers is not None and not self.name in receivers: return if not self.running: raise IOError('Tried to send over non-running transport!') self._keepalive_countdown = self.keepalive_interval L().debug('TcpClientTransport .send to %s: %r'%(self.name, data)) # FIXME: do something on failure self.socket.sendall(data)
[docs] def open(self): L().debug('TcpClientTransport.open() called') try: self.socket = sk.create_connection(self.address, self.connect_timeout) except ConnectionRefusedError: L().error('Connection to %s failed'%(self.name)) raise L().info('Connected to %s'%(self.name,)) # Sets the timeout for .read and .write self._keepalive_countdown = self.keepalive_interval
[docs] def run(self): '''run, blocking.''' self.running = True leftover = b'' while self.running: readable, _, _ = select([self.socket], [], [], self._CHECK_INTERVAL) if not readable: self._keepalive_tick() continue try: data = self.socket.recv(self.buffersize) except ConnectionError: data = b'' self._keepalive_countdown = self.keepalive_interval if data == b'': # Connection was closed. self.running=False self.socket=None L().info('Connection to %s closed by remote side.'%(self.name,)) break L().debug('data from %s: %r'%(self.name, data)) leftover = self.received(sender=self.name, data=leftover+data) if self.socket: L().info('Closing connection to %s.'%(self.name,)) self.socket.close() L().debug('TcpClientTransport %s has finished'%(self.name))
def _keepalive_tick(self): if self.keepalive_msg: self._keepalive_countdown -= self._CHECK_INTERVAL if self._keepalive_countdown <= 0: L().debug('send keepalive') self.socket.sendall(self.keepalive_msg) self._keepalive_countdown = self.keepalive_interval
[docs]class TcpServerTransport(MuxTransport): '''transport that accepts TCP connections as transports. Basically a mux transport coupled with a TcpServer. Each time somebody connects, the connection is wrapped into a transport and added to the muxer. There is (for now) no explicit notification about connects/disconnects; use the API for that. Use .close() for server-side disconnect. You can optionally pass an announcer (as returned by announcer_api.make_udp_announcer). It will be started/stopped together with the TcpServerTransport. Optionally, a keepalive message can be configured. On each connection, ``keepalive_msg`` is sent verbatim every ``keepalive_interval`` seconds while the connection is idle. Any sending or receiving resets the timer. You can change the attributes directly while transport is stopped. Threads: - TcpServerTransport.run() blocks (use .start() for automatic extra Thread) - .run() starts a new thread for listening to connections - each incoming connection will start another Thread. ''' shorthand = 'tcpserv'
[docs] @classmethod def fromstring(cls, expression): '''tcpserv:<interface>:<port> Leave <interface> empty to listen on all interfaces. ''' _, iface, port = expression.split(':') return cls(port=int(port), interface=iface)
def __init__(self, port, interface='', announcer=None, keepalive_msg=b'', keepalive_interval=10, buffersize=1024): self.addr = (interface, port) self.name = '%s:%s'%self.addr self.announcer = announcer self.keepalive_msg = keepalive_msg self.keepalive_interval = keepalive_interval self.buffersize = buffersize MuxTransport.__init__(self)
[docs] def open(self): self.server = ThreadingTCPServer(self.addr, _TcpConnection, bind_and_activate=True) self.server.mux = self Thread(target=self.server.serve_forever, name="TcpServerTransport_Listen").start() if self.announcer: try: self.announcer.transport.start() finally: self.server.shutdown()
[docs] def run(self): MuxTransport.run(self) if self.announcer: self.announcer.transport.stop() self.server.shutdown() self.server.server_close()
[docs] def close(self, name): '''close the connection with the given sender/receiver name. ''' for transport in self.transports: if transport.name == name: transport.transport_running.clear()
class _TcpConnection(BaseRequestHandler, Transport): '''Bridge between TcpServer (BaseRequestHandler) and Transport. Implicitly created by the TcpServer. .handle() waits until Transport.start() is called, and closes the connection and exits upon call of .stop(). The Transport also stops upon client-side close of connection. The _TcpConnection registers and unregisters itself with the TcpServerTransport. ''' _CHECK_INTERVAL = 0.5 # BaseRequestHandler overrides def __init__(self, request, client_address, server): # circumvent Transport.__init__, since none of the threading logic is used here #Transport.__init__(self) self._on_received = None self.keepalive_msg = server.mux.keepalive_msg self.keepalive_interval = server.mux.keepalive_interval self.buffersize = server.mux.buffersize self._keepalive_countdown = self.keepalive_interval BaseRequestHandler.__init__(self, request, client_address, server) @property def running(self): return self.transport_running.is_set() def setup(self): '''called by the ThreadingTCPServer. Adds the connection to the parent muxer, then waits for .start() to be called. ''' self.name = '%s:%s'%self.client_address L().info('TCP connect from %s'%self.name) self.transport_running = Event() # add myself to the muxer, which will .start() me. self.server.mux.add_transport(self) def handle(self): # should be set almost-instantly; otherwise something is wrong. self.transport_running.wait(timeout=1.0) leftover = b'' while self.transport_running.is_set(): readable, _, _ = select([self.request], [],[], self._CHECK_INTERVAL) if not readable: self._keepalive_tick() continue try: data = self.request.recv(self.buffersize) except ConnectionError: data = b'' self._keepalive_countdown = self.keepalive_interval #data = data.replace(b'\r\n', b'\n') if data == b'': # Connection was closed. L().info('Connection to %s closed by remote side.'%(self.name,)) self.stop() break L().debug('data from %s: %r'%(self.name, data)) leftover = self.received(sender=self.name, data=leftover+data) def finish(self): L().debug('Closed TCP connection to %s'%self.name) # Getting here implies that this transport already stopped. self.server.mux.remove_transport(self, stop=False) # Transport overrides def start(self): self.transport_running.set() def run(self): # _TcpConnection starts "running" by itself (since the connection is already opened by definition). raise Exception('You shall not use .run()') def stop(self): self.transport_running.clear() def send(self, data, receivers=None): if receivers is not None and not self.name in receivers: return if not self.transport_running.is_set(): raise IOError('Tried to send over non-running transport!') self._keepalive_countdown = self.keepalive_interval # FIXME: do something on failure L().debug('_TcpConnection .send to %s: %r'%(self.name, data)) try: self.request.sendall(data) except Exception: L().error('TcpServerTransport._TcpConnection: sending failed, see exc. info', exc_info=True) raise def _keepalive_tick(self): if self.keepalive_msg: self._keepalive_countdown -= self._CHECK_INTERVAL if self._keepalive_countdown <= 0: L().debug('send keepalive') self.request.sendall(self.keepalive_msg) self._keepalive_countdown = self.keepalive_interval