-
Notifications
You must be signed in to change notification settings - Fork 2
/
stream.go
108 lines (87 loc) · 1.86 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package kstreams
import (
"fmt"
"log/slog"
"time"
"golang.org/x/sync/errgroup"
)
// Option is a functional option: a function that mutates an App. New applies
// the options it is given, in order, after setting its defaults.
type Option func(*App)
// App holds the configuration and runtime state needed to run a Topology on
// a set of workers. It is created with New, started with Run and stopped
// with Close.
type App struct {
// numRoutines is the number of workers Run creates and starts.
numRoutines int
// brokers is the list of broker addresses handed to every worker.
brokers []string
// groupName is the group name handed to every worker.
groupName string
// t is the topology handed to every worker.
t *Topology
// routines collects the workers created by Run; Close iterates over it.
routines []*Worker
// log is the logger from which each worker's "worker" group logger is derived.
log *slog.Logger
// eg is the errgroup the worker goroutines run in. It is set by Run and
// waited on by Close.
eg *errgroup.Group
// commitInterval is handed to every worker.
// NOTE(review): presumably the interval between offset commits — confirm in Worker.
commitInterval time.Duration
}
// WithWorkersCount returns an Option that sets the number of workers Run
// creates and starts to n.
//
// It is declared as a function rather than a package-level variable so that
// importers cannot reassign it.
func WithWorkersCount(n int) Option {
	return func(a *App) {
		a.numRoutines = n
	}
}
// WithLog returns an Option that sets the logger the App derives its worker
// loggers from. A nil log is replaced by NullLogger(), because Run calls a
// method on the logger and would otherwise dereference nil.
//
// It is declared as a function rather than a package-level variable so that
// importers cannot reassign it.
func WithLog(log *slog.Logger) Option {
	return func(a *App) {
		if log == nil {
			a.log = NullLogger()
			return
		}
		a.log = log
	}
}
// WithBrokers returns an Option that sets the broker addresses handed to
// every worker. The slice is copied when the option is applied, so later
// modifications of the caller's slice do not affect the App.
//
// It is declared as a function rather than a package-level variable so that
// importers cannot reassign it.
func WithBrokers(brokers []string) Option {
	return func(a *App) {
		a.brokers = append([]string(nil), brokers...)
	}
}
// WithCommitInterval returns an Option that sets the commit interval handed
// to every worker.
//
// It is declared as a function rather than a package-level variable so that
// importers cannot reassign it.
func WithCommitInterval(commitInterval time.Duration) Option {
	return func(a *App) {
		a.commitInterval = commitInterval
	}
}
// NullWriter is a writer that discards everything written to it.
type NullWriter struct{}

// Write discards p and reports all of it as written. Returning len(p)
// instead of 0 honors the io.Writer contract, which requires a non-nil error
// whenever fewer than len(p) bytes are reported; wrappers such as io.Copy
// would otherwise fail with io.ErrShortWrite.
func (NullWriter) Write(p []byte) (int, error) { return len(p), nil }
// NullLogger returns a logger backed by a text handler that writes to a
// NullWriter, using the handler's default options.
func NullLogger() *slog.Logger {
	var sink NullWriter
	handler := slog.NewTextHandler(sink, nil)
	return slog.New(handler)
}
// New builds an App that runs topology t under the given group name.
//
// Defaults, before any option is applied: one worker, the single broker
// "localhost:9092", a NullLogger, and a commit interval of five seconds.
// The options are then applied in the order they were passed.
func New(t *Topology, groupName string, opts ...Option) *App {
	app := new(App)
	app.t = t
	app.groupName = groupName
	app.numRoutines = 1
	app.brokers = []string{"localhost:9092"}
	app.routines = make([]*Worker, 0)
	app.log = NullLogger()
	app.commitInterval = 5 * time.Second

	for i := range opts {
		opts[i](app)
	}
	return app
}
// Run creates the configured number of workers, starts each one in its own
// goroutine and blocks until all of them have returned, either because of a
// worker error or because of a graceful shutdown triggered by a call to
// Close. It returns the first non-nil error reported by a worker, if any.
//
// If a worker cannot be created, the workers that were already started are
// shut down before the creation error is returned, so a failed Run does not
// leave part of the App running in the background.
func (c *App) Run() error {
	grp := errgroup.Group{}
	c.eg = &grp

	for i := 0; i < c.numRoutines; i++ {
		name := fmt.Sprintf("routine-%d", i)
		routine, err := NewWorker(
			c.log.WithGroup("worker"),
			name,
			c.t,
			c.groupName,
			c.brokers,
			c.commitInterval)
		if err != nil {
			// Close stops every worker collected so far and waits for the
			// group. Its result is dropped on purpose: the creation failure
			// is the error the caller needs to see.
			_ = c.Close()
			return fmt.Errorf("creating worker %q: %w", name, err)
		}
		c.routines = append(c.routines, routine)
		grp.Go(routine.Run)
	}

	// Wait blocks until every worker's Run has returned.
	return grp.Wait()
}
// Close asks every worker collected by Run to shut down and then blocks
// until all worker goroutines have returned, returning the first non-nil
// error any of them reported. Calling Close before Run is a no-op that
// returns nil.
//
// NOTE(review): c.routines and c.eg are read here without synchronization
// with Run, which writes them while starting workers. Confirm that callers
// only invoke Close once Run has finished starting them.
func (c *App) Close() error {
	if c.eg == nil {
		// Run has not been called yet: there is no group to wait on, and
		// calling Wait on the nil group would panic.
		return nil
	}

	for _, routine := range c.routines {
		// The workers are closed concurrently, each from its own goroutine.
		go func(routine *Worker) {
			routine.Close()
		}(routine)
	}
	return c.eg.Wait()
}