Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ✨ liq-src: kyber-pmm #624

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/liquidity-source/kyber-pmm/client/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package client

import "errors"

const (
ErrFirmQuoteInternalErrorText = "internal_error"
ErrFirmQuoteBlacklistText = "blacklist"
ErrFirmQuoteInsufficientLiquidityText = "insufficient_liquidity"
ErrFirmQuoteMarketConditionText = "market_condition"
)

var (
ErrListTokensFailed = errors.New("listTokens failed")
ErrListPairsFailed = errors.New("listPairs failed")
ErrListPriceLevelsFailed = errors.New("listPriceLevels failed")
ErrFirmQuoteFailed = errors.New("firm quote failed")
ErrFirmQuoteInternalError = errors.New(ErrFirmQuoteInternalErrorText)
ErrFirmQuoteBlacklist = errors.New(ErrFirmQuoteBlacklistText)
ErrFirmQuoteInsufficientLiquidity = errors.New(ErrFirmQuoteInsufficientLiquidityText)
ErrFirmQuoteMarketCondition = errors.New(ErrFirmQuoteMarketConditionText)
)
126 changes: 126 additions & 0 deletions pkg/liquidity-source/kyber-pmm/client/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package client

import (
"context"

"github.com/KyberNetwork/logger"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"

kyberpmm "github.com/KyberNetwork/kyberswap-dex-lib/pkg/liquidity-source/kyber-pmm"
)

const (
listTokensEndpoint = "/kyberswap/v1/tokens"
listPairsEndpoint = "/kyberswap/v1/pairs"
listPricesEndpoint = "/kyberswap/v1/prices"
firmEndpoint = "/kyberswap/v1/firm"
)

type httpClient struct {
client *resty.Client
config *kyberpmm.HTTPConfig
}

func NewHTTPClient(config *kyberpmm.HTTPConfig) *httpClient {
client := resty.New().
SetBaseURL(config.BaseURL).
SetTimeout(config.Timeout.Duration).
SetRetryCount(config.RetryCount)

return &httpClient{
client: client,
config: config,
}
}

