Skip to content

Commit

Permalink
fix: Correct and consistent passing of provided event loop biesnecker#4
Browse files Browse the repository at this point in the history
  • Loading branch information
saaj committed Jun 8, 2015
1 parent b406fed commit 62225f1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
25 changes: 14 additions & 11 deletions cleveland/actor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import asyncio
import warnings

from .message import StopMessage
from .message import StopMessage, QueryMessage


class HandlerNotFoundError(KeyError): pass

class AbstractActor(object):

def __init__(self, *args, **kwargs):
self._loop = kwargs.get('loop', asyncio.get_event_loop())
self._loop = kwargs['loop'] if 'loop' in kwargs \
else asyncio.get_event_loop()
self._is_running = False
self._run_complete = asyncio.Future()
self._run_complete = asyncio.Future(loop = self._loop)

def start(self):
self._is_running = True
Expand Down Expand Up @@ -55,12 +56,12 @@ def tell(self, target, message):

@asyncio.coroutine
def ask(self, target, message):
result_future = message.result
if not result_future:
raise TypeError('Messages sent as asks must have a Future as its '
'results attribute.')
assert isinstance(message, QueryMessage)
if not hasattr('result', message):
message.result = asyncio.Future(loop = self._loop)

yield from self.tell(target, message)
return (yield from result_future)
return (yield from message.result)


class BaseActor(AbstractActor):
Expand Down Expand Up @@ -90,13 +91,14 @@ def _task(self):
if is_query:
message.result.set_exception(ex)
else:
warnings.warn('Unhandled exception from handler of {0}'.format(type(message)))
warnings.warn('Unhandled exception from handler of '
'{0}'.format(type(message)))
else:
if is_query:
message.result.set_result(response)
except KeyError as ex:
raise HandlerNotFoundError(type(message)) from ex

@asyncio.coroutine
def _stop(self):
yield from self._receive(StopMessage())
Expand All @@ -109,4 +111,5 @@ def _receive(self, message):
# in it so the call to _inbox.get() doesn't block. We don't actually have
# to do anything with it.
@asyncio.coroutine
def _stop_message_handler(self, message): pass
def _stop_message_handler(self, message):
pass
17 changes: 11 additions & 6 deletions cleveland/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio


class Message(object):

def __init__(self, payload=None):
Expand All @@ -8,12 +9,16 @@ def __init__(self, payload=None):
def __repr__(self):
return 'Message (Payload: {0})'.format(self.payload)

# Special type of message that expects a response

class QueryMessage(Message):
'''Special type of message that expects a response'''

result = None
'''Future, set in ``AbstractActor.ask`` if not set by user'''

def __init__(self, payload=None):
super().__init__(payload)
self.result = asyncio.Future()

# Special type of message that tells actors to quit processing their inbox
class StopMessage(Message): pass
class StopMessage(Message):
'''Special type of message that tells actors to quit processing
their inbox
'''

0 comments on commit 62225f1

Please sign in to comment.