Skip to content

Commit

Permalink
Enable state streaming API over gRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
m-Peter committed Jan 4, 2024
1 parent 70dcc68 commit 59bfb20
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
19 changes: 18 additions & 1 deletion server/access/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ import (
"net"

"github.com/onflow/flow-emulator/adapters"
"github.com/onflow/flow-emulator/emulator"
mockModule "github.com/onflow/flow-go/module/mock"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/onflow/flow-go/access"
legacyaccess "github.com/onflow/flow-go/access/legacy"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/engine/access/state_stream/backend"
"github.com/onflow/flow-go/model/flow"
flowgo "github.com/onflow/flow-go/model/flow"
accessproto "github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/executiondata"
legacyaccessproto "github.com/onflow/flow/protobuf/go/flow/legacy/access"
"github.com/rs/zerolog"
"google.golang.org/grpc"
Expand All @@ -52,7 +56,7 @@ type GRPCServer struct {
listener net.Listener
}

func NewGRPCServer(logger *zerolog.Logger, adapter *adapters.AccessAdapter, chain flow.Chain, host string, port int, debug bool) *GRPCServer {
func NewGRPCServer(logger *zerolog.Logger, blockchain *emulator.Blockchain, adapter *adapters.AccessAdapter, chain flow.Chain, host string, port int, debug bool) *GRPCServer {
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(grpcprometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpcprometheus.UnaryServerInterceptor),
Expand All @@ -71,6 +75,19 @@ func NewGRPCServer(logger *zerolog.Logger, adapter *adapters.AccessAdapter, chai
reflection.Register(grpcServer)
}

streamConfig := backend.Config{
EventFilterConfig: state_stream.DefaultEventFilterConfig,
RpcMetricsEnabled: false,
MaxGlobalStreams: state_stream.DefaultMaxGlobalStreams,
ClientSendTimeout: state_stream.DefaultSendTimeout,
ClientSendBufferSize: state_stream.DefaultSendBufferSize,
ResponseLimit: state_stream.DefaultResponseLimit,
HeartbeatInterval: state_stream.DefaultHeartbeatInterval,
}
streamBackend := NewStateStreamBackend(blockchain, *logger)
handler := backend.NewHandler(streamBackend, chain, streamConfig)
executiondata.RegisterExecutionDataAPIServer(grpcServer, handler)

return &GRPCServer{
logger: logger,
host: host,
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer {

accessAdapter := adapters.NewAccessAdapter(logger, emulatedBlockchain)
livenessTicker := utils.NewLivenessTicker(conf.LivenessCheckTolerance)
grpcServer := access.NewGRPCServer(logger, accessAdapter, chain, conf.Host, conf.GRPCPort, conf.GRPCDebug)
grpcServer := access.NewGRPCServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.GRPCPort, conf.GRPCDebug)
restServer, err := access.NewRestServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.RESTPort, conf.RESTDebug)
if err != nil {
logger.Error().Err(err).Msg("❗ Failed to startup REST API")
Expand Down

0 comments on commit 59bfb20

Please sign in to comment.