From eb704c9ac1bd5f4519e590b5a18678f56140b6c2 Mon Sep 17 00:00:00 2001 From: jarrel Date: Wed, 10 Apr 2024 16:51:38 -0700 Subject: [PATCH] Try no limiters --- server/inject.go | 46 +---- server/wire_gen.go | 82 +++----- service/multichain/alchemy/alchemy.go | 1 - service/multichain/wrapper/wrapper.go | 268 -------------------------- 4 files changed, 30 insertions(+), 367 deletions(-) diff --git a/server/inject.go b/server/inject.go index 7bf283913..6b397b552 100644 --- a/server/inject.go +++ b/server/inject.go @@ -7,13 +7,11 @@ import ( "context" "database/sql" "net/http" - "time" "github.com/google/wire" "github.com/jackc/pgx/v4/pgxpool" db "github.com/mikeydub/go-gallery/db/gen/coredb" - "github.com/mikeydub/go-gallery/service/limiters" "github.com/mikeydub/go-gallery/service/multichain" "github.com/mikeydub/go-gallery/service/multichain/indexer" "github.com/mikeydub/go-gallery/service/multichain/poap" @@ -30,7 +28,6 @@ import ( "github.com/mikeydub/go-gallery/service/task" "github.com/mikeydub/go-gallery/service/tokenmanage" "github.com/mikeydub/go-gallery/util" - "github.com/mikeydub/go-gallery/util/retry" ) // envInit is a type returned after setting up the environment @@ -46,7 +43,6 @@ func NewMultichainProvider(ctx context.Context, envFunc func()) (*multichain.Pro newTokenManageCache, postgres.NewRepositories, dbConnSet, - newReservoirLimiter, // needs to be a singleton wire.Struct(new(multichain.ChainProvider), "*"), multichainProviderInjector, ethInjector, @@ -73,18 +69,6 @@ func setEnv(f func()) envInit { return envInit{} } -type reservoirLimiter limiters.KeyRateLimiter - -// Dumb forward method to satisfy the retry.Limiter interface -func (r *reservoirLimiter) ForKey(ctx context.Context, key string) (bool, time.Duration, error) { - return (*limiters.KeyRateLimiter)(r).ForKey(ctx, key) -} - -func newReservoirLimiter(ctx context.Context, c *redis.Cache) *reservoirLimiter { - l := limiters.NewKeyRateLimiter(ctx, c, "retryer:reservoir", 120, time.Minute) - return util.ToPointer(reservoirLimiter(*l)) -} - func newPqClient(e envInit) (*sql.DB, func()) { pq := postgres.MustCreateClient() return pq, func() { pq.Close() } @@ -131,7 +115,7 @@ func customMetadataHandlersInjector(simplehashProvider *simplehash.Provider) *mu )) } -func ethInjector(envInit, context.Context, *http.Client, *reservoirLimiter) (*multichain.EthereumProvider, func()) { +func ethInjector(envInit, context.Context, *http.Client) (*multichain.EthereumProvider, func()) { panic(wire.Build( rpc.NewEthClient, wire.Value(persist.ChainETH), @@ -169,7 +153,6 @@ func ethSyncPipelineInjector( httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, - l *reservoirLimiter, ) (*wrapper.SyncPipelineWrapper, func()) { panic(wire.Build( wire.Struct(new(wrapper.SyncPipelineWrapper), "*"), @@ -178,8 +161,6 @@ func ethSyncPipelineInjector( wire.Bind(new(multichain.TokensIncrementalContractFetcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokenMetadataBatcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokensByTokenIdentifiersFetcher), util.ToPointer(simplehashProvider)), - wire.Bind(new(retry.Limiter), util.ToPointer(l)), - wrapper.NewFillInWrapper, customMetadataHandlersInjector, )) } @@ -206,7 +187,7 @@ func tezosProviderInjector(tezosProvider *tezos.Provider, tzktProvider *tzkt.Pro )) } -func optimismInjector(context.Context, *http.Client, *reservoirLimiter) (*multichain.OptimismProvider, func()) { +func optimismInjector(context.Context, *http.Client) (*multichain.OptimismProvider, func()) { panic(wire.Build( wire.Value(persist.ChainOptimism), simplehash.NewProvider, @@ -237,7 +218,6 @@ func optimismSyncPipelineInjector( httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, - l *reservoirLimiter, ) (*wrapper.SyncPipelineWrapper, func()) { panic(wire.Build( wire.Struct(new(wrapper.SyncPipelineWrapper), "*"), @@ -246,13 +226,11 @@ func optimismSyncPipelineInjector( wire.Bind(new(multichain.TokensIncrementalContractFetcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokenMetadataBatcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokensByTokenIdentifiersFetcher), util.ToPointer(simplehashProvider)), - wire.Bind(new(retry.Limiter), util.ToPointer(l)), - wrapper.NewFillInWrapper, customMetadataHandlersInjector, )) } -func arbitrumInjector(context.Context, *http.Client, *reservoirLimiter) (*multichain.ArbitrumProvider, func()) { +func arbitrumInjector(context.Context, *http.Client) (*multichain.ArbitrumProvider, func()) { panic(wire.Build( wire.Value(persist.ChainArbitrum), simplehash.NewProvider, @@ -283,7 +261,6 @@ func arbitrumSyncPipelineInjector( httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, - l *reservoirLimiter, ) (*wrapper.SyncPipelineWrapper, func()) { panic(wire.Build( wire.Struct(new(wrapper.SyncPipelineWrapper), "*"), @@ -292,8 +269,6 @@ func arbitrumSyncPipelineInjector( wire.Bind(new(multichain.TokensIncrementalContractFetcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokenMetadataBatcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokensByTokenIdentifiersFetcher), util.ToPointer(simplehashProvider)), - wire.Bind(new(retry.Limiter), util.ToPointer(l)), - wrapper.NewFillInWrapper, customMetadataHandlersInjector, )) } @@ -315,7 +290,7 @@ func poapProviderInjector(poapProvider *poap.Provider) *multichain.PoapProvider )) } -func zoraInjector(envInit, context.Context, *http.Client, *reservoirLimiter) (*multichain.ZoraProvider, func()) { +func zoraInjector(envInit, context.Context, *http.Client) (*multichain.ZoraProvider, func()) { panic(wire.Build( wire.Value(persist.ChainZora), simplehash.NewProvider, @@ -347,7 +322,6 @@ func zoraSyncPipelineInjector( httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, - l *reservoirLimiter, ) (*wrapper.SyncPipelineWrapper, func()) { panic(wire.Build( wire.Struct(new(wrapper.SyncPipelineWrapper), "*"), @@ -356,13 +330,11 @@ func zoraSyncPipelineInjector( wire.Bind(new(multichain.TokensIncrementalContractFetcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokenMetadataBatcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokensByTokenIdentifiersFetcher), util.ToPointer(simplehashProvider)), - wire.Bind(new(retry.Limiter), util.ToPointer(l)), - wrapper.NewFillInWrapper, customMetadataHandlersInjector, )) } -func baseInjector(context.Context, *http.Client, *reservoirLimiter) (*multichain.BaseProvider, func()) { +func baseInjector(context.Context, *http.Client) (*multichain.BaseProvider, func()) { panic(wire.Build( wire.Value(persist.ChainBase), simplehash.NewProvider, @@ -393,7 +365,6 @@ func baseSyncPipelineInjector( httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, - l *reservoirLimiter, ) (*wrapper.SyncPipelineWrapper, func()) { panic(wire.Build( wire.Struct(new(wrapper.SyncPipelineWrapper), "*"), @@ -402,13 +373,11 @@ func baseSyncPipelineInjector( wire.Bind(new(multichain.TokensIncrementalContractFetcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokenMetadataBatcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokensByTokenIdentifiersFetcher), util.ToPointer(simplehashProvider)), - wire.Bind(new(retry.Limiter), util.ToPointer(l)), - wrapper.NewFillInWrapper, customMetadataHandlersInjector, )) } -func polygonInjector(context.Context, *http.Client, *reservoirLimiter) (*multichain.PolygonProvider, func()) { +func polygonInjector(context.Context, *http.Client) (*multichain.PolygonProvider, func()) { panic(wire.Build( wire.Value(persist.ChainPolygon), simplehash.NewProvider, @@ -439,7 +408,6 @@ func polygonSyncPipelineInjector( httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, - l *reservoirLimiter, ) (*wrapper.SyncPipelineWrapper, func()) { panic(wire.Build( wire.Struct(new(wrapper.SyncPipelineWrapper), "*"), @@ -448,8 +416,6 @@ func polygonSyncPipelineInjector( wire.Bind(new(multichain.TokensIncrementalContractFetcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokenMetadataBatcher), util.ToPointer(simplehashProvider)), wire.Bind(new(multichain.TokensByTokenIdentifiersFetcher), util.ToPointer(simplehashProvider)), - wire.Bind(new(retry.Limiter), util.ToPointer(l)), - wrapper.NewFillInWrapper, customMetadataHandlersInjector, )) } diff --git a/server/wire_gen.go b/server/wire_gen.go index 838837603..633748b3f 100644 --- a/server/wire_gen.go +++ b/server/wire_gen.go @@ -12,7 +12,6 @@ import ( "github.com/google/wire" "github.com/jackc/pgx/v4/pgxpool" "github.com/mikeydub/go-gallery/db/gen/coredb" - "github.com/mikeydub/go-gallery/service/limiters" "github.com/mikeydub/go-gallery/service/multichain" "github.com/mikeydub/go-gallery/service/multichain/indexer" "github.com/mikeydub/go-gallery/service/multichain/poap" @@ -28,9 +27,7 @@ import ( "github.com/mikeydub/go-gallery/service/rpc/ipfs" "github.com/mikeydub/go-gallery/service/task" "github.com/mikeydub/go-gallery/service/tokenmanage" - "github.com/mikeydub/go-gallery/util" "net/http" - "time" ) // Injectors from inject.go: @@ -44,15 +41,14 @@ func NewMultichainProvider(ctx context.Context, envFunc func()) (*multichain.Pro queries := newQueries(pool) cache := newTokenManageCache() client := _wireClientValue - serverReservoirLimiter := newReservoirLimiter(ctx, cache) - ethereumProvider, cleanup3 := ethInjector(serverEnvInit, ctx, client, serverReservoirLimiter) + ethereumProvider, cleanup3 := ethInjector(serverEnvInit, ctx, client) tezosProvider := tezosInjector(serverEnvInit, client) - optimismProvider, cleanup4 := optimismInjector(ctx, client, serverReservoirLimiter) - arbitrumProvider, cleanup5 := arbitrumInjector(ctx, client, serverReservoirLimiter) + optimismProvider, cleanup4 := optimismInjector(ctx, client) + arbitrumProvider, cleanup5 := arbitrumInjector(ctx, client) poapProvider := poapInjector(serverEnvInit, client) - zoraProvider, cleanup6 := zoraInjector(serverEnvInit, ctx, client, serverReservoirLimiter) - baseProvider, cleanup7 := baseInjector(ctx, client, serverReservoirLimiter) - polygonProvider, cleanup8 := polygonInjector(ctx, client, serverReservoirLimiter) + zoraProvider, cleanup6 := zoraInjector(serverEnvInit, ctx, client) + baseProvider, cleanup7 := baseInjector(ctx, client) + polygonProvider, cleanup8 := polygonInjector(ctx, client) chainProvider := &multichain.ChainProvider{ Ethereum: ethereumProvider, Tezos: tezosProvider, @@ -100,10 +96,10 @@ func customMetadataHandlersInjector(simplehashProvider *simplehash.Provider) *mu return customMetadataHandlers } -func ethInjector(serverEnvInit envInit, contextContext context.Context, client *http.Client, serverReservoirLimiter *reservoirLimiter) (*multichain.EthereumProvider, func()) { +func ethInjector(serverEnvInit envInit, contextContext context.Context, client *http.Client) (*multichain.EthereumProvider, func()) { chain := _wireChainValue provider := simplehash.NewProvider(chain, client) - syncPipelineWrapper, cleanup := ethSyncPipelineInjector(contextContext, client, chain, provider, serverReservoirLimiter) + syncPipelineWrapper, cleanup := ethSyncPipelineInjector(contextContext, client, chain, provider) ethclientClient := rpc.NewEthClient() indexerProvider := indexer.NewProvider(client, ethclientClient) ethereumProvider := ethProviderInjector(contextContext, syncPipelineWrapper, indexerProvider, provider) @@ -133,9 +129,8 @@ func ethProviderInjector(ctx context.Context, syncPipeline *wrapper.SyncPipeline return ethereumProvider } -func ethSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, l *reservoirLimiter) (*wrapper.SyncPipelineWrapper, func()) { +func ethSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider) (*wrapper.SyncPipelineWrapper, func()) { customMetadataHandlers := customMetadataHandlersInjector(simplehashProvider) - fillInWrapper, cleanup := wrapper.NewFillInWrapper(ctx, httpClient, chain, l) syncPipelineWrapper := &wrapper.SyncPipelineWrapper{ Chain: chain, TokenIdentifierOwnerFetcher: simplehashProvider, @@ -144,10 +139,8 @@ func ethSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain TokenMetadataBatcher: simplehashProvider, TokensByTokenIdentifiersFetcher: simplehashProvider, CustomMetadataWrapper: customMetadataHandlers, - FillInWrapper: fillInWrapper, } return syncPipelineWrapper, func() { - cleanup() } } @@ -171,10 +164,10 @@ func tezosProviderInjector(tezosProvider *tezos.Provider, tzktProvider *tzkt.Pro return multichainTezosProvider } -func optimismInjector(contextContext context.Context, client *http.Client, serverReservoirLimiter *reservoirLimiter) (*multichain.OptimismProvider, func()) { +func optimismInjector(contextContext context.Context, client *http.Client) (*multichain.OptimismProvider, func()) { chain := _wirePersistChainValue provider := simplehash.NewProvider(chain, client) - syncPipelineWrapper, cleanup := optimismSyncPipelineInjector(contextContext, client, chain, provider, serverReservoirLimiter) + syncPipelineWrapper, cleanup := optimismSyncPipelineInjector(contextContext, client, chain, provider) optimismProvider := optimismProviderInjector(syncPipelineWrapper, provider) return optimismProvider, func() { cleanup() @@ -199,9 +192,8 @@ func optimismProviderInjector(syncPipeline *wrapper.SyncPipelineWrapper, simpleh return optimismProvider } -func optimismSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, l *reservoirLimiter) (*wrapper.SyncPipelineWrapper, func()) { +func optimismSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider) (*wrapper.SyncPipelineWrapper, func()) { customMetadataHandlers := customMetadataHandlersInjector(simplehashProvider) - fillInWrapper, cleanup := wrapper.NewFillInWrapper(ctx, httpClient, chain, l) syncPipelineWrapper := &wrapper.SyncPipelineWrapper{ Chain: chain, TokenIdentifierOwnerFetcher: simplehashProvider, @@ -210,17 +202,15 @@ func optimismSyncPipelineInjector(ctx context.Context, httpClient *http.Client, TokenMetadataBatcher: simplehashProvider, TokensByTokenIdentifiersFetcher: simplehashProvider, CustomMetadataWrapper: customMetadataHandlers, - FillInWrapper: fillInWrapper, } return syncPipelineWrapper, func() { - cleanup() } } -func arbitrumInjector(contextContext context.Context, client *http.Client, serverReservoirLimiter *reservoirLimiter) (*multichain.ArbitrumProvider, func()) { +func arbitrumInjector(contextContext context.Context, client *http.Client) (*multichain.ArbitrumProvider, func()) { chain := _wireChainValue2 provider := simplehash.NewProvider(chain, client) - syncPipelineWrapper, cleanup := arbitrumSyncPipelineInjector(contextContext, client, chain, provider, serverReservoirLimiter) + syncPipelineWrapper, cleanup := arbitrumSyncPipelineInjector(contextContext, client, chain, provider) arbitrumProvider := arbitrumProviderInjector(syncPipelineWrapper, provider) return arbitrumProvider, func() { cleanup() @@ -245,9 +235,8 @@ func arbitrumProviderInjector(syncPipeline *wrapper.SyncPipelineWrapper, simpleh return arbitrumProvider } -func arbitrumSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, l *reservoirLimiter) (*wrapper.SyncPipelineWrapper, func()) { +func arbitrumSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider) (*wrapper.SyncPipelineWrapper, func()) { customMetadataHandlers := customMetadataHandlersInjector(simplehashProvider) - fillInWrapper, cleanup := wrapper.NewFillInWrapper(ctx, httpClient, chain, l) syncPipelineWrapper := &wrapper.SyncPipelineWrapper{ Chain: chain, TokenIdentifierOwnerFetcher: simplehashProvider, @@ -256,10 +245,8 @@ func arbitrumSyncPipelineInjector(ctx context.Context, httpClient *http.Client, TokenMetadataBatcher: simplehashProvider, TokensByTokenIdentifiersFetcher: simplehashProvider, CustomMetadataWrapper: customMetadataHandlers, - FillInWrapper: fillInWrapper, } return syncPipelineWrapper, func() { - cleanup() } } @@ -279,10 +266,10 @@ func poapProviderInjector(poapProvider *poap.Provider) *multichain.PoapProvider return multichainPoapProvider } -func zoraInjector(serverEnvInit envInit, contextContext context.Context, client *http.Client, serverReservoirLimiter *reservoirLimiter) (*multichain.ZoraProvider, func()) { +func zoraInjector(serverEnvInit envInit, contextContext context.Context, client *http.Client) (*multichain.ZoraProvider, func()) { chain := _wireChainValue3 provider := simplehash.NewProvider(chain, client) - syncPipelineWrapper, cleanup := zoraSyncPipelineInjector(contextContext, client, chain, provider, serverReservoirLimiter) + syncPipelineWrapper, cleanup := zoraSyncPipelineInjector(contextContext, client, chain, provider) zoraProvider := zoraProviderInjector(syncPipelineWrapper, provider) return zoraProvider, func() { cleanup() @@ -308,9 +295,8 @@ func zoraProviderInjector(syncPipeline *wrapper.SyncPipelineWrapper, simplehashP return zoraProvider } -func zoraSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, l *reservoirLimiter) (*wrapper.SyncPipelineWrapper, func()) { +func zoraSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider) (*wrapper.SyncPipelineWrapper, func()) { customMetadataHandlers := customMetadataHandlersInjector(simplehashProvider) - fillInWrapper, cleanup := wrapper.NewFillInWrapper(ctx, httpClient, chain, l) syncPipelineWrapper := &wrapper.SyncPipelineWrapper{ Chain: chain, TokenIdentifierOwnerFetcher: simplehashProvider, @@ -319,17 +305,15 @@ func zoraSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chai TokenMetadataBatcher: simplehashProvider, TokensByTokenIdentifiersFetcher: simplehashProvider, CustomMetadataWrapper: customMetadataHandlers, - FillInWrapper: fillInWrapper, } return syncPipelineWrapper, func() { - cleanup() } } -func baseInjector(contextContext context.Context, client *http.Client, serverReservoirLimiter *reservoirLimiter) (*multichain.BaseProvider, func()) { +func baseInjector(contextContext context.Context, client *http.Client) (*multichain.BaseProvider, func()) { chain := _wireChainValue4 provider := simplehash.NewProvider(chain, client) - syncPipelineWrapper, cleanup := baseSyncPipelineInjector(contextContext, client, chain, provider, serverReservoirLimiter) + syncPipelineWrapper, cleanup := baseSyncPipelineInjector(contextContext, client, chain, provider) baseProvider := baseProvidersInjector(syncPipelineWrapper, provider) return baseProvider, func() { cleanup() @@ -354,9 +338,8 @@ func baseProvidersInjector(syncPipeline *wrapper.SyncPipelineWrapper, simplehash return baseProvider } -func baseSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, l *reservoirLimiter) (*wrapper.SyncPipelineWrapper, func()) { +func baseSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider) (*wrapper.SyncPipelineWrapper, func()) { customMetadataHandlers := customMetadataHandlersInjector(simplehashProvider) - fillInWrapper, cleanup := wrapper.NewFillInWrapper(ctx, httpClient, chain, l) syncPipelineWrapper := &wrapper.SyncPipelineWrapper{ Chain: chain, TokenIdentifierOwnerFetcher: simplehashProvider, @@ -365,17 +348,15 @@ func baseSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chai TokenMetadataBatcher: simplehashProvider, TokensByTokenIdentifiersFetcher: simplehashProvider, CustomMetadataWrapper: customMetadataHandlers, - FillInWrapper: fillInWrapper, } return syncPipelineWrapper, func() { - cleanup() } } -func polygonInjector(contextContext context.Context, client *http.Client, serverReservoirLimiter *reservoirLimiter) (*multichain.PolygonProvider, func()) { +func polygonInjector(contextContext context.Context, client *http.Client) (*multichain.PolygonProvider, func()) { chain := _wireChainValue5 provider := simplehash.NewProvider(chain, client) - syncPipelineWrapper, cleanup := polygonSyncPipelineInjector(contextContext, client, chain, provider, serverReservoirLimiter) + syncPipelineWrapper, cleanup := polygonSyncPipelineInjector(contextContext, client, chain, provider) polygonProvider := polygonProvidersInjector(syncPipelineWrapper, provider) return polygonProvider, func() { cleanup() @@ -400,9 +381,8 @@ func polygonProvidersInjector(syncPipeline *wrapper.SyncPipelineWrapper, simpleh return polygonProvider } -func polygonSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider, l *reservoirLimiter) (*wrapper.SyncPipelineWrapper, func()) { +func polygonSyncPipelineInjector(ctx context.Context, httpClient *http.Client, chain persist.Chain, simplehashProvider *simplehash.Provider) (*wrapper.SyncPipelineWrapper, func()) { customMetadataHandlers := customMetadataHandlersInjector(simplehashProvider) - fillInWrapper, cleanup := wrapper.NewFillInWrapper(ctx, httpClient, chain, l) syncPipelineWrapper := &wrapper.SyncPipelineWrapper{ Chain: chain, TokenIdentifierOwnerFetcher: simplehashProvider, @@ -411,10 +391,8 @@ func polygonSyncPipelineInjector(ctx context.Context, httpClient *http.Client, c TokenMetadataBatcher: simplehashProvider, TokensByTokenIdentifiersFetcher: simplehashProvider, CustomMetadataWrapper: customMetadataHandlers, - FillInWrapper: fillInWrapper, } return syncPipelineWrapper, func() { - cleanup() } } @@ -445,18 +423,6 @@ func setEnv(f func()) envInit { return envInit{} } -type reservoirLimiter limiters.KeyRateLimiter - -// Dumb forward method to satisfy the retry.Limiter interface -func (r *reservoirLimiter) ForKey(ctx context.Context, key string) (bool, time.Duration, error) { - return (*limiters.KeyRateLimiter)(r).ForKey(ctx, key) -} - -func newReservoirLimiter(ctx context.Context, c *redis.Cache) *reservoirLimiter { - l := limiters.NewKeyRateLimiter(ctx, c, "retryer:reservoir", 120, time.Minute) - return util.ToPointer(reservoirLimiter(*l)) -} - func newPqClient(e envInit) (*sql.DB, func()) { pq := postgres.MustCreateClient() return pq, func() { pq.Close() } diff --git a/service/multichain/alchemy/alchemy.go b/service/multichain/alchemy/alchemy.go index 96fda7ff5..9b9a6b3bc 100644 --- a/service/multichain/alchemy/alchemy.go +++ b/service/multichain/alchemy/alchemy.go @@ -930,7 +930,6 @@ func setExcludeSpam(url *url.URL, chain persist.Chain) { query := url.Query() query.Set("excludeFilters[]", "SPAM") url.RawQuery = query.Encode() - } func setContractAddress(url *url.URL, address persist.Address) { diff --git a/service/multichain/wrapper/wrapper.go b/service/multichain/wrapper/wrapper.go index ee0550715..2d34332aa 100644 --- a/service/multichain/wrapper/wrapper.go +++ b/service/multichain/wrapper/wrapper.go @@ -3,17 +3,11 @@ package wrapper import ( "context" "fmt" - "net/http" - "sync" - "time" "github.com/mikeydub/go-gallery/service/logger" - "github.com/mikeydub/go-gallery/service/media" "github.com/mikeydub/go-gallery/service/multichain" - "github.com/mikeydub/go-gallery/service/multichain/reservoir" "github.com/mikeydub/go-gallery/service/persist" sentryutil "github.com/mikeydub/go-gallery/service/sentry" - "github.com/mikeydub/go-gallery/util/retry" ) // SyncPipelineWrapper makes a best effort to fetch tokens requested by a sync. @@ -27,7 +21,6 @@ type SyncPipelineWrapper struct { TokenMetadataBatcher multichain.TokenMetadataBatcher TokensByTokenIdentifiersFetcher multichain.TokensByTokenIdentifiersFetcher CustomMetadataWrapper *multichain.CustomMetadataHandlers - FillInWrapper *FillInWrapper } func NewSyncPipelineWrapper( @@ -37,7 +30,6 @@ func NewSyncPipelineWrapper( tokensIncrementalOwnerFetcher multichain.TokensIncrementalOwnerFetcher, tokensIncrementalContractFetcher multichain.TokensIncrementalContractFetcher, tokenMetadataBatcher multichain.TokenMetadataBatcher, - fillInWrapper *FillInWrapper, customMetadataHandlers *multichain.CustomMetadataHandlers, ) *SyncPipelineWrapper { return &SyncPipelineWrapper{ @@ -46,7 +38,6 @@ func NewSyncPipelineWrapper( TokenIdentifierOwnerFetcher: tokenIdentifierOwnerFetcher, TokensIncrementalContractFetcher: tokensIncrementalContractFetcher, TokenMetadataBatcher: tokenMetadataBatcher, - FillInWrapper: fillInWrapper, CustomMetadataWrapper: customMetadataHandlers, } } @@ -54,28 +45,24 @@ func NewSyncPipelineWrapper( func (w SyncPipelineWrapper) GetTokenByTokenIdentifiersAndOwner(ctx context.Context, ti multichain.ChainAgnosticIdentifiers, address persist.Address) (t multichain.ChainAgnosticToken, c multichain.ChainAgnosticContract, err error) { t, c, err = w.TokenIdentifierOwnerFetcher.GetTokenByTokenIdentifiersAndOwner(ctx, ti, address) t = w.CustomMetadataWrapper.AddToToken(ctx, w.Chain, t) - t = w.FillInWrapper.AddToToken(ctx, t) return t, c, err } func (w SyncPipelineWrapper) GetTokensIncrementallyByWalletAddress(ctx context.Context, address persist.Address) (<-chan multichain.ChainAgnosticTokensAndContracts, <-chan error) { recCh, errCh := w.TokensIncrementalOwnerFetcher.GetTokensIncrementallyByWalletAddress(ctx, address) recCh, errCh = w.CustomMetadataWrapper.AddToPage(ctx, w.Chain, recCh, errCh) - recCh, errCh = w.FillInWrapper.AddToPage(ctx, recCh, errCh) return recCh, errCh } func (w SyncPipelineWrapper) GetTokensIncrementallyByContractAddress(ctx context.Context, address persist.Address, maxLimit int) (<-chan multichain.ChainAgnosticTokensAndContracts, <-chan error) { recCh, errCh := w.TokensIncrementalContractFetcher.GetTokensIncrementallyByContractAddress(ctx, address, maxLimit) recCh, errCh = w.CustomMetadataWrapper.AddToPage(ctx, w.Chain, recCh, errCh) - recCh, errCh = w.FillInWrapper.AddToPage(ctx, recCh, errCh) return recCh, errCh } func (w SyncPipelineWrapper) GetTokensByTokenIdentifiers(ctx context.Context, ti multichain.ChainAgnosticIdentifiers) ([]multichain.ChainAgnosticToken, multichain.ChainAgnosticContract, error) { t, c, err := w.TokensByTokenIdentifiersFetcher.GetTokensByTokenIdentifiers(ctx, ti) t = w.CustomMetadataWrapper.LoadAll(ctx, w.Chain, t) - t = w.FillInWrapper.LoadAll(t) return t, c, err } @@ -113,260 +100,5 @@ func (w SyncPipelineWrapper) GetTokenMetadataByTokenIdentifiersBatch(ctx context } } - // Convert metadata to tokens to fill in missing data - asTokens := make([]multichain.ChainAgnosticToken, len(tIDs)) - for i, tID := range tIDs { - asTokens[i] = multichain.ChainAgnosticToken{ - ContractAddress: tID.ContractAddress, - TokenID: tID.TokenID, - TokenMetadata: ret[i], - } - } - - ret = w.FillInWrapper.LoadMetadataAll(asTokens) return ret, nil } - -// FillInWrapper is a service for adding missing data to tokens. -// Batching pattern adapted from dataloaden (https://github.com/vektah/dataloaden) -type FillInWrapper struct { - chain persist.Chain - reservoirProvider *reservoir.Provider - ctx context.Context - mu sync.Mutex - batch *batch - wait time.Duration - maxBatch int - resultCache sync.Map -} - -func NewFillInWrapper(ctx context.Context, httpClient *http.Client, chain persist.Chain, l retry.Limiter) (*FillInWrapper, func()) { - r, cleanup := reservoir.NewProvider(ctx, httpClient, chain, l) - return &FillInWrapper{ - chain: chain, - reservoirProvider: r, - ctx: ctx, - wait: 250 * time.Millisecond, - maxBatch: 10, - }, cleanup -} - -// AddToToken adds missing data to a token. -func (w *FillInWrapper) AddToToken(ctx context.Context, t multichain.ChainAgnosticToken) multichain.ChainAgnosticToken { - return w.addToken(t)() -} - -// AddToPage adds missing data to each token of a provider page. -func (w *FillInWrapper) AddToPage(ctx context.Context, recCh <-chan multichain.ChainAgnosticTokensAndContracts, errIn <-chan error) (<-chan multichain.ChainAgnosticTokensAndContracts, <-chan error) { - outCh := make(chan multichain.ChainAgnosticTokensAndContracts, 2*10) - errOut := make(chan error) - w.resultCache = sync.Map{} - go func() { - defer close(outCh) - defer close(errOut) - for { - select { - case page, ok := <-recCh: - if !ok { - return - } - outCh <- w.addPage(page)() - case err, ok := <-errIn: - if ok { - errOut <- err - } - case <-ctx.Done(): - errOut <- ctx.Err() - return - } - } - }() - logger.For(ctx).Info("finished filling in page") - return outCh, errOut -} - -// LoaddAll fills in missing data for a slice of tokens. -func (w *FillInWrapper) LoadAll(tokens []multichain.ChainAgnosticToken) []multichain.ChainAgnosticToken { - thunks := make([]func() multichain.ChainAgnosticToken, len(tokens)) - for i, t := range tokens { - t := t - thunks[i] = w.addTokenToBatch(t) - } - result := make([]multichain.ChainAgnosticToken, len(tokens)) - for i, thunk := range thunks { - result[i] = thunk() - } - return result -} - -// LoadMetadataAll returns missing metadata for a slice of tokens. -func (w *FillInWrapper) LoadMetadataAll(tokens []multichain.ChainAgnosticToken) []persist.TokenMetadata { - thunks := make([]func() multichain.ChainAgnosticToken, len(tokens)) - for i, t := range tokens { - t := t - if hasMediaURLs(t.TokenMetadata, w.chain) { - thunks[i] = func() multichain.ChainAgnosticToken { - w.cacheTokenResult(t) - return t - } - } else { - thunks[i] = w.addTokenToBatch(t) - } - } - result := make([]persist.TokenMetadata, len(tokens)) - for i, thunk := range thunks { - r := thunk() - result[i] = r.TokenMetadata - } - return result -} - -// LoadFallbackAll returns missing fallback media for a slice of tokens. -func (w *FillInWrapper) LoadFallbackAll(tokens []multichain.ChainAgnosticToken) []persist.FallbackMedia { - thunks := make([]func() multichain.ChainAgnosticToken, len(tokens)) - for i, t := range tokens { - t := t - if t.FallbackMedia.IsServable() { - thunks[i] = func() multichain.ChainAgnosticToken { - w.cacheTokenResult(t) - return t - } - } else { - thunks[i] = w.addTokenToBatch(t) - } - } - result := make([]persist.FallbackMedia, len(tokens)) - for i, thunk := range thunks { - r := thunk() - result[i] = r.FallbackMedia - } - return result -} - -func (w *FillInWrapper) addPage(p multichain.ChainAgnosticTokensAndContracts) func() multichain.ChainAgnosticTokensAndContracts { - thunks := make([]func() multichain.ChainAgnosticToken, len(p.Tokens)) - for i, t := range p.Tokens { - thunks[i] = w.addToken(t) - } - return func() multichain.ChainAgnosticTokensAndContracts { - for i, thunk := range thunks { - p.Tokens[i] = thunk() - } - return p - } -} - -func (w *FillInWrapper) addToken(t multichain.ChainAgnosticToken) func() multichain.ChainAgnosticToken { - if hasMediaURLs(t.TokenMetadata, w.chain) && t.FallbackMedia.IsServable() { - return func() multichain.ChainAgnosticToken { - w.cacheTokenResult(t) - return t - } - } - return w.addTokenToBatch(t) -} - -func (w *FillInWrapper) cacheTokenResult(t multichain.ChainAgnosticToken) { - tID := persist.NewTokenIdentifiers(t.ContractAddress, t.TokenID, w.chain) - w.resultCache.Store(tID, t) -} - -func (w *FillInWrapper) addTokenToBatch(t multichain.ChainAgnosticToken) func() multichain.ChainAgnosticToken { - ti := multichain.ChainAgnosticIdentifiers{ContractAddress: t.ContractAddress, TokenID: t.TokenID} - - if v, ok := w.resultCache.Load(ti); ok { - return func() multichain.ChainAgnosticToken { - f := v.(multichain.ChainAgnosticToken) - if !t.FallbackMedia.IsServable() { - t.FallbackMedia = f.FallbackMedia - } - if !hasMediaURLs(t.TokenMetadata, w.chain) { - t.TokenMetadata = f.TokenMetadata - } - return t - } - } - - w.mu.Lock() - - if w.batch == nil { - w.batch = &batch{done: make(chan struct{})} - } - b := w.batch - pos := b.addToBatch(w, ti) - - w.mu.Unlock() - - return func() multichain.ChainAgnosticToken { - <-b.done - if b.errors[pos] != nil { - return t - } - if !t.FallbackMedia.IsServable() { - t.FallbackMedia = b.results[pos].FallbackMedia - } - if !hasMediaURLs(t.TokenMetadata, w.chain) { - t.TokenMetadata = b.results[pos].TokenMetadata - } - return t - } -} - -func hasMediaURLs(metadata persist.TokenMetadata, chain persist.Chain) bool { - _, _, err := media.FindMediaURLsChain(metadata, chain) - return err == nil -} - -type batch struct { - tokens []multichain.ChainAgnosticIdentifiers - errors []error - results []multichain.ChainAgnosticToken - closing bool - done chan struct{} -} - -func (b *batch) addToBatch(w *FillInWrapper, t multichain.ChainAgnosticIdentifiers) int { - pos := len(b.tokens) - b.tokens = append(b.tokens, t) - if pos == 0 { - go b.startTimer(w) - } - - if w.maxBatch != 0 && pos >= w.maxBatch-1 { - if !b.closing { - b.closing = true - w.batch = nil - go b.end(w) - } - } - - return pos -} - -func (b *batch) startTimer(w *FillInWrapper) { - time.Sleep(w.wait) - w.mu.Lock() - - // we must have hit a batch limit and are already finalizing this batch - if b.closing { - w.mu.Unlock() - return - } - - w.batch = nil - w.mu.Unlock() - - b.end(w) -} - -func (b *batch) end(w *FillInWrapper) { - ctx, cancel := context.WithTimeout(w.ctx, 10*time.Second) - defer cancel() - b.results, b.errors = w.reservoirProvider.GetTokensByTokenIdentifiersBatch(ctx, b.tokens) - for i := range b.results { - if b.errors[i] == nil { - w.resultCache.Store(b.tokens[i], b.results[i]) - } - } - close(b.done) -}