Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-6204: Implement token based streaming #140

Merged
merged 4 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Client struct {
maxBackoff time.Duration

// lazily cached URLs
queryURL *url.URL
queryURL, streamURL *url.URL
}

// NewDefaultClient initialize a [fauna.Client] with recommend default settings
Expand Down Expand Up @@ -196,6 +196,16 @@ func (c *Client) parseQueryURL() (url *url.URL, err error) {
return
}

func (c *Client) parseStreamURL() (url *url.URL, err error) {
if c.streamURL != nil {
url = c.streamURL
} else if url, err = url.Parse(c.url); err == nil {
url = url.JoinPath("stream", "1")
c.streamURL = url
}
return
}

func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) {
req2 := req.Clone(req.Context())
body, rerr := io.ReadAll(req.Body)
Expand Down Expand Up @@ -291,6 +301,19 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator {
}
}

// Subscribe initiates a stream subscription for the given stream value.
func (c *Client) Subscribe(stream Stream) (*Events, error) {
streamReq := streamRequest{
apiRequest: apiRequest{c.ctx, c.headers},
Stream: stream,
}
if byteStream, err := streamReq.do(c); err == nil {
return newEvents(byteStream), nil
} else {
return nil, err
}
}

// QueryIterator is a [fauna.Client] iterator for paginated queries
type QueryIterator struct {
client *Client
Expand Down
79 changes: 79 additions & 0 deletions client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,82 @@ func ExampleClient_Paginate() {
fmt.Printf("%d", len(items))
// Output: 20
}

func ExampleClient_Subscribe() {
// IMPORTANT: just for the purpose of example, don't actually hardcode secret
_ = os.Setenv(fauna.EnvFaunaSecret, "secret")
_ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal)

client, err := fauna.NewDefaultClient()
if err != nil {
log.Fatalf("client should have been initialized: %s", err)
}

// setup a collection
setupQuery, _ := fauna.FQL(`
if (!Collection.byName('StreamingSandbox').exists()) {
Collection.create({ name: 'StreamingSandbox' })
} else {
StreamingSandbox.all().forEach(.delete())
}
`, nil)
if _, err := client.Query(setupQuery); err != nil {
log.Fatalf("failed to setup the collection: %s", err)
}

// create a stream
streamQuery, _ := fauna.FQL(`StreamingSandbox.all().toStream()`, nil)
result, err := client.Query(streamQuery)
if err != nil {
log.Fatalf("failed to create a stream: %s", err)
}

var stream fauna.Stream
if err := result.Unmarshal(&stream); err != nil {
log.Fatalf("failed to unmarshal the stream value: %s", err)
}

// initiate the stream subscription
events, err := client.Subscribe(stream)
if err != nil {
log.Fatalf("failed to subscribe to the stream value: %s", err)
}
defer events.Close()

// produce some events while the subscription is open
createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil)
updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil)
deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil)

queries := []*fauna.Query{createQuery, updateQuery, deleteQuery}
for _, query := range queries {
if _, err := client.Query(query); err != nil {
log.Fatalf("failed execute CRUD query: %s", err)
}
}

// fetch the produced events
type Data struct {
Foo string `fauna:"foo"`
}

expect := 3
for expect > 0 {
event, err := events.Next()
if err != nil {
log.Fatalf("failed to receive next event: %s", err)
}
switch event.Type {
case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent:
var data Data
if err := event.Unmarshal(&data); err != nil {
log.Fatalf("failed to unmarshal event data: %s", err)
}
fmt.Printf("Event: %s Data: %+v\n", event.Type, data)
expect--
}
}
// Output: Event: add Data: {Foo:bar}
// Event: update Data: {Foo:baz}
// Event: remove Data: {Foo:baz}
}
72 changes: 58 additions & 14 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ type queryResponse struct {
Tags string `json:"query_tags"`
}

func parseQueryResponse(httpRes *http.Response) (qRes *queryResponse, err error) {
var bytesIn []byte
if bytesIn, err = io.ReadAll(httpRes.Body); err != nil {
err = fmt.Errorf("failed to read response body: %w", err)
return
}

if err = json.Unmarshal(bytesIn, &qRes); err != nil {
err = fmt.Errorf("failed to umarmshal response: %w", err)
}
return
}

func (r *queryResponse) queryTags() map[string]string {
ret := map[string]string{}

Expand All @@ -74,6 +87,7 @@ func (r *queryResponse) queryTags() map[string]string {

return ret
}

func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) {
var bytesOut []byte
if bytesOut, err = marshal(qReq); err != nil {
Expand All @@ -94,25 +108,15 @@ func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) {
return
}

var (
qRes queryResponse
bytesIn []byte
)

if bytesIn, err = io.ReadAll(httpRes.Body); err != nil {
err = fmt.Errorf("failed to read response body: %w", err)
return
}

if err = json.Unmarshal(bytesIn, &qRes); err != nil {
err = fmt.Errorf("failed to umarmshal response: %w", err)
var qRes *queryResponse
if qRes, err = parseQueryResponse(httpRes); err != nil {
return
}

cli.lastTxnTime.sync(qRes.TxnTime)
qRes.Header = httpRes.Header

if err = getErrFauna(httpRes.StatusCode, &qRes, attempts); err != nil {
if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err != nil {
return
}

Expand All @@ -123,10 +127,50 @@ func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) {
}

qSus = &QuerySuccess{
QueryInfo: newQueryInfo(&qRes),
QueryInfo: newQueryInfo(qRes),
Data: data,
StaticType: qRes.StaticType,
}
qSus.Stats.Attempts = attempts
return
}

type streamRequest struct {
apiRequest
Stream Stream
StartTS int64
}

func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) {
var bytesOut []byte
if bytesOut, err = marshal(streamReq); err != nil {
err = fmt.Errorf("marshal request failed: %w", err)
return
}

var streamURL *url.URL
if streamURL, err = cli.parseStreamURL(); err != nil {
return
}

var (
attempts int
httpRes *http.Response
)
if attempts, httpRes, err = streamReq.post(cli, streamURL, bytesOut); err != nil {
return
}

if httpRes.StatusCode != http.StatusOK {
var qRes *queryResponse
if qRes, err = parseQueryResponse(httpRes); err == nil {
if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil {
err = fmt.Errorf("unknown error for http status: %d", httpRes.StatusCode)
}
}
return
}

bytes = httpRes.Body
return
}
7 changes: 7 additions & 0 deletions serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,13 @@ func encode(v any, hint string) (any, error) {
}
}
return out, nil

