quickrpc.transports, .network_transports and .QtTransports modules

quickrpc.transports module

A Transport abstracts a two-way bytestream interface.

It can be started and stopped, and send and receive byte sequences to one or more receivers.

Classes defined here:
class quickrpc.transports.Transport[source]

Bases: object

A transport abstracts a two-way bytestream interface.

This is the base class, which provides multithreading logic but no actual communication channel.

In a subclass, the following methods must be implemented:

  • send() to send outgoing messages
  • open() to initialize the channel (if necessary)
  • run() to receive messages until it is time to stop.

Incoming messages are passed to a callback. It must be set before the first message arrives via set_on_received().

Provided threading functionality:

  • start() opens and runs the channel in a new thread
  • stop() signals run() to stop, by setting running to False.

The classmethod fromstring() can be used to create a Transport instance from a string (for enhanced configurability).

classmethod fromstring(expression)[source]

Creates a transport from a given string expression.

The expression must be “<shorthand>:<specific parameters>”, with shorthand being the wanted transport’s .shorthand property. For the specific parameters, see the respective transport’s .fromstring method.

The base class implementation searches among all known subclasses for the Transport matching the given shorthand, and returns Subclass.fromstring(expression).

open()[source]

Open the communication channel. e.g. bind and activate a socket.

Override me.

open is called on the new thread opened by start. I.e. the same thread in which the Transport will run.

When open() returns, the communication channel should be ready for send and receive.

received(sender, data)[source]

To be called by run() when the subclass received data.

sender is a unique string identifying the source. e.g. IP address and port.

If the given data has an undecodable “tail”, it is returned. In this case run() must prepend the tail to the next received bytes from this channel, because it is probably an incomplete message.

receiver_thread

The thread on which on_received will be called.

run()[source]

Runs the transport, blocking.

Override me.

This contains the transport’s mainloop, which must:

  • receive bytes from the channel (usually blocking)
  • pass the bytes to self.received
  • check periodically (e.g. each second) if self.running has been cleared
  • if so, close the channel and return.
send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

set_on_received(on_received)[source]

Sets the function to call upon receiving data.

The callback’s signature is on_received(sender, data), where sender is a string describing the origin; data is the received bytes. If decoding leaves trailing bytes, they should be returned. The Transport stores them and prepends them to the next received bytes.

shorthand = ''
start(block=True, timeout=10)[source]

Run in a new thread.

If block is True, waits until startup is complete i.e. open() returns. Then returns True.

if nonblocking, returns a promise.

If something goes wrong during start, the Exception, like e.g. a socket.error, is passed through to the caller.

stop(block=True)[source]

Stop running transport (possibly from another thread).

Resets running to signal to run() that it should stop.

Actual stopping can take a moment. If block is True, stop() waits until run() returns.

class quickrpc.transports.MuxTransport[source]

Bases: quickrpc.transports.Transport

A transport that muxes several transports.

Incoming data is serialized into the thread of MuxTransport.run().

Add Transports via mux_transport += transport. Remove via mux_transport -= transport.

Adding a transport changes its on_received binding to the mux transport. If MuxTransport is already running, the added transport is start()ed by default.

Removing a transport stop()s it by default.

Running/Stopping the MuxTransport also runs/stops all muxed transports.

add_transport(transport, start=True)[source]

add and start the transport (if running).

classmethod fromstring(expression)[source]

mux:(<transport1>)(<transport2>)…

where <transport1>, .. are again valid transport expressions.

handle_received(sender, data)[source]

handles INCOMING data from any of the muxed transports. b’’ is returned as leftover ALWAYS; MuxTransport keeps internal remainder buffers for all senders, since the leftover is only available after the message was processed.

open()[source]

Start all transports that were added so far.

The subtransports are started in parallel, then we wait until all of them are up.

If any transport fails to start, all transports are stopped again, and TransportError is raised. It will have a .exceptions attribute being a list of all failures.

remove_transport(transport, stop=True)[source]

remove and stop the transport.

run()[source]

Runs the transport, blocking.

Override me.

This contains the transport’s mainloop, which must:

  • receive bytes from the channel (usually blocking)
  • pass the bytes to self.received
  • check periodically (e.g. each second) if self.running has been cleared
  • if so, close the channel and return.
send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

shorthand = 'mux'
stop()[source]

Stop running transport (possibly from another thread).

