diff --git a/app/common_test.go b/app/common_test.go index 74adfa4e0..089857d28 100644 --- a/app/common_test.go +++ b/app/common_test.go @@ -103,6 +103,7 @@ func NewInMemoryTendermintNodeAminoWithValidators(t *testing.T, genesisState []b panic(err) } pocketTypes.CleanPocketNodes() + pocketTypes.StopEvidenceWorker() PCA = nil inMemKB = nil err := inMemDB.Close() @@ -168,6 +169,7 @@ func NewInMemoryTendermintNodeProtoWithValidators(t *testing.T, genesisState []b } pocketTypes.CleanPocketNodes() + pocketTypes.StopEvidenceWorker() PCA = nil inMemKB = nil diff --git a/app/config.go b/app/config.go index cf5afa06a..34c718acc 100644 --- a/app/config.go +++ b/app/config.go @@ -471,6 +471,7 @@ func InitPocketCoreConfig(chains *types.HostedBlockchains, logger log.Logger) { } func ShutdownPocketCore() { + types.StopEvidenceWorker() types.FlushSessionCache() types.StopServiceMetrics() } diff --git a/app/query_test.go b/app/query_test.go index 2a20c1986..296e10d0a 100644 --- a/app/query_test.go +++ b/app/query_test.go @@ -791,6 +791,7 @@ func TestQueryRelay(t *testing.T) { _, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock) select { case <-evtChan: + assert.Equal(t, uint64(1), types.GlobalEvidenceWorker.SuccessfulTasks()) inv, err := types.GetEvidence(types.SessionHeader{ ApplicationPubKey: aat.ApplicationPublicKey, Chain: relay.Proof.Blockchain, @@ -858,6 +859,8 @@ func TestQueryRelayMultipleNodes(t *testing.T) { } _, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock) <-evtChan // Wait for block + chain := sdk.PlaceholderHash + sessionBlockHeight := int64(1) // setup relay for _, v := range validators { relay := types.Relay{ @@ -865,9 +868,9 @@ func TestQueryRelayMultipleNodes(t *testing.T) { Meta: types.RelayMeta{BlockHeight: 5}, // todo race condition here Proof: types.RelayProof{ Entropy: 32598345349034509, - SessionBlockHeight: 1, + SessionBlockHeight: sessionBlockHeight, ServicerPubKey: v.PublicKey().RawString(), - Blockchain: sdk.PlaceholderHash, + Blockchain: chain, Token: aat, Signature: "", }, @@ -886,23 +889,32 @@ func TestQueryRelayMultipleNodes(t *testing.T) { BodyString(expectedRequest). Reply(200). BodyString(expectedResponse) + } - validatorAddress := sdk.GetAddress(v.PublicKey()) - node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress) + select { + case <-evtChan: + // verify that each store task was successful + assert.Equal(t, uint64(len(validators)), types.GlobalEvidenceWorker.SuccessfulTasks()) + // check the evidence store of each node. + for _, v := range validators { + validatorAddress := sdk.GetAddress(v.PublicKey()) + _node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress) + + assert.Nil(t, nodeErr) + inv, err := types.GetEvidence(types.SessionHeader{ + ApplicationPubKey: aat.ApplicationPublicKey, + Chain: chain, + SessionBlockHeight: sessionBlockHeight, + }, types.RelayEvidence, sdk.NewInt(10000), _node.EvidenceStore) + assert.Nil(t, err) + assert.NotNil(t, inv) + assert.Equal(t, int64(1), inv.NumOfProofs) + } - assert.Nil(t, nodeErr) - inv, err := types.GetEvidence(types.SessionHeader{ - ApplicationPubKey: aat.ApplicationPublicKey, - Chain: relay.Proof.Blockchain, - SessionBlockHeight: relay.Proof.SessionBlockHeight, - }, types.RelayEvidence, sdk.NewInt(10000), node.EvidenceStore) - assert.Nil(t, err) - assert.NotNil(t, inv) - assert.Equal(t, inv.NumOfProofs, int64(1)) + cleanup() + stopCli() + gock.Off() } - cleanup() - stopCli() - gock.Off() return }) } diff --git a/go.mod b/go.mod index 892c2b5d2..4b2ea8d50 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/pokt-network/pocket-core go 1.17 require ( + github.com/alitto/pond v1.8.1 github.com/go-kit/kit v0.12.0 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 diff --git a/go.sum b/go.sum index 91a3f6a6a..5a8f9f24a 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/alitto/pond v1.8.1 h1:GzrU4ZERX0JDNMmAY2k5y1Wqgmul77nt3bsDgvwVgO4= +github.com/alitto/pond v1.8.1/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -157,8 +159,6 @@ github.com/facebookgo/subset v0.0.0-20150612182917-8dac2c3c4870/go.mod h1:5tD+ne github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= -github.com/fiagdao/tendermint v0.32.11-0.20220824195748-2087fcc480c1 h1:L1dbcsGeC1syYCTrhpsQ/GJrysPhZczZf9BF/vVJyPg= -github.com/fiagdao/tendermint v0.32.11-0.20220824195748-2087fcc480c1/go.mod h1:9Q8P/AXsxZ2BxD4irxhueg4HxHKSVK2r9jTUDpqDWjI= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= diff --git a/types/config.go b/types/config.go index 885830200..ae510515f 100644 --- a/types/config.go +++ b/types/config.go @@ -34,6 +34,9 @@ type PocketConfig struct { ValidatorCacheSize int64 `json:"validator_cache_size"` ApplicationCacheSize int64 `json:"application_cache_size"` RPCTimeout int64 `json:"rpc_timeout"` + RPCMaxIdleConns int `json:"rpc_max_idle_conns"` + RPCMaxConnsPerHost int `json:"rpc_max_conns_per_host"` + RPCMaxIdleConnsPerHost int `json:"rpc_max_idle_conns_per_host"` PrometheusAddr string `json:"pocket_prometheus_port"` PrometheusMaxOpenfiles int `json:"prometheus_max_open_files"` MaxClaimAgeForProofRetry int `json:"max_claim_age_for_proof_retry"` @@ -100,6 +103,9 @@ const ( DefaultPocketPrometheusListenAddr = "8083" DefaultPrometheusMaxOpenFile = 3 DefaultRPCTimeout = 30000 + DefaultRPCMaxIdleConns = 1000 + DefaultRPCMaxConnsPerHost = 1000 + DefaultRPCMaxIdleConnsPerHost = 1000 DefaultMaxClaimProofRetryAge = 32 DefaultProofPrevalidation = false DefaultCtxCacheSize = 20 @@ -133,6 +139,9 @@ func DefaultConfig(dataDir string) Config { ValidatorCacheSize: DefaultValidatorCacheSize, ApplicationCacheSize: DefaultApplicationCacheSize, RPCTimeout: DefaultRPCTimeout, + RPCMaxIdleConns: DefaultRPCMaxIdleConns, + RPCMaxConnsPerHost: DefaultRPCMaxConnsPerHost, + RPCMaxIdleConnsPerHost: DefaultRPCMaxIdleConnsPerHost, PrometheusAddr: DefaultPocketPrometheusListenAddr, PrometheusMaxOpenfiles: DefaultPrometheusMaxOpenFile, MaxClaimAgeForProofRetry: DefaultMaxClaimProofRetryAge, diff --git a/x/pocketcore/keeper/service.go b/x/pocketcore/keeper/service.go index f2d437df8..dbbc76cbe 100644 --- a/x/pocketcore/keeper/service.go +++ b/x/pocketcore/keeper/service.go @@ -61,8 +61,12 @@ func (k Keeper) HandleRelay(ctx sdk.Ctx, relay pc.Relay) (*pc.RelayResponse, sdk } return nil, err } - // store the proof before execution, because the proof corresponds to the previous relay - relay.Proof.Store(maxPossibleRelays, node.EvidenceStore) + // move this to a worker that will insert this proof in a series style to avoid memory consumption and relay proof race conditions + // https://github.com/pokt-network/pocket-core/issues/1457 + pc.GlobalEvidenceWorker.Submit(func() { + // store the proof before execution, because the proof corresponds to the previous relay + relay.Proof.Store(maxPossibleRelays, node.EvidenceStore) + }) // attempt to execute respPayload, err := relay.Execute(hostedBlockchains, &nodeAddress) if err != nil { diff --git a/x/pocketcore/keeper/service_test.go b/x/pocketcore/keeper/service_test.go index 4b24bd51d..6e6422716 100644 --- a/x/pocketcore/keeper/service_test.go +++ b/x/pocketcore/keeper/service_test.go @@ -66,8 +66,10 @@ func TestKeeper_HandleRelay(t *testing.T) { t.Fatalf(er.Error()) } validRelay.Proof.Signature = hex.EncodeToString(clientSig) + httpClient := types.GetChainsClient() defer gock.Off() // Flush pending mocks after test execution - + defer gock.RestoreClient(httpClient) + gock.InterceptClient(httpClient) gock.New("https://www.google.com:443"). Post("/"). Reply(200). diff --git a/x/pocketcore/types/config.go b/x/pocketcore/types/config.go index 782400a45..fa2867f07 100644 --- a/x/pocketcore/types/config.go +++ b/x/pocketcore/types/config.go @@ -3,6 +3,7 @@ package types import ( "encoding/hex" "fmt" + "github.com/alitto/pond" "github.com/pokt-network/pocket-core/crypto" "github.com/pokt-network/pocket-core/types" "github.com/tendermint/tendermint/config" @@ -20,13 +21,16 @@ var ( globalRPCTimeout time.Duration GlobalPocketConfig types.PocketConfig GlobalTenderMintConfig config.Config + GlobalEvidenceWorker *pond.WorkerPool ) func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) { ConfigOnce.Do(func() { InitGlobalServiceMetric(chains, logger, c.PocketConfig.PrometheusAddr, c.PocketConfig.PrometheusMaxOpenfiles) }) + InitHttpClient(c.PocketConfig.RPCMaxIdleConns, c.PocketConfig.RPCMaxConnsPerHost, c.PocketConfig.RPCMaxIdleConnsPerHost) InitPocketNodeCaches(c, logger) + InitEvidenceWorker(c, logger) GlobalPocketConfig = c.PocketConfig GlobalTenderMintConfig = c.TendermintConfig if GlobalPocketConfig.LeanPocket { @@ -37,6 +41,18 @@ func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) { SetRPCTimeout(c.PocketConfig.RPCTimeout) } +func InitEvidenceWorker(_ types.Config, logger log.Logger) { + panicHandler := func(p interface{}) { + logger.Error(fmt.Sprintf("evidence storage task panicked: %v", p)) + } + GlobalEvidenceWorker = pond.New( + 1, 0, + pond.IdleTimeout(100), + pond.PanicHandler(panicHandler), + pond.Strategy(pond.Balanced()), + ) +} + func ConvertEvidenceToProto(config types.Config) error { // we have to add a random pocket node so that way lean pokt can still support getting the global evidence cache node := AddPocketNode(crypto.GenerateEd25519PrivKey().GenPrivateKey(), log.NewNopLogger()) @@ -67,6 +83,13 @@ func ConvertEvidenceToProto(config types.Config) error { return nil } +func StopEvidenceWorker() { + if !GlobalEvidenceWorker.Stopped() { + GlobalEvidenceWorker.StopAndWait() + } + GlobalEvidenceWorker = nil +} + func FlushSessionCache() { for _, k := range GlobalPocketNodes { if k.SessionStore != nil { diff --git a/x/pocketcore/types/service.go b/x/pocketcore/types/service.go index c37b6232e..ce8e52966 100644 --- a/x/pocketcore/types/service.go +++ b/x/pocketcore/types/service.go @@ -17,6 +17,10 @@ import ( const DEFAULTHTTPMETHOD = "POST" +var ( + chainHttpClient *http.Client +) + // "Relay" - A read / write API request from a hosted (non native) external blockchain type Relay struct { Payload Payload `json:"payload"` // the data payload of the request @@ -24,6 +28,36 @@ type Relay struct { Proof RelayProof `json:"proof"` // the authentication scheme needed for work } +func GetChainsClient() *http.Client { + if chainHttpClient == nil { + InitHttpClient(1000, 1000, 1000) + } + + return chainHttpClient +} + +func InitHttpClient(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int) { + var chainsTransport *http.Transport + + t, ok := http.DefaultTransport.(*http.Transport) + if ok { + chainsTransport = t.Clone() + // this params may could be handled by config.json, but how much people know about this? + // tbd: figure out the right values to this, rn the priority is stop recreating new http client and connections. + chainsTransport.MaxIdleConns = maxIdleConns + chainsTransport.MaxConnsPerHost = maxConnsPerHost + chainsTransport.MaxIdleConnsPerHost = maxIdleConnsPerHost + } // if not ok, probably is because is *gock.Transport - test only + + chainHttpClient = &http.Client{ + Timeout: globalRPCTimeout * time.Second, + } + + if chainsTransport != nil { + chainHttpClient.Transport = chainsTransport + } +} + // "Validate" - Checks the validity of a relay request using store data func (r *Relay) Validate(ctx sdk.Ctx, posKeeper PosKeeper, appsKeeper AppsKeeper, pocketKeeper PocketKeeper, hb *HostedBlockchains, sessionBlockHeight int64, node *PocketNode) (maxPossibleRelays sdk.BigInt, err sdk.Error) { // validate payload @@ -316,7 +350,7 @@ func executeHTTPRequest(payload, url, userAgent string, basicAuth BasicAuth, met } } // execute the request - resp, err := (&http.Client{Timeout: globalRPCTimeout * time.Millisecond}).Do(req) + resp, err := GetChainsClient().Do(req) if err != nil { return "", err } diff --git a/x/pocketcore/types/service_test.go b/x/pocketcore/types/service_test.go index 8d1f0814f..f3d7a1484 100644 --- a/x/pocketcore/types/service_test.go +++ b/x/pocketcore/types/service_test.go @@ -172,8 +172,10 @@ func TestRelay_Execute(t *testing.T) { }, } validRelay.Proof.RequestHash = validRelay.RequestHashString() + httpClient := GetChainsClient() defer gock.Off() // Flush pending mocks after test execution - + defer gock.RestoreClient(httpClient) + gock.InterceptClient(httpClient) gock.New("https://server.com"). Post("/relay"). Reply(200). @@ -185,6 +187,7 @@ func TestRelay_Execute(t *testing.T) { URL: "https://server.com/relay/", }}, } + response, err := validRelay.Execute(&hb, &nodeAddr) assert.True(t, err == nil) assert.Equal(t, response, "bar")