Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/first streamable block refactor #73

Merged
merged 10 commits into from
Dec 16, 2024
107 changes: 42 additions & 65 deletions block/fetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type fetchBlock func(ctx context.Context, requestedSlot uint64) (slot uint64, ou

type RPCFetcher struct {
rpcClients *firecoreRPC.Clients[*rpc.Client]
optimizeForSingleTarget bool
latestConfirmedSlot uint64
latestFinalizedSlot uint64
latestBlockRetryInterval time.Duration
Expand All @@ -46,11 +45,9 @@ type RPCFetcher struct {
logger *zap.Logger
}

func NewRPC(rpcClients *firecoreRPC.Clients[*rpc.Client], fetchInterval time.Duration, latestBlockRetryInterval time.Duration, optimizeForSingleTarget bool, isMainnet bool, logger *zap.Logger) *RPCFetcher {
func NewRPC(fetchInterval time.Duration, latestBlockRetryInterval time.Duration, isMainnet bool, logger *zap.Logger) *RPCFetcher {
f := &RPCFetcher{
rpcClients: rpcClients,
fetchInterval: fetchInterval,
optimizeForSingleTarget: optimizeForSingleTarget,
latestBlockRetryInterval: latestBlockRetryInterval,
isMainnet: isMainnet,
logger: logger,
Expand All @@ -63,48 +60,48 @@ func (f *RPCFetcher) IsBlockAvailable(requestedSlot uint64) bool {
return requestedSlot <= f.latestConfirmedSlot
}

func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbstream.Block, skip bool, err error) {
func (f *RPCFetcher) FetchSortValue(ctx context.Context, client *rpc.Client) (sortValue uint64, err error) {
num, err := client.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return 0, fmt.Errorf("fetching head block num: %w", err)
}
return num, nil
}

func (f *RPCFetcher) Fetch(ctx context.Context, client *rpc.Client, requestedSlot uint64) (b *pbbstream.Block, skipped bool, err error) {
if f.isMainnet && requestedSlot >= 13334464 && requestedSlot <= 13334475 {
// know issue fetching these blocks on mainnet, ugly but works
return nil, true, nil
}

sleepDuration := time.Duration(0)
_, err = firecoreRPC.WithClients(f.rpcClients, func(client *rpc.Client) (na interface{}, err error) {

for f.latestConfirmedSlot < requestedSlot {
time.Sleep(sleepDuration)
f.latestConfirmedSlot, err = client.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return nil, fmt.Errorf("fetching latestConfirmedSlot block num: %w", err)
}

f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("requested_block_num", requestedSlot))
//
if f.latestConfirmedSlot >= requestedSlot {
break
}
sleepDuration = f.latestBlockRetryInterval
for f.latestConfirmedSlot < requestedSlot {
time.Sleep(sleepDuration)
f.latestConfirmedSlot, err = client.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return nil, false, fmt.Errorf("fetching latestConfirmedSlot block num: %w", err)
}

if f.latestFinalizedSlot < requestedSlot {
f.latestFinalizedSlot, err = client.GetSlot(ctx, rpc.CommitmentFinalized)
if err != nil {
return nil, fmt.Errorf("fetching latest finalized Slot block num: %w", err)
}
f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("requested_block_num", requestedSlot))
f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("requested_block_num", requestedSlot))
//
if f.latestConfirmedSlot >= requestedSlot {
break
}
sleepDuration = f.latestBlockRetryInterval
}

return nil, nil
})

if err != nil {
return nil, false, err
if f.latestFinalizedSlot < requestedSlot {
f.latestFinalizedSlot, err = client.GetSlot(ctx, rpc.CommitmentFinalized)
if err != nil {
return nil, false, fmt.Errorf("fetching latest finalized Slot block num: %w", err)
}
f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("requested_block_num", requestedSlot))
}

f.logger.Info("fetcher fetching block", zap.Uint64("block_num", requestedSlot), zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot))

blockResult, skip, err := f.fetch(ctx, requestedSlot, f.latestConfirmedSlot)
blockResult, skip, err := f.fetch(ctx, client, requestedSlot, f.latestConfirmedSlot)
if err != nil {
return nil, false, fmt.Errorf("fetching block %d: %w", requestedSlot, err)
}
Expand All @@ -126,48 +123,28 @@ func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbs
return block, false, nil
}

