diff --git a/kafka-streamer/batcher.go b/kafka-streamer/batcher.go index 24bb7633a..005b49f4e 100644 --- a/kafka-streamer/batcher.go +++ b/kafka-streamer/batcher.go @@ -33,9 +33,18 @@ type messageBatcher[T any] struct { closed bool parseF func(context.Context, *kafka.Message) (T, error) submitF func(context.Context, []T) error + postCommitF func(context.Context, []T) error } -func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount int, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error) *messageBatcher[T] { +type option[T any] func(*messageBatcher[T]) + +func withPostCommitF[T any](f func(context.Context, []T) error) option[T] { + return func(mb *messageBatcher[T]) { + mb.postCommitF = f + } +} + +func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount int, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error, opts ...option[T]) *messageBatcher[T] { mb := &messageBatcher[T]{ maxSize: maxSize, timeoutDuration: timeout, @@ -46,6 +55,10 @@ func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount in submitF: submitF, } + for _, opt := range opts { + opt(mb) + } + for i := 0; i < workerCount; i++ { go mb.worker() } @@ -136,6 +149,13 @@ func (mb *messageBatcher[T]) Submit(ctx context.Context, c *kafka.Consumer) erro return fmt.Errorf("failed to commit offsets: %w", err) } + if mb.postCommitF != nil { + err := mb.postCommitF(ctx, mb.entries) + if err != nil { + logger.For(ctx).Warnf("error calling post commit function: %v", err) + } + } + mb.Reset() return nil } diff --git a/kafka-streamer/main.go b/kafka-streamer/main.go index f39cde5af..9397f34c7 100644 --- a/kafka-streamer/main.go +++ b/kafka-streamer/main.go @@ -20,6 +20,7 @@ import ( "github.com/mikeydub/go-gallery/service/logger" "github.com/mikeydub/go-gallery/service/persist" "github.com/mikeydub/go-gallery/service/persist/postgres" + "github.com/mikeydub/go-gallery/service/task" "github.com/mikeydub/go-gallery/util" "github.com/mikeydub/go-gallery/util/batch" "github.com/sirupsen/logrus" @@ -32,6 +33,7 @@ import ( // Enable for debugging; will not commit offsets or write to the database const readOnlyMode = false +const moshicamMinterAddress = "0xaceb0de9f3efab3c50bf4dc6b14706f119e39dd8" type streamerConfig struct { Topic string @@ -53,8 +55,9 @@ func main() { defer pgx.Close() ccf := newContractCollectionFiller(ctx, pgx) + tc := task.NewClient(ctx) - go runStreamer(ctx, pgx, ccf) + go runStreamer(ctx, pgx, ccf, tc) err := router.Run(":3000") if err != nil { @@ -114,7 +117,7 @@ func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord } } -func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig { +func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller, opts ...option[mirrordb.ProcessBaseTokenEntryParams]) *streamerConfig { parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseTokenEntryParams, error) { ethereumEntry, err := parseTokenMessage(ctx, deserializer, message) if err != nil { @@ -131,10 +134,23 @@ func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord return &streamerConfig{ Topic: "base.nft.v4", - Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF), + Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF, opts...), + } +} + +func newMoshicamPostCommitF(taskClient *task.Client) func(context.Context, []mirrordb.ProcessBaseTokenEntryParams) error { + return func(ctx context.Context, entries []mirrordb.ProcessBaseTokenEntryParams) error { + moshiTokens := util.Filter(entries, isMoshicamEntry, false) + return taskClient.CreateTaskForMoshicamOwnerProcessing(ctx, task.MoshicamOwnerProcessingMessage{Entries: moshiTokens}) } } +func isMoshicamEntry(e mirrordb.ProcessBaseTokenEntryParams) bool { + var c map[string]string + e.Contract.AssignTo(&c) + return strings.ToLower(c["deployed_via_contract"]) == moshicamMinterAddress +} + func newBaseSepoliaOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig { parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseSepoliaOwnerEntryParams, error) { ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message) @@ -221,7 +237,7 @@ func newZoraTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord } } -func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollectionFiller) { +func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollectionFiller, tc *task.Client) { deserializer, err := newDeserializerFromRegistry() if err != nil { panic(fmt.Errorf("failed to create Avro deserializer: %w", err)) @@ -240,7 +256,7 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollection newEthereumOwnerConfig(deserializer, queries), newEthereumTokenConfig(deserializer, queries, ccf), newBaseOwnerConfig(deserializer, queries), - newBaseTokenConfig(deserializer, queries, ccf), + newBaseTokenConfig(deserializer, queries, ccf, withPostCommitF(newMoshicamPostCommitF(tc))), newBaseSepoliaOwnerConfig(deserializer, queries), newBaseSepoliaTokenConfig(deserializer, queries, ccf), //newZoraOwnerConfig(deserializer, queries), diff --git a/secrets/local/local/app-local-kafka-streamer.yaml b/secrets/local/local/app-local-kafka-streamer.yaml index 6ad820fbc..c034e085b 100644 --- a/secrets/local/local/app-local-kafka-streamer.yaml +++ b/secrets/local/local/app-local-kafka-streamer.yaml @@ -7,6 +7,9 @@ SIMPLEHASH_API_KEY: ENC[AES256_GCM,data:xs21dnHG2SDndLykF55xBg==,iv:kbP6U9agwhmo SIMPLEHASH_API_SECRET: ENC[AES256_GCM,data:AZRMjIpXY8Y0a4JNvqosvrrqJJ3MG7ndaP74bh0X38I9GsDKs+WjzgD2PG5sLPtrkFn9pMzKcEJSj2AuYZsUDA==,iv:Ej+wlqbGM9g7cSoilBSJg4NpuukG36KQDRA8nOhahtc=,tag:xC+sykMjBh70J6YgjiYuhw==,type:str] SIMPLEHASH_GROUP_ID: ENC[AES256_GCM,data:C27xn+/cfWz1,iv:vaemCg0liT3x6aaBcUKU0+EkDW1ES6m7I/gh6JLxthQ=,tag:OelT4SMDMO5n7wPWwmQ67g==,type:str] SIMPLEHASH_BOOTSTRAP_SERVERS: ENC[AES256_GCM,data:4te1EEuo9e214YQHswryzgx8pHYLM6iH9eqZknS8A4hJPfyroeWVwjiwUqYUqw==,iv:aiejiGnBaefVHRKU3Pol54hgknA4twkxwJ46hh3A9LE=,tag:QL6Y/2XWWLkjMHOBVo5dmg==,type:str] +MOSHICAM_OWNER_PROCESSING_QUEUE: ENC[AES256_GCM,data:ujvHm6jPGWitIPH4xoKUjVwGjgtcHqH86EqYc64vxJ5MuFgkpxax6VGWXi52M/F08wLMeHsH1BBmrw/QKYulLTMnWh+yAQ==,iv:NvuIgf8I8SvQkFz5t2jgteOtSJ9moE9+ocVLFY+BwWo=,tag:z0gRJOg0kz8ty3hAZSi/Ig==,type:str] +MOSHICAM_URL: ENC[AES256_GCM,data:buQsav0nKgsIpXKyFkqGZNmxeeIcKa296UG3t5BJJSi7Ce0tIg==,iv:XnKR97pTtsYD+ZwZxM8fSrVlLic/6bt6v4zuOYYyicY=,tag:DVcoDAr4d0hAd3EXmaBIdQ==,type:str] +MOSHCAM_TASK_SECRET: ENC[AES256_GCM,data:jEB8SUIgeg6+q/hlyg37RA==,iv:P5X5Yi0ed8VVsukob+vH8h9GWZgodywXkFffSw64LQo=,tag:At56f3AukZo932gZ6QUEgQ==,type:str] sops: kms: [] gcp_kms: @@ -16,8 +19,8 @@ sops: azure_kv: [] hc_vault: [] age: [] - lastmodified: "2024-04-19T19:59:21Z" - mac: ENC[AES256_GCM,data:CoM5SNAhJmQ1461Y6GZnOvoHwH3fnmavooexvx3NxLSHLQUx8PmperxRWAfsjlM84CwyYn7h8UiblzCri9eoXp74/gOp+erOH9sHjamtU12riz89nylk4KDKJ1/G6pfFYCkBQtGUSkenH0Hp4EZczXiC0AyCW9JARB83c1FiYYg=,iv:2cLYRhhhYbL5j3Pk/wRDho249es65nqvQPeWtAuW0wY=,tag:eVKf8HhUedgVJk/CGMhXsQ==,type:str] + lastmodified: "2024-06-26T20:43:46Z" + mac: ENC[AES256_GCM,data:vA5G4YCnVAD83YesrcIlZ36C5PI6KK2h+GTKqdDE6G5FexorRSuiFEaX/DLIiXW9TSDMPIqtKrOY4HD/Dd4XCWhnh7sLBeZ9+rQkCj8hzqOK2LnNdRjxf9ghRHa2zMExO2vuqeHScXugdju/bgT7e5ARN1DlKH4mo2riYZebvvE=,iv:fqWN69B0WLaDLDEyCwymwDT44BYojSlFCLgJ3cXar+M=,tag:THtXR4kySzI8/pHwqKUERQ==,type:str] pgp: [] unencrypted_suffix: _unencrypted version: 3.7.3 diff --git a/secrets/prod/kafka-streamer-env.yaml b/secrets/prod/kafka-streamer-env.yaml index 0268cdc0a..40afbd5c7 100644 --- a/secrets/prod/kafka-streamer-env.yaml +++ b/secrets/prod/kafka-streamer-env.yaml @@ -13,6 +13,9 @@ SIMPLEHASH_KAFKA_API_SECRET: ENC[AES256_GCM,data:dNCgNhakFfuI8Ky434TKM2wNtWOiun/ SIMPLEHASH_GROUP_ID: ENC[AES256_GCM,data:C27xn+/cfWz1,iv:vaemCg0liT3x6aaBcUKU0+EkDW1ES6m7I/gh6JLxthQ=,tag:OelT4SMDMO5n7wPWwmQ67g==,type:str] SIMPLEHASH_BOOTSTRAP_SERVERS: ENC[AES256_GCM,data:4te1EEuo9e214YQHswryzgx8pHYLM6iH9eqZknS8A4hJPfyroeWVwjiwUqYUqw==,iv:aiejiGnBaefVHRKU3Pol54hgknA4twkxwJ46hh3A9LE=,tag:QL6Y/2XWWLkjMHOBVo5dmg==,type:str] SIMPLEHASH_REST_API_KEY: ENC[AES256_GCM,data:h4NuJQSE9N2rbJjxKzIxJiZUNcYSGSC6d/JMgUeI488g,iv:sTVD8pwCPqXCR7pwu8699yWIr2kl7spP68XYZwVCwH8=,tag:F2BFSf7x3J//Cfjr5zXOMg==,type:str] +MOSHICAM_OWNER_PROCESSING_QUEUE: ENC[AES256_GCM,data:zYXkqN2s8ne9vGHtYze71Y9j57WhRO2eo2oCYruBLIojjhU9L1tfcvqbArqxYVl35uLKHlm7fBa52W48b+rUAAU1qWQHtg==,iv:7bXBapocFL51PVzXSYqEfx3pB9h1Pld6MPYnFnHj5yo=,tag:xtJd/CLxRh8NfYkxbruQEA==,type:str] +MOSHICAM_URL: ENC[AES256_GCM,data:ZyoCHlTLUKkPdZsgubXAFPrfHuU0ZcENRD/IDurkeexCp1Gi+Q==,iv:u75MDxg0zSB764SKy6uLH7t9bAD/cXAvQ2LRJX8llbk=,tag:3Brr73NVB9I/kHpy3yNnnw==,type:str] +MOSHCAM_TASK_SECRET: ENC[AES256_GCM,data:wY13AbFCCq3gsVePCnm8kw==,iv:BSBG/1LXuftfFT7vMm5UEGcVgbH8RG6RPwXTBxS16u0=,tag:m3Tdcj6iqYGotLL0Z62eDA==,type:str] sops: kms: [] gcp_kms: @@ -22,8 +25,8 @@ sops: azure_kv: [] hc_vault: [] age: [] - lastmodified: "2024-05-06T15:09:36Z" - mac: ENC[AES256_GCM,data:eNNTdDQ+ruc6uDnFOXR4RA/F0+ytcpTcc+LlPyHTSZbMCAidTX/hjps8KiOFx0Mdc9R/s+Vx9QkpIuOzM78ZnDwy2PyTt2w/+33dZUx2C++AcBIFJWoCNkqc87otZJoWtlqq0uWvpG8CfQH6cOgrffQL0hR8DmCIaO5oWt48MJI=,iv:O7VplWjgmCw6p7YkLyrp0wKvGfhkzwslgVtnqsro4aQ=,tag:QwPY6xIqncuEe5g7f3YAWQ==,type:str] + lastmodified: "2024-06-26T20:42:57Z" + mac: ENC[AES256_GCM,data:Lyd7StXvHaP4k4GU7X2rkGyVXu+Kteo1wT7pGumjhrjPbNrA6CMjTI9TBVANzhmpr87iY3A4O0gRtJ2W4bs0MibDT5P5jA0fTtZIBdhpuUQD3oEKWYX3TPUxmvQSqfxhSpUY9jCIakQVJ1GG2tZA4zN2mGsnwjj0JeLps0koAZ8=,iv:en9J9ECCylTxZ0TOzQ28Gjy4AppqmkPf6sFjROu2plg=,tag:ibClIkCiKWCNTg9IIDMXMA==,type:str] pgp: [] unencrypted_suffix: _unencrypted version: 3.7.3 diff --git a/secrets/prod/local/app-prod-kafka-streamer.yaml b/secrets/prod/local/app-prod-kafka-streamer.yaml index 12588c5d8..33240e9ab 100644 --- a/secrets/prod/local/app-prod-kafka-streamer.yaml +++ b/secrets/prod/local/app-prod-kafka-streamer.yaml @@ -13,6 +13,9 @@ SIMPLEHASH_KAFKA_API_SECRET: ENC[AES256_GCM,data:/YxmsxIMuDQfRTFZiZCcR9NaG0fkyIH SIMPLEHASH_GROUP_ID: ENC[AES256_GCM,data:C27xn+/cfWz1,iv:vaemCg0liT3x6aaBcUKU0+EkDW1ES6m7I/gh6JLxthQ=,tag:OelT4SMDMO5n7wPWwmQ67g==,type:str] SIMPLEHASH_BOOTSTRAP_SERVERS: ENC[AES256_GCM,data:4te1EEuo9e214YQHswryzgx8pHYLM6iH9eqZknS8A4hJPfyroeWVwjiwUqYUqw==,iv:aiejiGnBaefVHRKU3Pol54hgknA4twkxwJ46hh3A9LE=,tag:QL6Y/2XWWLkjMHOBVo5dmg==,type:str] SIMPLEHASH_REST_API_KEY: ENC[AES256_GCM,data:Y2aS46vXVaIxWXZ6JnZZ354Z+Vib8U2EGOmZdLWUt7sU,iv:nY0IU5zgqnLt5D3n84k3dNVYqqDfLOXWgMYDVKYHcGo=,tag:QZZqi4OyI5uR0ITWsiJJBQ==,type:str] +MOSHICAM_OWNER_PROCESSING_QUEUE: ENC[AES256_GCM,data:vBabHylUv5cI5mb9PueQ3G9a4w8yTJanAbmnBmM6Ynvg+/MFtDTf4pRfSzxsnhQ2kr3AvGLug028Ku0NRy2ARNDHpi69RQ==,iv:mf6325v6XC+GGfkDFeyA0Azm3HHZdrF4WGc5xscuicY=,tag:d47LKSu5pQQTZWNxD0S+OQ==,type:str] +MOSHICAM_URL: ENC[AES256_GCM,data:bcQDmsDcF1kuwT3gRlBfAwEtSMb585ikwS0TUc/yEMs22Wxebg==,iv:ZuV7I0yF4L52PWFh11/XetbLIEbNeEmilItcwFH5sCE=,tag:hIRWAALS/dIxfJri1+FHBw==,type:str] +MOSHCAM_TASK_SECRET: ENC[AES256_GCM,data:jc7Y6C9a7t5WssaUZhUx0w==,iv:9qOXZyBFyWR5jEotmHVRwUES0YTBXPUaX+PQASuA85E=,tag:HOHmvChkHPvXcrwcoVEDHA==,type:str] sops: kms: [] gcp_kms: @@ -22,8 +25,8 @@ sops: azure_kv: [] hc_vault: [] age: [] - lastmodified: "2024-05-06T15:09:52Z" - mac: ENC[AES256_GCM,data:2TyE3zqeGLvAwYTq3kqdmynIFv/HrqrVDfZZJBS/RCn7Y3ylWHXlLhLKSd8J9HKyeo4uM5zDCT75JcsAmV10TcBp2FzFBFLIL0gqrjgUN324wrqpbBix16yM4Q8EQNI8E9e/nclBu6Fs8aZ7P04EfPNt37nJPcI3IOwzyuW/9oA=,iv:XmJ6JI7+TzaBcFuXHDk9iEGcrBEWIeMVF2UegcI3zyY=,tag:EG8iuQcv1IacjZURMWWeXw==,type:str] + lastmodified: "2024-06-26T20:43:27Z" + mac: ENC[AES256_GCM,data:74Cb9Cv+07HKC08HSo8wf5XBnZnbJxsx13ZSZr5sC4gRqTpi/hGjOVvuVmlEeYPLz/o+IwaMjPCfcshM8LU+Xhzg6X/M2LJ1sn4wcGBP5MYm1LWIYFCaKdAPIMpImnVUjJocGv0pSYtq6OeO8Fhle5HAXJp3TtYI8fywpUYe3jE=,iv:iqu+cbz9fyCOP/uqbOJHcB7adoOcstcsJyjxhsp/YQE=,tag:56sAMKpJCJeL7wmsdkwbGA==,type:str] pgp: [] unencrypted_suffix: _unencrypted version: 3.7.3 diff --git a/service/task/task.go b/service/task/task.go index 8c477fcae..d5833fa4a 100644 --- a/service/task/task.go +++ b/service/task/task.go @@ -11,6 +11,7 @@ import ( gcptasks "cloud.google.com/go/cloudtasks/apiv2" taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb" "github.com/getsentry/sentry-go" + "github.com/mikeydub/go-gallery/db/gen/mirrordb" "github.com/mikeydub/go-gallery/service/auth/basicauth" "google.golang.org/api/option" "google.golang.org/grpc" @@ -84,6 +85,10 @@ type AutosocialPollFarcasterMessage struct { UserID persist.DBID `form:"user_id" binding:"required"` } +type MoshicamOwnerProcessingMessage struct { + Entries []mirrordb.ProcessBaseTokenEntryParams `json:"entries" binding:"required"` +} + type TokenIdentifiersQuantities map[persist.TokenUniqueIdentifiers]persist.HexString func (t TokenIdentifiersQuantities) MarshalJSON() ([]byte, error) { @@ -258,6 +263,15 @@ func (c *Client) CreateTaskForSlackPostFeedBot(ctx context.Context, message Feed return c.submitTask(ctx, queue, url, withJSON(message), withTrace(span), withBasicAuth(secret), WithDelay(2*time.Minute)) } +func (c *Client) CreateTaskForMoshicamOwnerProcessing(ctx context.Context, message MoshicamOwnerProcessingMessage) error { + span, ctx := tracing.StartSpan(ctx, "cloudtask.create", "createTaskForMoshicamOwnerProcessing") + defer tracing.FinishSpan(span) + queue := env.GetString("MOSHICAM_OWNER_PROCESSING_QUEUE") + url := fmt.Sprintf("%s/task/notify/owner-processing", env.GetString("MOSHICAM_URL")) + secret := env.GetString("MOSHICAM_TASK_SECRET") + return c.submitTask(ctx, queue, url, withJSON(message), withBasicAuth(secret)) +} + // NewClient returns a new task client with tracing enabled. func NewClient(ctx context.Context) *Client { skipQueues := make(map[string]bool)