-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathactivity.go
99 lines (82 loc) · 2.38 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package flogoelasticstream
import (
"encoding/json"
"net/http"
"github.com/TIBCOSoftware/flogo-lib/core/activity"
"github.com/TIBCOSoftware/flogo-lib/logger"
)
// activityLog is the default logger for the exec Activity
var log = logger.GetLogger("activity-tibco-flogoelasticstream")
type elasticResponse struct {
ScrollID string `json:"_scroll_id"`
Hits struct {
Hits []map[string]interface{} `json:"hits"`
} `json:"hits"`
}
type ElasicStreamActivity struct {
metadata *activity.Metadata
}
// NewActivity creates a new AppActivity
func NewActivity(metadata *activity.Metadata) activity.Activity {
return &ElasicStreamActivity{metadata: metadata}
}
// Metadata returns the activity's metadata
func (a *ElasicStreamActivity) Metadata() *activity.Metadata {
return a.metadata
}
func (a *ElasicStreamActivity) Eval(context activity.Context) (done bool, err error) {
basicAuthUser, _ := context.GetInput("basicAuthUser").(string)
basicAuthPassword, _ := context.GetInput("basicAuthPassword").(string)
elasticbaseURL, _ := context.GetInput("elasticbaseURL").(string)
elasticQuery, _ := context.GetInput("elasticQuery").(string)
client := &http.Client{}
// Get first scroll page
req, err := http.NewRequest("GET", elasticbaseURL+elasticQuery, nil)
if err != nil {
panic(err)
}
if basicAuthUser != "" {
req.SetBasicAuth(basicAuthUser, basicAuthPassword)
}
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
var allHits []map[string]interface{}
var er elasticResponse
// Decode JSON into struct
err = json.NewDecoder(resp.Body).Decode(&er)
if err != nil {
panic(err)
}
// Append to allHits
allHits = append(allHits, er.Hits.Hits...)
for {
if er.ScrollID != "" && len(er.Hits.Hits) > 0 {
// Still more pages to scroll
req, err := http.NewRequest("GET", elasticbaseURL+"_search/scroll?scroll=1m&scroll_id="+er.ScrollID, nil)
if err != nil {
panic(err)
}
req.SetBasicAuth(basicAuthUser, basicAuthPassword)
resp, err := client.Do(req)
if err != nil {
panic(err)
}
err = json.NewDecoder(resp.Body).Decode(&er)
if err != nil {
panic(err)
}
// Append to allHits
allHits = append(allHits, er.Hits.Hits...)
} else {
// Done scrolling
break
}
}
log.Info("allHits length: %v\n", len(allHits))
context.SetOutput("result", allHits)
context.SetOutput("hits", len(allHits))
return true, nil
}