Skip to content

Commit

Permalink
code review and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Feb 5, 2024
1 parent 88a8e8c commit 97ccffe
Show file tree
Hide file tree
Showing 26 changed files with 1,498 additions and 711 deletions.
8 changes: 8 additions & 0 deletions server/.idea/watcherTasks.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions server/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
version: v1
managed:
# We are now using managed mode for now because I did not find how to support `;pbsubstreams` package
# which we use currently. It's not a big problem for now but I didn't want to change anything. We
# might revisit that later.
enabled: false
plugins:
- name: go
out: pb
- name: go-grpc
opt: paths=source_relative
- plugin: go-grpc
out: pb

opt:
- paths=source_relative
- require_unimplemented_servers=false
- name: connect-go
out: pb
opt: paths=source_relative
254 changes: 16 additions & 238 deletions server/cmd/blockmeta/main.go
Original file line number Diff line number Diff line change
@@ -1,262 +1,40 @@
package main

import (
"blockmeta_server/kvtool"
"context"
"flag"
"fmt"
"log"
"net"
"time"

pb "blockmeta_server/pb/blockmeta"
pbkv "blockmeta_server/pb/github.com/streamingfast/substreams-sink-kv/pb"
"github.com/streamingfast/dgrpc"
pbbmsrv "github.com/streamingfast/blockmeta-service/pb/sf/blockmeta/v2"
"github.com/streamingfast/blockmeta-service/service"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
)

var (
port = flag.Int("port", 50051, "The server port")
clientMode = flag.Bool("clientMode", false, "Run client mode for testing")
listenAddress = flag.String("grpc-listen-addr ", ":9000", "The server port")
sinkServerAddress = flag.String("grpc-listen-addr ", "", "The server port")
)

type serverBlock struct {
pb.UnimplementedBlockServer
pb.UnimplementedBlockByTimeServer
}

func (s *serverBlock) NumToID(ctx context.Context, in *pb.NumToIDReq) (*pb.BlockResp, error) {

prefix := kvtool.PackNumPrefixKey(in.BlockNum)

pbkvClient := connectToSinkServer()

response, err := pbkvClient.GetByPrefix(ctx, &pbkv.GetByPrefixRequest{Prefix: prefix})
if err != nil {
return nil, fmt.Errorf("error getting block data from sink server: %w", err)
}

if len(response.KeyValues) > 1 {
return nil, fmt.Errorf("more than one block found for block number: %v", in.BlockNum)
}

blockNum, blockID, err := kvtool.UnpackNumIDKey(response.KeyValues[0].Key)
if err != nil {
return nil, fmt.Errorf("error unpacking block number and block ID: %w", err)
}

blockPbTimestamp := kvtool.UnpackTimeValue(response.KeyValues[0].Value)
return &pb.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}, nil
}

func (s *serverBlock) IDToNum(ctx context.Context, in *pb.IDToNumReq) (*pb.BlockResp, error) {

prefix := kvtool.PackIDPrefixKey(in.BlockID)

pbkvClient := connectToSinkServer()

response, err := pbkvClient.GetByPrefix(ctx, &pbkv.GetByPrefixRequest{Prefix: prefix})
if err != nil {
return nil, fmt.Errorf("error getting block data from sink server: %w", err)
}

if len(response.KeyValues) > 1 {
return nil, fmt.Errorf("more than one block found for block id: %v", in.BlockID)
}

blockNum, blockID, err := kvtool.UnpackIDNumKey(response.KeyValues[0].Key)
if err != nil {
return nil, fmt.Errorf("error unpacking block number and block ID: %w", err)
}

blockPbTimestamp := kvtool.UnpackTimeValue(response.KeyValues[0].Value)
return &pb.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}, nil
}

func (s *serverBlock) Head(ctx context.Context, in *pb.Empty) (*pb.BlockResp, error) {
prefix := kvtool.TblPrefixTimelineBck + ":"
pbkvClient := connectToSinkServer()

response, err := pbkvClient.GetByPrefix(ctx, &pbkv.GetByPrefixRequest{Prefix: prefix, Limit: 1})
if err != nil {
return nil, fmt.Errorf("error getting block data from sink server: %w", err)
}

blockPbTimestamp, blockID, err := kvtool.UnpackTimeIDKey(response.KeyValues[0].Key, false)
if err != nil {
return nil, fmt.Errorf("error unpacking block number and block ID: %w", err)
}

blockNum := kvtool.UnpackBlockNumberValue(response.KeyValues[0].Value)

return &pb.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}, nil
}

