Skip to content

Commit

Permalink
Process at once events and tx for synchronizer
Browse files Browse the repository at this point in the history
  • Loading branch information
kompotkot committed Jul 2, 2024
1 parent d1dbc98 commit 943d15f
Show file tree
Hide file tree
Showing 14 changed files with 1,350 additions and 86 deletions.
144 changes: 133 additions & 11 deletions blockchain/arbitrum_one/arbitrum_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,8 @@ func ToEntireBlockFromLogProto(obj *ArbitrumOneBlock) *seer_common.BlockJson {
BlockTimestamp: fmt.Sprintf("%d", tx.BlockTimestamp),
AccessList: accessList,
YParity: tx.YParity,

Events: events,
Events: events,
})
}

Expand All @@ -558,10 +558,10 @@ func ToEntireBlockFromLogProto(obj *ArbitrumOneBlock) *seer_common.BlockJson {
BaseFeePerGas: obj.BaseFeePerGas,
IndexedAt: fmt.Sprintf("%d", obj.IndexedAt),

MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fmt.Sprintf("%d", obj.L1BlockNumber),
MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fmt.Sprintf("%d", obj.L1BlockNumber),

Transactions: txs,
}
Expand Down Expand Up @@ -589,10 +589,10 @@ func ToProtoSingleBlock(obj *seer_common.BlockJson) *ArbitrumOneBlock {
TransactionsRoot: obj.TransactionsRoot,
IndexedAt: fromHex(obj.IndexedAt).Uint64(),

MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fromHex(obj.L1BlockNumber).Uint64(),
MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fromHex(obj.L1BlockNumber).Uint64(),
}
}

Expand Down Expand Up @@ -707,7 +707,7 @@ func (c *Client) DecodeProtoBlocks(data []string) ([]*ArbitrumOneBlock, error) {
return blocks, nil
}

