Skip to content

Commit

Permalink
Add a CL proxy to allow multiple EL nodes (#29)
Browse files Browse the repository at this point in the history
* Add cl proxy

* Add secondary flag and update readme

* Send FCU without payload attr to secondary
  • Loading branch information
ferranbt authored Nov 27, 2024
1 parent 62ad77a commit 73d41fd
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ To stop the playground, press `Ctrl+C`.
The `EL` instance is deployed with this deterministic enode address:

```
enode://3479db4d9217fb5d7a8ed4d61ac36e120b05d36c2eefb795dc42ff2e971f251a2315f5649ea1833271e020b9adc98d5db9973c7ed92d6b2f1f2223088c3d852f@127.0.0.1:8545
enode://3479db4d9217fb5d7a8ed4d61ac36e120b05d36c2eefb795dc42ff2e971f251a2315f5649ea1833271e020b9adc98d5db9973c7ed92d6b2f1f2223088c3d852f@127.0.0.1:30303
```

Options:
Expand Down
193 changes: 193 additions & 0 deletions cl-proxy/cl-proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package clproxy

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/flashbots/mev-boost-relay/common"
"github.com/sirupsen/logrus"
)

type Config struct {
LogOutput io.Writer
Port uint64
Primary string
Secondary string
}

func DefaultConfig() *Config {
return &Config{
LogOutput: os.Stdout,
Port: 5656,
}
}

type ClProxy struct {
config *Config
log *logrus.Entry
server *http.Server
}

func New(config *Config) (*ClProxy, error) {
log := common.LogSetup(false, "info")
log.Logger.SetOutput(config.LogOutput)

proxy := &ClProxy{
config: config,
log: log,
}

return proxy, nil
}

// Run starts the HTTP server
func (s *ClProxy) Run() error {
mux := http.NewServeMux()
s.server = &http.Server{
Addr: fmt.Sprintf(":%d", s.config.Port),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
Handler: mux,
}

mux.HandleFunc("/", s.handleRequest)

s.log.Infof("Starting server on port %d", s.config.Port)
if err := s.server.ListenAndServe(); err != http.ErrServerClosed {
return fmt.Errorf("server error: %v", err)
}
return nil
}

// Close gracefully shuts down the server
func (s *ClProxy) Close() error {
s.log.Info("Shutting down server...")

// Create a context with timeout for shutdown
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Attempt graceful shutdown
if err := s.server.Shutdown(ctx); err != nil {
return fmt.Errorf("server shutdown error: %v", err)
}

return nil
}

