-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathmonitor.go
119 lines (97 loc) · 2.5 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package solana
import (
"context"
"fmt"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc/ws"
)
type SubResponse struct {
Slot uint64
InstructionErr error
}
type subscriber struct {
clientWS *ws.Client
}
func newSubscriber(wsEndpoint string) (subscriber, error) {
if wsEndpoint == "" {
return subscriber{}, fmt.Errorf("wsEndpoint is required")
}
wsClient, err := ws.Connect(context.Background(), wsEndpoint)
if err != nil {
return subscriber{}, fmt.Errorf("could not connect to ws: %w", err)
}
return subscriber{wsClient}, nil
}
func (s subscriber) Pull(
ctx context.Context,
txID TxID,
status CommitmentStatus,
) (SubResponse, error) {
tx, err := solana.SignatureFromBase58(string(txID))
if err != nil {
return SubResponse{}, fmt.Errorf("invalid txID: %w", err)
}
ct, err := mapToCommitmentType(status)
if err != nil {
return SubResponse{}, err
}
sub, err := s.clientWS.SignatureSubscribe(tx, ct)
if err != nil {
return SubResponse{}, fmt.Errorf("could not subscribe to signature: %w", err)
}
defer sub.Unsubscribe()
select {
case <-ctx.Done():
return SubResponse{}, fmt.Errorf("context cancelled")
case res := <-sub.Response():
resp := SubResponse{
Slot: res.Context.Slot,
}
if res.Value.Err != nil {
resp.InstructionErr = fmt.Errorf("transaction confirmed with error: %v", res.Value.Err)
}
return resp, nil
case subErr := <-sub.Err():
return SubResponse{}, fmt.Errorf("subscription error: %w", subErr)
}
}
type MonitorResponse struct {
// Ok is true if the transaction reached the desired commitment status.
Ok bool
// InstructionErr is filled if the transaction was confirmed with an error.
InstructionErr error
}
type monitor struct {
sub subscriberService
}
func NewMonitor(wsEndpoint string, opts ...MonitorOption) (Monitor, error) {
m := &monitor{}
for _, opt := range opts {
if err := opt(m); err != nil {
return nil, fmt.Errorf("could not apply option: %w", err)
}
}
if m.sub == nil {
sub, err := newSubscriber(wsEndpoint)
if err != nil {
return monitor{}, err
}
m.sub = sub
}
return m, nil
}
// WaitForCommitmentStatus waits for a transaction to reach a specific commitment status.
func (m monitor) WaitForCommitmentStatus(
ctx context.Context,
txID TxID,
status CommitmentStatus,
) (MonitorResponse, error) {
res, err := m.sub.Pull(ctx, txID, status)
if err != nil {
return MonitorResponse{}, err
}
return MonitorResponse{
Ok: true,
InstructionErr: res.InstructionErr,
}, nil
}