Skip to content

Commit

Permalink
Forward token messages instead
Browse files Browse the repository at this point in the history
  • Loading branch information
jarrel-b committed Jun 27, 2024
1 parent b3cefe9 commit bd94751
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
30 changes: 20 additions & 10 deletions kafka-streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

// Enable for debugging; will not commit offsets or write to the database
const readOnlyMode = false
const moshicamMinterAddress = "0xaceb0de9f3efab3c50bf4dc6b14706f119e39dd8"

type streamerConfig struct {
Topic string
Expand Down Expand Up @@ -95,7 +96,7 @@ func newEthereumTokenConfig(deserializer *avro.GenericDeserializer, queries *mir
}
}

func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, taskClient *task.Client) *streamerConfig {
func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseOwnerEntryParams, error) {
ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message)
if err != nil {
Expand All @@ -110,17 +111,13 @@ func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord
return submitOwnerBatch(ctx, queries.ProcessBaseOwnerEntry, entries)
}

postCommitF := func(ctx context.Context, entries []mirrordb.ProcessBaseOwnerEntryParams) error {
return taskClient.CreateTaskForMoshicamOwnerProcessing(ctx, task.MoshicamOwnerProcessingMessage{Entries: entries})
}

return &streamerConfig{
Topic: "base.owner.v4",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF, withPostCommitF(postCommitF)),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig {
func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller, opts ...option[mirrordb.ProcessBaseTokenEntryParams]) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseTokenEntryParams, error) {
ethereumEntry, err := parseTokenMessage(ctx, deserializer, message)
if err != nil {
Expand All @@ -137,10 +134,23 @@ func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord

return &streamerConfig{
Topic: "base.nft.v4",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF, opts...),
}
}

func newMoshicamPostCommitF(taskClient *task.Client) func(context.Context, []mirrordb.ProcessBaseTokenEntryParams) error {
return func(ctx context.Context, entries []mirrordb.ProcessBaseTokenEntryParams) error {
moshiTokens := util.Filter(entries, isMoshicamEntry, false)
return taskClient.CreateTaskForMoshicamOwnerProcessing(ctx, task.MoshicamOwnerProcessingMessage{Entries: moshiTokens})
}
}

func isMoshicamEntry(e mirrordb.ProcessBaseTokenEntryParams) bool {
var c map[string]string
e.Contract.AssignTo(&c)
return strings.ToLower(c["deployed_via_contract"]) == moshicamMinterAddress
}

func newBaseSepoliaOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseSepoliaOwnerEntryParams, error) {
ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message)
Expand Down Expand Up @@ -245,8 +255,8 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollection
configs := []*streamerConfig{
newEthereumOwnerConfig(deserializer, queries),
newEthereumTokenConfig(deserializer, queries, ccf),
newBaseOwnerConfig(deserializer, queries, tc),
newBaseTokenConfig(deserializer, queries, ccf),
newBaseOwnerConfig(deserializer, queries),
newBaseTokenConfig(deserializer, queries, ccf, withPostCommitF(newMoshicamPostCommitF(tc))),
newBaseSepoliaOwnerConfig(deserializer, queries),
newBaseSepoliaTokenConfig(deserializer, queries, ccf),
//newZoraOwnerConfig(deserializer, queries),
Expand Down
4 changes: 2 additions & 2 deletions service/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type AutosocialPollFarcasterMessage struct {
}

type MoshicamOwnerProcessingMessage struct {
Entries []mirrordb.ProcessBaseOwnerEntryParams `json:"entries" binding:"required"`
Entries []mirrordb.ProcessBaseTokenEntryParams `json:"entries" binding:"required"`
}

type TokenIdentifiersQuantities map[persist.TokenUniqueIdentifiers]persist.HexString
Expand Down Expand Up @@ -267,7 +267,7 @@ func (c *Client) CreateTaskForMoshicamOwnerProcessing(ctx context.Context, messa
span, ctx := tracing.StartSpan(ctx, "cloudtask.create", "createTaskForMoshicamOwnerProcessing")
defer tracing.FinishSpan(span)
queue := env.GetString("MOSHICAM_OWNER_PROCESSING_QUEUE")
url := fmt.Sprintf("%s/tasks/owner-processing", env.GetString("MOSHICAM_URL"))
url := fmt.Sprintf("%s/task/notify/owner-processing", env.GetString("MOSHICAM_URL"))
secret := env.GetString("MOSHICAM_TASK_SECRET")
return c.submitTask(ctx, queue, url, withJSON(message), withBasicAuth(secret))
}
Expand Down

0 comments on commit bd94751

Please sign in to comment.