Skip to content

Commit

Permalink
rqlite object store (#4651)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgalsaleh authored May 30, 2024
1 parent 6079fe3 commit 187d483
Show file tree
Hide file tree
Showing 8 changed files with 430 additions and 6 deletions.
106 changes: 106 additions & 0 deletions cmd/kotsadm/cli/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package cli

import (
"os"

"github.com/pkg/errors"
"github.com/replicatedhq/kots/pkg/filestore"
"github.com/replicatedhq/kots/pkg/persistence"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

func MigrateCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate",
Short: "Trigger a migration",
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
return nil
},
}

cmd.AddCommand(MigrateS3ToRqliteCmd())
cmd.AddCommand(MigratePVCToRqliteCmd())

return cmd
}

func MigrateS3ToRqliteCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "s3-to-rqlite",
Short: "Migrate object storage from S3 to rqlite",
Long: ``,
SilenceUsage: true,
SilenceErrors: false,
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlags(cmd.Flags())
},
RunE: func(cmd *cobra.Command, args []string) error {
// Check if required env vars are set
if os.Getenv("RQLITE_URI") == "" {
return errors.New("RQLITE_URI is not set")
}
if os.Getenv("S3_ENDPOINT") == "" {
return errors.New("S3_ENDPOINT is not set")
}
if os.Getenv("S3_BUCKET_NAME") == "" {
return errors.New("S3_BUCKET_NAME is not set")
}
if os.Getenv("S3_ACCESS_KEY_ID") == "" {
return errors.New("S3_ACCESS_KEY_ID is not set")
}
if os.Getenv("S3_SECRET_ACCESS_KEY") == "" {
return errors.New("S3_SECRET_ACCESS_KEY is not set")
}

// Initialize the rqlite DB
persistence.InitDB(os.Getenv("RQLITE_URI"))

// Migrate from S3 to rqlite
if err := filestore.MigrateFromS3ToRqlite(cmd.Context()); err != nil {
return err
}

return nil
},
}

return cmd
}

func MigratePVCToRqliteCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "pvc-to-rqlite",
Short: "Migrate object storage from PVC to rqlite",
Long: ``,
SilenceUsage: true,
SilenceErrors: false,
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlags(cmd.Flags())
},
RunE: func(cmd *cobra.Command, args []string) error {
// Check if required env vars are set
if os.Getenv("RQLITE_URI") == "" {
return errors.New("RQLITE_URI is not set")
}

// Check if PVC mount and the archives dir exist
if _, err := os.Stat(filestore.ArchivesDir); err != nil {
return errors.Wrap(err, "failed to stat archives dir")
}

// Initialize the rqlite DB
persistence.InitDB(os.Getenv("RQLITE_URI"))

// Migrate from PVC to rqlite
if err := filestore.MigrateFromPVCToRqlite(cmd.Context()); err != nil {
return err
}

return nil
},
}

return cmd
}
1 change: 1 addition & 0 deletions cmd/kotsadm/cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func RootCmd() *cobra.Command {
cmd.PersistentFlags().String("log-level", "info", "set the log level")

cmd.AddCommand(APICmd())
cmd.AddCommand(MigrateCmd())
cmd.AddCommand(CompletionCmd())

viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_"))
Expand Down
2 changes: 1 addition & 1 deletion migrations/tables/object_store.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ spec:
- name: encoded_block
type: text
constraints:
notNull: true
notNull: true
3 changes: 1 addition & 2 deletions pkg/filestore/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package filestore
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -70,7 +69,7 @@ func (s *BlobStore) readFile(path string) (string, error) {
}
defer fileReader.Close()

tmpDir, err := ioutil.TempDir("", "kotsadm")
tmpDir, err := os.MkdirTemp("", "kotsadm")
if err != nil {
return "", errors.Wrap(err, "failed to create temp dir")
}
Expand Down
201 changes: 201 additions & 0 deletions pkg/filestore/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package filestore

import (
"bytes"
"context"
"fmt"
"log"
"os"
"path/filepath"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
_ "github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
"github.com/replicatedhq/kots/pkg/persistence"
kotss3 "github.com/replicatedhq/kots/pkg/s3"
"github.com/rqlite/gorqlite"
)

const (
RQLITE_S3_MIGRATION_SUCCESS_KEY = "rqlite.s3.migration.success"
RQLITE_BLOB_MIGRATION_SUCCESS_KEY = "rqlite.blob.migration.success"
RQLITE_MIGRATION_SUCCESS_VALUE = "true"
)

func MigrateFromS3ToRqlite(ctx context.Context) error {
// Check if already migrated
rqliteDB := persistence.MustGetDBSession()
alreadyMigrated, err := isAlreadyMigrated(rqliteDB, RQLITE_S3_MIGRATION_SUCCESS_KEY)
if err != nil {
return errors.Wrap(err, "failed to check if already migrated")
}
if alreadyMigrated {
log.Println("Already migrated from S3 to rqlite. Skipping migration...")
return nil
}

log.Println("Migrating from S3 to rqlite...")

// Initialize rqlite store
rqliteStore := &RqliteStore{}
if err := rqliteStore.Init(); err != nil {
return errors.Wrap(err, "failed to init rqlite store")
}
if err := rqliteStore.WaitForReady(ctx); err != nil {
return errors.Wrap(err, "failed to wait for rqlite store to become ready")
}

// Create a new S3 session
sess, err := session.NewSession(kotss3.GetConfig())
if err != nil {
return errors.Wrap(err, "failed to create new s3 session")
}

// Create an S3 client
s3Client := s3.New(sess)

// List objects in the bucket
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(os.Getenv("S3_BUCKET_NAME")),
}
listObjectsOutput, err := s3Client.ListObjectsV2(listObjectsInput)
if err != nil {
return errors.Wrap(err, "failed to list objects in bucket")
}

// Initialize the S3 downloader
downloader := s3manager.NewDownloader(sess)

// Process each object
for _, item := range listObjectsOutput.Contents {
if item == nil || item.Key == nil {
continue
}
key := *item.Key
log.Printf("Processing key: %s\n", key)

// Download the object
buff := &aws.WriteAtBuffer{}
_, err := downloader.Download(buff, &s3.GetObjectInput{
Bucket: aws.String(os.Getenv("S3_BUCKET_NAME")),
Key: aws.String(key),
})
if err != nil {
return errors.Wrap(err, "failed to download object")
}

// Write the object to rqlite
if err := rqliteStore.WriteArchive(key, bytes.NewReader(buff.Bytes())); err != nil {
return errors.Wrap(err, "failed to write archive to rqlite")
}
}

// Record the migration success
query := `REPLACE INTO kotsadm_params (key, value) VALUES (?, ?)`
wr, err := rqliteDB.WriteOneParameterized(gorqlite.ParameterizedStatement{
Query: query,
Arguments: []interface{}{RQLITE_S3_MIGRATION_SUCCESS_KEY, RQLITE_MIGRATION_SUCCESS_VALUE},
})
if err != nil {
return fmt.Errorf("failed to mark migration as successful: %v: %v", err, wr.Err)
}

log.Println("Migrated from S3 to rqlite successfully!")

return nil
}

