Skip to content

Commit

Permalink
Add base post commit for moshi
Browse files Browse the repository at this point in the history
  • Loading branch information
jarrel-b committed Jun 26, 2024
1 parent fe695c0 commit b3cefe9
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 12 deletions.
22 changes: 21 additions & 1 deletion kafka-streamer/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ type messageBatcher[T any] struct {
closed bool
parseF func(context.Context, *kafka.Message) (T, error)
submitF func(context.Context, []T) error
postCommitF func(context.Context, []T) error
}

func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount int, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error) *messageBatcher[T] {
type option[T any] func(*messageBatcher[T])

func withPostCommitF[T any](f func(context.Context, []T) error) option[T] {
return func(mb *messageBatcher[T]) {
mb.postCommitF = f
}
}

func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount int, parseF func(context.Context, *kafka.Message) (T, error), submitF func(context.Context, []T) error, opts ...option[T]) *messageBatcher[T] {
mb := &messageBatcher[T]{
maxSize: maxSize,
timeoutDuration: timeout,
Expand All @@ -46,6 +55,10 @@ func newMessageBatcher[T any](maxSize int, timeout time.Duration, workerCount in
submitF: submitF,
}

for _, opt := range opts {
opt(mb)
}

for i := 0; i < workerCount; i++ {
go mb.worker()
}
Expand Down Expand Up @@ -136,6 +149,13 @@ func (mb *messageBatcher[T]) Submit(ctx context.Context, c *kafka.Consumer) erro
return fmt.Errorf("failed to commit offsets: %w", err)
}

if mb.postCommitF != nil {
err := mb.postCommitF(ctx, mb.entries)
if err != nil {
logger.For(ctx).Warnf("error calling post commit function: %v", err)
}
}

mb.Reset()
return nil
}
Expand Down
16 changes: 11 additions & 5 deletions kafka-streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/mikeydub/go-gallery/service/logger"
"github.com/mikeydub/go-gallery/service/persist"
"github.com/mikeydub/go-gallery/service/persist/postgres"
"github.com/mikeydub/go-gallery/service/task"
"github.com/mikeydub/go-gallery/util"
"github.com/mikeydub/go-gallery/util/batch"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -53,8 +54,9 @@ func main() {
defer pgx.Close()

ccf := newContractCollectionFiller(ctx, pgx)
tc := task.NewClient(ctx)

go runStreamer(ctx, pgx, ccf)
go runStreamer(ctx, pgx, ccf, tc)

err := router.Run(":3000")
if err != nil {
Expand Down Expand Up @@ -93,7 +95,7 @@ func newEthereumTokenConfig(deserializer *avro.GenericDeserializer, queries *mir
}
}

func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig {
func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, taskClient *task.Client) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseOwnerEntryParams, error) {
ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message)
if err != nil {
Expand All @@ -108,9 +110,13 @@ func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord
return submitOwnerBatch(ctx, queries.ProcessBaseOwnerEntry, entries)
}

postCommitF := func(ctx context.Context, entries []mirrordb.ProcessBaseOwnerEntryParams) error {
return taskClient.CreateTaskForMoshicamOwnerProcessing(ctx, task.MoshicamOwnerProcessingMessage{Entries: entries})
}

return &streamerConfig{
Topic: "base.owner.v4",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF, withPostCommitF(postCommitF)),
}
}

Expand Down Expand Up @@ -221,7 +227,7 @@ func newZoraTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord
}
}

