From 73d41fdc9b2d06f09f70f6322477fec9c758bd18 Mon Sep 17 00:00:00 2001 From: Ferran Borreguero Date: Wed, 27 Nov 2024 11:40:54 +0000 Subject: [PATCH] Add a CL proxy to allow multiple EL nodes (#29) * Add cl proxy * Add secondary flag and update readme * Send FCU without payload attr to secondary --- README.md | 2 +- cl-proxy/cl-proxy.go | 193 +++++++++++++++++++++++++++++++++++++++++++ main.go | 38 ++++++++- 3 files changed, 230 insertions(+), 3 deletions(-) create mode 100644 cl-proxy/cl-proxy.go diff --git a/README.md b/README.md index b6e9208..6ba2895 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ To stop the playground, press `Ctrl+C`. The `EL` instance is deployed with this deterministic enode address: ``` -enode://3479db4d9217fb5d7a8ed4d61ac36e120b05d36c2eefb795dc42ff2e971f251a2315f5649ea1833271e020b9adc98d5db9973c7ed92d6b2f1f2223088c3d852f@127.0.0.1:8545 +enode://3479db4d9217fb5d7a8ed4d61ac36e120b05d36c2eefb795dc42ff2e971f251a2315f5649ea1833271e020b9adc98d5db9973c7ed92d6b2f1f2223088c3d852f@127.0.0.1:30303 ``` Options: diff --git a/cl-proxy/cl-proxy.go b/cl-proxy/cl-proxy.go new file mode 100644 index 0000000..df5cb2a --- /dev/null +++ b/cl-proxy/cl-proxy.go @@ -0,0 +1,193 @@ +package clproxy + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + "github.com/flashbots/mev-boost-relay/common" + "github.com/sirupsen/logrus" +) + +type Config struct { + LogOutput io.Writer + Port uint64 + Primary string + Secondary string +} + +func DefaultConfig() *Config { + return &Config{ + LogOutput: os.Stdout, + Port: 5656, + } +} + +type ClProxy struct { + config *Config + log *logrus.Entry + server *http.Server +} + +func New(config *Config) (*ClProxy, error) { + log := common.LogSetup(false, "info") + log.Logger.SetOutput(config.LogOutput) + + proxy := &ClProxy{ + config: config, + log: log, + } + + return proxy, nil +} + +// Run starts the HTTP server +func (s *ClProxy) Run() error { + mux := http.NewServeMux() + s.server = &http.Server{ + Addr: fmt.Sprintf(":%d", s.config.Port), + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + Handler: mux, + } + + mux.HandleFunc("/", s.handleRequest) + + s.log.Infof("Starting server on port %d", s.config.Port) + if err := s.server.ListenAndServe(); err != http.ErrServerClosed { + return fmt.Errorf("server error: %v", err) + } + return nil +} + +// Close gracefully shuts down the server +func (s *ClProxy) Close() error { + s.log.Info("Shutting down server...") + + // Create a context with timeout for shutdown + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Attempt graceful shutdown + if err := s.server.Shutdown(ctx); err != nil { + return fmt.Errorf("server shutdown error: %v", err) + } + + return nil +} + +type jsonrpcMessage struct { + Version string `json:"jsonrpc,omitempty"` + ID json.RawMessage `json:"id,omitempty"` + Method string `json:"method,omitempty"` + Params []json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` +} + +func (s *ClProxy) handleRequest(w http.ResponseWriter, r *http.Request) { + // Only accept POST requests + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + data, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + + // Multiplex all the request to both primary and secondary but omit the + // block building requests (this is, remove 'params' field from FCU and omit get payload). + // There are two reasons for this: + // - The secondary builder does not use the Engine API to build blocks but the relayer so these requests are not necessary. + // - The CL->EL setup is not configured anyway to handle two block builders throught the Engine API. + // Note that we still have to relay this request to the primary EL node since we need + // to have a fallback node in the CL. + var jsonRPCRequest jsonrpcMessage + if err := json.Unmarshal(data, &jsonRPCRequest); err != nil { + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + + s.log.Info(fmt.Sprintf("Received request: method=%s", jsonRPCRequest.Method)) + + // proxy to primary and consider its response as the final response to send back to the CL + resp, err := s.proxy(s.config.Primary, r, data) + if err != nil { + s.log.Errorf("Error multiplexing to primary: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + defer resp.Body.Close() + + respData, err := io.ReadAll(resp.Body) + if err != nil { + s.log.Errorf("Error reading response from primary: %v", err) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(resp.StatusCode) + w.Write(respData) + + if s.config.Secondary == "" { + return + } + + if strings.HasPrefix(jsonRPCRequest.Method, "engine_getPayload") { + // the only request we do not send since the secondary builder does not have the payload id + // and it will always fail + return + } + + if strings.HasPrefix(jsonRPCRequest.Method, "engine_forkchoiceUpdated") { + // set to nil the second parameter of the forkchoiceUpdated call + if len(jsonRPCRequest.Params) == 1 { + // not expected + s.log.Warn("ForkchoiceUpdated call with only one parameter") + } else { + jsonRPCRequest.Params[1] = nil + + data, err = json.Marshal(jsonRPCRequest) + if err != nil { + s.log.Errorf("Error marshalling forkchoiceUpdated request: %v", err) + return + } + } + } + + // proxy to secondary + s.log.Info(fmt.Sprintf("Multiplexing request to secondary: method=%s", jsonRPCRequest.Method)) + if _, err := s.proxy(s.config.Secondary, r, data); err != nil { + s.log.Errorf("Error multiplexing to secondary: %v", err) + } +} + +func (s *ClProxy) proxy(dst string, r *http.Request, data []byte) (*http.Response, error) { + // Create a new request + req, err := http.NewRequest(http.MethodPost, dst, bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + + // Copy headers. It is important since we have to copy + // the JWT header from the CL + req.Header = r.Header + + // Perform the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/main.go b/main.go index 6e41297..aa4cf92 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,7 @@ import ( ecrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ferranbt/builder-playground/artifacts" + clproxy "github.com/ferranbt/builder-playground/cl-proxy" mevboostrelay "github.com/ferranbt/builder-playground/mev-boost-relay" "github.com/hashicorp/go-uuid" @@ -61,6 +62,7 @@ var genesisDelayFlag uint64 var watchPayloadsFlag bool var latestForkFlag bool var useRethForValidation bool +var secondaryBuilderPort uint64 var rootCmd = &cobra.Command{ Use: "playground", @@ -168,6 +170,7 @@ func main() { rootCmd.Flags().BoolVar(&watchPayloadsFlag, "watch-payloads", false, "") rootCmd.Flags().BoolVar(&latestForkFlag, "electra", false, "") rootCmd.Flags().BoolVar(&useRethForValidation, "use-reth-for-validation", false, "enable flashbots_validateBuilderSubmissionV* on reth and use them for validation") + rootCmd.Flags().Uint64Var(&secondaryBuilderPort, "secondary", 1234, "port to use for the secondary builder") downloadArtifactsCmd.Flags().BoolVar(&validateFlag, "validate", false, "") validateCmd.Flags().Uint64Var(&numBlocksValidate, "num-blocks", 5, "") @@ -369,6 +372,31 @@ func setupServices(svcManager *serviceManager, out *output) error { return err } + // Start the cl proxy + { + cfg := clproxy.DefaultConfig() + cfg.Primary = "http://localhost:8551" + + if secondaryBuilderPort != 0 { + cfg.Secondary = fmt.Sprintf("http://localhost:%d", secondaryBuilderPort) + } + + var err error + if cfg.LogOutput, err = out.LogOutput("cl-proxy"); err != nil { + return err + } + clproxy, err := clproxy.New(cfg) + if err != nil { + return fmt.Errorf("failed to create cl proxy: %w", err) + } + + go func() { + if err := clproxy.Run(); err != nil { + svcManager.emitError() + } + }() + } + rethVersion := func() string { cmd := exec.Command(rethBin, "--version") out, err := cmd.Output() @@ -404,13 +432,14 @@ func setupServices(svcManager *serviceManager, out *output) error { "--p2p-secret-key", defaultRethDiscoveryPrivKeyLoc, "--addr", "127.0.0.1", "--port", "30303", - "--disable-discovery", + // "--disable-discovery", // http config "--http", "--http.api", "admin,eth,net,web3", "--http.port", "8545", "--authrpc.port", "8551", "--authrpc.jwtsecret", "{{.Dir}}/jwtsecret", + "-vvvv", ). If(useRethForValidation, func(s *service) *service { return s.WithReplacementArgs("--http.api", "admin,eth,web3,net,rpc,flashbots") @@ -470,7 +499,7 @@ func setupServices(svcManager *serviceManager, out *output) error { "--http-port", "3500", "--disable-packet-filter", "--target-peers", "0", - "--execution-endpoint", "http://localhost:8551", + "--execution-endpoint", "http://localhost:5656", "--execution-jwt", "{{.Dir}}/jwtsecret", "--builder", "http://localhost:5555", "--builder-fallback-epochs-since-finalization", "0", @@ -538,6 +567,11 @@ func setupServices(svcManager *serviceManager, out *output) error { ports: []*port{ {name: "http", port: 5555}, }, + }, &service{ + name: "cl-proxy", + ports: []*port{ + {name: "jsonrpc", port: 5656}, + }, }) // print services info