diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 27ba143..c0fa7db 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -49,7 +49,7 @@ func NewStorageConnector( switch cfg.Driver { case "memory": connector, err := NewMemoryConnector(cfg.Memory) - return IStorage{OrchestratorStorage: connector}, err + return IStorage{OrchestratorStorage: connector, DBStorage: connector}, err case "clickhouse": connector, err := NewClickHouseConnector(cfg.Clickhouse) return IStorage{DBStorage: connector, OrchestratorStorage: connector}, err diff --git a/internal/storage/memory.go b/internal/storage/memory.go index a6e7ccd..b60723b 100644 --- a/internal/storage/memory.go +++ b/internal/storage/memory.go @@ -1,6 +1,7 @@ package storage import ( + "encoding/json" "fmt" "strconv" "strings" @@ -87,3 +88,115 @@ func (m *MemoryConnector) DeleteBlockFailures(failures []common.BlockFailure) er } return nil } + +func (m *MemoryConnector) InsertBlocks(blocks []common.Block) error { + for _, block := range blocks { + blockJson, err := json.Marshal(block) + if err != nil { + return err + } + m.cache.Add(fmt.Sprintf("block:%d", block.Number), string(blockJson)) + } + return nil +} + +func (m *MemoryConnector) GetBlocks(limit int) ([]common.Block, error) { + blocks := []common.Block{} + for _, key := range m.cache.Keys() { + if len(blocks) >= limit { + break + } + if strings.HasPrefix(key, "block:") { + value, ok := m.cache.Get(key) + if ok { + block := common.Block{} + err := json.Unmarshal([]byte(value), &block) + if err != nil { + return nil, err + } + blocks = append(blocks, block) + } + } + } + return blocks, nil +} + +func (m *MemoryConnector) InsertTransactions(txs []common.Transaction) error { + for _, tx := range txs { + txJson, err := json.Marshal(tx) + if err != nil { + return err + } + m.cache.Add(fmt.Sprintf("transaction:%s", tx.Hash), string(txJson)) + } + return nil +} + +func (m *MemoryConnector) GetTransactions(blockNumber uint64, limit int) ([]common.Transaction, error) { + txs := []common.Transaction{} + for _, key := range m.cache.Keys() { + if len(txs) >= limit { + break + } + if strings.HasPrefix(key, fmt.Sprintf("transaction:%d", blockNumber)) { + value, ok := m.cache.Get(key) + if ok { + tx := common.Transaction{} + err := json.Unmarshal([]byte(value), &tx) + if err != nil { + return nil, err + } + txs = append(txs, tx) + } + } + } + return txs, nil +} + +func (m *MemoryConnector) InsertEvents(events []common.Log) error { + for _, event := range events { + eventJson, err := json.Marshal(event) + if err != nil { + return err + } + m.cache.Add(fmt.Sprintf("event:%s-%d", event.TransactionHash, event.Index), string(eventJson)) + } + return nil +} + +func (m *MemoryConnector) GetEvents(blockNumber uint64, limit int) ([]common.Log, error) { + events := []common.Log{} + for _, key := range m.cache.Keys() { + if len(events) >= limit { + break + } + if strings.HasPrefix(key, fmt.Sprintf("event:%d", blockNumber)) { + value, ok := m.cache.Get(key) + if ok { + event := common.Log{} + err := json.Unmarshal([]byte(value), &event) + if err != nil { + return nil, err + } + events = append(events, event) + } + } + } + return events, nil +} + +func (m *MemoryConnector) GetMaxBlockNumber() (uint64, error) { + maxBlockNumber := uint64(0) + for _, key := range m.cache.Keys() { + if strings.HasPrefix(key, "block:") { + blockNumber, err := strconv.ParseUint(strings.Split(key, ":")[1], 10, 64) + if err != nil { + return 0, err + } + if blockNumber > maxBlockNumber { + maxBlockNumber = blockNumber + } + } + } + return maxBlockNumber, nil +}