func MigrateFromPVCToRqlite(ctx context.Context) error {
// Check if already migrated
rqliteDB := persistence.MustGetDBSession()
alreadyMigrated, err := isAlreadyMigrated(rqliteDB, RQLITE_BLOB_MIGRATION_SUCCESS_KEY)
if err != nil {
return errors.Wrap(err, "failed to check if already migrated")
}
if alreadyMigrated {
log.Println("Already migrated from PVC to rqlite. Skipping migration...")
return nil
}

log.Println("Migrating from PVC to rqlite...")

// Initialize rqlite store
rqliteStore := &RqliteStore{}
if err := rqliteStore.Init(); err != nil {
return errors.Wrap(err, "failed to init rqlite store")
}
if err := rqliteStore.WaitForReady(ctx); err != nil {
return errors.Wrap(err, "failed to wait for rqlite store to become ready")
}

// Process each object
err = filepath.Walk(ArchivesDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return errors.Wrap(err, "failed to walk path")
}

if info.IsDir() {
return nil
}

key, err := filepath.Rel(ArchivesDir, path)
if err != nil {
return errors.Wrap(err, "failed to get relative path")
}
log.Printf("Processing key: %s\n", key)

// Open the file
file, err := os.Open(path)
if err != nil {
return errors.Wrap(err, "failed to open file")
}
defer file.Close()

// Write the object to rqlite
if err := rqliteStore.WriteArchive(key, file); err != nil {
return errors.Wrap(err, "failed to write archive to rqlite")
}

return nil
})
if err != nil {
return errors.Wrap(err, "failed to walk PVC mount")
}

// Record the migration success
query := `REPLACE INTO kotsadm_params (key, value) VALUES (?, ?)`
wr, err := rqliteDB.WriteOneParameterized(gorqlite.ParameterizedStatement{
Query: query,
Arguments: []interface{}{RQLITE_BLOB_MIGRATION_SUCCESS_KEY, RQLITE_MIGRATION_SUCCESS_VALUE},
})
if err != nil {
return fmt.Errorf("failed to mark migration as successful: %v: %v", err, wr.Err)
}

log.Println("Migrated from PVC to rqlite successfully!")

return nil
}

func isAlreadyMigrated(rqliteDB *gorqlite.Connection, migrationKey string) (bool, error) {
rows, err := rqliteDB.QueryOneParameterized(gorqlite.ParameterizedStatement{
Query: `SELECT value FROM kotsadm_params WHERE key = ?`,
Arguments: []interface{}{migrationKey},
})
if err != nil {
return false, fmt.Errorf("failed to query: %v: %v", err, rows.Err)
}
if !rows.Next() {
return false, nil
}

var value string
if err := rows.Scan(&value); err != nil {
return false, errors.Wrap(err, "failed to scan")
}

return value == RQLITE_MIGRATION_SUCCESS_VALUE, nil
}
Loading

0 comments on commit 187d483

Please sign in to comment.