type jsonrpcMessage struct {
Version string `json:"jsonrpc,omitempty"`
ID json.RawMessage `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params []json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
}

func (s *ClProxy) handleRequest(w http.ResponseWriter, r *http.Request) {
// Only accept POST requests
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

data, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}

// Multiplex all the request to both primary and secondary but omit the
// block building requests (this is, remove 'params' field from FCU and omit get payload).
// There are two reasons for this:
// - The secondary builder does not use the Engine API to build blocks but the relayer so these requests are not necessary.
// - The CL->EL setup is not configured anyway to handle two block builders throught the Engine API.
// Note that we still have to relay this request to the primary EL node since we need
// to have a fallback node in the CL.
var jsonRPCRequest jsonrpcMessage
if err := json.Unmarshal(data, &jsonRPCRequest); err != nil {
http.Error(w, "Bad request", http.StatusBadRequest)
return
}

s.log.Info(fmt.Sprintf("Received request: method=%s", jsonRPCRequest.Method))

// proxy to primary and consider its response as the final response to send back to the CL
resp, err := s.proxy(s.config.Primary, r, data)
if err != nil {
s.log.Errorf("Error multiplexing to primary: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
defer resp.Body.Close()

respData, err := io.ReadAll(resp.Body)
if err != nil {
s.log.Errorf("Error reading response from primary: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
w.Write(respData)

if s.config.Secondary == "" {
return
}

if strings.HasPrefix(jsonRPCRequest.Method, "engine_getPayload") {
// the only request we do not send since the secondary builder does not have the payload id
// and it will always fail
return
}

if strings.HasPrefix(jsonRPCRequest.Method, "engine_forkchoiceUpdated") {
// set to nil the second parameter of the forkchoiceUpdated call
if len(jsonRPCRequest.Params) == 1 {
// not expected
s.log.Warn("ForkchoiceUpdated call with only one parameter")
} else {
jsonRPCRequest.Params[1] = nil

data, err = json.Marshal(jsonRPCRequest)
if err != nil {
s.log.Errorf("Error marshalling forkchoiceUpdated request: %v", err)
return
}
}
}

// proxy to secondary
s.log.Info(fmt.Sprintf("Multiplexing request to secondary: method=%s", jsonRPCRequest.Method))
if _, err := s.proxy(s.config.Secondary, r, data); err != nil {
s.log.Errorf("Error multiplexing to secondary: %v", err)
}
}

func (s *ClProxy) proxy(dst string, r *http.Request, data []byte) (*http.Response, error) {
// Create a new request
req, err := http.NewRequest(http.MethodPost, dst, bytes.NewBuffer(data))
if err != nil {
return nil, err
}

// Copy headers. It is important since we have to copy
// the JWT header from the CL
req.Header = r.Header

// Perform the request
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}

return resp, nil
}
38 changes: 36 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
ecrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/ferranbt/builder-playground/artifacts"
clproxy "github.com/ferranbt/builder-playground/cl-proxy"
mevboostrelay "github.com/ferranbt/builder-playground/mev-boost-relay"

"github.com/hashicorp/go-uuid"
Expand Down Expand Up @@ -61,6 +62,7 @@ var genesisDelayFlag uint64
var watchPayloadsFlag bool
var latestForkFlag bool
var useRethForValidation bool
var secondaryBuilderPort uint64

var rootCmd = &cobra.Command{
Use: "playground",
Expand Down Expand Up @@ -168,6 +170,7 @@ func main() {
rootCmd.Flags().BoolVar(&watchPayloadsFlag, "watch-payloads", false, "")
rootCmd.Flags().BoolVar(&latestForkFlag, "electra", false, "")
rootCmd.Flags().BoolVar(&useRethForValidation, "use-reth-for-validation", false, "enable flashbots_validateBuilderSubmissionV* on reth and use them for validation")
rootCmd.Flags().Uint64Var(&secondaryBuilderPort, "secondary", 1234, "port to use for the secondary builder")

downloadArtifactsCmd.Flags().BoolVar(&validateFlag, "validate", false, "")
validateCmd.Flags().Uint64Var(&numBlocksValidate, "num-blocks", 5, "")
Expand Down Expand Up @@ -369,6 +372,31 @@ func setupServices(svcManager *serviceManager, out *output) error {
return err
}

// Start the cl proxy
{
cfg := clproxy.DefaultConfig()
cfg.Primary = "http://localhost:8551"

if secondaryBuilderPort != 0 {
cfg.Secondary = fmt.Sprintf("http://localhost:%d", secondaryBuilderPort)
}

var err error
if cfg.LogOutput, err = out.LogOutput("cl-proxy"); err != nil {
return err
}
clproxy, err := clproxy.New(cfg)
if err != nil {
return fmt.Errorf("failed to create cl proxy: %w", err)
}

go func() {
if err := clproxy.Run(); err != nil {
svcManager.emitError()
}
}()
}

rethVersion := func() string {
cmd := exec.Command(rethBin, "--version")
out, err := cmd.Output()
Expand Down Expand Up @@ -404,13 +432,14 @@ func setupServices(svcManager *serviceManager, out *output) error {
"--p2p-secret-key", defaultRethDiscoveryPrivKeyLoc,
"--addr", "127.0.0.1",
"--port", "30303",
"--disable-discovery",
// "--disable-discovery",
// http config
"--http",
"--http.api", "admin,eth,net,web3",
"--http.port", "8545",
"--authrpc.port", "8551",
"--authrpc.jwtsecret", "{{.Dir}}/jwtsecret",
"-vvvv",
).
If(useRethForValidation, func(s *service) *service {
return s.WithReplacementArgs("--http.api", "admin,eth,web3,net,rpc,flashbots")
Expand Down Expand Up @@ -470,7 +499,7 @@ func setupServices(svcManager *serviceManager, out *output) error {
"--http-port", "3500",
"--disable-packet-filter",
"--target-peers", "0",
"--execution-endpoint", "http://localhost:8551",
"--execution-endpoint", "http://localhost:5656",
"--execution-jwt", "{{.Dir}}/jwtsecret",
"--builder", "http://localhost:5555",
"--builder-fallback-epochs-since-finalization", "0",
Expand Down Expand Up @@ -538,6 +567,11 @@ func setupServices(svcManager *serviceManager, out *output) error {
ports: []*port{
{name: "http", port: 5555},
},
}, &service{
name: "cl-proxy",
ports: []*port{
{name: "jsonrpc", port: 5656},
},
})

// print services info
Expand Down

0 comments on commit 73d41fd

Please sign in to comment.