Skip to content

Commit

Permalink
Fix pricing service lock on update (#872) (#873) (#874)
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Mindov <[email protected]>
  • Loading branch information
rokn authored Mar 30, 2023
1 parent 4538f66 commit 4397fea
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 9 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ docker-compose up
- [API](docs/api.md)

## Examples
* [Three Validators Bridge Network](./examples/three-validators/README.md)
* [Three Validators Bridge Network](./examples/three-validators/README.md)
36 changes: 28 additions & 8 deletions app/services/pricing/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ func (s *Service) GetTokenPriceInfo(networkId uint64, tokenAddressOrId string) (
}

func (s *Service) FetchAndUpdateUsdPrices() error {
s.minAmountsForApiMutex.Lock()
defer s.minAmountsForApiMutex.Unlock()
s.tokenPriceInfoMutex.Lock()
defer s.tokenPriceInfoMutex.Unlock()

results := s.fetchUsdPricesFromAPIs()
if results.AllPricesErr == nil {
err := s.updatePricesWithoutHbar(results.AllPrices)
Expand All @@ -108,8 +103,6 @@ func (s *Service) FetchAndUpdateUsdPrices() error {
}

func (s *Service) fetchAndUpdateNftFeesForApi() error {
s.nftFeesForApiMutex.Lock()
defer s.nftFeesForApiMutex.Unlock()
s.logger.Debugf("Populating NFT fees for API")

res := make(map[uint64]map[string]pricing.NonFungibleFee)
Expand Down Expand Up @@ -154,7 +147,10 @@ func (s *Service) fetchAndUpdateNftFeesForApi() error {
}

s.logger.Debugf("fetched all NFT fees and payment tokens successfully")

s.nftFeesForApiMutex.Lock()
s.nftFeesForApi = res
s.nftFeesForApiMutex.Unlock()

return nil
}
Expand Down Expand Up @@ -263,6 +259,13 @@ func (s *Service) GetHederaNftPrevFee(token string) (int64, bool) {
}

func (s *Service) loadStaticMinAmounts(bridgeConfig *config.Bridge) {
// This lock is used for more time than others to speed up things
// as here, there is no other polling/locking operation
s.tokenPriceInfoMutex.Lock()
s.minAmountsForApiMutex.Lock()
defer s.minAmountsForApiMutex.Unlock()
defer s.tokenPriceInfoMutex.Unlock()

for networkId, minAmountsByTokenAddress := range bridgeConfig.MinAmounts {
for tokenAddress, minAmount := range minAmountsByTokenAddress {
s.tokensPriceInfo[networkId][tokenAddress] = pricing.TokenPriceInfo{
Expand All @@ -285,8 +288,10 @@ func (s *Service) updateHbarPrice(results fetchResults) error {
priceInUsd = results.HbarPrice
}

s.tokenPriceInfoMutex.RLock()
defaultMinAmount := s.tokensPriceInfo[constants.HederaNetworkId][constants.Hbar].DefaultMinAmount
previousUsdPrice := s.tokensPriceInfo[constants.HederaNetworkId][constants.Hbar].UsdPrice
s.tokenPriceInfoMutex.RUnlock()

// Use the cached priceInUsd in case the price fetching failed
if priceInUsd.Cmp(decimal.NewFromFloat(0.0)) == 0 {
Expand Down Expand Up @@ -343,8 +348,14 @@ func (s *Service) calculateMinAmountWithFee(nativeAsset *asset.NativeAsset, deci
}

func (s *Service) updatePriceInfoContainers(nativeAsset *asset.NativeAsset, tokenPriceInfo pricing.TokenPriceInfo) error {
s.tokensPriceInfo[nativeAsset.ChainId][nativeAsset.Asset] = tokenPriceInfo

s.minAmountsForApiMutex.Lock()
s.minAmountsForApi[nativeAsset.ChainId][nativeAsset.Asset] = tokenPriceInfo.MinAmountWithFee.String()
s.minAmountsForApiMutex.Unlock()

s.tokenPriceInfoMutex.Lock()
s.tokensPriceInfo[nativeAsset.ChainId][nativeAsset.Asset] = tokenPriceInfo
s.tokenPriceInfoMutex.Unlock()

msgTemplate := "Updating UsdPrice [%s] and MinAmountWithFee [%s] for %s asset [%s]"
s.logger.Debugf(msgTemplate, tokenPriceInfo.UsdPrice.String(), tokenPriceInfo.MinAmountWithFee.String(), "native", nativeAsset.Asset)
Expand Down Expand Up @@ -373,8 +384,15 @@ func (s *Service) updatePriceInfoContainers(nativeAsset *asset.NativeAsset, toke
}

tokenPriceInfo.MinAmountWithFee = wrappedMinAmountWithFee

s.tokenPriceInfoMutex.Lock()
s.tokensPriceInfo[networkId][wrappedToken] = tokenPriceInfo
s.tokenPriceInfoMutex.Unlock()

s.minAmountsForApiMutex.Lock()
s.minAmountsForApi[networkId][wrappedToken] = wrappedMinAmountWithFee.String()
s.minAmountsForApiMutex.Unlock()

s.logger.Debugf(msgTemplate, tokenPriceInfo.UsdPrice.String(), wrappedMinAmountWithFee.String(), "wrapped", wrappedToken)
}

Expand All @@ -394,8 +412,10 @@ func (s *Service) updatePricesWithoutHbar(pricesByNetworkAndAddress map[uint64]m
continue
}
nativeAsset := s.assetsService.FungibleNativeAsset(networkId, assetAddress)
s.tokenPriceInfoMutex.RLock()
defaultMinAmount := s.tokensPriceInfo[networkId][assetAddress].DefaultMinAmount
previousUsdPrice := s.tokensPriceInfo[networkId][assetAddress].UsdPrice
s.tokenPriceInfoMutex.RUnlock()

// Use the cached priceInUsd in case the price fetching failed
if usdPrice.Cmp(decimal.NewFromFloat(0.0)) == 0 {
Expand Down
67 changes: 67 additions & 0 deletions app/services/pricing/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package pricing

import (
"errors"
"github.com/stretchr/testify/mock"
"math/big"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"

Expand Down Expand Up @@ -151,6 +153,17 @@ func Test_PriceFetchingServiceDown(t *testing.T) {
err = serviceInstance.updateHbarPrice(FetchResults2)
assert.Nil(t, err)

// Use min_amounts
FetchResults2.HbarPrice = decimal.NewFromFloat(0)
serviceInstance.tokensPriceInfo[296]["HBAR"] = pricing.TokenPriceInfo{
UsdPrice: decimal.NewFromFloat(0.1),
MinAmountWithFee: big.NewInt(5000000000),
DefaultMinAmount: big.NewInt(1000),
}

err = serviceInstance.updateHbarPrice(FetchResults2)
assert.Nil(t, err)

// Throw if no fetched price and no cache price
FetchResults2.HbarPrice = decimal.NewFromFloat(0)
serviceInstance.tokensPriceInfo[296]["HBAR"] = pricing.TokenPriceInfo{
Expand Down Expand Up @@ -181,6 +194,22 @@ func Test_GetTokenPriceInfo_NotExist(t *testing.T) {
assert.False(t, exists)
}

func Test_GetTokenPriceInfo_WhileUpdating(t *testing.T) {
setup(true, true)

ExecuteWithUpdatingService(t, func() {
serviceInstance.GetTokenPriceInfo(92919929912, "")
})
}

func Test_NftFees_WhileUpdating(t *testing.T) {
setup(true, true)

ExecuteWithUpdatingService(t, func() {
serviceInstance.NftFees()
})
}

func Test_GetMinAmountsForAPI(t *testing.T) {
setup(true, true)

Expand All @@ -191,6 +220,14 @@ func Test_GetMinAmountsForAPI(t *testing.T) {
assert.Equal(t, testConstants.EthereumNativeTokenMinAmountWithFee.String(), serviceInstance.minAmountsForApi[testConstants.EthereumNetworkId][testConstants.NetworkEthereumFungibleNativeToken])
}

func Test_GetMinAmountsForAPI_WhileUpdating(t *testing.T) {
setup(true, true)

ExecuteWithUpdatingService(t, func() {
serviceInstance.GetMinAmountsForAPI()
})
}

func Test_calculateMinAmountWithFee(t *testing.T) {
setup(true, true)

Expand Down Expand Up @@ -274,6 +311,36 @@ func Test_GetHederaNftPrevFee_ShouldNotExists(t *testing.T) {
assert.False(t, ok)
}

func ExecuteWithUpdatingService(t *testing.T, serviceCall func()) {
expectedTime := time.Millisecond * 200
waitTime := time.Second * 1

// Clear the mocks
mocks.MDiamondRouter.ExpectedCalls = []*mock.Call{}
mocks.MDiamondRouter.
On("Erc721Payment", &bind.CallOpts{}, common.HexToAddress(testConstants.NetworkPolygonWrappedNonFungibleTokenForHedera)).
After(waitTime).
Return(common.HexToAddress(testConstants.NftFeesForApi[testConstants.PolygonNetworkId][testConstants.NetworkPolygonWrappedNonFungibleTokenForHedera].PaymentToken), nil).
On("Erc721Fee", &bind.CallOpts{}, common.HexToAddress(testConstants.NetworkPolygonWrappedNonFungibleTokenForHedera)).
Return(big.NewInt(testConstants.NftFeesForApi[testConstants.PolygonNetworkId][testConstants.NetworkPolygonWrappedNonFungibleTokenForHedera].Fee.IntPart()), nil)


wg := sync.WaitGroup{}
wg.Add(1)
go func() {
_ = serviceInstance.FetchAndUpdateUsdPrices()
wg.Done()
}()

time.Sleep(time.Millisecond * 200)
startTime := time.Now()
serviceCall()
elapsedTime := time.Since(startTime)

assert.Less(t, elapsedTime, expectedTime, "Service call took too long to return")
wg.Wait()
}

func setup(setupMocks bool, setTokenPriceInfosAndMinAmounts bool) {
mocks.Setup()
helper.SetupNetworks()
Expand Down

0 comments on commit 4397fea

Please sign in to comment.