Skip to content

Commit

Permalink
Refactor storage interface and implement query filtering (#20)
Browse files Browse the repository at this point in the history
### TL;DR

Refactored storage system to support multiple storage instances and improved query filtering.

Note that intellisense won't show it if a storageConnector is not implementing the needed interface. But it will throw at runtime. This change was made to not have the switch clause duplicated. If that's not important, we can create multiple functions (one for each interface) so that editors can understand type inference

### What changed?

- Introduced `StorageConfig` with separate configurations for Main, Staging, and Orchestrator storage.
- Added `QueryFilter` struct for more flexible querying of blocks, transactions, and events.
- Updated `IStorage` interface to include separate storage instances for different purposes.
- Modified `NewStorageConnector` to create multiple storage instances based on the new config.
- Implemented `newConnector` function for type-safe connector creation.
- Updated `MemoryConnector` and `ClickHouseConnector` to use the new `QueryFilter` in their query methods.
- Added helper functions in `memory.go` for handling query filters and key matching.
- Updated `Commiter` to accept a storage instance in its constructor.


### Why make this change?

This refactoring improves the flexibility and scalability of the storage system by:

1. Allowing separate storage configurations for different purposes (Main, Staging, Orchestrator).
2. Providing more granular control over queries with the `QueryFilter` struct.
3. Enhancing type safety and reducing potential runtime errors with the new connector creation process.
4. Improving the overall structure and maintainability of the storage-related code.
  • Loading branch information
iuwqyir authored Sep 19, 2024
1 parent a09c36f commit dee9843
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 32 deletions.
7 changes: 6 additions & 1 deletion internal/orchestrator/commiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package orchestrator

import (
"fmt"
"log"
"os"
"strconv"
"time"

"github.com/thirdweb-dev/indexer/internal/storage"
)

const DEFAULT_COMMITER_TRIGGER_INTERVAL = 250
Expand All @@ -13,9 +16,10 @@ const DEFAULT_BLOCKS_PER_COMMIT = 10
type Commiter struct {
triggerIntervalMs int
blocksPerCommit int
storage storage.IStorage
}

func NewCommiter() *Commiter {
func NewCommiter(storage storage.IStorage) *Commiter {
triggerInterval, err := strconv.Atoi(os.Getenv("COMMITER_TRIGGER_INTERVAL"))
if err != nil || triggerInterval == 0 {
triggerInterval = DEFAULT_COMMITER_TRIGGER_INTERVAL
Expand All @@ -27,6 +31,7 @@ func NewCommiter() *Commiter {
return &Commiter{
triggerIntervalMs: triggerInterval,
blocksPerCommit: blocksPerCommit,
storage: storage,
}
}

Expand Down
23 changes: 20 additions & 3 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,25 @@ type Orchestrator struct {
}

func NewOrchestrator(rpc common.RPC) (*Orchestrator, error) {
storage, err := storage.NewStorageConnector(&storage.ConnectorConfig{
Driver: "memory",
storage, err := storage.NewStorageConnector(&storage.StorageConfig{
Orchestrator: storage.ConnectorConfig{
Driver: "memory",
Memory: &storage.MemoryConnectorConfig{
Prefix: "orchestrator",
},
},
Main: storage.ConnectorConfig{
Driver: "memory",
Memory: &storage.MemoryConnectorConfig{
Prefix: "main",
},
},
Staging: storage.ConnectorConfig{
Driver: "memory",
Memory: &storage.MemoryConnectorConfig{
Prefix: "staging",
},
},
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -58,7 +75,7 @@ func (o *Orchestrator) Start() {
wg.Add(1)
go func() {
defer wg.Done()
commiter := NewCommiter()
commiter := NewCommiter(o.storage)
commiter.Start()
}()
}
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ func (c *ClickHouseConnector) InsertEvents(events []common.Log) error {
return nil
}

func (c *ClickHouseConnector) GetBlocks(limit int) (events []common.Block, err error) {
func (c *ClickHouseConnector) GetBlocks(qf QueryFilter) (events []common.Block, err error) {
return nil, nil
}

func (c *ClickHouseConnector) GetTransactions(blockNumber uint64, limit int) (events []common.Transaction, err error) {
func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (events []common.Transaction, err error) {
return nil, nil
}

func (c *ClickHouseConnector) GetEvents(blockNumber uint64, limit int) (events []common.Log, err error) {
func (c *ClickHouseConnector) GetEvents(qf QueryFilter) (events []common.Log, err error) {
return nil, nil
}

Expand Down
74 changes: 56 additions & 18 deletions internal/storage/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,28 @@ import (
"github.com/thirdweb-dev/indexer/internal/common"
)

type QueryFilter struct {
BlockNumbers []uint64
Limit uint16
Offset uint64
}

type StorageConfig struct {
Main ConnectorConfig
Staging ConnectorConfig
Orchestrator ConnectorConfig
}

type ConnectorConfig struct {
Driver string
Memory *MemoryConnectorConfig
Clickhouse *ClickhouseConnectorConfig
}

type IStorage struct {
IStorageBase
OrchestratorStorage IOrchestratorStorage
DBStorage IDBStorage
}

type IStorageBase interface {
connect() error
close() error
DBMainStorage IDBStorage
DBStagingStorage IDBStorage
}

type IOrchestratorStorage interface {
Expand All @@ -37,23 +44,54 @@ type IDBStorage interface {
InsertTransactions(txs []common.Transaction) error
InsertEvents(events []common.Log) error

GetBlocks(limit int) (events []common.Block, err error)
GetTransactions(blockNumber uint64, limit int) (events []common.Transaction, err error)
GetEvents(blockNumber uint64, limit int) (events []common.Log, err error)
GetBlocks(qf QueryFilter) (events []common.Block, err error)
GetTransactions(qf QueryFilter) (events []common.Transaction, err error)
GetEvents(qf QueryFilter) (events []common.Log, err error)
GetMaxBlockNumber() (maxBlockNumber uint64, err error)
}

func NewStorageConnector(
cfg *ConnectorConfig,
) (IStorage, error) {
func NewStorageConnector(cfg *StorageConfig) (IStorage, error) {
var storage IStorage
var err error

storage.OrchestratorStorage, err = newConnector[IOrchestratorStorage](cfg.Orchestrator)
if err != nil {
return IStorage{}, fmt.Errorf("failed to create orchestrator storage: %w", err)
}

storage.DBMainStorage, err = newConnector[IDBStorage](cfg.Main)
if err != nil {
return IStorage{}, fmt.Errorf("failed to create main storage: %w", err)
}

storage.DBStagingStorage, err = newConnector[IDBStorage](cfg.Staging)
if err != nil {
return IStorage{}, fmt.Errorf("failed to create staging storage: %w", err)
}

return storage, nil
}

func newConnector[T any](cfg ConnectorConfig) (T, error) {
var conn interface{}
var err error
switch cfg.Driver {
case "memory":
connector, err := NewMemoryConnector(cfg.Memory)
return IStorage{OrchestratorStorage: connector, DBStorage: connector}, err
conn, err = NewMemoryConnector(cfg.Memory)
case "clickhouse":
connector, err := NewClickHouseConnector(cfg.Clickhouse)
return IStorage{DBStorage: connector, OrchestratorStorage: connector}, err
conn, err = NewClickHouseConnector(cfg.Clickhouse)
default:
return *new(T), fmt.Errorf("invalid connector driver: %s", cfg.Driver)
}

if err != nil {
return *new(T), err
}

typedConn, ok := conn.(T)
if !ok {
return *new(T), fmt.Errorf("connector does not implement the required interface")
}

return IStorage{}, fmt.Errorf("invalid connector driver: %s", cfg.Driver)
return typedConn, nil
}
52 changes: 45 additions & 7 deletions internal/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

type MemoryConnectorConfig struct {
MaxItems int
Prefix string
}

type MemoryConnector struct {
Expand Down Expand Up @@ -101,13 +102,15 @@ func (m *MemoryConnector) InsertBlocks(blocks []common.Block) error {
return nil
}

func (m *MemoryConnector) GetBlocks(limit int) ([]common.Block, error) {
func (m *MemoryConnector) GetBlocks(qf QueryFilter) ([]common.Block, error) {
blocks := []common.Block{}
limit := getLimit(qf)
blockNumbersToCheck := getBlockNumbersToCheck(qf)
for _, key := range m.cache.Keys() {
if len(blocks) >= limit {
if len(blocks) >= int(limit) {
break
}
if strings.HasPrefix(key, "block:") {
if isKeyForBlock(key, "block:", blockNumbersToCheck) {
value, ok := m.cache.Get(key)
if ok {
block := common.Block{}
Expand All @@ -133,13 +136,15 @@ func (m *MemoryConnector) InsertTransactions(txs []common.Transaction) error {
return nil
}

func (m *MemoryConnector) GetTransactions(blockNumber uint64, limit int) ([]common.Transaction, error) {
func (m *MemoryConnector) GetTransactions(qf QueryFilter) ([]common.Transaction, error) {
txs := []common.Transaction{}
limit := getLimit(qf)
blockNumbersToCheck := getBlockNumbersToCheck(qf)
for _, key := range m.cache.Keys() {
if len(txs) >= limit {
break
}
if strings.HasPrefix(key, fmt.Sprintf("transaction:%d", blockNumber)) {
if isKeyForBlock(key, "transaction:", blockNumbersToCheck) {
value, ok := m.cache.Get(key)
if ok {
tx := common.Transaction{}
Expand All @@ -165,13 +170,15 @@ func (m *MemoryConnector) InsertEvents(events []common.Log) error {
return nil
}

func (m *MemoryConnector) GetEvents(blockNumber uint64, limit int) ([]common.Log, error) {
func (m *MemoryConnector) GetEvents(qf QueryFilter) ([]common.Log, error) {
events := []common.Log{}
limit := getLimit(qf)
blockNumbersToCheck := getBlockNumbersToCheck(qf)
for _, key := range m.cache.Keys() {
if len(events) >= limit {
break
}
if strings.HasPrefix(key, fmt.Sprintf("event:%d", blockNumber)) {
if isKeyForBlock(key, "event:", blockNumbersToCheck) {
value, ok := m.cache.Get(key)
if ok {
event := common.Log{}
Expand Down Expand Up @@ -201,3 +208,34 @@ func (m *MemoryConnector) GetMaxBlockNumber() (uint64, error) {
}
return maxBlockNumber, nil
}

func isKeyForBlock(key string, prefix string, blocksFilter map[uint64]uint8) bool {
if !strings.HasPrefix(key, prefix) {
return false
}
blockNumber, err := strconv.ParseUint(strings.TrimPrefix(key, prefix), 10, 64)
if err != nil {
return false
}
if len(blocksFilter) == 0 {
return true
}
_, ok := blocksFilter[blockNumber]
return ok
}

func getLimit(qf QueryFilter) int {
limit := qf.Limit
if limit == 0 {
limit = math.MaxUint16
}
return int(limit)
}

func getBlockNumbersToCheck(qf QueryFilter) map[uint64]uint8 {
blockNumbersToCheck := make(map[uint64]uint8)
for _, num := range qf.BlockNumbers {
blockNumbersToCheck[num] = 1
}
return blockNumbersToCheck
}

0 comments on commit dee9843

Please sign in to comment.