func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64, lastConfirmBlockNum uint64) (*rpc.GetBlockResult, bool, error) {
currentSlot := requestedSlot
var lastErrorPrintedAt time.Time
func (f *RPCFetcher) fetch(ctx context.Context, client *rpc.Client, requestedSlot uint64, lastConfirmBlockNum uint64) (*rpc.GetBlockResult, bool, error) {
f.logger.Info("calling GetBlockWithOptions", zap.String("endpoints", fmt.Sprintf("%s", client)))
blockResult, err := client.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts)

for {
out, err := firecoreRPC.WithClients(f.rpcClients, func(client *rpc.Client) (*rpc.GetBlockResult, error) {
f.logger.Info("calling GetBlockWithOptions", zap.String("endpoints", fmt.Sprintf("%s", client)))
blockResult, err := client.GetBlockWithOpts(ctx, currentSlot, GetBlockOpts)
return blockResult, err
})
if err != nil {
var rpcErr *jsonrpc.RPCError
if errors.As(err, &rpcErr) {

if err != nil {
var rpcErr *jsonrpc.RPCError
if errors.As(err, &rpcErr) {

if rpcErr.Code == -32009 || rpcErr.Code == -32007 {
f.logger.Info("fetcher block was skipped", zap.Uint64("block_num", currentSlot), zap.Int("rpc_error_code", rpcErr.Code))
return nil, true, nil
}

if rpcErr.Code == -32004 {
if f.optimizeForSingleTarget && currentSlot < lastConfirmBlockNum {
f.logger.Info("fetcher block was supposedly skipped", zap.Uint64("block_num", currentSlot))
return nil, true, nil
}

f.logger.Warn("block not available. trying same block", zap.Uint64("block_num", currentSlot))
continue
}
if rpcErr.Code == -32009 || rpcErr.Code == -32007 {
f.logger.Info("fetcher block was skipped", zap.Uint64("block_num", requestedSlot))
return nil, true, nil
}

if lastErrorPrintedAt.IsZero() || time.Since(lastErrorPrintedAt) > 30*time.Second {
f.logger.Warn("error getting block", zap.Uint64("block_num", currentSlot), zap.Error(err))
lastErrorPrintedAt = time.Now()
if rpcErr.Code == -32004 {
return nil, false, fmt.Errorf("block not available %d", requestedSlot)
}

//we retry forever!
continue
}

return out, false, nil
return nil, false, fmt.Errorf("getting block %d: %w", requestedSlot, err)
}

return blockResult, false, nil
}

func blockFromBlockResult(slot uint64, finalizedSlot uint64, result *rpc.GetBlockResult, logger *zap.Logger) (*pbbstream.Block, error) {
Expand Down
3 changes: 0 additions & 3 deletions cmd/firesol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/firehose-core/cmd/tools"
Expand Down Expand Up @@ -39,8 +38,6 @@ func newFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
Short: "fetch blocks from different sources",
Args: cobra.ExactArgs(2),
}
time.Now().UnixMilli()
cmd.AddCommand(rpc.NewFetchCmd(logger, tracer))
cmd.AddCommand(rpc.NewNextBlockCmd(logger, tracer))
return cmd
}
77 changes: 0 additions & 77 deletions cmd/firesol/rpc/check.go

This file was deleted.

38 changes: 20 additions & 18 deletions cmd/firesol/rpc/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"strconv"
"time"

firecoreRPC "github.com/streamingfast/firehose-core/rpc"
"github.com/spf13/pflag"

"github.com/gagliardetto/solana-go/rpc"
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
firecoreRPC "github.com/streamingfast/firehose-core/rpc"
"github.com/streamingfast/firehose-solana/block/fetcher"
"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand All @@ -29,17 +30,16 @@ func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd.Flags().String("state-dir", "/data/poller", "interval between fetch")
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")
cmd.Flags().Duration("latest-block-retry-interval", time.Second, "interval between fetch")
cmd.Flags().Duration("max-block-fetch-duration", 3*time.Second, "maximum delay before considering a block fetch as failed")
cmd.Flags().Duration("interval-between-clients-sort", 10*time.Minute, "interval between sorting clients base on their head block")
cmd.Flags().Int("block-fetch-batch-size", 10, "Number of blocks to fetch in a single batch")
cmd.Flags().Bool("optimize-single-target", false, "Only set this if every endpoint is pointing to a single node (not a cluster). It allows reducing the number of RPC calls by making assumptions about the last block")
cmd.Flags().String("network", "mainnet", "network to fetch from (mainnet, devnet, testnet) -- only used to patch a known issue on some slots")

return cmd
}

func fetchRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()

stateDir := sflags.MustGetString(cmd, "state-dir")

startBlock, err := strconv.ParseUint(args[0], 10, 64)
Expand All @@ -48,38 +48,40 @@ func fetchRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecut
}

fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")
maxBlockFetchDuration := sflags.MustGetDuration(cmd, "max-block-fetch-duration")
intervalBetweenClientsSort := sflags.MustGetDuration(cmd, "interval-between-clients-sort")

logger.Info(
"launching firehose-solana poller",
zap.String("state_dir", stateDir),
zap.Uint64("first_streamable_block", startBlock),
zap.Duration("interval_between_fetch", fetchInterval),
zap.Duration("latest_block_retry_interval", sflags.MustGetDuration(cmd, "latest-block-retry-interval")),
)
logger.Info("launching firehose-solana poller")
cmd.Flags().VisitAll(func(flag *pflag.Flag) {
logger.Info("flag", zap.String("name", flag.Name), zap.String("value", flag.Value.String()))
})

rpcEndpoints := sflags.MustGetStringArray(cmd, "endpoints")
rpcClients := firecoreRPC.NewClients[*rpc.Client]()
rpcClients := firecoreRPC.NewClients[*rpc.Client](maxBlockFetchDuration, firecoreRPC.NewStickyRollingStrategy[*rpc.Client](), logger)
for _, rpcEndpoint := range rpcEndpoints {
client := rpc.New(rpcEndpoint)
rpcClients.Add(client)
}

latestBlockRetryInterval := sflags.MustGetDuration(cmd, "latest-block-retry-interval")
optimizeSingleTarget := sflags.MustGetBool(cmd, "optimize-single-target")
var isMainnet bool
switch sflags.MustGetString(cmd, "network") {
case "mainnet", "mainnet-beta":
isMainnet = true
}

poller := blockpoller.New(
fetcher.NewRPC(rpcClients, fetchInterval, latestBlockRetryInterval, optimizeSingleTarget, isMainnet, logger),
blockFetcher := fetcher.NewRPC(fetchInterval, latestBlockRetryInterval, isMainnet, logger)
rpcClients.StartSorting(cmd.Context(), firecoreRPC.SortDirectionDescending, blockFetcher, intervalBetweenClientsSort)

poller := blockpoller.New[*rpc.Client](
blockFetcher,
blockpoller.NewFireBlockHandler("type.googleapis.com/sf.solana.type.v1.Block"),
blockpoller.WithStoringState(stateDir),
blockpoller.WithLogger(logger),
rpcClients,
blockpoller.WithLogger[*rpc.Client](logger),
blockpoller.WithStoringState[*rpc.Client](stateDir),
)

err = poller.Run(ctx, startBlock, sflags.MustGetInt(cmd, "block-fetch-batch-size"))
err = poller.Run(startBlock, nil, sflags.MustGetInt(cmd, "block-fetch-batch-size"))
if err != nil {
return fmt.Errorf("running poller: %w", err)
}
Expand Down
Loading
Loading