func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollectionFiller) {
func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollectionFiller, tc *task.Client) {
deserializer, err := newDeserializerFromRegistry()
if err != nil {
panic(fmt.Errorf("failed to create Avro deserializer: %w", err))
Expand All @@ -239,7 +245,7 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollection
configs := []*streamerConfig{
newEthereumOwnerConfig(deserializer, queries),
newEthereumTokenConfig(deserializer, queries, ccf),
newBaseOwnerConfig(deserializer, queries),
newBaseOwnerConfig(deserializer, queries, tc),
newBaseTokenConfig(deserializer, queries, ccf),
newBaseSepoliaOwnerConfig(deserializer, queries),
newBaseSepoliaTokenConfig(deserializer, queries, ccf),
Expand Down
7 changes: 5 additions & 2 deletions secrets/local/local/app-local-kafka-streamer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ SIMPLEHASH_API_KEY: ENC[AES256_GCM,data:xs21dnHG2SDndLykF55xBg==,iv:kbP6U9agwhmo
SIMPLEHASH_API_SECRET: ENC[AES256_GCM,data:AZRMjIpXY8Y0a4JNvqosvrrqJJ3MG7ndaP74bh0X38I9GsDKs+WjzgD2PG5sLPtrkFn9pMzKcEJSj2AuYZsUDA==,iv:Ej+wlqbGM9g7cSoilBSJg4NpuukG36KQDRA8nOhahtc=,tag:xC+sykMjBh70J6YgjiYuhw==,type:str]
SIMPLEHASH_GROUP_ID: ENC[AES256_GCM,data:C27xn+/cfWz1,iv:vaemCg0liT3x6aaBcUKU0+EkDW1ES6m7I/gh6JLxthQ=,tag:OelT4SMDMO5n7wPWwmQ67g==,type:str]
SIMPLEHASH_BOOTSTRAP_SERVERS: ENC[AES256_GCM,data:4te1EEuo9e214YQHswryzgx8pHYLM6iH9eqZknS8A4hJPfyroeWVwjiwUqYUqw==,iv:aiejiGnBaefVHRKU3Pol54hgknA4twkxwJ46hh3A9LE=,tag:QL6Y/2XWWLkjMHOBVo5dmg==,type:str]
MOSHICAM_OWNER_PROCESSING_QUEUE: ENC[AES256_GCM,data:ujvHm6jPGWitIPH4xoKUjVwGjgtcHqH86EqYc64vxJ5MuFgkpxax6VGWXi52M/F08wLMeHsH1BBmrw/QKYulLTMnWh+yAQ==,iv:NvuIgf8I8SvQkFz5t2jgteOtSJ9moE9+ocVLFY+BwWo=,tag:z0gRJOg0kz8ty3hAZSi/Ig==,type:str]
MOSHICAM_URL: ENC[AES256_GCM,data:buQsav0nKgsIpXKyFkqGZNmxeeIcKa296UG3t5BJJSi7Ce0tIg==,iv:XnKR97pTtsYD+ZwZxM8fSrVlLic/6bt6v4zuOYYyicY=,tag:DVcoDAr4d0hAd3EXmaBIdQ==,type:str]
MOSHCAM_TASK_SECRET: ENC[AES256_GCM,data:jEB8SUIgeg6+q/hlyg37RA==,iv:P5X5Yi0ed8VVsukob+vH8h9GWZgodywXkFffSw64LQo=,tag:At56f3AukZo932gZ6QUEgQ==,type:str]
sops:
kms: []
gcp_kms:
Expand All @@ -16,8 +19,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-04-19T19:59:21Z"
mac: ENC[AES256_GCM,data:CoM5SNAhJmQ1461Y6GZnOvoHwH3fnmavooexvx3NxLSHLQUx8PmperxRWAfsjlM84CwyYn7h8UiblzCri9eoXp74/gOp+erOH9sHjamtU12riz89nylk4KDKJ1/G6pfFYCkBQtGUSkenH0Hp4EZczXiC0AyCW9JARB83c1FiYYg=,iv:2cLYRhhhYbL5j3Pk/wRDho249es65nqvQPeWtAuW0wY=,tag:eVKf8HhUedgVJk/CGMhXsQ==,type:str]
lastmodified: "2024-06-26T20:43:46Z"
mac: ENC[AES256_GCM,data:vA5G4YCnVAD83YesrcIlZ36C5PI6KK2h+GTKqdDE6G5FexorRSuiFEaX/DLIiXW9TSDMPIqtKrOY4HD/Dd4XCWhnh7sLBeZ9+rQkCj8hzqOK2LnNdRjxf9ghRHa2zMExO2vuqeHScXugdju/bgT7e5ARN1DlKH4mo2riYZebvvE=,iv:fqWN69B0WLaDLDEyCwymwDT44BYojSlFCLgJ3cXar+M=,tag:THtXR4kySzI8/pHwqKUERQ==,type:str]
pgp: []
unencrypted_suffix: _unencrypted
version: 3.7.3
7 changes: 5 additions & 2 deletions secrets/prod/kafka-streamer-env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ SIMPLEHASH_KAFKA_API_SECRET: ENC[AES256_GCM,data:dNCgNhakFfuI8Ky434TKM2wNtWOiun/
SIMPLEHASH_GROUP_ID: ENC[AES256_GCM,data:C27xn+/cfWz1,iv:vaemCg0liT3x6aaBcUKU0+EkDW1ES6m7I/gh6JLxthQ=,tag:OelT4SMDMO5n7wPWwmQ67g==,type:str]
SIMPLEHASH_BOOTSTRAP_SERVERS: ENC[AES256_GCM,data:4te1EEuo9e214YQHswryzgx8pHYLM6iH9eqZknS8A4hJPfyroeWVwjiwUqYUqw==,iv:aiejiGnBaefVHRKU3Pol54hgknA4twkxwJ46hh3A9LE=,tag:QL6Y/2XWWLkjMHOBVo5dmg==,type:str]
SIMPLEHASH_REST_API_KEY: ENC[AES256_GCM,data:h4NuJQSE9N2rbJjxKzIxJiZUNcYSGSC6d/JMgUeI488g,iv:sTVD8pwCPqXCR7pwu8699yWIr2kl7spP68XYZwVCwH8=,tag:F2BFSf7x3J//Cfjr5zXOMg==,type:str]
MOSHICAM_OWNER_PROCESSING_QUEUE: ENC[AES256_GCM,data:zYXkqN2s8ne9vGHtYze71Y9j57WhRO2eo2oCYruBLIojjhU9L1tfcvqbArqxYVl35uLKHlm7fBa52W48b+rUAAU1qWQHtg==,iv:7bXBapocFL51PVzXSYqEfx3pB9h1Pld6MPYnFnHj5yo=,tag:xtJd/CLxRh8NfYkxbruQEA==,type:str]
MOSHICAM_URL: ENC[AES256_GCM,data:ZyoCHlTLUKkPdZsgubXAFPrfHuU0ZcENRD/IDurkeexCp1Gi+Q==,iv:u75MDxg0zSB764SKy6uLH7t9bAD/cXAvQ2LRJX8llbk=,tag:3Brr73NVB9I/kHpy3yNnnw==,type:str]
MOSHCAM_TASK_SECRET: ENC[AES256_GCM,data:wY13AbFCCq3gsVePCnm8kw==,iv:BSBG/1LXuftfFT7vMm5UEGcVgbH8RG6RPwXTBxS16u0=,tag:m3Tdcj6iqYGotLL0Z62eDA==,type:str]
sops:
kms: []
gcp_kms:
Expand All @@ -22,8 +25,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-05-06T15:09:36Z"
mac: ENC[AES256_GCM,data:eNNTdDQ+ruc6uDnFOXR4RA/F0+ytcpTcc+LlPyHTSZbMCAidTX/hjps8KiOFx0Mdc9R/s+Vx9QkpIuOzM78ZnDwy2PyTt2w/+33dZUx2C++AcBIFJWoCNkqc87otZJoWtlqq0uWvpG8CfQH6cOgrffQL0hR8DmCIaO5oWt48MJI=,iv:O7VplWjgmCw6p7YkLyrp0wKvGfhkzwslgVtnqsro4aQ=,tag:QwPY6xIqncuEe5g7f3YAWQ==,type:str]
lastmodified: "2024-06-26T20:42:57Z"
mac: ENC[AES256_GCM,data:Lyd7StXvHaP4k4GU7X2rkGyVXu+Kteo1wT7pGumjhrjPbNrA6CMjTI9TBVANzhmpr87iY3A4O0gRtJ2W4bs0MibDT5P5jA0fTtZIBdhpuUQD3oEKWYX3TPUxmvQSqfxhSpUY9jCIakQVJ1GG2tZA4zN2mGsnwjj0JeLps0koAZ8=,iv:en9J9ECCylTxZ0TOzQ28Gjy4AppqmkPf6sFjROu2plg=,tag:ibClIkCiKWCNTg9IIDMXMA==,type:str]
pgp: []
unencrypted_suffix: _unencrypted
version: 3.7.3
7 changes: 5 additions & 2 deletions secrets/prod/local/app-prod-kafka-streamer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ SIMPLEHASH_KAFKA_API_SECRET: ENC[AES256_GCM,data:/YxmsxIMuDQfRTFZiZCcR9NaG0fkyIH
SIMPLEHASH_GROUP_ID: ENC[AES256_GCM,data:C27xn+/cfWz1,iv:vaemCg0liT3x6aaBcUKU0+EkDW1ES6m7I/gh6JLxthQ=,tag:OelT4SMDMO5n7wPWwmQ67g==,type:str]
SIMPLEHASH_BOOTSTRAP_SERVERS: ENC[AES256_GCM,data:4te1EEuo9e214YQHswryzgx8pHYLM6iH9eqZknS8A4hJPfyroeWVwjiwUqYUqw==,iv:aiejiGnBaefVHRKU3Pol54hgknA4twkxwJ46hh3A9LE=,tag:QL6Y/2XWWLkjMHOBVo5dmg==,type:str]
SIMPLEHASH_REST_API_KEY: ENC[AES256_GCM,data:Y2aS46vXVaIxWXZ6JnZZ354Z+Vib8U2EGOmZdLWUt7sU,iv:nY0IU5zgqnLt5D3n84k3dNVYqqDfLOXWgMYDVKYHcGo=,tag:QZZqi4OyI5uR0ITWsiJJBQ==,type:str]
MOSHICAM_OWNER_PROCESSING_QUEUE: ENC[AES256_GCM,data:vBabHylUv5cI5mb9PueQ3G9a4w8yTJanAbmnBmM6Ynvg+/MFtDTf4pRfSzxsnhQ2kr3AvGLug028Ku0NRy2ARNDHpi69RQ==,iv:mf6325v6XC+GGfkDFeyA0Azm3HHZdrF4WGc5xscuicY=,tag:d47LKSu5pQQTZWNxD0S+OQ==,type:str]
MOSHICAM_URL: ENC[AES256_GCM,data:bcQDmsDcF1kuwT3gRlBfAwEtSMb585ikwS0TUc/yEMs22Wxebg==,iv:ZuV7I0yF4L52PWFh11/XetbLIEbNeEmilItcwFH5sCE=,tag:hIRWAALS/dIxfJri1+FHBw==,type:str]
MOSHCAM_TASK_SECRET: ENC[AES256_GCM,data:jc7Y6C9a7t5WssaUZhUx0w==,iv:9qOXZyBFyWR5jEotmHVRwUES0YTBXPUaX+PQASuA85E=,tag:HOHmvChkHPvXcrwcoVEDHA==,type:str]
sops:
kms: []
gcp_kms:
Expand All @@ -22,8 +25,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-05-06T15:09:52Z"
mac: ENC[AES256_GCM,data:2TyE3zqeGLvAwYTq3kqdmynIFv/HrqrVDfZZJBS/RCn7Y3ylWHXlLhLKSd8J9HKyeo4uM5zDCT75JcsAmV10TcBp2FzFBFLIL0gqrjgUN324wrqpbBix16yM4Q8EQNI8E9e/nclBu6Fs8aZ7P04EfPNt37nJPcI3IOwzyuW/9oA=,iv:XmJ6JI7+TzaBcFuXHDk9iEGcrBEWIeMVF2UegcI3zyY=,tag:EG8iuQcv1IacjZURMWWeXw==,type:str]
lastmodified: "2024-06-26T20:43:27Z"
mac: ENC[AES256_GCM,data:74Cb9Cv+07HKC08HSo8wf5XBnZnbJxsx13ZSZr5sC4gRqTpi/hGjOVvuVmlEeYPLz/o+IwaMjPCfcshM8LU+Xhzg6X/M2LJ1sn4wcGBP5MYm1LWIYFCaKdAPIMpImnVUjJocGv0pSYtq6OeO8Fhle5HAXJp3TtYI8fywpUYe3jE=,iv:iqu+cbz9fyCOP/uqbOJHcB7adoOcstcsJyjxhsp/YQE=,tag:56sAMKpJCJeL7wmsdkwbGA==,type:str]
pgp: []
unencrypted_suffix: _unencrypted
version: 3.7.3
14 changes: 14 additions & 0 deletions service/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
gcptasks "cloud.google.com/go/cloudtasks/apiv2"
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
"github.com/getsentry/sentry-go"
"github.com/mikeydub/go-gallery/db/gen/mirrordb"
"github.com/mikeydub/go-gallery/service/auth/basicauth"
"google.golang.org/api/option"
"google.golang.org/grpc"
Expand Down Expand Up @@ -84,6 +85,10 @@ type AutosocialPollFarcasterMessage struct {
UserID persist.DBID `form:"user_id" binding:"required"`
}

type MoshicamOwnerProcessingMessage struct {
Entries []mirrordb.ProcessBaseOwnerEntryParams `json:"entries" binding:"required"`
}

type TokenIdentifiersQuantities map[persist.TokenUniqueIdentifiers]persist.HexString

func (t TokenIdentifiersQuantities) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -258,6 +263,15 @@ func (c *Client) CreateTaskForSlackPostFeedBot(ctx context.Context, message Feed
return c.submitTask(ctx, queue, url, withJSON(message), withTrace(span), withBasicAuth(secret), WithDelay(2*time.Minute))
}

func (c *Client) CreateTaskForMoshicamOwnerProcessing(ctx context.Context, message MoshicamOwnerProcessingMessage) error {
span, ctx := tracing.StartSpan(ctx, "cloudtask.create", "createTaskForMoshicamOwnerProcessing")
defer tracing.FinishSpan(span)
queue := env.GetString("MOSHICAM_OWNER_PROCESSING_QUEUE")
url := fmt.Sprintf("%s/tasks/owner-processing", env.GetString("MOSHICAM_URL"))
secret := env.GetString("MOSHICAM_TASK_SECRET")
return c.submitTask(ctx, queue, url, withJSON(message), withBasicAuth(secret))
}

// NewClient returns a new task client with tracing enabled.
func NewClient(ctx context.Context) *Client {
skipQueues := make(map[string]bool)
Expand Down

0 comments on commit b3cefe9

Please sign in to comment.