-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathoutput.go
46 lines (36 loc) · 999 Bytes
/
output.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package main
import (
"log"
"github.com/AdRoll/baker"
)
var ShardableDesc = baker.OutputDesc{
Name: "Shardable",
New: NewShardable,
Config: &ShardableConfig{},
Raw: true,
}
// A ShardableConfig specifies the Shardable configuration.
type ShardableConfig struct{}
// A Shardable output appends all received records, useful for examination in tests.
type Shardable struct {
idx int
}
func NewShardable(cfg baker.OutputParams) (baker.Output, error) {
return &Shardable{
idx: cfg.Index,
}, nil
}
// The output supports sharding
func (s *Shardable) CanShard() bool {
return true
}
func (s *Shardable) Run(input <-chan baker.OutputRecord, _ chan<- string) error {
// Do something with `input` record.
// s.idx identifies the output process index and should
// be used to manage the sharding
for data := range input {
log.Printf(`Shard #%d: Getting "%s"`, s.idx, data.Record)
}
return nil
}
func (s *Shardable) Stats() baker.OutputStats { return baker.OutputStats{} }