diff --git a/kafka-streamer/main.go b/kafka-streamer/main.go index d8d747cba..128ad20d4 100644 --- a/kafka-streamer/main.go +++ b/kafka-streamer/main.go @@ -65,45 +65,9 @@ func main() { } } -func newEthereumOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig { - parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessEthereumOwnerEntryParams, error) { - return parseOwnerMessage(ctx, deserializer, message) - } - - submitF := func(ctx context.Context, entries []mirrordb.ProcessEthereumOwnerEntryParams) error { - return submitOwnerBatch(ctx, queries.ProcessEthereumOwnerEntry, entries) - } - - return &streamerConfig{ - Topic: "ethereum.owner.v4", - Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF), - } -} - -func newEthereumTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig { - parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessEthereumTokenEntryParams, error) { - return parseTokenMessage(ctx, deserializer, message) - } - - submitF := func(ctx context.Context, entries []mirrordb.ProcessEthereumTokenEntryParams) error { - return submitTokenBatch(ctx, queries.ProcessEthereumTokenEntry, entries, ccf) - } - - return &streamerConfig{ - Topic: "ethereum.nft.v4", - Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF), - } -} - func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig { parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseOwnerEntryParams, error) { - ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message) - if err != nil { - return mirrordb.ProcessBaseOwnerEntryParams{}, err - } - - // All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters - return mirrordb.ProcessBaseOwnerEntryParams(ethereumEntry), nil + return parseOwnerMessage(ctx, deserializer, message) } submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseOwnerEntryParams) error { @@ -118,13 +82,7 @@ func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig { parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseTokenEntryParams, error) { - ethereumEntry, err := parseTokenMessage(ctx, deserializer, message) - if err != nil { - return mirrordb.ProcessBaseTokenEntryParams{}, err - } - - // All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters - return mirrordb.ProcessBaseTokenEntryParams(ethereumEntry), nil + return parseTokenMessage(ctx, deserializer, message) } submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseTokenEntryParams) error { @@ -137,92 +95,6 @@ func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord } } -func newBaseSepoliaOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig { - parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseSepoliaOwnerEntryParams, error) { - ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message) - if err != nil { - return mirrordb.ProcessBaseSepoliaOwnerEntryParams{}, err - } - - // All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters - return mirrordb.ProcessBaseSepoliaOwnerEntryParams(ethereumEntry), nil - } - - submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseSepoliaOwnerEntryParams) error { - return submitOwnerBatch(ctx, queries.ProcessBaseSepoliaOwnerEntry, entries) - } - - return &streamerConfig{ - Topic: "base-sepolia.owner.v4", - DefaultOffset: "latest", - Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF), - } -} - -func newBaseSepoliaTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig { - parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseSepoliaTokenEntryParams, error) { - ethereumEntry, err := parseTokenMessage(ctx, deserializer, message) - if err != nil { - return mirrordb.ProcessBaseSepoliaTokenEntryParams{}, err - } - - // All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters - return mirrordb.ProcessBaseSepoliaTokenEntryParams(ethereumEntry), nil - } - - submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseSepoliaTokenEntryParams) error { - return submitTokenBatch(ctx, queries.ProcessBaseSepoliaTokenEntry, entries, ccf) - } - - return &streamerConfig{ - Topic: "base-sepolia.nft.v4", - DefaultOffset: "latest", - Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF), - } -} - -func newZoraOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig { - parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessZoraOwnerEntryParams, error) { - ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message) - if err != nil { - return mirrordb.ProcessZoraOwnerEntryParams{}, err - } - - // All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters - return mirrordb.ProcessZoraOwnerEntryParams(ethereumEntry), nil - } - - submitF := func(ctx context.Context, entries []mirrordb.ProcessZoraOwnerEntryParams) error { - return submitOwnerBatch(ctx, queries.ProcessZoraOwnerEntry, entries) - } - - return &streamerConfig{ - Topic: "zora.owner.v4", - Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF), - } -} - -func newZoraTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig { - parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessZoraTokenEntryParams, error) { - ethereumEntry, err := parseTokenMessage(ctx, deserializer, message) - if err != nil { - return mirrordb.ProcessZoraTokenEntryParams{}, err - } - - // All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters - return mirrordb.ProcessZoraTokenEntryParams(ethereumEntry), nil - } - - submitF := func(ctx context.Context, entries []mirrordb.ProcessZoraTokenEntryParams) error { - return submitTokenBatch(ctx, queries.ProcessZoraTokenEntry, entries, ccf) - } - - return &streamerConfig{ - Topic: "zora.nft.v4", - Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF), - } -} - func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollectionFiller) { deserializer, err := newDeserializerFromRegistry() if err != nil { @@ -239,14 +111,8 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollection // Creating multiple configs for each topic allows them to process separate partitions in parallel configs := []*streamerConfig{ - //newEthereumOwnerConfig(deserializer, queries), - //newEthereumTokenConfig(deserializer, queries, ccf), newBaseOwnerConfig(deserializer, queries), newBaseTokenConfig(deserializer, queries, ccf), - //newBaseSepoliaOwnerConfig(deserializer, queries), - //newBaseSepoliaTokenConfig(deserializer, queries, ccf), - //newZoraOwnerConfig(deserializer, queries), - //newZoraTokenConfig(deserializer, queries, ccf), } // If any topic errors more than 10 times in 10 minutes, panic and restart the whole service @@ -530,24 +396,24 @@ func parseTimestamp(s *string) (*time.Time, error) { return &ts, nil } -func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessEthereumOwnerEntryParams, error) { +func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessBaseOwnerEntryParams, error) { key := string(msg.Key) owner := ethereum.Owner{} err := deserializer.DeserializeInto(*msg.TopicPartition.Topic, msg.Value, &owner) if err != nil { - return mirrordb.ProcessEthereumOwnerEntryParams{}, fmt.Errorf("failed to deserialize owner message with key %s: %w", key, err) + return mirrordb.ProcessBaseOwnerEntryParams{}, fmt.Errorf("failed to deserialize owner message with key %s: %w", key, err) } actionType, err := getActionType(msg) if err != nil { err = fmt.Errorf("failed to get action type for msg: %v", msg) - return mirrordb.ProcessEthereumOwnerEntryParams{}, err + return mirrordb.ProcessBaseOwnerEntryParams{}, err } contractAddress, tokenID, err := parseNftID(owner.Nft_id) if err != nil { - return mirrordb.ProcessEthereumOwnerEntryParams{}, fmt.Errorf("error parsing NftID: %w", err) + return mirrordb.ProcessBaseOwnerEntryParams{}, fmt.Errorf("error parsing NftID: %w", err) } // TODO: Same as above, we'll want to normalize the address based on the chain at some point @@ -555,25 +421,25 @@ func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializ quantity, err := parseNumeric(owner.Quantity) if err != nil { - return mirrordb.ProcessEthereumOwnerEntryParams{}, err + return mirrordb.ProcessBaseOwnerEntryParams{}, err } firstAcquiredDate, err := parseTimestamp(owner.First_acquired_date) if err != nil { err = fmt.Errorf("failed to parse First_acquired_date: %w", err) - return mirrordb.ProcessEthereumOwnerEntryParams{}, err + return mirrordb.ProcessBaseOwnerEntryParams{}, err } lastAcquiredDate, err := parseTimestamp(owner.Last_acquired_date) if err != nil { err = fmt.Errorf("failed to parse Last_acquired_date: %w", err) - return mirrordb.ProcessEthereumOwnerEntryParams{}, err + return mirrordb.ProcessBaseOwnerEntryParams{}, err } - var params mirrordb.ProcessEthereumOwnerEntryParams + var params mirrordb.ProcessBaseOwnerEntryParams if actionType == "delete" { - params = mirrordb.ProcessEthereumOwnerEntryParams{ + params = mirrordb.ProcessBaseOwnerEntryParams{ ShouldDelete: true, SimplehashKafkaKey: key, @@ -584,7 +450,7 @@ func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializ } } else { if actionType == "insert" || actionType == "update" { - params = mirrordb.ProcessEthereumOwnerEntryParams{ + params = mirrordb.ProcessBaseOwnerEntryParams{ ShouldUpsert: true, SimplehashKafkaKey: key, SimplehashNftID: &owner.Nft_id, @@ -612,19 +478,19 @@ func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializ return params, nil } -func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessEthereumTokenEntryParams, error) { +func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessBaseTokenEntryParams, error) { key := string(msg.Key) nft := ethereum.Nft{} err := deserializer.DeserializeInto(*msg.TopicPartition.Topic, msg.Value, &nft) if err != nil { - return mirrordb.ProcessEthereumTokenEntryParams{}, fmt.Errorf("failed to deserialize token message with key %s: %w", key, err) + return mirrordb.ProcessBaseTokenEntryParams{}, fmt.Errorf("failed to deserialize token message with key %s: %w", key, err) } actionType, err := getActionType(msg) if err != nil { err = fmt.Errorf("failed to get action type for msg: %v", msg) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } // If the NftID is not set, we can try to construct it from the chain, contract_address, and token_id @@ -634,37 +500,37 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ contractAddress, tokenID, err := parseNftID(nft.Nft_id) if err != nil { - return mirrordb.ProcessEthereumTokenEntryParams{}, fmt.Errorf("error parsing NftID: %w", err) + return mirrordb.ProcessBaseTokenEntryParams{}, fmt.Errorf("error parsing NftID: %w", err) } previews, err := toJSONB(nft.Previews) if err != nil { err = fmt.Errorf("failed to convert Previews to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } contract, err := toJSONB(nft.Contract) if err != nil { err = fmt.Errorf("failed to convert Contract to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } lastSale, err := toJSONB(nft.Last_sale) if err != nil { err = fmt.Errorf("failed to convert Last_sale to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } firstCreated, err := toJSONB(nft.First_created) if err != nil { err = fmt.Errorf("failed to convert First_created to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } rarity, err := toJSONB(nft.Rarity) if err != nil { err = fmt.Errorf("failed to convert Rarity to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } extraMetadataJsonb, err := cleanJSONB(nft.Extra_metadata, true) @@ -675,50 +541,50 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ extraMetadataJsonb = pgtype.JSONB{Status: pgtype.Null} } else { err = fmt.Errorf("failed to convert Extra_metadata to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } } imageProperties, err := toJSONB(nft.Image_properties) if err != nil { err = fmt.Errorf("failed to convert Image_properties to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } videoProperties, err := toJSONB(nft.Video_properties) if err != nil { err = fmt.Errorf("failed to convert Video_properties to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } audioProperties, err := toJSONB(nft.Audio_properties) if err != nil { err = fmt.Errorf("failed to convert Audio_properties to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } modelProperties, err := toJSONB(nft.Model_properties) if err != nil { err = fmt.Errorf("failed to convert Model_properties to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } otherProperties, err := toJSONB(nft.Other_properties) if err != nil { err = fmt.Errorf("failed to convert Other_properties to JSONB: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } tokenCount, err := parseNumeric(nft.Token_count) if err != nil { err = fmt.Errorf("failed to parse Token_count: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } ownerCount, err := parseNumeric(nft.Owner_count) if err != nil { err = fmt.Errorf("failed to parse Owner_count: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } var collectionID *string @@ -729,13 +595,13 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ onChainCreatedDate, err := parseTimestamp(nft.Created_date) if err != nil { err = fmt.Errorf("failed to parse Created_date: %w", err) - return mirrordb.ProcessEthereumTokenEntryParams{}, err + return mirrordb.ProcessBaseTokenEntryParams{}, err } - var params mirrordb.ProcessEthereumTokenEntryParams + var params mirrordb.ProcessBaseTokenEntryParams if actionType == "delete" { - params = mirrordb.ProcessEthereumTokenEntryParams{ + params = mirrordb.ProcessBaseTokenEntryParams{ ShouldDelete: true, SimplehashKafkaKey: key, @@ -757,7 +623,7 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ } } else { if actionType == "insert" || actionType == "update" { - params = mirrordb.ProcessEthereumTokenEntryParams{ + params = mirrordb.ProcessBaseTokenEntryParams{ ShouldUpsert: true, SimplehashKafkaKey: key, SimplehashNftID: nft.Nft_id,