diff --git a/proxy/api.go b/proxy/api.go index 55e983d..6eacbd3 100644 --- a/proxy/api.go +++ b/proxy/api.go @@ -23,10 +23,11 @@ const ( ) var ( - errUnknownPeer = errors.New("unknown peers can't send to the public address") - + errUnknownPeer = errors.New("unknown peers can't send to the public address") errSubsidyWrongEndpoint = errors.New("subsidy can only be called on public method") errSubsidyWrongCaller = errors.New("subsidy can only be called by Flashbots") + + apiNow = time.Now ) func (prx *NewProxy) PublicJSONRPCHandler() (*rpcserver.JSONRPCHandler, error) { @@ -124,7 +125,9 @@ func (prx *NewProxy) MevSendBundle(ctx context.Context, mevSendBundle rpctypes.M return errUnknownPeer } } else { - mevSendBundle.Metadata.Signer = &signer + mevSendBundle.Metadata = &rpctypes.MevBundleMetadata{ + Signer: &signer, + } } parsedRequest := ParsedRequest{ publicEndpoint: publicEndpoint, @@ -236,7 +239,7 @@ type ParsedRequest struct { } func (prx *NewProxy) HandleParsedRequest(ctx context.Context, parsedRequest ParsedRequest) error { - parsedRequest.receivedAt = time.Now() + parsedRequest.receivedAt = apiNow() select { case <-ctx.Done(): case prx.shareQueue <- &parsedRequest: diff --git a/proxy/archive.go b/proxy/archive.go index de96955..8590c7b 100644 --- a/proxy/archive.go +++ b/proxy/archive.go @@ -46,6 +46,7 @@ func (aq *ArchiveQueue) Run() { if needFlush { aq.flush(pendingBatch) + pendingBatch = nil } forceFlush = false @@ -177,7 +178,7 @@ func (aq *ArchiveQueue) flush(batch []*ParsedRequest) { if err != nil { aq.log.Error("Error while making RPC request to archive", slog.Any("error", err)) } - if res.Error != nil { + if res != nil && res.Error != nil { aq.log.Error("Archive returned error", slog.Any("error", res.Error)) } } diff --git a/proxy/proxy.go b/proxy/proxy.go index bdf101b..31a91a6 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -173,3 +173,8 @@ func (prx *NewProxy) RequestNewPeers() error { } return nil } + +// FlushArchiveQueue forces the archive queue to flush +func (prx *NewProxy) FlushArchiveQueue() { + prx.archiveFlushQueue <- struct{}{} +} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 8972b63..3e4a0fa 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -110,7 +110,7 @@ func StartTestOrderflowProxy(name string) (*OrderflowProxyTestSetup, error) { } func TestMain(m *testing.M) { - archiveServerRequests = make(chan *RequestData, 1) + archiveServerRequests = make(chan *RequestData) builderHub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) defer r.Body.Close() @@ -180,7 +180,9 @@ func createProxy(localBuilder, name string) *NewProxy { CertValidDuration: time.Hour * 24, CertHosts: []string{"localhost", "127.0.0.1"}, BuilderConfigHubEndpoint: builderHub.URL, + ArchiveEndpoint: archiveServer.URL, LocalBuilderEndpoint: localBuilder, + EthRPC: "eth-rpc-not-set", }) if err != nil { panic(err) @@ -232,7 +234,25 @@ func proxiesUpdatePeers(t *testing.T) { time.Sleep(time.Millisecond * 50) } +func proxiesFlushQueue() { + for _, instance := range proxies { + instance.proxy.FlushArchiveQueue() + } +} + func TestProxyBundleRequestWithPeerUpdate(t *testing.T) { + defer func() { + proxiesFlushQueue() + for { + select { + case <-time.After(time.Millisecond * 100): + expectNoRequest(t, archiveServerRequests) + return + case <-archiveServerRequests: + } + } + }() + signer, err := signature.NewSignerFromHexPrivateKey("0xd63b3c447fdea415a05e4c0b859474d14105a88178efdf350bc9f7b05be3cc58") require.NoError(t, err) client, err := RPCClientWithCertAndSigner(proxies[0].localServerEndpoint, proxies[0].proxy.PublicCertPEM, signer) @@ -294,3 +314,44 @@ func TestProxyBundleRequestWithPeerUpdate(t *testing.T) { builderRequest = expectRequest(t, proxies[2].localBuilderRequests) require.Equal(t, expectedRequest, builderRequest.body) } + +func TestProxySendToArchive(t *testing.T) { + signer, err := signature.NewSignerFromHexPrivateKey("0xd63b3c447fdea415a05e4c0b859474d14105a88178efdf350bc9f7b05be3cc58") + require.NoError(t, err) + client, err := RPCClientWithCertAndSigner(proxies[0].localServerEndpoint, proxies[0].proxy.PublicCertPEM, signer) + require.NoError(t, err) + + // we start with no peers + builderHubPeers = nil + err = proxies[0].proxy.RegisterSecrets() + require.NoError(t, err) + proxiesUpdatePeers(t) + + apiNow = func() time.Time { + return time.Unix(1730000000, 0) + } + defer func() { + apiNow = time.Now + }() + + resp, err := client.Call(context.Background(), EthSendBundleMethod, &rpctypes.EthSendBundleArgs{ + BlockNumber: 123, + }) + require.NoError(t, err) + require.Nil(t, resp.Error) + _ = expectRequest(t, proxies[0].localBuilderRequests) + + resp, err = client.Call(context.Background(), EthSendBundleMethod, &rpctypes.EthSendBundleArgs{ + BlockNumber: 456, + }) + require.NoError(t, err) + require.Nil(t, resp.Error) + + _ = expectRequest(t, proxies[0].localBuilderRequests) + + proxiesFlushQueue() + archiveRequest := expectRequest(t, archiveServerRequests) + + expectedArchiveRequest := `{"method":"flashbots_newOrderEvents","params":[{"orderEvents":[{"eth_sendBundle":{"params":{"txs":null,"blockNumber":"0x7b","signingAddress":"0x9349365494be4f6205e5d44bdc7ec7dcd134becf"},"metadata":{"receivedAt":1730000000000}}},{"eth_sendBundle":{"params":{"txs":null,"blockNumber":"0x1c8","signingAddress":"0x9349365494be4f6205e5d44bdc7ec7dcd134becf"},"metadata":{"receivedAt":1730000000000}}}]}],"id":0,"jsonrpc":"2.0"}` + require.Equal(t, expectedArchiveRequest, archiveRequest.body) +}