Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 2 issues and minor enhancement. #1485

Merged
merged 3 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func NewInMemoryTendermintNodeAminoWithValidators(t *testing.T, genesisState []b
panic(err)
}
pocketTypes.CleanPocketNodes()
pocketTypes.StopEvidenceWorker()
PCA = nil
inMemKB = nil
err := inMemDB.Close()
Expand Down Expand Up @@ -168,6 +169,7 @@ func NewInMemoryTendermintNodeProtoWithValidators(t *testing.T, genesisState []b
}

pocketTypes.CleanPocketNodes()
pocketTypes.StopEvidenceWorker()

PCA = nil
inMemKB = nil
Expand Down
1 change: 1 addition & 0 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ func InitPocketCoreConfig(chains *types.HostedBlockchains, logger log.Logger) {
}

func ShutdownPocketCore() {
types.StopEvidenceWorker()
types.FlushSessionCache()
types.StopServiceMetrics()
}
Expand Down
44 changes: 28 additions & 16 deletions app/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ func TestQueryRelay(t *testing.T) {
_, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock)
select {
case <-evtChan:
assert.Equal(t, uint64(1), types.GlobalEvidenceWorker.SuccessfulTasks())
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: relay.Proof.Blockchain,
Expand Down Expand Up @@ -858,16 +859,18 @@ func TestQueryRelayMultipleNodes(t *testing.T) {
}
_, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock)
<-evtChan // Wait for block
chain := sdk.PlaceholderHash
sessionBlockHeight := int64(1)
// setup relay
for _, v := range validators {
relay := types.Relay{
Payload: payload,
Meta: types.RelayMeta{BlockHeight: 5}, // todo race condition here
Proof: types.RelayProof{
Entropy: 32598345349034509,
SessionBlockHeight: 1,
SessionBlockHeight: sessionBlockHeight,
ServicerPubKey: v.PublicKey().RawString(),
Blockchain: sdk.PlaceholderHash,
Blockchain: chain,
Token: aat,
Signature: "",
},
Expand All @@ -886,23 +889,32 @@ func TestQueryRelayMultipleNodes(t *testing.T) {
BodyString(expectedRequest).
Reply(200).
BodyString(expectedResponse)
}

validatorAddress := sdk.GetAddress(v.PublicKey())
node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress)
select {
case <-evtChan:
// verify that each store task was successful
assert.Equal(t, uint64(len(validators)), types.GlobalEvidenceWorker.SuccessfulTasks())
// check the evidence store of each node.
for _, v := range validators {
validatorAddress := sdk.GetAddress(v.PublicKey())
_node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress)

assert.Nil(t, nodeErr)
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: chain,
SessionBlockHeight: sessionBlockHeight,
}, types.RelayEvidence, sdk.NewInt(10000), _node.EvidenceStore)
assert.Nil(t, err)
assert.NotNil(t, inv)
assert.Equal(t, int64(1), inv.NumOfProofs)
}

assert.Nil(t, nodeErr)
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: relay.Proof.Blockchain,
SessionBlockHeight: relay.Proof.SessionBlockHeight,
}, types.RelayEvidence, sdk.NewInt(10000), node.EvidenceStore)
assert.Nil(t, err)
assert.NotNil(t, inv)
assert.Equal(t, inv.NumOfProofs, int64(1))
cleanup()
stopCli()
gock.Off()
}
cleanup()
stopCli()
gock.Off()
return
})
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/pokt-network/pocket-core
go 1.18

