Skip to content

Commit

Permalink
Use Base types directly instead of casting from Ethereum types (#1535)
Browse files Browse the repository at this point in the history
  • Loading branch information
radazen authored Aug 1, 2024
1 parent 5b26eac commit cc07547
Showing 1 changed file with 33 additions and 167 deletions.
200 changes: 33 additions & 167 deletions kafka-streamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,45 +65,9 @@ func main() {
}
}

func newEthereumOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessEthereumOwnerEntryParams, error) {
return parseOwnerMessage(ctx, deserializer, message)
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessEthereumOwnerEntryParams) error {
return submitOwnerBatch(ctx, queries.ProcessEthereumOwnerEntry, entries)
}

return &streamerConfig{
Topic: "ethereum.owner.v4",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

func newEthereumTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessEthereumTokenEntryParams, error) {
return parseTokenMessage(ctx, deserializer, message)
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessEthereumTokenEntryParams) error {
return submitTokenBatch(ctx, queries.ProcessEthereumTokenEntry, entries, ccf)
}

return &streamerConfig{
Topic: "ethereum.nft.v4",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseOwnerEntryParams, error) {
ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message)
if err != nil {
return mirrordb.ProcessBaseOwnerEntryParams{}, err
}

// All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters
return mirrordb.ProcessBaseOwnerEntryParams(ethereumEntry), nil
return parseOwnerMessage(ctx, deserializer, message)
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseOwnerEntryParams) error {
Expand All @@ -118,13 +82,7 @@ func newBaseOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrord

func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseTokenEntryParams, error) {
ethereumEntry, err := parseTokenMessage(ctx, deserializer, message)
if err != nil {
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

// All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters
return mirrordb.ProcessBaseTokenEntryParams(ethereumEntry), nil
return parseTokenMessage(ctx, deserializer, message)
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseTokenEntryParams) error {
Expand All @@ -137,92 +95,6 @@ func newBaseTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrord
}
}

func newBaseSepoliaOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseSepoliaOwnerEntryParams, error) {
ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message)
if err != nil {
return mirrordb.ProcessBaseSepoliaOwnerEntryParams{}, err
}

// All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters
return mirrordb.ProcessBaseSepoliaOwnerEntryParams(ethereumEntry), nil
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseSepoliaOwnerEntryParams) error {
return submitOwnerBatch(ctx, queries.ProcessBaseSepoliaOwnerEntry, entries)
}

return &streamerConfig{
Topic: "base-sepolia.owner.v4",
DefaultOffset: "latest",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

func newBaseSepoliaTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessBaseSepoliaTokenEntryParams, error) {
ethereumEntry, err := parseTokenMessage(ctx, deserializer, message)
if err != nil {
return mirrordb.ProcessBaseSepoliaTokenEntryParams{}, err
}

// All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters
return mirrordb.ProcessBaseSepoliaTokenEntryParams(ethereumEntry), nil
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessBaseSepoliaTokenEntryParams) error {
return submitTokenBatch(ctx, queries.ProcessBaseSepoliaTokenEntry, entries, ccf)
}

return &streamerConfig{
Topic: "base-sepolia.nft.v4",
DefaultOffset: "latest",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

func newZoraOwnerConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessZoraOwnerEntryParams, error) {
ethereumEntry, err := parseOwnerMessage(ctx, deserializer, message)
if err != nil {
return mirrordb.ProcessZoraOwnerEntryParams{}, err
}

// All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters
return mirrordb.ProcessZoraOwnerEntryParams(ethereumEntry), nil
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessZoraOwnerEntryParams) error {
return submitOwnerBatch(ctx, queries.ProcessZoraOwnerEntry, entries)
}

