-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Trevor Sibanda
committed
Nov 17, 2020
1 parent
ac21702
commit 4752921
Showing
10 changed files
with
544 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .client import Connection | ||
from .dispatcher import EventDispatcher | ||
from .subscription import Subscription |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
from time import time | ||
|
||
try: | ||
#python2 | ||
from urllib import urlencode | ||
except ImportError: | ||
#python3 | ||
from urllib.parse import urlencode | ||
|
||
from hyper import HTTP20Connection | ||
from faunadb._json import to_json, parse_json_or_none | ||
from faunadb.request_result import RequestResult | ||
from .events import parse_stream_request_result_or_none, Error | ||
from .errors import StreamError | ||
|
||
VALID_FIELDS = {"diff", "prev", "document", "action"} | ||
|
||
|
||
class Connection(object): | ||
""" | ||
The internal stream client connection interface. | ||
This class handles the network side of a stream | ||
subscription. | ||
Current limitations: | ||
Python requests module uses HTTP1; hyper is used for HTTP/2 | ||
""" | ||
def __init__(self, client, expression, options): | ||
self._client = client | ||
self.options = options | ||
self.conn = None | ||
self._fields = None | ||
if isinstance(self.options, dict): | ||
self._fields = self.options.get("fields", None) | ||
elif hasattr(self.options, "fields"): | ||
self._fields = self.options.field | ||
if isinstance(self._fields, list): | ||
union = set(self._fields).union(VALID_FIELDS) | ||
if union != VALID_FIELDS: | ||
raise Exception("Valid fields options are %s, provided %s."%(VALID_FIELDS, self._fields)) | ||
self._state = "idle" | ||
self._query = expression | ||
self._data = to_json(expression).encode() | ||
try: | ||
self.conn = HTTP20Connection( | ||
self._client.domain, port=self._client.port, enable_push=True) | ||
except Exception as e: | ||
raise StreamError(e) | ||
|
||
def close(self): | ||
""" | ||
Closes the stream subscription by aborting its underlying http request. | ||
""" | ||
if self.conn is None: | ||
raise StreamError('Cannot close inactive stream subscription.') | ||
self.conn.close() | ||
self._state = 'closed' | ||
|
||
def subscribe(self, on_event): | ||
"""Initiates the stream subscription.""" | ||
if self._state != "idle": | ||
raise StreamError('Stream subscription already started.') | ||
try: | ||
self._state = 'connecting' | ||
headers = self._client.session.headers | ||
headers["Authorization"] = self._client._auth_header() | ||
if self._client._query_timeout_ms is not None: | ||
headers["X-Query-Timeout"] = str(self._client._query_timeout_ms) | ||
headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time()) | ||
start_time = time() | ||
url_params = '' | ||
if isinstance(self._fields, list): | ||
url_params= "?%s"%(urlencode({'fields': ",".join(self._fields)})) | ||
id = self.conn.request("POST", "/stream%s"%(url_params), body=self._data, headers=headers) | ||
self._state = 'open' | ||
self._event_loop(id, on_event, start_time) | ||
except Exception as e: | ||
if callable(on_event): | ||
on_event(Error(e), None) | ||
|
||
def _event_loop(self, stream_id, on_event, start_time): | ||
""" Event loop for the stream. """ | ||
response = self.conn.get_response(stream_id) | ||
if 'x-txn-time' in response.headers: | ||
self._client.sync_last_txn_time(int(response.headers['x-txn-time'][0].decode())) | ||
try: | ||
for push in response.read_chunked(): # all pushes promised before response headers | ||
raw = push.decode() | ||
request_result = self._stream_chunk_to_request_result(response, raw, start_time, time()) | ||
event = parse_stream_request_result_or_none(request_result) | ||
if event is not None and hasattr(event, 'txn'): | ||
self._client.sync_last_txn_time(int(event.txn)) | ||
on_event(event, request_result) | ||
if self._client.observer is not None: | ||
self._client.observer(request_result) | ||
except Exception as e: | ||
self.error = e | ||
self.close() | ||
on_event(Error(e), None) | ||
|
||
def _stream_chunk_to_request_result(self, response, raw_chunk, start_time, end_time): | ||
""" Converts a stream chunk to a RequestResult. """ | ||
response_content = parse_json_or_none(raw_chunk) | ||
return RequestResult( | ||
"POST", "/stream", self._query, self._data, | ||
raw_chunk, response_content, None, response.headers, | ||
start_time, end_time) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import logging | ||
|
||
class EventDispatcher(object): | ||
""" | ||
Event dispatch interface for stream subscription. | ||
""" | ||
def __init__(self): | ||
self.callbacks = {} | ||
|
||
def on(self, event_type, callback): | ||
""" | ||
Subscribe to an event. | ||
""" | ||
if callable(callback): | ||
self.callbacks[event_type] = callback | ||
elif callback is not None: | ||
raise Exception("Callback for event `%s` is not callable."%(event_type)) | ||
|
||
def _noop(self, event, request_result): | ||
""" | ||
Default callback for unregistered event types. | ||
""" | ||
logging.debug("Unhandled stream event %s; %s"%(event, request_result)) | ||
pass | ||
|
||
def dispatch(self, event, request_result): | ||
""" | ||
Dispatch the given event to the appropriate listeners. | ||
""" | ||
fn = self.callbacks.get(event.type, None) | ||
if fn is None: | ||
return self._noop(event, request_result) | ||
return fn(event) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from faunadb.errors import FaunaError | ||
|
||
class StreamError(FaunaError): | ||
"""Stream Error""" | ||
def __init__(self, error, request_result = None): | ||
super(StreamError, self).__init__(error, request_result) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
|
||
from faunadb._json import parse_json_or_none | ||
from faunadb.errors import BadRequest, PermissionDenied | ||
|
||
|
||
def parse_stream_request_result_or_none(request_result): | ||
""" | ||
Parses a stream RequestResult into a stream Event type. | ||
""" | ||
event = None | ||
parsed = request_result.response_content | ||
if parsed is None: | ||
return UnknownEvent(request_result) | ||
evt_type = parsed.get('type', None) | ||
if evt_type == "start": | ||
event = Start(parsed) | ||
elif evt_type is None and 'errors' in parsed: | ||
event = Error(BadRequest(request_result)) | ||
elif evt_type == 'error': | ||
event = Error(parsed) | ||
elif evt_type == 'version': | ||
event = Version(parsed) | ||
elif evt_type == 'history_rewrite': | ||
event = HistoryRewrite(parsed) | ||
else: | ||
event = UnknownEvent(request_result) | ||
|
||
return event | ||
|
||
|
||
class Event(object): | ||
""" | ||
A stream event. | ||
""" | ||
def __init__(self, event_type): | ||
self.type = event_type | ||
|
||
class ProtocolEvent(Event): | ||
""" | ||
Stream protocol event. | ||
""" | ||
def __init__(self, event_type): | ||
super(ProtocolEvent, self).__init__(event_type) | ||
|
||
|
||
class Start(ProtocolEvent): | ||
""" | ||
Stream's start event. A stream subscription always begins with a start event. | ||
Upcoming events are guaranteed to have transaction timestamps equal to or greater than | ||
the stream's start timestamp. | ||
:param data: Data | ||
:param txn: Timestamp | ||
""" | ||
def __init__(self, parsed): | ||
super(Start, self).__init__('start') | ||
self.event = parsed['event'] | ||
self.txn = parsed['txn'] | ||
|
||
def __repr__(self): | ||
return "stream:event:Start(event=%s, txn=%d)"%(self.event, self.txn) | ||
|
||
class Error(ProtocolEvent): | ||
""" | ||
An error event is fired both for client and server errors that may occur as | ||
a result of a subscription. | ||
""" | ||
def __init__(self, parsed): | ||
super(Error, self).__init__('error') | ||
self.error = None | ||
self.code = None | ||
self.description = None | ||
if isinstance(parsed, dict): | ||
if 'event' in parsed: | ||
self.error = parsed['event'] | ||
if isinstance(parsed['event'], dict): | ||
self.code = parsed['event'].get('code', None) | ||
self.description = parsed['event'].get('description', None) | ||
elif 'errors' in parsed: | ||
self.error = parsed['errors'] | ||
else: | ||
self.error = parsed | ||
else: | ||
self.error = parsed | ||
|
||
def __repr__(self): | ||
return "stream:event:Error(%s)"%(self.error) | ||
|
||
class HistoryRewrite(Event): | ||
""" | ||
A history rewrite event occurs upon any modifications to the history of the | ||
subscribed document. | ||
:param data: Data | ||
:param txn: Timestamp | ||
""" | ||
def __init__(self, parsed): | ||
super(HistoryRewrite, self).__init__('history_rewrite') | ||
if isinstance(parsed, dict): | ||
self.event = parsed.get('event', None) | ||
self.txn = parsed.get('txn') | ||
|
||
def __repr__(self): | ||
return "stream:event:HistoryRewrite(event=%s, txn=%s)" % (self.event, self.txn) | ||
|
||
class Version(Event): | ||
""" | ||
A version event occurs upon any modifications to the current state of the | ||
subscribed document. | ||
:param data: Data | ||
:param txn: Timestamp | ||
""" | ||
def __init__(self, parsed): | ||
super(Version, self).__init__('version') | ||
if isinstance(parsed, dict): | ||
self.event = parsed.get('event', None) | ||
self.txn = parsed.get('txn') | ||
|
||
def __repr__(self): | ||
return "stream:event:Version(event=%s, txn=%s)" % (self.event, self.txn) | ||
|
||
|
||
class UnknownEvent(Event): | ||
""" | ||
Unknown stream event. | ||
""" | ||
def __init__(self, parsed): | ||
super(UnknownEvent, self).__init__(None) | ||
self.event = 'unknown' | ||
self.event = parsed | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from .client import Connection | ||
from .dispatcher import EventDispatcher | ||
|
||
|
||
class Subscription(object): | ||
""" | ||
A stream subscription which dispatches events received to the registered | ||
listener functions. This class must be constructed via the FaunaClient stream | ||
method. | ||
""" | ||
def __init__(self, client, expression, options=None): | ||
self._client = Connection(client, expression, options) | ||
self._dispatcher = EventDispatcher() | ||
|
||
def start(self): | ||
""" | ||
Initiates the underlying subscription network calls. | ||
""" | ||
self._client.subscribe(self._dispatcher.dispatch) | ||
|
||
def on(self, event_type, callback): | ||
""" | ||
Registers a callback for a specific event type. | ||
""" | ||
self._dispatcher.on(event_type, callback) | ||
|
||
def close(self): | ||
""" | ||
Stops the current subscription and closes the underlying network connection. | ||
""" | ||
self._client.close() | ||
|
||
def __repr__(self): | ||
return "stream:Subscription(state=%s, expression=%s, options=%s)"%(self._client._state, | ||
self._client._query,self._client._options) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
"iso8601", | ||
"requests", | ||
"future", | ||
"hyper" | ||
] | ||
|
||
tests_requires = [ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.