func (c *Client) DecodeProtoEntireBlock(rawData *bytes.Buffer) ([]*seer_common.BlockJson, error) {
func (c *Client) DecodeProtoEntireBlockToJson(rawData *bytes.Buffer) ([]*seer_common.BlockJson, error) {
var blocks []*seer_common.BlockJson

for {
Expand All @@ -726,6 +726,128 @@ func (c *Client) DecodeProtoEntireBlock(rawData *bytes.Buffer) ([]*seer_common.B
return blocks, nil
}

func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, blocksCache map[uint64]uint64, abiMap map[string]map[string]map[string]string) ([]indexer.EventLabel, []indexer.TransactionLabel, error) {
var blocks []*ArbitrumOneBlock

for {
block := &ArbitrumOneBlock{}
if unmErr := protodelim.UnmarshalFrom(rawData, block); unmErr != nil {
if unmErr == io.EOF {
break // End of the buffer
}
return nil, nil, unmErr
}

blocks = append(blocks, block)
}

var labels []indexer.EventLabel
var txLabels []indexer.TransactionLabel

for _, b := range blocks {
for _, tx := range b.Transactions {
// Process transaction labels
selector := tx.Input[:10]

if abiMap[tx.ToAddress] != nil {
txContractAbi, err := abi.JSON(strings.NewReader(abiMap[tx.ToAddress][selector]["abi"]))
if err != nil {
return nil, nil, err
}

inputData, err := hex.DecodeString(tx.Input[2:])
if err != nil {
fmt.Println("Error decoding input data: ", err)
return nil, nil, err
}

decodedArgs, err := seer_common.DecodeTransactionInputDataToInterface(&txContractAbi, inputData)
if err != nil {
fmt.Println("Error decoding transaction input data: ", err)
return nil, nil, err
}

txLabelDataBytes, err := json.Marshal(decodedArgs)
if err != nil {
return nil, nil, err
}

// Convert transaction to label
transactionLabel := indexer.TransactionLabel{
Address: tx.ToAddress,
BlockNumber: tx.BlockNumber,
BlockHash: tx.BlockHash,
CallerAddress: tx.FromAddress,
LabelName: abiMap[tx.ToAddress][selector]["abi_name"],
LabelType: "tx_call",
OriginAddress: tx.FromAddress,
Label: indexer.SeerCrawlerLabel,
TransactionHash: tx.Hash,
LabelData: string(txLabelDataBytes), // Convert JSON byte slice to string
BlockTimestamp: blocksCache[tx.BlockNumber],
}

txLabels = append(txLabels, transactionLabel)
}

// Process events
for _, e := range tx.Logs {
var topicSelector string

if len(e.Topics) > 0 {
topicSelector = e.Topics[0]
} else {
// 0x0 is the default topic selector
topicSelector = "0x0"
}

if abiMap[e.Address] == nil {
continue
}

// Get the ABI string
contractAbi, err := abi.JSON(strings.NewReader(abiMap[e.Address][topicSelector]["abi"]))
if err != nil {
fmt.Println("Error initializing contract ABI: ", err)
return nil, nil, err
}

// Decode the event data
decodedArgs, err := seer_common.DecodeLogArgsToLabelData(&contractAbi, e.Topics, e.Data)

if err != nil {
fmt.Println("Error decoding event data: ", err)
return nil, nil, err
}

// Convert decodedArgs map to JSON
labelDataBytes, err := json.Marshal(decodedArgs)
if err != nil {
return nil, nil, err
}

// Convert event to label
eventLabel := indexer.EventLabel{
Label: indexer.SeerCrawlerLabel,
LabelName: abiMap[e.Address][topicSelector]["abi_name"],
LabelType: "event",
BlockNumber: e.BlockNumber,
BlockHash: e.BlockHash,
Address: e.Address,
TransactionHash: e.TransactionHash,
LabelData: string(labelDataBytes), // Convert JSON byte slice to string
BlockTimestamp: blocksCache[e.BlockNumber],
LogIndex: e.LogIndex,
}

labels = append(labels, eventLabel)
}
}
}

return labels, txLabels, nil
}

func (c *Client) DecodeProtoEventsToLabels(rawData *bytes.Buffer, blocksCache map[uint64]uint64, abiMap map[string]map[string]map[string]string) ([]indexer.EventLabel, error) {
var protoEvents []*ArbitrumOneEventLog
for {
Expand Down
144 changes: 133 additions & 11 deletions blockchain/arbitrum_sepolia/arbitrum_sepolia.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,8 @@ func ToEntireBlockFromLogProto(obj *ArbitrumSepoliaBlock) *seer_common.BlockJson
BlockTimestamp: fmt.Sprintf("%d", tx.BlockTimestamp),
AccessList: accessList,
YParity: tx.YParity,

Events: events,
Events: events,
})
}

Expand All @@ -558,10 +558,10 @@ func ToEntireBlockFromLogProto(obj *ArbitrumSepoliaBlock) *seer_common.BlockJson
BaseFeePerGas: obj.BaseFeePerGas,
IndexedAt: fmt.Sprintf("%d", obj.IndexedAt),

MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fmt.Sprintf("%d", obj.L1BlockNumber),
MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fmt.Sprintf("%d", obj.L1BlockNumber),

Transactions: txs,
}
Expand Down Expand Up @@ -589,10 +589,10 @@ func ToProtoSingleBlock(obj *seer_common.BlockJson) *ArbitrumSepoliaBlock {
TransactionsRoot: obj.TransactionsRoot,
IndexedAt: fromHex(obj.IndexedAt).Uint64(),

MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fromHex(obj.L1BlockNumber).Uint64(),
MixHash: obj.MixHash,
SendCount: obj.SendCount,
SendRoot: obj.SendRoot,
L1BlockNumber: fromHex(obj.L1BlockNumber).Uint64(),
}
}

Expand Down Expand Up @@ -707,7 +707,7 @@ func (c *Client) DecodeProtoBlocks(data []string) ([]*ArbitrumSepoliaBlock, erro
return blocks, nil
}

