Skip to content

Commit

Permalink
feat: drain pod before delete node
Browse files Browse the repository at this point in the history
Signed-off-by: OrangeBao <[email protected]>
  • Loading branch information
OrangeBao committed Apr 30, 2024
1 parent 8116856 commit c3ddae7
Show file tree
Hide file tree
Showing 13 changed files with 1,417 additions and 397 deletions.
8 changes: 8 additions & 0 deletions cmd/kubenest/operator/app/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
controllerruntime "sigs.k8s.io/controller-runtime"

"github.com/kosmos.io/kosmos/cmd/kubenest/operator/app/options"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller"
kosmos "github.com/kosmos.io/kosmos/pkg/kubenest/controller/kosmos"
Expand Down Expand Up @@ -81,6 +82,11 @@ func run(ctx context.Context, opts *options.Options) error {
return fmt.Errorf("could not create clientset: %v", err)
}

kosmosClient, err := versioned.NewForConfig(config)
if err != nil {
return fmt.Errorf("could not create clientset: %v", err)
}

hostPortManager, err := vcnodecontroller.NewHostPortManager(hostKubeClient)
if err != nil {
return fmt.Errorf("failed to create host port manager: %v", err)
Expand All @@ -99,6 +105,8 @@ func run(ctx context.Context, opts *options.Options) error {

VirtualClusterNodeController := vcnodecontroller.NodeController{
Client: mgr.GetClient(),
RootClientSet: hostKubeClient,
KosmosClient: kosmosClient,
EventRecorder: mgr.GetEventRecorderFor(constants.NodeControllerName),
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/kubenest/controller/virtualcluster.node.controller/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/base64"
"fmt"
"os"
"strconv"

"k8s.io/klog"
)
Expand Down Expand Up @@ -66,3 +67,17 @@ func GetExectorPort() string {
}
return exectorPort
}

func GetDrainWaitSeconds() int {
drainWaitSeconds := os.Getenv("EXECTOR_DRAIN_WAIT_SECONDS")
if len(drainWaitSeconds) == 0 {
drainWaitSeconds = "60"
}
num, err := strconv.Atoi(drainWaitSeconds)

if err != nil {
klog.Fatalf("convert EXECTOR_DRAIN_WAIT_SECONDS failed, err: %s", err)
}

return num
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow"
"github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task"
Expand All @@ -32,7 +33,9 @@ import (

type NodeController struct {
client.Client
RootClientSet kubernetes.Interface
EventRecorder record.EventRecorder
KosmosClient versioned.Interface
}

func (r *NodeController) SetupWithManager(mgr manager.Manager) error {
Expand Down Expand Up @@ -143,24 +146,27 @@ func (r *NodeController) compareAndTranformNodes(ctx context.Context, targetNode

func (r *NodeController) UpdateVirtualClusterStatus(ctx context.Context, virtualCluster v1alpha1.VirtualCluster, status v1alpha1.Phase, reason string) error {
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
targetObj := v1alpha1.VirtualCluster{}
var targetObj v1alpha1.VirtualCluster
if err := r.Get(ctx, types.NamespacedName{Name: virtualCluster.Name, Namespace: virtualCluster.Namespace}, &targetObj); err != nil {
klog.Warningf("get target virtualcluster %s namespace %s failed: %v", virtualCluster.Name, virtualCluster.Namespace, err)
return err
}
updateVirtualCluster := targetObj.DeepCopy()
updateVirtualCluster.Status.Phase = status
if len(status) > 0 {
updateVirtualCluster.Status.Phase = status
}
updateVirtualCluster.Status.Reason = reason
updateTime := metav1.Now()
updateVirtualCluster.Status.UpdateTime = &updateTime

if err := r.Update(ctx, updateVirtualCluster); err != nil {
if _, err := r.KosmosClient.KosmosV1alpha1().VirtualClusters(updateVirtualCluster.Namespace).Update(ctx, updateVirtualCluster, metav1.UpdateOptions{}); err != nil && !apierrors.IsNotFound(err) {
klog.Warningf("update target virtualcluster %s namespace %s failed: %v", virtualCluster.Name, virtualCluster.Namespace, err)
return err
}
return nil
})

if retryErr != nil {
return fmt.Errorf("update virtualcluster %s status failed: %s", virtualCluster.Name, retryErr)
return fmt.Errorf("update virtualcluster %s status namespace %s failed: %s", virtualCluster.Name, virtualCluster.Namespace, retryErr)
}

return nil
Expand Down Expand Up @@ -227,6 +233,7 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques
return reconcile.Result{}, nil
}
klog.Errorf("get clusternode %s error: %v", request.NamespacedName, err)
r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error())

Check failure on line 236 in pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go

View workflow job for this annotation

GitHub Actions / verify

Error return value of `r.UpdateVirtualClusterStatus` is not checked (errcheck)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

Expand All @@ -238,6 +245,7 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques
if !virtualCluster.GetDeletionTimestamp().IsZero() && len(virtualCluster.Spec.Kubeconfig) == 0 {
if err := r.DoNodeClean(ctx, virtualCluster); err != nil {
klog.Errorf("virtualcluster %s do node clean failed: %v", virtualCluster.Name, err)
r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error())

Check failure on line 248 in pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go

View workflow job for this annotation

GitHub Actions / verify

Error return value of `r.UpdateVirtualClusterStatus` is not checked (errcheck)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}
return reconcile.Result{}, nil
Expand All @@ -248,8 +256,14 @@ func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Reques
return reconcile.Result{}, nil
}

if virtualCluster.Status.Phase == v1alpha1.Pending {
klog.V(4).Infof("virtualcluster is pending, cluster name: %s", virtualCluster.Name)
return reconcile.Result{}, nil
}

if err := r.DoNodeTask(ctx, virtualCluster); err != nil {
klog.Errorf("virtualcluster %s do node task failed: %v", virtualCluster.Name, err)
r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error())

Check failure on line 266 in pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go

View workflow job for this annotation

GitHub Actions / verify

Error return value of `r.UpdateVirtualClusterStatus` is not checked (errcheck)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil
}

Expand Down Expand Up @@ -285,7 +299,8 @@ func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alph
if err := workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
HostK8sClient: r.Client,
HostClient: r.Client,
HostK8sClient: r.RootClientSet,
// VirtualK8sClient: _,
}); err != nil {
return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
Expand Down Expand Up @@ -313,7 +328,8 @@ func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.Glob
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
KubeDNSAddress: clusterDNS,
HostK8sClient: r.Client,
HostClient: r.Client,
HostK8sClient: r.RootClientSet,
VirtualK8sClient: k8sClient,
}); err != nil {
return fmt.Errorf("join node %s failed: %s", nodeInfo.Name, err)
Expand All @@ -327,7 +343,8 @@ func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.Gl
if err := workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{
NodeInfo: nodeInfo,
VirtualCluster: virtualCluster,
HostK8sClient: r.Client,
HostClient: r.Client,
HostK8sClient: r.RootClientSet,
VirtualK8sClient: k8sClient,
}); err != nil {
return fmt.Errorf("unjoin node %s failed: %s", nodeInfo.Name, err)
Expand Down
Loading

0 comments on commit c3ddae7

Please sign in to comment.