Skip to content

Commit

Permalink
Merge pull request #6413 from multiversx/requester-resolver-receipt-data
Browse files Browse the repository at this point in the history
Requester and resolver light client
  • Loading branch information
miiu96 authored Sep 11, 2024
2 parents e3ebbf0 + 78fe9d0 commit 946d912
Show file tree
Hide file tree
Showing 36 changed files with 632 additions and 15 deletions.
23 changes: 23 additions & 0 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,20 @@
MaxBatchSize = 30000
MaxOpenFiles = 10

[ReceiptsDataStorage]
[ReceiptsDataStorage.Cache]
Name = "ReceiptsDataStorage"
Capacity = 500000
Type = "SizeLRU"
SizeInBytes = 52428800 #50MB
[ReceiptsDataStorage.DB]
FilePath = "ReceiptsData"
Type = "LvlDBSerial"
BatchDelaySeconds = 2
MaxBatchSize = 30000
MaxOpenFiles = 10


[UnsignedTransactionStorage]
[UnsignedTransactionStorage.Cache]
Name = "UnsignedTransactionStorage"
Expand Down Expand Up @@ -392,6 +406,15 @@
Type = "TxCache"
Shards = 16

[ReceiptsDataPool]
Name = "ReceiptsDataPool"
Capacity = 600000
SizePerSender = 20000
SizeInBytes = 104857600 #100MB
SizeInBytesPerSender = 12288000
Type = "SizeLRU"
Shards = 16