func (c *Client) DecodeProtoEntireBlock(rawData *bytes.Buffer) ([]*seer_common.BlockJson, error) {
func (c *Client) DecodeProtoEntireBlockToJson(rawData *bytes.Buffer) ([]*seer_common.BlockJson, error) {
var blocks []*seer_common.BlockJson

for {
Expand All @@ -726,6 +726,128 @@ func (c *Client) DecodeProtoEntireBlock(rawData *bytes.Buffer) ([]*seer_common.B
return blocks, nil
}

func (c *Client) DecodeProtoEntireBlockToLabels(rawData *bytes.Buffer, blocksCache map[uint64]uint64, abiMap map[string]map[string]map[string]string) ([]indexer.EventLabel, []indexer.TransactionLabel, error) {
var blocks []*ArbitrumSepoliaBlock

for {
block := &ArbitrumSepoliaBlock{}
if unmErr := protodelim.UnmarshalFrom(rawData, block); unmErr != nil {
if unmErr == io.EOF {
break // End of the buffer
}
return nil, nil, unmErr
}

blocks = append(blocks, block)
}

var labels []indexer.EventLabel
var txLabels []indexer.TransactionLabel

for _, b := range blocks {
for _, tx := range b.Transactions {
// Process transaction labels
selector := tx.Input[:10]

if abiMap[tx.ToAddress] != nil {
txContractAbi, err := abi.JSON(strings.NewReader(abiMap[tx.ToAddress][selector]["abi"]))
if err != nil {
return nil, nil, err
}

inputData, err := hex.DecodeString(tx.Input[2:])
if err != nil {
fmt.Println("Error decoding input data: ", err)
return nil, nil, err
}

decodedArgs, err := seer_common.DecodeTransactionInputDataToInterface(&txContractAbi, inputData)
if err != nil {
fmt.Println("Error decoding transaction input data: ", err)
return nil, nil, err
}

txLabelDataBytes, err := json.Marshal(decodedArgs)
if err != nil {
return nil, nil, err
}

// Convert transaction to label
transactionLabel := indexer.TransactionLabel{
Address: tx.ToAddress,
BlockNumber: tx.BlockNumber,
BlockHash: tx.BlockHash,
CallerAddress: tx.FromAddress,
LabelName: abiMap[tx.ToAddress][selector]["abi_name"],
LabelType: "tx_call",
OriginAddress: tx.FromAddress,
Label: indexer.SeerCrawlerLabel,
TransactionHash: tx.Hash,
LabelData: string(txLabelDataBytes), // Convert JSON byte slice to string
BlockTimestamp: blocksCache[tx.BlockNumber],
}

txLabels = append(txLabels, transactionLabel)
}

// Process events
for _, e := range tx.Logs {
var topicSelector string

if len(e.Topics) > 0 {
topicSelector = e.Topics[0]
} else {
// 0x0 is the default topic selector
topicSelector = "0x0"
}

if abiMap[e.Address] == nil {
continue
}

// Get the ABI string
contractAbi, err := abi.JSON(strings.NewReader(abiMap[e.Address][topicSelector]["abi"]))
if err != nil {
fmt.Println("Error initializing contract ABI: ", err)
return nil, nil, err
}

// Decode the event data
decodedArgs, err := seer_common.DecodeLogArgsToLabelData(&contractAbi, e.Topics, e.Data)

if err != nil {
fmt.Println("Error decoding event data: ", err)
return nil, nil, err
}

// Convert decodedArgs map to JSON
labelDataBytes, err := json.Marshal(decodedArgs)
if err != nil {
return nil, nil, err
}

// Convert event to label
eventLabel := indexer.EventLabel{
Label: indexer.SeerCrawlerLabel,
LabelName: abiMap[e.Address][topicSelector]["abi_name"],
LabelType: "event",
BlockNumber: e.BlockNumber,
BlockHash: e.BlockHash,
Address: e.Address,
TransactionHash: e.TransactionHash,
LabelData: string(labelDataBytes), // Convert JSON byte slice to string
BlockTimestamp: blocksCache[e.BlockNumber],
LogIndex: e.LogIndex,
}

labels = append(labels, eventLabel)
}
}
}

return labels, txLabels, nil
}

func (c *Client) DecodeProtoEventsToLabels(rawData *bytes.Buffer, blocksCache map[uint64]uint64, abiMap map[string]map[string]map[string]string) ([]indexer.EventLabel, error) {
var protoEvents []*ArbitrumSepoliaEventLog
for {
Expand Down
Loading

0 comments on commit 943d15f

Please sign in to comment.