Skip to content

Commit

Permalink
Make AddPartition record compatibile with alternative json name
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Jul 15, 2024
1 parent b0b9290 commit bf6d7bd
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions pkg/ccr/record/add_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@ import (
log "github.com/sirupsen/logrus"
)

type DistributionInfo struct {
BucketNum int `json:"bucketNum"`
Type string `json:"type"`
DistributionColumns []struct {
Name string `json:"name"`
} `json:"distributionColumns"`
}

type AddPartition struct {
DbId int64 `json:"dbId"`
TableId int64 `json:"tableId"`
Sql string `json:"sql"`
IsTemp bool `json:"isTempPartition"`
Partition struct {
DistributionInfo struct {
BucketNum int `json:"bucketNum"`
Type string `json:"type"`
DistributionColumns []struct {
Name string `json:"name"`
} `json:"distributionColumns"`
} `json:"distributionInfo"`
DistributionInfoOld *DistributionInfo `json:"distributionInfo"`
DistributionInfoNew *DistributionInfo `json:"di"`
} `json:"partition"`
}

Expand All @@ -45,9 +48,16 @@ func NewAddPartitionFromJson(data string) (*AddPartition, error) {
return &addPartition, nil
}

func (addPartition *AddPartition) getDistributionInfo() *DistributionInfo {
if addPartition.Partition.DistributionInfoOld != nil {
return addPartition.Partition.DistributionInfoOld
}
return addPartition.Partition.DistributionInfoNew
}

func (addPartition *AddPartition) getDistributionColumns() []string {
var distributionColumns []string
for _, column := range addPartition.Partition.DistributionInfo.DistributionColumns {
for _, column := range addPartition.getDistributionInfo().DistributionColumns {
distributionColumns = append(distributionColumns, column.Name)
}
return distributionColumns
Expand Down Expand Up @@ -81,15 +91,16 @@ func (addPartition *AddPartition) GetSql(destTableName string) string {
// ADD PARTITION p1 VALUES LESS THAN ("2015-01-01")
// DISTRIBUTED BY HASH(k1) BUCKETS 20;
// or DISTRIBUTED BY RANDOM BUCKETS 20;
distributionInfo := addPartition.getDistributionInfo()
if !strings.Contains(strings.ToUpper(addPartitionSql), "DISTRIBUTED BY") {
// addPartitionSql = fmt.Sprintf("%s DISTRIBUTED BY (%s)", addPartitionSql, strings.Join(addPartition.getDistributionColumns(), ","))
if addPartition.Partition.DistributionInfo.Type == "HASH" {
if distributionInfo.Type == "HASH" {
addPartitionSql = fmt.Sprintf("%s DISTRIBUTED BY HASH(%s)", addPartitionSql, strings.Join(addPartition.getDistributionColumns(), ","))
} else {
addPartitionSql = fmt.Sprintf("%s DISTRIBUTED BY RANDOM", addPartitionSql)
}
}
bucketNum := addPartition.Partition.DistributionInfo.BucketNum
bucketNum := distributionInfo.BucketNum
addPartitionSql = fmt.Sprintf("%s BUCKETS %d", addPartitionSql, bucketNum)

return addPartitionSql
Expand Down

0 comments on commit bf6d7bd

Please sign in to comment.