Skip to content

Commit

Permalink
Merge pull request kubeedge#5572 from luomengY/kead_ctl_client-go
Browse files Browse the repository at this point in the history
Switch the keadm ctl client to client go and add the restful interface for restart pod.
  • Loading branch information
kubeedge-bot authored Jul 18, 2024
2 parents 33b9b8e + 93aa774 commit 970cc24
Show file tree
Hide file tree
Showing 11 changed files with 346 additions and 283 deletions.
5 changes: 5 additions & 0 deletions common/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,8 @@ type ImagePrePullJobResponse struct {
Reason string
ImageStatus []v1alpha1.ImageStatus
}

type RestartResponse struct {
ErrMessages []string `json:"errMessages,omitempty"`
LogMessages []string `json:"LogMessages,omitempty"`
}
6 changes: 6 additions & 0 deletions edge/pkg/metamanager/metaserver/common/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package common

type RestartInfo struct {
Namespace string
PodNames []string
}
48 changes: 48 additions & 0 deletions edge/pkg/metamanager/metaserver/handlerfactory/ext_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package handlerfactory

import (
"encoding/json"
"net/http"

"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/common"
)

func (f *Factory) Restart(namespace string) http.Handler {
h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
podNameBytes, err := limitedReadBody(req, int64(3*1024*1024))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

var podNames []string
err = json.Unmarshal(podNameBytes, &podNames)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

restartInfo := common.RestartInfo{
PodNames: podNames,
Namespace: namespace,
}
restartResponse := f.storage.Restart(req.Context(), restartInfo)
restartResBytes, err := json.Marshal(restartResponse)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, err = w.Write(restartResBytes)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
return h
}
72 changes: 72 additions & 0 deletions edge/pkg/metamanager/metaserver/kubernetes/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package storage
import (
"context"
"encoding/json"
"fmt"
"time"

oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -17,9 +20,14 @@ import (
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cri/remote"

"github.com/kubeedge/kubeedge/common/types"
"github.com/kubeedge/kubeedge/edge/pkg/edged/config"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/agent"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/common"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite"
"github.com/kubeedge/kubeedge/edge/pkg/metamanager/metaserver/kubernetes/storage/sqlite/imitator"
"github.com/kubeedge/kubeedge/pkg/metaserver"
Expand Down Expand Up @@ -312,3 +320,67 @@ func (r *REST) Patch(ctx context.Context, pi metaserver.PatchInfo) (runtime.Obje
}
return retObj, nil
}

func (r *REST) Restart(ctx context.Context, restartInfo common.RestartInfo) *types.RestartResponse {
namespace := restartInfo.Namespace
podNames := restartInfo.PodNames
restartResponse := &types.RestartResponse{
ErrMessages: make([]string, 0),
LogMessages: make([]string, 0),
}

endpoint := config.Config.Edged.TailoredKubeletConfig.ContainerRuntimeEndpoint
remoteRuntimeService, err := remote.NewRemoteRuntimeService(endpoint, time.Second*10, oteltrace.NewNoopTracerProvider())
if err != nil {
errMessage := fmt.Sprintf("new remote runtimeservice with err: %v", err)
klog.Errorf("[metaserver/restart] %v", errMessage)
restartResponse.ErrMessages = append(restartResponse.ErrMessages, errMessage)
return restartResponse
}
for _, podName := range podNames {
var labelSelector = map[string]string{
"io.kubernetes.pod.name": podName,
"io.kubernetes.pod.namespace": namespace,
}

filter := &runtimeapi.ContainerFilter{
LabelSelector: labelSelector,
}
containers, err := remoteRuntimeService.ListContainers(ctx, filter)
if err != nil {
errMessage := fmt.Sprintf("failed to list containers: %v", err)
klog.Warningf("[metaserver/restart] %v", errMessage)
restartResponse.ErrMessages = append(restartResponse.ErrMessages, errMessage)
continue
}

if len(containers) == 0 {
errMessage := fmt.Sprintf("not found pod:\"/%s/%s\"", namespace, podName)
klog.Warningf("[metaserver/restart] %v", errMessage)
restartResponse.ErrMessages = append(restartResponse.ErrMessages, errMessage)
continue
}

count := 0
var errMessage string
for _, container := range containers {
containerID := container.Id
err := remoteRuntimeService.StopContainer(ctx, containerID, 3)
if err != nil {
errMessage += fmt.Sprintf("failed to stop container %s for pod \"/%s/%s\" with err:%v\n", container.Metadata.Name, namespace, podName, err)
} else {
count++
}
}

if count == len(containers) {
message := fmt.Sprintf("the pod \"%s/%s\" restart successful", namespace, podName)
klog.V(4).Infof("[metaserver/restart] %v", message)
restartResponse.LogMessages = append(restartResponse.LogMessages, message)
} else {
klog.Warningf("[metaserver/restart] %v", errMessage)
restartResponse.ErrMessages = append(restartResponse.ErrMessages, errMessage)
}
}
return restartResponse
}
6 changes: 5 additions & 1 deletion edge/pkg/metamanager/metaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ func (ls *MetaServer) BuildBasicHandler() http.Handler {
case reqInfo.Verb == "list", reqInfo.Verb == "watch":
ls.Factory.List().ServeHTTP(w, req)
case reqInfo.Verb == "create":
ls.Factory.Create(reqInfo).ServeHTTP(w, req)
if reqInfo.Name == "restart" {
ls.Factory.Restart(reqInfo.Namespace).ServeHTTP(w, req)
} else {
ls.Factory.Create(reqInfo).ServeHTTP(w, req)
}
case reqInfo.Verb == "delete":
ls.Factory.Delete().ServeHTTP(w, req)
case reqInfo.Verb == "update":
Expand Down
67 changes: 67 additions & 0 deletions keadm/cmd/keadm/app/cmd/ctl/client/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2024 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"

corev1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type PodRequest struct {
Namespace string
LabelSelector string
AllNamespaces bool
PodName string
}

func (podRequest *PodRequest) GetPod(ctx context.Context) (*corev1.Pod, error) {
kubeClient, err := KubeClient()
if err != nil {
return nil, err
}
pod, err := kubeClient.CoreV1().Pods(podRequest.Namespace).Get(ctx, podRequest.PodName, metaV1.GetOptions{})
if err != nil {
return nil, err
}
return pod, nil
}

func (podRequest *PodRequest) GetPods(ctx context.Context) (*corev1.PodList, error) {
kubeClient, err := KubeClient()
if err != nil {
return nil, err
}
if podRequest.AllNamespaces {
podList, err := kubeClient.CoreV1().Pods(metaV1.NamespaceAll).List(ctx, metaV1.ListOptions{
LabelSelector: podRequest.LabelSelector,
})
if err != nil {
return nil, err
}
return podList, nil
}

podList, err := kubeClient.CoreV1().Pods(podRequest.Namespace).List(ctx, metaV1.ListOptions{
LabelSelector: podRequest.LabelSelector,
})
if err != nil {
return nil, err
}
return podList, nil
}
76 changes: 76 additions & 0 deletions keadm/cmd/keadm/app/cmd/ctl/client/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2024 The KubeEdge Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"fmt"
"time"

"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/common"
keadutil "github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
)

func KubeClient() (*kubernetes.Clientset, error) {
kubeConfig, err := getKubeConfig()
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}
return kubeClient, nil
}