[TrieNodesChunksDataPool]
Name = "TrieNodesDataPool"
Capacity = 400
Expand Down
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type Config struct {
PeerBlockBodyStorage StorageConfig
BlockHeaderStorage StorageConfig
TxStorage StorageConfig
ReceiptsDataStorage StorageConfig
UnsignedTransactionStorage StorageConfig
RewardTxStorage StorageConfig
ShardHdrNonceHashStorage StorageConfig
Expand All @@ -171,6 +172,7 @@ type Config struct {
TxBlockBodyDataPool CacheConfig
PeerBlockBodyDataPool CacheConfig
TxDataPool CacheConfig
ReceiptsDataPool CacheConfig
UnsignedTransactionDataPool CacheConfig
RewardTransactionDataPool CacheConfig
TrieNodesChunksDataPool CacheConfig
Expand Down
11 changes: 11 additions & 0 deletions dataRetriever/dataPool/dataPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type dataPool struct {
peerAuthentications storage.Cacher
heartbeats storage.Cacher
validatorsInfo dataRetriever.ShardedDataCacherNotifier
receipts storage.Cacher
}

// DataPoolArgs represents the data pool's constructor structure
Expand All @@ -44,6 +45,7 @@ type DataPoolArgs struct {
PeerAuthentications storage.Cacher
Heartbeats storage.Cacher
ValidatorsInfo dataRetriever.ShardedDataCacherNotifier
Receipts storage.Cacher
}

// NewDataPool creates a data pools holder object
Expand Down Expand Up @@ -90,6 +92,9 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
if check.IfNil(args.ValidatorsInfo) {
return nil, dataRetriever.ErrNilValidatorInfoPool
}
if check.IfNil(args.Receipts) {
return nil, dataRetriever.ErrNilReceiptsPool
}

return &dataPool{
transactions: args.Transactions,
Expand All @@ -106,6 +111,7 @@ func NewDataPool(args DataPoolArgs) (*dataPool, error) {
peerAuthentications: args.PeerAuthentications,
heartbeats: args.Heartbeats,
validatorsInfo: args.ValidatorsInfo,
receipts: args.Receipts,
}, nil
}

Expand Down Expand Up @@ -179,6 +185,11 @@ func (dp *dataPool) ValidatorsInfo() dataRetriever.ShardedDataCacherNotifier {
return dp.validatorsInfo
}

// Receipts returns the holder for receipts info
func (dp *dataPool) Receipts() storage.Cacher {
return dp.receipts
}

// Close closes all the components
func (dp *dataPool) Close() error {
var lastError error
Expand Down
1 change: 1 addition & 0 deletions dataRetriever/dataPool/dataPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func createMockDataPoolArgs() dataPool.DataPoolArgs {
PeerAuthentications: testscommon.NewCacherStub(),
Heartbeats: testscommon.NewCacherStub(),
ValidatorsInfo: testscommon.NewShardedDataStub(),
Receipts: testscommon.NewCacherStub(),
}
}

Expand Down
6 changes: 6 additions & 0 deletions dataRetriever/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,15 @@ var ErrResolveTypeUnknown = errors.New("unknown resolve type")
// ErrNilMiniblocksPool signals that a nil miniblocks pool has been provided
var ErrNilMiniblocksPool = errors.New("nil miniblocks pool")

// ErrNilReceiptsPool signals that a nil receipts pool has been provided
var ErrNilReceiptsPool = errors.New("nil receipts pool")

// ErrNilMiniblocksStorage signals that a nil miniblocks storage has been provided
var ErrNilMiniblocksStorage = errors.New("nil miniblocks storage")

// ErrNilReceiptsStorage signals that a nil receipts storage has been provided
var ErrNilReceiptsStorage = errors.New("nil receipts storage")

// ErrNilDataPoolHolder signals that the data pool holder is nil
var ErrNilDataPoolHolder = errors.New("nil data pool holder")

Expand Down
7 changes: 7 additions & 0 deletions dataRetriever/factory/dataPoolFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func NewDataPoolFromConfig(args ArgsDataPool) (dataRetriever.PoolsHolder, error)
return nil, fmt.Errorf("%w while creating the cache for the validator info results", err)
}

cacherCfg = factory.GetCacherFromConfig(mainConfig.ReceiptsDataPool)
receiptsDataPool, err := storageunit.NewCache(cacherCfg)
if err != nil {
return nil, fmt.Errorf("%w while creating the cache for the receipts data", err)
}

currBlockTransactions := dataPool.NewCurrentBlockTransactionsPool()
currEpochValidatorInfo := dataPool.NewCurrentEpochValidatorInfoPool()
dataPoolArgs := dataPool.DataPoolArgs{
Expand All @@ -163,6 +169,7 @@ func NewDataPoolFromConfig(args ArgsDataPool) (dataRetriever.PoolsHolder, error)
PeerAuthentications: peerAuthPool,
Heartbeats: heartbeatPool,
ValidatorsInfo: validatorsInfo,
Receipts: receiptsDataPool,
}
return dataPool.NewDataPool(dataPoolArgs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func (brcf *baseRequestersContainerFactory) generateCommonRequesters() error {
}

func (brcf *baseRequestersContainerFactory) generateTxRequesters(topic string) error {

shardC := brcf.shardCoordinator
noOfShards := shardC.NumberOfShards()

Expand Down Expand Up @@ -172,6 +171,26 @@ func (brcf *baseRequestersContainerFactory) createTxRequester(
return requesters.NewTransactionRequester(arg)
}

func (brcf *baseRequestersContainerFactory) createReceiptRequester(
topic string,
excludedTopic string,
targetShardID uint32,
numIntraShardPeers int,
) (dataRetriever.Requester, error) {
requestSender, err := brcf.createOneRequestSenderWithSpecifiedNumRequests(topic, excludedTopic, targetShardID, 0, numIntraShardPeers)
if err != nil {
return nil, err
}

arg := requesters.ArgReceiptRequester{
ArgBaseRequester: requesters.ArgBaseRequester{
RequestSender: requestSender,
Marshaller: brcf.marshaller,
},
}
return requesters.NewReceiptRequester(arg)
}

func (brcf *baseRequestersContainerFactory) generateMiniBlocksRequesters() error {
shardC := brcf.shardCoordinator
noOfShards := shardC.NumberOfShards()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (mrcf *metaRequestersContainerFactory) Create() (dataRetriever.RequestersCo
return nil, err
}

err = mrcf.generateReceiptRequesters()
if err != nil {
return nil, err
}

return mrcf.container, nil
}

Expand Down Expand Up @@ -177,6 +182,17 @@ func (mrcf *metaRequestersContainerFactory) generateMetaChainHeaderRequesters()
return mrcf.container.Add(identifierHeader, requester)
}

func (mrcf *metaRequestersContainerFactory) generateReceiptRequesters() error {
// only one receipts topic
identifierReceipt := factory.ReceiptTopic + mrcf.shardCoordinator.CommunicationIdentifier(core.MetachainShardId)
requester, err := mrcf.createReceiptRequester(identifierReceipt, EmptyExcludePeersOnTopic, core.MetachainShardId, mrcf.numTotalPeers)
if err != nil {
return err
}

return mrcf.container.Add(identifierReceipt, requester)
}

func (mrcf *metaRequestersContainerFactory) createMetaChainHeaderRequester(
identifier string,
shardId uint32,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,9 @@ func TestMetaRequestersContainerFactory_With4ShardsShouldWork(t *testing.T) {
numRequestersTrieNodes := 2
numRequestersPeerAuth := 1
numRequesterValidatorInfo := 1
numRequesterReceiptsData := 1
totalRequesters := numRequestersShardHeadersForMetachain + numRequesterMetablocks + numRequestersMiniBlocks +
numRequestersUnsigned + numRequestersTxs + numRequestersTrieNodes + numRequestersRewards + numRequestersPeerAuth + numRequesterValidatorInfo
numRequestersUnsigned + numRequestersTxs + numRequestersTrieNodes + numRequestersRewards + numRequestersPeerAuth + numRequesterValidatorInfo + numRequesterReceiptsData

assert.Equal(t, totalRequesters, container.Len())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func (srcf *shardRequestersContainerFactory) Create() (dataRetriever.RequestersC
return nil, err
}

err = srcf.generateReceiptRequesters()
if err != nil {
return nil, err
}

return srcf.container, nil
}

Expand Down Expand Up @@ -136,6 +141,18 @@ func (srcf *shardRequestersContainerFactory) generateMetablockHeaderRequesters()
return srcf.container.Add(identifierHdr, requester)
}

func (srcf *shardRequestersContainerFactory) generateReceiptRequesters() error {
// only one receipts topic
selfShardID := srcf.shardCoordinator.SelfId()
identifierReceipt := factory.ReceiptTopic
requester, err := srcf.createReceiptRequester(identifierReceipt, EmptyExcludePeersOnTopic, selfShardID, srcf.numTotalPeers)
if err != nil {
return err
}

return srcf.container.Add(identifierReceipt, requester)
}

func (srcf *shardRequestersContainerFactory) generateTrieNodesRequesters() error {
shardC := srcf.shardCoordinator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,9 @@ func TestShardRequestersContainerFactory_With4ShardsShouldWork(t *testing.T) {
numRequesterTrieNodes := 1
numRequesterPeerAuth := 1
numRequesterValidatorInfo := 1
numRequesterReceiptsData := 1
totalRequesters := numRequesterTxs + numRequesterHeaders + numRequesterMiniBlocks + numRequesterMetaBlockHeaders +
numRequesterSCRs + numRequesterRewardTxs + numRequesterTrieNodes + numRequesterPeerAuth + numRequesterValidatorInfo
numRequesterSCRs + numRequesterRewardTxs + numRequesterTrieNodes + numRequesterPeerAuth + numRequesterValidatorInfo + numRequesterReceiptsData

assert.Equal(t, totalRequesters, container.Len())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,3 +417,48 @@ func (brcf *baseResolversContainerFactory) generateValidatorInfoResolver() error

return brcf.container.Add(identifierValidatorInfo, validatorInfoResolver)
}

func (brcf *baseResolversContainerFactory) generateReceiptResolver() error {
shardC := brcf.shardCoordinator

// only one shard receipt topic
identifier := factory.ReceiptTopic + shardC.CommunicationIdentifier(shardC.SelfId())

receiptDataStorer, err := brcf.store.GetStorer(dataRetriever.ReceiptDataUnit)
if err != nil {
return err
}
resolverSender, err := brcf.createOneResolverSenderWithSpecifiedNumRequests(identifier, EmptyExcludePeersOnTopic, shardC.SelfId())
if err != nil {
return err
}

arg := resolvers.ArgReceiptResolver{
ArgBaseResolver: resolvers.ArgBaseResolver{
SenderResolver: resolverSender,
Marshaller: brcf.marshalizer,
AntifloodHandler: brcf.inputAntifloodHandler,
Throttler: brcf.throttler,
},
ReceiptPool: brcf.dataPools.Receipts(),
ReceiptStorage: receiptDataStorer,
DataPacker: brcf.dataPacker,
IsFullHistoryNode: brcf.isFullHistoryNode,
}
resolver, err := resolvers.NewReceiptResolver(arg)
if err != nil {
return err
}

err = brcf.mainMessenger.RegisterMessageProcessor(resolver.RequestTopic(), common.DefaultResolversIdentifier, resolver)
if err != nil {
return err
}

err = brcf.fullArchiveMessenger.RegisterMessageProcessor(resolver.RequestTopic(), common.DefaultResolversIdentifier, resolver)
if err != nil {
return err
}

return brcf.container.Add(identifier, resolver)
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ func (mrcf *metaResolversContainerFactory) Create() (dataRetriever.ResolversCont
return nil, err
}

err = mrcf.generateReceiptResolver()
if err != nil {
return nil, err
}

return mrcf.container, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func createDataPoolsForMeta() dataRetriever.PoolsHolder {
RewardTransactionsCalled: func() dataRetriever.ShardedDataCacherNotifier {
return testscommon.NewShardedDataStub()
},
ReceiptsCalled: func() storage.Cacher {
return testscommon.NewCacherStub()
},
}

return pools
Expand Down Expand Up @@ -338,8 +341,9 @@ func TestMetaResolversContainerFactory_With4ShardsShouldWork(t *testing.T) {
numResolversTrieNodes := 2
numResolversPeerAuth := 1
numResolverValidatorInfo := 1
numRezolverReceiptsData := 1
totalResolvers := numResolversShardHeadersForMetachain + numResolverMetablocks + numResolversMiniBlocks +
numResolversUnsigned + numResolversTxs + numResolversTrieNodes + numResolversRewards + numResolversPeerAuth + numResolverValidatorInfo
numResolversUnsigned + numResolversTxs + numResolversTrieNodes + numResolversRewards + numResolversPeerAuth + numResolverValidatorInfo + numRezolverReceiptsData

assert.Equal(t, totalResolvers, container.Len())
assert.Equal(t, totalResolvers, registerMainCnt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (srcf *shardResolversContainerFactory) Create() (dataRetriever.ResolversCon
return nil, err
}

err = srcf.generateReceiptResolver()
if err != nil {
return nil, err
}

return srcf.container, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package resolverscontainer_test

import (
"errors"
"github.com/stretchr/testify/require"
"strings"
"testing"

Expand Down Expand Up @@ -74,6 +75,9 @@ func createDataPoolsForShard() dataRetriever.PoolsHolder {
pools.RewardTransactionsCalled = func() dataRetriever.ShardedDataCacherNotifier {
return testscommon.NewShardedDataStub()
}
pools.ReceiptsCalled = func() storage.Cacher {
return testscommon.NewCacherStub()
}

return pools
}
Expand Down Expand Up @@ -439,7 +443,8 @@ func TestShardResolversContainerFactory_With4ShardsShouldWork(t *testing.T) {
args.ShardCoordinator = shardCoordinator
rcf, _ := resolverscontainer.NewShardResolversContainerFactory(args)

container, _ := rcf.Create()
container, err := rcf.Create()
require.Nil(t, err)

numResolverSCRs := noOfShards + 1
numResolverTxs := noOfShards + 1
Expand All @@ -450,8 +455,9 @@ func TestShardResolversContainerFactory_With4ShardsShouldWork(t *testing.T) {
numResolverTrieNodes := 1
numResolverPeerAuth := 1
numResolverValidatorInfo := 1
numResolverReceiptData := 1
totalResolvers := numResolverTxs + numResolverHeaders + numResolverMiniBlocks + numResolverMetaBlockHeaders +
numResolverSCRs + numResolverRewardTxs + numResolverTrieNodes + numResolverPeerAuth + numResolverValidatorInfo
numResolverSCRs + numResolverRewardTxs + numResolverTrieNodes + numResolverPeerAuth + numResolverValidatorInfo + numResolverReceiptData

assert.Equal(t, totalResolvers, container.Len())
assert.Equal(t, totalResolvers, registerMainCnt)
Expand Down
Loading

0 comments on commit 946d912

Please sign in to comment.