Resets running to signal to run() that it should stop.

Actual stopping can take a moment. If block is True, stop() waits until run() returns.

class quickrpc.transports.RestartingTransport(transport, check_interval=10, name='')[source]

Bases: quickrpc.transports.Transport

A transport that wraps another transport and keeps restarting it.

E.g. you can wrap a TcpClientTransport to try reconnecting it.

>>> tr = RestartingTransport(TcpClientTransport(*address), check_interval=10)

check_interval gives the Restart interval in seconds. It may not be kept exactly. It cannot be lower than 1 second. Restarting is attempted as long as the transport is running.

Adding a transport changes its on_received handler to the RestartingTransport.

classmethod fromstring(expression)[source]

restart:10:<subtransport>

10 (seconds) is the restart interval.

<subtransport> is any valid transport string.

open()[source]

Open the communication channel. e.g. bind and activate a socket.

Override me.

open is called on the new thread opened by start. I.e. the same thread in which the Transport will run.

When open() returns, the communication channel should be ready for send and receive.

receiver_thread

Thread on which receive() is called - in this case, receiver_thread of the child.

run()[source]

Runs the transport, blocking.

Override me.

This contains the transport’s mainloop, which must:

  • receive bytes from the channel (usually blocking)
  • pass the bytes to self.received
  • check periodically (e.g. each second) if self.running has been cleared
  • if so, close the channel and return.
send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

shorthand = 'restart'
stop()[source]

Stop running transport (possibly from another thread).

Resets running to signal to run() that it should stop.

Actual stopping can take a moment. If block is True, stop() waits until run() returns.

subtransport_running

True if the child transport is currently running.

class quickrpc.transports.StdioTransport[source]

Bases: quickrpc.transports.Transport

class ‘quickrpc.transports.StdioTransport’ undocumented

classmethod fromstring(expression)[source]

No configuration options, just use “stdio:”.

run()[source]

run, blocking.

send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

shorthand = 'stdio'
stop()[source]

Stop running transport (possibly from another thread).

Resets running to signal to run() that it should stop.

Actual stopping can take a moment. If block is True, stop() waits until run() returns.

quickrpc.transports.TcpServerTransport(port, interface='', announcer=None)[source]

function ‘quickrpc.transports.TcpServerTransport’ undocumented

quickrpc.transports.TcpClientTransport(host, port)[source]

function ‘quickrpc.transports.TcpClientTransport’ undocumented

quickrpc.transports.RestartingTcpClientTransport(host, port, check_interval=10)[source]

Convenience wrapper for the most common use case. Returns TcpClientTransport wrapped in a RestartingTransport.

quickrpc.network_transports module

module ‘quickrpc.network_transports’ undocumented

class quickrpc.network_transports.UdpTransport(port)[source]

Bases: quickrpc.transports.Transport

transport that communicates over UDP datagrams.

Connectionless - sender/receiver are IP addresses. Sending and receiving is done on the same port. Sending with receiver=None makes a broadcast.

Use messages > 500 Bytes at your own peril.

classmethod fromstring(expression)[source]

udp:1234 - the number being the port for send/receive.

open()[source]

Open the communication channel. e.g. bind and activate a socket.

Override me.

open is called on the new thread opened by start. I.e. the same thread in which the Transport will run.

When open() returns, the communication channel should be ready for send and receive.

run()[source]

Runs the transport, blocking.

Override me.

This contains the transport’s mainloop, which must:

  • receive bytes from the channel (usually blocking)
  • pass the bytes to self.received
  • check periodically (e.g. each second) if self.running has been cleared
  • if so, close the channel and return.
send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

shorthand = 'udp'
class quickrpc.network_transports.TcpServerTransport(port, interface='', announcer=None, keepalive_msg=b'', keepalive_interval=10, buffersize=1024)[source]

Bases: quickrpc.transports.MuxTransport

transport that accepts TCP connections as transports.

Basically a mux transport coupled with a TcpServer. Each time somebody connects, the connection is wrapped into a transport and added to the muxer.

There is (for now) no explicit notification about connects/disconnects; use the API for that.

Use .close() for server-side disconnect.

You can optionally pass an announcer (as returned by announcer_api.make_udp_announcer). It will be started/stopped together with the TcpServerTransport.

