This repository has been archived by the owner on Nov 3, 2024. It is now read-only.
forked from streamingfast/bstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoneblock_source.go
118 lines (100 loc) · 2.66 KB
/
oneblock_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package bstream
import (
"bytes"
"context"
"fmt"
"io"
"time"
"github.com/streamingfast/dstore"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)
// oneBlocksSource is a source that feeds a pre-listed set of one-block files
// to a Handler, one block per file, then shuts itself down.
type oneBlocksSource struct {
	// Embedded shutter: Run reports the processing outcome through Shutdown.
	*shutter.Shutter
	// oneBlockFiles is the list of files processed by run, in slice order.
	oneBlockFiles []*OneBlockFile
	// downloader fetches the raw bytes of a single one-block file.
	downloader OneBlockDownloaderFunc
	// handler receives every decoded block via ProcessBlock.
	handler Handler
	// ctx is passed to downloader on every download.
	ctx context.Context
	// skipperFunc is optional (may be nil). When set, it is called with a
	// file's ID and the file is skipped when it returns true.
	skipperFunc func(idSuffix string) bool
}
// OneBlocksSourceOption is a functional option that customizes a
// oneBlocksSource by mutating it in place.
type OneBlocksSourceOption func(*oneBlocksSource)
// OneBlocksSourceWithSkipperFunc returns an option that installs f as the
// source's skipper function. While running, the source calls f with each
// one-block file's ID and does not download the file when f returns true,
// which lets callers avoid fetching the same file over and over.
func OneBlocksSourceWithSkipperFunc(f func(string) bool) OneBlocksSourceOption {
	setSkipper := func(src *oneBlocksSource) {
		src.skipperFunc = f
	}
	return setSkipper
}
// NewOneBlocksSource lists the one-block files found in store, using
// lowestBlockNum as the lower bound and no upper bound, and returns a source
// that hands them to handler when Run is called.
//
// The listing is bounded by a 30 second timeout. If it fails, a warning is
// logged and the error is returned with a nil source. The context given to the
// downloader is cancelled when the source's shutter starts terminating.
// Options are applied last, once every other field has been set.
func NewOneBlocksSource(
	lowestBlockNum uint64,
	store dstore.Store,
	handler Handler,
	options ...OneBlocksSourceOption,
) (*oneBlocksSource, error) {
	rootCtx := context.Background()

	// Only the listing is subject to the timeout; the source gets its own
	// cancellable context further down.
	listCtx, cancelList := context.WithTimeout(rootCtx, 30*time.Second)
	defer cancelList()

	files, err := listOneBlocks(listCtx, lowestBlockNum, 0, store)
	if err != nil {
		zlog.Warn("error listing oneblocks", zap.Uint64("lowest_block_num", lowestBlockNum), zap.Error(err))
		return nil, err
	}

	sourceCtx, cancelSource := context.WithCancel(rootCtx)
	cancelOnTerminating := func(_ error) {
		cancelSource()
	}

	src := &oneBlocksSource{
		oneBlockFiles: files,
		downloader:    OneBlockDownloaderFromStore(store),
		handler:       handler,
		ctx:           sourceCtx,
		Shutter:       shutter.New(shutter.RegisterOnTerminating(cancelOnTerminating)),
	}

	for i := range options {
		options[i](src)
	}

	return src, nil
}
// Run processes every listed one-block file and then shuts the source down,
// handing the processing outcome (nil on success) to Shutdown.
func (s *oneBlocksSource) Run() {
	err := s.run()
	s.Shutdown(err)
}
// run downloads each listed one-block file in slice order, decodes the block
// it contains and passes that block to the handler.
//
// Files for which skipperFunc (when set) returns true are not downloaded.
// Processing stops at the first failure and that error is returned: download
// and handler errors are returned as-is, reader errors are wrapped. It returns
// nil once every file has been handled.
func (s *oneBlocksSource) run() error {
	for _, file := range s.oneBlockFiles {
		if s.skipperFunc != nil && s.skipperFunc(file.ID) {
			continue
		}

		data, err := s.downloader(s.ctx, file)
		if err != nil {
			return err
		}

		blockReader, err := getBlockReaderFactory().New(bytes.NewReader(data))
		if err != nil {
			return fmt.Errorf("unable to create block reader: %w", err)
		}

		// io.EOF is tolerated because a reader may report it together with the
		// block it just read.
		blk, err := blockReader.Read()
		if err != nil && err != io.EOF {
			return fmt.Errorf("block reader failed: %w", err)
		}

		// An empty or truncated file can yield io.EOF with no block at all;
		// never forward a nil block to the handler.
		if blk == nil {
			return fmt.Errorf("one-block file %q (block %d) contains no block", file.ID, file.Num)
		}

		if err := s.handler.ProcessBlock(blk, nil); err != nil {
			return err
		}
	}

	zlog.Debug("one_blocks_source finish sending blocks", zap.Int("count", len(s.oneBlockFiles)))
	return nil
}
// listOneBlocks collects the one-block files found in store. It calls
// store.WalkFrom with an empty prefix and the 10-digit zero-padded form of
// from as the starting point.
//
// Filenames that NewOneBlockFile cannot parse are silently ignored. When to is
// non-zero, the walk is stopped at the first file whose block number is
// greater than to; a zero to means no upper bound. The collected files are
// returned together with whatever error WalkFrom reports.
func listOneBlocks(ctx context.Context, from uint64, to uint64, store dstore.Store) (out []*OneBlockFile, err error) {
	startKey := fmt.Sprintf("%010d", from)

	visit := func(filename string) error {
		file, parseErr := NewOneBlockFile(filename)
		if parseErr != nil {
			// Not a one-block file name: skip it and keep walking.
			return nil
		}

		if to != 0 && file.Num > to {
			return dstore.StopIteration
		}

		out = append(out, file)
		return nil
	}

	err = store.WalkFrom(ctx, "", startKey, visit)
	return out, err
}