Skip to content

Commit

Permalink
feat: s3 switch
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Jan 2, 2025
1 parent f3d5634 commit 819997a
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pkg/common/storage/database/mgo/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,15 @@ func (o *S3Mongo) FindExpirationObject(ctx context.Context, engine string, expir
func (o *S3Mongo) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) {
return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine, "key": key})
}

func (o *S3Mongo) GetEngineCount(ctx context.Context, engine string) (int64, error) {
return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine})
}

func (o *S3Mongo) GetEngineInfo(ctx context.Context, engine string, limit int, skip int) ([]*model.Object, error) {
return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{"engine": engine}, options.Find().SetLimit(int64(limit)).SetSkip(int64(skip)))
}

func (o *S3Mongo) UpdateEngine(ctx context.Context, oldEngine, oldName string, newEngine string) error {
return mongoutil.UpdateOne(ctx, o.coll, bson.M{"engine": oldEngine, "name": oldName}, bson.M{"$set": bson.M{"engine": newEngine}}, false)
}
4 changes: 4 additions & 0 deletions pkg/common/storage/database/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ type ObjectInfo interface {
Delete(ctx context.Context, engine string, name []string) error
FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error)
GetKeyCount(ctx context.Context, engine string, key string) (int64, error)

GetEngineCount(ctx context.Context, engine string) (int64, error)
GetEngineInfo(ctx context.Context, engine string, limit int, skip int) ([]*model.Object, error)
UpdateEngine(ctx context.Context, oldEngine, oldName string, newEngine string) error
}
202 changes: 202 additions & 0 deletions tools/s3/internal/conversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package internal

import (
"context"
"errors"
"fmt"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo"
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/s3"
"github.com/openimsdk/tools/s3/aws"
"github.com/openimsdk/tools/s3/cos"
"github.com/openimsdk/tools/s3/kodo"
"github.com/openimsdk/tools/s3/minio"
"github.com/openimsdk/tools/s3/oss"
"github.com/spf13/viper"
"go.mongodb.org/mongo-driver/mongo"
"log"
"net/http"
"path/filepath"
"time"
)

const defaultTimeout = time.Second * 10

func readConf(path string, val any) error {
v := viper.New()
v.SetConfigFile(path)
if err := v.ReadInConfig(); err != nil {
return err
}
fn := func(config *mapstructure.DecoderConfig) {
config.TagName = "mapstructure"
}
return v.Unmarshal(val, fn)
}

func getS3(path string, name string, thirdConf *config.Third) (s3.Interface, error) {
switch name {
case "minio":
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
var minioConf config.Minio
if err := readConf(filepath.Join(path, minioConf.GetConfigFileName()), &minioConf); err != nil {
return nil, err
}
var redisConf config.Redis
if err := readConf(filepath.Join(path, redisConf.GetConfigFileName()), &redisConf); err != nil {
return nil, err
}
rdb, err := redisutil.NewRedisClient(ctx, redisConf.Build())
if err != nil {
return nil, err
}
return minio.NewMinio(ctx, redis.NewMinioCache(rdb), *minioConf.Build())
case "cos":
return cos.NewCos(*thirdConf.Object.Cos.Build())
case "oss":
return oss.NewOSS(*thirdConf.Object.Oss.Build())
case "kodo":
return kodo.NewKodo(*thirdConf.Object.Kodo.Build())
case "aws":
return aws.NewAws(*thirdConf.Object.Aws.Build())
default:
return nil, fmt.Errorf("invalid object enable: %s", name)
}
}

func getMongo(path string) (database.ObjectInfo, error) {
var mongoConf config.Mongo
if err := readConf(filepath.Join(path, mongoConf.GetConfigFileName()), &mongoConf); err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
mgocli, err := mongoutil.NewMongoDB(ctx, mongoConf.Build())
if err != nil {
return nil, err
}
return mgo.NewS3Mongo(mgocli.GetDB())
}

func Main(path string, engine string) error {
var thirdConf config.Third
if err := readConf(filepath.Join(path, thirdConf.GetConfigFileName()), &thirdConf); err != nil {
return err
}
if thirdConf.Object.Enable == engine {
return errors.New("same s3 storage")
}
s3db, err := getMongo(path)
if err != nil {
return err
}
oldS3, err := getS3(path, engine, &thirdConf)
if err != nil {
return err
}
newS3, err := getS3(path, thirdConf.Object.Enable, &thirdConf)
if err != nil {
return err
}
count, err := getEngineCount(s3db, oldS3.Engine())
if err != nil {
return err
}
log.Printf("engine %s count: %d", oldS3.Engine(), count)
var skip int
for i := 1; i <= count+1; i++ {
log.Printf("start %d/%d", i, count)
start := time.Now()
res, err := doObject(s3db, newS3, oldS3, skip)
if err != nil {
log.Printf("end [%s] %d/%d error %s", time.Since(start), i, count, err)
return err
}
log.Printf("end [%s] %d/%d result %+v", time.Since(start), i, count, *res)
if res.Skip {
skip++
}
if res.End {
break
}
}
return nil
}

func getEngineCount(db database.ObjectInfo, name string) (int, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
count, err := db.GetEngineCount(ctx, name)
if err != nil {
return 0, err
}
return int(count), nil
}

func doObject(db database.ObjectInfo, newS3, oldS3 s3.Interface, skip int) (*Result, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
infos, err := db.GetEngineInfo(ctx, oldS3.Engine(), 1, skip)
if err != nil {
return nil, err
}
if len(infos) == 0 {
return &Result{End: true}, nil
}
obj := infos[0]
if _, err := db.Take(ctx, newS3.Engine(), obj.Name); err == nil {
return &Result{Skip: true}, nil
} else if !errors.Is(err, mongo.ErrNoDocuments) {
return nil, err
}
downloadURL, err := oldS3.AccessURL(ctx, obj.Key, time.Hour, &s3.AccessURLOption{})
if err != nil {
return nil, err
}
putURL, err := newS3.PresignedPutObject(ctx, obj.Key, time.Hour)
if err != nil {
return nil, err
}
downloadResp, err := http.Get(downloadURL)
if err != nil {
return nil, err
}
defer downloadResp.Body.Close()
switch downloadResp.StatusCode {
case http.StatusNotFound:
return &Result{Skip: true}, nil
case http.StatusOK:
default:
return nil, fmt.Errorf("download object failed %s", downloadResp.Status)
}
log.Printf("file size %d", obj.Size)
request, err := http.NewRequest(http.MethodPut, putURL, downloadResp.Body)
if err != nil {
return nil, err
}
putResp, err := http.DefaultClient.Do(request)
if err != nil {
return nil, err
}
defer putResp.Body.Close()
if putResp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("put object failed %s", putResp.Status)
}
ctx, cancel = context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
if err := db.UpdateEngine(ctx, obj.Engine, obj.Name, newS3.Engine()); err != nil {
return nil, err
}
return &Result{}, nil
}

type Result struct {
Skip bool
End bool
}
23 changes: 23 additions & 0 deletions tools/s3/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package main

import (
"flag"
"fmt"
"github.com/openimsdk/open-im-server/v3/tools/s3/internal"
"os"
)

func main() {
var (
name string
config string
)
flag.StringVar(&name, "name", "", "old previous storage name")
flag.StringVar(&config, "config", "", "config directory")
flag.Parse()
if err := internal.Main(config, name); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
fmt.Fprintln(os.Stdout, "success")
}

0 comments on commit 819997a

Please sign in to comment.