Skip to content

Commit

Permalink
Reduce code duplication.
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey committed Nov 21, 2024
1 parent 4b44b36 commit 047bc2b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 133 deletions.
10 changes: 0 additions & 10 deletions storage/aws_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,3 @@ func (s *S3) ReadBatch(readItems []ReadItem) (map[string][]string, error) {
// Implement the ReadBatch method
return nil, nil
}

// ReadFiles is a stub: the S3 implementation is not written yet.
// It currently returns an empty, non-nil slice and no error.
func (s *S3) ReadFiles(keys []string) ([]bytes.Buffer, error) {
	// TODO: implement the ReadFiles method for S3.
	empty := make([]bytes.Buffer, 0)
	return empty, nil
}

// ReadFilesAsync is a stub: the concurrent S3 implementation is not
// written yet. It currently returns an empty, non-nil slice and no error;
// the threads argument is ignored.
func (s *S3) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) {
	// TODO: implement the ReadFilesAsync method for S3.
	empty := make([]bytes.Buffer, 0)
	return empty, nil
}
56 changes: 0 additions & 56 deletions storage/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"log"
"os"
"path/filepath"
"sync"
)

type FileStorage struct {
Expand Down Expand Up @@ -145,58 +144,3 @@ func (fs *FileStorage) Delete(key string) error {
// Implement the Delete method
return nil
}

// ReadFiles loads the contents of every key via fs.Read and returns the
// buffers in the same order as keys. The first failed read aborts the
// whole operation and its error is returned unchanged.
func (fs *FileStorage) ReadFiles(keys []string) ([]bytes.Buffer, error) {
	var buffers []bytes.Buffer
	for _, k := range keys {
		buf, readErr := fs.Read(k)
		if readErr != nil {
			return nil, readErr
		}
		buffers = append(buffers, buf)
	}
	return buffers, nil
}

// ReadFilesAsync reads the files named in keys concurrently, running at
// most threads reads at a time, and returns the contents aligned with the
// input key order. If any read fails, the first collected error is
// returned and the partial data is discarded.
func (fs *FileStorage) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) {
	// Guard against a non-positive thread count: a zero-capacity semaphore
	// below would block the first send before any goroutine starts,
	// deadlocking the caller.
	if threads < 1 {
		threads = 1
	}

	// Indexed writes keep results aligned with keys even though goroutines
	// finish in arbitrary order, and make the mutex unnecessary (each
	// goroutine writes a distinct element).
	data := make([]bytes.Buffer, len(keys))

	var wg sync.WaitGroup
	errChan := make(chan error, len(keys))

	// Semaphore to limit the number of concurrent reads.
	sem := make(chan struct{}, threads)

	for i, key := range keys {
		wg.Add(1)
		sem <- struct{}{}
		// Pass loop variables explicitly so each goroutine captures its
		// own copies (required before Go 1.22).
		go func(idx int, k string) {
			defer func() {
				<-sem
				wg.Done()
			}()
			bf, err := fs.Read(k)
			if err != nil {
				errChan <- fmt.Errorf("failed to read file %s: %v", k, err)
				return
			}
			data[idx] = bf
		}(i, key)
	}

	wg.Wait()
	close(errChan)

	// Report the first error, matching the sequential ReadFiles contract
	// of nil data on failure.
	if len(errChan) > 0 {
		return nil, fmt.Errorf("failed to read files: %v", <-errChan)
	}

	return data, nil
}
63 changes: 0 additions & 63 deletions storage/gcp_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"log"
"path/filepath"
"strings"
"sync"
"time"

"cloud.google.com/go/storage"
Expand Down Expand Up @@ -217,65 +216,3 @@ func (g *GCS) ReadBatch(readItems []ReadItem) (map[string][]string, error) {

return result, nil
}

// ReadFiles fetches each object named in keys via g.Read and collects the
// buffers in input order. It aborts on the first failed read and wraps
// that error with the offending key.
func (g *GCS) ReadFiles(keys []string) ([]bytes.Buffer, error) {
	var buffers []bytes.Buffer
	for _, objectKey := range keys {
		data, readErr := g.Read(objectKey)
		if readErr != nil {
			return nil, fmt.Errorf("failed to read object from bucket %s: %v", objectKey, readErr)
		}
		buffers = append(buffers, data)
	}
	return buffers, nil
}

// ReadFilesAsync reads the objects named in keys concurrently, running at
// most threads reads at a time, and returns the contents aligned with the
// input key order. If any reads fail, all collected error messages are
// joined into the returned error; the result slice is still returned but
// may contain zero-value buffers at the failed positions.
func (g *GCS) ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error) {
	// Guard against a non-positive thread count: a zero-capacity semaphore
	// below would block the first send before any goroutine starts,
	// deadlocking the caller.
	if threads < 1 {
		threads = 1
	}

	// Indexed writes keep results aligned with keys even though goroutines
	// finish in arbitrary order, and make the mutex unnecessary (each
	// goroutine writes a distinct element).
	result := make([]bytes.Buffer, len(keys))

	var wg sync.WaitGroup
	errChan := make(chan error, len(keys))

	// Semaphore to limit the number of concurrent reads.
	sem := make(chan struct{}, threads)

	for i, key := range keys {
		wg.Add(1)
		sem <- struct{}{}
		// Pass loop variables explicitly so each goroutine captures its
		// own copies (required before Go 1.22).
		go func(idx int, k string) {
			defer func() {
				<-sem
				wg.Done()
			}()

			buf, err := g.Read(k)
			if err != nil {
				errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err)
				return
			}

			result[idx] = buf
		}(i, key)
	}

	// Wait for all goroutines to finish
	wg.Wait()
	close(errChan)

	// Check if any errors occurred
	if len(errChan) > 0 {
		var errMsgs []string
		for err := range errChan {
			errMsgs = append(errMsgs, err.Error())
		}
		return result, fmt.Errorf("errors occurred during file reads:\n%s",
			strings.Join(errMsgs, "\n"))
	}

	return result, nil
}
67 changes: 65 additions & 2 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package storage
import (
"bytes"
"context"
"fmt"
"strings"
"sync"
)

