forked from streamingfast/substreams-sink
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
156 lines (124 loc) · 5.21 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"encoding/json"
// "encoding/hex"
"context"
"fmt"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/streamingfast/cli"
. "github.com/streamingfast/cli"
"github.com/streamingfast/logging"
sink "github.com/streamingfast/substreams-sink"
// pbchanges "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1"
pbtokentransfer "github.com/streamingfast/substreams-sink/examples/advanced/eosio.token/proto/v1"
// pbsetabi "github.com/streamingfast/substreams-sink/examples/advanced/setabi/proto/v1"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
)
var expectedOutputModuleType = string(new(pbtokentransfer.TransferEvents).ProtoReflect().Descriptor().FullName())
// var expectedOutputModuleType = string(new(pbsetabi.SetABIEvents).ProtoReflect().Descriptor().FullName())
var zlog, tracer = logging.RootLogger("project", "github.com/change_to_org/change_to_project")
func main() {
logging.InstantiateLoggers()
Run(
"sinker",
"Description of your command",
Command(sinkRunE,
"sink <endpoint> <manifest> [<output_module>]",
"Run the sinker code",
RangeArgs(2, 3),
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags)
}),
),
OnCommandErrorLogAndExit(zlog),
)
}
func sinkRunE(cmd *cobra.Command, args []string) error {
endpoint := args[0]
manifestPath := args[1]
// Find the output module in the manifest sink.moduleName configuration. If you have no
// such configuration, you can change the value below and set the module name explicitly.
outputModuleName := sink.InferOutputModuleFromPackage
if len(args) == 3 {
outputModuleName = args[2]
}
sinker, err := sink.NewFromViper(
cmd,
// Should be the Protobuf full name of the map's module output, we use
// `substreams-database-changes` imported type. Adjust to your needs.
//
// If your Protobuf is defined in your Substreams manifest, you can use `substream protogen`
// while being in the same folder that contain `buf.gen.yaml` file in the example folder.
expectedOutputModuleType,
endpoint,
manifestPath,
outputModuleName,
// This is the block range, in our case defined as Substreams module's start block and up forever
"50:10000",
zlog,
tracer,
)
cli.NoError(err, "unable to create sinker: %s", err)
sinker.OnTerminating(func(err error) {
cli.NoError(err, "unexpected sinker error")
zlog.Info("sink is terminating")
})
// You **must** save the cursor somewhere, saving it to memory while
// make it last until the process is killed, in which on re-start, the
// sinker will resume from start block again. You can simply read from
// a file the string value of the cursor and use `sink.NewCursor(value)`
// to load it.
// Blocking call, will return on sinker termination
sinker.Run(context.Background(), sink.NewBlankCursor(), sink.NewSinkerHandlers(handleBlockScopedData, handleBlockUndoSignal))
return nil
}
func handleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *sink.Cursor) error {
_ = ctx
// changes := &pbchanges.DatabaseChanges{}
changes := &pbtokentransfer.TransferEvents{}
// changes := &pbsetabi.SetABIEvents{}
if err := data.Output.MapOutput.UnmarshalTo(changes); err != nil {
return fmt.Errorf("unable to unmarshal database changes: %w", err)
}
fmt.Printf("Block #%d (%s) data received with %d changes\n", data.Clock.Number, data.Clock.Id, len(changes.Items))
// // uncomment to see the decoded ascii string
// for i,_ := range changes.Items {
// // convert abi field from hex econding string to ascii encoding string
// bytes, err := hex.DecodeString(changes.Items[i].Abi)
// if err != nil {
// fmt.Errorf("Error decoding hex to ascii string: %v", err)
// }
// // Convert bytes to ASCII string
// abiAsciiString := string(bytes)
// // fmt.Println(abiAsciiString);
// changes.Items[i].Abi = abiAsciiString;
// }
jsonData, err := json.Marshal(changes)
if err != nil {
fmt.Errorf("Error marshaling to JSON: %v", err)
}
// Convert the JSON bytes to a string and print
fmt.Println(string(jsonData))
// Once you have processed the block, you **must** persist the cursor, persistence
// can take differnet form. For example, you can save it to a file, or to a database.
// You can simply use `os.WriteFile("cursor.txt", []byte(cursor.String()), 0644)` to
// save it to a file.
_ = cursor
// The isLive boolean is set to true when the sinker is running in the live portion
// of the chain. If there is **no** liveness checker configured on the sinker, the isLive
// value is going to be nil.
_ = isLive
return nil
}
func handleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
_ = ctx
// The chain forked for one or more blocks, you **must** rewind your changes back to the
// last valid block, which is provided in the undo signal. You **must** also persist the
// last valid cursor also provided in the undo signal. You can simply use
// `os.WriteFile("cursor.txt", []byte(cursor.String()), 0644)` to
// save it to a file.
_ = cursor
fmt.Printf("Rewinding changes back to block %s\n", undoSignal.LastValidBlock)
return nil
}