Optionally, a keepalive message can be configured. On each connection, keepalive_msg is sent verbatim every keepalive_interval seconds while the connection is idle. Any sending or receiving resets the timer. You can change the attributes directly while transport is stopped.

Threads:
  • TcpServerTransport.run() blocks (use .start() for automatic extra Thread)
  • .run() starts a new thread for listening to connections
  • each incoming connection will start another Thread.
close(name)[source]

close the connection with the given sender/receiver name.

classmethod fromstring(expression)[source]

tcpserv:<interface>:<port>

Leave <interface> empty to listen on all interfaces.

open()[source]

Start all transports that were added so far.

The subtransports are started in parallel, then we wait until all of them are up.

If any transport fails to start, all transports are stopped again, and TransportError is raised. It will have a .exceptions attribute being a list of all failures.

run()[source]

Runs the transport, blocking.

Override me.

This contains the transport’s mainloop, which must:

  • receive bytes from the channel (usually blocking)
  • pass the bytes to self.received
  • check periodically (e.g. each second) if self.running has been cleared
  • if so, close the channel and return.
shorthand = 'tcpserv'
class quickrpc.network_transports.TcpClientTransport(host, port, connect_timeout=10, keepalive_msg=b'', keepalive_interval=10, buffersize=1024)[source]

Bases: quickrpc.transports.Transport

Transport that connects to a TCP server.

Optionally, a keepalive message can be configured. keepalive_msg is sent verbatim every keepalive_interval seconds while the connection is idle. Any sending or receiving resets the timer. You can change the attributes anytime.

classmethod fromstring(expression)[source]

tcp:<host>:<port>

open()[source]

Open the communication channel. e.g. bind and activate a socket.

Override me.

open is called on the new thread opened by start. I.e. the same thread in which the Transport will run.

When open() returns, the communication channel should be ready for send and receive.

run()[source]

run, blocking.

send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

shorthand = 'tcp'

quickrpc.QtTransports module

transports based on Qt communication classes, running in the Qt event loop.

class quickrpc.QtTransports.QProcessTransport(cmdline, sendername='qprocess')[source]

Bases: quickrpc.transports.Transport

A Transport communicating with a child process.

Start the process using .start().

Sent data is written to the process’ stdin.

Data is received from the process’s stdout and processed on the Qt mainloop thread.

classmethod fromstring(expression)[source]

qprocess:(<commandline>)

on_finished()[source]

method ‘quickrpc.QtTransports.QProcessTransport.on_finished’ undocumented

on_ready_read()[source]

method ‘quickrpc.QtTransports.QProcessTransport.on_ready_read’ undocumented

send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

shorthand = 'qprocess'
start()[source]

Run in a new thread.

If block is True, waits until startup is complete i.e. open() returns. Then returns True.

if nonblocking, returns a promise.

If something goes wrong during start, the Exception, like e.g. a socket.error, is passed through to the caller.

stop(kill=False)[source]

Stop running transport (possibly from another thread).

Resets running to signal to run() that it should stop.

Actual stopping can take a moment. If block is True, stop() waits until run() returns.

class quickrpc.QtTransports.QTcpTransport(host, port, sendername='qtcp')[source]

Bases: quickrpc.transports.Transport

A Transport connecting to a TCP server.

Connect using .start().

Received data is processed on the Qt mainloop thread.

classmethod fromstring(expression)[source]

qtcp:<host>:<port>

on_connect()[source]

method ‘quickrpc.QtTransports.QTcpTransport.on_connect’ undocumented

on_error(error)[source]

method ‘quickrpc.QtTransports.QTcpTransport.on_error’ undocumented

on_ready_read()[source]

method ‘quickrpc.QtTransports.QTcpTransport.on_ready_read’ undocumented

send(data, receivers=None)[source]

Sends the given data to the specified receiver(s).

receivers is an iterable yielding strings. receivers=None sends the data to all connected peers.

TODO: specify behaviour when sending on a stopped or failed Transport.

shorthand = 'qtcp'
start()[source]

Run in a new thread.

If block is True, waits until startup is complete i.e. open() returns. Then returns True.

if nonblocking, returns a promise.

If something goes wrong during start, the Exception, like e.g. a socket.error, is passed through to the caller.

stop()[source]

Stop running transport (possibly from another thread).

Resets running to signal to run() that it should stop.

Actual stopping can take a moment. If block is True, stop() waits until run() returns.