From edfcd292279f01b4328ced55eb6a239093af38e5 Mon Sep 17 00:00:00 2001 From: Vitaly Drogan Date: Wed, 30 Oct 2024 16:06:53 +0100 Subject: [PATCH] Create a binary for proxying from central. infra --- Makefile | 14 +- README.md | 73 ++++++++- cmd/{httpserver => receiver-proxy}/main.go | 34 ++-- cmd/sender-proxy/main.go | 172 ++++++++++++++++++++ proxy/api.go | 38 ++--- proxy/proxy.go | 40 ++--- proxy/proxy_test.go | 8 +- proxy/sender_proxy.go | 179 +++++++++++++++++++++ proxy/servers.go | 59 ++++++- proxy/sharing.go | 18 ++- receiver.dockerfile | 23 +++ httpserver.dockerfile => sender.dockerfile | 8 +- 12 files changed, 582 insertions(+), 84 deletions(-) rename cmd/{httpserver => receiver-proxy}/main.go (83%) create mode 100644 cmd/sender-proxy/main.go create mode 100644 proxy/sender_proxy.go create mode 100644 receiver.dockerfile rename httpserver.dockerfile => sender.dockerfile (79%) diff --git a/Makefile b/Makefile index 3ff41a2..55b3473 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,8 @@ clean: ## Clean the build directory .PHONY: build build: ## Build the HTTP server @mkdir -p ./build - go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/orderflow-proxy cmd/httpserver/main.go + go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/sender-proxy cmd/sender-proxy/main.go + go build -trimpath -ldflags "-X github.com/flashbots/tdx-orderflow-proxy/common.Version=${VERSION}" -v -o ./build/receiver-proxy cmd/receiver-proxy/main.go ##@ Test & Development @@ -75,6 +76,13 @@ docker: ## Build the HTTP server Docker image DOCKER_BUILDKIT=1 docker build \ --platform linux/amd64 \ --build-arg VERSION=${VERSION} \ - --file httpserver.dockerfile \ - --tag your-project \ + --file sender.dockerfile \ + --tag tdx-orderflow-proxy-sender-proxy \ + . + + DOCKER_BUILDKIT=1 docker build \ + --platform linux/amd64 \ + --build-arg VERSION=${VERSION} \ + --file receiver.dockerfile \ + --tag tdx-orderflow-proxy-receiver-proxy \ . diff --git a/README.md b/README.md index 0bbc329..74e1e2c 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,13 @@ make build ``` -## Run +There are two separate programs in this repo: +* receiver proxy that should be part of tdx image +* sender proxy that is part of infra that sends orderflow to all peers -Orderflow proxy will: +## Run receiver proxy + +Receiver proxy will: * generate SSL certificate * generate orderflow signer @@ -24,15 +28,15 @@ Orderflow proxy will: * proxy local request to other builders in the network * archive local requests by sending them to archive endpoint -Flags for the orderflow proxy +Flags for the receiver proxy ``` -./build/orderflow-proxy -h +./build/receiver-proxy -h NAME: - orderflow-proxy - Serve API, and metrics + receiver-proxy - Serve API, and metrics USAGE: - orderflow-proxy [global options] command [command options] + receiver-proxy [global options] command [command options] COMMANDS: help, h Shows a list of commands or help for one command @@ -54,7 +58,62 @@ GLOBAL OPTIONS: --log-json log in JSON format (default: false) --log-debug log debug messages (default: false) --log-uid generate a uuid and add to all log messages (default: false) - --log-service value add 'service' tag to logs (default: "your-project") + --log-service value add 'service' tag to logs (default: "tdx-orderflow-proxy-receiver") --pprof enable pprof debug endpoint (pprof is served on $metrics-addr/debug/pprof/*) (default: false) --help, -h show help ``` + + +## Run sender proxy + +Sender proxy will: +* listen for http requests +* sign request with `orderflow-signer-key` +* poxy them to the peers received form builder config hub + +``` +./build/sender-proxy -h +NAME: + sender-proxy - Serve API, and metrics + +USAGE: + sender-proxy [global options] command [command options] + +COMMANDS: + help, h Shows a list of commands or help for one command + +GLOBAL OPTIONS: + --listen-address value address to listen on for requests (default: "127.0.0.1:8080") + --builder-confighub-endpoint value address of the builder config hub enpoint (directly or throught the cvm-proxy) (default: "http://127.0.0.1:14892") + --orderflow-signer-key value ordreflow will be signed with this address (default: "0xfb5ad18432422a84514f71d63b45edf51165d33bef9c2bd60957a48d4c4cb68e") + --max-request-body-size-bytes value Maximum size of the request body, if 0 default will be used (default: 0) + --metrics-addr value address to listen on for Prometheus metrics (metrics are served on $metrics-addr/metrics) (default: "127.0.0.1:8090") + --log-json log in JSON format (default: false) + --log-debug log debug messages (default: false) + --log-uid generate a uuid and add to all log messages (default: false) + --log-service value add 'service' tag to logs (default: "tdx-orderflow-proxy-sender") + --pprof enable pprof debug endpoint (pprof is served on $metrics-addr/debug/pprof/*) (default: false) + --help, -h show help +dvush@ripper> ./build/sender-proxy -h ~/flashbots/orderflow-proxy +NAME: + sender-proxy - Serve API, and metrics + +USAGE: + sender-proxy [global options] command [command options] + +COMMANDS: + help, h Shows a list of commands or help for one command + +GLOBAL OPTIONS: + --listen-address value address to listen on for requests (default: "127.0.0.1:8080") + --builder-confighub-endpoint value address of the builder config hub enpoint (directly or throught the cvm-proxy) (default: "http://127.0.0.1:14892") + --orderflow-signer-key value ordreflow will be signed with this address (default: "0xfb5ad18432422a84514f71d63b45edf51165d33bef9c2bd60957a48d4c4cb68e") + --max-request-body-size-bytes value Maximum size of the request body, if 0 default will be used (default: 0) + --metrics-addr value address to listen on for Prometheus metrics (metrics are served on $metrics-addr/metrics) (default: "127.0.0.1:8090") + --log-json log in JSON format (default: false) + --log-debug log debug messages (default: false) + --log-uid generate a uuid and add to all log messages (default: false) + --log-service value add 'service' tag to logs (default: "tdx-orderflow-proxy-sender") + --pprof enable pprof debug endpoint (pprof is served on $metrics-addr/debug/pprof/*) (default: false) + --help, -h show help +``` diff --git a/cmd/httpserver/main.go b/cmd/receiver-proxy/main.go similarity index 83% rename from cmd/httpserver/main.go rename to cmd/receiver-proxy/main.go index cc1f1fa..0d8d25d 100644 --- a/cmd/httpserver/main.go +++ b/cmd/receiver-proxy/main.go @@ -105,7 +105,7 @@ var flags []cli.Flag = []cli.Flag{ }, &cli.StringFlag{ Name: "log-service", - Value: "your-project", + Value: "tdx-orderflow-proxy-receiver", Usage: "add 'service' tag to logs", }, &cli.BoolFlag{ @@ -117,7 +117,7 @@ var flags []cli.Flag = []cli.Flag{ func main() { app := &cli.App{ - Name: "orderflow-proxy", + Name: "receiver-proxy", Usage: "Serve API, and metrics", Flags: flags, Action: func(cCtx *cli.Context) error { @@ -180,25 +180,25 @@ func main() { flashbotsSignerAddress := eth.HexToAddress(flashbotsSignerStr) maxRequestBodySizeBytes := cCtx.Int64("max-request-body-size-bytes") - proxyConfig := &proxy.NewProxyConfig{ - NewProxyConstantConfig: proxy.NewProxyConstantConfig{Log: log, Name: name, FlashbotsSignerAddress: flashbotsSignerAddress}, - CertValidDuration: certDuration, - CertHosts: certHosts, - BuilderConfigHubEndpoint: builderConfigHubEndpoint, - ArchiveEndpoint: archiveEndpoint, - LocalBuilderEndpoint: builderEndpoint, - EthRPC: rpcEndpoint, - MaxRequestBodySizeBytes: maxRequestBodySizeBytes, + proxyConfig := &proxy.ReceiverProxyConfig{ + ReceiverProxyConstantConfig: proxy.ReceiverProxyConstantConfig{Log: log, Name: name, FlashbotsSignerAddress: flashbotsSignerAddress}, + CertValidDuration: certDuration, + CertHosts: certHosts, + BuilderConfigHubEndpoint: builderConfigHubEndpoint, + ArchiveEndpoint: archiveEndpoint, + LocalBuilderEndpoint: builderEndpoint, + EthRPC: rpcEndpoint, + MaxRequestBodySizeBytes: maxRequestBodySizeBytes, } - instance, err := proxy.NewNewProxy(*proxyConfig) + instance, err := proxy.NewReceiverProxy(*proxyConfig) if err != nil { - log.Error("failed to create proxy server", "err", err) + log.Error("Failed to create proxy server", "err", err) return err } err = instance.RegisterSecrets() if err != nil { - log.Error("failed to generate and publish secrets", "err", err) + log.Error("Failed to generate and publish secrets", "err", err) return err } @@ -206,12 +206,14 @@ func main() { publicListenAddr := cCtx.String("public-listen-addr") certListenAddr := cCtx.String("cert-listen-addr") - servers, err := proxy.StartServers(instance, publicListenAddr, localListenAddr, certListenAddr) + servers, err := proxy.StartReceiverServers(instance, publicListenAddr, localListenAddr, certListenAddr) if err != nil { - log.Error("failed to start proxy server", "err", err) + log.Error("Failed to start proxy server", "err", err) return err } + log.Info("Started receiver proxy", "publicListenAddress", publicListenAddr, "localListenAddress", localListenAddr, "certListenAddress", certListenAddr) + <-exit servers.Stop() return nil diff --git a/cmd/sender-proxy/main.go b/cmd/sender-proxy/main.go new file mode 100644 index 0000000..a4a1bf7 --- /dev/null +++ b/cmd/sender-proxy/main.go @@ -0,0 +1,172 @@ +package main + +import ( + "log" + "net/http" + "net/http/pprof" + "os" + "os/signal" + "syscall" + "time" + + "github.com/VictoriaMetrics/metrics" + "github.com/flashbots/go-utils/signature" + "github.com/flashbots/tdx-orderflow-proxy/common" + "github.com/flashbots/tdx-orderflow-proxy/proxy" + "github.com/google/uuid" + "github.com/urfave/cli/v2" // imports as package "cli" +) + +var flags []cli.Flag = []cli.Flag{ + // input and output + &cli.StringFlag{ + Name: "listen-address", + Value: "127.0.0.1:8080", + Usage: "address to listen on for requests", + }, + &cli.StringFlag{ + Name: "builder-confighub-endpoint", + Value: "http://127.0.0.1:14892", + Usage: "address of the builder config hub enpoint (directly or throught the cvm-proxy)", + }, + &cli.StringFlag{ + Name: "orderflow-signer-key", + Value: "0xfb5ad18432422a84514f71d63b45edf51165d33bef9c2bd60957a48d4c4cb68e", + Usage: "ordreflow will be signed with this address", + }, + &cli.Int64Flag{ + Name: "max-request-body-size-bytes", + Value: 0, + Usage: "Maximum size of the request body, if 0 default will be used", + }, + + // logging, metrics and debug + &cli.StringFlag{ + Name: "metrics-addr", + Value: "127.0.0.1:8090", + Usage: "address to listen on for Prometheus metrics (metrics are served on $metrics-addr/metrics)", + }, + &cli.BoolFlag{ + Name: "log-json", + Value: false, + Usage: "log in JSON format", + }, + &cli.BoolFlag{ + Name: "log-debug", + Value: false, + Usage: "log debug messages", + }, + &cli.BoolFlag{ + Name: "log-uid", + Value: false, + Usage: "generate a uuid and add to all log messages", + }, + &cli.StringFlag{ + Name: "log-service", + Value: "tdx-orderflow-proxy-sender", + Usage: "add 'service' tag to logs", + }, + &cli.BoolFlag{ + Name: "pprof", + Value: false, + Usage: "enable pprof debug endpoint (pprof is served on $metrics-addr/debug/pprof/*)", + }, +} + +func main() { + app := &cli.App{ + Name: "sender-proxy", + Usage: "Serve API, and metrics", + Flags: flags, + Action: func(cCtx *cli.Context) error { + logJSON := cCtx.Bool("log-json") + logDebug := cCtx.Bool("log-debug") + logUID := cCtx.Bool("log-uid") + logService := cCtx.String("log-service") + + log := common.SetupLogger(&common.LoggingOpts{ + Debug: logDebug, + JSON: logJSON, + Service: logService, + Version: common.Version, + }) + + if logUID { + id := uuid.Must(uuid.NewRandom()) + log = log.With("uid", id.String()) + } + + exit := make(chan os.Signal, 1) + signal.Notify(exit, os.Interrupt, syscall.SIGTERM) + + // metrics server + go func() { + metricsAddr := cCtx.String("metrics-addr") + usePprof := cCtx.Bool("pprof") + metricsMux := http.NewServeMux() + metricsMux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { + metrics.WritePrometheus(w, true) + }) + if usePprof { + metricsMux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) + metricsMux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) + metricsMux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) + metricsMux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) + metricsMux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) + } + + metricsServer := &http.Server{ + Addr: metricsAddr, + ReadHeaderTimeout: 5 * time.Second, + Handler: metricsMux, + } + + err := metricsServer.ListenAndServe() + if err != nil { + log.Error("Failed to start metrics server", "err", err) + } + }() + + builderConfigHubEndpoint := cCtx.String("builder-confighub-endpoint") + orderflowSignerKeyStr := cCtx.String("orderflow-signer-key") + orderflowSigner, err := signature.NewSignerFromHexPrivateKey(orderflowSignerKeyStr) + if err != nil { + log.Error("Failed to get signer from private key", "error", err) + } + log.Info("Ordeflow signing address", "address", orderflowSigner.Address()) + maxRequestBodySizeBytes := cCtx.Int64("max-request-body-size-bytes") + + proxyConfig := &proxy.SenderProxyConfig{ + SenderProxyConstantConfig: proxy.SenderProxyConstantConfig{ + Log: log, + OrderflowSigner: orderflowSigner, + }, + BuilderConfigHubEndpoint: builderConfigHubEndpoint, + MaxRequestBodySizeBytes: maxRequestBodySizeBytes, + } + + instance, err := proxy.NewSenderProxy(*proxyConfig) + if err != nil { + log.Error("Failed to create proxy server", "err", err) + return err + } + + listenAddr := cCtx.String("listen-address") + servers, err := proxy.StartSenderServers(instance, listenAddr) + if err != nil { + log.Error("Failed to start proxy server", "err", err) + return err + } + + log.Info("Started sender proxy", "listenAddres", listenAddr) + + <-exit + servers.Stop() + return nil + }, + } + + if err := app.Run(os.Args); err != nil { + log.Fatal(err) + } +} diff --git a/proxy/api.go b/proxy/api.go index ddfebd4..bb91db7 100644 --- a/proxy/api.go +++ b/proxy/api.go @@ -32,7 +32,7 @@ var ( apiNow = time.Now ) -func (prx *Proxy) PublicJSONRPCHandler(maxRequestBodySizeBytes int64) (*rpcserver.JSONRPCHandler, error) { +func (prx *ReceiverProxy) PublicJSONRPCHandler(maxRequestBodySizeBytes int64) (*rpcserver.JSONRPCHandler, error) { handler, err := rpcserver.NewJSONRPCHandler(rpcserver.Methods{ EthSendBundleMethod: prx.EthSendBundlePublic, MevSendBundleMethod: prx.MevSendBundlePublic, @@ -51,7 +51,7 @@ func (prx *Proxy) PublicJSONRPCHandler(maxRequestBodySizeBytes int64) (*rpcserve return handler, err } -func (prx *Proxy) LocalJSONRPCHandler(maxRequestBodySizeBytes int64) (*rpcserver.JSONRPCHandler, error) { +func (prx *ReceiverProxy) LocalJSONRPCHandler(maxRequestBodySizeBytes int64) (*rpcserver.JSONRPCHandler, error) { handler, err := rpcserver.NewJSONRPCHandler(rpcserver.Methods{ EthSendBundleMethod: prx.EthSendBundleLocal, MevSendBundleMethod: prx.MevSendBundleLocal, @@ -70,7 +70,7 @@ func (prx *Proxy) LocalJSONRPCHandler(maxRequestBodySizeBytes int64) (*rpcserver return handler, err } -func (prx *Proxy) ValidateSigner(ctx context.Context, req *ParsedRequest, publicEndpoint bool) error { +func (prx *ReceiverProxy) ValidateSigner(ctx context.Context, req *ParsedRequest, publicEndpoint bool) error { req.signer = rpcserver.GetSigner(ctx) if !publicEndpoint { return nil @@ -99,7 +99,7 @@ func (prx *Proxy) ValidateSigner(ctx context.Context, req *ParsedRequest, public return nil } -func (prx *Proxy) EthSendBundle(ctx context.Context, ethSendBundle rpctypes.EthSendBundleArgs, publicEndpoint bool) error { +func (prx *ReceiverProxy) EthSendBundle(ctx context.Context, ethSendBundle rpctypes.EthSendBundleArgs, publicEndpoint bool) error { parsedRequest := ParsedRequest{ publicEndpoint: publicEndpoint, ethSendBundle: ðSendBundle, @@ -126,15 +126,15 @@ func (prx *Proxy) EthSendBundle(ctx context.Context, ethSendBundle rpctypes.EthS return prx.HandleParsedRequest(ctx, parsedRequest) } -func (prx *Proxy) EthSendBundlePublic(ctx context.Context, ethSendBundle rpctypes.EthSendBundleArgs) error { +func (prx *ReceiverProxy) EthSendBundlePublic(ctx context.Context, ethSendBundle rpctypes.EthSendBundleArgs) error { return prx.EthSendBundle(ctx, ethSendBundle, true) } -func (prx *Proxy) EthSendBundleLocal(ctx context.Context, ethSendBundle rpctypes.EthSendBundleArgs) error { +func (prx *ReceiverProxy) EthSendBundleLocal(ctx context.Context, ethSendBundle rpctypes.EthSendBundleArgs) error { return prx.EthSendBundle(ctx, ethSendBundle, false) } -func (prx *Proxy) MevSendBundle(ctx context.Context, mevSendBundle rpctypes.MevSendBundleArgs, publicEndpoint bool) error { +func (prx *ReceiverProxy) MevSendBundle(ctx context.Context, mevSendBundle rpctypes.MevSendBundleArgs, publicEndpoint bool) error { parsedRequest := ParsedRequest{ publicEndpoint: publicEndpoint, mevSendBundle: &mevSendBundle, @@ -164,15 +164,15 @@ func (prx *Proxy) MevSendBundle(ctx context.Context, mevSendBundle rpctypes.MevS return prx.HandleParsedRequest(ctx, parsedRequest) } -func (prx *Proxy) MevSendBundlePublic(ctx context.Context, mevSendBundle rpctypes.MevSendBundleArgs) error { +func (prx *ReceiverProxy) MevSendBundlePublic(ctx context.Context, mevSendBundle rpctypes.MevSendBundleArgs) error { return prx.MevSendBundle(ctx, mevSendBundle, true) } -func (prx *Proxy) MevSendBundleLocal(ctx context.Context, mevSendBundle rpctypes.MevSendBundleArgs) error { +func (prx *ReceiverProxy) MevSendBundleLocal(ctx context.Context, mevSendBundle rpctypes.MevSendBundleArgs) error { return prx.MevSendBundle(ctx, mevSendBundle, false) } -func (prx *Proxy) EthCancelBundle(ctx context.Context, ethCancelBundle rpctypes.EthCancelBundleArgs, publicEndpoint bool) error { +func (prx *ReceiverProxy) EthCancelBundle(ctx context.Context, ethCancelBundle rpctypes.EthCancelBundleArgs, publicEndpoint bool) error { parsedRequest := ParsedRequest{ publicEndpoint: publicEndpoint, ethCancelBundle: ðCancelBundle, @@ -195,15 +195,15 @@ func (prx *Proxy) EthCancelBundle(ctx context.Context, ethCancelBundle rpctypes. return prx.HandleParsedRequest(ctx, parsedRequest) } -func (prx *Proxy) EthCancelBundlePublic(ctx context.Context, ethCancelBundle rpctypes.EthCancelBundleArgs) error { +func (prx *ReceiverProxy) EthCancelBundlePublic(ctx context.Context, ethCancelBundle rpctypes.EthCancelBundleArgs) error { return prx.EthCancelBundle(ctx, ethCancelBundle, true) } -func (prx *Proxy) EthCancelBundleLocal(ctx context.Context, ethCancelBundle rpctypes.EthCancelBundleArgs) error { +func (prx *ReceiverProxy) EthCancelBundleLocal(ctx context.Context, ethCancelBundle rpctypes.EthCancelBundleArgs) error { return prx.EthCancelBundle(ctx, ethCancelBundle, false) } -func (prx *Proxy) EthSendRawTransaction(ctx context.Context, ethSendRawTransaction rpctypes.EthSendRawTransactionArgs, publicEndpoint bool) error { +func (prx *ReceiverProxy) EthSendRawTransaction(ctx context.Context, ethSendRawTransaction rpctypes.EthSendRawTransactionArgs, publicEndpoint bool) error { parsedRequest := ParsedRequest{ publicEndpoint: publicEndpoint, ethSendRawTransaction: ðSendRawTransaction, @@ -220,15 +220,15 @@ func (prx *Proxy) EthSendRawTransaction(ctx context.Context, ethSendRawTransacti return prx.HandleParsedRequest(ctx, parsedRequest) } -func (prx *Proxy) EthSendRawTransactionPublic(ctx context.Context, ethSendRawTransaction rpctypes.EthSendRawTransactionArgs) error { +func (prx *ReceiverProxy) EthSendRawTransactionPublic(ctx context.Context, ethSendRawTransaction rpctypes.EthSendRawTransactionArgs) error { return prx.EthSendRawTransaction(ctx, ethSendRawTransaction, true) } -func (prx *Proxy) EthSendRawTransactionLocal(ctx context.Context, ethSendRawTransaction rpctypes.EthSendRawTransactionArgs) error { +func (prx *ReceiverProxy) EthSendRawTransactionLocal(ctx context.Context, ethSendRawTransaction rpctypes.EthSendRawTransactionArgs) error { return prx.EthSendRawTransaction(ctx, ethSendRawTransaction, false) } -func (prx *Proxy) BidSubsidiseBlock(ctx context.Context, bidSubsidiseBlock rpctypes.BidSubsisideBlockArgs, publicEndpoint bool) error { +func (prx *ReceiverProxy) BidSubsidiseBlock(ctx context.Context, bidSubsidiseBlock rpctypes.BidSubsisideBlockArgs, publicEndpoint bool) error { if !publicEndpoint { return errSubsidyWrongEndpoint } @@ -254,11 +254,11 @@ func (prx *Proxy) BidSubsidiseBlock(ctx context.Context, bidSubsidiseBlock rpcty return prx.HandleParsedRequest(ctx, parsedRequest) } -func (prx *Proxy) BidSubsidiseBlockPublic(ctx context.Context, bidSubsidiseBlock rpctypes.BidSubsisideBlockArgs) error { +func (prx *ReceiverProxy) BidSubsidiseBlockPublic(ctx context.Context, bidSubsidiseBlock rpctypes.BidSubsisideBlockArgs) error { return prx.BidSubsidiseBlock(ctx, bidSubsidiseBlock, true) } -func (prx *Proxy) BidSubsidiseBlockLocal(ctx context.Context, bidSubsidiseBlock rpctypes.BidSubsisideBlockArgs) error { +func (prx *ReceiverProxy) BidSubsidiseBlockLocal(ctx context.Context, bidSubsidiseBlock rpctypes.BidSubsisideBlockArgs) error { return prx.BidSubsidiseBlock(ctx, bidSubsidiseBlock, false) } @@ -276,7 +276,7 @@ type ParsedRequest struct { bidSubsidiseBlock *rpctypes.BidSubsisideBlockArgs } -func (prx *Proxy) HandleParsedRequest(ctx context.Context, parsedRequest ParsedRequest) error { +func (prx *ReceiverProxy) HandleParsedRequest(ctx context.Context, parsedRequest ParsedRequest) error { parsedRequest.receivedAt = apiNow() prx.Log.Info("Received request", slog.Bool("isPublicEndpoint", parsedRequest.publicEndpoint), slog.String("method", parsedRequest.method)) if parsedRequest.publicEndpoint { diff --git a/proxy/proxy.go b/proxy/proxy.go index 0eb88e4..05406dc 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -21,8 +21,8 @@ var ( peerUpdateTime = time.Minute * 5 ) -type Proxy struct { - NewProxyConstantConfig +type ReceiverProxy struct { + ReceiverProxyConstantConfig ConfigHub *BuilderConfigHub @@ -50,14 +50,14 @@ type Proxy struct { peerUpdaterClose chan struct{} } -type NewProxyConstantConfig struct { +type ReceiverProxyConstantConfig struct { Log *slog.Logger Name string FlashbotsSignerAddress common.Address } -type NewProxyConfig struct { - NewProxyConstantConfig +type ReceiverProxyConfig struct { + ReceiverProxyConstantConfig CertValidDuration time.Duration CertHosts []string @@ -71,7 +71,7 @@ type NewProxyConfig struct { MaxRequestBodySizeBytes int64 } -func NewNewProxy(config NewProxyConfig) (*Proxy, error) { +func NewReceiverProxy(config ReceiverProxyConfig) (*ReceiverProxy, error) { orderflowSigner, err := signature.NewRandomSigner() if err != nil { return nil, err @@ -88,14 +88,14 @@ func NewNewProxy(config NewProxyConfig) (*Proxy, error) { localBuilder := rpcclient.NewClient(config.LocalBuilderEndpoint) - prx := &Proxy{ - NewProxyConstantConfig: config.NewProxyConstantConfig, - ConfigHub: NewBuilderConfigHub(config.Log, config.BuilderConfigHubEndpoint), - OrderflowSigner: orderflowSigner, - PublicCertPEM: cert, - Certificate: certificate, - localBuilder: localBuilder, - requestUniqueKeysRLU: expirable.NewLRU[uuid.UUID, struct{}](requestsRLUSize, nil, requestsRLUTTL), + prx := &ReceiverProxy{ + ReceiverProxyConstantConfig: config.ReceiverProxyConstantConfig, + ConfigHub: NewBuilderConfigHub(config.Log, config.BuilderConfigHubEndpoint), + OrderflowSigner: orderflowSigner, + PublicCertPEM: cert, + Certificate: certificate, + localBuilder: localBuilder, + requestUniqueKeysRLU: expirable.NewLRU[uuid.UUID, struct{}](requestsRLUSize, nil, requestsRLUTTL), } maxRequestBodySizeBytes := DefaultMaxRequestBodySizeBytes if config.MaxRequestBodySizeBytes != 0 { @@ -132,7 +132,7 @@ func NewNewProxy(config NewProxyConfig) (*Proxy, error) { queue: shareQeueuCh, updatePeers: updatePeersCh, localBuilder: prx.localBuilder, - singer: prx.OrderflowSigner, + signer: prx.OrderflowSigner, } go queue.Run() @@ -172,7 +172,7 @@ func NewNewProxy(config NewProxyConfig) (*Proxy, error) { return prx, nil } -func (prx *Proxy) Stop() { +func (prx *ReceiverProxy) Stop() { close(prx.shareQueue) close(prx.updatePeers) close(prx.archiveQueue) @@ -180,14 +180,14 @@ func (prx *Proxy) Stop() { close(prx.peerUpdaterClose) } -func (prx *Proxy) TLSConfig() *tls.Config { +func (prx *ReceiverProxy) TLSConfig() *tls.Config { return &tls.Config{ Certificates: []tls.Certificate{prx.Certificate}, MinVersion: tls.VersionTLS13, } } -func (prx *Proxy) RegisterSecrets() error { +func (prx *ReceiverProxy) RegisterSecrets() error { const maxRetries = 10 const timeBetweenRetries = time.Second * 10 @@ -212,7 +212,7 @@ func (prx *Proxy) RegisterSecrets() error { } // RequestNewPeers updates currently available peers from the builder config hub -func (prx *Proxy) RequestNewPeers() error { +func (prx *ReceiverProxy) RequestNewPeers() error { builders, err := prx.ConfigHub.Builders() if err != nil { return err @@ -230,6 +230,6 @@ func (prx *Proxy) RequestNewPeers() error { } // FlushArchiveQueue forces the archive queue to flush -func (prx *Proxy) FlushArchiveQueue() { +func (prx *ReceiverProxy) FlushArchiveQueue() { prx.archiveFlushQueue <- struct{}{} } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 2b376e3..2b93c9a 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -45,7 +45,7 @@ func ServeHTTPRequestToChan(channel chan *RequestData) *httptest.Server { } type OrderflowProxyTestSetup struct { - proxy *Proxy + proxy *ReceiverProxy publicServer *http.Server localServer *http.Server certServer *httptest.Server @@ -170,10 +170,10 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -func createProxy(localBuilder, name string) *Proxy { +func createProxy(localBuilder, name string) *ReceiverProxy { log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) - proxy, err := NewNewProxy(NewProxyConfig{ - NewProxyConstantConfig: NewProxyConstantConfig{ + proxy, err := NewReceiverProxy(ReceiverProxyConfig{ + ReceiverProxyConstantConfig: ReceiverProxyConstantConfig{ Log: log, Name: name, }, diff --git a/proxy/sender_proxy.go b/proxy/sender_proxy.go new file mode 100644 index 0000000..c9dc7bb --- /dev/null +++ b/proxy/sender_proxy.go @@ -0,0 +1,179 @@ +package proxy + +import ( + "context" + "log/slog" + "net/http" + "time" + + "github.com/flashbots/go-utils/rpcserver" + "github.com/flashbots/go-utils/rpctypes" + "github.com/flashbots/go-utils/signature" +) + +type SenderProxyConstantConfig struct { + Log *slog.Logger + OrderflowSigner *signature.Signer +} + +type SenderProxyConfig struct { + SenderProxyConstantConfig + BuilderConfigHubEndpoint string + MaxRequestBodySizeBytes int64 +} + +type SenderProxy struct { + SenderProxyConstantConfig + ConfigHub *BuilderConfigHub + Handler http.Handler + + updatePeers chan []ConfighubBuilder + shareQueue chan *ParsedRequest + + peerUpdaterClose chan struct{} +} + +func NewSenderProxy(config SenderProxyConfig) (*SenderProxy, error) { + maxRequestBodySizeBytes := DefaultMaxRequestBodySizeBytes + if config.MaxRequestBodySizeBytes != 0 { + maxRequestBodySizeBytes = config.MaxRequestBodySizeBytes + } + + prx := &SenderProxy{ + SenderProxyConstantConfig: config.SenderProxyConstantConfig, + ConfigHub: NewBuilderConfigHub(config.Log, config.BuilderConfigHubEndpoint), + Handler: nil, + updatePeers: make(chan []ConfighubBuilder), + shareQueue: make(chan *ParsedRequest), + peerUpdaterClose: make(chan struct{}), + } + + handler, err := rpcserver.NewJSONRPCHandler(rpcserver.Methods{ + EthSendBundleMethod: prx.EthSendBundle, + MevSendBundleMethod: prx.MevSendBundle, + EthCancelBundleMethod: prx.EthCancelBundle, + EthSendRawTransactionMethod: prx.EthSendRawTransaction, + BidSubsidiseBlockMethod: prx.BidSubsidiseBlock, + }, + rpcserver.JSONRPCHandlerOpts{ + Log: prx.Log, + MaxRequestBodySizeBytes: maxRequestBodySizeBytes, + }, + ) + if err != nil { + return nil, err + } + prx.Handler = handler + + queue := ShareQueue{ + log: prx.Log, + queue: prx.shareQueue, + updatePeers: prx.updatePeers, + localBuilder: nil, + signer: prx.OrderflowSigner, + } + go queue.Run() + + go func() { + for { + select { + case _, more := <-prx.peerUpdaterClose: + if !more { + return + } + case <-time.After(peerUpdateTime): + builders, err := prx.ConfigHub.Builders() + if err != nil { + prx.Log.Error("Failed to update peers", slog.Any("error", err)) + } + + select { + case prx.updatePeers <- builders: + default: + } + } + } + }() + return prx, nil +} + +func (prx *SenderProxy) Stop() { + close(prx.shareQueue) + close(prx.updatePeers) + close(prx.peerUpdaterClose) +} + +func (prx *SenderProxy) EthSendBundle(ctx context.Context, ethSendBundle rpctypes.EthSendBundleArgs) error { + parsedRequest := ParsedRequest{ + publicEndpoint: true, + ethSendBundle: ðSendBundle, + method: EthSendBundleMethod, + } + + err := ValidateEthSendBundle(ðSendBundle, true) + if err != nil { + return err + } + + return prx.HandleParsedRequest(ctx, parsedRequest) +} + +func (prx *SenderProxy) MevSendBundle(ctx context.Context, mevSendBundle rpctypes.MevSendBundleArgs) error { + parsedRequest := ParsedRequest{ + publicEndpoint: true, + mevSendBundle: &mevSendBundle, + method: MevSendBundleMethod, + } + + err := ValidateMevSendBundle(&mevSendBundle, true) + if err != nil { + return err + } + + return prx.HandleParsedRequest(ctx, parsedRequest) +} + +func (prx *SenderProxy) EthCancelBundle(ctx context.Context, ethCancelBundle rpctypes.EthCancelBundleArgs) error { + parsedRequest := ParsedRequest{ + publicEndpoint: true, + ethCancelBundle: ðCancelBundle, + method: EthCancelBundleMethod, + } + + err := ValidateEthCancelBundle(ðCancelBundle, true) + if err != nil { + return err + } + + return prx.HandleParsedRequest(ctx, parsedRequest) +} + +func (prx *SenderProxy) EthSendRawTransaction(ctx context.Context, ethSendRawTransaction rpctypes.EthSendRawTransactionArgs) error { + parsedRequest := ParsedRequest{ + publicEndpoint: true, + ethSendRawTransaction: ðSendRawTransaction, + method: EthSendRawTransactionMethod, + } + return prx.HandleParsedRequest(ctx, parsedRequest) +} + +func (prx *SenderProxy) BidSubsidiseBlock(ctx context.Context, bidSubsidiseBlock rpctypes.BidSubsisideBlockArgs) error { + parsedRequest := ParsedRequest{ + publicEndpoint: true, + bidSubsidiseBlock: &bidSubsidiseBlock, + method: BidSubsidiseBlockMethod, + } + + return prx.HandleParsedRequest(ctx, parsedRequest) +} + +func (prx *SenderProxy) HandleParsedRequest(ctx context.Context, parsedRequest ParsedRequest) error { + parsedRequest.receivedAt = apiNow() + prx.Log.Info("Received request", slog.String("method", parsedRequest.method)) + + select { + case <-ctx.Done(): + case prx.shareQueue <- &parsedRequest: + } + return nil +} diff --git a/proxy/servers.go b/proxy/servers.go index da229ab..3692178 100644 --- a/proxy/servers.go +++ b/proxy/servers.go @@ -12,14 +12,14 @@ var ( HTTPDefaultWriteTimeout = 30 * time.Second ) -type OrderflowProxyServers struct { - proxy *Proxy +type ReceiverProxyServers struct { + proxy *ReceiverProxy publicServer *http.Server localServer *http.Server certServer *http.Server } -func StartServers(proxy *Proxy, publicListenAddress, localListenAddress, certListenAddress string) (*OrderflowProxyServers, error) { +func StartReceiverServers(proxy *ReceiverProxy, publicListenAddress, localListenAddress, certListenAddress string) (*ReceiverProxyServers, error) { publicServer := &http.Server{ Addr: publicListenAddress, Handler: proxy.PublicHandler, @@ -78,7 +78,7 @@ func StartServers(proxy *Proxy, publicListenAddress, localListenAddress, certLis } }() - return &OrderflowProxyServers{ + return &ReceiverProxyServers{ proxy: proxy, publicServer: publicServer, localServer: localServer, @@ -86,9 +86,58 @@ func StartServers(proxy *Proxy, publicListenAddress, localListenAddress, certLis }, nil } -func (s *OrderflowProxyServers) Stop() { +func (s *ReceiverProxyServers) Stop() { _ = s.publicServer.Close() _ = s.localServer.Close() _ = s.certServer.Close() s.proxy.Stop() } + +type SenderProxyServers struct { + proxy *SenderProxy + server *http.Server +} + +func StartSenderServers(proxy *SenderProxy, listenAddress string) (*SenderProxyServers, error) { + server := &http.Server{ + Addr: listenAddress, + Handler: proxy.Handler, + ReadTimeout: HTTPDefaultReadTimeout, + WriteTimeout: HTTPDefaultWriteTimeout, + } + + errCh := make(chan error) + + go func() { + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + err = errors.Join(errors.New("HTTP server failed"), err) + errCh <- err + } + }() + + select { + case err := <-errCh: + return nil, err + case <-time.After(time.Millisecond * 100): + } + + go func() { + for { + err, more := <-errCh + if !more { + return + } + proxy.Log.Error("Error in HTTP server", slog.Any("error", err)) + } + }() + + return &SenderProxyServers{ + proxy: proxy, + server: server, + }, nil +} + +func (s *SenderProxyServers) Stop() { + _ = s.server.Close() + s.proxy.Stop() +} diff --git a/proxy/sharing.go b/proxy/sharing.go index eec9926..0ebb6da 100644 --- a/proxy/sharing.go +++ b/proxy/sharing.go @@ -22,7 +22,7 @@ type ShareQueue struct { queue chan *ParsedRequest updatePeers chan []ConfighubBuilder localBuilder rpcclient.RPCClient - singer *signature.Signer + signer *signature.Signer } type shareQueuePeer struct { @@ -54,11 +54,15 @@ func (p *shareQueuePeer) SendRequest(log *slog.Logger, request *ParsedRequest) { func (sq *ShareQueue) Run() { var ( - localBuilder = newShareQueuePeer("local-builder", sq.localBuilder) + localBuilder *shareQueuePeer peers []shareQueuePeer ) - defer localBuilder.Close() - go sq.proxyRequests(&localBuilder) + if sq.localBuilder != nil { + builderPeer := newShareQueuePeer("local-builder", sq.localBuilder) + localBuilder = &builderPeer + go sq.proxyRequests(localBuilder) + defer localBuilder.Close() + } for { select { case req, more := <-sq.queue: @@ -66,7 +70,9 @@ func (sq *ShareQueue) Run() { if !more { return } - localBuilder.SendRequest(sq.log, req) + if localBuilder != nil { + localBuilder.SendRequest(sq.log, req) + } if !req.publicEndpoint { for _, peer := range peers { peer.SendRequest(sq.log, req) @@ -85,7 +91,7 @@ func (sq *ShareQueue) Run() { if info.Name == sq.name { continue } - client, err := RPCClientWithCertAndSigner(OrderflowProxyURLFromIP(info.IP), []byte(info.OrderflowProxy.TLSCert), sq.singer) + client, err := RPCClientWithCertAndSigner(OrderflowProxyURLFromIP(info.IP), []byte(info.OrderflowProxy.TLSCert), sq.signer) if err != nil { sq.log.Error("Failed to create a peer client", slog.Any("error", err)) shareQueueInternalErrors.Inc() diff --git a/receiver.dockerfile b/receiver.dockerfile new file mode 100644 index 0000000..5fb0bac --- /dev/null +++ b/receiver.dockerfile @@ -0,0 +1,23 @@ +# syntax=docker/dockerfile:1 +FROM golang:1.23 AS builder +ARG VERSION +WORKDIR /build +ADD go.mod /build/ +RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 GOOS=linux \ + go mod download +ADD . /build/ +RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 GOOS=linux \ + go build \ + -trimpath \ + -ldflags "-s -X main.version=${VERSION}" \ + -v \ + -o receiver-proxy \ + cmd/receiver-proxy/main.go + +FROM alpine:latest +WORKDIR /app +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /build/receiver-proxy /app/receiver-proxy +ENV LISTEN_ADDR=":8080" +EXPOSE 8080 +CMD ["/app/receiver-proxy"] diff --git a/httpserver.dockerfile b/sender.dockerfile similarity index 79% rename from httpserver.dockerfile rename to sender.dockerfile index 83184ce..4c4cdc0 100644 --- a/httpserver.dockerfile +++ b/sender.dockerfile @@ -11,13 +11,13 @@ RUN --mount=type=cache,target=/root/.cache/go-build CGO_ENABLED=0 GOOS=linux \ -trimpath \ -ldflags "-s -X main.version=${VERSION}" \ -v \ - -o your-project \ - cmd/httpserver/main.go + -o sender-proxy \ + cmd/sender-proxy/main.go FROM alpine:latest WORKDIR /app COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ -COPY --from=builder /build/your-project /app/your-project +COPY --from=builder /build/sender-proxy /app/sender-proxy ENV LISTEN_ADDR=":8080" EXPOSE 8080 -CMD ["/app/your-project"] +CMD ["/app/sender-proxy"]