diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go
index 4a353bfe22..68e03ffcd3 100644
--- a/ingest/ledgerbackend/buffered_storage_backend.go
+++ b/ingest/ledgerbackend/buffered_storage_backend.go
@@ -18,12 +18,10 @@ import (
 var _ LedgerBackend = (*BufferedStorageBackend)(nil)
 
 type BufferedStorageBackendConfig struct {
-	LedgerBatchConfig datastore.DataStoreSchema
-	DataStore         datastore.DataStore
-	BufferSize        uint32
-	NumWorkers        uint32
-	RetryLimit        uint32
-	RetryWait         time.Duration
+	BufferSize uint32        `toml:"buffer_size"`
+	NumWorkers uint32        `toml:"num_workers"`
+	RetryLimit uint32        `toml:"retry_limit"`
+	RetryWait  time.Duration `toml:"retry_wait"`
 }
 
 // BufferedStorageBackend is a ledger backend that reads from a storage service.
@@ -45,7 +43,7 @@ type BufferedStorageBackend struct {
 }
 
 // NewBufferedStorageBackend returns a new BufferedStorageBackend instance.
-func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) {
+func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig, dataStore datastore.DataStore) (*BufferedStorageBackend, error) {
 	if config.BufferSize == 0 {
 		return nil, errors.New("buffer size must be > 0")
 	}
@@ -54,17 +52,17 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken
 		return nil, errors.New("number of workers must be <= BufferSize")
 	}
 
-	if config.DataStore == nil {
+	if dataStore == nil {
 		return nil, errors.New("no DataStore provided")
 	}
 
-	if config.LedgerBatchConfig.LedgersPerFile <= 0 {
+	if dataStore.GetSchema(ctx).LedgersPerFile <= 0 {
 		return nil, errors.New("ledgersPerFile must be > 0")
 	}
 
 	bsBackend := &BufferedStorageBackend{
 		config:    config,
-		dataStore: config.DataStore,
+		dataStore: dataStore,
 	}
 
 	return bsBackend, nil
diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go
index f18329fffa..510cffeabb 100644
--- a/ingest/ledgerbackend/buffered_storage_backend_test.go
+++ b/ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -44,29 +44,21 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
 	param := make(map[string]string)
 	param["destination_bucket_path"] = "testURL"
 
-	ledgerBatchConfig := datastore.DataStoreSchema{
-		LedgersPerFile:    1,
-		FilesPerPartition: 64000,
-	}
-
-	dataStore := new(datastore.MockDataStore)
-
 	return BufferedStorageBackendConfig{
-		LedgerBatchConfig: ledgerBatchConfig,
-		DataStore:         dataStore,
-		BufferSize:        100,
-		NumWorkers:        5,
-		RetryLimit:        3,
-		RetryWait:         time.Microsecond,
+		BufferSize: 100,
+		NumWorkers: 5,
+		RetryLimit: 3,
+		RetryWait:  time.Microsecond,
 	}
 }
 
 func createBufferedStorageBackendForTesting() BufferedStorageBackend {
 	config := createBufferedStorageBackendConfigForTesting()
+	dataStore := new(datastore.MockDataStore)
 	return BufferedStorageBackend{
 		config:    config,
-		dataStore: config.DataStore,
+		dataStore: dataStore,
 	}
 }
 
@@ -86,6 +78,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
 		}
 		mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
 	}
+	mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
+		LedgersPerFile:    count,
+		FilesPerPartition: partitionSize,
+	})
 
 	t.Cleanup(func() {
 		mockDataStore.AssertExpectations(t)
@@ -128,13 +124,17 @@ func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
 func TestNewBufferedStorageBackend(t *testing.T) {
 	ctx := context.Background()
 	config := createBufferedStorageBackendConfigForTesting()
-
-	bsb, err := NewBufferedStorageBackend(ctx, config)
+	mockDataStore := new(datastore.MockDataStore)
+	mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
+		LedgersPerFile:    uint32(1),
+		FilesPerPartition: partitionSize,
+	})
+	bsb, err := NewBufferedStorageBackend(ctx, config, mockDataStore)
 	assert.NoError(t, err)
 
-	assert.Equal(t, bsb.dataStore, config.DataStore)
-	assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
-	assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
+	assert.Equal(t, bsb.dataStore, mockDataStore)
+	assert.Equal(t, uint32(1), bsb.dataStore.GetSchema(ctx).LedgersPerFile)
+	assert.Equal(t, uint32(64000), bsb.dataStore.GetSchema(ctx).FilesPerPartition)
 	assert.Equal(t, uint32(100), bsb.config.BufferSize)
 	assert.Equal(t, uint32(5), bsb.config.NumWorkers)
 	assert.Equal(t, uint32(3), bsb.config.RetryLimit)
@@ -210,12 +210,14 @@ func TestCloudStorageGetLedger_MultipleLedgerPerFile(t *testing.T) {
 	lcmArray := createLCMForTesting(startLedger, endLedger)
 	bsb := createBufferedStorageBackendForTesting()
 	ctx := context.Background()
-	bsb.config.LedgerBatchConfig.LedgersPerFile = uint32(2)
 	ledgerRange := BoundedRange(startLedger, endLedger)
 
 	mockDataStore := createMockdataStore(t, startLedger, endLedger, partitionSize, 2)
 	bsb.dataStore = mockDataStore
-
+	mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
+		LedgersPerFile:    uint32(2),
+		FilesPerPartition: partitionSize,
+	})
 	assert.NoError(t, bsb.PrepareRange(ctx, ledgerRange))
 	assert.Eventually(t, func() bool { return len(bsb.ledgerBuffer.ledgerQueue) == 2 }, time.Second*5, time.Millisecond*50)
 
@@ -451,6 +453,10 @@ func TestLedgerBufferClose(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
+	mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
+		LedgersPerFile:    ledgerPerFileCount,
+		FilesPerPartition: partitionSize,
+	})
 	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	afterPrepareRange := make(chan struct{})
@@ -483,7 +489,10 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
-
+	mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
+		LedgersPerFile:    ledgerPerFileCount,
+		FilesPerPartition: partitionSize,
+	})
 	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
 	t.Cleanup(func() {
@@ -509,7 +518,10 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
-
+	mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
+		LedgersPerFile:    ledgerPerFileCount,
+		FilesPerPartition: partitionSize,
+	})
 	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	iteration := &atomic.Int32{}
 	cancelAfter := int32(bsb.config.RetryLimit) + 2
@@ -551,7 +563,10 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
 	})
 	bsb.dataStore = mockDataStore
-
+	mockDataStore.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
+		LedgersPerFile:    ledgerPerFileCount,
+		FilesPerPartition: partitionSize,
+	})
 	assert.NoError(t, bsb.PrepareRange(context.Background(), ledgerRange))
 
 	bsb.ledgerBuffer.wg.Wait()
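With this change a caller builds the DataStore first and injects it into the backend; the schema now travels with the store rather than with BufferedStorageBackendConfig. Below is an illustrative sketch (not part of the patch) of the new wiring, assuming a GCS-backed store; the bucket path, the tuning values, and the DataStoreConfig field names (inferred from the TOML keys used elsewhere in this patch) are placeholders.

```go
// Sketch only: wiring a BufferedStorageBackend with an injected DataStore
// under the new constructor signature.
package main

import (
	"context"
	"log"
	"time"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/datastore"
)

func main() {
	ctx := context.Background()

	// The schema now lives with the datastore; the backend reads it via GetSchema.
	dataStore, err := datastore.NewDataStore(ctx, datastore.DataStoreConfig{
		Type:   "GCS",
		Params: map[string]string{"destination_bucket_path": "my-bucket/ledgers/pubnet"}, // placeholder path
		Schema: datastore.DataStoreSchema{LedgersPerFile: 1, FilesPerPartition: 64000},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer dataStore.Close()

	backend, err := ledgerbackend.NewBufferedStorageBackend(ctx, ledgerbackend.BufferedStorageBackendConfig{
		BufferSize: 100,
		NumWorkers: 5,
		RetryLimit: 3,
		RetryWait:  30 * time.Second,
	}, dataStore)
	if err != nil {
		log.Fatal(err)
	}
	defer backend.Close()
}
```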
diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go
index 5b2ec57ffc..3fea296b20 100644
--- a/ingest/ledgerbackend/ledger_buffer.go
+++ b/ingest/ledgerbackend/ledger_buffer.go
@@ -83,19 +83,19 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
 	// but for easier conceptualization, len(taskQueue) can be interpreted as both pending and in-flight tasks
 	// where we assume the workers are empty and not processing any tasks.
 	for i := 0; i <= int(bsb.config.BufferSize); i++ {
-		ledgerBuffer.pushTaskQueue()
+		ledgerBuffer.pushTaskQueue(ctx)
 	}
 
 	return ledgerBuffer, nil
 }
 
-func (lb *ledgerBuffer) pushTaskQueue() {
+func (lb *ledgerBuffer) pushTaskQueue(ctx context.Context) {
 	// In bounded mode, don't queue past the end ledger
 	if lb.nextTaskLedger > lb.ledgerRange.to && lb.ledgerRange.bounded {
 		return
 	}
 	lb.taskQueue <- lb.nextTaskLedger
-	lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile
+	lb.nextTaskLedger += lb.dataStore.GetSchema(ctx).LedgersPerFile
 }
 
 // sleepWithContext returns true upon sleeping without interruption from the context
@@ -155,7 +155,7 @@ func (lb *ledgerBuffer) worker(ctx context.Context) {
 				// Thus, the number of tasks decreases by 1 and the priority queue length increases by 1.
 				// This keeps the overall total the same (<= BufferSize). As long as the ledger buffer invariant
 				// was maintained in the previous state, it is still maintained during this state transition.
-				lb.storeObject(ledgerObject, sequence)
+				lb.storeObject(ctx, ledgerObject, sequence)
 				break
 			}
 		}
@@ -163,7 +163,7 @@
 }
 
 func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint32) ([]byte, error) {
-	objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)
+	objectKey := lb.dataStore.GetSchema(ctx).GetObjectKeyFromSequenceNumber(sequence)
 
 	reader, err := lb.dataStore.GetFile(ctx, objectKey)
 	if err != nil {
@@ -180,7 +180,7 @@ func (lb *ledgerBuffer) downloadLedgerObject(ctx context.Context, sequence uint3
 	return objectBytes, nil
 }
 
-func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
+func (lb *ledgerBuffer) storeObject(ctx context.Context, ledgerObject []byte, sequence uint32) {
 	lb.priorityQueueLock.Lock()
 	defer lb.priorityQueueLock.Unlock()
 
@@ -198,7 +198,7 @@
 	for lb.ledgerPriorityQueue.Len() > 0 && lb.currentLedger == uint32(lb.ledgerPriorityQueue.Peek().startLedger) {
 		item := lb.ledgerPriorityQueue.Pop()
 		lb.ledgerQueue <- item.payload
-		lb.currentLedger += lb.config.LedgerBatchConfig.LedgersPerFile
+		lb.currentLedger += lb.dataStore.GetSchema(ctx).LedgersPerFile
 	}
 }
 
@@ -215,7 +215,7 @@ func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) (xdr.LedgerClose
 		// Thus len(ledgerQueue) decreases by 1 and the number of tasks increases by 1.
 		// The overall sum below remains the same:
 		// len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize
-		lb.pushTaskQueue()
+		lb.pushTaskQueue(ctx)
 
 		lcmBatch := xdr.LedgerCloseMetaBatch{}
 		decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &lcmBatch)
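pushTaskQueue and storeObject now read LedgersPerFile from the datastore schema on each call, so task sequencing is driven entirely by the schema. The following is a small illustrative sketch (not code from this patch) of how the start ledger of each queued download task advances.

```go
// Sketch: each queued download task starts LedgersPerFile ledgers after the
// previous one, mirroring how pushTaskQueue advances lb.nextTaskLedger.
package main

import (
	"fmt"

	"github.com/stellar/go/support/datastore"
)

func taskStartLedgers(from uint32, schema datastore.DataStoreSchema, tasks int) []uint32 {
	starts := make([]uint32, 0, tasks)
	next := from
	for i := 0; i < tasks; i++ {
		starts = append(starts, next)
		next += schema.LedgersPerFile
	}
	return starts
}

func main() {
	schema := datastore.DataStoreSchema{LedgersPerFile: 2, FilesPerPartition: 64000}
	fmt.Println(taskStartLedgers(2, schema, 3)) // [2 4 6]
}
```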
diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go
index e2589a7385..97144267ce 100644
--- a/services/horizon/cmd/db.go
+++ b/services/horizon/cmd/db.go
@@ -11,14 +11,17 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/pelletier/go-toml"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
 
+	"github.com/stellar/go/ingest/ledgerbackend"
 	horizon "github.com/stellar/go/services/horizon/internal"
 	"github.com/stellar/go/services/horizon/internal/db2/history"
 	"github.com/stellar/go/services/horizon/internal/db2/schema"
 	"github.com/stellar/go/services/horizon/internal/ingest"
 	support "github.com/stellar/go/support/config"
+	"github.com/stellar/go/support/datastore"
 	"github.com/stellar/go/support/db"
 	"github.com/stellar/go/support/errors"
 	hlog "github.com/stellar/go/support/log"
@@ -257,13 +260,21 @@ var dbReingestCmd = &cobra.Command{
 }
 
 var (
-	reingestForce       bool
-	parallelWorkers     uint
-	parallelJobSize     uint32
-	retries             uint
-	retryBackoffSeconds uint
+	reingestForce            bool
+	parallelWorkers          uint
+	parallelJobSize          uint32
+	retries                  uint
+	retryBackoffSeconds      uint
+	ledgerBackendStr         string
+	storageBackendConfigPath string
+	ledgerBackendType        ingest.LedgerBackendType
 )
 
+type StorageBackendConfig struct {
+	DataStoreConfig              datastore.DataStoreConfig                  `toml:"datastore_config"`
+	BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
+}
+
 func ingestRangeCmdOpts() support.ConfigOptions {
 	return support.ConfigOptions{
 		{
@@ -307,6 +318,42 @@ func ingestRangeCmdOpts() support.ConfigOptions {
 			FlagDefault: uint(5),
 			Usage:       "[optional] backoff seconds between reingest retries",
 		},
+		{
+			Name:        "ledgerbackend",
+			ConfigKey:   &ledgerBackendStr,
+			OptType:     types.String,
+			Required:    false,
+			FlagDefault: ingest.CaptiveCoreBackend.String(),
+			Usage:       "[optional] Specify the ledger backend type: 'captive-core' (default) or 'datastore'",
+			CustomSetValue: func(co *support.ConfigOption) error {
+				val := viper.GetString(co.Name)
+				switch val {
+				case ingest.CaptiveCoreBackend.String():
+					ledgerBackendType = ingest.CaptiveCoreBackend
+				case ingest.BufferedStorageBackend.String():
+					ledgerBackendType = ingest.BufferedStorageBackend
+				default:
+					return fmt.Errorf("invalid ledger backend: %s, must be 'captive-core' or 'datastore'", val)
+				}
+				*co.ConfigKey.(*string) = val
+				return nil
+			},
+		},
+		{
+			Name:      "datastore-config",
+			ConfigKey: &storageBackendConfigPath,
+			OptType:   types.String,
+			Required:  false,
+			Usage:     "[optional] Specify the path to the datastore config file (required for datastore backend)",
+			CustomSetValue: func(co *support.ConfigOption) error {
+				val := viper.GetString(co.Name)
+				if ledgerBackendType == ingest.BufferedStorageBackend && val == "" {
+					return errors.New("datastore config file is required for datastore backend type")
+				}
+				*co.ConfigKey.(*string) = val
+				return nil
+			},
+		},
 	}
 }
 
@@ -337,7 +384,18 @@ var dbReingestRangeCmd = &cobra.Command{
 			}
 		}
 
-		err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true})
+		var storageBackendConfig StorageBackendConfig
+		if ledgerBackendType == ingest.BufferedStorageBackend {
+			cfg, err := toml.LoadFile(storageBackendConfigPath)
+			if err != nil {
+				return fmt.Errorf("failed to load config file %v: %w", storageBackendConfigPath, err)
+			}
+			if err = cfg.Unmarshal(&storageBackendConfig); err != nil {
+				return fmt.Errorf("error unmarshalling TOML config: %w", err)
+			}
+		}
+
+		err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false})
 		if err != nil {
 			return err
 		}
@@ -346,6 +404,7 @@ var dbReingestRangeCmd = &cobra.Command{
 			reingestForce,
 			parallelWorkers,
 			*globalConfig,
+			storageBackendConfig,
 		)
 	},
 }
@@ -385,7 +444,18 @@ var dbFillGapsCmd = &cobra.Command{
 			withRange = true
 		}
 
-		err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: true})
+		var storageBackendConfig StorageBackendConfig
+		if ledgerBackendType == ingest.BufferedStorageBackend {
+			cfg, err := toml.LoadFile(storageBackendConfigPath)
+			if err != nil {
+				return fmt.Errorf("failed to load config file %v: %w", storageBackendConfigPath, err)
+			}
+			if err = cfg.Unmarshal(&storageBackendConfig); err != nil {
+				return fmt.Errorf("error unmarshalling TOML config: %w", err)
+			}
+		}
+
+		err := horizon.ApplyFlags(globalConfig, globalFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false, AlwaysIngest: false})
 		if err != nil {
 			return err
 		}
@@ -404,11 +474,11 @@ var dbFillGapsCmd = &cobra.Command{
 			hlog.Infof("found gaps %v", gaps)
 		}
 
-		return runDBReingestRange(gaps, reingestForce, parallelWorkers, *globalConfig)
+		return runDBReingestRange(gaps, reingestForce, parallelWorkers, *globalConfig, storageBackendConfig)
 	},
 }
 
-func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config) error {
+func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, parallelWorkers uint, config horizon.Config, storageBackendConfig StorageBackendConfig) error {
 	var err error
 
 	if reingestForce && parallelWorkers > 1 {
@@ -435,6 +505,9 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
 		RoundingSlippageFilter: config.RoundingSlippageFilter,
 		MaxLedgerPerFlush:      maxLedgersPerFlush,
 		SkipTxmeta:             config.SkipTxmeta,
+		LedgerBackendType:      ledgerBackendType,
+		DataStoreConfig:        storageBackendConfig.DataStoreConfig,
+		BufferedBackendConfig:  storageBackendConfig.BufferedStorageBackendConfig,
 	}
 
 	if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
diff --git a/services/horizon/config.storagebackend.toml b/services/horizon/config.storagebackend.toml
new file mode 100644
index 0000000000..538b793b54
--- /dev/null
+++ b/services/horizon/config.storagebackend.toml
@@ -0,0 +1,19 @@
+[buffered_storage_backend_config]
+buffer_size = 5 # The size of the buffer
+num_workers = 5 # Number of workers
+retry_limit = 3 # Number of retries allowed
+retry_wait = "30s" # Duration to wait before retrying in seconds
+
+# Datastore Configuration
+[datastore_config]
+# Specifies the type of datastore. Currently, only Google Cloud Storage (GCS) is supported.
+type = "GCS"
+
+[datastore_config.params]
+# The Google Cloud Storage bucket path for storing data, with optional subpaths for organization.
+destination_bucket_path = "path/to/my/bucket"
+
+[datastore_config.schema]
+# Configuration for data organization
+ledgers_per_file = 1 # Number of ledgers stored in each file.
+files_per_partition = 64000 # Number of files per partition/directory.
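This file is consumed by the new --ledgerbackend datastore mode and passed via --datastore-config, for example: horizon db reingest range 1 1000 --ledgerbackend datastore --datastore-config config.storagebackend.toml. The sketch below (not part of the patch) mirrors the parsing done in services/horizon/cmd/db.go; the local storageBackendConfig type stands in for the StorageBackendConfig declared there.

```go
// Sketch of how the TOML above maps onto Go types via pelletier/go-toml,
// the same library the new cmd/db.go code uses.
package main

import (
	"fmt"
	"log"

	"github.com/pelletier/go-toml"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/datastore"
)

// Local mirror of cmd/db.go's StorageBackendConfig; tags match the file above.
type storageBackendConfig struct {
	DataStoreConfig              datastore.DataStoreConfig                  `toml:"datastore_config"`
	BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig `toml:"buffered_storage_backend_config"`
}

func main() {
	tree, err := toml.LoadFile("config.storagebackend.toml")
	if err != nil {
		log.Fatalf("failed to load config file: %v", err)
	}
	var cfg storageBackendConfig
	if err := tree.Unmarshal(&cfg); err != nil {
		log.Fatalf("error unmarshalling TOML config: %v", err)
	}
	fmt.Println(cfg.DataStoreConfig.Type, cfg.BufferedStorageBackendConfig.BufferSize)
}
```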
diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go
index e5f5cc1e95..a70a31fdf9 100644
--- a/services/horizon/internal/ingest/main.go
+++ b/services/horizon/internal/ingest/main.go
@@ -83,38 +83,24 @@ const (
 
 var log = logpkg.DefaultLogger.WithField("service", "ingest")
 
-type LedgerMetaBackendType int64
+type LedgerBackendType uint
 
 const (
-	LedgerBackendCaptiveCore LedgerMetaBackendType = iota
-	LedgerBackendPrecomputed
+	CaptiveCoreBackend LedgerBackendType = iota
+	BufferedStorageBackend
 )
 
-func (s LedgerMetaBackendType) String() string {
+func (s LedgerBackendType) String() string {
 	switch s {
-	case LedgerBackendCaptiveCore:
-		return "captive core"
-	case LedgerBackendPrecomputed:
-		return "precomputed"
-	default:
-		return ""
+	case CaptiveCoreBackend:
+		return "captive-core"
+	case BufferedStorageBackend:
+		return "datastore"
 	}
-}
-
-type BufferedBackendConfig struct {
-	BufferSize uint32        `toml:"size"`
-	NumWorkers uint32        `toml:"num_workers"`
-	RetryLimit uint32        `toml:"retry_limit"`
-	RetryWait  time.Duration `toml:"retry_wait"`
-}
-
-type PrecomputedLedgerMetaConfig struct {
-	DataStoreConfig       datastore.DataStoreConfig `toml:"datastore_config"`
-	BufferedBackendConfig BufferedBackendConfig     `toml:"buffered_backend_config"`
+	return ""
 }
 
 type Config struct {
-	LedgerMetaBackendType  LedgerMetaBackendType
 	StellarCoreURL         string
 	CaptiveCoreBinaryPath  string
 	CaptiveCoreStoragePath string
@@ -148,7 +134,9 @@ type Config struct {
 
 	ReapConfig ReapConfig
 
-	PrecomputedMetaConfig *PrecomputedLedgerMetaConfig
+	LedgerBackendType     LedgerBackendType
+	DataStoreConfig       datastore.DataStoreConfig
+	BufferedBackendConfig ledgerbackend.BufferedStorageBackendConfig
 }
 
 const (
@@ -296,11 +284,24 @@ func NewSystem(config Config) (System, error) {
 		cancel()
 		return nil, errors.Wrap(err, "error creating history archive")
 	}
-
 	var ledgerBackend ledgerbackend.LedgerBackend
-	switch config.LedgerMetaBackendType {
-	case LedgerBackendCaptiveCore:
+	if config.LedgerBackendType == BufferedStorageBackend {
+		// Ingest from datastore
+		var dataStore datastore.DataStore
+		dataStore, err = datastore.NewDataStore(context.Background(), config.DataStoreConfig)
+		if err != nil {
+			cancel()
+			return nil, fmt.Errorf("failed to create datastore: %w", err)
+		}
+		ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(ctx, config.BufferedBackendConfig, dataStore)
+		if err != nil {
+			cancel()
+			return nil, fmt.Errorf("failed to create buffered storage backend: %w", err)
+		}
+	} else {
+		// Ingest from local captive core
+		logger := log.WithField("subservice", "stellar-core")
 		ledgerBackend, err = ledgerbackend.NewCaptive(
 			ledgerbackend.CaptiveCoreConfig{
@@ -323,37 +324,6 @@ func NewSystem(config Config) (System, error) {
 			cancel()
 			return nil, errors.Wrap(err, "error creating captive core backend")
 		}
-		log.Infof("successfully created ledger backend of type captive core")
-	case LedgerBackendPrecomputed:
-		if config.PrecomputedMetaConfig == nil {
-			cancel()
-			return nil, errors.New("error creating precomputed buffered backend, precomputed backend config is not present")
-		}
-		precompConfig := config.PrecomputedMetaConfig
-
-		dataStore, err := datastore.NewDataStore(ctx, precompConfig.DataStoreConfig)
-		if err != nil {
-			cancel()
-			return nil, errors.Wrapf(err, "error creating datastore from config, %v", precompConfig.DataStoreConfig)
-		}
-
-		bufferedConfig := ledgerbackend.BufferedStorageBackendConfig{
-			LedgerBatchConfig: precompConfig.DataStoreConfig.Schema,
-			DataStore:         dataStore,
-			BufferSize:        precompConfig.BufferedBackendConfig.BufferSize,
-			NumWorkers:        precompConfig.BufferedBackendConfig.NumWorkers,
-			RetryLimit:        precompConfig.BufferedBackendConfig.RetryLimit,
-			RetryWait:         precompConfig.BufferedBackendConfig.RetryWait,
-		}
-
-		if ledgerBackend, err = ledgerbackend.NewBufferedStorageBackend(ctx, bufferedConfig); err != nil {
-			cancel()
-			return nil, errors.Wrapf(err, "error creating buffered storage backend, %v", bufferedConfig)
-		}
-		log.Infof("successfully created ledger backend of type buffered storage")
-	default:
-		cancel()
-		return nil, errors.Errorf("unsupported ledger backend type %v", config.LedgerMetaBackendType.String())
 	}
 
 	historyQ := &history.Q{config.HistorySession.Clone()}
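For programmatic users of the ingest package, backend selection now happens through plain Config fields instead of the removed PrecomputedLedgerMetaConfig. The sketch below (not part of the patch) shows only the fields NewSystem consults; every other required Config field is omitted, and the import assumes code living inside the stellar/go module since the package is internal.

```go
// Sketch: selecting the datastore-backed ledger backend via ingest.Config.
package main

import (
	"time"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/services/horizon/internal/ingest"
	"github.com/stellar/go/support/datastore"
)

func datastoreBackendConfig(ds datastore.DataStoreConfig) ingest.Config {
	return ingest.Config{
		// The zero value (CaptiveCoreBackend) keeps the existing captive core path.
		LedgerBackendType: ingest.BufferedStorageBackend,
		DataStoreConfig:   ds,
		BufferedBackendConfig: ledgerbackend.BufferedStorageBackendConfig{
			BufferSize: 100,
			NumWorkers: 5,
			RetryLimit: 3,
			RetryWait:  30 * time.Second,
		},
	}
}

func main() {
	_ = datastoreBackendConfig(datastore.DataStoreConfig{Type: "GCS"})
}
```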
diff --git a/support/datastore/datastore.go b/support/datastore/datastore.go
index e7e999345d..e4f41920e8 100644
--- a/support/datastore/datastore.go
+++ b/support/datastore/datastore.go
@@ -21,6 +21,7 @@ type DataStore interface {
 	PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) (bool, error)
 	Exists(ctx context.Context, path string) (bool, error)
 	Size(ctx context.Context, path string) (int64, error)
+	GetSchema(ctx context.Context) DataStoreSchema
 	Close() error
 }
 
@@ -32,7 +33,7 @@ func NewDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataSto
 		if !ok {
 			return nil, errors.Errorf("Invalid GCS config, no destination_bucket_path")
 		}
-		return NewGCSDataStore(ctx, destinationBucketPath)
+		return NewGCSDataStore(ctx, destinationBucketPath, datastoreConfig.Schema)
 	default:
 		return nil, errors.Errorf("Invalid datastore type %v, not supported", datastoreConfig.Type)
 	}
 }
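Because GetSchema is now part of the DataStore interface, callers that previously carried a DataStoreSchema alongside the store can ask the store itself, which is exactly what ledger_buffer.go does above. A small sketch follows; objectKeyFor is a hypothetical helper, not part of this patch.

```go
// Sketch: deriving an object key from the store's own schema.
package main

import (
	"context"
	"fmt"

	"github.com/stellar/go/support/datastore"
)

// objectKeyFor is a hypothetical helper; the GCS implementation currently
// returns the schema it was constructed with (see the TODO in gcs_datastore.go).
func objectKeyFor(ctx context.Context, store datastore.DataStore, sequence uint32) string {
	return store.GetSchema(ctx).GetObjectKeyFromSequenceNumber(sequence)
}

func main() {
	var store datastore.DataStore // e.g. obtained from datastore.NewDataStore(...)
	if store != nil {
		fmt.Println(objectKeyFor(context.Background(), store, 123456))
	}
}
```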
diff --git a/support/datastore/gcs_datastore.go b/support/datastore/gcs_datastore.go
index cdedea086d..3cf48b3fcb 100644
--- a/support/datastore/gcs_datastore.go
+++ b/support/datastore/gcs_datastore.go
@@ -24,18 +24,19 @@ type GCSDataStore struct {
 	client *storage.Client
 	bucket *storage.BucketHandle
 	prefix string
+	schema DataStoreSchema
 }
 
-func NewGCSDataStore(ctx context.Context, bucketPath string) (DataStore, error) {
+func NewGCSDataStore(ctx context.Context, bucketPath string, schema DataStoreSchema) (DataStore, error) {
 	client, err := storage.NewClient(ctx)
 	if err != nil {
 		return nil, err
 	}
 
-	return FromGCSClient(ctx, client, bucketPath)
+	return FromGCSClient(ctx, client, bucketPath, schema)
 }
 
-func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath string) (DataStore, error) {
+func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath string, schema DataStoreSchema) (DataStore, error) {
 	// append the gcs:// scheme to enable usage of the url package reliably to
 	// get parse bucket name which is first path segment as URL.Host
 	gcsBucketURL := fmt.Sprintf("gcs://%s", bucketPath)
@@ -55,7 +56,8 @@ func FromGCSClient(ctx context.Context, client *storage.Client, bucketPath strin
 		return nil, fmt.Errorf("failed to retrieve bucket attributes: %w", err)
 	}
 
-	return &GCSDataStore{client: client, bucket: bucket, prefix: prefix}, nil
+	// TODO: Datastore schema to be fetched from the datastore https://stellarorg.atlassian.net/browse/HUBBLE-397
+	return &GCSDataStore{client: client, bucket: bucket, prefix: prefix, schema: schema}, nil
 }
 
 // GetFileMetadata retrieves the metadata for the specified file in the GCS bucket.
@@ -177,3 +179,9 @@ func (b GCSDataStore) putFile(ctx context.Context, filePath string, in io.Writer
 	}
 	return w.Close()
 }
+
+// GetSchema returns the schema information which defines the structure
+// and organization of data in the datastore.
+func (b GCSDataStore) GetSchema(ctx context.Context) DataStoreSchema {
+	return b.schema
+}
diff --git a/support/datastore/gcs_test.go b/support/datastore/gcs_test.go
index 8838e8dadb..618b5d602a 100644
--- a/support/datastore/gcs_test.go
+++ b/support/datastore/gcs_test.go
@@ -24,7 +24,7 @@ func TestGCSExists(t *testing.T) {
 	})
 	defer server.Stop()
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
@@ -52,7 +52,7 @@ func TestGCSSize(t *testing.T) {
 	})
 	defer server.Stop()
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
@@ -86,7 +86,7 @@ func TestGCSPutFile(t *testing.T) {
 		DefaultEventBasedHold: false,
 	})
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
@@ -138,7 +138,7 @@ func TestGCSPutFileIfNotExists(t *testing.T) {
 	})
 	defer server.Stop()
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
@@ -187,7 +187,7 @@ func TestGCSPutFileWithMetadata(t *testing.T) {
 		DefaultEventBasedHold: false,
 	})
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
@@ -255,7 +255,7 @@ func TestGCSPutFileIfNotExistsWithMetadata(t *testing.T) {
 	})
 	defer server.Stop()
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
@@ -323,7 +323,7 @@ func TestGCSGetNonExistentFile(t *testing.T) {
 	})
 	defer server.Stop()
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
@@ -365,7 +365,7 @@ func TestGCSGetFileValidatesCRC32C(t *testing.T) {
 	})
 	defer server.Stop()
 
-	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet")
+	store, err := FromGCSClient(context.Background(), server.Client(), "test-bucket/objects/testnet", DataStoreSchema{})
 	require.NoError(t, err)
 	t.Cleanup(func() {
 		require.NoError(t, store.Close())
diff --git a/support/datastore/mocks.go b/support/datastore/mocks.go
index 96c15c1371..6b505e6478 100644
--- a/support/datastore/mocks.go
+++ b/support/datastore/mocks.go
@@ -47,6 +47,11 @@ func (m *MockDataStore) Close() error {
 	return args.Error(0)
 }
 
+func (m *MockDataStore) GetSchema(ctx context.Context) DataStoreSchema {
+	args := m.Called(ctx)
+	return args.Get(0).(DataStoreSchema)
+}
+
 type MockResumableManager struct {
 	mock.Mock
 }
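Any test that hands a MockDataStore to the backend or buffer now needs a GetSchema expectation, since those components call it instead of reading LedgerBatchConfig. The sketch below (not part of the patch) captures the helper pattern repeated throughout buffered_storage_backend_test.go; the package name is hypothetical.

```go
// Sketch: a test helper that stubs GetSchema on MockDataStore.
package mytest

import (
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/stellar/go/support/datastore"
)

func newMockDataStoreWithSchema(t *testing.T, ledgersPerFile, filesPerPartition uint32) *datastore.MockDataStore {
	m := new(datastore.MockDataStore)
	m.On("GetSchema", mock.Anything).Return(datastore.DataStoreSchema{
		LedgersPerFile:    ledgersPerFile,
		FilesPerPartition: filesPerPartition,
	})
	t.Cleanup(func() { m.AssertExpectations(t) })
	return m
}
```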