Skip to content

Commit

Permalink
Introduce streaming api
Browse files Browse the repository at this point in the history
  • Loading branch information
Trevor Sibanda committed Nov 4, 2020
1 parent ac21702 commit 8fbd64e
Show file tree
Hide file tree
Showing 9 changed files with 517 additions and 1 deletion.
31 changes: 30 additions & 1 deletion faunadb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
from faunadb.query import _wrap
from faunadb.request_result import RequestResult
from faunadb._json import parse_json_or_none, to_json
from faunadb.streams import Subscription

API_VERSION = "3"
API_VERSION = "4"

class _LastTxnTime(object):
"""Wraps tracking the last transaction time supplied from the database."""
Expand Down Expand Up @@ -187,6 +188,30 @@ def query(self, expression, timeout_millis=None):
"""
return self._execute("POST", "", _wrap(expression), with_txn_time=True, query_timeout_ms=timeout_millis)

def stream(self, expression, options=None, on_start=None, on_error=None, on_version=None, on_history=None):
"""
Creates a stream Subscription to the result of the given read-only expression. When
executed.
The subscription returned by this method does not issue any requests until
the subscription's start method is called. Make sure to
subscribe to the events of interest, otherwise the received events are simply
ignored.
:param expression: A read-only expression.
:param options: Object that configures the stream subscription. E.g set fields to return
:param on_start: Callback for the stream's start event.
:param on_error: Callback for the stream's error event.
:param on_version: Callback for the stream's version events.
:param on_history: Callback for the stream's history_rewrite events.
"""
subscription = Subscription(self, expression, options)
subscription.on('start', on_start)
subscription.on('error', on_error)
subscription.on('version', on_version)
subscription.on('history_rewrite', on_history)
return subscription

def ping(self, scope=None, timeout=None):
"""
Ping FaunaDB.
Expand Down Expand Up @@ -264,3 +289,7 @@ def _perform_request(self, action, path, data, query, headers):
url = self.base_url + "/" + path
req = Request(action, url, params=query, data=to_json(data), auth=self.auth, headers=headers)
return self.session.send(self.session.prepare_request(req))

def _auth_header(self):
"""Returns the HTTP authentication header"""
return "Bearer {}".format(self.auth.username)
3 changes: 3 additions & 0 deletions faunadb/streams/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .client import Connection
from .dispatcher import EventDispatcher
from .subscription import Subscription
115 changes: 115 additions & 0 deletions faunadb/streams/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from time import time

try:
#python2
from urllib import urlencode
except ImportError:
#python3
from urllib.parse import urlencode

from hyper import HTTP20Connection
from faunadb._json import to_json, parse_json_or_none
from faunadb.request_result import RequestResult
from .events import parse_stream_request_result_or_none, Error
from .errors import StreamError

VALID_FIELDS = {"ref", "ts", "diff", "old", "new", "action"}


class Connection(object):
"""
The internal stream client connection interface.
This class handles the network side of a stream
subscription.
Current limitations:
Python requests module uses HTTP1; hyper is used for HTTP/2
"""
def __init__(self, client, expression, options):
self._client = client
self.options = options
self.conn = None
self._fields = None
if isinstance(self.options, dict):
self._fields = self.options.get("fields", None)
elif hasattr(self.options, "fields"):
self._fields = self.options.field
if isinstance(self._fields, list):
union = set(self._fields).union(VALID_FIELDS)
if union != VALID_FIELDS:
raise Exception("Valid fields options are %s, provided %s."%(VALID_FIELDS, self._fields))
self._state = "idle"
self._query = expression
self._data = to_json(expression).encode()
try:
self.conn = HTTP20Connection(
self._client.domain, port=self._client.port, enable_push=True)
except Exception as e:
raise StreamError(e)

def close(self):
"""
Closes the stream subscription by aborting its underlying http request.
"""
if self.conn is None:
raise StreamError('Cannot close inactive stream subscription.')
self.conn.close()
self._state = 'closed'

def error(self):
"""
Retrieves the error from the event loop in an async implementation.
"""
return self.error

def subscribe(self, on_event):
"""Initiates the stream subscription."""
if self._state != "idle":
raise StreamError('Stream subscription already started.')
try:
self._state = 'connecting'
headers = self._client.session.headers
headers["Authorization"] = self._client._auth_header()
if self._client._query_timeout_ms is not None:
headers["X-Query-Timeout"] = str(self._client._query_timeout_ms)
headers["X-Last-Seen-Txn"] = str(self._client.get_last_txn_time())
start_time = time()
url_params = ''
if isinstance(self._fields, list):
url_params= "?%s"%(urlencode({'fields': ",".join(self._fields)}))
id = self.conn.request("POST", "/stream%s"%(url_params), body=self._data, headers=headers)
self._state = 'open'
self._event_loop(id, on_event, start_time)
except Exception as e:
if callable(on_event):
on_event(Error(e), None)
raise

def _event_loop(self, stream_id, on_event, start_time):
""" Event loop for the stream. """
response = self.conn.get_response(stream_id)
if 'x-txn-time' in response.headers:
self._client.sync_last_txn_time(int(response.headers['x-txn-time'][0].decode()))
try:
for push in response.read_chunked(): # all pushes promised before response headers
raw = push.decode()
request_result = self.stream_chunk_to_request_result(response, raw, start_time, time())
event = parse_stream_request_result_or_none(request_result)
if event is not None and hasattr(event, 'txnTS'):
self._client.sync_last_txn_time(int(event.txnTS))
on_event(event, request_result)
if self._client.observer is not None:
self._client.observer(request_result)
except Exception as e:
self.error = e
self.close()
on_event(Error(e), None)

