diff --git a/common/ck.go b/common/ck.go index 4abb1495..4771a124 100644 --- a/common/ck.go +++ b/common/ck.go @@ -56,9 +56,6 @@ func ConnectClickHouse(host string, database string, opt model.ConnetOption) (*C Username: opt.User, Password: opt.Password, }, - Settings: clickhouse.Settings{ - "max_execution_time": 60, - }, Protocol: opt.Protocol, DialTimeout: time.Duration(10) * time.Second, ConnOpenStrategy: clickhouse.ConnOpenInOrder, diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go index 32909697..7bb57835 100644 --- a/service/clickhouse/clickhouse_service.go +++ b/service/clickhouse/clickhouse_service.go @@ -1744,7 +1744,7 @@ func getShardingType(key *model.RebalanceShardingkey, conn *common.Conn) error { func RebalanceByPartition(conf *model.CKManClickHouseConfig, rebalancer *CKRebalance) error { var err error - if err = rebalancer.InitCKConns(); err != nil { + if err = rebalancer.InitCKConns(false); err != nil { log.Logger.Errorf("got error %+v", err) return err } @@ -1767,7 +1767,7 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb var err error start := time.Now() log.Logger.Info("[rebalance] STEP InitCKConns") - if err = rebalancer.InitCKConns(); err != nil { + if err = rebalancer.InitCKConns(true); err != nil { log.Logger.Errorf("got error %+v", err) return err } @@ -1780,19 +1780,25 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb return err } if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil { - return err + time.Sleep(5 * time.Second) + if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil { + return err + } } log.Logger.Info("[rebalance] STEP InsertPlan") if err = rebalancer.InsertPlan(); err != nil { return errors.Wrapf(err, "table %s.%s rebalance failed, data can be corrupted, please move back from temp table[%s] manually", rebalancer.Database, rebalancer.Table, rebalancer.TmpTable) } if err = rebalancer.CheckCounts(rebalancer.Table); err != nil { - return err + time.Sleep(5 * time.Second) + if err = rebalancer.CheckCounts(rebalancer.Table); err != nil { + return err + } } log.Logger.Info("[rebalance] STEP Cleanup") rebalancer.Cleanup() - log.Logger.Infof("[rebalance] DONE, Elapsed: %v sec", time.Since(start).Seconds()) + log.Logger.Infof("[rebalance] DONE, Total counts: %d, Elapsed: %v sec", rebalancer.OriCount, time.Since(start).Seconds()) return nil } diff --git a/service/clickhouse/rebalance.go b/service/clickhouse/rebalance.go index b6918629..5eb5d586 100644 --- a/service/clickhouse/rebalance.go +++ b/service/clickhouse/rebalance.go @@ -3,8 +3,6 @@ package clickhouse import ( "fmt" "path/filepath" - "regexp" - "runtime" "sort" "strings" "sync" @@ -12,7 +10,6 @@ import ( "github.com/housepower/ckman/common" "github.com/housepower/ckman/log" "github.com/housepower/ckman/model" - "github.com/housepower/ckman/repository" "github.com/k0kubun/pp" "github.com/pkg/errors" ) @@ -41,6 +38,7 @@ type CKRebalance struct { Engine string EngineFull string OriCount uint64 + SortingKey []string } // TblPartitions is partitions status of a host. A host never move out and move in at the same iteration. @@ -54,7 +52,7 @@ type TblPartitions struct { ToMoveIn bool // plan to move some partitions in } -func (r *CKRebalance) InitCKConns() (err error) { +func (r *CKRebalance) InitCKConns(withShardingkey bool) (err error) { locks = make(map[string]*sync.Mutex) for _, host := range r.Hosts { _, err = common.ConnectClickHouse(host, model.ClickHouseDefaultDB, r.ConnOpt) @@ -65,32 +63,56 @@ func (r *CKRebalance) InitCKConns() (err error) { locks[host] = &sync.Mutex{} } - conn := common.GetConnection(r.Hosts[0]) - query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table) - log.Logger.Debugf("query:%s", query) - rows, _ := conn.Query(query) - for rows.Next() { - err = rows.Scan(&r.Engine, &r.EngineFull) - if err != nil { - return + if withShardingkey { + conn := common.GetConnection(r.Hosts[0]) + // get engine + query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table) + log.Logger.Debugf("query:%s", query) + rows, _ := conn.Query(query) + for rows.Next() { + err = rows.Scan(&r.Engine, &r.EngineFull) + if err != nil { + return + } } - } - rows.Close() - log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull) - query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table) - if strings.Contains(r.Engine, "Replacing") { - query += " FINAL" - } - log.Logger.Debugf("query: %s", query) - rows, _ = conn.Query(query) - for rows.Next() { - err = rows.Scan(&r.OriCount) - if err != nil { - return + rows.Close() + log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull) + + //get sortingkey + if strings.Contains(r.Engine, "Replacing") { + query = fmt.Sprintf("SELECT name FROM system.columns WHERE (database = '%s') AND (table = '%s') AND (is_in_sorting_key = 1)", r.Database, r.Table) + log.Logger.Debugf("query:%s", query) + rows, _ := conn.Query(query) + for rows.Next() { + var sortingkey string + err = rows.Scan(&sortingkey) + if err != nil { + return + } + r.SortingKey = append(r.SortingKey, sortingkey) + } + rows.Close() + log.Logger.Infof("table: %s.%s, sortingkey:%s", r.Database, r.Table, r.SortingKey) + } + + //get original count + if strings.Contains(r.Engine, "Replacing") { + query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table) + } else { + query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table) + } + log.Logger.Debugf("query: %s", query) + rows, _ = conn.Query(query) + for rows.Next() { + err = rows.Scan(&r.OriCount) + if err != nil { + return + } + } + log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount) + rows.Close() } - log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount) - rows.Close() return } @@ -402,9 +424,11 @@ func (r *CKRebalance) CreateTemporaryTable() error { } func (r *CKRebalance) CheckCounts(tableName string) error { - query := fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, tableName) + var query string if strings.Contains(r.Engine, "Replacing") { - query += " FINAL" + query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table) + } else { + query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table) } log.Logger.Debugf("query: %s", query) conn := common.GetConnection(r.Hosts[0]) @@ -428,8 +452,8 @@ func (r *CKRebalance) CheckCounts(tableName string) error { return nil } +// moveback from tmp_table to ori_table after rehash func (r *CKRebalance) InsertPlan() error { - max_insert_threads := runtime.NumCPU()*3/4 + 1 // add 1 to ensure threads not zero var lastError error var wg sync.WaitGroup for idx, host := range r.Hosts { @@ -445,26 +469,43 @@ func (r *CKRebalance) InsertPlan() error { lastError = errors.Wrap(err, host) return } - - query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE %s %% %d = %d SETTINGS max_insert_threads=%d, max_execution_time=0", - r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, ShardingFunc(r.Shardingkey), len(r.Hosts), idx, max_insert_threads) + query = fmt.Sprintf(`SELECT distinct partition_id FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1 ORDER BY partition_id`, r.Cluster, r.Database, r.TmpTable) log.Logger.Debugf("[%s]%s", host, query) - if err := conn.Exec(query); err != nil { + rows, err := conn.Query(query) + if err != nil { lastError = errors.Wrap(err, host) return } + partitions := make([]string, 0) + for rows.Next() { + var partitionId string + err = rows.Scan(&partitionId) + if err != nil { + lastError = errors.Wrap(err, host) + return + } + partitions = append(partitions, partitionId) + } + rows.Close() + log.Logger.Debugf("host:[%s], parts: %v", host, partitions) + for i, partition := range partitions { + query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE _partition_id = '%s' AND %s %% %d = %d SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8", + r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, partition, ShardingFunc(r.Shardingkey), len(r.Hosts), idx) + log.Logger.Debugf("[%s](%d/%d) %s", host, i+1, len(partitions), query) + if err = conn.Exec(query); err != nil { + lastError = errors.Wrap(err, host) + return + } + } }) } wg.Wait() return lastError } +// backup from ori_table to tmp_table func (r *CKRebalance) MoveBackup() error { - conf, err := repository.Ps.GetClusterbyName(r.Cluster) - if err != nil { - return err - } var wg sync.WaitGroup var lastError error for _, host := range r.Hosts { @@ -473,67 +514,33 @@ func (r *CKRebalance) MoveBackup() error { _ = common.Pool.Submit(func() { defer wg.Done() conn := common.GetConnection(host) - // copy data - cmd := fmt.Sprintf("ls -l %sclickhouse/data/%s/%s/ |grep -v total |awk '{print $9}'", r.DataDir, r.Database, r.Table) - sshOpts := common.SshOptions{ - User: conf.SshUser, - Password: conf.SshPassword, - Port: conf.SshPort, - Host: host, - NeedSudo: conf.NeedSudo, - AuthenticateType: conf.AuthenticateType, - } - out, err := common.RemoteExecute(sshOpts, cmd) + query := fmt.Sprintf(`SELECT distinct partition_id FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1 order by partition_id`, r.Database, r.Table) + log.Logger.Debugf("[%s]%s", host, query) + rows, err := conn.Query(query) if err != nil { lastError = errors.Wrap(err, host) return } - parts := make([]string, 0) - for _, file := range strings.Split(out, "\n") { - file = strings.TrimSpace(strings.TrimSuffix(file, "\r")) - reg, err := regexp.Compile(`[^_]+(_\d+){3,}$`) //parts name - if err != nil { - lastError = errors.Wrap(err, host) - return - } - if reg.MatchString(file) && !strings.HasPrefix(file, "tmp_merge") { - parts = append(parts, file) - } - } - log.Logger.Debugf("host:[%s], parts: %v", host, parts) - var cmds []string - for _, part := range parts { - cmds = append(cmds, fmt.Sprintf("cp -prf %sclickhouse/data/%s/%s/%s %sclickhouse/data/%s/%s/detached/", r.DataDir, r.Database, r.Table, part, r.DataDir, r.Database, r.TmpTable)) - } - if len(cmds) > 0 { - log.Logger.Debugf("host:[%s], cmds: %v", host, cmds) - _, err = common.RemoteExecute(sshOpts, strings.Join(cmds, ";")) + partitions := make([]string, 0) + for rows.Next() { + var partitionId string + err = rows.Scan(&partitionId) if err != nil { lastError = errors.Wrap(err, host) return } + partitions = append(partitions, partitionId) } + rows.Close() + log.Logger.Debugf("host:[%s], partitions: %v", host, partitions) - var failedParts []string - for _, part := range parts { - query := fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PART '%s' settings mutations_sync=1", r.Database, r.TmpTable, part) - log.Logger.Debugf("[%s]%s", host, query) + for idx, partition := range partitions { + query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _partition_id = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8", + r.Database, r.TmpTable, r.Database, r.Table, partition) + log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(partitions), query) if err = conn.Exec(query); err != nil { - failedParts = append(failedParts, part) - continue - } - } - - if len(failedParts) > 0 { - max_insert_threads := runtime.NumCPU()*3/4 + 1 - log.Logger.Infof("[%s]failed parts: %v, retry again", host, failedParts) - for _, part := range failedParts { - query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS max_insert_threads=%d, max_execution_time=0", r.Database, r.TmpTable, r.Database, r.Table, part, max_insert_threads) - log.Logger.Debugf("[%s]%s", host, query) - if err = conn.Exec(query); err != nil { - lastError = errors.Wrap(err, host) - return - } + lastError = errors.Wrap(err, host) + return } } })