type ListReturnFunc func(any) string
Expand All @@ -11,8 +14,6 @@ type Storer interface {
Save(batchDir, filename string, bf bytes.Buffer) error
Read(key string) (bytes.Buffer, error)
ReadBatch(readItems []ReadItem) (map[string][]string, error)
ReadFiles(keys []string) ([]bytes.Buffer, error)
ReadFilesAsync(keys []string, threads int) ([]bytes.Buffer, error)
Delete(key string) error
List(ctx context.Context, delim, blockBatch string, timeout int, returnFunc ListReturnFunc) ([]string, error)
}
Expand All @@ -21,3 +22,65 @@ type ReadItem struct {
Key string
RowIds []uint64
}

// ReadFiles sequentially reads the objects stored under each of the given
// keys from the provided storage backend and returns their contents in the
// same order as keys. It stops at the first failed read and returns that
// error wrapped with the offending key; no partial results are returned.
func ReadFiles(keys []string, storageInstance Storer) ([]bytes.Buffer, error) {
	// Pre-size: exactly one buffer per key on the happy path.
	result := make([]bytes.Buffer, 0, len(keys))

	for _, key := range keys {
		buf, err := storageInstance.Read(key)
		if err != nil {
			// The key names an object, not a bucket; wrap with %w so
			// callers can use errors.Is/As on the underlying error.
			return nil, fmt.Errorf("failed to read object %s: %w", key, err)
		}

		result = append(result, buf)
	}

	return result, nil
}

// ReadFilesAsync reads the objects stored under the given keys from the
// provided storage backend concurrently, running at most threads reads at
// a time, and returns the contents aligned with the input key order.
// If any reads fail, all collected error messages are joined into the
// returned error; the result slice is still returned but may contain
// zero-value buffers at the failed positions.
func ReadFilesAsync(keys []string, threads int, storageInstance Storer) ([]bytes.Buffer, error) {
	// Guard against a non-positive thread count: a zero-capacity semaphore
	// below would block the first send before any goroutine starts,
	// deadlocking the caller.
	if threads < 1 {
		threads = 1
	}

	// Indexed writes keep results aligned with keys even though goroutines
	// finish in arbitrary order, and make the mutex unnecessary (each
	// goroutine writes a distinct element).
	result := make([]bytes.Buffer, len(keys))

	var wg sync.WaitGroup
	errChan := make(chan error, len(keys))

	// Semaphore to limit the number of concurrent reads.
	sem := make(chan struct{}, threads)

	for i, key := range keys {
		wg.Add(1)
		sem <- struct{}{}
		// Pass loop variables explicitly so each goroutine captures its
		// own copies (required before Go 1.22).
		go func(idx int, k string) {
			defer func() {
				<-sem
				wg.Done()
			}()

			buf, err := storageInstance.Read(k)
			if err != nil {
				errChan <- fmt.Errorf("failed to read object from bucket %s: %v", k, err)
				return
			}

			result[idx] = buf
		}(i, key)
	}

	// Wait for all goroutines to finish
	wg.Wait()
	close(errChan)

	// Check if any errors occurred
	if len(errChan) > 0 {
		var errMsgs []string
		for err := range errChan {
			errMsgs = append(errMsgs, err.Error())
		}
		return result, fmt.Errorf("errors occurred during file reads:\n%s",
			strings.Join(errMsgs, "\n"))
	}

	return result, nil
}
4 changes: 2 additions & 2 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (d *Synchronizer) SyncCycle(customerDbUriFlag string) (bool, error) {
log.Println("Last block of current chank: ", lastBlockOfChank)

// Read the raw data from the storage for current path
rawData, readErr := d.StorageInstance.ReadFilesAsync(paths, d.threads)
rawData, readErr := storage.ReadFilesAsync(paths, d.threads, d.StorageInstance)
if readErr != nil {
return isEnd, fmt.Errorf("error reading raw data: %w", readErr)
}
Expand Down Expand Up @@ -671,7 +671,7 @@ func (d *Synchronizer) HistoricalSyncRef(customerDbUriFlag string, addresses []s

// Read raw data from storage or via RPC
var rawData []bytes.Buffer
rawData, err = d.StorageInstance.ReadFilesAsync(paths, d.threads)
rawData, err = storage.ReadFilesAsync(paths, d.threads, d.StorageInstance)
if err != nil {
return fmt.Errorf("error reading events from storage: %w", err)
}
Expand Down

0 comments on commit 047bc2b

Please sign in to comment.