case streamRequest:
out := map[string]any{"token": string(vt.Stream)}
if vt.StartTS > 0 {
out["start_ts"] = vt.StartTS
}
return out, nil
}

switch value := reflect.ValueOf(v); value.Kind() {
Expand Down
146 changes: 146 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package fauna

import (
"encoding/json"
"io"
)

// EventType represents a Fauna's event type.
type EventType string

const (
// AddEvent happens when a new value is added to the stream's watched set.
AddEvent EventType = "add"
// UpdateEvent happens when a value in the stream's watched set changes.
UpdateEvent EventType = "update"
// Remove event happens when a value in the stream's watched set is removed.
RemoveEvent EventType = "remove"
// StatusEvent happens periodically and comunicates the stream's latest
// transacion time as well as ops aquired during its idle period.
StatusEvent EventType = "status"
)

// Event represents a streaming event.
//
// Events of type [fauna.StatusEvent] have its [fauna.Event.Data] field set to
// nil. Other event's [fauna.Data] can be unmarshalled via the
// [fauna.Event.Unmarshal] method.
type Event struct {
// Type is this event's type.
Type EventType
// TxnTime is the transaction time that produce this event.
TxnTime int64
// Data is the event's data.
Data any
// Stats contains the ops acquired to process the event.
Stats Stats
}

// Unmarshal will unmarshal the raw [fauna.Event.Data] (if present) into the
// known type provided as `into`. `into` must be a pointer to a map or struct.
func (e *Event) Unmarshal(into any) error {
return decodeInto(e.Data, into)
}

// ErrEvent contains error information present in error events.
//
// Error events with "abort" code contain its aborting value present in the
// [fauan.ErrEvent.Abort]. The aborting values can be unmarshalled with the
// [fauna.ErrEvent.Unmarshal] method.
type ErrEvent struct {
// Code is the error's code.
Code string `json:"code"`

// Message is the error's message.
Message string `json:"message"`

// Abort is the error's abort data, present if [fauna.ErrEvent.Code] is
// equals to "abort".
Abort any `json:"abort,omitempty"`
}

// Error provides the underlying error message.
func (e *ErrEvent) Error() string {
return e.Message
}

// Unmarshal will unmarshal the raw [fauna.ErrEvent.Abort] (if present) into the
// known type provided as `into`. `into` must be a pointer to a map or struct.
func (e *ErrEvent) Unmarshal(into any) error {
return decodeInto(e.Abort, into)
}

// Events is an iterator of Fauna events.
//
// The next available event can be obtained by calling the
// [fauna.Subscription.Next] method. Note this method blocks until the next
// event is available or until the events iterator is closed via the
// [fauna.Events.Close] method.
type Events struct {
byteStream io.ReadCloser
decoder *json.Decoder
}

func newEvents(byteStream io.ReadCloser) *Events {
return &Events{
byteStream: byteStream,
decoder: json.NewDecoder(byteStream),
}
}

// Close gracefully closes the stream subscription.
func (es *Events) Close() (err error) {
// XXX: Is there a way to make sure there are no bytes left on the stream
// after closing it? According to go's docs, the underlying connection will
// remain unusable for the duration of its idle time if there are bytes left
// in its read buffer.
Comment on lines +93 to +96
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, probably want to drain on close. There might be a more elegant way to do this, but we've implemented this in the past via io.Copy

e.g. something like

defer es.byteStream.Close()
_, err = io.Copy(io.Discard, io.LimitReader(es.byteStream, 4096))
return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I'll look into it soon. We need to be careful because if the stream is still open, calling read on the bytes stream will block.

return es.byteStream.Close()
}

type rawEvent = struct {
Type EventType `json:"type"`
TxnTime int64 `json:"txn_ts"`
Data any `json:"data,omitempty"`
Error *ErrEvent `json:"error,omitempty"`
Stats Stats `json:"stats"`
}

// Next blocks until the next event is available.
//
// Note that network errors of type [fauna.ErrEvent] are considered fatal and
// close the underlying stream. Calling next after an error event occurs will
// return an error.
func (es *Events) Next() (event *Event, err error) {
raw := rawEvent{}
if err = es.decoder.Decode(&raw); err == nil {
event, err = convertRawEvent(&raw)
if _, ok := err.(*ErrEvent); ok {
es.Close() // no more events are comming
}
}
return
}

func convertRawEvent(raw *rawEvent) (event *Event, err error) {
if raw.Error != nil {
if raw.Error.Abort != nil {
if raw.Error.Abort, err = convert(false, raw.Error.Abort); err != nil {
return
}
}
err = raw.Error
} else {
if raw.Data != nil {
if raw.Data, err = convert(false, raw.Data); err != nil {
return
}
}
event = &Event{
Type: raw.Type,
TxnTime: raw.TxnTime,
Data: raw.Data,
Stats: raw.Stats,
}
}
return
}
Loading
Loading