func (c *httpClient) ListTokens(ctx context.Context) (map[string]kyberpmm.TokenItem, error) {
req := c.client.R().
SetContext(ctx)

var result kyberpmm.ListTokensResult
resp, err := req.SetResult(&result).Get(listTokensEndpoint)
if err != nil {
return nil, err
}

if !resp.IsSuccess() {
return nil, errors.WithMessagef(ErrListTokensFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

return result.Tokens, nil
}

func (c *httpClient) ListPairs(ctx context.Context) (map[string]kyberpmm.PairItem, error) {
req := c.client.R().
SetContext(ctx)

var result kyberpmm.ListPairsResult
resp, err := req.SetResult(&result).Get(listPairsEndpoint)
if err != nil {
return nil, err
}

if !resp.IsSuccess() {
return nil, errors.WithMessagef(ErrListPairsFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

return result.Pairs, nil
}

func (c *httpClient) ListPriceLevels(ctx context.Context) (kyberpmm.ListPriceLevelsResult, error) {
req := c.client.R().
SetContext(ctx)

var result kyberpmm.ListPriceLevelsResult
resp, err := req.SetResult(&result).Get(listPricesEndpoint)
if err != nil {
return result, err
}

if !resp.IsSuccess() {
return result, errors.WithMessagef(ErrListPriceLevelsFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

return result, nil
}

func (c *httpClient) Firm(ctx context.Context, params kyberpmm.FirmRequestParams) (kyberpmm.FirmResult, error) {
req := c.client.R().
SetContext(ctx).
SetBody(params)

var result kyberpmm.FirmResult
resp, err := req.SetResult(&result).Post(firmEndpoint)
if err != nil {
return kyberpmm.FirmResult{}, err
}

if !resp.IsSuccess() {
return kyberpmm.FirmResult{}, errors.WithMessagef(ErrFirmQuoteFailed, "[kyberPMM] response status: %v, response error: %v", resp.Status(), resp.Error())
}

if result.Error != "" {
parsedErr := parseFirmQuoteError(result.Error)
logger.Errorf("firm quote failed with error: %v", result.Error)

return kyberpmm.FirmResult{}, parsedErr
}

return result, nil
}

func parseFirmQuoteError(errorMessage string) error {
switch errorMessage {
case ErrFirmQuoteInternalErrorText:
return ErrFirmQuoteInternalError
case ErrFirmQuoteBlacklistText:
return ErrFirmQuoteBlacklist
case ErrFirmQuoteInsufficientLiquidityText:
return ErrFirmQuoteInsufficientLiquidity
case ErrFirmQuoteMarketConditionText:
return ErrFirmQuoteMarketCondition
default:
return ErrFirmQuoteInternalError
}
}
167 changes: 167 additions & 0 deletions pkg/liquidity-source/kyber-pmm/client/memory_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package client

import (
"context"
"errors"

"github.com/KyberNetwork/logger"
"github.com/dgraph-io/ristretto"

kyberpmm "github.com/KyberNetwork/kyberswap-dex-lib/pkg/source/kyber-pmm"
)

const (
defaultNumCounts = 5000
defaultMaxCost = 500
defaultBufferItems = 64

defaultSingleItemCost = 1

cacheKeyTokens = "tokens"
cacheKeyPairs = "pairs"
cacheKeyPriceLevels = "price-levels"
)

type memoryCacheClient struct {
config *kyberpmm.MemoryCacheConfig
cache *ristretto.Cache
fallbackClient kyberpmm.IClient
}

func NewMemoryCacheClient(
config *kyberpmm.MemoryCacheConfig,
fallbackClient kyberpmm.IClient,
) *memoryCacheClient {
cache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: defaultNumCounts,
MaxCost: defaultMaxCost,
BufferItems: defaultBufferItems,
})
if err != nil {
logger.Errorf("failed to init memory cache, err %v", err.Error())
}

return &memoryCacheClient{
config: config,
cache: cache,
fallbackClient: fallbackClient,
}
}

func (c *memoryCacheClient) ListTokens(ctx context.Context) (map[string]kyberpmm.TokenItem, error) {
cachedTokens, err := c.listTokensFromCache()
if err == nil {
return cachedTokens, nil
}

// Cache missed. Using fallbackClient
tokens, err := c.fallbackClient.ListTokens(ctx)
if err != nil {
return nil, err
}

if err = c.saveTokensToCache(tokens); err != nil {
logger.
WithFields(logger.Fields{"error": err}).
Warn("memory cache failed")
}

return tokens, err
}

// listTokensFromCache only returns if tokens are able to fetch from cache
func (c *memoryCacheClient) listTokensFromCache() (map[string]kyberpmm.TokenItem, error) {
cachedTokens, found := c.cache.Get(cacheKeyTokens)
if !found {
return nil, errors.New("no tokens data in cache")
}

return cachedTokens.(map[string]kyberpmm.TokenItem), nil
}

func (c *memoryCacheClient) saveTokensToCache(tokens map[string]kyberpmm.TokenItem) error {
c.cache.SetWithTTL(cacheKeyTokens, tokens, defaultSingleItemCost, c.config.TTL.Tokens.Duration)
c.cache.Wait()

return nil
}

func (c *memoryCacheClient) ListPairs(ctx context.Context) (map[string]kyberpmm.PairItem, error) {
cachedPairs, err := c.listPairsFromCache()
if err == nil {
return cachedPairs, nil
}

// Cache missed. Using fallbackClient
pairs, err := c.fallbackClient.ListPairs(ctx)
if err != nil {
return nil, err
}

if err = c.savePairsToCache(pairs); err != nil {
logger.
WithFields(logger.Fields{"error": err}).
Warn("memory cache failed")
}

return pairs, err
}

// listPairsFromCache only returns if pairs are able to fetch from cache
func (c *memoryCacheClient) listPairsFromCache() (map[string]kyberpmm.PairItem, error) {
cachedPairs, found := c.cache.Get(cacheKeyPairs)
if !found {
return nil, errors.New("no pairs data in cache")
}

return cachedPairs.(map[string]kyberpmm.PairItem), nil
}

func (c *memoryCacheClient) savePairsToCache(tokens map[string]kyberpmm.PairItem) error {
c.cache.SetWithTTL(cacheKeyPairs, tokens, defaultSingleItemCost, c.config.TTL.Pairs.Duration)
c.cache.Wait()

return nil
}

func (c *memoryCacheClient) ListPriceLevels(ctx context.Context) (kyberpmm.ListPriceLevelsResult, error) {
cachedPriceLevels, err := c.listPriceLevelsFromCache()
if err == nil {
return cachedPriceLevels, nil
}

// Cache missed. Using fallbackClient
priceLevels, err := c.fallbackClient.ListPriceLevels(ctx)
if err != nil {
return kyberpmm.ListPriceLevelsResult{}, err
}

if err = c.savePriceLevelsToCache(priceLevels); err != nil {
logger.
WithFields(logger.Fields{"error": err}).
Warn("memory cache failed")
}

return priceLevels, err
}

// listPriceLevelsFromCache only returns if price levels are able to fetch from cache
func (c *memoryCacheClient) listPriceLevelsFromCache() (kyberpmm.ListPriceLevelsResult, error) {
cachedPriceLevels, found := c.cache.Get(cacheKeyPriceLevels)
if !found {
return kyberpmm.ListPriceLevelsResult{}, errors.New("no price levels data in cache")
}

return cachedPriceLevels.(kyberpmm.ListPriceLevelsResult), nil
}

func (c *memoryCacheClient) savePriceLevelsToCache(priceLevelsAndInventory kyberpmm.ListPriceLevelsResult) error {
c.cache.SetWithTTL(cacheKeyPriceLevels, priceLevelsAndInventory, defaultSingleItemCost, c.config.TTL.PriceLevels.Duration)
c.cache.Wait()

return nil
}

func (c *memoryCacheClient) Firm(ctx context.Context, params kyberpmm.FirmRequestParams) (kyberpmm.FirmResult, error) {
return c.fallbackClient.Firm(ctx, params)
}
26 changes: 26 additions & 0 deletions pkg/liquidity-source/kyber-pmm/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kyberpmm

import (
"github.com/KyberNetwork/blockchain-toolkit/time/durationjson"
)

type Config struct {
DexID string `json:"dexID,omitempty"`
RFQContractAddress string `mapstructure:"rfq_contract_address" json:"rfq_contract_address,omitempty"`
HTTP HTTPConfig `mapstructure:"http" json:"http,omitempty"`
MemoryCache MemoryCacheConfig `mapstructure:"memory_cache" json:"memory_cache,omitempty"`
}

type HTTPConfig struct {
BaseURL string `mapstructure:"base_url" json:"base_url,omitempty"`
Timeout durationjson.Duration `mapstructure:"timeout" json:"timeout,omitempty"`
RetryCount int `mapstructure:"retry_count" json:"retry_count,omitempty"`
}

type MemoryCacheConfig struct {
TTL struct {
Tokens durationjson.Duration `mapstructure:"tokens" json:"tokens,omitempty"`
Pairs durationjson.Duration `mapstructure:"pairs" json:"pairs,omitempty"`
PriceLevels durationjson.Duration `mapstructure:"price_levels" json:"price_levels,omitempty"`
} `mapstructure:"ttl"`
}
14 changes: 14 additions & 0 deletions pkg/liquidity-source/kyber-pmm/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kyberpmm

type SwapDirection uint8

const (
DexTypeKyberPMM = "kyber-pmm"

PoolIDPrefix = "kyber_pmm"
PoolIDSeparator = "_"
)

var (
DefaultGas = Gas{Swap: 100000}
)
12 changes: 12 additions & 0 deletions pkg/liquidity-source/kyber-pmm/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package kyberpmm

import "errors"

var (
ErrTokenNotFound = errors.New("token not found")
ErrNoPriceLevelsForPool = errors.New("no price levels for pool")
ErrEmptyPriceLevels = errors.New("empty price levels")
ErrInsufficientLiquidity = errors.New("insufficient liquidity")
ErrInvalidFirmQuoteParams = errors.New("invalid firm quote params")
ErrNoSwapLimit = errors.New("swap limit is required for PMM pools")
)
10 changes: 10 additions & 0 deletions pkg/liquidity-source/kyber-pmm/iface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package kyberpmm

import "context"

type IClient interface {
ListTokens(ctx context.Context) (map[string]TokenItem, error)
ListPairs(ctx context.Context) (map[string]PairItem, error)
ListPriceLevels(ctx context.Context) (ListPriceLevelsResult, error)
Firm(ctx context.Context, params FirmRequestParams) (FirmResult, error)
}
Loading