require (
github.com/alitto/pond v1.8.1
github.com/go-kit/kit v0.12.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.2
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alitto/pond v1.8.1 h1:GzrU4ZERX0JDNMmAY2k5y1Wqgmul77nt3bsDgvwVgO4=
github.com/alitto/pond v1.8.1/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
9 changes: 9 additions & 0 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type PocketConfig struct {
ValidatorCacheSize int64 `json:"validator_cache_size"`
ApplicationCacheSize int64 `json:"application_cache_size"`
RPCTimeout int64 `json:"rpc_timeout"`
RPCMaxIdleConns int `json:"rpc_max_idle_conns"`
RPCMaxConnsPerHost int `json:"rpc_max_conns_per_host"`
RPCMaxIdleConnsPerHost int `json:"rpc_max_idle_conns_per_host"`
PrometheusAddr string `json:"pocket_prometheus_port"`
PrometheusMaxOpenfiles int `json:"prometheus_max_open_files"`
MaxClaimAgeForProofRetry int `json:"max_claim_age_for_proof_retry"`
Expand Down Expand Up @@ -100,6 +103,9 @@ const (
DefaultPocketPrometheusListenAddr = "8083"
DefaultPrometheusMaxOpenFile = 3
DefaultRPCTimeout = 30000
DefaultRPCMaxIdleConns = 1000
DefaultRPCMaxConnsPerHost = 1000
DefaultRPCMaxIdleConnsPerHost = 1000
DefaultMaxClaimProofRetryAge = 32
DefaultProofPrevalidation = false
DefaultCtxCacheSize = 20
Expand Down Expand Up @@ -133,6 +139,9 @@ func DefaultConfig(dataDir string) Config {
ValidatorCacheSize: DefaultValidatorCacheSize,
ApplicationCacheSize: DefaultApplicationCacheSize,
RPCTimeout: DefaultRPCTimeout,
RPCMaxIdleConns: DefaultRPCMaxIdleConns,
RPCMaxConnsPerHost: DefaultRPCMaxConnsPerHost,
RPCMaxIdleConnsPerHost: DefaultRPCMaxIdleConnsPerHost,
PrometheusAddr: DefaultPocketPrometheusListenAddr,
PrometheusMaxOpenfiles: DefaultPrometheusMaxOpenFile,
MaxClaimAgeForProofRetry: DefaultMaxClaimProofRetryAge,
Expand Down
8 changes: 6 additions & 2 deletions x/pocketcore/keeper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ func (k Keeper) HandleRelay(ctx sdk.Ctx, relay pc.Relay) (*pc.RelayResponse, sdk
}
return nil, err
}
// store the proof before execution, because the proof corresponds to the previous relay
relay.Proof.Store(maxPossibleRelays, node.EvidenceStore)
// move this to a worker that will insert this proof in a series style to avoid memory consumption and relay proof race conditions
// https://github.com/pokt-network/pocket-core/issues/1457
pc.GlobalEvidenceWorker.Submit(func() {
// store the proof before execution, because the proof corresponds to the previous relay
relay.Proof.Store(maxPossibleRelays, node.EvidenceStore)
})
// attempt to execute
respPayload, err := relay.Execute(hostedBlockchains, &nodeAddress)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion x/pocketcore/keeper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func TestKeeper_HandleRelay(t *testing.T) {
t.Fatalf(er.Error())
}
validRelay.Proof.Signature = hex.EncodeToString(clientSig)
httpClient := types.GetChainsClient()
defer gock.Off() // Flush pending mocks after test execution

defer gock.RestoreClient(httpClient)
gock.InterceptClient(httpClient)
gock.New("https://www.google.com:443").
Post("/").
Reply(200).
Expand Down
22 changes: 22 additions & 0 deletions x/pocketcore/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"encoding/hex"
"fmt"
"github.com/alitto/pond"
"github.com/pokt-network/pocket-core/crypto"
"github.com/pokt-network/pocket-core/types"
"github.com/tendermint/tendermint/config"
Expand All @@ -20,13 +21,16 @@ var (
globalRPCTimeout time.Duration
GlobalPocketConfig types.PocketConfig
GlobalTenderMintConfig config.Config
GlobalEvidenceWorker *pond.WorkerPool
)

func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) {
ConfigOnce.Do(func() {
InitGlobalServiceMetric(chains, logger, c.PocketConfig.PrometheusAddr, c.PocketConfig.PrometheusMaxOpenfiles)
})
InitHttpClient(c.PocketConfig.RPCMaxIdleConns, c.PocketConfig.RPCMaxConnsPerHost, c.PocketConfig.RPCMaxIdleConnsPerHost)
InitPocketNodeCaches(c, logger)
InitEvidenceWorker(c, logger)
GlobalPocketConfig = c.PocketConfig
GlobalTenderMintConfig = c.TendermintConfig
if GlobalPocketConfig.LeanPocket {
Expand All @@ -37,6 +41,17 @@ func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) {
SetRPCTimeout(c.PocketConfig.RPCTimeout)
}

func InitEvidenceWorker(_ types.Config, logger log.Logger) {
panicHandler := func(p interface{}) {
logger.Error(fmt.Sprintf("evidence storage task panicked: %v", p))
}
GlobalEvidenceWorker = pond.New(
1, 0, pond.MinWorkers(1),
pond.PanicHandler(panicHandler),
pond.Strategy(pond.Balanced()),
)
}

func ConvertEvidenceToProto(config types.Config) error {
// we have to add a random pocket node so that way lean pokt can still support getting the global evidence cache
node := AddPocketNode(crypto.GenerateEd25519PrivKey().GenPrivateKey(), log.NewNopLogger())
Expand Down Expand Up @@ -67,6 +82,13 @@ func ConvertEvidenceToProto(config types.Config) error {
return nil
}

func StopEvidenceWorker() {
if !GlobalEvidenceWorker.Stopped() {
GlobalEvidenceWorker.StopAndWait()
}
GlobalEvidenceWorker = nil
}

func FlushSessionCache() {
for _, k := range GlobalPocketNodes {
if k.SessionStore != nil {
Expand Down
36 changes: 35 additions & 1 deletion x/pocketcore/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,47 @@ import (

const DEFAULTHTTPMETHOD = "POST"

var (
chainHttpClient *http.Client
)

// "Relay" - A read / write API request from a hosted (non native) external blockchain
type Relay struct {
Payload Payload `json:"payload"` // the data payload of the request
Meta RelayMeta `json:"meta"` // metadata for the relay request
Proof RelayProof `json:"proof"` // the authentication scheme needed for work
}

func GetChainsClient() *http.Client {
if chainHttpClient == nil {
InitHttpClient(1000, 1000, 1000)
oten91 marked this conversation as resolved.
Show resolved Hide resolved
}

return chainHttpClient
}

func InitHttpClient(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int) {
var chainsTransport *http.Transport

t, ok := http.DefaultTransport.(*http.Transport)
if ok {
chainsTransport = t.Clone()
// this params may could be handled by config.json, but how much people know about this?
// tbd: figure out the right values to this, rn the priority is stop recreating new http client and connections.
chainsTransport.MaxIdleConns = maxIdleConns
chainsTransport.MaxConnsPerHost = maxConnsPerHost
chainsTransport.MaxIdleConnsPerHost = maxIdleConnsPerHost
} // if not ok, probably is because is *gock.Transport - test only

chainHttpClient = &http.Client{
Timeout: globalRPCTimeout * time.Second,
}

if chainsTransport != nil {
chainHttpClient.Transport = chainsTransport
}
}

// "Validate" - Checks the validity of a relay request using store data
func (r *Relay) Validate(ctx sdk.Ctx, posKeeper PosKeeper, appsKeeper AppsKeeper, pocketKeeper PocketKeeper, hb *HostedBlockchains, sessionBlockHeight int64, node *PocketNode) (maxPossibleRelays sdk.BigInt, err sdk.Error) {
// validate payload
Expand Down Expand Up @@ -316,7 +350,7 @@ func executeHTTPRequest(payload, url, userAgent string, basicAuth BasicAuth, met
}
}
// execute the request
resp, err := (&http.Client{Timeout: globalRPCTimeout * time.Millisecond}).Do(req)
resp, err := GetChainsClient().Do(req)
if err != nil {
return "", err
}
Expand Down
5 changes: 4 additions & 1 deletion x/pocketcore/types/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,10 @@ func TestRelay_Execute(t *testing.T) {
},
}
validRelay.Proof.RequestHash = validRelay.RequestHashString()
httpClient := GetChainsClient()
defer gock.Off() // Flush pending mocks after test execution

defer gock.RestoreClient(httpClient)
gock.InterceptClient(httpClient)
gock.New("https://server.com").
Post("/relay").
Reply(200).
Expand All @@ -185,6 +187,7 @@ func TestRelay_Execute(t *testing.T) {
URL: "https://server.com/relay/",
}},
}

response, err := validRelay.Execute(&hb, &nodeAddr)
assert.True(t, err == nil)
assert.Equal(t, response, "bar")
Expand Down