From 99015113ed2619ded80dda262d551670a8e2e5dd Mon Sep 17 00:00:00 2001 From: sjcsjc123 <1401189096@qq.com> Date: Wed, 27 Sep 2023 01:59:17 +0800 Subject: [PATCH] feat: add create etcd cluster and weed cluster function --- utils/weed/check.go | 51 +++++ utils/weed/check_test.go | 27 +++ utils/weed/etcd.go | 305 +++++++++++++++++++++++++++ utils/weed/etcd_client.go | 137 +++++++++++++ utils/weed/exec.go | 34 +++ utils/weed/interface.go | 388 +++++++++++++++++++++++++++++++++++ utils/weed/interface_test.go | 86 ++++++++ utils/weed/weed.go | 63 ++++++ utils/weed/weed_master.go | 170 +++++++++++++++ utils/weed/weed_test.go | 15 ++ utils/weed/weed_volumn.go | 102 +++++++++ 11 files changed, 1378 insertions(+) create mode 100644 utils/weed/check.go create mode 100644 utils/weed/check_test.go create mode 100644 utils/weed/etcd.go create mode 100644 utils/weed/etcd_client.go create mode 100644 utils/weed/exec.go create mode 100644 utils/weed/interface.go create mode 100644 utils/weed/interface_test.go create mode 100644 utils/weed/weed.go create mode 100644 utils/weed/weed_master.go create mode 100644 utils/weed/weed_test.go create mode 100644 utils/weed/weed_volumn.go diff --git a/utils/weed/check.go b/utils/weed/check.go new file mode 100644 index 00000000000..dc132ac33e8 --- /dev/null +++ b/utils/weed/check.go @@ -0,0 +1,51 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "path" + "strconv" + + "github.com/sealerio/sealer/utils/exec" +) + +// checkPort checks if the port is available or can be used. +func checkPort(port int) bool { + // lsof -i:9333 + err := exec.Cmd("lsof", "-i:"+strconv.Itoa(port)) + return err == nil +} + +// checkDir checks if the dir is available or can be used. +//func checkDir(dir string) bool { +// // ls /tmp +// err := exec.Cmd("ls", dir) +// if err != nil { +// return false +// } +// return true +//} + +func checkBinFile(fileName string) bool { + binName := path.Base(fileName) + switch binName { + case "weed": + + case "etcd": + + default: + } + return false +} diff --git a/utils/weed/check_test.go b/utils/weed/check_test.go new file mode 100644 index 00000000000..6b1e5b9ff22 --- /dev/null +++ b/utils/weed/check_test.go @@ -0,0 +1,27 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +//import "testing" +// +//func TestCheckPort(t *testing.T) { +// ok := checkPort(9333) +// t.Log(ok) +//} +// +//func TestCheckDir(t *testing.T) { +// ok := checkDir("/tmp") +// t.Log(ok) +//} diff --git a/utils/weed/etcd.go b/utils/weed/etcd.go new file mode 100644 index 00000000000..10354e13e45 --- /dev/null +++ b/utils/weed/etcd.go @@ -0,0 +1,305 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io/ioutil" + "net" + "os" + "os/exec" + "path" + "runtime" + "strconv" + "sync" + "syscall" +) + +const ( + EtcdGitHubOrg = "etcd-io" + EtcdGithubRepo = "etcd" + GOOSLinux = "linux" + EtcdArtifactType = "etcd" + EtcdVersion = "v3.4.24" + EtcdDestination = "/tmp/etcd.tar.gz" + EtcdBinName = "etcd" + EtcdctlBinName = "etcdctl" + WeedDestination = "/tmp/weed.tar.gz" + WeedBinName = "weed" +) + +func etcdDownloadURL() (string, error) { + var ext string + + switch runtime.GOOS { + case GOOSLinux: + ext = ".tar.gz" + default: + return "", fmt.Errorf("unsupported OS: %s", runtime.GOOS) + } + + // For the function stability, we use the specific version of etcd. + downloadURL := fmt.Sprintf("https://github.com/%s/%s/releases/download/%s/%s-%s-%s-%s%s", + EtcdGitHubOrg, EtcdGithubRepo, EtcdVersion, EtcdArtifactType, EtcdVersion, runtime.GOOS, runtime.GOARCH, ext) + + return downloadURL, nil +} + +type etcd struct { + dataDir string + logDir string + pidDir string + binDir string + clientURL string + peerURL string + peers []string + wg *sync.WaitGroup + configFile string +} + +// Etcd is the interface for etcd cluster. +type Etcd interface { + Exec +} + +type DeleteOptions struct { + RetainLogs bool +} + +type RunOptions struct { + Binary string + Name string + + pidDir string + logDir string + args []string +} + +func NewEtcd(config *Config) Etcd { + return &etcd{ + dataDir: config.DataDir, + logDir: config.LogDir, + pidDir: config.PidDir, + binDir: config.BinDir, + peers: config.MasterIP, + peerURL: config.CurrentIP + ":" + strconv.Itoa(config.PeerPort), + clientURL: config.CurrentIP + ":" + strconv.Itoa(config.ClientPort), + wg: new(sync.WaitGroup), + configFile: config.EtcdConfigPath, + } +} + +func (e *etcd) Name() string { + return "etcd" +} + +func (e *etcd) Start(ctx context.Context, binary string) error { + // Generate etcd config file. + err := e.GenerateConfig() + if err != nil { + return err + } + + option := &RunOptions{ + Binary: binary, + Name: e.Name(), + logDir: e.logDir, + pidDir: e.pidDir, + args: e.BuildArgs(ctx), + } + if err := runBinary(ctx, option, e.wg); err != nil { + return err + } + + return nil +} + +func (e *etcd) BuildArgs(ctx context.Context, params ...interface{}) []string { + return []string{ + "--config-file", e.configFile, + } +} + +// GenerateConfig creates etcd cluster config file. +func (e *etcd) GenerateConfig() error { + initialCluster := "" + index := 0 + for i, peer := range e.peers { + if peer == e.peerURL { + index = i + } + initialCluster += "node" + strconv.Itoa(i) + "=http://" + peer + "," + } + initialCluster = initialCluster[:len(initialCluster)-1] + name := "node" + strconv.Itoa(index) + configContent := fmt.Sprintf(`name: "%s" +data-dir: "%s" +initial-cluster-token: "my-etcd-token" +initial-cluster: "%s" +initial-advertise-peer-urls: "http://%s" +listen-peer-urls: "http://%s" +listen-client-urls: "http://%s" +advertise-client-urls: "http://%s" +log-file: "%s" +pid-file: "%s" +`, name, e.dataDir, initialCluster, e.peerURL, e.peerURL, e.clientURL, e.clientURL, e.logDir, e.pidDir) + + // write config file + err := ioutil.WriteFile(e.configFile, []byte(configContent), 0644) + if err != nil { + return err + } + + return nil +} + +func (e *etcd) IsRunning(ctx context.Context) bool { + _, port, err := net.SplitHostPort(e.clientURL) + if err != nil { + return false + } + err = exec.Command("lsof", "-i:"+port).Run() + return err == nil +} + +func runBinary(ctx context.Context, option *RunOptions, wg *sync.WaitGroup) error { + cmd := exec.CommandContext(ctx, option.Binary, option.args...) + + // output to binary. + logFile := path.Join(option.logDir, "log") + outputFile, err := os.Create(logFile) + if err != nil { + return err + } + + outputFileWriter := bufio.NewWriter(outputFile) + cmd.Stdout = outputFileWriter + cmd.Stderr = outputFileWriter + + if err := cmd.Start(); err != nil { + return err + } + + pid := strconv.Itoa(cmd.Process.Pid) + + pidFile := path.Join(option.pidDir, "pid") + f, err := os.Create(pidFile) + if err != nil { + return err + } + + _, err = f.Write([]byte(pid)) + if err != nil { + return err + } + + go func() { + defer wg.Done() + wg.Add(1) + if err := cmd.Wait(); err != nil { + // Caught signal kill and interrupt error then ignore. + var exit *exec.ExitError + if errors.As(err, &exit) { + if status, ok := exit.Sys().(syscall.WaitStatus); ok { + if status.Signaled() && + (status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT) { + return + } + } + } + _ = outputFileWriter.Flush() + } + }() + + return nil +} + +func runBinaryWithJSONResponse(ctx context.Context, option *RunOptions, wg *sync.WaitGroup) ([]byte, error) { + cmd := exec.CommandContext(ctx, option.Binary, option.args...) + + var jsonOutput bytes.Buffer + cmd.Stdout = &jsonOutput + cmd.Stderr = os.Stderr + + if err := cmd.Start(); err != nil { + return nil, err + } + + //TODO if pid file == "", skip this step + pid := strconv.Itoa(cmd.Process.Pid) + + pidFile := path.Join(option.pidDir, "pid") + f, err := os.Create(pidFile) + if err != nil { + return nil, err + } + + _, err = f.Write([]byte(pid)) + if err != nil { + return nil, err + } + + go func() { + defer wg.Done() + wg.Add(1) + if err := cmd.Wait(); err != nil { + // Caught signal kill and interrupt error then ignore. + var exit *exec.ExitError + if errors.As(err, &exit) { + if status, ok := exit.Sys().(syscall.WaitStatus); ok { + if status.Signaled() && + (status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT) { + return + } + } + } + } + }() + + jsonResponse := jsonOutput.Bytes() + return jsonResponse, nil +} + +func CreateDirIfNotExists(dir string) (err error) { + if err := os.MkdirAll(dir, 0755); err != nil && !os.IsExist(err) { + return err + } + return nil +} + +func IsFileExists(filepath string) (bool, error) { + info, err := os.Stat(filepath) + if os.IsNotExist(err) { + // file does not exist + return false, nil + } + + if err != nil { + // Other errors happened. + return false, err + } + + if info.IsDir() { + // It's a directory. + return false, fmt.Errorf("'%s' is directory, not file", filepath) + } + + // The file exists. + return true, nil +} diff --git a/utils/weed/etcd_client.go b/utils/weed/etcd_client.go new file mode 100644 index 00000000000..7e0d0e6d5fe --- /dev/null +++ b/utils/weed/etcd_client.go @@ -0,0 +1,137 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "strings" + + v3 "github.com/gogf/gf/contrib/registry/etcd/v2" + "github.com/gogf/gf/v2/net/gsvc" + etcd3 "go.etcd.io/etcd/client/v3" +) + +// Client is the interface for etcd client. +// It provides the basic operations for etcd cluster. +// Like put, get, delete, register, unregister, get service. +type Client interface { + + // RegisterService register service to etcd cluster. + RegisterService(serviceName string, endpoints []string) error + + // UnRegisterService unregister service from etcd cluster. + UnRegisterService(serviceName string, endpoints []string) error + + // GetService get service from etcd cluster. + GetService(serviceName string) ([]string, error) + + // Put put key-value to etcd cluster. + Put(key, value string) error + + // Get get key-value from etcd cluster. + Get(key string) (string, error) + + // Delete delete key-value from etcd cluster. + Delete(key string) error +} + +type client struct { + peers []string + client *etcd3.Client + registry *v3.Registry + ctx context.Context +} + +func (c *client) Put(key, value string) error { + _, err := c.client.Put(c.ctx, key, value) + if err != nil { + return err + } + return nil +} + +func (c *client) Get(key string) (string, error) { + resp, err := c.client.Get(c.ctx, key) + if err != nil { + return "", err + } + return string(resp.Kvs[0].Value), nil +} + +func (c *client) Delete(key string) error { + _, err := c.client.Delete(c.ctx, key) + if err != nil { + return err + } + return nil +} + +func (c *client) RegisterService(serviceName string, endpoints []string) error { + service := &gsvc.LocalService{ + Name: serviceName, + Endpoints: gsvc.NewEndpoints(strings.Join(endpoints, ",")), + Metadata: map[string]interface{}{ + "protocol": "https", + }, + } + _, err := c.registry.Register(c.ctx, service) + if err != nil { + return err + } + return nil +} + +func (c *client) UnRegisterService(serviceName string, endpoints []string) error { + return c.registry.Deregister(c.ctx, &gsvc.LocalService{ + Name: serviceName, + Endpoints: gsvc.NewEndpoints(strings.Join(endpoints, ",")), + Metadata: map[string]interface{}{ + "protocol": "https", + }, + }) +} + +func (c *client) GetService(serviceName string) ([]string, error) { + result, err := c.registry.Search(c.ctx, gsvc.SearchInput{ + Name: serviceName, + }) + if err != nil { + return nil, err + } + var endpoints []string + for _, src := range result { + for _, endpoint := range src.GetEndpoints() { + s := endpoint.String() + endpoints = append(endpoints, s) + } + } + return endpoints, nil +} + +func NewClient(peers []string) (Client, error) { + c, err := etcd3.New(etcd3.Config{ + Endpoints: peers, + }) + registry := v3.NewWithClient(c) + if err != nil { + return nil, err + } + return &client{ + peers: peers, + client: c, + registry: registry, + ctx: context.Background(), + }, nil +} diff --git a/utils/weed/exec.go b/utils/weed/exec.go new file mode 100644 index 00000000000..4e910157d13 --- /dev/null +++ b/utils/weed/exec.go @@ -0,0 +1,34 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import "context" + +// Exec is the interface for command execution. +// It provides the basic operations for command execution. +// Like start, build args, is running, name. +type Exec interface { + // Start starts cluster component by executing binary. + Start(ctx context.Context, binary string) error + + // BuildArgs build up args for cluster component. + BuildArgs(ctx context.Context, params ...interface{}) []string + + // IsRunning returns the status of current cluster component. + IsRunning(ctx context.Context) bool + + // Name return the name of component. + Name() string +} diff --git a/utils/weed/interface.go b/utils/weed/interface.go new file mode 100644 index 00000000000..0bdc8ec3e6b --- /dev/null +++ b/utils/weed/interface.go @@ -0,0 +1,388 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/exec" + "path" + "runtime" +) + +// Config is the config of the weed cluster and etcd cluster. +type Config struct { + // MasterIP is the IP address of the master node. + MasterIP []string + // VolumeIP is the IP address of the volume node. + VolumeIP []string + // LogDir is the directory of the log file. + LogDir string + // DataDir is the directory of the data file. + DataDir string + // PidDir is the directory of the pid file. + PidDir string + // BinDir is the directory of the etcd binary file. + BinDir string + // EtcdConfigPath is the path of the etcd config file, we will generate it automatically. + EtcdConfigPath string + // CurrentIP is the IP address of the current node. + CurrentIP string + // PeerPort is the port of the peer. + PeerPort int + // ClientPort is the port of the client. + ClientPort int + // WeedMasterPort is the port of the weed master node. + WeedMasterPort int + // WeedVolumePort is the port of the weed volume node. + WeedVolumePort int + // NeedMoreLocalNode is the flag of whether need more local weed node. + NeedMoreLocalNode bool + // WeedMasterDir is the directory of the weed master node. + WeedMasterDir string + // WeedVolumeDir is the directory of the weed volume node. + WeedVolumeDir string + // DefaultReplication is the default replication of the weed cluster. + DefaultReplication string + // WeedLogDir is the directory of the weed log file. + WeedLogDir string + // weedMasterPortList is the port list of the weed master node when need more local weed node. + weedMasterPortList []int + // weedVolumePortList is the port list of the weed volume node when need more local weed node. + weedVolumePortList []int + // weedMDirList is the directory list of the weed master node when need more local weed node. + weedMDirList []string + // weedVDirList is the directory list of the weed volume node when need more local weed node. + weedVDirList []string + // weedMasterList is the list of the weed master node when need more local weed node. + weedMasterList []string +} + +type Deployer interface { + // GetWeedMasterList returns the master list of the weed cluster. + GetWeedMasterList(ctx context.Context) ([]string, error) + + // CreateEtcdCluster creates the etcd cluster. + CreateEtcdCluster(ctx context.Context) error + + // DeleteEtcdCluster deletes the etcd cluster. + DeleteEtcdCluster(ctx context.Context) error + + // CreateWeedCluster creates the weed cluster. + CreateWeedCluster(ctx context.Context) error + + // DeleteWeedCluster deletes the weed cluster. + DeleteWeedCluster(ctx context.Context) error + + // UploadFile uploads the file to the weed cluster. + UploadFile(ctx context.Context, dir string) error + + // DownloadFile download the file from the weed cluster. + DownloadFile(ctx context.Context, dir string, outputDir string) error + + // RemoveFile removes the file from the weed cluster. + RemoveFile(ctx context.Context, dir string) error +} + +type deployer struct { + config *Config + etcd Etcd + client Client + weedMaster Master + weedVolume Volume +} + +func (d *deployer) GetWeedMasterList(ctx context.Context) ([]string, error) { + return d.client.GetService("weed-master") +} + +func (d *deployer) CreateEtcdCluster(ctx context.Context) error { + // prepare etcd + err := d.etcdPrepare() + if err != nil { + return err + } + // start etcd + err = d.etcd.Start(ctx, d.config.BinDir+"/etcd") + if err != nil { + return err + } + // check etcd health + if ok := d.etcd.IsRunning(ctx); !ok { + return fmt.Errorf("etcd is not running") + } + // new client + etcdClient, err := NewClient(d.config.MasterIP) + if err != nil { + return err + } + d.client = etcdClient + return nil +} + +func (d *deployer) downloadEtcd() error { + url, err := etcdDownloadURL() + if err != nil { + return err + } + //download + err = downloadFile(url, EtcdDestination) + if err != nil { + return err + } + etcdDirName := fmt.Sprintf("%s-%s-%s-%s", EtcdArtifactType, EtcdVersion, runtime.GOOS, runtime.GOARCH) + err = exec.Command("tar", "-xvf", EtcdDestination, "-C", extractFolder).Run() + if err != nil { + return err + } + err = os.Rename(path.Join(extractFolder, etcdDirName+"/etcd"), path.Join(d.config.BinDir, EtcdBinName)) + if err != nil { + return err + } + return os.Rename(path.Join(extractFolder, etcdDirName+"/etcdctl"), path.Join(d.config.BinDir, EtcdctlBinName)) +} + +func (d *deployer) downloadWeed() error { + url, err := weedDownloadURL() + if err != nil { + return err + } + err = downloadFile(url, WeedDestination) + if err != nil { + return err + } + weedDirName := fmt.Sprintf("weed_%s_%s", runtime.GOOS, runtime.GOARCH) + err = exec.Command("tar", "-xvf", EtcdDestination, "-C", extractFolder).Run() + if err != nil { + return err + } + return os.Rename(path.Join(extractFolder, weedDirName+"/weed"), path.Join(d.config.BinDir, WeedBinName)) +} + +func (d *deployer) etcdPrepare() error { + var ( + etcdDirs = []string{d.config.DataDir, d.config.LogDir, d.config.PidDir, d.config.BinDir} + ) + for _, dir := range etcdDirs { + if err := CreateDirIfNotExists(dir); err != nil { + return err + } + } + // download etcd + return d.downloadEtcd() + // TODO scp etcd binary file to other nodes +} + +func (d *deployer) weedMasterPrepare() error { + var weedMasterDirs []string + if len(d.config.MasterIP) < 3 { + d.config.NeedMoreLocalNode = true + weedMasterPortList := make([]int, 0) + weedMDirList := make([]string, 0) + weedMasterList := make([]string, 0) + port := d.config.WeedMasterPort + for i := 0; i < 3; i++ { + for { + ok := checkPort(port) + if ok { + weedMasterPortList = append(weedMasterPortList, port) + weedMDirList = append(weedMDirList, d.config.WeedMasterDir+fmt.Sprintf("/%d", port)) + weedMasterList = append(weedMasterList, d.config.CurrentIP+fmt.Sprintf(":%d", port)) + port++ + break + } else { + port++ + } + } + } + weedMasterDirs = weedMDirList + d.config.weedMasterPortList = weedMasterPortList + d.config.weedMDirList = weedMDirList + d.config.weedMasterList = weedMasterList + } else { + weedMasterDirs = []string{d.config.WeedMasterDir} + } + for _, dir := range weedMasterDirs { + if err := CreateDirIfNotExists(dir); err != nil { + return err + } + } + // download weed binary file + if checkBinFile(d.config.BinDir + "/weed") { + return nil + } + return d.downloadWeed() +} + +func (d *deployer) weedVolumePrepare() error { + var weedVolumeDirs []string + if len(d.config.VolumeIP) < 3 { + d.config.NeedMoreLocalNode = true + weedVolumePortList := make([]int, 0) + weedVDirList := make([]string, 0) + port := d.config.WeedVolumePort + for i := 0; i < 3; i++ { + for { + ok := checkPort(port) + if ok { + weedVolumePortList = append(weedVolumePortList, port) + weedVDirList = append(weedVDirList, d.config.WeedVolumeDir+fmt.Sprintf("/%d", port)) + port++ + break + } else { + port++ + } + } + } + weedVolumeDirs = weedVDirList + d.config.weedVolumePortList = weedVolumePortList + d.config.weedVDirList = weedVDirList + } else { + weedVolumeDirs = []string{d.config.WeedVolumeDir} + } + for _, dir := range weedVolumeDirs { + if err := CreateDirIfNotExists(dir); err != nil { + return err + } + } + if d.config.NeedMoreLocalNode { + weedVolume := NewWeedVolume(d.config, d.config.weedMasterList) + d.weedVolume = weedVolume + } else { + weedVolume := NewWeedVolume(d.config, d.config.MasterIP) + d.weedVolume = weedVolume + } + return nil +} + +func (d *deployer) DeleteEtcdCluster(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (d *deployer) CreateWeedCluster(ctx context.Context) error { + // prepare weed master + err := d.weedMasterPrepare() + if err != nil { + return err + } + // start weed master + err = d.weedMaster.Start(ctx, d.config.BinDir+"/weed") + if err != nil { + return err + } + // check weed master health + ok := d.weedMaster.IsRunning(ctx) + if !ok { + return errors.New("weed master is not running") + } + // prepare weed volume + err = d.weedVolumePrepare() + if err != nil { + return err + } + err = d.weedVolume.Start(ctx, d.config.BinDir+"/weed") + if err != nil { + return err + } + // check weed volume health + ok = d.weedVolume.IsRunning(ctx) + if !ok { + return errors.New("weed volume is not running") + } + // register service to etcd cluster + if d.config.NeedMoreLocalNode { + err = d.client.RegisterService("weed-master", d.config.weedMasterList) + if err != nil { + return err + } + } else { + err = d.client.RegisterService("weed-master", d.config.MasterIP) + if err != nil { + return err + } + } + return nil +} + +func (d *deployer) DeleteWeedCluster(ctx context.Context) error { + //TODO implement me + panic("implement me") +} + +func (d *deployer) UploadFile(ctx context.Context, dir string) error { + masterList, err := d.GetWeedMasterList(ctx) + if err != nil { + return err + } + for _, m := range masterList { + resp, err := d.weedMaster.UploadFile(ctx, m, dir) + if err != nil { + continue + } + // upload resp to etcd + bytes, err := json.Marshal(resp) + if err != nil { + continue + } + err = d.client.Put(dir, string(bytes)) + if err != nil { + continue + } + return nil + } + return errors.New("cannot upload file to weed cluster") +} + +func (d *deployer) DownloadFile(ctx context.Context, dir string, outputDir string) error { + masterList, err := d.GetWeedMasterList(ctx) + if err != nil { + return err + } + // get fid + fid, err := d.client.Get(dir) + if err != nil { + return err + } + var resp UploadFileResponse + err = json.Unmarshal([]byte(fid), &resp) + if err != nil { + return err + } + for _, m := range masterList { + err = d.weedMaster.DownloadFile(ctx, m, resp.Fid, outputDir) + if err != nil { + continue + } + return nil + } + return errors.New("cannot download file from weed cluster") +} + +func (d *deployer) RemoveFile(ctx context.Context, dir string) error { + //TODO implement me + panic("implement me") +} + +func NewDeployer(config *Config) Deployer { + return &deployer{ + config: config, + etcd: NewEtcd(config), + weedMaster: NewMaster(config), + } +} diff --git a/utils/weed/interface_test.go b/utils/weed/interface_test.go new file mode 100644 index 00000000000..a1edc516632 --- /dev/null +++ b/utils/weed/interface_test.go @@ -0,0 +1,86 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "testing" +) + +// This should run with root permission. + +//func clean() { +//} +// +func TestDeployer_CreateEtcdCluster1(t *testing.T) { + err := NewDeployer(&Config{ + BinDir: "./test/bin3", + DataDir: "./test/data3", + LogDir: "./test/log3", + MasterIP: []string{"127.0.0.1:1111", "127.0.0.1:2222", "127.0.0.1:3333"}, + PidDir: "./test/pid3", + CurrentIP: "127.0.0.1", + PeerPort: 3333, + ClientPort: 2390, + EtcdConfigPath: "./test/etcd3.conf", + }).CreateEtcdCluster(context.Background()) + if err != nil { + t.Error(err) + return + } +} + +// +//func TestDeployer_CreateEtcdCluster2(t *testing.T) { +// err := NewDeployer(&Config{ +// BinDir: "./test/bin1", +// DataDir: "./test/data1", +// LogDir: "./test/log1", +// MasterIP: []string{"127.0.0.1:1111", "127.0.0.1:2222", "127.0.0.1:3333"}, +// PidDir: "./test/pid1", +// CurrentIP: "127.0.0.1", +// PeerPort: 1111, +// ClientPort: 2391, +// EtcdConfigPath: "./test/etcd1.conf", +// }).CreateEtcdCluster(context.Background()) +// if err != nil { +// t.Error(err) +// return +// } +//} +// +//func TestDeployer_CreateEtcdCluster3(t *testing.T) { +// err := NewDeployer(&Config{ +// BinDir: "./test/bin2", +// DataDir: "./test/data2", +// LogDir: "./test/log2", +// MasterIP: []string{"127.0.0.1:1111", "127.0.0.1:2222", "127.0.0.1:3333"}, +// PidDir: "./test/pid2", +// CurrentIP: "127.0.0.1", +// PeerPort: 2222, +// ClientPort: 2392, +// EtcdConfigPath: "./test/etcd2.conf", +// }).CreateEtcdCluster(context.Background()) +// if err != nil { +// t.Error(err) +// return +// } +//} +// +//func TestDownloadWeed(t *testing.T) { +// d := &deployer{} +// err := d.downloadWeed() +// assert.Nil(t, err) +//} diff --git a/utils/weed/weed.go b/utils/weed/weed.go new file mode 100644 index 00000000000..43f62c0ab82 --- /dev/null +++ b/utils/weed/weed.go @@ -0,0 +1,63 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "errors" + "io" + "net/http" + "os" + "runtime" +) + +var ( + weedURL = "https://github.com/seaweedfs/seaweedfs/releases/download/3.54/" +) + +const ( + extractFolder = "/tmp" +) + +func weedDownloadURL() (string, error) { + if runtime.GOOS != "linux" { + return "", errors.New("unsupported os") + } + switch arch := runtime.GOARCH; arch { + case "amd64": + weedURL += "linux_amd64.tar.gz" + case "arm64": + weedURL += "linux_arm.tar.gz" + default: + return "", errors.New("unsupported arch") + } + return weedURL, nil +} + +func downloadFile(url string, dest string) error { + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + out, err := os.Create(dest) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, resp.Body) + return err +} diff --git a/utils/weed/weed_master.go b/utils/weed/weed_master.go new file mode 100644 index 00000000000..bf89adb30d9 --- /dev/null +++ b/utils/weed/weed_master.go @@ -0,0 +1,170 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "encoding/json" + "strconv" + "strings" + "sync" + + "github.com/sealerio/sealer/utils/exec" +) + +type Master interface { + Exec + UploadFile(ctx context.Context, master string, dir string) (UploadFileResponse, error) + DownloadFile(ctx context.Context, master string, fid string, outputDir string) error + RemoveFile(ctx context.Context, master string, dir string) error +} + +type master struct { + ip string + port int + mDir string + defaultReplication string + peers []string + needMoreLocalNode bool + portList []int + mDirList []string + wg *sync.WaitGroup +} + +type UploadFileResponse struct { + Fid string `json:"fid"` + URL string `json:"url"` + FileName string `json:"fileName"` + Size int64 `json:"size"` +} + +func (m *master) UploadFile(ctx context.Context, master string, dir string) (UploadFileResponse, error) { + runOptions := RunOptions{ + Binary: "weed", + Name: "upload", + args: m.buildUploadFileArgs(ctx, master, dir), + } + jsonResponse, err := runBinaryWithJSONResponse(ctx, &runOptions, m.wg) + if err != nil { + return UploadFileResponse{}, err + } + var uploadFileResponse UploadFileResponse + err = json.Unmarshal(jsonResponse, &uploadFileResponse) + if err != nil { + return UploadFileResponse{}, err + } + return uploadFileResponse, nil +} + +func (m *master) buildUploadFileArgs(ctx context.Context, params ...interface{}) []string { + _ = ctx + return []string{ + "-master=" + params[0].(string), + "-dir=" + params[1].(string), + } +} + +func (m *master) buildDownloadFileArgs(ctx context.Context, params ...interface{}) []string { + _ = ctx + return []string{ + "-server=" + params[0].(string), + "--dir=" + params[2].(string), + params[1].(string), + } +} + +func (m *master) DownloadFile(ctx context.Context, master string, fid string, outputDir string) error { + runOptions := RunOptions{ + Binary: "weed", + Name: "download", + args: m.buildDownloadFileArgs(ctx, master, fid, outputDir), + } + err := runBinary(ctx, &runOptions, m.wg) + if err != nil { + return err + } + return nil +} + +func (m *master) RemoveFile(ctx context.Context, master string, fid string) error { + //TODO weed may not support remove file, may be should consider to use other file system + panic("implement me") +} + +func (m *master) Start(ctx context.Context, binary string) error { + if m.needMoreLocalNode { + return m.startCluster(ctx, binary) + } + return m.startSingle(ctx, binary) +} + +func (m *master) BuildArgs(ctx context.Context, params ...interface{}) []string { + return []string{ + "-ip " + m.ip, + "-port " + params[0].(string), + "-mdir " + params[1].(string), + "-peers " + strings.Join(m.peers, ","), + "-defaultReplication " + m.defaultReplication, + } +} + +func (m *master) IsRunning(ctx context.Context) bool { + err := exec.Cmd("lsof", "-i:"+strconv.Itoa(m.port)) + return err == nil +} + +func (m *master) Name() string { + return "master" +} + +func NewMaster(config *Config) Master { + return &master{ + ip: config.CurrentIP, + port: config.WeedMasterPort, + mDir: config.WeedMasterDir, + defaultReplication: config.DefaultReplication, + peers: config.MasterIP, + needMoreLocalNode: config.NeedMoreLocalNode, + wg: new(sync.WaitGroup), + } +} + +func (m *master) startSingle(ctx context.Context, binary string) error { + runOptions := &RunOptions{ + Binary: binary, + Name: "master", + args: m.BuildArgs(ctx, strconv.Itoa(m.port), m.mDir), + } + err := runBinary(ctx, runOptions, m.wg) + if err != nil { + return err + } + return nil +} + +func (m *master) startCluster(ctx context.Context, binary string) error { + for i, port := range m.portList { + runOptions := &RunOptions{ + Binary: binary, + Name: "master", + args: m.BuildArgs(ctx, strconv.Itoa(port), m.mDirList[i]), + } + err := runBinary(ctx, runOptions, m.wg) + if err != nil { + return err + } + } + return nil +} diff --git a/utils/weed/weed_test.go b/utils/weed/weed_test.go new file mode 100644 index 00000000000..c29b1f66ccc --- /dev/null +++ b/utils/weed/weed_test.go @@ -0,0 +1,15 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed diff --git a/utils/weed/weed_volumn.go b/utils/weed/weed_volumn.go new file mode 100644 index 00000000000..6614b0e8570 --- /dev/null +++ b/utils/weed/weed_volumn.go @@ -0,0 +1,102 @@ +// Copyright © 2021 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weed + +import ( + "context" + "strconv" + "strings" + "sync" + + "github.com/sealerio/sealer/utils/exec" +) + +type Volume interface { + Exec +} + +type volume struct { + ip string + port int + dir string + mServer []string + needMoreLocalNode bool + dirList []string + portList []int + wg *sync.WaitGroup +} + +func NewWeedVolume(config *Config, mServer []string) Volume { + return &volume{ + ip: config.CurrentIP, + port: config.WeedVolumePort, + dir: config.WeedVolumeDir, + mServer: mServer, + needMoreLocalNode: config.NeedMoreLocalNode, + wg: new(sync.WaitGroup), + } +} + +func (v *volume) Start(ctx context.Context, binary string) error { + if v.needMoreLocalNode { + return v.startCluster(ctx, binary) + } + return v.startSingle(ctx, binary) +} + +func (v *volume) BuildArgs(ctx context.Context, params ...interface{}) []string { + return []string{ + "-mServer " + strings.Join(v.mServer, ","), + "-port " + params[0].(string), + "-dir " + params[1].(string), + } +} + +func (v *volume) IsRunning(ctx context.Context) bool { + err := exec.Cmd("lsof", "-i:"+strconv.Itoa(v.port)) + return err == nil +} + +func (v *volume) Name() string { + return "volume" +} + +func (v *volume) startCluster(ctx context.Context, binary string) error { + for i := 0; i < len(v.portList); i++ { + runOptions := &RunOptions{ + Binary: binary, + Name: "volume", + args: v.BuildArgs(ctx, strconv.Itoa(v.portList[i]), v.dirList[i]), + } + err := runBinary(ctx, runOptions, v.wg) + if err != nil { + return err + } + } + return nil +} + +func (v *volume) startSingle(ctx context.Context, binary string) error { + runOptions := &RunOptions{ + Binary: binary, + Name: "volume", + args: v.BuildArgs(ctx, strconv.Itoa(v.port), v.dir), + } + err := runBinary(ctx, runOptions, v.wg) + if err != nil { + return err + } + return nil +}