services/horizon: Reingest from precomputed TxMeta (#5375)
* services/horizon: Reingest from precomputed TxMeta

* update config

* Fix govet error
urvisavla authored Jul 5, 2024
1 parent 5a020f3 commit 5d8d64b
Showing 10 changed files with 211 additions and 122 deletions.
18 changes: 8 additions & 10 deletions ingest/ledgerbackend/buffered_storage_backend.go
@@ -18,12 +18,10 @@ import (
var _ LedgerBackend = (*BufferedStorageBackend)(nil)

type BufferedStorageBackendConfig struct {
LedgerBatchConfig datastore.DataStoreSchema
DataStore datastore.DataStore
BufferSize uint32
NumWorkers uint32
RetryLimit uint32
RetryWait time.Duration
BufferSize uint32 `toml:"buffer_size"`
NumWorkers uint32 `toml:"num_workers"`
RetryLimit uint32 `toml:"retry_limit"`
RetryWait time.Duration `toml:"retry_wait"`
}

// BufferedStorageBackend is a ledger backend that reads from a storage service.
@@ -45,7 +43,7 @@ type BufferedStorageBackend struct {
}

// NewBufferedStorageBackend returns a new BufferedStorageBackend instance.
func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) {
func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig, dataStore datastore.DataStore) (*BufferedStorageBackend, error) {
if config.BufferSize == 0 {
return nil, errors.New("buffer size must be > 0")
}
@@ -54,17 +52,17 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken
return nil, errors.New("number of workers must be <= BufferSize")
}

if config.DataStore == nil {
if dataStore == nil {
return nil, errors.New("no DataStore provided")
}

if config.LedgerBatchConfig.LedgersPerFile <= 0 {
if dataStore.GetSchema(ctx).LedgersPerFile <= 0 {
return nil, errors.New("ledgersPerFile must be > 0")
}

bsBackend := &BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}

return bsBackend, nil
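For context, a minimal caller-side sketch of the new constructor signature, where the DataStore is passed explicitly instead of being embedded in the config. The signature and config fields come from the diff above; the import paths and surrounding wiring are assumptions, not part of this change:

package example

import (
	"context"
	"time"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/datastore"
)

// newBackend wires up a BufferedStorageBackend under the new signature.
func newBackend(ctx context.Context, store datastore.DataStore) (*ledgerbackend.BufferedStorageBackend, error) {
	config := ledgerbackend.BufferedStorageBackendConfig{
		BufferSize: 100,
		NumWorkers: 5,
		RetryLimit: 3,
		RetryWait:  30 * time.Second,
	}
	// The ledgers-per-file/partition layout is no longer part of the config;
	// the backend now reads it from store.GetSchema(ctx).
	return ledgerbackend.NewBufferedStorageBackend(ctx, config, store)
}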
63 changes: 39 additions & 24 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -44,29 +44,21 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
param := make(map[string]string)
param["destination_bucket_path"] = "testURL"

ledgerBatchConfig := datastore.DataStoreSchema{
LedgersPerFile: 1,
FilesPerPartition: 64000,
}

dataStore := new(datastore.MockDataStore)

return BufferedStorageBackendConfig{
LedgerBatchConfig: ledgerBatchConfig,
DataStore: dataStore,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
BufferSize: 100,
NumWorkers: 5,
RetryLimit: 3,
RetryWait: time.Microsecond,
}
}

func createBufferedStorageBackendForTesting() BufferedStorageBackend {
config := createBufferedStorageBackendConfigForTesting()

dataStore := new(datastore.MockDataStore)
return BufferedStorageBackend{
config: config,
dataStore: config.DataStore,
dataStore: dataStore,
}
}

@@ -86,6 +78,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
}
mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
}
mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
LedgersPerFile: count,
FilesPerPartition: partitionSize,
})

t.Cleanup(func() {
mockDataStore.AssertExpectations(t)
@@ -128,13 +124,17 @@ func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
func TestNewBufferedStorageBackend(t *testing.T) {
ctx := context.Background()
config := createBufferedStorageBackendConfigForTesting()

bsb, err := NewBufferedStorageBackend(ctx, config)
mockDataStore := new(datastore.MockDataStore)
mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(1),
FilesPerPartition: partitionSize,
})
bsb, err := NewBufferedStorageBackend(ctx, config, mockDataStore)
assert.NoError(t, err)

assert.Equal(t, bsb.dataStore, config.DataStore)
assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
assert.Equal(t, bsb.dataStore, mockDataStore)
assert.Equal(t, uint32(1), bsb.dataStore.GetSchema(ctx).LedgersPerFile)
assert.Equal(t, uint32(64000), bsb.dataStore.GetSchema(ctx).FilesPerPartition)
assert.Equal(t, uint32(100), bsb.config.BufferSize)
assert.Equal(t, uint32(5), bsb.config.NumWorkers)
assert.Equal(t, uint32(3), bsb.config.RetryLimit)
@@ -210,12 +210,14 @@ func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
lcmArray := createLCMForTesting(startLedger, endLedger)
bsb := createBufferedStorageBackendForTesting()
ctx := context.Background()
bsb.config.LedgerBatchConfig.LedgersPerFile = uint32(2)
ledgerRange := BoundedRange(startLedger, endLedger)

mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
LedgersPerFile: uint32(2),
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)

@@ -451,6 +453,10 @@ func TestLedgerBufferClose(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1
mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})

objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
afterPrepareRange := make(chan struct{})
@@ -483,7 +489,10 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
t.Cleanup(func() {
@@ -509,7 +518,10 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {

mockDataStore := new(datastore.MockDataStore)
partition := ledgerPerFileCount*partitionSize - 1

mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
iteration := &atomic.Int32{}
cancelAfter := int32(bsb.config.RetryLimit) + 2
@@ -551,7 +563,10 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
})

bsb.dataStore = mockDataStore

mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
LedgersPerFile: ledgerPerFileCount,
FilesPerPartition: partitionSize,
})
assert.NoError(t, bsb.PrepareRange(context.Background(), ledgerRange))

bsb.ledgerBuffer.wg.Wait()
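The tests above now stub GetSchema on the mock datastore because the backend consults the schema while preparing and draining a range. A rough caller-side sketch of that flow follows; BoundedRange and PrepareRange appear in the tests, while GetLedger and LedgerSequence are assumptions taken from the wider LedgerBackend/xdr APIs rather than from this diff:

package example

import (
	"context"
	"fmt"

	"github.com/stellar/go/ingest/ledgerbackend"
)

// readBoundedRange prepares a bounded range and walks it ledger by ledger.
// GetLedger and LedgerSequence are assumed from the surrounding stellar/go APIs.
func readBoundedRange(ctx context.Context, bsb *ledgerbackend.BufferedStorageBackend, start, end uint32) error {
	if err := bsb.PrepareRange(ctx, ledgerbackend.BoundedRange(start, end)); err != nil {
		return err
	}
	for seq := start; seq <= end; seq++ {
		lcm, err := bsb.GetLedger(ctx, seq)
		if err != nil {
			return err
		}
		fmt.Printf("got ledger %d\n", lcm.LedgerSequence())
	}
	return nil
}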
16 changes: 8 additions & 8 deletions ingest/ledgerbackend/ledger_buffer.go
@@ -83,19 +83,19 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
// but for easier conceptualization, len(taskQueue) can be interpreted as both pending and in-flight tasks
// where we assume the workers are empty and not processing any tasks.
for i := 0; i <= int(bsb.config.BufferSize); i++ {
ledgerBuffer.pushTaskQueue()
ledgerBuffer.pushTaskQueue(ctx)
}

return ledgerBuffer, nil
}

func (lb *ledgerBuffer) pushTaskQueue() {
func (lb *ledgerBuffer) pushTaskQueue(ctx context.Context) {
// In bounded mode, don't queue past the end ledger
if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded {
return
}
lb.taskQueue <- lb.nextTaskLedger
lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.nextTaskLedger += lb.dataStore.GetSchema(ctx).LedgersPerFile
}

// sleepWithContext returns true upon sleeping without interruption from the context
@@ -155,15 +155,15 @@ func (lb *ledgerBuffer) worker(ctx context.Context) {
// Thus, the number of tasks decreases by 1 and the priority queue length increases by 1.
// This keeps the overall total the same (<= BufferSize). As long as the the ledger buffer invariant
// was maintained in the previous state, it is still maintained during this state transition.
lb.storeObject(ledgerObject, sequence)
lb.storeObject(ctx, ledgerObject, sequence)
break
}
}
}
}

func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) {
objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)
objectKey := lb.dataStore.GetSchema(ctx).GetObjectKeyFromSequenceNumber(sequence)

reader, err := lb.dataStore.GetFile(ctx, objectKey)
if err != nil {
@@ -180,7 +180,7 @@ func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint3
return objectBytes, nil
}

func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
func (lb *ledgerBuffer) storeObject(ctx context.Context, ledgerObject []byte, sequence uint32) {
lb.priorityQueueLock.Lock()
defer lb.priorityQueueLock.Unlock()

@@ -198,7 +198,7 @@ func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) {
item := lb.ledgerPriorityQueue.Pop()
lb.ledgerQueue <- item.payload
lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile
lb.currentLedger += lb.dataStore.GetSchema(ctx).LedgersPerFile
}
}

@@ -215,7 +215,7 @@ func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) (xdr.LedgerClose
// Thus len(ledgerQueue) decreases by 1 and the number of tasks increases by 1.
// The overall sum below remains the same:
// len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize
lb.pushTaskQueue()
lb.pushTaskQueue(ctx)

lcmBatch := xdr.LedgerCloseMetaBatch{}
decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &lcmBatch)
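The buffer now derives both its step size and its object keys from the datastore schema rather than from the backend config. A small illustrative sketch of that arithmetic; the field and method names are taken from the diff, while the standalone schema literal and import path are assumptions:

package main

import (
	"fmt"

	"github.com/stellar/go/support/datastore"
)

// objectKeysForRange lists the keys the ledger buffer would request for a
// range, stepping by LedgersPerFile exactly as pushTaskQueue does above.
func objectKeysForRange(schema datastore.DataStoreSchema, from, to uint32) []string {
	var keys []string
	for seq := from; seq <= to; seq += schema.LedgersPerFile {
		keys = append(keys, schema.GetObjectKeyFromSequenceNumber(seq))
	}
	return keys
}

func main() {
	schema := datastore.DataStoreSchema{LedgersPerFile: 2, FilesPerPartition: 64000}
	fmt.Println(objectKeysForRange(schema, 2, 9))
}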
(Diffs for the remaining 7 changed files are not shown here.)