def stream_chunk_to_request_result(self, response, raw_chunk, start_time, end_time):
""" Converts a stream chunk to a RequestResult. """
response_content = parse_json_or_none(raw_chunk)
return RequestResult(
"POST", "/stream", self._query, self._data,
raw_chunk, response_content, None, response.headers,
start_time, end_time)

35 changes: 35 additions & 0 deletions faunadb/streams/dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging

class EventDispatcher(object):
"""
Event dispatch interface for stream subscription.
"""
def __init__(self):
self.callbacks = {}

def on(self, event_type, callback):
"""
Subscribe to an event.
"""
if callable(callback):
self.callbacks[event_type] = callback
elif callback is not None:
raise Exception("Callback for event `%s` is not callable."%(event_type))

def _noop(self, event, request_result):
"""
Default callback for unregistered event types.
"""
logging.debug("Unhandled stream event %s; %s"%(event, request_result))
pass

def dispatch(self, event, request_result):
"""
Dispatch the given event to the appropriate listeners.
"""
fn = self.callbacks.get(event.event, None)
if fn is None:
return self._noop(event, request_result)
return fn(event)


6 changes: 6 additions & 0 deletions faunadb/streams/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from faunadb.errors import FaunaError

class StreamError(FaunaError):
"""Stream Error"""
def __init__(self, error, request_result = None):
super(StreamError, self).__init__(error, request_result)
133 changes: 133 additions & 0 deletions faunadb/streams/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@

from faunadb._json import parse_json_or_none
from faunadb.errors import BadRequest, PermissionDenied


def parse_stream_request_result_or_none(request_result):
"""
Parses a stream RequestResult into a stream Event type.
"""
event = None
parsed = request_result.response_content
if parsed is None:
return UnknownEvent(request_result)
evt_type = parsed.get('event', None)
if evt_type == "start":
event = Start(parsed)
elif evt_type is None and 'errors' in parsed:
evt_type = 'error'
event = Error(BadRequest(request_result))
elif evt_type == 'error':
event = Error(parsed)
elif evt_type == 'version':
event = Version(parsed)
elif evt_type == 'history_rewrite':
event = HistoryRewrite(parsed)
else:
event = UnknownEvent(request_result)

return event


class Event(object):
"""
A stream event.
"""
def __init__(self, event):
self.event = event

class ProtocolEvent(Event):
"""
Stream protocol event.
"""
def __init__(self, event):
super(ProtocolEvent, self).__init__(event)


class Start(ProtocolEvent):
"""
Stream's start event. A stream subscription always begins with a start event.
Upcoming events are guaranteed to have transaction timestamps equal to or greater than
the stream's start timestamp.
:param data: Data
:param txnTS: Timestamp
"""
def __init__(self, parsed):
super(Start, self).__init__('start')
self.data = parsed['data']
self.txnTS = parsed['txnTS']

def __repr__(self):
return "stream:event:Start(data=%s, txnTS=%d)"%(self.data, self.txnTS)

class Error(ProtocolEvent):
"""
An error event is fired both for client and server errors that may occur as
a result of a subscription.
"""
def __init__(self, parsed):
super(Error, self).__init__('error')
self.error = None
self.code = None
self.description = None
if isinstance(parsed, dict):
if 'data' in parsed:
self.error = parsed['data']
if isinstance(parsed['data'], dict):
self.code = parsed['data'].get('code', None)
self.description = parsed['data'].get('description', None)
elif 'errors' in parsed:
self.error = parsed['errors']
else:
self.error = parsed
else:
self.error = parsed

def __repr__(self):
return "stream:event:Error(%s)"%(self.error)

class HistoryRewrite(Event):
"""
A history rewrite event occurs upon any modifications to the history of the
subscribed document.
:param data: Data
:param txnTS: Timestamp
"""
def __init__(self, parsed):
super(HistoryRewrite, self).__init__('history_rewrite')
if isinstance(parsed, dict):
self.data = parsed.get('data', None)
self.txnTS = parsed.get('txnTS')

def __repr__(self):
return "stream:event:HistoryRewrite(data=%s, txnTS=%s)" % (self.data, self.txnTS)

class Version(Event):
"""
A version event occurs upon any modifications to the current state of the
subscribed document.
:param data: Data
:param txnTS: Timestamp
"""
def __init__(self, parsed):
super(Version, self).__init__('version')
if isinstance(parsed, dict):
self.data = parsed.get('data', None)
self.txnTS = parsed.get('txnTS')

def __repr__(self):
return "stream:event:Version(data=%s, txnTS=%s)" % (self.data, self.txnTS)


class UnknownEvent(Event):
"""
Unknown stream event.
"""
def __init__(self, parsed):
super(UnknownEvent, self).__init__(None)
self.event = 'unknown'
self.data = parsed

35 changes: 35 additions & 0 deletions faunadb/streams/subscription.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from .client import Connection
from .dispatcher import EventDispatcher


class Subscription(object):
"""
A stream subscription which dispatches events received to the registered
listener functions. This class must be constructed via the FaunaClient stream
method.
"""
def __init__(self, client, expression, options=None):
self._client = Connection(client, expression, options)
self._dispatcher = EventDispatcher()

def start(self):
"""
Initiates the underlying subscription network calls.
"""
self._client.subscribe(self._dispatcher.dispatch)

def on(self, event_type, callback):
"""
Registers a callback for a specific event type.
"""
self._dispatcher.on(event_type, callback)

def close(self):
"""
Stops the current subscription and closes the underlying network connection.
"""
self._client.close()

def __repr__(self):
return "stream:Subscription(state=%s, expression=%s, options=%s)"%(self._client._state,
self._client._query,self._client._options)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"iso8601",
"requests",
"future",
"hyper"
]

tests_requires = [
Expand Down
Loading

0 comments on commit 8fbd64e

Please sign in to comment.