rebalance #291

Merged: 2 commits, Mar 11, 2024
3 changes: 0 additions & 3 deletions common/ck.go
@@ -56,9 +56,6 @@ func ConnectClickHouse(host string, database string, opt model.ConnetOption) (*C
Username: opt.User,
Password: opt.Password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Protocol: opt.Protocol,
DialTimeout: time.Duration(10) * time.Second,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
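Note: the deleted Settings block pinned max_execution_time to 60 seconds for every statement on this connection, which is what cut long rebalance queries short. After this change the limit is applied per statement instead, e.g. the SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8 clauses added later in this diff. A minimal sketch of that per-statement style; the appendSettings helper is hypothetical and not part of ckman:

package main

import (
    "fmt"
    "sort"
    "strings"
)

// appendSettings tacks per-query ClickHouse settings onto a statement, the
// style this PR uses instead of a connection-wide max_execution_time.
func appendSettings(query string, settings map[string]string) string {
    if len(settings) == 0 {
        return query
    }
    pairs := make([]string, 0, len(settings))
    for k, v := range settings {
        pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
    }
    sort.Strings(pairs) // deterministic order, handy for logs and tests
    return query + " SETTINGS " + strings.Join(pairs, ",")
}

func main() {
    q := appendSettings("INSERT INTO db.t SELECT * FROM db.tmp",
        map[string]string{"max_execution_time": "0", "max_insert_threads": "8"})
    fmt.Println(q)
}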
16 changes: 11 additions & 5 deletions service/clickhouse/clickhouse_service.go
@@ -1744,7 +1744,7 @@ func getShardingType(key *model.RebalanceShardingkey, conn *common.Conn) error {

func RebalanceByPartition(conf *model.CKManClickHouseConfig, rebalancer *CKRebalance) error {
var err error
if err = rebalancer.InitCKConns(); err != nil {
if err = rebalancer.InitCKConns(false); err != nil {
log.Logger.Errorf("got error %+v", err)
return err
}
@@ -1767,7 +1767,7 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb
var err error
start := time.Now()
log.Logger.Info("[rebalance] STEP InitCKConns")
if err = rebalancer.InitCKConns(); err != nil {
if err = rebalancer.InitCKConns(true); err != nil {
log.Logger.Errorf("got error %+v", err)
return err
}
@@ -1780,19 +1780,25 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb
return err
}
if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil {
return err
time.Sleep(5 * time.Second)
if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil {
return err
}
}
log.Logger.Info("[rebalance] STEP InsertPlan")
if err = rebalancer.InsertPlan(); err != nil {
return errors.Wrapf(err, "table %s.%s rebalance failed, data can be corrupted, please move back from temp table[%s] manually", rebalancer.Database, rebalancer.Table, rebalancer.TmpTable)
}
if err = rebalancer.CheckCounts(rebalancer.Table); err != nil {
return err
time.Sleep(5 * time.Second)
if err = rebalancer.CheckCounts(rebalancer.Table); err != nil {
return err
}
}
log.Logger.Info("[rebalance] STEP Cleanup")
rebalancer.Cleanup()

log.Logger.Infof("[rebalance] DONE, Elapsed: %v sec", time.Since(start).Seconds())
log.Logger.Infof("[rebalance] DONE, Total counts: %d, Elapsed: %v sec", rebalancer.OriCount, time.Since(start).Seconds())
return nil
}

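Note: both CheckCounts calls now get one retry after a 5-second sleep, since cluster-wide counts can lag briefly right after large inserts while merges and replication catch up. The PR inlines the pattern; the same idea as a small helper (the retry function is hypothetical, not ckman code):

package main

import (
    "errors"
    "fmt"
    "time"
)

// retry runs fn up to attempts times, sleeping delay between failed tries.
func retry(attempts int, delay time.Duration, fn func() error) error {
    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(); err == nil {
            return nil
        }
        if i < attempts-1 {
            time.Sleep(delay)
        }
    }
    return err
}

func main() {
    // Stand-in for rebalancer.CheckCounts(rebalancer.TmpTable): fails once,
    // succeeds on the retry, just like the inlined pattern above.
    calls := 0
    check := func() error {
        calls++
        if calls == 1 {
            return errors.New("counts not settled yet")
        }
        return nil
    }
    fmt.Println(retry(2, 5*time.Second, check)) // <nil>
}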
183 changes: 95 additions & 88 deletions service/clickhouse/rebalance.go
@@ -3,16 +3,13 @@ package clickhouse
import (
"fmt"
"path/filepath"
"regexp"
"runtime"
"sort"
"strings"
"sync"

"github.com/housepower/ckman/common"
"github.com/housepower/ckman/log"
"github.com/housepower/ckman/model"
"github.com/housepower/ckman/repository"
"github.com/k0kubun/pp"
"github.com/pkg/errors"
)
@@ -41,6 +38,7 @@ type CKRebalance struct {
Engine string
EngineFull string
OriCount uint64
SortingKey []string
}

// TblPartitions is partitions status of a host. A host never move out and move in at the same iteration.
@@ -54,7 +52,7 @@ type TblPartitions struct {
ToMoveIn bool // plan to move some partitions in
}

func (r *CKRebalance) InitCKConns() (err error) {
func (r *CKRebalance) InitCKConns(withShardingkey bool) (err error) {
locks = make(map[string]*sync.Mutex)
for _, host := range r.Hosts {
_, err = common.ConnectClickHouse(host, model.ClickHouseDefaultDB, r.ConnOpt)
@@ -65,32 +63,56 @@ func (r *CKRebalance) InitCKConns() (err error) {
locks[host] = &sync.Mutex{}
}

conn := common.GetConnection(r.Hosts[0])
query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.Engine, &r.EngineFull)
if err != nil {
return
if withShardingkey {
conn := common.GetConnection(r.Hosts[0])
// get engine
query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.Engine, &r.EngineFull)
if err != nil {
return
}
}
}
rows.Close()
log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull)
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
if strings.Contains(r.Engine, "Replacing") {
query += " FINAL"
}
log.Logger.Debugf("query: %s", query)
rows, _ = conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.OriCount)
if err != nil {
return
rows.Close()
log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull)

//get sortingkey
if strings.Contains(r.Engine, "Replacing") {
query = fmt.Sprintf("SELECT name FROM system.columns WHERE (database = '%s') AND (table = '%s') AND (is_in_sorting_key = 1)", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
var sortingkey string
err = rows.Scan(&sortingkey)
if err != nil {
return
}
r.SortingKey = append(r.SortingKey, sortingkey)
}
rows.Close()
log.Logger.Infof("table: %s.%s, sortingkey:%s", r.Database, r.Table, r.SortingKey)

}

//get original count
if strings.Contains(r.Engine, "Replacing") {
query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table)
} else {
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
}
log.Logger.Debugf("query: %s", query)
rows, _ = conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.OriCount)
if err != nil {
return
}
}
log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount)
rows.Close()
}
log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount)
rows.Close()
return
}
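Note: InitCKConns now takes a withShardingkey flag and only gathers table metadata (engine, sorting-key columns for Replacing engines, and the original count) on the sharding-key path; RebalanceByPartition passes false and skips all of it. A sketch of the system.columns lookup used above, written against database/sql purely for illustration (ckman uses its own common.Conn wrapper) and with the Query error checked rather than discarded:

package sketch

import (
    "database/sql"
    "fmt"
)

// sortingKeyColumns returns the columns that make up the table's sorting key,
// the same system.columns query InitCKConns issues for Replacing engines.
func sortingKeyColumns(db *sql.DB, database, table string) ([]string, error) {
    query := fmt.Sprintf(
        "SELECT name FROM system.columns WHERE database = '%s' AND table = '%s' AND is_in_sorting_key = 1",
        database, table)
    rows, err := db.Query(query)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    var keys []string
    for rows.Next() {
        var name string
        if err := rows.Scan(&name); err != nil {
            return nil, err
        }
        keys = append(keys, name)
    }
    return keys, rows.Err()
}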

@@ -402,9 +424,11 @@ func (r *CKRebalance) CreateTemporaryTable() error {
}

func (r *CKRebalance) CheckCounts(tableName string) error {
query := fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, tableName)
var query string
if strings.Contains(r.Engine, "Replacing") {
query += " FINAL"
query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table)
} else {
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
}
log.Logger.Debugf("query: %s", query)
conn := common.GetConnection(r.Hosts[0])
@@ -428,8 +452,8 @@ func (r *CKRebalance) CheckCounts(tableName string) error {
return nil
}
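Note: a ReplacingMergeTree table collapses duplicates by sorting key only at merge time, so a raw count() is not stable across the rebalance; both InitCKConns and CheckCounts therefore count DISTINCT sorting-key tuples under FINAL and fall back to a plain count() for other engines. The shared rule as one sketch (the table name is a parameter here, whereas CheckCounts above builds the query from r.Table):

package sketch

import (
    "fmt"
    "strings"
)

// countQuery returns the statement used to compare row counts before and
// after the rebalance, mirroring the logic in InitCKConns and CheckCounts.
func countQuery(engine, cluster, database, table string, sortingKey []string) string {
    if strings.Contains(engine, "Replacing") {
        // Count distinct sorting-key tuples under FINAL so pending merges
        // cannot change the result.
        return fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)",
            strings.Join(sortingKey, ","), cluster, database, table)
    }
    return fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", cluster, database, table)
}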

// moveback from tmp_table to ori_table after rehash
func (r *CKRebalance) InsertPlan() error {
max_insert_threads := runtime.NumCPU()*3/4 + 1 // add 1 to ensure threads not zero
var lastError error
var wg sync.WaitGroup
for idx, host := range r.Hosts {
@@ -445,26 +469,43 @@ func (r *CKRebalance) InsertPlan() error {
lastError = errors.Wrap(err, host)
return
}

query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE %s %% %d = %d SETTINGS max_insert_threads=%d, max_execution_time=0",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, ShardingFunc(r.Shardingkey), len(r.Hosts), idx, max_insert_threads)
query = fmt.Sprintf(`SELECT distinct partition_id FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1 ORDER BY partition_id`, r.Cluster, r.Database, r.TmpTable)
log.Logger.Debugf("[%s]%s", host, query)
if err := conn.Exec(query); err != nil {
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
partitions := make([]string, 0)
for rows.Next() {
var partitionId string
err = rows.Scan(&partitionId)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
partitions = append(partitions, partitionId)
}
rows.Close()
log.Logger.Debugf("host:[%s], parts: %v", host, partitions)

for i, partition := range partitions {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE _partition_id = '%s' AND %s %% %d = %d SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, partition, ShardingFunc(r.Shardingkey), len(r.Hosts), idx)
log.Logger.Debugf("[%s](%d/%d) %s", host, i+1, len(partitions), query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
}
})
}
wg.Wait()
return lastError
}
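Note: InsertPlan now moves data back from the temp table one partition at a time, scoping each INSERT with the _partition_id virtual column and keeping only the rows whose sharding-key hash modulo lands on this shard; insert_deduplicate=false stops Replicated tables from skipping re-inserted blocks as duplicates. The statement builder as a sketch (shardingExpr stands in for whatever ShardingFunc(r.Shardingkey) returns):

package sketch

import "fmt"

// insertPlanQuery mirrors the statement built above: copy one partition of the
// temp table back into the original table, filtered to this shard's slice.
func insertPlanQuery(database, table, cluster, tmpTable, partitionID, shardingExpr string, shards, shardIdx int) string {
    return fmt.Sprintf(
        "INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') "+
            "WHERE _partition_id = '%s' AND %s %% %d = %d "+
            "SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
        database, table, cluster, database, tmpTable, partitionID, shardingExpr, shards, shardIdx)
}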

// backup from ori_table to tmp_table
func (r *CKRebalance) MoveBackup() error {
conf, err := repository.Ps.GetClusterbyName(r.Cluster)
if err != nil {
return err
}
var wg sync.WaitGroup
var lastError error
for _, host := range r.Hosts {
@@ -473,67 +514,33 @@ func (r *CKRebalance) MoveBackup() error {
_ = common.Pool.Submit(func() {
defer wg.Done()
conn := common.GetConnection(host)
// copy data
cmd := fmt.Sprintf("ls -l %sclickhouse/data/%s/%s/ |grep -v total |awk '{print $9}'", r.DataDir, r.Database, r.Table)
sshOpts := common.SshOptions{
User: conf.SshUser,
Password: conf.SshPassword,
Port: conf.SshPort,
Host: host,
NeedSudo: conf.NeedSudo,
AuthenticateType: conf.AuthenticateType,
}
out, err := common.RemoteExecute(sshOpts, cmd)
query := fmt.Sprintf(`SELECT distinct partition_id FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1 order by partition_id`, r.Database, r.Table)
log.Logger.Debugf("[%s]%s", host, query)
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts := make([]string, 0)
for _, file := range strings.Split(out, "\n") {
file = strings.TrimSpace(strings.TrimSuffix(file, "\r"))
reg, err := regexp.Compile(`[^_]+(_\d+){3,}$`) //parts name
if err != nil {
lastError = errors.Wrap(err, host)
return
}
if reg.MatchString(file) && !strings.HasPrefix(file, "tmp_merge") {
parts = append(parts, file)
}
}
log.Logger.Debugf("host:[%s], parts: %v", host, parts)
var cmds []string
for _, part := range parts {
cmds = append(cmds, fmt.Sprintf("cp -prf %sclickhouse/data/%s/%s/%s %sclickhouse/data/%s/%s/detached/", r.DataDir, r.Database, r.Table, part, r.DataDir, r.Database, r.TmpTable))
}
if len(cmds) > 0 {
log.Logger.Debugf("host:[%s], cmds: %v", host, cmds)
_, err = common.RemoteExecute(sshOpts, strings.Join(cmds, ";"))
partitions := make([]string, 0)
for rows.Next() {
var partitionId string
err = rows.Scan(&partitionId)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
partitions = append(partitions, partitionId)
}
rows.Close()
log.Logger.Debugf("host:[%s], partitions: %v", host, partitions)

var failedParts []string
for _, part := range parts {
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PART '%s' settings mutations_sync=1", r.Database, r.TmpTable, part)
log.Logger.Debugf("[%s]%s", host, query)
for idx, partition := range partitions {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _partition_id = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.TmpTable, r.Database, r.Table, partition)
log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(partitions), query)
if err = conn.Exec(query); err != nil {
failedParts = append(failedParts, part)
continue
}
}

if len(failedParts) > 0 {
max_insert_threads := runtime.NumCPU()*3/4 + 1
log.Logger.Infof("[%s]failed parts: %v, retry again", host, failedParts)
for _, part := range failedParts {
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS max_insert_threads=%d, max_execution_time=0", r.Database, r.TmpTable, r.Database, r.Table, part, max_insert_threads)
log.Logger.Debugf("[%s]%s", host, query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
lastError = errors.Wrap(err, host)
return
}
}
})
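Note: MoveBackup drops the SSH path (listing part directories, cp into detached/, ATTACH PART, with an INSERT ... WHERE _part fallback for failed parts) in favour of plain SQL: list the active partition ids from system.parts, then copy each partition from the original table into the temp table. This removes the dependency on repository lookups, SSH credentials and the on-disk data layout. The per-partition statement as a sketch:

package sketch

import "fmt"

// moveBackupQuery mirrors the statement built above: copy one partition of the
// original table into the temp table, entirely over SQL, with block
// deduplication disabled so identical blocks are not silently skipped.
func moveBackupQuery(database, table, tmpTable, partitionID string) string {
    return fmt.Sprintf(
        "INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _partition_id = '%s' "+
            "SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
        database, tmpTable, database, table, partitionID)
}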