return &streamerConfig{
Topic: "zora.owner.v4",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

func newZoraTokenConfig(deserializer *avro.GenericDeserializer, queries *mirrordb.Queries, ccf *contractCollectionFiller) *streamerConfig {
parseF := func(ctx context.Context, message *kafka.Message) (mirrordb.ProcessZoraTokenEntryParams, error) {
ethereumEntry, err := parseTokenMessage(ctx, deserializer, message)
if err != nil {
return mirrordb.ProcessZoraTokenEntryParams{}, err
}

// All EVM chains (Ethereum, Base, Zora) have the same database and query structure, so we can cast between their parameters
return mirrordb.ProcessZoraTokenEntryParams(ethereumEntry), nil
}

submitF := func(ctx context.Context, entries []mirrordb.ProcessZoraTokenEntryParams) error {
return submitTokenBatch(ctx, queries.ProcessZoraTokenEntry, entries, ccf)
}

return &streamerConfig{
Topic: "zora.nft.v4",
Batcher: newMessageBatcher(250, time.Second, 10, parseF, submitF),
}
}

func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollectionFiller) {
deserializer, err := newDeserializerFromRegistry()
if err != nil {
Expand All @@ -239,14 +111,8 @@ func runStreamer(ctx context.Context, pgx *pgxpool.Pool, ccf *contractCollection

// Creating multiple configs for each topic allows them to process separate partitions in parallel
configs := []*streamerConfig{
//newEthereumOwnerConfig(deserializer, queries),
//newEthereumTokenConfig(deserializer, queries, ccf),
newBaseOwnerConfig(deserializer, queries),
newBaseTokenConfig(deserializer, queries, ccf),
//newBaseSepoliaOwnerConfig(deserializer, queries),
//newBaseSepoliaTokenConfig(deserializer, queries, ccf),
//newZoraOwnerConfig(deserializer, queries),
//newZoraTokenConfig(deserializer, queries, ccf),
}

// If any topic errors more than 10 times in 10 minutes, panic and restart the whole service
Expand Down Expand Up @@ -530,50 +396,50 @@ func parseTimestamp(s *string) (*time.Time, error) {
return &ts, nil
}

func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessEthereumOwnerEntryParams, error) {
func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessBaseOwnerEntryParams, error) {
key := string(msg.Key)

owner := ethereum.Owner{}
err := deserializer.DeserializeInto(*msg.TopicPartition.Topic, msg.Value, &owner)
if err != nil {
return mirrordb.ProcessEthereumOwnerEntryParams{}, fmt.Errorf("failed to deserialize owner message with key %s: %w", key, err)
return mirrordb.ProcessBaseOwnerEntryParams{}, fmt.Errorf("failed to deserialize owner message with key %s: %w", key, err)
}

actionType, err := getActionType(msg)
if err != nil {
err = fmt.Errorf("failed to get action type for msg: %v", msg)
return mirrordb.ProcessEthereumOwnerEntryParams{}, err
return mirrordb.ProcessBaseOwnerEntryParams{}, err
}

contractAddress, tokenID, err := parseNftID(owner.Nft_id)
if err != nil {
return mirrordb.ProcessEthereumOwnerEntryParams{}, fmt.Errorf("error parsing NftID: %w", err)
return mirrordb.ProcessBaseOwnerEntryParams{}, fmt.Errorf("error parsing NftID: %w", err)
}

// TODO: Same as above, we'll want to normalize the address based on the chain at some point
walletAddress := strings.ToLower(owner.Owner_address)

quantity, err := parseNumeric(owner.Quantity)
if err != nil {
return mirrordb.ProcessEthereumOwnerEntryParams{}, err
return mirrordb.ProcessBaseOwnerEntryParams{}, err
}

firstAcquiredDate, err := parseTimestamp(owner.First_acquired_date)
if err != nil {
err = fmt.Errorf("failed to parse First_acquired_date: %w", err)
return mirrordb.ProcessEthereumOwnerEntryParams{}, err
return mirrordb.ProcessBaseOwnerEntryParams{}, err
}

lastAcquiredDate, err := parseTimestamp(owner.Last_acquired_date)
if err != nil {
err = fmt.Errorf("failed to parse Last_acquired_date: %w", err)
return mirrordb.ProcessEthereumOwnerEntryParams{}, err
return mirrordb.ProcessBaseOwnerEntryParams{}, err
}

var params mirrordb.ProcessEthereumOwnerEntryParams
var params mirrordb.ProcessBaseOwnerEntryParams

if actionType == "delete" {
params = mirrordb.ProcessEthereumOwnerEntryParams{
params = mirrordb.ProcessBaseOwnerEntryParams{
ShouldDelete: true,
SimplehashKafkaKey: key,

Expand All @@ -584,7 +450,7 @@ func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializ
}
} else {
if actionType == "insert" || actionType == "update" {
params = mirrordb.ProcessEthereumOwnerEntryParams{
params = mirrordb.ProcessBaseOwnerEntryParams{
ShouldUpsert: true,
SimplehashKafkaKey: key,
SimplehashNftID: &owner.Nft_id,
Expand Down Expand Up @@ -612,19 +478,19 @@ func parseOwnerMessage(ctx context.Context, deserializer *avro.GenericDeserializ
return params, nil
}

func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessEthereumTokenEntryParams, error) {
func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializer, msg *kafka.Message) (mirrordb.ProcessBaseTokenEntryParams, error) {
key := string(msg.Key)

nft := ethereum.Nft{}
err := deserializer.DeserializeInto(*msg.TopicPartition.Topic, msg.Value, &nft)
if err != nil {
return mirrordb.ProcessEthereumTokenEntryParams{}, fmt.Errorf("failed to deserialize token message with key %s: %w", key, err)
return mirrordb.ProcessBaseTokenEntryParams{}, fmt.Errorf("failed to deserialize token message with key %s: %w", key, err)
}

actionType, err := getActionType(msg)
if err != nil {
err = fmt.Errorf("failed to get action type for msg: %v", msg)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

// If the NftID is not set, we can try to construct it from the chain, contract_address, and token_id
Expand All @@ -634,37 +500,37 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ

contractAddress, tokenID, err := parseNftID(nft.Nft_id)
if err != nil {
return mirrordb.ProcessEthereumTokenEntryParams{}, fmt.Errorf("error parsing NftID: %w", err)
return mirrordb.ProcessBaseTokenEntryParams{}, fmt.Errorf("error parsing NftID: %w", err)
}

previews, err := toJSONB(nft.Previews)
if err != nil {
err = fmt.Errorf("failed to convert Previews to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

contract, err := toJSONB(nft.Contract)
if err != nil {
err = fmt.Errorf("failed to convert Contract to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

lastSale, err := toJSONB(nft.Last_sale)
if err != nil {
err = fmt.Errorf("failed to convert Last_sale to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

firstCreated, err := toJSONB(nft.First_created)
if err != nil {
err = fmt.Errorf("failed to convert First_created to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

rarity, err := toJSONB(nft.Rarity)
if err != nil {
err = fmt.Errorf("failed to convert Rarity to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

extraMetadataJsonb, err := cleanJSONB(nft.Extra_metadata, true)
Expand All @@ -675,50 +541,50 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ
extraMetadataJsonb = pgtype.JSONB{Status: pgtype.Null}
} else {
err = fmt.Errorf("failed to convert Extra_metadata to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}
}

imageProperties, err := toJSONB(nft.Image_properties)
if err != nil {
err = fmt.Errorf("failed to convert Image_properties to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

videoProperties, err := toJSONB(nft.Video_properties)
if err != nil {
err = fmt.Errorf("failed to convert Video_properties to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

audioProperties, err := toJSONB(nft.Audio_properties)
if err != nil {
err = fmt.Errorf("failed to convert Audio_properties to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

modelProperties, err := toJSONB(nft.Model_properties)
if err != nil {
err = fmt.Errorf("failed to convert Model_properties to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

otherProperties, err := toJSONB(nft.Other_properties)
if err != nil {
err = fmt.Errorf("failed to convert Other_properties to JSONB: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

tokenCount, err := parseNumeric(nft.Token_count)
if err != nil {
err = fmt.Errorf("failed to parse Token_count: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

ownerCount, err := parseNumeric(nft.Owner_count)
if err != nil {
err = fmt.Errorf("failed to parse Owner_count: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

var collectionID *string
Expand All @@ -729,13 +595,13 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ
onChainCreatedDate, err := parseTimestamp(nft.Created_date)
if err != nil {
err = fmt.Errorf("failed to parse Created_date: %w", err)
return mirrordb.ProcessEthereumTokenEntryParams{}, err
return mirrordb.ProcessBaseTokenEntryParams{}, err
}

var params mirrordb.ProcessEthereumTokenEntryParams
var params mirrordb.ProcessBaseTokenEntryParams

if actionType == "delete" {
params = mirrordb.ProcessEthereumTokenEntryParams{
params = mirrordb.ProcessBaseTokenEntryParams{
ShouldDelete: true,
SimplehashKafkaKey: key,

Expand All @@ -757,7 +623,7 @@ func parseTokenMessage(ctx context.Context, deserializer *avro.GenericDeserializ
}
} else {
if actionType == "insert" || actionType == "update" {
params = mirrordb.ProcessEthereumTokenEntryParams{
params = mirrordb.ProcessBaseTokenEntryParams{
ShouldUpsert: true,
SimplehashKafkaKey: key,
SimplehashNftID: nft.Nft_id,
Expand Down

0 comments on commit cc07547

Please sign in to comment.