func (s *serverBlock) At(ctx context.Context, in *pb.TimeReq) (*pb.BlockResp, error) {
prefix := kvtool.PackTimePrefixKey(in.Time.AsTime(), false)

pbkvClient := connectToSinkServer()

response, err := pbkvClient.GetByPrefix(ctx, &pbkv.GetByPrefixRequest{Prefix: prefix})
if err != nil {
return nil, fmt.Errorf("error getting block data from sink server: %w", err)
}

if len(response.KeyValues) > 1 {
return nil, fmt.Errorf("more than one block found for block timestamp: %v", in.Time)
}

blockPbTimestamp, blockID, err := kvtool.UnpackTimeIDKey(response.KeyValues[0].Key, false)
if err != nil {
return nil, fmt.Errorf("error unpacking block number and block ID: %w", err)
}

blockNum := kvtool.UnpackBlockNumberValue(response.KeyValues[0].Value)
return &pb.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}, nil
}

func (s *serverBlock) Before(ctx context.Context, in *pb.RelativeTimeReq) (*pb.BlockResp, error) {
prefix := kvtool.PackTimePrefixKey(in.Time.AsTime(), false)

pbkvClient := connectToSinkServer()

response, err := pbkvClient.Scan(ctx, &pbkv.ScanRequest{Begin: prefix, Limit: 4})
if err != nil {
return nil, fmt.Errorf("error getting block data from sink server: %w", err)
}

var blockID string
var blockNum uint64
blockPbTimestamp := &timestamppb.Timestamp{}

for i := 0; i < len(response.KeyValues); i++ {
blockPbTimestamp, blockID, err = kvtool.UnpackTimeIDKey(response.KeyValues[i].Key, false)
if err != nil {
return nil, fmt.Errorf("error unpacking block number and block ID: %w", err)
}

if !in.Inclusive && (blockPbTimestamp.AsTime() == in.Time.AsTime()) {
continue
}

blockNum = kvtool.UnpackBlockNumberValue(response.KeyValues[i].Value)
break
}
return &pb.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}, nil
}

func (s *serverBlock) After(ctx context.Context, in *pb.RelativeTimeReq) (*pb.BlockResp, error) {
prefix := kvtool.PackTimePrefixKey(in.Time.AsTime(), true)

pbkvClient := connectToSinkServer()

response, err := pbkvClient.Scan(ctx, &pbkv.ScanRequest{Begin: prefix, Limit: 4})
if err != nil {
return nil, fmt.Errorf("error getting block data from sink server: %w", err)
}

var blockID string
var blockNum uint64
blockPbTimestamp := &timestamppb.Timestamp{}

for i := 0; i < len(response.KeyValues); i++ {

blockPbTimestamp, blockID, err = kvtool.UnpackTimeIDKey(response.KeyValues[i].Key, true)
if err != nil {
return nil, fmt.Errorf("error unpacking block number and block ID: %w", err)
}

if !in.Inclusive && (blockPbTimestamp.AsTime() == in.Time.AsTime()) {
continue
}

blockNum = kvtool.UnpackBlockNumberValue(response.KeyValues[i].Value)

break
}
return &pb.BlockResp{Id: blockID, Num: blockNum, Time: blockPbTimestamp}, nil
}

// todo: convert to cobra and viper
func main() {
flag.Parse()
if !*clientMode {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

sBlock := grpc.NewServer()
pb.RegisterBlockServer(sBlock, &serverBlock{})
pb.RegisterBlockByTimeServer(sBlock, &serverBlock{})

log.Printf("server block listening at %v", lis.Addr())
if err := sBlock.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
} else {
testClient()
}
}

func connectToSinkServer() pbkv.KvClient {
conn, err := dgrpc.NewInternalClient("localhost:7878")
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", *listenAddress))
if err != nil {
log.Fatalf("did not connect to the sink server: %v", err)
log.Fatalf("failed to listen: %v", err)
}

return pbkv.NewKvClient(conn)
}