func getKubeConfig() (*restclient.Config, error) {
config, err := keadutil.ParseEdgecoreConfig(common.EdgecoreConfigPath)
if err != nil {
return nil, fmt.Errorf("get edge config failed with err:%v", err)
}

if !config.Modules.MetaManager.MetaServer.Enable {
return nil, fmt.Errorf("metaserver don't open")
}

url := config.Modules.MetaManager.MetaServer.Server
ok, requireAuthorization := config.FeatureGates["requireAuthorization"]
if ok && requireAuthorization {
url = "https://" + url
} else {
url = "http://" + url
}
kubeConfig, err := clientcmd.BuildConfigFromFlags(url, "")
if err != nil {
return nil, err
}

if ok && requireAuthorization {
serverCrt := config.Modules.MetaManager.MetaServer.TLSCertFile
serverKey := config.Modules.MetaManager.MetaServer.TLSPrivateKeyFile
tlsCaFile := config.Modules.MetaManager.MetaServer.TLSCaFile

kubeConfig.TLSClientConfig.CAFile = tlsCaFile
kubeConfig.TLSClientConfig.CertFile = serverCrt
kubeConfig.TLSClientConfig.KeyFile = serverKey
}
kubeConfig.Timeout = 1 * time.Minute
return kubeConfig, nil
}
19 changes: 12 additions & 7 deletions keadm/cmd/keadm/app/cmd/ctl/get/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"k8s.io/kubernetes/pkg/printers/storage"

"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/common"
"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/ctl/restful"
"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/ctl/client"
"github.com/kubeedge/kubeedge/keadm/cmd/keadm/app/cmd/util"
)

Expand Down Expand Up @@ -72,18 +72,19 @@ func (o *PodGetOptions) getPods(args []string) error {
}
nodeName := config.Modules.Edged.HostnameOverride

ctx := context.Background()
var podListFilter *api.PodList
if len(args) > 0 {
podListFilter = &api.PodList{
Items: make([]api.Pod, 0, len(args)),
}
var podRequest *restful.PodRequest
var podRequest *client.PodRequest
for _, podName := range args {
podRequest = &restful.PodRequest{
podRequest = &client.PodRequest{
Namespace: o.Namespace,
PodName: podName,
}
pod, err := podRequest.GetPod()
pod, err := podRequest.GetPod(ctx)
if err != nil {
fmt.Println(err.Error())
continue
Expand All @@ -101,12 +102,12 @@ func (o *PodGetOptions) getPods(args []string) error {
}
}
} else {
podRequest := &restful.PodRequest{
podRequest := &client.PodRequest{
Namespace: o.Namespace,
AllNamespaces: o.AllNamespaces,
LabelSelector: o.LabelSelector,
}
podList, err := podRequest.GetPods()
podList, err := podRequest.GetPods(ctx)
if err != nil {
return err
}
Expand All @@ -130,7 +131,11 @@ func (o *PodGetOptions) getPods(args []string) error {
if len(args) > 0 {
return nil
}
fmt.Printf("No resources found in %s namespace.\n", o.Namespace)
if o.AllNamespaces {
fmt.Println("No resources found in all namespace.")
} else {
fmt.Printf("No resources found in %s namespace.\n", o.Namespace)
}
return nil
}

Expand Down
Loading

0 comments on commit 970cc24

Please sign in to comment.