diff --git a/pkg/ccr/record/add_partition.go b/pkg/ccr/record/add_partition.go index 49855acd..5a0df378 100644 --- a/pkg/ccr/record/add_partition.go +++ b/pkg/ccr/record/add_partition.go @@ -11,19 +11,22 @@ import ( log "github.com/sirupsen/logrus" ) +type DistributionInfo struct { + BucketNum int `json:"bucketNum"` + Type string `json:"type"` + DistributionColumns []struct { + Name string `json:"name"` + } `json:"distributionColumns"` +} + type AddPartition struct { DbId int64 `json:"dbId"` TableId int64 `json:"tableId"` Sql string `json:"sql"` IsTemp bool `json:"isTempPartition"` Partition struct { - DistributionInfo struct { - BucketNum int `json:"bucketNum"` - Type string `json:"type"` - DistributionColumns []struct { - Name string `json:"name"` - } `json:"distributionColumns"` - } `json:"distributionInfo"` + DistributionInfoOld *DistributionInfo `json:"distributionInfo"` + DistributionInfoNew *DistributionInfo `json:"di"` } `json:"partition"` } @@ -45,9 +48,16 @@ func NewAddPartitionFromJson(data string) (*AddPartition, error) { return &addPartition, nil } +func (addPartition *AddPartition) getDistributionInfo() *DistributionInfo { + if addPartition.Partition.DistributionInfoOld != nil { + return addPartition.Partition.DistributionInfoOld + } + return addPartition.Partition.DistributionInfoNew +} + func (addPartition *AddPartition) getDistributionColumns() []string { var distributionColumns []string - for _, column := range addPartition.Partition.DistributionInfo.DistributionColumns { + for _, column := range addPartition.getDistributionInfo().DistributionColumns { distributionColumns = append(distributionColumns, column.Name) } return distributionColumns @@ -81,15 +91,16 @@ func (addPartition *AddPartition) GetSql(destTableName string) string { // ADD PARTITION p1 VALUES LESS THAN ("2015-01-01") // DISTRIBUTED BY HASH(k1) BUCKETS 20; // or DISTRIBUTED BY RANDOM BUCKETS 20; + distributionInfo := addPartition.getDistributionInfo() if !strings.Contains(strings.ToUpper(addPartitionSql), "DISTRIBUTED BY") { // addPartitionSql = fmt.Sprintf("%s DISTRIBUTED BY (%s)", addPartitionSql, strings.Join(addPartition.getDistributionColumns(), ",")) - if addPartition.Partition.DistributionInfo.Type == "HASH" { + if distributionInfo.Type == "HASH" { addPartitionSql = fmt.Sprintf("%s DISTRIBUTED BY HASH(%s)", addPartitionSql, strings.Join(addPartition.getDistributionColumns(), ",")) } else { addPartitionSql = fmt.Sprintf("%s DISTRIBUTED BY RANDOM", addPartitionSql) } } - bucketNum := addPartition.Partition.DistributionInfo.BucketNum + bucketNum := distributionInfo.BucketNum addPartitionSql = fmt.Sprintf("%s BUCKETS %d", addPartitionSql, bucketNum) return addPartitionSql