-
Notifications
You must be signed in to change notification settings - Fork 49
/
response.go
154 lines (132 loc) · 3.77 KB
/
response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package gremgo
import (
"encoding/json"
"errors"
"fmt"
)
// GremlinError is the error value produced for a response whose status code
// signals a failure. It carries the status code and message of that response.
type GremlinError struct {
	// Attributes holds optional extra information about the failure.
	// The omitempty option must live inside the json tag value (comma
	// separated) to take effect; a nil Attributes is then left out of the
	// encoded JSON instead of being written as null.
	Attributes interface{} `json:"attributes,omitempty"`
	// Code is the response status code. It is a float64 because that is the
	// type encoding/json produces for JSON numbers decoded into interface{}.
	Code float64 `json:"code"`
	// Message is the status message of the response.
	Message string `json:"message"`
}
// Error implements the error interface. The text has the form
// "code: <Code> ; message:<Message>".
func (e *GremlinError) Error() string {
	const layout = "code: %v ; message:%v"
	return fmt.Sprintf(layout, e.Code, e.Message)
}
// response is the decoded, internal form of one message received from the
// server, as built by marshalResponse.
type response struct {
	// data is the "data" member of the message's result, or the detected
	// error value when the status code signals a failure.
	data interface{}
	// requestId is the message's "requestId"; responses are stored and
	// looked up under this key.
	requestId string
	// code is the status code of the message.
	code int
}
// handleResponse decodes one raw message and routes it.
//
// A response with status code 407 triggers authentication for the request it
// belongs to, and the result of that authentication is returned. Any other
// response is stored via saveResponse under its request ID, together with the
// decoding or status error if there is one. That error (or nil) is returned.
//
// A failed message that carries no request ID cannot be attributed to any
// requester. It is reported to the caller only and not stored: saving it
// under the empty ID would leave an entry nobody retrieves, and a later
// message of the same kind could then block on that entry's notifier.
func (c *Client) handleResponse(msg []byte) (err error) {
	resp, err := marshalResponse(msg)

	// The comma-ok assertion is false for a nil error, so this is safe on the
	// success path as well.
	if gremErr, ok := err.(*GremlinError); ok && int(gremErr.Code) == 407 {
		// Server requests authentication.
		return c.authenticate(resp.requestId)
	}

	if err != nil && resp.requestId == "" {
		return err
	}

	c.saveResponse(resp, err)
	return err
}
// marshalResponse decodes a raw JSON message into a response struct for
// further handling.
//
// The returned error is:
//   - the json.Unmarshal error when msg is not valid JSON,
//   - a descriptive error when the message has no "status" object or no
//     numeric "status.code" (previously these cases panicked),
//   - a *GremlinError when responseDetectError reports a failure for the
//     status code,
//   - nil otherwise.
//
// The request ID is extracted before anything else so that it is available
// even when the rest of the message turns out to be malformed. A missing or
// non-string "requestId" leaves resp.requestId empty.
func marshalResponse(msg []byte) (resp response, err error) {
	var payload map[string]interface{}
	if err = json.Unmarshal(msg, &payload); err != nil {
		return resp, err
	}

	// Checked assertions throughout: the message comes from the network and
	// must not be able to crash the client.
	resp.requestId, _ = payload["requestId"].(string)

	status, ok := payload["status"].(map[string]interface{})
	if !ok {
		return resp, errors.New("malformed response: missing status object")
	}
	code, ok := status["code"].(float64)
	if !ok {
		return resp, errors.New("malformed response: missing numeric status code")
	}
	message, _ := status["message"].(string)

	resp.code = int(code)
	if detected := responseDetectError(resp.code); detected != nil {
		// The response carries the detected error as its data, while the
		// caller receives the richer GremlinError.
		resp.data = detected
		return resp, &GremlinError{Code: code, Message: message}
	}

	// Indexing a nil map is safe, so an absent or non-object result simply
	// yields nil data.
	result, _ := payload["result"].(map[string]interface{})
	resp.data = result["data"]
	return resp, nil
}
// saveResponse makes the response available for retrieval by the requester.
//
// The response data is appended to whatever has already been stored for the
// same request ID, so a request answered by several messages accumulates all
// of its data. A non-nil err is recorded for the request ID as well. Unless
// the response code is 206, a 1 is sent on the request's notifier channel; a
// 1-slot buffered channel is created if none is registered yet.
//
// The whole operation runs under respMutex.
func (c *Client) saveResponse(resp response, err error) {
	c.respMutex.Lock()
	// Deferred so the mutex is released even if a type assertion or the
	// channel send below panics; otherwise every later call would block.
	defer c.respMutex.Unlock()

	// Retrieve the existing data container (for requests with multiple responses).
	var container []interface{}
	if existing, ok := c.results.Load(resp.requestId); ok {
		container = existing.([]interface{})
	}
	// Store the extended container for future retrieval.
	c.results.Store(resp.requestId, append(container, resp.data))

	if err != nil {
		c.resultsErr.Store(resp.requestId, err)
	}

	respNotifier, _ := c.responseNotifier.LoadOrStore(resp.requestId, make(chan int, 1))
	if resp.code != 206 {
		respNotifier.(chan int) <- 1
	}
}
// retrieveResponse waits for the notifier of request id to fire and then
// returns the data and error stored for that request by saveResponse.
//
// It returns an error when no usable notifier channel is registered for id
// (previously this case panicked). After a successful retrieval the notifier
// is closed and removed, and the stored data and error are deleted. If the
// value received from the notifier is not 1, or no data is stored for id,
// it returns nil data and a nil error without cleaning up.
func (c *Client) retrieveResponse(id string) (data []interface{}, err error) {
	notifier, ok := c.responseNotifier.Load(id)
	if !ok {
		return nil, fmt.Errorf("no response notifier registered for request %q", id)
	}
	ch, ok := notifier.(chan int)
	if !ok {
		return nil, fmt.Errorf("unexpected notifier type %T for request %q", notifier, id)
	}

	// Blocks until the notifier delivers a value.
	if n := <-ch; n != 1 {
		return nil, nil
	}

	dataI, ok := c.results.Load(id)
	if !ok {
		return nil, nil
	}

	// Capture the error first, then the data, before both are deleted below.
	if errI, ok := c.resultsErr.Load(id); ok {
		err = errI.(error)
	}
	data = dataI.([]interface{})

	close(ch)
	c.responseNotifier.Delete(id)
	c.deleteResponse(id)
	return data, err
}
// deleteResponse removes the stored data and the stored error for the given
// request ID. It is used for cleanup once a response has been retrieved.
func (c *Client) deleteResponse(id string) {
	c.results.Delete(id)
	c.resultsErr.Delete(id)
}
// responseDetectError maps a response status code to an error.
//
// Codes 200, 204 and 206 yield nil. Every other code yields a newly created
// error whose text names the failure; codes without a dedicated entry are
// reported as "UNKNOWN ERROR".
func responseDetectError(code int) (err error) {
	var reason string
	switch code {
	case 200, 204, 206:
		return nil
	case 401:
		reason = "UNAUTHORIZED"
	case 407:
		reason = "AUTHENTICATE"
	case 498:
		reason = "MALFORMED REQUEST"
	case 499:
		reason = "INVALID REQUEST ARGUMENTS"
	case 500:
		reason = "SERVER ERROR"
	case 597:
		reason = "SCRIPT EVALUATION ERROR"
	case 598:
		reason = "SERVER TIMEOUT"
	case 599:
		reason = "SERVER SERIALIZATION ERROR"
	default:
		reason = "UNKNOWN ERROR"
	}
	return errors.New(reason)
}