diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index a837d8d..0e7472a 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -4,7 +4,6 @@ import ( "fmt" "math/big" "sort" - "sync" "time" "github.com/rs/zerolog/log" @@ -151,7 +150,7 @@ func (c *Committer) commit(blockData []common.BlockData) error { log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) // TODO if next parts (saving or deleting) fail, we'll have to do a rollback - if err := c.saveDataToMainStorage(blockData); err != nil { + if err := c.storage.MainStorage.InsertDataForBlocks(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) return fmt.Errorf("error saving data to main storage: %v", err) } @@ -167,70 +166,6 @@ func (c *Committer) commit(blockData []common.BlockData) error { return nil } -func (c *Committer) saveDataToMainStorage(blockData []common.BlockData) error { - var commitWg sync.WaitGroup - commitWg.Add(4) - - var commitErr error - var commitErrMutex sync.Mutex - - blocks := make([]common.Block, 0, len(blockData)) - logs := make([]common.Log, 0) - transactions := make([]common.Transaction, 0) - traces := make([]common.Trace, 0) - - for _, block := range blockData { - blocks = append(blocks, block.Block) - logs = append(logs, block.Logs...) - transactions = append(transactions, block.Transactions...) - traces = append(traces, block.Traces...) - } - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertBlocks(blocks); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting blocks: %v", err) - commitErrMutex.Unlock() - } - }() - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertLogs(logs); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting logs: %v", err) - commitErrMutex.Unlock() - } - }() - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertTransactions(transactions); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting transactions: %v", err) - commitErrMutex.Unlock() - } - }() - - go func() { - defer commitWg.Done() - if err := c.storage.MainStorage.InsertTraces(traces); err != nil { - commitErrMutex.Lock() - commitErr = fmt.Errorf("error inserting traces: %v", err) - commitErrMutex.Unlock() - } - }() - - commitWg.Wait() - - if commitErr != nil { - return commitErr - } - - return nil -} - func (c *Committer) handleGap(expectedStartBlockNumber *big.Int, actualFirstBlock common.Block) error { // increment the a gap counter in prometheus metrics.GapCounter.Inc() diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index dcd4999..b6f6566 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "strings" + "sync" "time" "github.com/ClickHouse/clickhouse-go/v2" @@ -55,7 +56,7 @@ func connectDB(cfg *config.ClickhouseConfig) (clickhouse.Conn, error) { return conn, nil } -func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error { +func (c *ClickHouseConnector) insertBlocks(blocks []common.Block) error { query := ` INSERT INTO ` + c.cfg.Database + `.blocks ( chain_id, number, timestamp, hash, parent_hash, sha3_uncles, nonce, @@ -100,7 +101,7 @@ func (c *ClickHouseConnector) InsertBlocks(blocks []common.Block) error { return batch.Send() } -func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error { +func (c *ClickHouseConnector) insertTransactions(txs []common.Transaction) error { query := ` INSERT INTO ` + c.cfg.Database + `.transactions ( chain_id, hash, nonce, block_hash, block_number, block_timestamp, transaction_index, @@ -142,7 +143,7 @@ func (c *ClickHouseConnector) InsertTransactions(txs []common.Transaction) error return batch.Send() } -func (c *ClickHouseConnector) InsertLogs(logs []common.Log) error { +func (c *ClickHouseConnector) insertLogs(logs []common.Log) error { query := ` INSERT INTO ` + c.cfg.Database + `.logs ( chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, @@ -663,7 +664,7 @@ func (c *ClickHouseConnector) DeleteBlockData(data []common.BlockData) error { return batch.Send() } -func (c *ClickHouseConnector) InsertTraces(traces []common.Trace) error { +func (c *ClickHouseConnector) insertTraces(traces []common.Trace) error { query := ` INSERT INTO ` + c.cfg.Database + `.traces ( chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, @@ -756,3 +757,78 @@ func (c *ClickHouseConnector) GetTraces(qf QueryFilter) (traces []common.Trace, } return traces, nil } + + +// TODO make this atomic +func (c *ClickHouseConnector) InsertDataForBlocks(data []common.BlockData) error { + blocks := make([]common.Block, 0, len(data)) + logs := make([]common.Log, 0) + transactions := make([]common.Transaction, 0) + traces := make([]common.Trace, 0) + + for _, blockData := range data { + blocks = append(blocks, blockData.Block) + logs = append(logs, blockData.Logs...) + transactions = append(transactions, blockData.Transactions...) + traces = append(traces, blockData.Traces...) + } + + var saveErr error + var saveErrMutex sync.Mutex + var wg sync.WaitGroup + + if len(blocks) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertBlocks(blocks); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting blocks: %v", err) + saveErrMutex.Unlock() + } + }() + } + + if len(logs) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertLogs(logs); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting logs: %v", err) + saveErrMutex.Unlock() + } + }() + } + + if len(transactions) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertTransactions(transactions); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting transactions: %v", err) + saveErrMutex.Unlock() + } + }() + } + + if len(traces) > 0 { + wg.Add(1) + go func() { + defer wg.Done() + if err := c.insertTraces(traces); err != nil { + saveErrMutex.Lock() + saveErr = fmt.Errorf("error deleting traces: %v", err) + saveErrMutex.Unlock() + } + }() + } + + wg.Wait() + + if saveErr != nil { + return saveErr + } + return nil +} diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 6b7af95..27039af 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -48,12 +48,9 @@ type IStagingStorage interface { } type IMainStorage interface { - InsertBlocks(blocks []common.Block) error - InsertTransactions(txs []common.Transaction) error - InsertLogs(logs []common.Log) error - InsertTraces(traces []common.Trace) error + InsertDataForBlocks(data []common.BlockData) error - GetBlocks(qf QueryFilter) (logs []common.Block, err error) + GetBlocks(qf QueryFilter) (blocks []common.Block, err error) GetTransactions(qf QueryFilter) (transactions QueryResult[common.Transaction], err error) GetLogs(qf QueryFilter) (logs QueryResult[common.Log], err error) GetTraces(qf QueryFilter) (traces []common.Trace, err error) diff --git a/internal/storage/memory.go b/internal/storage/memory.go index e55b76d..aed84df 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -73,7 +73,7 @@ func (m *MemoryConnector) DeleteBlockFailures(failures []common.BlockFailure) er return nil } -func (m *MemoryConnector) InsertBlocks(blocks []common.Block) error { +func (m *MemoryConnector) insertBlocks(blocks []common.Block) error { for _, block := range blocks { blockJson, err := json.Marshal(block) if err != nil { @@ -109,7 +109,7 @@ func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) { return blocks, nil } -func (m *MemoryConnector) InsertTransactions(txs []common.Transaction) error { +func (m *MemoryConnector) insertTransactions(txs []common.Transaction) error { for _, tx := range txs { txJson, err := json.Marshal(tx) if err != nil { @@ -143,7 +143,7 @@ func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction, return txs, nil } -func (m *MemoryConnector) InsertLogs(logs []common.Log) error { +func (m *MemoryConnector) insertLogs(logs []common.Log) error { for _, log := range logs { logJson, err := json.Marshal(log) if err != nil { @@ -294,7 +294,7 @@ func (m *MemoryConnector) DeleteBlockData(data []common.BlockData) error { return nil } -func (m *MemoryConnector) InsertTraces(traces []common.Trace) error { +func (m *MemoryConnector) insertTraces(traces []common.Trace) error { for _, trace := range traces { traceJson, err := json.Marshal(trace) if err != nil { @@ -331,3 +331,32 @@ func (m *MemoryConnector) GetTraces(qf QueryFilter) ([]common.Trace, error) { func traceAddressToString(traceAddress []uint64) string { return strings.Trim(strings.Replace(fmt.Sprint(traceAddress), " ", ",", -1), "[]") } + +func (m *MemoryConnector) InsertDataForBlocks(data []common.BlockData) error { + blocks := make([]common.Block, 0, len(data)) + logs := make([]common.Log, 0) + transactions := make([]common.Transaction, 0) + traces := make([]common.Trace, 0) + + for _, blockData := range data { + blocks = append(blocks, blockData.Block) + logs = append(logs, blockData.Logs...) + transactions = append(transactions, blockData.Transactions...) + traces = append(traces, blockData.Traces...) + } + + if err := m.insertBlocks(blocks); err != nil { + return err + } + if err := m.insertLogs(logs); err != nil { + return err + } + if err := m.insertTransactions(transactions); err != nil { + return err + } + if err := m.insertTraces(traces); err != nil { + return err + } + return nil +} +