func testClient() {
conn, err := dgrpc.NewInternalClient("localhost:50051")
if err != nil {
log.Fatalf("did not connect: %v", err)
}

c := pb.NewBlockClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

blockResp, err := c.NumToID(ctx, &pb.NumToIDReq{BlockNum: 6})
if err != nil {
log.Fatalf("could not get block data NumToID request: %v", err)
}

blockSecondResp, err := c.IDToNum(ctx, &pb.IDToNumReq{BlockID: "f37c632d361e0a93f08ba29b1a2c708d9caa3ee19d1ee8d2a02612bffe49f0a9"})
if err != nil {
log.Fatalf("could not get block data for IDToNum request: %v", err)
}

d := pb.NewBlockByTimeClient(conn)
blockThirdResp, err := d.At(ctx, &pb.TimeReq{Time: timestamppb.New(time.UnixMilli(1438270241000))})
if err != nil {
log.Fatalf("could not get block data for At request: %v", err)
}
sinkClient := service.ConnectToSinkServer(*sinkServerAddress)
blockService := service.NewBlockService(sinkClient)
blockByTimeService := service.NewBlockByTimeService(sinkClient)

blockFourthResp, err := d.After(ctx, &pb.RelativeTimeReq{Time: timestamppb.New(time.UnixMilli(1438270241000)), Inclusive: true})
if err != nil {
log.Fatalf("could not get block data for After request: %v", err)
}
sBlock := grpc.NewServer()
pbbmsrv.RegisterBlockServer(sBlock, blockService)
pbbmsrv.RegisterBlockByTimeServer(sBlock, blockByTimeService)

blockFifthResp, err := d.Before(ctx, &pb.RelativeTimeReq{Time: timestamppb.New(time.UnixMilli(1438270241000)), Inclusive: false})
if err != nil {
log.Fatalf("could not get block data for Before request: %v", err)
log.Printf("server block listening at %v", listener.Addr())
if err := sBlock.Serve(listener); err != nil {
log.Fatalf("failed to serve: %v", err)
}

blockSithResp, err := c.Head(ctx, &pb.Empty{})

fmt.Printf("Blockresponse for NumToID request: %s\n", blockResp.String())

fmt.Printf("Blockresponse for IDToNum request: %s\n", blockSecondResp.String())

fmt.Printf("Blockresponse for At request: %s\n", blockThirdResp.String())

fmt.Printf("Blockresponse for After request: %s\n", blockFourthResp.String())

fmt.Printf("Blockresponse for Before request: %s\n", blockFifthResp.String())

fmt.Printf("Blockresponse for Head request: %s\n", blockSithResp.String())

}
65 changes: 65 additions & 0 deletions server/cmd/tester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"context"
"fmt"
"log"
"time"

pbbmsrv "github.com/streamingfast/blockmeta-service/pb/sf/blockmeta/v2"
"github.com/streamingfast/dgrpc"
"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
conn, err := dgrpc.NewInternalClient("localhost:50051")
if err != nil {
log.Fatalf("did not connect: %v", err)
}

c := pbbmsrv.NewBlockClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

blockResp, err := c.NumToID(ctx, &pbbmsrv.NumToIDReq{BlockNum: 6})
if err != nil {
log.Fatalf("could not get block data NumToID request: %v", err)
}

blockSecondResp, err := c.IDToNum(ctx, &pbbmsrv.IDToNumReq{BlockID: "f37c632d361e0a93f08ba29b1a2c708d9caa3ee19d1ee8d2a02612bffe49f0a9"})
if err != nil {
log.Fatalf("could not get block data for IDToNum request: %v", err)
}

d := pbbmsrv.NewBlockByTimeClient(conn)
blockThirdResp, err := d.At(ctx, &pbbmsrv.TimeReq{Time: timestamppb.New(time.UnixMilli(1438270241000))})
if err != nil {
log.Fatalf("could not get block data for At request: %v", err)
}

blockFourthResp, err := d.After(ctx, &pbbmsrv.RelativeTimeReq{Time: timestamppb.New(time.UnixMilli(1438270241000)), Inclusive: true})
if err != nil {
log.Fatalf("could not get block data for After request: %v", err)
}

blockFifthResp, err := d.Before(ctx, &pbbmsrv.RelativeTimeReq{Time: timestamppb.New(time.UnixMilli(1438270241000)), Inclusive: false})
if err != nil {
log.Fatalf("could not get block data for Before request: %v", err)
}

blockSithResp, err := c.Head(ctx, &pbbmsrv.Empty{})

fmt.Printf("Blockresponse for NumToID request: %s\n", blockResp.String())

fmt.Printf("Blockresponse for IDToNum request: %s\n", blockSecondResp.String())

fmt.Printf("Blockresponse for At request: %s\n", blockThirdResp.String())

fmt.Printf("Blockresponse for After request: %s\n", blockFourthResp.String())

fmt.Printf("Blockresponse for Before request: %s\n", blockFifthResp.String())

fmt.Printf("Blockresponse for Head request: %s\n", blockSithResp.String())

}
2 changes: 1 addition & 1 deletion server/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module blockmeta_server
module github.com/streamingfast/blockmeta-service

go 1.21.6

Expand Down
Loading

0 comments on commit 97ccffe

Please sign in to comment.