Source code for quickrpc.promise

'''Defines a basic :class:`Promise` class.

A Promise (also known as a Deferred or a Future) is like an order slip
for something that is still being produced.

This is just a barebone implementation, with method names aligned with
:class:`concurrent.Future` from the standard lib.
'''

import logging
from enum import Enum
from threading import Event, current_thread

L = lambda: logging.getLogger(__name__)

__all__ = ['Promise', 'PromiseError', 'PromiseTimeoutError', 'PromiseDoneError', 'PromiseDeadlockError']

class PromiseState(Enum):
    pending = 0
    fulfilled = 1
    failed = 2
    
[docs]class PromiseError(Exception): '''promise-related error'''
[docs]class PromiseTimeoutError(PromiseError, TimeoutError): '''waiting for the promise took too long.'''
[docs]class PromiseDoneError(PromiseError, RuntimeError): '''raised to the promise issuer if a result or exception was already set.'''
[docs]class PromiseDeadlockError(PromiseError, RuntimeError): '''raised if the result-setter thread tries to wait for the result (i.e. itself).'''
[docs]class Promise(object): '''Encapsulates a result that will arrive later. A Promise (also known as a Deferred or a Future) is like an order slip for something that is still being produced. Promises are dispensed by asynchronous functions. Calling .result() waits until the operation is complete, then returns the result. You can also use .then(callback) to have the promise call you with the result. The constructor takes an argument ``setter_thread``, which should be the thread that will set the result later. If not given, the current thread is assumed (which will usually be the case). The ``setter_thread`` is used to provide basic deadlock protection. ''' def __init__(self, setter_thread=None): self._state = PromiseState.pending self._evt = Event() self._result = None self._setter_thread = setter_thread or current_thread() self._callback = lambda result: None self._errback = lambda error: None
[docs] def set_result(self, val): '''called by the promise issuer to set the result.''' self._set(PromiseState.fulfilled, val) try: self._callback(val) except Exception as e: L().error('Promise callback raised an exception', exc_info=True)
[docs] def set_exception(self, exception): '''called by the promise issuer to indicate failure.''' if not isinstance(exception, BaseException): # shrinkwrap whatever it is exception = Exception(repr(exception)) self._set(PromiseState.failed, exception) try: self._errback(exception) except Exception as e: L().error('Promise errback raised an exception', exc_info=True)
def _set(self, state, result): if self._evt.is_set(): raise PromiseDoneError() self._state = state self._result = result self._evt.set()
[docs] def result(self, timeout=1.0): '''Return the result, waiting for it if necessary. If the promise failed, this will raise the exception that the issuer gave. If the promise is still unfulfilled after the `timeout` (in seconds) elapsed, PromiseTimeoutError is raised. If the promise is unfulfilled and the calling thread is the designated promise-setter thread, PromiseDeadlockError is raised immediately. ''' L().debug('Promise.result: current_Thread is %r and setter_thread is %r'%(current_thread(), self._setter_thread)) if not self._evt.is_set() and current_thread() is self._setter_thread: raise PromiseDeadlockError('The thread would wait for itself') if not self._evt.wait(timeout): raise PromiseTimeoutError() if self._state == PromiseState.fulfilled: return self._result elif self._state == PromiseState.failed: raise self._result else: assert False, 'unexpected Promise state'
[docs] def then(self, callback, errback=None): '''set handler to run as soon as the result is set. callback takes the result as single argument. You can also set an errback that is called in case of an exception. If not set, the exception will be passed to callback as result. If the result already arrived, callback or errback is called immediately. ''' self._callback = callback self._errback = errback or callback if self._state == PromiseState.fulfilled: self._callback(self._result) elif self._state == PromiseState.failed: self._errback(self._result)
__call__ = result