diff --git a/storage/aws_bucket.go b/storage/aws_bucket.go index 244f7ef..fdf5261 100644 --- a/storage/aws_bucket.go +++ b/storage/aws_bucket.go @@ -101,13 +101,3 @@ func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) { // Implement the ReadBatch method return nil, nil } - -func (s *S3) ReadFiles(keys []string) ([]bytes.Buffer, error) { - // Implement the ReadFiles method - return []bytes.Buffer{}, nil -} - -func (s *S3) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { - // Implement the ReadFilesAsync method - return []bytes.Buffer{}, nil -} diff --git a/storage/filesystem.go b/storage/filesystem.go index 3187d1b..463cd95 100644 --- a/storage/filesystem.go +++ b/storage/filesystem.go @@ -9,7 +9,6 @@ import ( "log" "os" "path/filepath" - "sync" ) type FileStorage struct { @@ -145,58 +144,3 @@ func (fs *FileStorage) Delete(key string) error { // Implement the Delete method return nil } - -func (fs *FileStorage) ReadFiles(keys []string) ([]bytes.Buffer, error) { - - var data []bytes.Buffer - - for _, key := range keys { - dataBlock, err := fs.Read(key) - - if err != nil { - return nil, err - } - data = append(data, dataBlock) - - } - return data, nil -} - -func (fs *FileStorage) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { - var data []bytes.Buffer - var mu sync.Mutex - var wg sync.WaitGroup - errChan := make(chan error, len(keys)) - - // Semaphore to limit the number of concurrent reads - sem := make(chan struct{}, threads) - - for _, key := range keys { - wg.Add(1) - sem <- struct{}{} - go func(k string) { - defer func() { - <-sem - wg.Done() - }() - bf, err := fs.Read(k) - if err != nil { - errChan <- fmt.Errorf("failed to read file %s: %v", k, err) - return - } - - mu.Lock() - data = append(data, bf) - mu.Unlock() - }(key) - } - - wg.Wait() - close(errChan) - - if len(errChan) > 0 { - return nil, fmt.Errorf("failed to read files: %v", <-errChan) - } - - return data, nil -} diff --git a/storage/gcp_storage.go b/storage/gcp_storage.go index 352bfc8..1488ce1 100644 --- a/storage/gcp_storage.go +++ b/storage/gcp_storage.go @@ -9,7 +9,6 @@ import ( "log" "path/filepath" "strings" - "sync" "time" "cloud.google.com/go/storage" @@ -217,65 +216,3 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) { return result, nil } - -func (g *GCS) ReadFiles(keys []string) ([]bytes.Buffer, error) { - var result []bytes.Buffer - - for _, key := range keys { - buf, err := g.Read(key) - if err != nil { - return nil, fmt.Errorf("failed to read object from bucket %s: %v", key, err) - } - - result = append(result, buf) - } - - return result, nil -} - -func (g *GCS) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) { - var result []bytes.Buffer - var mu sync.Mutex - var wg sync.WaitGroup - errChan := make(chan error, len(keys)) - - // Semaphore to limit the number of concurrent reads - sem := make(chan struct{}, threads) - - for _, key := range keys { - wg.Add(1) - sem <- struct{}{} - go func(k string) { - defer func() { - <-sem - wg.Done() - }() - - buf, err := g.Read(k) - if err != nil { - errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err) - return - } - - mu.Lock() - result = append(result, buf) - mu.Unlock() - }(key) - } - - // Wait for all goroutines to finish - wg.Wait() - close(errChan) - - // Check if any errors occurred - if len(errChan) > 0 { - var errMsgs []string - for err := range errChan { - errMsgs = append(errMsgs, err.Error()) - } - return result, fmt.Errorf("errors occurred during file reads:\n%s", - strings.Join(errMsgs, "\n")) - } - - return result, nil -} diff --git a/storage/storage.go b/storage/storage.go index a45795c..4933a90 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -3,6 +3,9 @@ package storage import ( "bytes" "context" + "fmt" + "strings" + "sync" ) type ListReturnFunc func(any) string @@ -11,8 +14,6 @@ type Storer interface { Save(batchDir, filename string, bf bytes.Buffer) error Read(key string) (bytes.Buffer, error) ReadBatch(readItems []ReadItem) (map[string][]string, error) - ReadFiles(keys []string) ([]bytes.Buffer, error) - ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) Delete(key string) error List(ctx context.Context, delim, blockBatch string, timeout int, returnFunc ListReturnFunc) ([]string, error) } @@ -21,3 +22,65 @@ type ReadItem struct { Key string RowIds []uint64 } + +func ReadFiles(keys []string, storageInstance Storer) ([]bytes.Buffer, error) { + var result []bytes.Buffer + + for _, key := range keys { + buf, err := storageInstance.Read(key) + if err != nil { + return nil, fmt.Errorf("failed to read object from bucket %s: %v", key, err) + } + + result = append(result, buf) + } + + return result, nil +} + +func ReadFilesAsync(keys []string, threads int, storageInstance Storer) ([]bytes.Buffer, error) { + var result []bytes.Buffer + var mu sync.Mutex + var wg sync.WaitGroup + errChan := make(chan error, len(keys)) + + // Semaphore to limit the number of concurrent reads + sem := make(chan struct{}, threads) + + for _, key := range keys { + wg.Add(1) + sem <- struct{}{} + go func(k string) { + defer func() { + <-sem + wg.Done() + }() + + buf, err := storageInstance.Read(k) + if err != nil { + errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err) + return + } + + mu.Lock() + result = append(result, buf) + mu.Unlock() + }(key) + } + + // Wait for all goroutines to finish + wg.Wait() + close(errChan) + + // Check if any errors occurred + if len(errChan) > 0 { + var errMsgs []string + for err := range errChan { + errMsgs = append(errMsgs, err.Error()) + } + return result, fmt.Errorf("errors occurred during file reads:\n%s", + strings.Join(errMsgs, "\n")) + } + + return result, nil +} diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index b8d67fb..17d0ea8 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -470,7 +470,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) { log.Println("Last block of current chank: ", lastBlockOfChank) // Read the raw data from the storage for current path - rawData, readErr := d.StorageInstance.ReadFilesAsync(paths, d.threads) + rawData, readErr := storage.ReadFilesAsync(paths, d.threads, d.StorageInstance) if readErr != nil { return isEnd, fmt.Errorf("error reading raw data: %w", readErr) } @@ -671,7 +671,7 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s // Read raw data from storage or via RPC var rawData []bytes.Buffer - rawData, err = d.StorageInstance.ReadFilesAsync(paths, d.threads) + rawData, err = storage.ReadFilesAsync(paths, d.threads, d.StorageInstance) if err != nil { return fmt.Errorf("error reading events from storage: %w", err) }