Skip to content

Commit

Permalink
archive test
Browse files Browse the repository at this point in the history
  • Loading branch information
dvush committed Oct 29, 2024
1 parent f78fadb commit b576bc9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 6 deletions.
11 changes: 7 additions & 4 deletions proxy/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ const (
)

var (
errUnknownPeer = errors.New("unknown peers can't send to the public address")

errUnknownPeer = errors.New("unknown peers can't send to the public address")
errSubsidyWrongEndpoint = errors.New("subsidy can only be called on public method")
errSubsidyWrongCaller = errors.New("subsidy can only be called by Flashbots")

apiNow = time.Now
)

func (prx *NewProxy) PublicJSONRPCHandler() (*rpcserver.JSONRPCHandler, error) {
Expand Down Expand Up @@ -124,7 +125,9 @@ func (prx *NewProxy) MevSendBundle(ctx context.Context, mevSendBundle rpctypes.M
return errUnknownPeer
}
} else {
mevSendBundle.Metadata.Signer = &signer
mevSendBundle.Metadata = &rpctypes.MevBundleMetadata{
Signer: &signer,
}
}
parsedRequest := ParsedRequest{
publicEndpoint: publicEndpoint,
Expand Down Expand Up @@ -236,7 +239,7 @@ type ParsedRequest struct {
}

func (prx *NewProxy) HandleParsedRequest(ctx context.Context, parsedRequest ParsedRequest) error {
parsedRequest.receivedAt = time.Now()
parsedRequest.receivedAt = apiNow()
select {
case <-ctx.Done():
case prx.shareQueue <- &parsedRequest:
Expand Down
3 changes: 2 additions & 1 deletion proxy/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func (aq *ArchiveQueue) Run() {

if needFlush {
aq.flush(pendingBatch)
pendingBatch = nil
}

forceFlush = false
Expand Down Expand Up @@ -177,7 +178,7 @@ func (aq *ArchiveQueue) flush(batch []*ParsedRequest) {
if err != nil {
aq.log.Error("Error while making RPC request to archive", slog.Any("error", err))
}
if res.Error != nil {
if res != nil && res.Error != nil {
aq.log.Error("Archive returned error", slog.Any("error", res.Error))
}
}
5 changes: 5 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,8 @@ func (prx *NewProxy) RequestNewPeers() error {
}
return nil
}

// FlushArchiveQueue forces the archive queue to flush
func (prx *NewProxy) FlushArchiveQueue() {
prx.archiveFlushQueue <- struct{}{}
}
63 changes: 62 additions & 1 deletion proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func StartTestOrderflowProxy(name string) (*OrderflowProxyTestSetup, error) {
}

func TestMain(m *testing.M) {
archiveServerRequests = make(chan *RequestData, 1)
archiveServerRequests = make(chan *RequestData)
builderHub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
defer r.Body.Close()
Expand Down Expand Up @@ -180,7 +180,9 @@ func createProxy(localBuilder, name string) *NewProxy {
CertValidDuration: time.Hour * 24,
CertHosts: []string{"localhost", "127.0.0.1"},
BuilderConfigHubEndpoint: builderHub.URL,
ArchiveEndpoint: archiveServer.URL,
LocalBuilderEndpoint: localBuilder,
EthRPC: "eth-rpc-not-set",
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -232,7 +234,25 @@ func proxiesUpdatePeers(t *testing.T) {
time.Sleep(time.Millisecond * 50)
}

func proxiesFlushQueue() {
for _, instance := range proxies {
instance.proxy.FlushArchiveQueue()
}
}

func TestProxyBundleRequestWithPeerUpdate(t *testing.T) {
defer func() {
proxiesFlushQueue()
for {
select {
case <-time.After(time.Millisecond * 100):
expectNoRequest(t, archiveServerRequests)
return
case <-archiveServerRequests:
}
}
}()

signer, err := signature.NewSignerFromHexPrivateKey("0xd63b3c447fdea415a05e4c0b859474d14105a88178efdf350bc9f7b05be3cc58")
require.NoError(t, err)
client, err := RPCClientWithCertAndSigner(proxies[0].localServerEndpoint, proxies[0].proxy.PublicCertPEM, signer)
Expand Down Expand Up @@ -294,3 +314,44 @@ func TestProxyBundleRequestWithPeerUpdate(t *testing.T) {
builderRequest = expectRequest(t, proxies[2].localBuilderRequests)
require.Equal(t, expectedRequest, builderRequest.body)
}

func TestProxySendToArchive(t *testing.T) {
signer, err := signature.NewSignerFromHexPrivateKey("0xd63b3c447fdea415a05e4c0b859474d14105a88178efdf350bc9f7b05be3cc58")
require.NoError(t, err)
client, err := RPCClientWithCertAndSigner(proxies[0].localServerEndpoint, proxies[0].proxy.PublicCertPEM, signer)
require.NoError(t, err)

// we start with no peers
builderHubPeers = nil
err = proxies[0].proxy.RegisterSecrets()
require.NoError(t, err)
proxiesUpdatePeers(t)

apiNow = func() time.Time {
return time.Unix(1730000000, 0)
}
defer func() {
apiNow = time.Now
}()

resp, err := client.Call(context.Background(), EthSendBundleMethod, &rpctypes.EthSendBundleArgs{
BlockNumber: 123,
})
require.NoError(t, err)
require.Nil(t, resp.Error)
_ = expectRequest(t, proxies[0].localBuilderRequests)

resp, err = client.Call(context.Background(), EthSendBundleMethod, &rpctypes.EthSendBundleArgs{
BlockNumber: 456,
})
require.NoError(t, err)
require.Nil(t, resp.Error)

_ = expectRequest(t, proxies[0].localBuilderRequests)

proxiesFlushQueue()
archiveRequest := expectRequest(t, archiveServerRequests)

expectedArchiveRequest := `{"method":"flashbots_newOrderEvents","params":[{"orderEvents":[{"eth_sendBundle":{"params":{"txs":null,"blockNumber":"0x7b","signingAddress":"0x9349365494be4f6205e5d44bdc7ec7dcd134becf"},"metadata":{"receivedAt":1730000000000}}},{"eth_sendBundle":{"params":{"txs":null,"blockNumber":"0x1c8","signingAddress":"0x9349365494be4f6205e5d44bdc7ec7dcd134becf"},"metadata":{"receivedAt":1730000000000}}}]}],"id":0,"jsonrpc":"2.0"}`
require.Equal(t, expectedArchiveRequest, archiveRequest.body)
}

0 comments on commit b576bc9

Please sign in to comment.