Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce DecodeStreamer for Client.Do #48

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

MichaelUrman
Copy link

This enables a client to stream values out of the HTTP response. It does
so by providing a result that implements DecodeStreamer with something
like the following. Error handling has been omitted for clarity.

// DecodeStream decodes and processes each record of a JSON array
func (streamingDecoder) DecodeStream(r io.Reader) error {
    dec := json.NewDecoder(r)
    tok, _ := dec.Token() // want: json.Delim('[')
    for dec.More() {
        _ = dec.Decode(&record)
        // do something with record
    }
    tok, _ = dec.Token() // want: json.Delim(']')
    _, err = dec.Token() // want: io.EOF
    return nil
}

I'm open to name changes, or adding tests or docs if you point me to where they'd go. I have tested this with an implementation that looks like the above, and it allows me to receive records before the body is closed.

This enables a client to stream values out of the HTTP response. It does
so by providing a result that implements DecodeStreamer with something
like the following.  Error handling has been omitted for clarity.

    // DecodeStream decodes and processes each record of a JSON array
    func (streamingDecoder) DecodeStream(r io.Reader) error {
        dec := json.NewDecoder(r)
        tok, _ := dec.Token() // want: json.Delim('[')
        for dec.More() {
            _ = dec.Decode(&record)
            // do something with record
        }
        tok, _ = dec.Token() // want: json.Delim(']')
        _, err = dec.Token() // want: io.EOF
        return nil
    }
@jbguerraz
Copy link
Contributor

Hello @MichaelUrman
Thank you for bringing streaming to the table!
I've reviewed the changes, I was wondering if

defer resp.Body.Close()
could have an impact but most of all I'm wondering what should be the streaming api ?

Should:

var results interface{}
_, err = d.Query().Execute(qry, &results)

become something like:

var recordHandler func(record json.RawMessage)
_, err = d.Query().Stream(qry, recordHandler)

then decoder becomes part of the library, and then it opens the door to questions like "does each type of response from druid share the same structure? should we bring multiple decoders ? one per query type ?"

@MichaelUrman
Copy link
Author

Hi @jbguerraz, thanks for reviewing!

Yes, that defer resp.Body.Close() prevents simply passing a nil result to Do, then decoding the returned response's body later. I didn't remove it because any existing callers of Do(r, nil), would stop closing Body. This approach was the least intrusive I could come up with, and required only the same knowledge as choosing a good type for result already required.

For your alternate d.Query.Stream entry point, are you proposing the func(json.RawMessage) be called once for each entry in the array, allowing the implementation to decode individual entries with json.Unmarshal([]byte(record), &result)? I think that would work as well.

As tradeoffs, the approach currently in the PR enables the caller to use any JSON parser, and makes it easy to exit early, but by the same token it requires understanding the format of Druid's response. I think your proposed approach requires some use of encoding/json (to declare the callback), but could partially abstract Druid's response formats.

A third approach might mix these: a new API calls a callback/interface with an interface { io.Reader; Next(result interface{}) }; that is the caller can use it like an io.Reader, or read individual records. I think this approach may have too much complexity for its added flexibility.

Let me know if you need me to prototype another approach. I'm fairly inexpert in Druid, so can't yet speak towards its other response formats, and whether they're compatible with a func(json.RawMessage) callback.

@MichaelUrman
Copy link
Author

Hi @jbguerraz and any other members,

This is just a ping to see if there's anything I can do to further facilitate landing some sort of data streaming in go-druid. I'm willing to prototype other approaches, or to refine this one, if it will help suit your vision.

@jbguerraz
Copy link
Contributor

Hello @MichaelUrman
Didn't forget it. I need to find a moment to check the different response formats and so get a broader view.
Gonna give it a try in the weekend I believe. Thank you for your patience. If somehow this is critical for you we also can move ahead that way and iterate right after.

@MichaelUrman
Copy link
Author

Thanks! While I strongly expect to need streaming, I'm not blocked on it. So I'd rather you have a chance to get it right, and not have to live with the remnants of the "wrong" implementation.

(If I need it before it's ready here, there's always go mod replace. 😄)

@jbguerraz
Copy link
Contributor

jbguerraz commented Jun 25, 2021

Hello @MichaelUrman
I've checked it out a bit more :)
Maybe we could have a Query().Stream(builder.Query, chan json.RawMessage) (instead of Query().Execute(builder.Query, interface{}) that use (a new) Client.Stream (instead of existing Client.Do).
builder.Query would have a DecodeStream method or so (that would implement the row extraction logic, for each of the query https://github.com/grafadruid/go-druid/tree/master/builder/query , similar to https://github.com/grafadruid/druid-grafana/blob/master/pkg/druid.go#L425 ). That way the stream client would receive a json raw message per row, in a channel it provides.
We could start by providing the Client.Stream method which basically would be the same feature than this but without (ab)using execute / do methods. WDYT ?

@MichaelUrman
Copy link
Author

Apologies for the length of these thoughts. I'm not sure how to condense it in the time I have available. I hope it's at least clear. As always, if you'd like me to prototype some of this, let me know.

I believe we're considering some implicit questions here, and would like to try to make them explicit:

  1. How to request streaming?
  2. Who implements the streaming?
  3. If go-druid, how does it implement the streaming?

Regarding 1: The PR proposes an implicit approach by testing for an interface, and you've proposed an explicit separate call. I'm assuming some of the worry about implicit is a caller that already "accidentally" implements the interface, breaking compatibility. Or that a typo in implementing the interface could result in hard to diagnose lack of its use. One might address these by introducing a new go-druid type to wrap the interface, check specifically for that type.

Regarding 2: This PR gave all responsibility to the caller; this is powerful and liberating, but potentially repeats a lot of complexity. You've proposed having go-druid implement the streaming which avoids this. However, having go-druid implement this requires choosing an API for it.

So regarding 3: An API for streaming leaves us with several suboptimal options, due to limitations in go's syntax.

  • Returning a helper: pros include ease of reading; cons include a requirement to close it
  • Returning a channel (a special case of returning a helper, or could be the internals of a helper)
  • Accepting a callback or interface: pros include lifetime management of Body; cons include indirectness and limitations on the callback code (The PR's approach is similar to this, but the caller controls the loop so can break early.)
  • Accepting a channel (similar to a callback, but likely requires the caller start a goroutine)

I'm skeptical of exposing channels at all, as they are easy to misuse, for instance by not consuming all rows from it. I'm strongly skeptical of having the API accept a channel, as the ownership is backwards: typically the producer creates, sends, and closes the channel; but here the consumer would create it and the producer would send and close.

With that framework in mind here's what I think about your proposed Query().Stream(builder.Query, chan json.RawMessage). I like that it's explicit, and go-druid owns the complexity of streaming and understanding the JSON. I don't like the channel in the API, especially with its unusual ownership semantics, and I don't like being tied to encoding/json. Also I'm too uninformed about what would be in this message (but that's just as true with the PR's approach).

By contrast, I don't love how this PR's approach is so implicit and requires both heavy lifting and knowledge of the underlying response format, but I do like its ownership semantics and ability to use other JSON parsers. (Though it seems go-druid already requires knowledge of the response format when creating a struct for result.)

(Note that the two proposals can be combined. An explicit go-druid interface-wrapping type can be the building block upon which your proposed additional API is built. Then if the interface-wrapping type is exported, it can be used by those with unusual needs.)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants