diff --git a/pkg/clustertree/cluster-manager/cluster_controller.go b/pkg/clustertree/cluster-manager/cluster_controller.go index b0e339504..5e417c811 100644 --- a/pkg/clustertree/cluster-manager/cluster_controller.go +++ b/pkg/clustertree/cluster-manager/cluster_controller.go @@ -187,10 +187,10 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req return reconcile.Result{}, fmt.Errorf("new manager with err %v, cluster %s", err, cluster.Name) } - leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.Root, mgr.GetClient(), c.RootClientset, leafClient) + leafModelHandler := leafUtils.NewLeafModelHandler(cluster, c.RootClientset, leafClient) c.LeafModelHandler = leafModelHandler - nodes, err := c.createNode(ctx, cluster, leafClient) + nodes, leafNodeSelectors, err := c.createNode(ctx, cluster, leafClient) if err != nil { return reconcile.Result{RequeueAfter: RequeueTime}, fmt.Errorf("create node with err %v, cluster %s", err, cluster.Name) } @@ -206,7 +206,7 @@ func (c *ClusterController) Reconcile(ctx context.Context, request reconcile.Req c.ManagerCancelFuncs[cluster.Name] = &cancel c.ControllerManagersLock.Unlock() - if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafClient, kosmosClient, config); err != nil { + if err = c.setupControllers(mgr, cluster, nodes, leafDynamic, leafNodeSelectors, leafClient, kosmosClient, config); err != nil { return reconcile.Result{}, fmt.Errorf("failed to setup cluster %s controllers: %v", cluster.Name, err) } @@ -240,6 +240,7 @@ func (c *ClusterController) setupControllers( cluster *kosmosv1alpha1.Cluster, nodes []*corev1.Node, clientDynamic *dynamic.DynamicClient, + leafNodeSelector map[string]kosmosv1alpha1.NodeSelector, leafClientset kubernetes.Interface, kosmosClient kosmosversioned.Interface, leafRestConfig *rest.Config) error { @@ -262,6 +263,7 @@ func (c *ClusterController) setupControllers( Root: c.Root, RootClientset: c.RootClientset, Nodes: nodes, + LeafNodeSelectors: leafNodeSelector, LeafModelHandler: c.LeafModelHandler, Cluster: cluster, } @@ -269,7 +271,7 @@ func (c *ClusterController) setupControllers( return fmt.Errorf("error starting %s: %v", controllers.NodeResourcesControllerName, err) } - nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, c.RootClientset, c.LeafModelHandler) + nodeLeaseController := controllers.NewNodeLeaseController(leafClientset, c.Root, nodes, leafNodeSelector, c.RootClientset, c.LeafModelHandler) if err := mgr.Add(nodeLeaseController); err != nil { return fmt.Errorf("error starting %s: %v", controllers.NodeLeaseControllerName, err) } @@ -334,19 +336,19 @@ func (c *ClusterController) setupStorageControllers(mgr manager.Manager, isOne2O return nil } -func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, error) { +func (c *ClusterController) createNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster, leafClient kubernetes.Interface) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) { serverVersion, err := leafClient.Discovery().ServerVersion() if err != nil { klog.Errorf("create node failed, can not connect to leaf %s", cluster.Name) - return nil, err + return nil, nil, err } - nodes, err := c.LeafModelHandler.CreateNodeInRoot(ctx, cluster, c.Options.ListenPort, serverVersion.GitVersion) + nodes, leafNodeSelectors, err := c.LeafModelHandler.CreateRootNode(ctx, c.Options.ListenPort, serverVersion.GitVersion) if err != nil { 
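For context on the signature change above: createNode now surfaces a per-node selector map alongside the root-cluster nodes. A minimal sketch of the new contract (shapes taken from this diff; the usage comment is illustrative, not part of the patch):

    // CreateRootNode registers the root-cluster node objects for this leaf cluster
    // and returns one NodeSelector per root node, keyed by root node name. In ALL
    // mode the selector is empty; in Node/Party mode it is the LeafModel's
    // NodeSelector (a NodeName or a LabelSelector).
    nodes, leafNodeSelectors, err := c.LeafModelHandler.CreateRootNode(ctx, c.Options.ListenPort, serverVersion.GitVersion)
    // leafNodeSelectors is then threaded into setupControllers and NewNodeLeaseController.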
klog.Errorf("create node for cluster %s failed, err: %v", cluster.Name, err) - return nil, err + return nil, nil, err } - return nodes, nil + return nodes, leafNodeSelectors, nil } func (c *ClusterController) deleteNode(ctx context.Context, cluster *kosmosv1alpha1.Cluster) error { diff --git a/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go b/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go index 4dceb03a2..31eedd15c 100644 --- a/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/node_lease_controller.go @@ -17,6 +17,7 @@ import ( "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" ) @@ -38,19 +39,21 @@ type NodeLeaseController struct { leaseInterval time.Duration statusInterval time.Duration - nodes []*corev1.Node - nodeLock sync.Mutex + nodes []*corev1.Node + LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector + nodeLock sync.Mutex } -func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController { +func NewNodeLeaseController(leafClient kubernetes.Interface, root client.Client, nodes []*corev1.Node, LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector, rootClient kubernetes.Interface, LeafModelHandler leafUtils.LeafModelHandler) *NodeLeaseController { c := &NodeLeaseController{ - leafClient: leafClient, - rootClient: rootClient, - root: root, - nodes: nodes, - LeafModelHandler: LeafModelHandler, - leaseInterval: getRenewInterval(), - statusInterval: DefaultNodeStatusUpdateInterval, + leafClient: leafClient, + rootClient: rootClient, + root: root, + nodes: nodes, + LeafModelHandler: LeafModelHandler, + LeafNodeSelectors: LeafNodeSelectors, + leaseInterval: getRenewInterval(), + statusInterval: DefaultNodeStatusUpdateInterval, } return c } @@ -71,15 +74,15 @@ func (c *NodeLeaseController) syncNodeStatus(ctx context.Context) { } c.nodeLock.Unlock() - err := c.updateNodeStatus(ctx, nodes) + err := c.updateNodeStatus(ctx, nodes, c.LeafNodeSelectors) if err != nil { klog.Errorf(err.Error()) } } // nolint -func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node) error { - err := c.LeafModelHandler.UpdateNodeStatus(ctx, n) +func (c *NodeLeaseController) updateNodeStatus(ctx context.Context, n []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error { + err := c.LeafModelHandler.UpdateRootNodeStatus(ctx, n, leafNodeSelector) if err != nil { klog.Errorf("Could not update node status in root cluster,Error: %v", err) } diff --git a/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go b/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go index 0cfcad813..2ea1472e9 100644 --- a/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/node_resources_controller.go @@ -39,10 +39,11 @@ type NodeResourcesController struct { GlobalLeafManager leafUtils.LeafResourceManager RootClientset kubernetes.Interface - Nodes []*corev1.Node - LeafModelHandler leafUtils.LeafModelHandler - Cluster *kosmosv1alpha1.Cluster - EventRecorder record.EventRecorder + Nodes []*corev1.Node + LeafNodeSelectors map[string]kosmosv1alpha1.NodeSelector + 
LeafModelHandler leafUtils.LeafModelHandler + Cluster *kosmosv1alpha1.Cluster + EventRecorder record.EventRecorder } var predicatesFunc = predicate.Funcs{ @@ -110,7 +111,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci }, fmt.Errorf("cannot get node while update nodeInRoot resources %s, err: %v", rootNode.Name, err) } - nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode) + nodesInLeaf, err := c.LeafModelHandler.GetLeafNodes(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name]) if err != nil { klog.Errorf("Could not get node in leaf cluster %s,Error: %v", c.Cluster.Name, err) return controllerruntime.Result{ @@ -118,7 +119,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci }, err } - pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode) + pods, err := c.LeafModelHandler.GetLeafPods(ctx, rootNode, c.LeafNodeSelectors[rootNode.Name]) if err != nil { klog.Errorf("Could not list pod in leaf cluster %s,Error: %v", c.Cluster.Name, err) return controllerruntime.Result{ @@ -130,7 +131,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci clone.Status.Conditions = utils.NodeConditions() // Node2Node mode should sync leaf node's labels and annotations to root nodeInRoot - if c.LeafModelHandler.GetLeafModelType() == leafUtils.DispersionModel { + if c.LeafModelHandler.GetLeafMode() == leafUtils.Node { getNode := func(nodes *corev1.NodeList) *corev1.Node { for _, nodeInLeaf := range nodes.Items { if nodeInLeaf.Name == rootNode.Name { @@ -156,7 +157,7 @@ func (c *NodeResourcesController) Reconcile(ctx context.Context, request reconci } } } - + // TODO: aggregate Labels and Annotations for classificationModel clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods) clone.Status.Allocatable = clusterResources clone.Status.Capacity = clusterResources diff --git a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go index fad5cd07c..d557863b1 100644 --- a/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go +++ b/pkg/clustertree/cluster-manager/controllers/pod/root_pod_controller.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kosmos.io/kosmos/cmd/clustertree/cluster-manager/app/options" + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/extensions/daemonset" leafUtils "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" @@ -198,7 +199,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req // create pod in leaf if err != nil { if errors.IsNotFound(err) { - if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod); err != nil { + if err := r.CreatePodInLeafCluster(ctx, lr, &rootpod, r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil { klog.Errorf("create pod inleaf error, err: %s", err) return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil } else { @@ -212,7 +213,7 @@ func (r *RootPodReconciler) Reconcile(ctx context.Context, request reconcile.Req // update pod in leaf if podutils.ShouldEnqueue(leafPod, &rootpod) { - if err := r.UpdatePodInLeafCluster(ctx, lr, &rootpod, leafPod,
r.GlobalLeafManager.GetClusterNode(rootpod.Spec.NodeName).LeafNodeSelector); err != nil { return reconcile.Result{RequeueAfter: RootPodRequeueTime}, nil } } @@ -698,7 +699,7 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea return nil } -func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod) error { +func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error { if err := podutils.PopulateEnvironmentVariables(ctx, pod, r.envResourceManager); err != nil { // span.SetStatus(err) return err @@ -709,7 +710,7 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf return fmt.Errorf("clusternode info is nil , name: %s", pod.Spec.NodeName) } - basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode == leafUtils.ALL) + basicPod := podutils.FitPod(pod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector) klog.V(4).Infof("Creating pod %v/%+v", pod.Namespace, pod.Name) // create ns @@ -763,24 +764,28 @@ func (r *RootPodReconciler) CreatePodInLeafCluster(ctx context.Context, lr *leaf return nil } -func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootpod *corev1.Pod, leafpod *corev1.Pod) error { +func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, rootPod *corev1.Pod, leafPod *corev1.Pod, nodeSelector kosmosv1alpha1.NodeSelector) error { // TODO: update env // TODO: update config secret pv pvc ... - klog.V(4).Infof("Updating pod %v/%+v", rootpod.Namespace, rootpod.Name) + klog.V(4).Infof("Updating pod %v/%+v", rootPod.Namespace, rootPod.Name) - if !podutils.IsKosmosPod(leafpod) { + if !podutils.IsKosmosPod(leafPod) { klog.V(4).Info("Pod is not created by kosmos tree, ignore") return nil } // not used - podutils.FitLabels(leafpod.ObjectMeta.Labels, lr.IgnoreLabels) - podCopy := leafpod.DeepCopy() + podutils.FitLabels(leafPod.ObjectMeta.Labels, lr.IgnoreLabels) + podCopy := leafPod.DeepCopy() // util.GetUpdatedPod update PodCopy container image, annotations, labels. // recover toleration, affinity, tripped ignore labels. 
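The update path now mirrors the create path: both resolve the ClusterNode for the pod's target node and hand its LeafMode plus the NodeSelector down to podutils, as the hunk below shows. A hedged sketch of that flow (identifiers from this diff; surrounding wiring assumed):

    clusterNodeInfo := r.GlobalLeafManager.GetClusterNode(rootPod.Spec.NodeName)
    if clusterNodeInfo == nil {
        return fmt.Errorf("clusternode info is nil , name: %s", rootPod.Spec.NodeName)
    }
    // In FitPod, Node mode keeps spec.nodeName while ALL/Party clear it; in Party
    // mode both FitPod and GetUpdatedPod rebuild required node affinity from nodeSelector.
    podutils.GetUpdatedPod(podCopy, rootPod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector)

One observation on the Reconcile call sites above: they appear to dereference GetClusterNode(...).LeafNodeSelector without a nil check, while the create/update helpers guard against a nil ClusterNode, so a pod targeting a node not yet registered with the GlobalLeafManager would panic at the call site first.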
- podutils.GetUpdatedPod(podCopy, rootpod, lr.IgnoreLabels) - if reflect.DeepEqual(leafpod.Spec, podCopy.Spec) && - reflect.DeepEqual(leafpod.Annotations, podCopy.Annotations) && - reflect.DeepEqual(leafpod.Labels, podCopy.Labels) { + clusterNodeInfo := r.GlobalLeafManager.GetClusterNode(rootPod.Spec.NodeName) + if clusterNodeInfo == nil { + return fmt.Errorf("clusternode info is nil , name: %s", rootPod.Spec.NodeName) + } + podutils.GetUpdatedPod(podCopy, rootPod, lr.IgnoreLabels, clusterNodeInfo.LeafMode, nodeSelector) + if reflect.DeepEqual(leafPod.Spec, podCopy.Spec) && + reflect.DeepEqual(leafPod.Annotations, podCopy.Annotations) && + reflect.DeepEqual(leafPod.Labels, podCopy.Labels) { return nil } @@ -796,7 +801,7 @@ func (r *RootPodReconciler) UpdatePodInLeafCluster(ctx context.Context, lr *leaf if err != nil { return fmt.Errorf("could not update pod: %v", err) } - klog.V(4).Infof("Update pod %v/%+v success ", rootpod.Namespace, rootpod.Name) + klog.V(4).Infof("Update pod %v/%+v success ", rootPod.Namespace, rootPod.Name) return nil } diff --git a/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go b/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go index 55c4cbf5f..e33e60f42 100644 --- a/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go +++ b/pkg/clustertree/cluster-manager/utils/leaf_model_handler.go @@ -3,14 +3,14 @@ package utils import ( "context" "fmt" + "reflect" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" - "sigs.k8s.io/controller-runtime/pkg/client" + "k8s.io/klog/v2" kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils" @@ -18,227 +18,149 @@ import ( // LeafModelHandler is the interface to handle the leafModel logic type LeafModelHandler interface { - // GetLeafModelType returns the leafModelType for a Cluster - GetLeafModelType() LeafModelType + // GetLeafMode returns the leafMode for a Cluster + GetLeafMode() LeafMode // GetLeafNodes returns nodes in leaf cluster by the rootNode - GetLeafNodes(ctx context.Context, rootNode *corev1.Node) (*corev1.NodeList, error) + GetLeafNodes(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (*corev1.NodeList, error) // GetLeafPods returns pods in leaf cluster by the rootNode - GetLeafPods(ctx context.Context, rootNode *corev1.Node) (*corev1.PodList, error) + GetLeafPods(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (*corev1.PodList, error) - // UpdateNodeStatus updates the node's status in root cluster - UpdateNodeStatus(ctx context.Context, node []*corev1.Node) error + // UpdateRootNodeStatus updates the node's status in root cluster + UpdateRootNodeStatus(ctx context.Context, node []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error - // CreateNodeInRoot creates the node in root cluster - CreateNodeInRoot(ctx context.Context, cluster *kosmosv1alpha1.Cluster, listenPort int32, gitVersion string) ([]*corev1.Node, error) + // CreateRootNode creates the node in root cluster + CreateRootNode(ctx context.Context, listenPort int32, gitVersion string) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) } -// LeafModelType represents the type of leaf model -type LeafModelType string - -const ( - AggregationModel LeafModelType = "aggregation" - DispersionModel LeafModelType = "dispersion" -) - -// 
AggregationModelHandler handles the aggregation leaf model -type AggregationModelHandler struct { - Cluster *kosmosv1alpha1.Cluster - LeafClient client.Client - RootClient client.Client +// ClassificationHandler handles the Classification leaf model +type ClassificationHandler struct { + leafMode LeafMode + Cluster *kosmosv1alpha1.Cluster + //LeafClient client.Client + //RootClient client.Client RootClientset kubernetes.Interface + LeafClientset kubernetes.Interface } -// CreateNodeInRoot creates the node in root cluster -func (h AggregationModelHandler) CreateNodeInRoot(ctx context.Context, cluster *kosmosv1alpha1.Cluster, listenPort int32, gitVersion string) ([]*corev1.Node, error) { - nodes := make([]*corev1.Node, 0) - nodeName := fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name) - node, err := h.RootClientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - if !errors.IsNotFound(err) { - return nil, err - } - node = utils.BuildNodeTemplate(nodeName) - node.Status.NodeInfo.KubeletVersion = gitVersion - node.Status.DaemonEndpoints = corev1.NodeDaemonEndpoints{ - KubeletEndpoint: corev1.DaemonEndpoint{ - Port: listenPort, - }, - } - - // node.Status.Addresses = GetAddress() - - node, err = h.RootClientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) - if err != nil { - return nil, err - } - } - nodes = append(nodes, node) - return nodes, nil +// GetLeafMode returns the leafMode for a Cluster +func (h ClassificationHandler) GetLeafMode() LeafMode { + return h.leafMode } -// UpdateNodeStatus updates the node's status in root cluster -func (h AggregationModelHandler) UpdateNodeStatus(ctx context.Context, n []*corev1.Node) error { - var name string - if len(n) > 0 { - name = n[0].Name +// GetLeafNodes returns nodes in leaf cluster by the rootNode +func (h ClassificationHandler) GetLeafNodes(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (nodesInLeaf *corev1.NodeList, err error) { + listOption := metav1.ListOptions{} + if h.leafMode == Party { + listOption.LabelSelector = metav1.FormatLabelSelector(selector.LabelSelector) } - node := &corev1.Node{} - namespacedName := types.NamespacedName{ - Name: name, + if h.leafMode == Node { + listOption.FieldSelector = fmt.Sprintf("metadata.name=%s", rootNode.Name) } - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - err := h.RootClient.Get(ctx, namespacedName, node) - if err != nil { - // TODO: If a node is accidentally deleted, recreate it - return fmt.Errorf("cannot get node while update node status %s, err: %v", name, err) - } - clone := node.DeepCopy() - clone.Status.Conditions = utils.NodeConditions() + nodesInLeaf, err = h.LeafClientset.CoreV1().Nodes().List(ctx, listOption) + if err != nil { + return nil, err + } + return nodesInLeaf, nil +} - nodeListInLeaf := &corev1.NodeList{} - err = h.LeafClient.List(ctx, nodeListInLeaf) +// GetLeafPods returns pods in leaf cluster by the rootNode +func (h ClassificationHandler) GetLeafPods(ctx context.Context, rootNode *corev1.Node, selector kosmosv1alpha1.NodeSelector) (pods *corev1.PodList, err error) { + if h.leafMode == Party { + pods, err = h.LeafClientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) if err != nil { - return fmt.Errorf("cannot get node in leaf cluster while update node status err: %v", err) - } - - if len(nodeListInLeaf.Items) == 0 { - return fmt.Errorf("cannot get node in leaf cluster while update node status, leaf node item is 0") + return nil, err } - - 
clone.Status.Addresses, err = GetAddress(ctx, h.RootClientset, nodeListInLeaf.Items[0].Status.Addresses) - + } else if h.leafMode == Node { + pods, err = h.LeafClientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", rootNode.Name)}) if err != nil { - return err + return nil, err } - - patch, err := utils.CreateMergePatch(node, clone) - + } else { + nodesInLeafs, err := h.GetLeafNodes(ctx, rootNode, selector) if err != nil { - return fmt.Errorf("cannot get node while update node status %s, err: %v", node.Name, err) + return nil, err } - if node, err = h.RootClientset.CoreV1().Nodes().PatchStatus(ctx, node.Name, patch); err != nil { - return err + for _, node := range nodesInLeafs.Items { + podsInNode, err := h.LeafClientset.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("spec.nodeName=%s", node.Name), + }) + if err != nil { + return nil, err + } + if pods == nil { + pods = podsInNode + } else { + pods.Items = append(pods.Items, podsInNode.Items...) + } } - return nil - }) - if err != nil { - return err - } - return nil -} - -// GetLeafPods returns pods in leaf cluster by the rootNode -func (h AggregationModelHandler) GetLeafPods(ctx context.Context, rootNode *corev1.Node) (*corev1.PodList, error) { - pods := &corev1.PodList{} - err := h.LeafClient.List(ctx, pods) - if err != nil { - return nil, err } return pods, nil } -// GetLeafNodes returns nodes in leaf cluster by the rootNode -func (h AggregationModelHandler) GetLeafNodes(ctx context.Context, _ *corev1.Node) (*corev1.NodeList, error) { - nodesInLeaf := &corev1.NodeList{} - err := h.LeafClient.List(ctx, nodesInLeaf) - if err != nil { - return nil, err - } - return nodesInLeaf, nil -} - -// GetLeafModelType returns the leafModelType for a Cluster -func (h AggregationModelHandler) GetLeafModelType() LeafModelType { - return AggregationModel -} - -// DispersionModelHandler handles the dispersion leaf model -type DispersionModelHandler struct { - Cluster *kosmosv1alpha1.Cluster - LeafClient client.Client - RootClient client.Client - RootClientset kubernetes.Interface - LeafClientset kubernetes.Interface -} - -// CreateNodeInRoot creates the node in root cluster -func (h DispersionModelHandler) CreateNodeInRoot(ctx context.Context, cluster *kosmosv1alpha1.Cluster, listenPort int32, gitVersion string) ([]*corev1.Node, error) { - nodes := make([]*corev1.Node, 0) - for _, leafModel := range cluster.Spec.ClusterTreeOptions.LeafModels { - // todo only support nodeName now - if leafModel.NodeSelector.NodeName != "" { - nodeName := leafModel.NodeSelector.NodeName - node, err := h.RootClientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - if !errors.IsNotFound(err) { - return nil, err - } - - node = utils.BuildNodeTemplate(nodeName) - nodeAnnotations := node.GetAnnotations() - if nodeAnnotations == nil { - nodeAnnotations = make(map[string]string, 1) - } - nodeAnnotations[utils.KosmosNodeOwnedByClusterAnnotations] = cluster.Name - node.SetAnnotations(nodeAnnotations) - - node.Status.NodeInfo.KubeletVersion = gitVersion - node.Status.DaemonEndpoints = corev1.NodeDaemonEndpoints{ - KubeletEndpoint: corev1.DaemonEndpoint{ - Port: listenPort, - }, - } - - // node.Status.Addresses = GetAddress() - - node, err = h.RootClientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) - if err != nil { - return nil, err - } +// UpdateRootNodeStatus updates the node's status in root cluster +func (h 
ClassificationHandler) UpdateRootNodeStatus(ctx context.Context, nodesInRoot []*corev1.Node, leafNodeSelector map[string]kosmosv1alpha1.NodeSelector) error { + for _, node := range nodesInRoot { + nodeNameInRoot := node.Name + listOptions := metav1.ListOptions{} + if h.leafMode == Party { + selector, ok := leafNodeSelector[nodeNameInRoot] + if !ok { + klog.Warningf("no nodeSelector found for the join node: %v", nodeNameInRoot) + continue } - nodes = append(nodes, node) + listOptions.LabelSelector = metav1.FormatLabelSelector(selector.LabelSelector) } - } - return nodes, nil -} -// UpdateNodeStatus updates the node's status in root cluster -func (h DispersionModelHandler) UpdateNodeStatus(ctx context.Context, n []*corev1.Node) error { - for _, node := range n { - nodeCopy := node.DeepCopy() - namespacedName := types.NamespacedName{ - Name: nodeCopy.Name, - } err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - nodeInLeaf := &corev1.Node{} - err := h.LeafClient.Get(ctx, namespacedName, nodeInLeaf) + nodeInRoot, err := h.RootClientset.CoreV1().Nodes().Get(ctx, nodeNameInRoot, metav1.GetOptions{}) if err != nil { // TODO: If a node is accidentally deleted, recreate it - return fmt.Errorf("cannot get node in leaf cluster while update node status %s, err: %v", nodeCopy.Name, err) + return fmt.Errorf("cannot get node in root cluster while updating the join node %s status, err: %v", nodeNameInRoot, err) } - nodeRoot := &corev1.Node{} - err = h.RootClient.Get(ctx, namespacedName, nodeRoot) + nodesInLeaf, err := h.LeafClientset.CoreV1().Nodes().List(ctx, listOptions) if err != nil { // TODO: If a node is accidentally deleted, recreate it - return fmt.Errorf("cannot get node in root cluster while update node status %s, err: %v", nodeCopy.Name, err) + return fmt.Errorf("cannot get node in leaf cluster while updating the join node %s status, err: %v", nodeNameInRoot, err) + } + if len(nodesInLeaf.Items) == 0 { + // TODO: If a node is accidentally deleted, recreate it + return fmt.Errorf("no node found in leaf cluster while updating the join node %s status", nodeNameInRoot) + } + + rootCopy := nodeInRoot.DeepCopy() + + if h.leafMode == Node { + rootCopy.Status = *nodesInLeaf.Items[0].Status.DeepCopy() + } else { + rootCopy.Status.Conditions = utils.NodeConditions() + + // Aggregate the resources of the leaf nodes + pods, err := h.GetLeafPods(ctx, rootCopy, leafNodeSelector[nodeNameInRoot]) + if err != nil { + return fmt.Errorf("could not list pods in leaf cluster while updating the join node %s status, err: %v", nodeNameInRoot, err) + } + clusterResources := utils.CalculateClusterResources(nodesInLeaf, pods) + rootCopy.Status.Allocatable = clusterResources + rootCopy.Status.Capacity = clusterResources } - rootCopy := nodeRoot.DeepCopy() - nodeRoot.Status = nodeInLeaf.Status - nodeRoot.Status.Addresses, err = GetAddress(ctx, h.RootClientset, nodeInLeaf.Status.Addresses) + rootCopy.Status.Addresses, err = GetAddress(ctx, h.RootClientset, nodesInLeaf.Items[0].Status.Addresses) if err != nil { return err } - nodeRoot.Status.Allocatable = rootCopy.Status.Allocatable - nodeRoot.Status.Capacity = rootCopy.Status.Capacity - if node, err = h.RootClientset.CoreV1().Nodes().UpdateStatus(ctx, nodeRoot, metav1.UpdateOptions{}); err != nil { + patch, err := utils.CreateMergePatch(nodeInRoot, rootCopy) + if err != nil { + return fmt.Errorf("failed to CreateMergePatch while updating the join node %s status, err: %v", nodeNameInRoot, err) + } + + if _, err = h.RootClientset.CoreV1().Nodes().PatchStatus(ctx, node.Name,
patch); err != nil { return err } return nil @@ -250,43 +172,92 @@ func (h DispersionModelHandler) UpdateNodeStatus(ctx context.Context, n []*corev return nil } -func (h DispersionModelHandler) GetLeafPods(ctx context.Context, rootNode *corev1.Node) (*corev1.PodList, error) { - pods, err := h.LeafClientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", rootNode.Name)}) +func createNode(ctx context.Context, clientset kubernetes.Interface, clusterName, nodeName, gitVersion string, listenPort int32) (*corev1.Node, error) { + nodeInRoot, err := clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { - return nil, err + if !errors.IsNotFound(err) { + return nil, err + } + + nodeInRoot = utils.BuildNodeTemplate(nodeName) + nodeAnnotations := nodeInRoot.GetAnnotations() + if nodeAnnotations == nil { + nodeAnnotations = make(map[string]string, 1) + } + nodeAnnotations[utils.KosmosNodeOwnedByClusterAnnotations] = clusterName + nodeInRoot.SetAnnotations(nodeAnnotations) + + nodeInRoot.Status.NodeInfo.KubeletVersion = gitVersion + nodeInRoot.Status.DaemonEndpoints = corev1.NodeDaemonEndpoints{ + KubeletEndpoint: corev1.DaemonEndpoint{ + Port: listenPort, + }, + } + + nodeInRoot, err = clientset.CoreV1().Nodes().Create(ctx, nodeInRoot, metav1.CreateOptions{}) + if err != nil { + return nil, err + } } - return pods, nil + return nodeInRoot, nil } -func (h DispersionModelHandler) GetLeafNodes(ctx context.Context, rootNode *corev1.Node) (*corev1.NodeList, error) { - nodesInLeaf, err := h.LeafClientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", rootNode.Name)}) - if err != nil { - return nil, err +// CreateRootNode creates the node in root cluster +func (h ClassificationHandler) CreateRootNode(ctx context.Context, listenPort int32, gitVersion string) ([]*corev1.Node, map[string]kosmosv1alpha1.NodeSelector, error) { + nodes := make([]*corev1.Node, 0) + leafNodeSelectors := make(map[string]kosmosv1alpha1.NodeSelector) + cluster := h.Cluster + + if h.leafMode == ALL { + nodeNameInRoot := fmt.Sprintf("%s%s", utils.KosmosNodePrefix, cluster.Name) + nodeInRoot, err := createNode(ctx, h.RootClientset, cluster.Name, nodeNameInRoot, gitVersion, listenPort) + if err != nil { + return nil, nil, err + } + nodes = append(nodes, nodeInRoot) + leafNodeSelectors[nodeNameInRoot] = kosmosv1alpha1.NodeSelector{} + } else { + for i, leafModel := range cluster.Spec.ClusterTreeOptions.LeafModels { + var nodeNameInRoot string + if h.leafMode == Node { + nodeNameInRoot = leafModel.NodeSelector.NodeName + } else { + nodeNameInRoot = fmt.Sprintf("%v%v%v%v", utils.KosmosNodePrefix, leafModel.LeafNodeName, "-", i) + } + if len(nodeNameInRoot) > 63 { + nodeNameInRoot = nodeNameInRoot[:63] + } + + nodeInRoot, err := createNode(ctx, h.RootClientset, cluster.Name, nodeNameInRoot, gitVersion, listenPort) + if err != nil { + return nil, nil, err + } + nodes = append(nodes, nodeInRoot) + leafNodeSelectors[nodeNameInRoot] = leafModel.NodeSelector + } } - return nodesInLeaf, nil -} -func (h DispersionModelHandler) GetLeafModelType() LeafModelType { - return DispersionModel + return nodes, leafNodeSelectors, nil } // NewLeafModelHandler create a LeafModelHandler for Cluster -func NewLeafModelHandler(cluster *kosmosv1alpha1.Cluster, root, leafClient client.Client, rootClientset, leafClientset kubernetes.Interface) LeafModelHandler { - // todo support nodeSelector mode - if cluster.Spec.ClusterTreeOptions.LeafModels != nil 
{ - return &DispersionModelHandler{ - Cluster: cluster, - LeafClient: leafClient, - RootClient: root, - RootClientset: rootClientset, - LeafClientset: leafClientset, - } - } else { - return &AggregationModelHandler{ - Cluster: cluster, - LeafClient: leafClient, - RootClient: root, - RootClientset: rootClientset, +func NewLeafModelHandler(cluster *kosmosv1alpha1.Cluster, rootClientset, leafClientset kubernetes.Interface) LeafModelHandler { + classificationModel := &ClassificationHandler{ + leafMode: ALL, + Cluster: cluster, + RootClientset: rootClientset, + LeafClientset: leafClientset, + } + + leafModels := cluster.Spec.ClusterTreeOptions.LeafModels + + if leafModels != nil && !reflect.DeepEqual(leafModels[0].NodeSelector, kosmosv1alpha1.NodeSelector{}) { + if leafModels[0].NodeSelector.LabelSelector != nil && !reflect.DeepEqual(leafModels[0].NodeSelector.LabelSelector, metav1.LabelSelector{}) { + // support nodeSelector mode + classificationModel.leafMode = Party + } else if leafModels[0].NodeSelector.NodeName != "" { + classificationModel.leafMode = Node } } + return classificationModel } diff --git a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go index 5c40a96d0..781135568 100644 --- a/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go +++ b/pkg/clustertree/cluster-manager/utils/leaf_resource_manager.go @@ -24,14 +24,15 @@ var ( type LeafMode int const ( - ALL LeafMode = 0 - Node LeafMode = 1 - // Party LeafMode = 2 + ALL LeafMode = iota + Node + Party ) type ClusterNode struct { - NodeName string - LeafMode LeafMode + NodeName string + LeafMode LeafMode + LeafNodeSelector kosmosv1alpha1.NodeSelector } type LeafResource struct { @@ -103,13 +104,18 @@ func (l *leafResourceManager) AddLeafResource(lptr *LeafResource, cluster *kosmo clusterNodes := []ClusterNode{} for i, n := range nodes { - if leafModels != nil && len(leafModels[i].NodeSelector.NodeName) > 0 { + if leafModels != nil && leafModels[i].NodeSelector.LabelSelector != nil { + // TODO: support labelselector + clusterNodes = append(clusterNodes, ClusterNode{ + NodeName: trimNamePrefix(n.Name), + LeafMode: Party, + LeafNodeSelector: leafModels[i].NodeSelector, + }) + } else if leafModels != nil && len(leafModels[i].NodeSelector.NodeName) > 0 { clusterNodes = append(clusterNodes, ClusterNode{ NodeName: n.Name, LeafMode: Node, }) - // } else if leafModels != nil && leafModels[i].NodeSelector.LabelSelector != nil { - // TODO: support labelselector } else { clusterNodes = append(clusterNodes, ClusterNode{ NodeName: trimNamePrefix(n.Name), diff --git a/pkg/utils/podutils/pod.go b/pkg/utils/podutils/pod.go index d6da91e18..3044d3387 100644 --- a/pkg/utils/podutils/pod.go +++ b/pkg/utils/podutils/pod.go @@ -10,6 +10,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/klog" + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + clustertreeutil "github.com/kosmos.io/kosmos/pkg/clustertree/cluster-manager/utils" "github.com/kosmos.io/kosmos/pkg/utils" ) @@ -141,7 +143,64 @@ func FitUnstructuredObjMeta(unstructuredObj *unstructured.Unstructured) { } } -func FitPod(pod *corev1.Pod, ignoreLabels []string, cleanNodeName bool) *corev1.Pod { +func fitNodeAffinity(affinity *corev1.Affinity, nodeSelector kosmosv1alpha1.NodeSelector) (cpAffinity *corev1.Affinity) { + nodeSelectorTerms := make([]corev1.NodeSelectorTerm, 0) + nodeSelectorTerm := corev1.NodeSelectorTerm{ + MatchExpressions: 
make([]corev1.NodeSelectorRequirement, 0), + } + if nodeSelector.LabelSelector.MatchLabels != nil { + for key, value := range nodeSelector.LabelSelector.MatchLabels { + selector := corev1.NodeSelectorRequirement{ + Key: key, + Operator: corev1.NodeSelectorOpIn, + Values: []string{value}, + } + nodeSelectorTerm.MatchExpressions = append(nodeSelectorTerm.MatchExpressions, selector) + } + } + + if nodeSelector.LabelSelector.MatchExpressions != nil { + for _, item := range nodeSelector.LabelSelector.MatchExpressions { + selector := corev1.NodeSelectorRequirement{ + Key: item.Key, + Operator: corev1.NodeSelectorOperator(item.Operator), + Values: item.Values, + } + nodeSelectorTerm.MatchExpressions = append(nodeSelectorTerm.MatchExpressions, selector) + } + } + nodeSelectorTerms = append(nodeSelectorTerms, nodeSelectorTerm) + + if affinity == nil { + cpAffinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: nodeSelectorTerms, + }, + }, + } + } else { + cpAffinity = affinity.DeepCopy() + if cpAffinity.NodeAffinity == nil { + cpAffinity.NodeAffinity = &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: nodeSelectorTerms, + }, + } + } else if cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { + cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &corev1.NodeSelector{ + NodeSelectorTerms: nodeSelectorTerms, + } + } else if cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms == nil { + cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms + } else { + cpAffinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = nodeSelectorTerms + } + } + return cpAffinity +} + +func FitPod(pod *corev1.Pod, ignoreLabels []string, leafMode clustertreeutil.LeafMode, nodeSelector kosmosv1alpha1.NodeSelector) *corev1.Pod { vols := []corev1.Volume{} for _, v := range pod.Spec.Volumes { if strings.HasPrefix(v.Name, "default-token") { @@ -170,10 +229,14 @@ func FitPod(pod *corev1.Pod, ignoreLabels []string, cleanNodeName bool) *corev1. 
podCopy.Spec.SchedulerName = "" } - if cleanNodeName { + if leafMode != clustertreeutil.Node { podCopy.Spec.NodeName = "" } + if leafMode == clustertreeutil.Party { + podCopy.Spec.Affinity = fitNodeAffinity(pod.Spec.Affinity, nodeSelector) + } + tripped := FitLabels(podCopy.ObjectMeta.Labels, ignoreLabels) if tripped != nil { trippedStr, err := json.Marshal(tripped) @@ -240,7 +303,7 @@ func FitLabels(labels map[string]string, ignoreLabels []string) map[string]strin return trippedLabels } -func GetUpdatedPod(orig, update *corev1.Pod, ignoreLabels []string) { +func GetUpdatedPod(orig, update *corev1.Pod, ignoreLabels []string, leafMode clustertreeutil.LeafMode, nodeSelector kosmosv1alpha1.NodeSelector) { for i := range orig.Spec.InitContainers { orig.Spec.InitContainers[i].Image = update.Spec.InitContainers[i].Image } @@ -265,6 +328,10 @@ func GetUpdatedPod(orig, update *corev1.Pod, ignoreLabels []string) { if orig.Labels != nil { FitLabels(orig.ObjectMeta.Labels, ignoreLabels) } + + if leafMode == clustertreeutil.Party { + orig.Spec.Affinity = fitNodeAffinity(update.Spec.Affinity, nodeSelector) + } } func ConvertAnnotations(annotation map[string]string) *utils.ClustersNodeSelection { diff --git a/pkg/utils/resources.go b/pkg/utils/resources.go index 74ba65e29..ea53b1d65 100644 --- a/pkg/utils/resources.go +++ b/pkg/utils/resources.go @@ -52,26 +52,28 @@ func SubResourceList(base, list corev1.ResourceList) { // lifted from https://github.com/kubernetes/kubernetes/blob/v1.21.8/staging/src/k8s.io/kubectl/pkg/describe/describe.go#L4051 func GetPodsTotalRequestsAndLimits(podList *corev1.PodList) (reqs corev1.ResourceList, limits corev1.ResourceList) { reqs, limits = corev1.ResourceList{}, corev1.ResourceList{} - for _, p := range podList.Items { - pod := p - if IsVirtualPod(&pod) { - continue - } - podReqs, podLimits := v1resource.PodRequestsAndLimits(&pod) - for podReqName, podReqValue := range podReqs { - if value, ok := reqs[podReqName]; !ok { - reqs[podReqName] = podReqValue.DeepCopy() - } else { - value.Add(podReqValue) - reqs[podReqName] = value + if podList.Items != nil { + for _, p := range podList.Items { + pod := p + if IsVirtualPod(&pod) { + continue } - } - for podLimitName, podLimitValue := range podLimits { - if value, ok := limits[podLimitName]; !ok { - limits[podLimitName] = podLimitValue.DeepCopy() - } else { - value.Add(podLimitValue) - limits[podLimitName] = value + podReqs, podLimits := v1resource.PodRequestsAndLimits(&pod) + for podReqName, podReqValue := range podReqs { + if value, ok := reqs[podReqName]; !ok { + reqs[podReqName] = podReqValue.DeepCopy() + } else { + value.Add(podReqValue) + reqs[podReqName] = value + } + } + for podLimitName, podLimitValue := range podLimits { + if value, ok := limits[podLimitName]; !ok { + limits[podLimitName] = podLimitValue.DeepCopy() + } else { + value.Add(podLimitValue) + limits[podLimitName] = value + } } } } diff --git a/test/e2e/elector_test.go b/test/e2e/elector_test.go index 9c34e626f..eb35cbc48 100644 --- a/test/e2e/elector_test.go +++ b/test/e2e/elector_test.go @@ -15,7 +15,7 @@ var _ = ginkgo.Describe("elector testing", func() { ginkgo.Context("gateway role add test", func() { ginkgo.It("Check if gateway role gateway role is set correctly", func() { gomega.Eventually(func(g gomega.Gomega) (bool, error) { - clusterNodes, err := clusterLinkClient.KosmosV1alpha1().ClusterNodes().List(context.TODO(), metav1.ListOptions{}) + clusterNodes, err := hostClusterLinkClient.KosmosV1alpha1().ClusterNodes().List(context.TODO(), 
metav1.ListOptions{}) if err != nil { return false, err } diff --git a/test/e2e/framework/cluster.go b/test/e2e/framework/cluster.go index 4a5a8b6ab..da11c1d20 100644 --- a/test/e2e/framework/cluster.go +++ b/test/e2e/framework/cluster.go @@ -7,22 +7,78 @@ import ( "log" "os/exec" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" "github.com/kosmos.io/kosmos/hack/projectpath" - clusterlinkv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" ) -func FetchClusters(client versioned.Interface) ([]clusterlinkv1alpha1.Cluster, error) { +func FetchClusters(client versioned.Interface) ([]kosmosv1alpha1.Cluster, error) { clusters, err := client.KosmosV1alpha1().Clusters().List(context.TODO(), metav1.ListOptions{}) if err != nil { return nil, err } - return clusters.Items, nil + return clusters.DeepCopy().Items, nil +} + +func CreateClusters(client versioned.Interface, cluster *kosmosv1alpha1.Cluster) (err error) { + _, err = client.KosmosV1alpha1().Clusters().Create(context.TODO(), cluster, metav1.CreateOptions{}) + if err != nil { + return err + } + return nil +} + +func DeleteClusters(client versioned.Interface, clustername string) (err error) { + err = client.KosmosV1alpha1().Clusters().Delete(context.TODO(), clustername, metav1.DeleteOptions{}) + if err != nil { + return err + } + return nil +} + +func FetchNodes(client kubernetes.Interface) ([]corev1.Node, error) { + nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, err + } + return nodes.DeepCopy().Items, nil +} +func DeleteNode(client kubernetes.Interface, node string) (err error) { + err = client.CoreV1().Nodes().Delete(context.TODO(), node, metav1.DeleteOptions{}) + if err != nil { + return err + } + return nil +} + +func UpdateNodeLabels(client kubernetes.Interface, node corev1.Node) (err error) { + _, err = client.CoreV1().Nodes().Update(context.TODO(), &node, metav1.UpdateOptions{}) + if err != nil { + return err + } + return nil +} + +func WaitNodePresentOnCluster(client kubernetes.Interface, node string) { + ginkgo.By(fmt.Sprintf("Waiting for node(%v) on host cluster", node), func() { + gomega.Eventually(func() bool { + _, err := client.CoreV1().Nodes().Get(context.TODO(), node, metav1.GetOptions{}) + if err != nil { + klog.Errorf("Failed to get node(%v) on host cluster, err: %v", node, err) + return false + } + return true + }, PollTimeout, PollInterval).Should(gomega.Equal(true)) + }) } func LoadRESTClientConfig(kubeconfig string, context string) (*rest.Config, error) { diff --git a/test/e2e/framework/deployment_sample.go b/test/e2e/framework/deployment_sample.go new file mode 100644 index 000000000..37034f0c7 --- /dev/null +++ b/test/e2e/framework/deployment_sample.go @@ -0,0 +1,169 @@ +package framework + +import ( + "context" + "fmt" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +const ( + // PollInterval defines the interval time for a poll operation.
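A short usage sketch of the new cluster.go helpers (client variables as initialized in suit_test.go at the end of this diff):

    // Fetch the leaf-cluster nodes via the member client, then wait for the
    // corresponding node object to appear in the host (root) cluster.
    nodes, err := framework.FetchNodes(firstKubeClient)
    gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
    framework.WaitNodePresentOnCluster(hostKubeClient, nodes[0].Name) // one2node mode keeps leaf node names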
+ PollInterval = 15 * time.Second + + // PollTimeout defines the time after which the poll operation times out. + PollTimeout = 180 * time.Second +) + +func NewDeployment(namespace, name string, replicas *int32, nodes []string) *appsv1.Deployment { + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + + Spec: appsv1.DeploymentSpec{ + Replicas: replicas, + + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": name, + }, + }, + + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": name, + }, + }, + + Spec: corev1.PodSpec{ + Tolerations: []corev1.Toleration{ + { + Key: "kosmos.io/node", + Operator: corev1.TolerationOpEqual, + Value: "true", + Effect: corev1.TaintEffectNoSchedule, + }, + }, + + HostNetwork: true, + + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "kubernetes.io/hostname", + Operator: corev1.NodeSelectorOpIn, + Values: nodes, + }, + }, + }, + }, + }, + }, + }, + + Containers: []corev1.Container{ + { + Name: "nginx-container", + Image: "registry.paas/cmss/nginx:1.14.2", + + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + Protocol: "TCP", + }, + }, + + Resources: corev1.ResourceRequirements{ + Limits: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("100m"), + }, + }, + }, + }, + }, + }, + }, + } +} + +func CreateDeployment(client kubernetes.Interface, deployment *appsv1.Deployment) { + ginkgo.By(fmt.Sprintf("Creating Deployment(%s/%s)", deployment.Namespace, deployment.Name), func() { + _, err := client.AppsV1().Deployments(deployment.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{}) + if err != nil { + klog.Errorf("create deployment error: %v", err) + gomega.Expect(apierrors.IsAlreadyExists(err)).Should(gomega.Equal(true)) + } + }) +} + +func WaitDeploymentPresentOnCluster(client kubernetes.Interface, namespace, name, cluster string) { + ginkgo.By(fmt.Sprintf("Waiting for deployment(%v/%v) on cluster(%v)", namespace, name, cluster), func() { + gomega.Eventually(func() bool { + _, err := client.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + klog.Errorf("Failed to get deployment(%s/%s) on cluster(%s), err: %v", namespace, name, cluster, err) + return false + } + return true + }, PollTimeout, PollInterval).Should(gomega.Equal(true)) + }) +} + +func RemoveDeploymentOnCluster(client kubernetes.Interface, namespace, name string) { + ginkgo.By(fmt.Sprintf("Removing Deployment(%s/%s)", namespace, name), func() { + err := client.AppsV1().Deployments(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) + if err == nil || apierrors.IsNotFound(err) { + return + } + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + }) +} + +func HasElement(str string, strs []string) bool { + for _, e := range strs { + if e == str { + return true + } + } + return false +} + +func WaitPodPresentOnCluster(client kubernetes.Interface, namespace, cluster string, nodes []string, opt metav1.ListOptions) { + ginkgo.By(fmt.Sprintf("Waiting for pod of the deployment on cluster(%v)", cluster), func() { + gomega.Eventually(func() bool { + pods, err :=
client.CoreV1().Pods(namespace).List(context.TODO(), opt) + if err != nil { + klog.Errorf("Failed to get pod on cluster(%s), err: %v", cluster, err) + return false + } + + for _, pod := range pods.Items { + if HasElement(pod.Spec.NodeName, nodes) { + return true + } + } + return false + }, PollTimeout, PollInterval).Should(gomega.Equal(true)) + }) +} diff --git a/test/e2e/leaf_node_test.go b/test/e2e/leaf_node_test.go new file mode 100644 index 000000000..2e6c5c12b --- /dev/null +++ b/test/e2e/leaf_node_test.go @@ -0,0 +1,269 @@ +// nolint:dupl +package e2e + +import ( + "fmt" + "reflect" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + kosmosv1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/utils" + "github.com/kosmos.io/kosmos/test/e2e/framework" +) + +const ( + ONE2CLUSTER = "-one2cluster" + ONE2NODE = "-one2node" + ONE2PARTY = "-one2party" +) + +var ( + one2Cluster *kosmosv1alpha1.Cluster + one2Node *kosmosv1alpha1.Cluster + one2Party *kosmosv1alpha1.Cluster + partyNodeNames []string + memberNodeNames []string +) + +var _ = ginkgo.Describe("Test leaf node mode -- one2cluster, one2node, one2party", func() { + ginkgo.BeforeEach(func() { + clusters, err := framework.FetchClusters(hostClusterLinkClient) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(clusters).ShouldNot(gomega.BeEmpty()) + partyNodeNames = make([]string, 0) + memberNodeNames = make([]string, 0) + + for _, cluster := range clusters { + if cluster.Name == "cluster-member1" { + nodes, err := framework.FetchNodes(firstKubeClient) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + cluster.ResourceVersion = "" + + one2Cluster = cluster.DeepCopy() + one2Cluster.Name += ONE2CLUSTER + one2Cluster.Spec.ClusterTreeOptions.Enable = true + one2Cluster.Spec.ClusterTreeOptions.LeafModels = nil + + one2Node = cluster.DeepCopy() + one2Node.Name += ONE2NODE + one2Node.Spec.ClusterTreeOptions.Enable = true + + one2Party = cluster.DeepCopy() + one2Party.Name += ONE2PARTY + + nodeLeafModels := make([]kosmosv1alpha1.LeafModel, 0) + for i, node := range nodes { + if i < 2 { + nodeLabels := node.Labels + if nodeLabels == nil { + nodeLabels = make(map[string]string, 0) + } + nodeLabels["test-leaf-party-mode"] = "yes" + node.SetLabels(nodeLabels) + node.ResourceVersion = "" + err = framework.UpdateNodeLabels(firstKubeClient, node) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + + nodeLeaf := kosmosv1alpha1.LeafModel{ + LeafNodeName: one2Node.Name, + Taints: []corev1.Taint{ + { + Effect: utils.KosmosNodeTaintEffect, + Key: utils.KosmosNodeTaintKey, + Value: utils.KosmosNodeValue, + }, + }, + NodeSelector: kosmosv1alpha1.NodeSelector{ + NodeName: node.Name, + LabelSelector: nil, + }, + } + nodeLeafModels = append(nodeLeafModels, nodeLeaf) + memberNodeNames = append(memberNodeNames, node.Name) + + } + one2Node.Spec.ClusterTreeOptions.LeafModels = nodeLeafModels + + partyLeaf := kosmosv1alpha1.LeafModel{ + LeafNodeName: one2Party.Name, + Taints: []corev1.Taint{ + { + Effect: utils.KosmosNodeTaintEffect, + Key: utils.KosmosNodeTaintKey, + Value: utils.KosmosNodeValue, + }, + }, + NodeSelector: kosmosv1alpha1.NodeSelector{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test-leaf-party-mode": "yes", + }, + }, + }, + } + + partyNodeNames = append(partyNodeNames, 
fmt.Sprintf("%v%v%v", utils.KosmosNodePrefix, partyLeaf.LeafNodeName, "-0")) + one2Party.Spec.ClusterTreeOptions.LeafModels = []kosmosv1alpha1.LeafModel{partyLeaf} + + break + } + } + }) + + ginkgo.Context("Test one2cluster mode", func() { + var deploy *appsv1.Deployment + ginkgo.BeforeEach(func() { + err := framework.DeleteClusters(hostClusterLinkClient, one2Cluster.Name) + if err != nil && !apierrors.IsNotFound(err) { + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + err = framework.CreateClusters(hostClusterLinkClient, one2Cluster) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + framework.WaitNodePresentOnCluster(hostKubeClient, utils.KosmosNodePrefix+one2Cluster.GetName()) + + }) + + ginkgo.It("Test one2cluster mode", func() { + ginkgo.By("Test one2cluster mode", func() { + nodeNameInRoot := utils.KosmosNodePrefix + one2Cluster.GetName() + nodes := []string{nodeNameInRoot} + deployName := one2Cluster.GetName() + "-nginx" + rp := int32(1) + deploy = framework.NewDeployment(corev1.NamespaceDefault, deployName, &rp, nodes) + framework.RemoveDeploymentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name) + framework.CreateDeployment(hostKubeClient, deploy) + + framework.WaitDeploymentPresentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name, one2Cluster.Name) + + opt := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%v", deployName), + } + framework.WaitPodPresentOnCluster(hostKubeClient, deploy.Namespace, one2Cluster.Name, nodes, opt) + framework.WaitPodPresentOnCluster(firstKubeClient, deploy.Namespace, one2Cluster.Name, memberNodeNames, opt) + }) + }) + ginkgo.AfterEach(func() { + if deploy != nil && !reflect.DeepEqual(deploy, appsv1.Deployment{}) { + framework.RemoveDeploymentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name) + } + + err := framework.DeleteClusters(hostClusterLinkClient, one2Cluster.Name) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + err = framework.DeleteNode(hostKubeClient, utils.KosmosNodePrefix+one2Cluster.GetName()) + if err != nil && !apierrors.IsNotFound(err) { + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + }) + }) + + ginkgo.Context("Test one2node mode", func() { + var deploy *appsv1.Deployment + ginkgo.BeforeEach(func() { + err := framework.DeleteClusters(hostClusterLinkClient, one2Node.Name) + if err != nil && !apierrors.IsNotFound(err) { + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + err = framework.CreateClusters(hostClusterLinkClient, one2Node) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + if len(memberNodeNames) > 0 { + framework.WaitNodePresentOnCluster(hostKubeClient, memberNodeNames[0]) + } + }) + + ginkgo.It("Test one2node mode", func() { + ginkgo.By("Test one2cluster mode", func() { + deployName := one2Node.GetName() + "-nginx" + rp := int32(1) + deploy = framework.NewDeployment(corev1.NamespaceDefault, deployName, &rp, memberNodeNames) + framework.RemoveDeploymentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name) + framework.CreateDeployment(hostKubeClient, deploy) + + framework.WaitDeploymentPresentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name, one2Node.Name) + + opt := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%v", deployName), + } + framework.WaitPodPresentOnCluster(hostKubeClient, deploy.Namespace, one2Node.Name, memberNodeNames, opt) + framework.WaitPodPresentOnCluster(firstKubeClient, deploy.Namespace, one2Node.Name, memberNodeNames, opt) + }) + }) + + ginkgo.AfterEach(func() { + if deploy != nil && !reflect.DeepEqual(deploy, 
appsv1.Deployment{}) { + framework.RemoveDeploymentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name) + } + + err := framework.DeleteClusters(hostClusterLinkClient, one2Node.Name) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + if len(memberNodeNames) > 0 { + for _, node := range memberNodeNames { + err = framework.DeleteNode(hostKubeClient, node) + if err != nil && !apierrors.IsNotFound(err) { + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + } + } + }) + }) + + ginkgo.Context("Test one2party mode", func() { + var deploy *appsv1.Deployment + ginkgo.BeforeEach(func() { + err := framework.DeleteClusters(hostClusterLinkClient, one2Party.Name) + if err != nil && !apierrors.IsNotFound(err) { + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + err = framework.CreateClusters(hostClusterLinkClient, one2Party) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + if len(partyNodeNames) > 0 { + framework.WaitNodePresentOnCluster(hostKubeClient, partyNodeNames[0]) + } + }) + + ginkgo.It("Test one2party mode", func() { + ginkgo.By("Test one2party mode", func() { + deployName := one2Party.GetName() + "-nginx" + rp := int32(1) + deploy = framework.NewDeployment(corev1.NamespaceDefault, deployName, &rp, partyNodeNames) + framework.RemoveDeploymentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name) + framework.CreateDeployment(hostKubeClient, deploy) + + framework.WaitDeploymentPresentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name, one2Party.Name) + + opt := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("app=%v", deployName), + } + framework.WaitPodPresentOnCluster(hostKubeClient, deploy.Namespace, one2Party.Name, partyNodeNames, opt) + framework.WaitPodPresentOnCluster(firstKubeClient, deploy.Namespace, one2Party.Name, memberNodeNames, opt) + }) + }) + ginkgo.AfterEach(func() { + if deploy != nil && !reflect.DeepEqual(deploy, appsv1.Deployment{}) { + framework.RemoveDeploymentOnCluster(hostKubeClient, deploy.Namespace, deploy.Name) + } + + err := framework.DeleteClusters(hostClusterLinkClient, one2Party.Name) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + if len(partyNodeNames) > 0 { + for _, node := range partyNodeNames { + err = framework.DeleteNode(hostKubeClient, node) + if err != nil && !apierrors.IsNotFound(err) { + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + } + } + }) + }) +}) diff --git a/test/e2e/suit_test.go b/test/e2e/suit_test.go index 3f5fc191a..02cc95cd7 100644 --- a/test/e2e/suit_test.go +++ b/test/e2e/suit_test.go @@ -21,15 +21,20 @@ var ( pollInterval time.Duration // pollTimeout defines the time after which the poll operation times out. 
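For reference, how the three test fixtures built above map onto the leaf modes (drawn from the BeforeEach construction in leaf_node_test.go; the example node name assumes the kosmos- prefix):

    // one2cluster: LeafModels = nil                     -> ALL:   one root node named kosmos-<cluster>
    // one2node:    NodeSelector.NodeName per leaf node  -> Node:  root nodes keep the leaf node names
    // one2party:   NodeSelector.LabelSelector           -> Party: one root node per LeafModel, e.g. kosmos-<LeafNodeName>-0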
pollTimeout time.Duration -) -var ( - kubeconfig string - hostContext string - restConfig *rest.Config - kubeClient kubernetes.Interface - dynamicClient dynamic.Interface - clusterLinkClient versioned.Interface + kubeconfig = os.Getenv("KUBECONFIG") + + // host clusters + hostContext string + hostKubeClient kubernetes.Interface + hostDynamicClient dynamic.Interface + hostClusterLinkClient versioned.Interface + + // first-cluster + firstContext string + firstRestConfig *rest.Config + firstKubeClient kubernetes.Interface + firstDynamicClient dynamic.Interface ) const ( @@ -43,6 +48,7 @@ func init() { flag.DurationVar(&pollInterval, "poll-interval", 5*time.Second, "poll-interval defines the interval time for a poll operation") flag.DurationVar(&pollTimeout, "poll-timeout", 300*time.Second, "poll-timeout defines the time which the poll operation times out") flag.StringVar(&hostContext, "host-context", "kind-cluster-host", "name of the host cluster context in kubeconfig file.") + flag.StringVar(&firstContext, "first-context", "kind-cluster-member1", "name of the first member cluster context in kubeconfig file.") } func TestE2E(t *testing.T) { @@ -53,15 +59,23 @@ func TestE2E(t *testing.T) { var _ = ginkgo.SynchronizedBeforeSuite(func() []byte { return nil }, func(bytes []byte) { - kubeconfig = os.Getenv("KUBECONFIG") + // InitClient Initialize the client connecting to the HOST/FIRST/SECOND cluster gomega.Expect(kubeconfig).ShouldNot(gomega.BeEmpty()) - config, err := framework.LoadRESTClientConfig(kubeconfig, hostContext) + hostRestConfig, err := framework.LoadRESTClientConfig(kubeconfig, hostContext) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - restConfig = config - kubeClient, err = kubernetes.NewForConfig(restConfig) + hostKubeClient, err = kubernetes.NewForConfig(hostRestConfig) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - clusterLinkClient, err = versioned.NewForConfig(restConfig) + hostDynamicClient, err = dynamic.NewForConfig(hostRestConfig) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - dynamicClient, err = dynamic.NewForConfig(restConfig) + hostClusterLinkClient, err = versioned.NewForConfig(hostRestConfig) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + gomega.Expect(kubeconfig).ShouldNot(gomega.BeEmpty()) + firstRestConfig, err = framework.LoadRESTClientConfig(kubeconfig, firstContext) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + firstKubeClient, err = kubernetes.NewForConfig(firstRestConfig) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + firstDynamicClient, err = dynamic.NewForConfig(firstRestConfig) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + })
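Taken together, the mode selection introduced in leaf_model_handler.go can be summarized with a hedged sketch (only the first LeafModel entry is inspected, matching NewLeafModelHandler above; the comments are editorial, not part of the patch):

    leafModels := cluster.Spec.ClusterTreeOptions.LeafModels
    switch {
    case leafModels == nil || reflect.DeepEqual(leafModels[0].NodeSelector, kosmosv1alpha1.NodeSelector{}):
        // ALL: one aggregated root node for the whole leaf cluster
    case leafModels[0].NodeSelector.LabelSelector != nil:
        // Party: root nodes backed by the leaf nodes matching the LabelSelector
    case leafModels[0].NodeSelector.NodeName != "":
        // Node: root nodes mirroring individual leaf nodes by name
    }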