From da628a7aad22fb2942a8730318f2719beba319cc Mon Sep 17 00:00:00 2001 From: calvin Date: Mon, 29 Jan 2024 11:43:30 +0800 Subject: [PATCH] support split table for internal storage Signed-off-by: calvin --- examples/pediacluster.yaml | 2 +- pkg/storage/internalstorage/register.go | 7 +- .../internalstorage/resource_storage.go | 62 ++++++----- pkg/storage/internalstorage/storage.go | 105 ++++++++++++++---- 4 files changed, 125 insertions(+), 51 deletions(-) diff --git a/examples/pediacluster.yaml b/examples/pediacluster.yaml index 87f385d23..324a7bffa 100644 --- a/examples/pediacluster.yaml +++ b/examples/pediacluster.yaml @@ -4,7 +4,7 @@ metadata: name: cluster-example spec: apiserver: "https://10.30.43.43:6443" - caData: + caData: tokenData: certData: keyData: diff --git a/pkg/storage/internalstorage/register.go b/pkg/storage/internalstorage/register.go index 42061499b..0b5d19894 100644 --- a/pkg/storage/internalstorage/register.go +++ b/pkg/storage/internalstorage/register.go @@ -93,7 +93,12 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) { sqlDB.SetMaxOpenConns(connPool.MaxOpenConns) sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime) - return &StorageFactory{db}, nil + return &StorageFactory{ + db: db, + AutoMigration: cfg.AutoMigration, + DivisionPolicy: cfg.DivisionPolicy, + Mapper: cfg.Mapper, + }, nil } func newLogger(cfg *Config) (logger.Interface, error) { diff --git a/pkg/storage/internalstorage/resource_storage.go b/pkg/storage/internalstorage/resource_storage.go index c306ac9b6..83bf09f4f 100644 --- a/pkg/storage/internalstorage/resource_storage.go +++ b/pkg/storage/internalstorage/resource_storage.go @@ -29,6 +29,7 @@ import ( type ResourceStorage struct { db *gorm.DB codec runtime.Codec + table string storageGroupResource schema.GroupResource storageVersion schema.GroupVersion @@ -83,7 +84,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim resource.DeletedAt = sql.NullTime{Time: deletedAt.Time, Valid: true} } - result := s.db.WithContext(ctx).Create(&resource) + result := s.db.WithContext(ctx).Table(s.table).Create(&resource) return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error) } @@ -116,14 +117,19 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true} } - result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": metaobj.GetNamespace(), - "name": metaobj.GetName(), - }).Updates(updatedResource) + result := s.db.WithContext(ctx). + Model(&Resource{}). + Where(map[string]interface{}{ + "cluster": cluster, + "group": s.storageGroupResource.Group, + "version": s.storageVersion.Version, + "resource": s.storageGroupResource.Resource, + "namespace": metaobj.GetNamespace(), + "name": metaobj.GetName(), + }). + Table(s.table). + Updates(updatedResource) + return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error) } @@ -144,14 +150,17 @@ func (c *ResourceStorage) ConvertDeletedObject(obj interface{}) (runtime.Object, } func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB { - return s.db.Model(&Resource{}).Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": namespace, - "name": name, - }).Delete(&Resource{}) + return s.db.Model(&Resource{}). + Where(map[string]interface{}{ + "cluster": cluster, + "group": s.storageGroupResource.Group, + "version": s.storageVersion.Version, + "resource": s.storageGroupResource.Resource, + "namespace": namespace, + "name": name, + }). + Table(s.table). + Delete(&Resource{}) } func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error { @@ -167,14 +176,15 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim } func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB { - return s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(map[string]interface{}{ - "cluster": cluster, - "group": s.storageGroupResource.Group, - "version": s.storageVersion.Version, - "resource": s.storageGroupResource.Resource, - "namespace": namespace, - "name": name, - }) + return s.db.WithContext(ctx).Model(&Resource{}).Table(s.table).Select("object"). + Where(map[string]interface{}{ + "cluster": cluster, + "group": s.storageGroupResource.Group, + "version": s.storageVersion.Version, + "resource": s.storageGroupResource.Resource, + "namespace": namespace, + "name": name, + }) } func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { @@ -199,7 +209,7 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna result = &ResourceMetadataList{} } - query := s.db.WithContext(ctx).Model(&Resource{}) + query := s.db.WithContext(ctx).Model(&Resource{}).Table(s.table) query = query.Where(map[string]interface{}{ "group": s.storageGroupResource.Group, "version": s.storageVersion.Version, diff --git a/pkg/storage/internalstorage/storage.go b/pkg/storage/internalstorage/storage.go index 25530b283..78ae6c910 100644 --- a/pkg/storage/internalstorage/storage.go +++ b/pkg/storage/internalstorage/storage.go @@ -3,6 +3,8 @@ package internalstorage import ( "context" "fmt" + "strings" + "sync" "gorm.io/gorm" "k8s.io/apimachinery/pkg/runtime/schema" @@ -11,6 +13,8 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/storage" ) +var mutex sync.Mutex + type StorageFactory struct { db *gorm.DB AutoMigration *bool @@ -19,34 +23,54 @@ type StorageFactory struct { } func (s *StorageFactory) AutoMigrate() error { + return nil +} + +func (s *StorageFactory) GetSupportedRequestVerbs() []string { + return []string{"get", "list"} +} + +func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { + mutex.Lock() + defer mutex.Unlock() + + var table string if s.AutoMigration != nil && *s.AutoMigration { switch s.DivisionPolicy { - if err := s.db.AutoMigrate(&Resource{}); err != nil { - return err - } case "", DivisionPolicyNone: + table = "resources" + + if exist := s.db.Migrator().HasTable(table); !exist { + if err := s.db.AutoMigrate(&Resource{}); err != nil { + return nil, err + } + } case DivisionPolicyGroupResource: + gvr := schema.GroupVersionResource{ + Group: config.StorageGroupResource.Group, + Version: config.StorageVersion.Version, + Resource: config.StorageGroupResource.Resource, + } - } + table = GenerateTableFor(gvr) + + if exist := s.db.Migrator().HasTable(table); !exist { + if err := s.db.AutoMigrate(&Resource{}); err != nil { + return nil, err + } - if s.DivisionPolicy == "" || s.DivisionPolicy == DivisionPolicyNone { - if err := s.db.AutoMigrate(&Resource{}); err != nil { - return err + err := s.db.Migrator().RenameTable("resources", table) + if err != nil { + return nil, err + } } } } - return nil -} - -func (s *StorageFactory) GetSupportedRequestVerbs() []string { - return []string{"get", "list"} -} - -func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { return &ResourceStorage{ db: s.db, codec: config.Codec, + table: table, storageGroupResource: config.StorageGroupResource, storageVersion: config.StorageVersion, @@ -65,11 +89,23 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) { var resources []Resource - result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version"). - Where(map[string]interface{}{"cluster": cluster}). - Find(&resources) - if result.Error != nil { - return nil, InterpretDBError(cluster, result.Error) + mutex.Lock() + tables, err := f.db.Migrator().GetTables() + if err != nil { + mutex.Unlock() + return nil, err + } + mutex.Unlock() + for _, table := range tables { + var tableResources []Resource + result := f.db.WithContext(ctx).Table(table).Select("group", "version", "resource", "namespace", "name", "resource_version"). + Where(map[string]interface{}{"cluster": cluster}). + Find(&tableResources) + if result.Error != nil { + return nil, InterpretDBError(cluster, result.Error) + } + + resources = append(resources, tableResources...) } resourceversions := make(map[schema.GroupVersionResource]map[string]interface{}) @@ -91,12 +127,25 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string } func (f *StorageFactory) CleanCluster(ctx context.Context, cluster string) error { - result := f.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{}) - return InterpretDBError(cluster, result.Error) + mutex.Lock() + tables, err := f.db.Migrator().GetTables() + if err != nil { + mutex.Unlock() + return err + } + mutex.Unlock() + + for _, table := range tables { + result := f.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{}) + if result.Error != nil { + return InterpretDBError(cluster, result.Error) + } + } + return nil } func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error { - result := s.db.WithContext(ctx).Where(map[string]interface{}{ + result := s.db.WithContext(ctx).Table(GenerateTableFor(gvr)).Where(map[string]interface{}{ "cluster": cluster, "group": gvr.Group, "version": gvr.Version, @@ -116,3 +165,13 @@ func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*interna func (s *StorageFactory) PrepareCluster(cluster string) error { return nil } + +// GenerateTableFor return table name using gvr string +func GenerateTableFor(gvr schema.GroupVersionResource) string { + if gvr.Group == "" { + return fmt.Sprintf("%s_%s", gvr.Version, gvr.Resource) + } + + group := strings.ReplaceAll(gvr.Group, ".", "_") + return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource) +}