'''transports based on Qt communication classes, running in the Qt event loop.
'''
__all__ = [
'QProcessTransport',
'QTcpTransport',
]
import sys
import logging
from PyQt4.QtCore import QProcess
from PyQt4.QtNetwork import QUdpSocket, QTcpSocket, QAbstractSocket, QHostAddress
from .transports import Transport
from .util import paren_partition
L = lambda: logging.getLogger(__name__)
[docs]class QProcessTransport(Transport):
'''A Transport communicating with a child process.
Start the process using .start().
Sent data is written to the process' stdin.
Data is received from the process's stdout and processed on the Qt mainloop
thread.
'''
shorthand='qprocess'
[docs] @classmethod
def fromstring(cls, expression):
'''qprocess:(<commandline>)'''
_, _, expr = expression.partition(':')
cmdline, _, _ = paren_partition(expr)
return cls(cmdline, sendername=expression)
def __init__(self, cmdline, sendername='qprocess'):
self.cmdline = cmdline
self.sendername = sendername
self.leftover = b''
self.process = QProcess()
self.process.readyRead.connect(self.on_ready_read)
self.process.finished.connect(self.on_finished)
[docs] def start(self):
L().debug('starting: %r'%self.cmdline)
self.process.start(self.cmdline)
[docs] def stop(self, kill=False):
if kill:
self.process.kill()
else:
self.process.terminate()
self.process.waitForFinished()
[docs] def send(self, data, receivers=None):
if receivers is not None and self.sendername not in receivers:
return
L().debug('message to child processs: %s'%data)
self.process.write(data.decode('utf8'))
[docs] def on_ready_read(self):
data = self.process.readAllStandardOutput().data()
errors = self.process.readAllStandardError().data().decode('utf8')
if errors:
L().error('Error from child process:\n%s' % errors)
pdata = data.decode('utf8')
if len(pdata) > 100:
pdata = pdata[:100] + '...'
#if pdata.startswith('{'):
L().debug('message from child process: %s'%pdata)
self.leftover = self.received(
sender=self.sendername,
data=self.leftover + data
)
[docs] def on_finished(self):
L().info('Child process exited.')
[docs]class QTcpTransport(Transport):
'''A Transport connecting to a TCP server.
Connect using .start().
Received data is processed on the Qt mainloop thread.
'''
shorthand='qtcp'
[docs] @classmethod
def fromstring(cls, expression):
'''qtcp:<host>:<port>'''
_, host, port = expression.split(':')
return cls(host=host, port=int(port), sendername=expression)
def __init__(self, host, port, sendername='qtcp'):
self.address = (host, port)
self.sendername = sendername
self.leftover = b''
self.socket = QTcpSocket()
self.socket.readyRead.connect(self.on_ready_read)
self.socket.error.connect(self.on_error)
self.socket.connected.connect(self.on_connect)
[docs] def start(self):
if self.socket.state() != QAbstractSocket.UnconnectedState:
L().debug('start(): Socket is not in UnconnectedState, doing nothing')
return
L().debug('connecting to: %s'%(self.address,))
self.socket.connectToHost(self.address[0], self.address[1])
[docs] def stop(self):
self.socket.flush()
self.socket.disconnectFromHost()
[docs] def send(self, data, receivers=None):
if receivers is not None and self.sendername not in receivers:
return
L().debug('message to tcp server: %s'%data)
self.socket.write(data.decode('utf8'))
[docs] def on_ready_read(self):
data = self.socket.readAll().data()
pdata = data
if len(pdata) > 100:
pdata = pdata[:100] + b'...'
#if pdata.startswith('{'):
L().debug('message from tcp server: %s'%pdata)
self.leftover = self.received(
sender=self.sendername,
data=self.leftover + data
)
[docs] def on_connect(self):
L().info('QTcpSocket: Established connection to %s'%(self.address,))
[docs] def on_error(self, error):
L().info('QTcpSocket raised error: %s'%error)
class QUdpTransport(Transport):
'''A Transport sending and receiving UDP datagrams.
Connectionless - sender/receiver are IP addresses. Sending and receiving is
done on the same port. Sending with receiver=None makes a broadcast.
Use messages > 500 Byte at your own peril.
Received data is processed on the Qt mainloop thread.
'''
shorthand='qudp'
@classmethod
def fromstring(cls, expression):
'''qudp:<port>'''
_, port = expression.split(':')
return cls(port=int(port))
def __init__(self, port):
self.port = port
self.leftover = b''
self.socket = QUdpSocket()
self.socket.readyRead.connect(self.on_ready_read)
self.socket.error.connect(self.on_error)
def start(self):
if self.socket.state() != QAbstractSocket.UnconnectedState:
L().debug('QUdpSocket.start(): Socket is not in UnconnectedState, doing nothing')
return
L().debug('QUdpTransport: binding to port %d'%(self.port,))
self.socket.bind(self.port, QUdpSocket.ShareAddress)
def stop(self):
self.socket.flush()
self.socket.close()
def send(self, data, receivers=None):
L().debug('message to udp %s: %s'%(receivers,data))
data = data.decode('utf8')
if receivers:
for receiver in receivers:
self.socket.writeDatagram(data, QHostAddress(receiver), self.port)
else:
self.socket.writeDatagram(data, QHostAddress.Broadcast, self.port)
def on_ready_read(self):
while self.socket.hasPendingDatagrams():
data, host, port = self.socket.readDatagram(self.socket.pendingDatagramSize())
assert isinstance(data, bytes)
sender = host.toString()
pdata = data
if len(pdata) > 100:
pdata = pdata[:100] + b'...'
L().debug('UDP message from %s: %s'%(sender, pdata))
self.leftover = self.received(
sender=sender,
data=self.leftover + data
)
def on_error(self, error):
L().info('QTcpSocket raised error: %s'%error)