-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtextstream.go
177 lines (148 loc) · 4.17 KB
/
textstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package client
import (
"bufio"
"bytes"
"encoding/json"
"errors"
"io"
"strconv"
"strings"
"time"
// Namespace imports
. "github.com/djthorpe/go-errors"
)
/////////////////////////////////////////////////////////////////////////////
// TYPES
// TextStream is a decoder for a text stream (server-sent events), as per
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
type TextStream struct {
// buf is a scratch buffer, allocated by NewTextStream and reset by Decode
buf *bytes.Buffer
}
// TextStreamEvent is a single event read from a text stream, as per
// https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
type TextStreamEvent struct {
// The event ID to set the EventSource object's last event ID value.
// Omitted from the JSON encoding when empty.
Id string `json:"id,omitempty"`
// A string identifying the type of event described.
// Omitted from the JSON encoding when empty.
Event string `json:"event,omitempty"`
// The data field for the message. Always present in the JSON encoding,
// even when empty.
Data string `json:"data"`
// The reconnection time. If the connection to the server is lost,
// the client should wait for the specified time before attempting to reconnect.
// Omitted from the JSON encoding when zero.
Retry time.Duration `json:"retry,omitempty"`
}
// TextStreamCallback is the function Decode calls for each text stream event.
// Return a non-nil error to make Decode stop and return: io.EOF stops decoding
// with Decode returning nil, any other error is returned by Decode as-is.
type TextStreamCallback func(TextStreamEvent) error
/////////////////////////////////////////////////////////////////////////////
// GLOBALS
const (
// ContentTypeTextStream is the MIME type of a text stream
// (server-sent events) body
ContentTypeTextStream = "text/event-stream"
)
/////////////////////////////////////////////////////////////////////////////
// LIFECYCLE
// NewTextStream returns a text stream decoder whose internal buffer has been
// allocated and is empty.
func NewTextStream() *TextStream {
	stream := new(TextStream)
	stream.buf = new(bytes.Buffer)
	return stream
}
/////////////////////////////////////////////////////////////////////////////
// STRINGIFY
// String returns the text stream event encoded as indented JSON. If the event
// cannot be encoded, the empty string is returned.
func (t TextStreamEvent) String() string {
	const prefix, indent = "", " "
	if encoded, err := json.MarshalIndent(t, prefix, indent); err == nil {
		return string(encoded)
	}
	return ""
}
/////////////////////////////////////////////////////////////////////////////
// PUBLIC METHODS
// Decode reads a text/event-stream body from r line by line and calls callback
// once for every event that has content. An event ends at a blank line or at
// the end of the input.
//
// Decode returns nil when all the data has been scanned or when the callback
// returns io.EOF. Any other callback error stops decoding and is returned
// as-is. If callback is nil, Decode returns nil without reading from r. A
// non-blank line that contains no colon stops decoding and returns the result
// of ErrUnexpectedResponse.Withf quoting that line. Errors from the underlying
// bufio.Scanner (including a line longer than bufio.MaxScanTokenSize) are
// returned after any pending event has been delivered.
//
// Field names are matched case-insensitively and surrounding whitespace is
// trimmed from both names and values. The fields id, event, data and retry are
// recognised: consecutive data lines are joined with a newline, and retry is
// read as a whole number of milliseconds and ignored when it is not a number.
// Comment lines (starting with a colon) and unknown fields are ignored.
func (t *TextStream) Decode(r io.Reader, callback TextStreamCallback) error {
	// Reset the buffer. The nil check keeps a TextStream that was not
	// created with NewTextStream from panicking.
	if t.buf != nil {
		t.buf.Reset()
	}

	// If the callback is nil, then return without doing anything
	if callback == nil {
		return nil
	}

	var (
		event   *TextStreamEvent // event being assembled, nil between events
		hasData bool             // true once the current event has seen a data line
	)

	// emit delivers the pending event to the callback if it has any content,
	// and always clears the pending state. done reports that decoding must
	// stop, in which case err is the value Decode returns.
	emit := func() (done bool, err error) {
		pending := event
		event, hasData = nil, false
		if pending.IsZero() {
			return false, nil
		}
		if err := callback(*pending); err != nil {
			// io.EOF from the callback is a request to stop, not a failure
			if errors.Is(err, io.EOF) {
				return true, nil
			}
			return true, err
		}
		return false, nil
	}

	// Loop through until EOF or error
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())

		// A blank line terminates the current event
		if line == "" {
			if t.buf != nil {
				t.buf.Reset()
			}
			if done, err := emit(); done {
				return err
			}
			continue
		}

		// A leading colon marks a comment line, which carries no field
		if strings.HasPrefix(line, ":") {
			continue
		}

		// Split the line into field name and value
		name, value, found := strings.Cut(line, ":")
		if !found {
			return ErrUnexpectedResponse.Withf("%q", line)
		}

		// Create a new event if necessary
		if event == nil {
			event = new(TextStreamEvent)
		}

		// Populate the event
		value = strings.TrimSpace(value)
		switch strings.TrimSpace(strings.ToLower(name)) {
		case "id":
			event.Id = value
		case "event":
			event.Event = value
		case "data":
			// Multiple data lines in one event are joined with a newline,
			// as described by the event stream format
			if hasData {
				event.Data += "\n"
			}
			event.Data += value
			hasData = true
		case "retry":
			// Retry time in milliseconds, ignore if not a number
			if ms, err := strconv.ParseInt(value, 10, 64); err == nil {
				event.Retry = time.Duration(ms) * time.Millisecond
			}
		default:
			// Ignore other fields
		}
	}

	// Eject the final text stream event, which may not have been followed
	// by a blank line
	if done, err := emit(); done {
		return err
	}

	// Return any scanner errors
	return scanner.Err()
}
// IsZero reports whether the event carries no content: it is true for a nil
// event and for an event whose Id, Event, Data and Retry are all unset.
func (t *TextStreamEvent) IsZero() bool {
	switch {
	case t == nil:
		return true
	case t.Id != "", t.Event != "", t.Data != "", t.Retry != 0:
		// Any populated field means the event has content
		return false
	default:
		return true
	}
}
// Json decodes the event's Data as JSON into v. When Data is empty nothing is
// decoded, v is left untouched and nil is returned; otherwise the result of
// json.Unmarshal is returned.
func (t *TextStreamEvent) Json(v any) error {
	if payload := t.Data; payload != "" {
		return json.Unmarshal([]byte(payload), v)
	}
	// Do nothing if there is no data
	return nil
}