diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go index efddcb157..d844258d7 100644 --- a/cmd/kubenest/operator/app/operator.go +++ b/cmd/kubenest/operator/app/operator.go @@ -13,6 +13,7 @@ import ( controllerruntime "sigs.k8s.io/controller-runtime" "github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options" + "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/controller" kosmos "github.com/kosmos.io/kosmos/pkg/kubenest/controller/kosmos" @@ -81,6 +82,11 @@ func run(ctx context.Context, opts *options.Options) error { return fmt.Errorf("could not create clientset: %v", err) } + kosmosClient, err := versioned.NewForConfig(config) + if err != nil { + return fmt.Errorf("could not create clientset: %v", err) + } + hostPortManager, err := vcnodecontroller.NewHostPortManager(hostKubeClient) if err != nil { return fmt.Errorf("failed to create host port manager: %v", err) @@ -99,6 +105,8 @@ func run(ctx context.Context, opts *options.Options) error { VirtualClusterNodeController := vcnodecontroller.NodeController{ Client: mgr.GetClient(), + RootClientSet: hostKubeClient, + KosmosClient: kosmosClient, EventRecorder: mgr.GetEventRecorderFor(constants.NodeControllerName), } diff --git a/deploy/crds/kosmos.io_globalnodes.yaml b/deploy/crds/kosmos.io_globalnodes.yaml index a8c9c863c..388f0300c 100644 --- a/deploy/crds/kosmos.io_globalnodes.yaml +++ b/deploy/crds/kosmos.io_globalnodes.yaml @@ -22,6 +22,9 @@ spec: - jsonPath: .spec.state name: STATE type: string + - jsonPath: .status.VirtualCluster + name: VIRTUAL_CLUSTER + type: string name: v1alpha1 schema: openAPIV3Schema: diff --git a/pkg/apis/kosmos/v1alpha1/global_node_types.go b/pkg/apis/kosmos/v1alpha1/global_node_types.go index 6e07d72fb..6e7d1d111 100644 --- a/pkg/apis/kosmos/v1alpha1/global_node_types.go +++ b/pkg/apis/kosmos/v1alpha1/global_node_types.go @@ -13,6 +13,7 @@ import ( // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // +kubebuilder:printcolumn:name="NODE_IP",type=string,JSONPath=`.spec.nodeIP` // +kubebuilder:printcolumn:name="STATE",type=string,JSONPath=`.spec.state` +// +kubebuilder:printcolumn:name="VIRTUAL_CLUSTER",type=string,JSONPath=`.status.VirtualCluster` type GlobalNode struct { metav1.TypeMeta `json:",inline"` diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go index ec92fe3f3..7a5ca317b 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "fmt" "os" + "strconv" "k8s.io/klog" ) @@ -66,3 +67,17 @@ func GetExectorPort() string { } return exectorPort } + +func GetDrainWaitSeconds() int { + drainWaitSeconds := os.Getenv("EXECTOR_DRAIN_WAIT_SECONDS") + if len(drainWaitSeconds) == 0 { + drainWaitSeconds = "60" + } + num, err := strconv.Atoi(drainWaitSeconds) + + if err != nil { + klog.Fatalf("convert EXECTOR_DRAIN_WAIT_SECONDS failed, err: %s", err) + } + + return num +} diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go index 936c2bc8f..13793c3b9 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go @@ -23,6 +23,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow" "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task" @@ -32,7 +33,9 @@ import ( type NodeController struct { client.Client + RootClientSet kubernetes.Interface EventRecorder record.EventRecorder + KosmosClient versioned.Interface } func (r *NodeController) SetupWithManager(mgr manager.Manager) error { @@ -143,24 +146,27 @@ func (r *NodeController) compareAndTranformNodes(ctx context.Context, targetNode func (r *NodeController) UpdateVirtualClusterStatus(ctx context.Context, virtualCluster v1alpha1.VirtualCluster, status v1alpha1.Phase, reason string) error { retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - targetObj := v1alpha1.VirtualCluster{} + var targetObj v1alpha1.VirtualCluster if err := r.Get(ctx, types.NamespacedName{Name: virtualCluster.Name, Namespace: virtualCluster.Namespace}, &targetObj); err != nil { + klog.Warningf("get target virtualcluster %s namespace %s failed: %v", virtualCluster.Name, virtualCluster.Namespace, err) return err } updateVirtualCluster := targetObj.DeepCopy() - updateVirtualCluster.Status.Phase = status + if len(status) > 0 { + updateVirtualCluster.Status.Phase = status + } updateVirtualCluster.Status.Reason = reason updateTime := metav1.Now() updateVirtualCluster.Status.UpdateTime = &updateTime - - if err := r.Update(ctx, updateVirtualCluster); err != nil { + if _, err := r.KosmosClient.KosmosV1alpha1().VirtualClusters(updateVirtualCluster.Namespace).Update(ctx, updateVirtualCluster, metav1.UpdateOptions{}); err != nil && !apierrors.IsNotFound(err) { + klog.Warningf("update target virtualcluster %s namespace %s failed: %v", virtualCluster.Name, virtualCluster.Namespace, err) return err } return nil }) if retryErr != nil { - return fmt.Errorf("update virtualcluster %s status failed: %s", virtualCluster.Name, retryErr) + return fmt.Errorf("update virtualcluster %s status namespace %s failed: %s", virtualCluster.Name, virtualCluster.Namespace, retryErr) } return nil @@ -227,6 +233,9 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques return reconcile.Result{}, nil } klog.Errorf("get clusternode %s error: %v", request.NamespacedName, err) + if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error()); err != nil { + klog.Errorf("update virtualcluster %s status error: %v", request.NamespacedName, err) + } return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } @@ -238,6 +247,9 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques if !virtualCluster.GetDeletionTimestamp().IsZero() && len(virtualCluster.Spec.Kubeconfig) == 0 { if err := r.DoNodeClean(ctx, virtualCluster); err != nil { klog.Errorf("virtualcluster %s do node clean failed: %v", virtualCluster.Name, err) + if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error()); err != nil { + klog.Errorf("update virtualcluster %s status error: %v", request.NamespacedName, err) + } return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } return reconcile.Result{}, nil @@ -248,8 +260,16 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques return reconcile.Result{}, nil } + if virtualCluster.Status.Phase == v1alpha1.Pending { + klog.V(4).Infof("virtualcluster is pending, cluster name: %s", virtualCluster.Name) + return reconcile.Result{}, nil + } + if err := r.DoNodeTask(ctx, virtualCluster); err != nil { klog.Errorf("virtualcluster %s do node task failed: %v", virtualCluster.Name, err) + if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error()); err != nil { + klog.Errorf("update virtualcluster %s status error: %v", request.NamespacedName, err) + } return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil } @@ -285,7 +305,8 @@ func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alph if err := workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{ NodeInfo: nodeInfo, VirtualCluster: virtualCluster, - HostK8sClient: r.Client, + HostClient: r.Client, + HostK8sClient: r.RootClientSet, // VirtualK8sClient: _, }); err != nil { return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err) @@ -313,7 +334,8 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob NodeInfo: nodeInfo, VirtualCluster: virtualCluster, KubeDNSAddress: clusterDNS, - HostK8sClient: r.Client, + HostClient: r.Client, + HostK8sClient: r.RootClientSet, VirtualK8sClient: k8sClient, }); err != nil { return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, err) @@ -327,7 +349,8 @@ func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.Gl if err := workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{ NodeInfo: nodeInfo, VirtualCluster: virtualCluster, - HostK8sClient: r.Client, + HostClient: r.Client, + HostK8sClient: r.RootClientSet, VirtualK8sClient: k8sClient, }); err != nil { return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err) diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/join.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/join.go deleted file mode 100644 index eedf3da83..000000000 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/join.go +++ /dev/null @@ -1,272 +0,0 @@ -package task - -import ( - "context" - "encoding/base64" - "fmt" - "time" - - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/util/retry" - "k8s.io/klog/v2" - - "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" - "github.com/kosmos.io/kosmos/pkg/kubenest/util" -) - -func NewCheckEnvTask() Task { - return Task{ - Name: "remote environment check", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - // check - checkCmd := &exector.CMDExector{ - Cmd: fmt.Sprintf("sh %s check", env.GetExectorShellName()), - } - ret := exectHelper.DoExector(ctx.Done(), checkCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("check node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - return nil, nil - }, - } -} - -func NewKubeadmResetTask() Task { - return Task{ - Name: "remote kubeadm reset", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - - resetCmd := &exector.CMDExector{ - Cmd: fmt.Sprintf("sh %s unjoin", env.GetExectorShellName()), - } - - ret := exectHelper.DoExector(ctx.Done(), resetCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("reset node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - return nil, nil - }, - } -} - -func NewCleanHostClusterNodeTask() Task { - return Task{ - Name: "clean host cluster node", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - targetNode := &v1.Node{} - if err := to.HostK8sClient.Get(ctx, types.NamespacedName{ - Name: to.NodeInfo.Name, - }, targetNode); err != nil { - if apierrors.IsNotFound(err) { - return nil, nil - } - return nil, fmt.Errorf("get target node %s failed: %s", to.NodeInfo.Name, err) - } - - if err := to.HostK8sClient.Delete(ctx, targetNode); err != nil { - return nil, err - } - - return nil, nil - }, - } -} - -func NewReomteUploadCATask() Task { - return Task{ - Name: "remote upload ca.crt", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - - nn := types.NamespacedName{ - Namespace: to.VirtualCluster.Namespace, - Name: fmt.Sprintf("%s-cert", to.VirtualCluster.Name), - } - targetCert := &v1.Secret{} - if err := to.HostK8sClient.Get(ctx, nn, targetCert); err != nil { - return nil, fmt.Errorf("get target cert %s failed: %s", nn, err) - } - - cacrt := targetCert.Data["ca.crt"] - scpCrtCmd := &exector.SCPExector{ - DstFilePath: env.GetExectorTmpPath(), - DstFileName: "ca.crt", - SrcByte: cacrt, - } - ret := exectHelper.DoExector(ctx.Done(), scpCrtCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("scp ca.crt to node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - return nil, nil - }, - } -} - -func NewRemoteUpdateKubeletConfTask() Task { - return Task{ - Name: "remote upload kubelet.conf", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - - kubeconfig, err := base64.StdEncoding.DecodeString(to.VirtualCluster.Spec.Kubeconfig) - if err != nil { - return nil, fmt.Errorf("decode target kubeconfig %s failed: %s", to.VirtualCluster.Name, err) - } - - scpKCCmd := &exector.SCPExector{ - DstFilePath: env.GetExectorTmpPath(), - DstFileName: "kubelet.conf", - SrcByte: kubeconfig, - } - ret := exectHelper.DoExector(ctx.Done(), scpKCCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("scp kubeconfig to node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - return nil, nil - }, - } -} - -func NewRemoteUpdateConfigYamlTask() Task { - return Task{ - Name: "remote upload config.yaml", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - - scpKubeletConfigCmd := &exector.SCPExector{ - DstFilePath: env.GetExectorTmpPath(), - DstFileName: "config.yaml", - SrcFile: env.GetExectorWorkerDir() + "config.yaml", // from configmap volumn - } - - ret := exectHelper.DoExector(ctx.Done(), scpKubeletConfigCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("scp kubelet config to node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - return nil, nil - }, - } -} - -func NewRemoteNodeJoinTask() Task { - return Task{ - Name: "remote join node to virtual control plane", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - - joinCmd := &exector.CMDExector{ - Cmd: fmt.Sprintf("sh %s join %s", env.GetExectorShellName(), to.KubeDNSAddress), - } - ret := exectHelper.DoExector(ctx.Done(), joinCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("join node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - return nil, nil - }, - } -} - -func NewWaitNodeReadyTask() Task { - return Task{ - Name: "wait new node ready", - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - waitCtx, cancel := context.WithTimeout(ctx, 60*time.Second) // total waiting time - defer cancel() - - isReady := false - - wait.UntilWithContext(waitCtx, func(ctx context.Context) { - node, err := to.VirtualK8sClient.CoreV1().Nodes().Get(waitCtx, to.NodeInfo.Name, metav1.GetOptions{}) - if err == nil { - if util.IsNodeReady(node.Status.Conditions) { - klog.V(4).Infof("node %s is ready", to.NodeInfo.Name) - isReady = true - cancel() - } else { - klog.V(4).Infof("node %s is not ready, status: %s", to.NodeInfo.Name, node.Status.Phase) - } - } else { - klog.V(4).Infof("get node %s failed: %s", to.NodeInfo.Name, err) - } - }, 10*time.Second) // Interval time - - if isReady { - return nil, nil - } - - return nil, fmt.Errorf("node %s is not ready", to.NodeInfo.Name) - }, - } -} - -func NewUpdateNodeLabelsTask() Task { - return Task{ - Name: "update new-node labels", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - node, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err) - } - - updateNode := node.DeepCopy() - for k, v := range to.NodeInfo.Labels { - node.Labels[k] = v - } - - if _, err := to.VirtualK8sClient.CoreV1().Nodes().Update(ctx, updateNode, metav1.UpdateOptions{}); err != nil { - return nil, fmt.Errorf("add label to node %s failed: %s", to.NodeInfo.Name, err) - } - return nil, nil - }, - } -} - -func NewUpdateNodePoolItemStatusTask(nodeState v1alpha1.NodeState, isClean bool) Task { - return Task{ - Name: "Update node status in NodePool ", - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - targetGlobalNode := v1alpha1.GlobalNode{} - - if err := to.HostK8sClient.Get(ctx, types.NamespacedName{Name: to.NodeInfo.Name}, &targetGlobalNode); err != nil { - klog.Errorf("get global node %s failed: %s", to.NodeInfo.Name, err) - return err - } - - updateGlobalNode := targetGlobalNode.DeepCopy() - - updateGlobalNode.Spec.State = nodeState - if err := to.HostK8sClient.Update(ctx, updateGlobalNode); err != nil { - klog.Errorf("update global node %s spec.state failed: %s", updateGlobalNode.Name, err) - return err - } - if isClean { - updateGlobalNode.Status.VirtualCluster = "" - if err := to.HostK8sClient.Status().Update(ctx, updateGlobalNode); err != nil { - klog.Errorf("update global node %s status failed: %s", updateGlobalNode.Name, err) - return err - } - } - return nil - }) - - return nil, err - }, - } -} diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go index 5376efb18..fee90f9b1 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go @@ -2,11 +2,25 @@ package task import ( "context" + "encoding/base64" + "fmt" + "strings" + "time" + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) type TaskOpt struct { @@ -14,13 +28,408 @@ type TaskOpt struct { VirtualCluster v1alpha1.VirtualCluster KubeDNSAddress string - HostK8sClient client.Client + HostClient client.Client + HostK8sClient kubernetes.Interface VirtualK8sClient kubernetes.Interface } type Task struct { - Name string - Run func(context.Context, TaskOpt, interface{}) (interface{}, error) - Retry bool - SubTasks []Task + Name string + Run func(context.Context, TaskOpt, interface{}) (interface{}, error) + Retry bool + SubTasks []Task + ErrorIgnore bool +} + +func NewCheckEnvTask() Task { + return Task{ + Name: "remote environment check", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + // check + checkCmd := &exector.CMDExector{ + Cmd: fmt.Sprintf("sh %s check", env.GetExectorShellName()), + } + ret := exectHelper.DoExector(ctx.Done(), checkCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("check node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + return nil, nil + }, + } +} + +func NewKubeadmResetTask() Task { + return Task{ + Name: "remote kubeadm reset", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + + resetCmd := &exector.CMDExector{ + Cmd: fmt.Sprintf("sh %s unjoin", env.GetExectorShellName()), + } + + ret := exectHelper.DoExector(ctx.Done(), resetCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("reset node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + return nil, nil + }, + } +} + +func NewDrainHostNodeTask() Task { + return Task{ + Name: "drain host node", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + targetNode, err := to.HostK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err) + } + + if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, env.GetDrainWaitSeconds()); err != nil { + return nil, err + } + return nil, nil + }, + } +} + +func NewDrainVirtualNodeTask() Task { + return Task{ + Name: "drain virtual-control-plane node", + Retry: true, + // ErrorIgnore: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + targetNode, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err) + } + + if err := util.DrainNode(ctx, targetNode.Name, to.HostK8sClient, targetNode, env.GetDrainWaitSeconds()); err != nil { + return nil, err + } + return nil, nil + }, + } +} + +func NewCleanHostClusterNodeTask() Task { + return Task{ + Name: "clean host cluster node", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + targetNode := &v1.Node{} + if err := to.HostClient.Get(ctx, types.NamespacedName{ + Name: to.NodeInfo.Name, + }, targetNode); err != nil { + if apierrors.IsNotFound(err) { + return nil, nil + } + return nil, fmt.Errorf("get target node %s failed: %s", to.NodeInfo.Name, err) + } + + if err := to.HostClient.Delete(ctx, targetNode); err != nil { + return nil, err + } + + return nil, nil + }, + } +} + +func NewReomteUploadCATask() Task { + return Task{ + Name: "remote upload ca.crt", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + + nn := types.NamespacedName{ + Namespace: to.VirtualCluster.Namespace, + Name: fmt.Sprintf("%s-cert", to.VirtualCluster.Name), + } + targetCert := &v1.Secret{} + if err := to.HostClient.Get(ctx, nn, targetCert); err != nil { + return nil, fmt.Errorf("get target cert %s failed: %s", nn, err) + } + + cacrt := targetCert.Data["ca.crt"] + scpCrtCmd := &exector.SCPExector{ + DstFilePath: env.GetExectorTmpPath(), + DstFileName: "ca.crt", + SrcByte: cacrt, + } + ret := exectHelper.DoExector(ctx.Done(), scpCrtCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("scp ca.crt to node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + return nil, nil + }, + } +} + +func NewRemoteUpdateKubeletConfTask() Task { + return Task{ + Name: "remote upload kubelet.conf", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + + kubeconfig, err := base64.StdEncoding.DecodeString(to.VirtualCluster.Spec.Kubeconfig) + if err != nil { + return nil, fmt.Errorf("decode target kubeconfig %s failed: %s", to.VirtualCluster.Name, err) + } + + scpKCCmd := &exector.SCPExector{ + DstFilePath: env.GetExectorTmpPath(), + DstFileName: "kubelet.conf", + SrcByte: kubeconfig, + } + ret := exectHelper.DoExector(ctx.Done(), scpKCCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("scp kubeconfig to node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + return nil, nil + }, + } +} + +func NewRemoteUpdateConfigYamlTask() Task { + return Task{ + Name: "remote upload config.yaml", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + + scpKubeletConfigCmd := &exector.SCPExector{ + DstFilePath: env.GetExectorTmpPath(), + DstFileName: "config.yaml", + SrcFile: env.GetExectorWorkerDir() + "config.yaml", // from configmap volumn + } + + ret := exectHelper.DoExector(ctx.Done(), scpKubeletConfigCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("scp kubelet config to node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + return nil, nil + }, + } +} + +func NewRemoteNodeJoinTask() Task { + return Task{ + Name: "remote join node to virtual control plane", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + + joinCmd := &exector.CMDExector{ + Cmd: fmt.Sprintf("sh %s join %s", env.GetExectorShellName(), to.KubeDNSAddress), + } + ret := exectHelper.DoExector(ctx.Done(), joinCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("join node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + return nil, nil + }, + } +} + +func NewWaitNodeReadyTask() Task { + return Task{ + Name: "wait new node ready", + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + waitCtx, cancel := context.WithTimeout(ctx, 60*time.Second) // total waiting time + defer cancel() + + isReady := false + + wait.UntilWithContext(waitCtx, func(ctx context.Context) { + node, err := to.VirtualK8sClient.CoreV1().Nodes().Get(waitCtx, to.NodeInfo.Name, metav1.GetOptions{}) + if err == nil { + if util.IsNodeReady(node.Status.Conditions) { + klog.V(4).Infof("node %s is ready", to.NodeInfo.Name) + isReady = true + cancel() + } else { + klog.V(4).Infof("node %s is not ready, status: %s", to.NodeInfo.Name, node.Status.Phase) + } + } else { + klog.V(4).Infof("get node %s failed: %s", to.NodeInfo.Name, err) + } + }, 10*time.Second) // Interval time + + if isReady { + return nil, nil + } + + return nil, fmt.Errorf("node %s is not ready", to.NodeInfo.Name) + }, + } +} + +func NewUpdateNodeLabelsTask() Task { + return Task{ + Name: "update new-node labels", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + node, err := to.VirtualK8sClient.CoreV1().Nodes().Get(ctx, to.NodeInfo.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("get node %s failed: %s", to.NodeInfo.Name, err) + } + + updateNode := node.DeepCopy() + for k, v := range to.NodeInfo.Labels { + node.Labels[k] = v + } + + if _, err := to.VirtualK8sClient.CoreV1().Nodes().Update(ctx, updateNode, metav1.UpdateOptions{}); err != nil { + return nil, fmt.Errorf("add label to node %s failed: %s", to.NodeInfo.Name, err) + } + return nil, nil + }, + } +} + +func NewUpdateNodePoolItemStatusTask(nodeState v1alpha1.NodeState, isClean bool) Task { + return Task{ + Name: "Update node status in NodePool ", + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + targetGlobalNode := v1alpha1.GlobalNode{} + + if err := to.HostClient.Get(ctx, types.NamespacedName{Name: to.NodeInfo.Name}, &targetGlobalNode); err != nil { + klog.Errorf("get global node %s failed: %s", to.NodeInfo.Name, err) + return err + } + + updateGlobalNode := targetGlobalNode.DeepCopy() + + updateGlobalNode.Spec.State = nodeState + if err := to.HostClient.Update(ctx, updateGlobalNode); err != nil { + klog.Errorf("update global node %s spec.state failed: %s", updateGlobalNode.Name, err) + return err + } + if isClean { + updateGlobalNode.Status.VirtualCluster = "" + if err := to.HostClient.Status().Update(ctx, updateGlobalNode); err != nil { + klog.Errorf("update global node %s status failed: %s", updateGlobalNode.Name, err) + return err + } + } + return nil + }) + + return nil, err + }, + } +} + +func NewRemoveNodeFromVirtualTask() Task { + return Task{ + Name: "remove node from virtual control-plane", + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + err := to.VirtualK8sClient.CoreV1().Nodes().Delete(ctx, to.NodeInfo.Name, metav1.DeleteOptions{}) + if err != nil { + return nil, fmt.Errorf("remove node from cluster failed, node name:%s, erro: %s", to.NodeInfo.Name, err) + } + return nil, nil + }, + } +} + +func NewExecShellUnjoinCmdTask() Task { + return Task{ + Name: "exec shell unjoin cmd", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + + resetCmd := &exector.CMDExector{ + Cmd: fmt.Sprintf("sh %s unjoin", env.GetExectorShellName()), + } + + ret := exectHelper.DoExector(ctx.Done(), resetCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("reset node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + + return nil, nil + }, + } +} + +func getJoinCmdStr(log string) (string, error) { + strs := strings.Split(log, "kubeadm join") + if len(strs) != 2 { + return "", fmt.Errorf("get join cmd str failed") + } + return fmt.Sprintf("kubeadm join %s", strs[1]), nil +} + +func NewJoinNodeToHostCmd() Task { + return Task{ + Name: "join node to host", + SubTasks: []Task{ + NewGetJoinNodeToHostCmdTask(), + NewExecJoinNodeToHostCmdTask(), + }, + } +} + +func NewGetJoinNodeToHostCmdTask() Task { + return Task{ + Name: "remote get host node join cmd str", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { + masterNodeIP := env.GetExectorHostMasterNodeIP() + hostExectorHelper := exector.NewExectorHelper(masterNodeIP, "") + joinCmdStrCmd := &exector.CMDExector{ + Cmd: "kubeadm token create --print-join-command", + } + ret := hostExectorHelper.DoExector(ctx.Done(), joinCmdStrCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("get host join cmd on node %s failed: %s", to.NodeInfo.Name, ret.String()) + } + + joinCmdStr, err := getJoinCmdStr(ret.LastLog) + if err != nil { + return nil, err + } + return joinCmdStr, nil + }, + } +} + +func NewExecJoinNodeToHostCmdTask() Task { + return Task{ + Name: "remote join node to host", + Retry: true, + Run: func(ctx context.Context, to TaskOpt, args interface{}) (interface{}, error) { + joinCmdStr, ok := args.(string) + if !ok { + return nil, fmt.Errorf("get join cmd str failed") + } + joinCmd := &exector.CMDExector{ + Cmd: joinCmdStr, + } + + exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") + ret := exectHelper.DoExector(ctx.Done(), joinCmd) + if ret.Status != exector.SUCCESS { + return nil, fmt.Errorf("exec join cmd on node %s failed: %s, join cmd: %s", to.NodeInfo.Name, ret.String(), joinCmdStr) + } + return nil, nil + }, + } } diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/unjoin.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/unjoin.go deleted file mode 100644 index 1370738c9..000000000 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/unjoin.go +++ /dev/null @@ -1,111 +0,0 @@ -package task - -import ( - "context" - "fmt" - "strings" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" -) - -func NewRemoveNodeFromVirtualTask() Task { - return Task{ - Name: "remove node from virtual control-plane", - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - err := to.VirtualK8sClient.CoreV1().Nodes().Delete(ctx, to.NodeInfo.Name, metav1.DeleteOptions{}) - if err != nil { - return nil, fmt.Errorf("remove node from cluster failed, node name:%s, erro: %s", to.NodeInfo.Name, err) - } - return nil, nil - }, - } -} - -func NewExecShellUnjoinCmdTask() Task { - return Task{ - Name: "exec shell unjoin cmd", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - - resetCmd := &exector.CMDExector{ - Cmd: fmt.Sprintf("sh %s unjoin", env.GetExectorShellName()), - } - - ret := exectHelper.DoExector(ctx.Done(), resetCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("reset node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - - return nil, nil - }, - } -} - -func getJoinCmdStr(log string) (string, error) { - strs := strings.Split(log, "kubeadm join") - if len(strs) != 2 { - return "", fmt.Errorf("get join cmd str failed") - } - return fmt.Sprintf("kubeadm join %s", strs[1]), nil -} - -func NewJoinNodeToHostCmd() Task { - return Task{ - Name: "join node to host", - SubTasks: []Task{ - NewGetJoinNodeToHostCmdTask(), - NewExecJoinNodeToHostCmdTask(), - }, - } -} - -func NewGetJoinNodeToHostCmdTask() Task { - return Task{ - Name: "remote get host node join cmd str", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, _ interface{}) (interface{}, error) { - masterNodeIP := env.GetExectorHostMasterNodeIP() - hostExectorHelper := exector.NewExectorHelper(masterNodeIP, "") - joinCmdStrCmd := &exector.CMDExector{ - Cmd: "kubeadm token create --print-join-command", - } - ret := hostExectorHelper.DoExector(ctx.Done(), joinCmdStrCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("get host join cmd on node %s failed: %s", to.NodeInfo.Name, ret.String()) - } - - joinCmdStr, err := getJoinCmdStr(ret.LastLog) - if err != nil { - return nil, err - } - return joinCmdStr, nil - }, - } -} - -func NewExecJoinNodeToHostCmdTask() Task { - return Task{ - Name: "remote join node to host", - Retry: true, - Run: func(ctx context.Context, to TaskOpt, args interface{}) (interface{}, error) { - joinCmdStr, ok := args.(string) - if !ok { - return nil, fmt.Errorf("get join cmd str failed") - } - joinCmd := &exector.CMDExector{ - Cmd: joinCmdStr, - } - - exectHelper := exector.NewExectorHelper(to.NodeInfo.Spec.NodeIP, "") - ret := exectHelper.DoExector(ctx.Done(), joinCmd) - if ret.Status != exector.SUCCESS { - return nil, fmt.Errorf("exec join cmd on node %s failed: %s, join cmd: %s", to.NodeInfo.Name, ret.String(), joinCmdStr) - } - return nil, nil - }, - } -} diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go index 6f6f2d447..d00b4824b 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go +++ b/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go @@ -33,6 +33,10 @@ func RunWithRetry(ctx context.Context, task task.Task, opt task.TaskOpt, preArgs } } if err != nil { + if task.ErrorIgnore { + klog.V(4).Infof("work flow ignore err, task name: %s, err: %s", task.Name, err) + return nil, nil + } klog.V(4).Infof("work flow interrupt, task name: %s, err: %s", task.Name, err) return nil, err } @@ -66,6 +70,7 @@ func (w WorkflowData) RunTask(ctx context.Context, opt task.TaskOpt) error { func NewJoinWorkFlow() WorkflowData { joinTasks := []task.Task{ task.NewCheckEnvTask(), + task.NewDrainHostNodeTask(), task.NewKubeadmResetTask(), task.NewCleanHostClusterNodeTask(), task.NewReomteUploadCATask(), @@ -85,6 +90,7 @@ func NewJoinWorkFlow() WorkflowData { func NewUnjoinWorkFlow() WorkflowData { unjoinTasks := []task.Task{ task.NewCheckEnvTask(), + task.NewDrainVirtualNodeTask(), task.NewRemoveNodeFromVirtualTask(), task.NewExecShellUnjoinCmdTask(), task.NewJoinNodeToHostCmd(), diff --git a/pkg/kubenest/util/node.go b/pkg/kubenest/util/node.go index 3b1cc8729..ff8f04267 100644 --- a/pkg/kubenest/util/node.go +++ b/pkg/kubenest/util/node.go @@ -1,6 +1,16 @@ package util -import v1 "k8s.io/api/core/v1" +import ( + "context" + "fmt" + "os" + "time" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/kubernetes" + drain "k8s.io/kubectl/pkg/drain" +) func IsNodeReady(conditions []v1.NodeCondition) bool { for _, condition := range conditions { @@ -10,3 +20,41 @@ func IsNodeReady(conditions []v1.NodeCondition) bool { } return false } + +// DrainNode cordons and drains a node. +func DrainNode(ctx context.Context, nodeName string, client kubernetes.Interface, node *v1.Node, drainWaitSeconds int) error { + if client == nil { + return fmt.Errorf("K8sClient not set") + } + if node == nil { + return fmt.Errorf("node not set") + } + if nodeName == "" { + return fmt.Errorf("node name not set") + } + helper := &drain.Helper{ + Ctx: ctx, + Client: client, + Force: true, + GracePeriodSeconds: -1, + IgnoreAllDaemonSets: true, + Out: os.Stdout, + ErrOut: os.Stdout, + // We want to proceed even when pods are using emptyDir volumes + DeleteEmptyDirData: true, + Timeout: time.Duration(drainWaitSeconds) * time.Second, + } + if err := drain.RunCordonOrUncordon(helper, node, true); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("error cordoning node: %v", err) + } + if err := drain.RunNodeDrain(helper, nodeName); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("error draining node: %v", err) + } + return nil +} diff --git a/vendor/k8s.io/kubectl/pkg/drain/cordon.go b/vendor/k8s.io/kubectl/pkg/drain/cordon.go new file mode 100644 index 000000000..006eef762 --- /dev/null +++ b/vendor/k8s.io/kubectl/pkg/drain/cordon.go @@ -0,0 +1,111 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes" +) + +// CordonHelper wraps functionality to cordon/uncordon nodes +type CordonHelper struct { + node *corev1.Node + desired bool +} + +// NewCordonHelper returns a new CordonHelper +func NewCordonHelper(node *corev1.Node) *CordonHelper { + return &CordonHelper{ + node: node, + } +} + +// NewCordonHelperFromRuntimeObject returns a new CordonHelper, or an error if given object is not a +// node or cannot be encoded as JSON +func NewCordonHelperFromRuntimeObject(nodeObject runtime.Object, scheme *runtime.Scheme, gvk schema.GroupVersionKind) (*CordonHelper, error) { + nodeObject, err := scheme.ConvertToVersion(nodeObject, gvk.GroupVersion()) + if err != nil { + return nil, err + } + + node, ok := nodeObject.(*corev1.Node) + if !ok { + return nil, fmt.Errorf("unexpected type %T", nodeObject) + } + + return NewCordonHelper(node), nil +} + +// UpdateIfRequired returns true if c.node.Spec.Unschedulable isn't already set, +// or false when no change is needed +func (c *CordonHelper) UpdateIfRequired(desired bool) bool { + c.desired = desired + + return c.node.Spec.Unschedulable != c.desired +} + +// PatchOrReplace uses given clientset to update the node status, either by patching or +// updating the given node object; it may return error if the object cannot be encoded as +// JSON, or if either patch or update calls fail; it will also return a second error +// whenever creating a patch has failed +func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface, serverDryRun bool) (error, error) { + return c.PatchOrReplaceWithContext(context.TODO(), clientset, serverDryRun) +} + +// PatchOrReplaceWithContext provides the option to pass a custom context while updating +// the node status +func (c *CordonHelper) PatchOrReplaceWithContext(clientCtx context.Context, clientset kubernetes.Interface, serverDryRun bool) (error, error) { + client := clientset.CoreV1().Nodes() + + oldData, err := json.Marshal(c.node) + if err != nil { + return err, nil + } + + c.node.Spec.Unschedulable = c.desired + + newData, err := json.Marshal(c.node) + if err != nil { + return err, nil + } + + patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node) + if patchErr == nil { + patchOptions := metav1.PatchOptions{} + if serverDryRun { + patchOptions.DryRun = []string{metav1.DryRunAll} + } + _, err = client.Patch(clientCtx, c.node.Name, types.StrategicMergePatchType, patchBytes, patchOptions) + } else { + updateOptions := metav1.UpdateOptions{} + if serverDryRun { + updateOptions.DryRun = []string{metav1.DryRunAll} + } + _, err = client.Update(clientCtx, c.node, updateOptions) + } + return err, patchErr +} diff --git a/vendor/k8s.io/kubectl/pkg/drain/default.go b/vendor/k8s.io/kubectl/pkg/drain/default.go new file mode 100644 index 000000000..ed70ffbe3 --- /dev/null +++ b/vendor/k8s.io/kubectl/pkg/drain/default.go @@ -0,0 +1,75 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" +) + +// This file contains default implementations of how to +// drain/cordon/uncordon nodes. These functions may be called +// directly, or their functionality copied into your own code, for +// example if you want different output behaviour. + +// RunNodeDrain shows the canonical way to drain a node. +// You should first cordon the node, e.g. using RunCordonOrUncordon +func RunNodeDrain(drainer *Helper, nodeName string) error { + // TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers + list, errs := drainer.GetPodsForDeletion(nodeName) + if errs != nil { + return utilerrors.NewAggregate(errs) + } + if warnings := list.Warnings(); warnings != "" { + fmt.Fprintf(drainer.ErrOut, "WARNING: %s\n", warnings) + } + + if err := drainer.DeleteOrEvictPods(list.Pods()); err != nil { + // Maybe warn about non-deleted pods here + return err + } + return nil +} + +// RunCordonOrUncordon demonstrates the canonical way to cordon or uncordon a Node +func RunCordonOrUncordon(drainer *Helper, node *corev1.Node, desired bool) error { + if drainer.Ctx == nil { + return fmt.Errorf("RunCordonOrUncordon error: drainer.Ctx can't be nil") + } + if drainer.Client == nil { + return fmt.Errorf("RunCordonOrUncordon error: drainer.Client can't be nil") + } + // TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers + c := NewCordonHelper(node) + + if updateRequired := c.UpdateIfRequired(desired); !updateRequired { + // Already done + return nil + } + + err, patchErr := c.PatchOrReplaceWithContext(drainer.Ctx, drainer.Client, false) + if err != nil { + if patchErr != nil { + return fmt.Errorf("cordon error: %s; merge patch error: %w", err.Error(), patchErr) + } + return fmt.Errorf("cordon error: %w", err) + } + + return nil +} diff --git a/vendor/k8s.io/kubectl/pkg/drain/drain.go b/vendor/k8s.io/kubectl/pkg/drain/drain.go new file mode 100644 index 000000000..b36692eb4 --- /dev/null +++ b/vendor/k8s.io/kubectl/pkg/drain/drain.go @@ -0,0 +1,453 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "context" + "fmt" + "io" + "math" + "time" + + corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/kubernetes" + cmdutil "k8s.io/kubectl/pkg/cmd/util" +) + +const ( + // EvictionKind represents the kind of evictions object + EvictionKind = "Eviction" + // EvictionSubresource represents the kind of evictions object as pod's subresource + EvictionSubresource = "pods/eviction" + podSkipMsgTemplate = "pod %q has DeletionTimestamp older than %v seconds, skipping\n" +) + +// Helper contains the parameters to control the behaviour of drainer +type Helper struct { + Ctx context.Context + Client kubernetes.Interface + Force bool + + // GracePeriodSeconds is how long to wait for a pod to terminate. + // IMPORTANT: 0 means "delete immediately"; set to a negative value + // to use the pod's terminationGracePeriodSeconds. + GracePeriodSeconds int + + IgnoreAllDaemonSets bool + Timeout time.Duration + DeleteEmptyDirData bool + Selector string + PodSelector string + ChunkSize int64 + + // DisableEviction forces drain to use delete rather than evict + DisableEviction bool + + // SkipWaitForDeleteTimeoutSeconds ignores pods that have a + // DeletionTimeStamp > N seconds. It's up to the user to decide when this + // option is appropriate; examples include the Node is unready and the pods + // won't drain otherwise + SkipWaitForDeleteTimeoutSeconds int + + // AdditionalFilters are applied sequentially after base drain filters to + // exclude pods using custom logic. Any filter that returns PodDeleteStatus + // with Delete == false will immediately stop execution of further filters. + AdditionalFilters []PodFilter + + Out io.Writer + ErrOut io.Writer + + DryRunStrategy cmdutil.DryRunStrategy + DryRunVerifier *resource.QueryParamVerifier + + // OnPodDeletedOrEvicted is called when a pod is evicted/deleted; for printing progress output + OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool) +} + +type waitForDeleteParams struct { + ctx context.Context + pods []corev1.Pod + interval time.Duration + timeout time.Duration + usingEviction bool + getPodFn func(string, string) (*corev1.Pod, error) + onDoneFn func(pod *corev1.Pod, usingEviction bool) + globalTimeout time.Duration + skipWaitForDeleteTimeoutSeconds int + out io.Writer +} + +// CheckEvictionSupport uses Discovery API to find out if the server support +// eviction subresource If support, it will return its groupVersion; Otherwise, +// it will return an empty GroupVersion +func CheckEvictionSupport(clientset kubernetes.Interface) (schema.GroupVersion, error) { + discoveryClient := clientset.Discovery() + + // version info available in subresources since v1.8.0 in https://github.com/kubernetes/kubernetes/pull/49971 + resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") + if err != nil { + return schema.GroupVersion{}, err + } + for _, resource := range resourceList.APIResources { + if resource.Name == EvictionSubresource && resource.Kind == EvictionKind && len(resource.Group) > 0 && len(resource.Version) > 0 { + return schema.GroupVersion{Group: resource.Group, Version: resource.Version}, nil + } + } + return schema.GroupVersion{}, nil +} + +func (d *Helper) makeDeleteOptions() metav1.DeleteOptions { + deleteOptions := metav1.DeleteOptions{} + if d.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(d.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + if d.DryRunStrategy == cmdutil.DryRunServer { + deleteOptions.DryRun = []string{metav1.DryRunAll} + } + return deleteOptions +} + +// DeletePod will delete the given pod, or return an error if it couldn't +func (d *Helper) DeletePod(pod corev1.Pod) error { + if d.DryRunStrategy == cmdutil.DryRunServer { + if err := d.DryRunVerifier.HasSupport(pod.GroupVersionKind()); err != nil { + return err + } + } + return d.Client.CoreV1().Pods(pod.Namespace).Delete(d.getContext(), pod.Name, d.makeDeleteOptions()) +} + +// EvictPod will evict the given pod, or return an error if it couldn't +func (d *Helper) EvictPod(pod corev1.Pod, evictionGroupVersion schema.GroupVersion) error { + if d.DryRunStrategy == cmdutil.DryRunServer { + if err := d.DryRunVerifier.HasSupport(pod.GroupVersionKind()); err != nil { + return err + } + } + + delOpts := d.makeDeleteOptions() + + switch evictionGroupVersion { + case policyv1.SchemeGroupVersion: + // send policy/v1 if the server supports it + eviction := &policyv1.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: &delOpts, + } + return d.Client.PolicyV1().Evictions(eviction.Namespace).Evict(context.TODO(), eviction) + + default: + // otherwise, fall back to policy/v1beta1, supported by all servers that support the eviction subresource + eviction := &policyv1beta1.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: &delOpts, + } + return d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(context.TODO(), eviction) + } +} + +// GetPodsForDeletion receives resource info for a node, and returns those pods as PodDeleteList, +// or error if it cannot list pods. All pods that are ready to be deleted can be obtained with .Pods(), +// and string with all warning can be obtained with .Warnings(), and .Errors() for all errors that +// occurred during deletion. +func (d *Helper) GetPodsForDeletion(nodeName string) (*PodDeleteList, []error) { + labelSelector, err := labels.Parse(d.PodSelector) + if err != nil { + return nil, []error{err} + } + + podList := &corev1.PodList{} + initialOpts := &metav1.ListOptions{ + LabelSelector: labelSelector.String(), + FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String(), + Limit: d.ChunkSize, + } + + err = resource.FollowContinue(initialOpts, func(options metav1.ListOptions) (runtime.Object, error) { + newPods, err := d.Client.CoreV1().Pods(metav1.NamespaceAll).List(d.getContext(), options) + if err != nil { + podR := corev1.SchemeGroupVersion.WithResource(corev1.ResourcePods.String()) + return nil, resource.EnhanceListError(err, options, podR.String()) + } + podList.Items = append(podList.Items, newPods.Items...) + return newPods, nil + }) + + if err != nil { + return nil, []error{err} + } + + list := filterPods(podList, d.makeFilters()) + if errs := list.errors(); len(errs) > 0 { + return list, errs + } + + return list, nil +} + +func filterPods(podList *corev1.PodList, filters []PodFilter) *PodDeleteList { + pods := []PodDelete{} + for _, pod := range podList.Items { + var status PodDeleteStatus + for _, filter := range filters { + status = filter(pod) + if !status.Delete { + // short-circuit as soon as pod is filtered out + // at that point, there is no reason to run pod + // through any additional filters + break + } + } + // Add the pod to PodDeleteList no matter what PodDeleteStatus is, + // those pods whose PodDeleteStatus is false like DaemonSet will + // be catched by list.errors() + pod.Kind = "Pod" + pod.APIVersion = "v1" + pods = append(pods, PodDelete{ + Pod: pod, + Status: status, + }) + } + list := &PodDeleteList{items: pods} + return list +} + +// DeleteOrEvictPods deletes or evicts the pods on the api server +func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error { + if len(pods) == 0 { + return nil + } + + // TODO(justinsb): unnecessary? + getPodFn := func(namespace, name string) (*corev1.Pod, error) { + return d.Client.CoreV1().Pods(namespace).Get(d.getContext(), name, metav1.GetOptions{}) + } + + if !d.DisableEviction { + evictionGroupVersion, err := CheckEvictionSupport(d.Client) + if err != nil { + return err + } + + if !evictionGroupVersion.Empty() { + return d.evictPods(pods, evictionGroupVersion, getPodFn) + } + } + + return d.deletePods(pods, getPodFn) +} + +func (d *Helper) evictPods(pods []corev1.Pod, evictionGroupVersion schema.GroupVersion, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + returnCh := make(chan error, 1) + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if d.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = d.Timeout + } + ctx, cancel := context.WithTimeout(d.getContext(), globalTimeout) + defer cancel() + for _, pod := range pods { + go func(pod corev1.Pod, returnCh chan error) { + refreshPod := false + for { + switch d.DryRunStrategy { + case cmdutil.DryRunServer: + fmt.Fprintf(d.Out, "evicting pod %s/%s (server dry run)\n", pod.Namespace, pod.Name) + default: + fmt.Fprintf(d.Out, "evicting pod %s/%s\n", pod.Namespace, pod.Name) + } + select { + case <-ctx.Done(): + // return here or we'll leak a goroutine. + returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: global timeout reached: %v", pod.Name, pod.Namespace, globalTimeout) + return + default: + } + + // Create a temporary pod so we don't mutate the pod in the loop. + activePod := pod + if refreshPod { + freshPod, err := getPodFn(pod.Namespace, pod.Name) + // We ignore errors and let eviction sort it out with + // the original pod. + if err == nil { + activePod = *freshPod + } + refreshPod = false + } + + err := d.EvictPod(activePod, evictionGroupVersion) + if err == nil { + break + } else if apierrors.IsNotFound(err) { + returnCh <- nil + return + } else if apierrors.IsTooManyRequests(err) { + fmt.Fprintf(d.ErrOut, "error when evicting pods/%q -n %q (will retry after 5s): %v\n", activePod.Name, activePod.Namespace, err) + time.Sleep(5 * time.Second) + } else if !activePod.ObjectMeta.DeletionTimestamp.IsZero() && apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) { + // an eviction request in a deleting namespace will throw a forbidden error, + // if the pod is already marked deleted, we can ignore this error, an eviction + // request will never succeed, but we will waitForDelete for this pod. + break + } else if apierrors.IsForbidden(err) && apierrors.HasStatusCause(err, corev1.NamespaceTerminatingCause) { + // an eviction request in a deleting namespace will throw a forbidden error, + // if the pod is not marked deleted, we retry until it is. + fmt.Fprintf(d.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", activePod.Name, err) + time.Sleep(5 * time.Second) + } else { + returnCh <- fmt.Errorf("error when evicting pods/%q -n %q: %v", activePod.Name, activePod.Namespace, err) + return + } + } + if d.DryRunStrategy == cmdutil.DryRunServer { + returnCh <- nil + return + } + params := waitForDeleteParams{ + ctx: ctx, + pods: []corev1.Pod{pod}, + interval: 1 * time.Second, + timeout: time.Duration(math.MaxInt64), + usingEviction: true, + getPodFn: getPodFn, + onDoneFn: d.OnPodDeletedOrEvicted, + globalTimeout: globalTimeout, + skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds, + out: d.Out, + } + _, err := waitForDelete(params) + if err == nil { + returnCh <- nil + } else { + returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, returnCh) + } + + doneCount := 0 + var errors []error + + numPods := len(pods) + for doneCount < numPods { + select { + case err := <-returnCh: + doneCount++ + if err != nil { + errors = append(errors, err) + } + } + } + + return utilerrors.NewAggregate(errors) +} + +func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if d.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = d.Timeout + } + for _, pod := range pods { + err := d.DeletePod(pod) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + ctx := d.getContext() + params := waitForDeleteParams{ + ctx: ctx, + pods: pods, + interval: 1 * time.Second, + timeout: globalTimeout, + usingEviction: false, + getPodFn: getPodFn, + onDoneFn: d.OnPodDeletedOrEvicted, + globalTimeout: globalTimeout, + skipWaitForDeleteTimeoutSeconds: d.SkipWaitForDeleteTimeoutSeconds, + out: d.Out, + } + _, err := waitForDelete(params) + return err +} + +func waitForDelete(params waitForDeleteParams) ([]corev1.Pod, error) { + pods := params.pods + err := wait.PollImmediate(params.interval, params.timeout, func() (bool, error) { + pendingPods := []corev1.Pod{} + for i, pod := range pods { + p, err := params.getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + if params.onDoneFn != nil { + params.onDoneFn(&pod, params.usingEviction) + } + continue + } else if err != nil { + return false, err + } else { + if shouldSkipPod(*p, params.skipWaitForDeleteTimeoutSeconds) { + fmt.Fprintf(params.out, podSkipMsgTemplate, pod.Name, params.skipWaitForDeleteTimeoutSeconds) + continue + } + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + select { + case <-params.ctx.Done(): + return false, fmt.Errorf("global timeout reached: %v", params.globalTimeout) + default: + return false, nil + } + } + return true, nil + }) + return pods, err +} + +// Since Helper does not have a constructor, we can't enforce Helper.Ctx != nil +// Multiple public methods prevent us from initializing the context in a single +// place as well. +func (d *Helper) getContext() context.Context { + if d.Ctx != nil { + return d.Ctx + } + return context.Background() +} diff --git a/vendor/k8s.io/kubectl/pkg/drain/filters.go b/vendor/k8s.io/kubectl/pkg/drain/filters.go new file mode 100644 index 000000000..4e9a21b8d --- /dev/null +++ b/vendor/k8s.io/kubectl/pkg/drain/filters.go @@ -0,0 +1,258 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "context" + "fmt" + "strings" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + daemonSetFatal = "DaemonSet-managed Pods (use --ignore-daemonsets to ignore)" + daemonSetWarning = "ignoring DaemonSet-managed Pods" + localStorageFatal = "Pods with local storage (use --delete-emptydir-data to override)" + localStorageWarning = "deleting Pods with local storage" + unmanagedFatal = "Pods declare no controller (use --force to override)" + unmanagedWarning = "deleting Pods that declare no controller" +) + +// PodDelete informs filtering logic whether a pod should be deleted or not +type PodDelete struct { + Pod corev1.Pod + Status PodDeleteStatus +} + +// PodDeleteList is a wrapper around []PodDelete +type PodDeleteList struct { + items []PodDelete +} + +// Pods returns a list of all pods marked for deletion after filtering. +func (l *PodDeleteList) Pods() []corev1.Pod { + pods := []corev1.Pod{} + for _, i := range l.items { + if i.Status.Delete { + pods = append(pods, i.Pod) + } + } + return pods +} + +// Warnings returns all warning messages concatenated into a string. +func (l *PodDeleteList) Warnings() string { + ps := make(map[string][]string) + for _, i := range l.items { + if i.Status.Reason == PodDeleteStatusTypeWarning { + ps[i.Status.Message] = append(ps[i.Status.Message], fmt.Sprintf("%s/%s", i.Pod.Namespace, i.Pod.Name)) + } + } + + msgs := []string{} + for key, pods := range ps { + msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) + } + return strings.Join(msgs, "; ") +} + +func (l *PodDeleteList) errors() []error { + failedPods := make(map[string][]string) + for _, i := range l.items { + if i.Status.Reason == PodDeleteStatusTypeError { + msg := i.Status.Message + if msg == "" { + msg = "unexpected error" + } + failedPods[msg] = append(failedPods[msg], fmt.Sprintf("%s/%s", i.Pod.Namespace, i.Pod.Name)) + } + } + errs := make([]error, 0, len(failedPods)) + for msg, pods := range failedPods { + errs = append(errs, fmt.Errorf("cannot delete %s: %s", msg, strings.Join(pods, ", "))) + } + return errs +} + +// PodDeleteStatus informs filters if a pod should be deleted +type PodDeleteStatus struct { + Delete bool + Reason string + Message string +} + +// PodFilter takes a pod and returns a PodDeleteStatus +type PodFilter func(corev1.Pod) PodDeleteStatus + +const ( + // PodDeleteStatusTypeOkay is "Okay" + PodDeleteStatusTypeOkay = "Okay" + // PodDeleteStatusTypeSkip is "Skip" + PodDeleteStatusTypeSkip = "Skip" + // PodDeleteStatusTypeWarning is "Warning" + PodDeleteStatusTypeWarning = "Warning" + // PodDeleteStatusTypeError is "Error" + PodDeleteStatusTypeError = "Error" +) + +// MakePodDeleteStatusOkay is a helper method to return the corresponding PodDeleteStatus +func MakePodDeleteStatusOkay() PodDeleteStatus { + return PodDeleteStatus{ + Delete: true, + Reason: PodDeleteStatusTypeOkay, + } +} + +// MakePodDeleteStatusSkip is a helper method to return the corresponding PodDeleteStatus +func MakePodDeleteStatusSkip() PodDeleteStatus { + return PodDeleteStatus{ + Delete: false, + Reason: PodDeleteStatusTypeSkip, + } +} + +// MakePodDeleteStatusWithWarning is a helper method to return the corresponding PodDeleteStatus +func MakePodDeleteStatusWithWarning(delete bool, message string) PodDeleteStatus { + return PodDeleteStatus{ + Delete: delete, + Reason: PodDeleteStatusTypeWarning, + Message: message, + } +} + +// MakePodDeleteStatusWithError is a helper method to return the corresponding PodDeleteStatus +func MakePodDeleteStatusWithError(message string) PodDeleteStatus { + return PodDeleteStatus{ + Delete: false, + Reason: PodDeleteStatusTypeError, + Message: message, + } +} + +// The filters are applied in a specific order, only the last filter's +// message will be retained if there are any warnings. +func (d *Helper) makeFilters() []PodFilter { + baseFilters := []PodFilter{ + d.skipDeletedFilter, + d.daemonSetFilter, + d.mirrorPodFilter, + d.localStorageFilter, + d.unreplicatedFilter, + } + return append(baseFilters, d.AdditionalFilters...) +} + +func hasLocalStorage(pod corev1.Pod) bool { + for _, volume := range pod.Spec.Volumes { + if volume.EmptyDir != nil { + return true + } + } + + return false +} + +func (d *Helper) daemonSetFilter(pod corev1.Pod) PodDeleteStatus { + // Note that we return false in cases where the pod is DaemonSet managed, + // regardless of flags. + // + // The exception is for pods that are orphaned (the referencing + // management resource - including DaemonSet - is not found). + // Such pods will be deleted if --force is used. + controllerRef := metav1.GetControllerOf(&pod) + if controllerRef == nil || controllerRef.Kind != appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind { + return MakePodDeleteStatusOkay() + } + // Any finished pod can be removed. + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return MakePodDeleteStatusOkay() + } + + if _, err := d.Client.AppsV1().DaemonSets(pod.Namespace).Get(context.TODO(), controllerRef.Name, metav1.GetOptions{}); err != nil { + // remove orphaned pods with a warning if --force is used + if apierrors.IsNotFound(err) && d.Force { + return MakePodDeleteStatusWithWarning(true, err.Error()) + } + + return MakePodDeleteStatusWithError(err.Error()) + } + + if !d.IgnoreAllDaemonSets { + return MakePodDeleteStatusWithError(daemonSetFatal) + } + + return MakePodDeleteStatusWithWarning(false, daemonSetWarning) +} + +func (d *Helper) mirrorPodFilter(pod corev1.Pod) PodDeleteStatus { + if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found { + return MakePodDeleteStatusSkip() + } + return MakePodDeleteStatusOkay() +} + +func (d *Helper) localStorageFilter(pod corev1.Pod) PodDeleteStatus { + if !hasLocalStorage(pod) { + return MakePodDeleteStatusOkay() + } + // Any finished pod can be removed. + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return MakePodDeleteStatusOkay() + } + if !d.DeleteEmptyDirData { + return MakePodDeleteStatusWithError(localStorageFatal) + } + + // TODO: this warning gets dropped by subsequent filters; + // consider accounting for multiple warning conditions or at least + // preserving the last warning message. + return MakePodDeleteStatusWithWarning(true, localStorageWarning) +} + +func (d *Helper) unreplicatedFilter(pod corev1.Pod) PodDeleteStatus { + // any finished pod can be removed + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return MakePodDeleteStatusOkay() + } + + controllerRef := metav1.GetControllerOf(&pod) + if controllerRef != nil { + return MakePodDeleteStatusOkay() + } + if d.Force { + return MakePodDeleteStatusWithWarning(true, unmanagedWarning) + } + return MakePodDeleteStatusWithError(unmanagedFatal) +} + +func shouldSkipPod(pod corev1.Pod, skipDeletedTimeoutSeconds int) bool { + return skipDeletedTimeoutSeconds > 0 && + !pod.ObjectMeta.DeletionTimestamp.IsZero() && + int(time.Now().Sub(pod.ObjectMeta.GetDeletionTimestamp().Time).Seconds()) > skipDeletedTimeoutSeconds +} + +func (d *Helper) skipDeletedFilter(pod corev1.Pod) PodDeleteStatus { + if shouldSkipPod(pod, d.SkipWaitForDeleteTimeoutSeconds) { + return MakePodDeleteStatusSkip() + } + return MakePodDeleteStatusOkay() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9740139a3..13f9b7e2b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1683,6 +1683,7 @@ k8s.io/kubectl/pkg/cmd/logs k8s.io/kubectl/pkg/cmd/util k8s.io/kubectl/pkg/cmd/util/podcmd k8s.io/kubectl/pkg/describe +k8s.io/kubectl/pkg/drain k8s.io/kubectl/pkg/polymorphichelpers k8s.io/kubectl/pkg/rawhttp k8s.io/kubectl/pkg/scheme