diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 971b74b7d..09b73fad1 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -22,6 +22,7 @@ package cluster import ( "context" "fmt" + "slices" "strings" corev1 "k8s.io/api/core/v1" @@ -313,7 +314,7 @@ func (o *ClusterObjects) GetClusterInfo() *ClusterInfo { } primaryComponent := FindClusterComp(o.Cluster, o.ClusterDef.Spec.ComponentDefs[0].Name) - internalEndpoints, externalEndpoints := GetComponentEndpoints(o.Services, primaryComponent) + internalEndpoints, externalEndpoints := GetComponentEndpoints(o.Nodes, o.Services, primaryComponent.Name) if len(internalEndpoints) > 0 { cluster.InternalEP = strings.Join(internalEndpoints, ",") } @@ -325,63 +326,150 @@ func (o *ClusterObjects) GetClusterInfo() *ClusterInfo { func (o *ClusterObjects) GetComponentInfo() []*ComponentInfo { var comps []*ComponentInfo - for _, c := range o.Cluster.Spec.ComponentSpecs { - // get all pods belonging to current component - var pods []corev1.Pod + setComponentInfos := func(compSpec appsv1alpha1.ClusterComponentSpec, + resources corev1.ResourceRequirements, + storages []appsv1alpha1.ClusterComponentVolumeClaimTemplate, + replicas int32, + clusterCompName string, + templateName string, + isSharding bool) { + // get all pods belonging to current component and instance template + var ( + pods []corev1.Pod + // a unique name identifier for component object and labeled with "apps.kubeblocks.io/component-name" + componentName string + ) for _, p := range o.Pods.Items { - if n, ok := p.Labels[constant.KBAppComponentLabelKey]; ok && n == c.Name { - pods = append(pods, p) + if isSharding && p.Labels[constant.KBAppShardingNameLabelKey] != clusterCompName { + continue + } + if !isSharding && p.Labels[constant.KBAppComponentLabelKey] != clusterCompName { + continue } + insTplName := appsv1alpha1.GetInstanceTemplateName(o.Cluster.Name, + p.Labels[constant.KBAppComponentLabelKey], p.Name) + if insTplName != templateName { + continue + } + componentName = p.Labels[constant.KBAppComponentLabelKey] + pods = append(pods, p) } - // current component has no derived pods if len(pods) == 0 { - continue + return } - image := types.None if len(pods) > 0 { - image = pods[0].Spec.Containers[0].Image + var images []string + for _, con := range pods[0].Spec.Containers { + if !slices.Contains(images, con.Image) { + images = append(images, con.Image) + } + } + image = strings.Join(images, "\n") } running, waiting, succeeded, failed := util.GetPodStatus(pods) comp := &ComponentInfo{ - Name: c.Name, - NameSpace: o.Cluster.Namespace, - Type: c.ComponentDefRef, - Cluster: o.Cluster.Name, - Replicas: fmt.Sprintf("%d / %d", c.Replicas, len(pods)), - Status: fmt.Sprintf("%d / %d / %d / %d ", running, waiting, succeeded, failed), - Image: image, - } - comp.CPU, comp.Memory = getResourceInfo(c.Resources.Requests, c.Resources.Limits) - comp.Storage = o.getStorageInfo(&c) + Name: clusterCompName, + InstanceTemplateName: templateName, + NameSpace: o.Cluster.Namespace, + ComponentDef: compSpec.ComponentDef, + Cluster: o.Cluster.Name, + Replicas: fmt.Sprintf("%d / %d", replicas, len(pods)), + Status: fmt.Sprintf("%d / %d / %d / %d ", running, waiting, succeeded, failed), + Image: image, + } + comp.CPU, comp.Memory = getResourceInfo(resources.Requests, resources.Limits) + comp.Storage = o.getStorageInfo(storages, componentName) comps = append(comps, comp) } + buildComponentInfos := func(compSpec appsv1alpha1.ClusterComponentSpec, clusterCompName string, isSharding bool) { + var tplReplicas int32 + for _, ins := range compSpec.Instances { + resources := compSpec.Resources + if ins.Resources != nil { + resources = *ins.Resources + } + vcts := o.getCompTemplateVolumeClaimTemplates(&compSpec, ins) + setComponentInfos(compSpec, resources, vcts, ins.GetReplicas(), clusterCompName, ins.Name, isSharding) + } + setComponentInfos(compSpec, compSpec.Resources, compSpec.VolumeClaimTemplates, + compSpec.Replicas-tplReplicas, clusterCompName, "", isSharding) + } + for _, c := range o.Cluster.Spec.ComponentSpecs { + buildComponentInfos(c, c.Name, false) + } + for _, c := range o.Cluster.Spec.ShardingSpecs { + buildComponentInfos(c.Template, c.Name, true) + } return comps } +// getCompTemplateVolumeClaimTemplates merges volume claim for instance template +func (o *ClusterObjects) getCompTemplateVolumeClaimTemplates(compSpec *appsv1alpha1.ClusterComponentSpec, + template appsv1alpha1.InstanceTemplate) []appsv1alpha1.ClusterComponentVolumeClaimTemplate { + var vcts []appsv1alpha1.ClusterComponentVolumeClaimTemplate + for i := range compSpec.VolumeClaimTemplates { + insVctIndex := -1 + for j := range template.VolumeClaimTemplates { + if template.VolumeClaimTemplates[j].Name == compSpec.VolumeClaimTemplates[i].Name { + insVctIndex = j + break + } + } + if insVctIndex != -1 { + vcts = append(vcts, template.VolumeClaimTemplates[insVctIndex]) + } else { + vcts = append(vcts, compSpec.VolumeClaimTemplates[i]) + } + } + return vcts +} + func (o *ClusterObjects) GetInstanceInfo() []*InstanceInfo { var instances []*InstanceInfo for _, pod := range o.Pods.Items { + componentName := getLabelVal(pod.Labels, constant.KBAppComponentLabelKey) instance := &InstanceInfo{ Name: pod.Name, Namespace: pod.Namespace, Cluster: getLabelVal(pod.Labels, constant.AppInstanceLabelKey), - Component: getLabelVal(pod.Labels, constant.KBAppComponentLabelKey), + Component: componentName, Status: o.getPodPhase(&pod), Role: getLabelVal(pod.Labels, constant.RoleLabelKey), AccessMode: getLabelVal(pod.Labels, constant.ConsensusSetAccessModeLabelKey), CreatedTime: util.TimeFormat(&pod.CreationTimestamp), } - - var component *appsv1alpha1.ClusterComponentSpec - for i, c := range o.Cluster.Spec.ComponentSpecs { - if c.Name == instance.Component { - component = &o.Cluster.Spec.ComponentSpecs[i] + var componentSpec *appsv1alpha1.ClusterComponentSpec + shardingCompName := pod.Labels[constant.KBAppShardingNameLabelKey] + if shardingCompName != "" { + instance.Component = BuildShardingComponentName(shardingCompName, instance.Component) + for i, c := range o.Cluster.Spec.ShardingSpecs { + if c.Name == shardingCompName { + componentSpec = &o.Cluster.Spec.ShardingSpecs[i].Template + } + } + } else { + for i, c := range o.Cluster.Spec.ComponentSpecs { + if c.Name == instance.Component { + componentSpec = &o.Cluster.Spec.ComponentSpecs[i] + } + } + } + templateName := appsv1alpha1.GetInstanceTemplateName(o.Cluster.Name, componentName, pod.Name) + template := appsv1alpha1.InstanceTemplate{} + if templateName != "" { + for _, v := range componentSpec.Instances { + if v.Name == templateName { + template = v + break + } } } - instance.Storage = o.getStorageInfo(component) + vcts := o.getCompTemplateVolumeClaimTemplates(componentSpec, template) + instance.Storage = o.getStorageInfo(vcts, pod.Labels[constant.KBAppComponentLabelKey]) + instance.ServiceVersion = componentSpec.ServiceVersion getInstanceNodeInfo(o.Nodes, &pod, instance) instance.CPU, instance.Memory = getResourceInfo(resource.PodRequestsAndLimits(&pod)) instances = append(instances, instance) @@ -475,8 +563,8 @@ func (o *ClusterObjects) getPodPhase(pod *corev1.Pod) string { return reason } -func (o *ClusterObjects) getStorageInfo(component *appsv1alpha1.ClusterComponentSpec) []StorageInfo { - if component == nil { +func (o *ClusterObjects) getStorageInfo(vcts []appsv1alpha1.ClusterComponentVolumeClaimTemplate, componentName string) []StorageInfo { + if len(vcts) == 0 { return nil } @@ -496,7 +584,7 @@ func (o *ClusterObjects) getStorageInfo(component *appsv1alpha1.ClusterComponent continue } - if labels[constant.KBAppComponentLabelKey] != component.Name { + if labels[constant.KBAppComponentLabelKey] != componentName { continue } @@ -514,7 +602,7 @@ func (o *ClusterObjects) getStorageInfo(component *appsv1alpha1.ClusterComponent } var infos []StorageInfo - for _, vcTpl := range component.VolumeClaimTemplates { + for _, vcTpl := range vcts { s := StorageInfo{ Name: vcTpl.Name, } diff --git a/pkg/cluster/component.go b/pkg/cluster/component.go new file mode 100644 index 000000000..222bdfea7 --- /dev/null +++ b/pkg/cluster/component.go @@ -0,0 +1,116 @@ +/* +Copyright (C) 2022-2024 ApeCloud Co., Ltd + +This file is part of KubeBlocks project + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . +*/ + +package cluster + +import ( + "context" + "fmt" + + appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1" + "github.com/apecloud/kubeblocks/pkg/constant" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + + "github.com/apecloud/kbcli/pkg/types" +) + +type ComponentPair struct { + // a unique name identifier for component object, + // and labeled with "apps.kubeblocks.io/component-name" + ComponentName string + ComponentDefName string + ShardingName string +} + +func ListShardingComponents(dynamic dynamic.Interface, clusterName, clusterNamespace, componentName string) ([]*appsv1alpha1.Component, error) { + unstructuredObjList, err := dynamic.Resource(types.ComponentGVR()).Namespace(clusterNamespace).List(context.TODO(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s,%s=%s", constant.AppInstanceLabelKey, clusterName, constant.KBAppShardingNameLabelKey, componentName), + }) + if err != nil { + return nil, nil + } + var components []*appsv1alpha1.Component + for i := range unstructuredObjList.Items { + comp := &appsv1alpha1.Component{} + if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObjList.Items[i].UnstructuredContent(), comp); err != nil { + return nil, err + } + components = append(components, comp) + } + return components, nil +} + +func BuildShardingComponentName(shardingCompName, componentName string) string { + if shardingCompName == "" { + return componentName + } + return fmt.Sprintf("%s(%s)", shardingCompName, componentName) +} + +func GetCompSpecAndCheckSharding(cluster *appsv1alpha1.Cluster, componentName string) (*appsv1alpha1.ClusterComponentSpec, bool) { + compSpec := cluster.Spec.GetComponentByName(componentName) + if compSpec != nil { + return compSpec, false + } + shardingSpec := cluster.Spec.GetShardingByName(componentName) + if shardingSpec == nil { + return nil, false + } + return &shardingSpec.Template, true +} + +func GetClusterComponentPairs(dynamicClient dynamic.Interface, cluster *appsv1alpha1.Cluster) ([]ComponentPair, error) { + var componentPairs []ComponentPair + for _, compSpec := range cluster.Spec.ComponentSpecs { + componentPairs = append(componentPairs, ComponentPair{ + ComponentName: compSpec.Name, + ComponentDefName: compSpec.ComponentDef, + }) + } + for _, shardingSpec := range cluster.Spec.ShardingSpecs { + shardingComponentPairs, err := GetShardingComponentPairs(dynamicClient, cluster, shardingSpec) + if err != nil { + return nil, err + } + componentPairs = append(componentPairs, shardingComponentPairs...) + } + return componentPairs, nil +} + +func GetShardingComponentPairs(dynamicClient dynamic.Interface, cluster *appsv1alpha1.Cluster, shardingSpec appsv1alpha1.ShardingSpec) ([]ComponentPair, error) { + var componentPairs []ComponentPair + shardingComps, err := ListShardingComponents(dynamicClient, cluster.Name, cluster.Namespace, shardingSpec.Name) + if err != nil { + return nil, err + } + if len(shardingComps) == 0 { + return nil, fmt.Errorf(`cannot find any component objects for sharding component "%s"`, shardingSpec.Name) + } + for i := range shardingComps { + compName := shardingComps[i].Labels[constant.KBAppComponentLabelKey] + componentPairs = append(componentPairs, ComponentPair{ + ComponentName: compName, + ComponentDefName: shardingSpec.Template.ComponentDef, + ShardingName: shardingSpec.Name, + }) + } + return componentPairs, nil +} diff --git a/pkg/cluster/helper.go b/pkg/cluster/helper.go index bd7f354b1..e05450232 100644 --- a/pkg/cluster/helper.go +++ b/pkg/cluster/helper.go @@ -100,7 +100,7 @@ func FindClusterComp(cluster *appsv1alpha1.Cluster, compDefName string) *appsv1a } // GetComponentEndpoints gets component internal and external endpoints -func GetComponentEndpoints(svcList *corev1.ServiceList, c *appsv1alpha1.ClusterComponentSpec) ([]string, []string) { +func GetComponentEndpoints(nodes []*corev1.Node, svcList *corev1.ServiceList, compName string) ([]string, []string) { var ( internalEndpoints []string externalEndpoints []string @@ -114,38 +114,57 @@ func GetComponentEndpoints(svcList *corev1.ServiceList, c *appsv1alpha1.ClusterC return result } - internalSvcs, externalSvcs := GetComponentServices(svcList, c) + getEndpointsWithNodePort := func(ip string, ports []corev1.ServicePort) []string { + var result []string + for _, port := range ports { + result = append(result, fmt.Sprintf("%s:%d", ip, port.NodePort)) + } + return result + } + + internalSvcs, externalSvcs := GetComponentServices(svcList, compName) for _, svc := range internalSvcs { dns := fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, svc.Namespace) internalEndpoints = append(internalEndpoints, getEndpoints(dns, svc.Spec.Ports)...) } for _, svc := range externalSvcs { - externalEndpoints = append(externalEndpoints, getEndpoints(GetExternalAddr(svc), svc.Spec.Ports)...) + endpoints := getEndpoints(GetExternalAddr(svc), svc.Spec.Ports) + if svc.Spec.Type == corev1.ServiceTypeNodePort { + if len(nodes) == 0 { + continue + } + nodeInternal, nodeExternal := GetEndpointsFromNode(nodes[0]) + if nodeExternal != "" { + endpoints = getEndpointsWithNodePort(nodeExternal, svc.Spec.Ports) + } else { + endpoints = getEndpointsWithNodePort(nodeInternal, svc.Spec.Ports) + } + } + externalEndpoints = append(externalEndpoints, endpoints...) } return internalEndpoints, externalEndpoints } // GetComponentServices gets component services -func GetComponentServices(svcList *corev1.ServiceList, c *appsv1alpha1.ClusterComponentSpec) ([]*corev1.Service, []*corev1.Service) { +func GetComponentServices(svcList *corev1.ServiceList, compName string) ([]*corev1.Service, []*corev1.Service) { if svcList == nil { return nil, nil } var internalSvcs, externalSvcs []*corev1.Service for i, svc := range svcList.Items { - if svc.GetLabels()[constant.KBAppComponentLabelKey] != c.Name { + if svc.GetLabels()[constant.KBAppComponentLabelKey] != compName { continue } - var ( internalIP = svc.Spec.ClusterIP externalAddr = GetExternalAddr(&svc) ) - if svc.Spec.Type == corev1.ServiceTypeClusterIP && internalIP != "" && internalIP != "None" { + if internalIP != "" && internalIP != "None" { internalSvcs = append(internalSvcs, &svcList.Items[i]) } - if externalAddr != "" { + if externalAddr != "" || svc.Spec.Type == corev1.ServiceTypeNodePort { externalSvcs = append(externalSvcs, &svcList.Items[i]) } } @@ -400,24 +419,6 @@ func GetConfigConstraintByName(dynamic dynamic.Interface, name string) (*appsv1b return ccObj, nil } -func ListShardingComponents(dynamic dynamic.Interface, clusterName, clusterNamespace, componentName string) ([]*appsv1alpha1.Component, error) { - unstructuredObjList, err := dynamic.Resource(types.ComponentGVR()).Namespace(clusterNamespace).List(context.TODO(), metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s,%s=%s", constant.AppInstanceLabelKey, clusterName, constant.KBAppShardingNameLabelKey, componentName), - }) - if err != nil { - return nil, nil - } - var components []*appsv1alpha1.Component - for i := range unstructuredObjList.Items { - comp := &appsv1alpha1.Component{} - if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObjList.Items[i].UnstructuredContent(), comp); err != nil { - return nil, err - } - components = append(components, comp) - } - return components, nil -} - func GetServiceRefs(cd *appsv1alpha1.ClusterDefinition) []string { var serviceRefs []string for _, compDef := range cd.Spec.ComponentDefs { @@ -441,29 +442,6 @@ func GetDefaultServiceRef(cd *appsv1alpha1.ClusterDefinition) (string, error) { return serviceRefs[0], nil } -func GetDefaultVersionByCompDefs(dynamic dynamic.Interface, compDefs []string) (string, error) { - cv := "" - if compDefs == nil { - return "", fmt.Errorf("failed to find default cluster version referencing the nil compDefs") - } - for _, compDef := range compDefs { - comp, err := dynamic.Resource(types.CompDefGVR()).Get(context.Background(), compDef, metav1.GetOptions{}) - if err != nil { - return "", fmt.Errorf("fail to get cluster version due to: %s", err.Error()) - } - labels := comp.GetLabels() - kind := labels[constant.AppNameLabelKey] - version := labels[constant.AppVersionLabelKey] - // todo: fix cv like: mongodb-sharding-5.0, ac-mysql-8.0.30-auditlog - if cv == "" { - cv = fmt.Sprintf("%s-%s", kind, version) - } else if cv != fmt.Sprintf("%s-%s", kind, version) { - return "", fmt.Errorf("can't get the same cluster version by component definition:[%s]", strings.Join(compDefs, ",")) - } - } - return cv, nil -} - func GetReadyNodeForNodePort(client *kubernetes.Clientset) (*corev1.Node, error) { nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ Limit: 20, @@ -487,3 +465,22 @@ func GetReadyNodeForNodePort(client *kubernetes.Clientset) (*corev1.Node, error) } return readyNode, nil } + +func GetEndpointsFromNode(node *corev1.Node) (string, string) { + var ( + internal string + external string + ) + if node == nil { + return internal, external + } + for _, add := range node.Status.Addresses { + if add.Type == corev1.NodeInternalDNS || add.Type == corev1.NodeInternalIP { + internal = add.Address + } + if add.Type == corev1.NodeExternalDNS || add.Type == corev1.NodeExternalIP { + external = add.Address + } + } + return internal, external +} diff --git a/pkg/cluster/helper_test.go b/pkg/cluster/helper_test.go index 606201a01..13a6bb9b8 100644 --- a/pkg/cluster/helper_test.go +++ b/pkg/cluster/helper_test.go @@ -51,7 +51,7 @@ var _ = Describe("helper", func() { It("get cluster endpoints", func() { cluster := testing.FakeCluster("test", "test") svcs := testing.FakeServices() - internalEPs, externalEPs := GetComponentEndpoints(svcs, &cluster.Spec.ComponentSpecs[0]) + internalEPs, externalEPs := GetComponentEndpoints(nil, svcs, cluster.Spec.ComponentSpecs[0].Name) Expect(len(internalEPs)).Should(Equal(3)) Expect(len(externalEPs)).Should(Equal(1)) }) diff --git a/pkg/cluster/printer.go b/pkg/cluster/printer.go index de9f71ef6..74c41ab92 100644 --- a/pkg/cluster/printer.go +++ b/pkg/cluster/printer.go @@ -147,7 +147,7 @@ func AddLabelRow(tbl *printer.TablePrinter, objs *ClusterObjects, opt *PrinterOp func AddComponentRow(tbl *printer.TablePrinter, objs *ClusterObjects, opt *PrinterOptions) { components := objs.GetComponentInfo() for _, c := range components { - tbl.AddRow(c.Name, c.NameSpace, c.Cluster, c.Type, c.Image) + tbl.AddRow(c.Name, c.NameSpace, c.Cluster, c.ComponentDef, c.Image) } } diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index b409b44d4..ae1866154 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -62,16 +62,18 @@ type ClusterInfo struct { } type ComponentInfo struct { - Name string `json:"name,omitempty"` - NameSpace string `json:"nameSpace,omitempty"` - Type string `json:"type,omitempty"` - Cluster string `json:"cluster,omitempty"` - Status string `json:"status,omitempty"` - Replicas string `json:"replicas,omitempty"` - CPU string `json:"cpu,omitempty"` - Memory string `json:"memory,omitempty"` - Image string `json:"image,omitempty"` - Storage []StorageInfo + // component name in cluster.spec + Name string `json:"name,omitempty"` + InstanceTemplateName string `json:"instanceTemplateName,omitempty"` + NameSpace string `json:"nameSpace,omitempty"` + ComponentDef string `json:"type,omitempty"` + Cluster string `json:"cluster,omitempty"` + Status string `json:"status,omitempty"` + Replicas string `json:"replicas,omitempty"` + CPU string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + Image string `json:"image,omitempty"` + Storage []StorageInfo } type StorageInfo struct { @@ -82,18 +84,19 @@ type StorageInfo struct { } type InstanceInfo struct { - Name string `json:"name,omitempty"` - Namespace string `json:"namespace,omitempty"` - Cluster string `json:"cluster,omitempty"` - Component string `json:"component,omitempty"` - Status string `json:"status,omitempty"` - Role string `json:"role,omitempty"` - AccessMode string `json:"accessMode,omitempty"` - AZ string `json:"az,omitempty"` - Region string `json:"region,omitempty"` - CPU string `json:"cpu,omitempty"` - Memory string `json:"memory,omitempty"` - Storage []StorageInfo - Node string `json:"node,omitempty"` - CreatedTime string `json:"age,omitempty"` + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + Cluster string `json:"cluster,omitempty"` + Component string `json:"component,omitempty"` + Status string `json:"status,omitempty"` + Role string `json:"role,omitempty"` + AccessMode string `json:"accessMode,omitempty"` + AZ string `json:"az,omitempty"` + Region string `json:"region,omitempty"` + CPU string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + Storage []StorageInfo + Node string `json:"node,omitempty"` + CreatedTime string `json:"age,omitempty"` + ServiceVersion string `json:"serviceVersion,omitempty"` } diff --git a/pkg/cmd/cluster/connect.go b/pkg/cmd/cluster/connect.go index 106202020..e4370a1dc 100644 --- a/pkg/cmd/cluster/connect.go +++ b/pkg/cmd/cluster/connect.go @@ -59,21 +59,24 @@ var connectExample = templates.Examples(` kbcli cluster connect mycluster --client=cli`) type componentAccount struct { + // unique name identifier for component object componentName string secretName string username string } type ConnectOptions struct { - clusterName string - componentName string - targetCluster *appsv1alpha1.Cluster - clientType string - serviceKind string - node *corev1.Node - services []corev1.Service - needGetReadyNode bool - accounts []componentAccount + clusterName string + // componentName in cluster.spec + clusterComponentName string + targetCluster *appsv1alpha1.Cluster + clientType string + serviceKind string + node *corev1.Node + services []corev1.Service + needGetReadyNode bool + accounts []componentAccount + shardingCompMap map[string]string forwardSVC string forwardPort string @@ -82,7 +85,7 @@ type ConnectOptions struct { // NewConnectCmd returns the cmd of connecting to a cluster func NewConnectCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command { - o := &ConnectOptions{ExecOptions: action.NewExecOptions(f, streams)} + o := &ConnectOptions{ExecOptions: action.NewExecOptions(f, streams), shardingCompMap: map[string]string{}} cmd := &cobra.Command{ Use: "connect (NAME | -i INSTANCE-NAME)", Short: "Connect to a cluster or instance.", @@ -95,7 +98,7 @@ func NewConnectCmd(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra }, } cmd.Flags().StringVarP(&o.PodName, "instance", "i", "", "The instance name to connect.") - flags.AddComponentFlag(f, cmd, &o.componentName, "The component to connect. If not specified and no any cluster scope services, pick up the first one.") + flags.AddComponentFlag(f, cmd, &o.clusterComponentName, "The component to connect. If not specified and no any cluster scope services, pick up the first one.") cmd.Flags().StringVar(&o.clientType, "client", "", "Which client connection example should be output.") util.CheckErr(cmd.RegisterFlagCompletionFunc("client", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { var types []string @@ -139,7 +142,7 @@ func (o *ConnectOptions) Validate(args []string) error { if len(args) > 0 { return fmt.Errorf("specify either cluster name or instance name, they are exclusive") } - if len(o.componentName) > 0 { + if len(o.clusterComponentName) > 0 { return fmt.Errorf("component name is valid only when cluster name is specified") } } else if len(args) == 0 { @@ -173,25 +176,10 @@ func (o *ConnectOptions) Complete() error { return nil } -func (o *ConnectOptions) appendRealCompNamesWithSharding(realComponentNames *[]string, shardingCompName string) error { - shardingComps, err := cluster.ListShardingComponents(o.Dynamic, o.clusterName, o.Namespace, shardingCompName) - if err != nil { - return err - } - if len(shardingComps) == 0 { - return fmt.Errorf(`cannot find any component objects for sharding component "%s"`, shardingCompName) - } - for i := range shardingComps { - *realComponentNames = append(*realComponentNames, shardingComps[i].Labels[constant.KBAppComponentLabelKey]) - } - return nil -} - func (o *ConnectOptions) getConnectionInfo() error { var ( - realComponentNames []string - componentDefName string - getter = cluster.ObjectsGetter{ + componentPairs []cluster.ComponentPair + getter = cluster.ObjectsGetter{ Client: o.Client, Dynamic: o.Dynamic, Name: o.clusterName, @@ -205,7 +193,7 @@ func (o *ConnectOptions) getConnectionInfo() error { if err != nil { return err } - if o.PodName == "" && o.componentName == "" && len(o.targetCluster.Spec.Services) > 0 { + if o.PodName == "" && o.clusterComponentName == "" && len(o.targetCluster.Spec.Services) > 0 { // only get cluster connection info with cluster service return o.getConnectInfoWithClusterService(objs.Services) } @@ -221,59 +209,67 @@ func (o *ConnectOptions) getConnectionInfo() error { matchSVCFunc = func(svc corev1.Service, compName string) bool { return svc.Spec.Selector[constant.KBAppPodNameLabelKey] == o.PodName } - realComponentNames = append(realComponentNames, o.Pod.Labels[constant.KBAppComponentLabelKey]) - componentDefName = o.Pod.Labels[constant.ComponentDefinitionLabelKey] - case o.componentName != "": - shardingSpec := o.targetCluster.Spec.GetShardingByName(o.componentName) - if shardingSpec != nil { - if err = o.appendRealCompNamesWithSharding(&realComponentNames, o.componentName); err != nil { + componentPairs = append(componentPairs, cluster.ComponentPair{ + ComponentName: o.Pod.Labels[constant.KBAppComponentLabelKey], + ComponentDefName: o.Pod.Labels[constant.ComponentDefinitionLabelKey], + ShardingName: o.Pod.Labels[constant.KBAppShardingNameLabelKey], + }) + case o.clusterComponentName != "": + compSpec, isSharding := cluster.GetCompSpecAndCheckSharding(o.targetCluster, o.clusterComponentName) + if compSpec == nil { + return fmt.Errorf(`cannot found the component "%s" in the cluster "%s"`, o.clusterComponentName, o.clusterName) + } + if isSharding { + if componentPairs, err = cluster.GetShardingComponentPairs(o.Dynamic, o.targetCluster, appsv1alpha1.ShardingSpec{ + Name: o.clusterComponentName, + Template: *compSpec, + }); err != nil { return err } - componentDefName = shardingSpec.Template.ComponentDef } else { - compSpec := o.targetCluster.Spec.GetComponentByName(o.componentName) - if compSpec == nil { - return fmt.Errorf(`cannot found the component "%s" in the cluster "%s"`, o.componentName, o.clusterName) - } - componentDefName = compSpec.ComponentDef - realComponentNames = append(realComponentNames, o.componentName) + componentPairs = append(componentPairs, cluster.ComponentPair{ + ComponentName: compSpec.Name, + ComponentDefName: compSpec.ComponentDef, + }) } default: - // 2. get first component services - switch { - case len(o.targetCluster.Spec.ComponentSpecs) > 0: - compSpec := o.targetCluster.Spec.ComponentSpecs[0] - realComponentNames = append(realComponentNames, compSpec.Name) - componentDefName = compSpec.ComponentDef - case len(o.targetCluster.Spec.ShardingSpecs) > 0: - shardingSpec := o.targetCluster.Spec.ShardingSpecs[0] - if err = o.appendRealCompNamesWithSharding(&realComponentNames, shardingSpec.Name); err != nil { - return err - } - componentDefName = shardingSpec.Template.ComponentDef - default: - return fmt.Errorf(`cannot found shardingSpecs or componentSpecs in cluster "%s"`, o.clusterName) + // 2. get all component services + componentPairs, err = cluster.GetClusterComponentPairs(o.Dynamic, o.targetCluster) + if err != nil { + return err } } - for _, realCompName := range realComponentNames { - if err = o.getConnectInfoWithPodOrCompService(objs.Services, realCompName, matchSVCFunc); err != nil { + for _, compPair := range componentPairs { + if err = o.getConnectInfoWithCompService(objs.Services, compPair.ComponentName, matchSVCFunc); err != nil { return err } + if err = o.getComponentAccounts(compPair.ComponentDefName, compPair.ComponentName); err != nil { + return err + } + } + o.setShardingCompMap(componentPairs) + return nil +} + +func (o *ConnectOptions) setShardingCompMap(componentPairs []cluster.ComponentPair) { + for _, v := range componentPairs { + if v.ShardingName != "" { + o.shardingCompMap[v.ComponentName] = v.ShardingName + } } - return o.getComponentAccounts(componentDefName, realComponentNames[0]) } -func (o *ConnectOptions) getConnectInfoWithPodOrCompService(services *corev1.ServiceList, realCompName string, match func(svc corev1.Service, realCompName string) bool) error { +func (o *ConnectOptions) getConnectInfoWithCompService(services *corev1.ServiceList, componentName string, match func(svc corev1.Service, componentName string) bool) error { needGetHeadlessSVC := true for i := range services.Items { svc := services.Items[i] - if match(svc, realCompName) { + if match(svc, componentName) { needGetHeadlessSVC = false o.appendService(svc) } } if needGetHeadlessSVC { - return o.getCompHeadlessSVC(realCompName) + return o.getCompHeadlessSVC(componentName) } return nil } @@ -285,10 +281,13 @@ func (o *ConnectOptions) appendService(svc corev1.Service) { } } -func (o *ConnectOptions) getCompHeadlessSVC(realCompName string) error { +func (o *ConnectOptions) getCompHeadlessSVC(componentName string) error { headlessSVC := &corev1.Service{} - headlessSVCName := constant.GenerateDefaultComponentHeadlessServiceName(o.clusterName, realCompName) - if err := util.GetResourceObjectFromGVR(types.ServiceGVR(), client.ObjectKey{Namespace: o.Namespace, Name: headlessSVCName}, o.Dynamic, headlessSVC); client.IgnoreNotFound(err) != nil { + headlessSVCName := constant.GenerateDefaultComponentHeadlessServiceName(o.clusterName, componentName) + if err := util.GetResourceObjectFromGVR(types.ServiceGVR(), client.ObjectKey{Namespace: o.Namespace, Name: headlessSVCName}, o.Dynamic, headlessSVC); err != nil { + if strings.Contains(err.Error(), "not found") { + return nil + } return err } if headlessSVC.Name != "" { @@ -299,7 +298,7 @@ func (o *ConnectOptions) getCompHeadlessSVC(realCompName string) error { func (o *ConnectOptions) getConnectInfoWithClusterService(services *corev1.ServiceList) error { // key: compName value: isSharding - componentMap := map[string]bool{} + clusterComponentMap := map[string]bool{} for _, v := range o.targetCluster.Spec.Services { svcName := constant.GenerateClusterServiceName(o.clusterName, v.ServiceName) for i := range services.Items { @@ -309,48 +308,48 @@ func (o *ConnectOptions) getConnectInfoWithClusterService(services *corev1.Servi } o.appendService(svc) if v.ComponentSelector != "" { - componentMap[v.ComponentSelector] = false + clusterComponentMap[v.ComponentSelector] = false } else if v.ShardingSelector != "" { - componentMap[v.ShardingSelector] = true + clusterComponentMap[v.ShardingSelector] = true } break } } - for compName, isSharding := range componentMap { + for clusterCompName, isSharding := range clusterComponentMap { var ( - compDefName string - realCompName string + compDefName string + componentName string ) if isSharding { - shardingSpec := o.targetCluster.Spec.GetShardingByName(compName) + shardingSpec := o.targetCluster.Spec.GetShardingByName(clusterCompName) if shardingSpec == nil { continue } - shardingComps, err := cluster.ListShardingComponents(o.Dynamic, o.clusterName, o.Namespace, compName) + shardingComps, err := cluster.ListShardingComponents(o.Dynamic, o.clusterName, o.Namespace, clusterCompName) if err != nil { return err } if len(shardingComps) == 0 { - return fmt.Errorf(`cannot find any component objects for sharding component "%s"`, compName) + return fmt.Errorf(`cannot find any component objects for sharding component "%s"`, clusterCompName) } - realCompName = shardingComps[0].Labels[constant.KBAppComponentLabelKey] + componentName = shardingComps[0].Labels[constant.KBAppComponentLabelKey] compDefName = shardingSpec.Template.ComponentDef } else { - compSpec := o.targetCluster.Spec.GetComponentByName(compName) + compSpec := o.targetCluster.Spec.GetComponentByName(clusterCompName) if compSpec == nil { continue } - realCompName = compName + componentName = clusterCompName compDefName = compSpec.ComponentDef } - if err := o.getComponentAccounts(compDefName, realCompName); err != nil { + if err := o.getComponentAccounts(compDefName, componentName); err != nil { return err } } return nil } -func (o *ConnectOptions) getComponentAccounts(componentDefName, realCompName string) error { +func (o *ConnectOptions) getComponentAccounts(componentDefName, componentName string) error { compDef, err := cluster.GetComponentDefByName(o.Dynamic, componentDefName) if err != nil { return err @@ -361,30 +360,14 @@ func (o *ConnectOptions) getComponentAccounts(componentDefName, realCompName str continue } o.accounts = append(o.accounts, componentAccount{ - componentName: realCompName, - secretName: constant.GenerateAccountSecretName(o.clusterName, realCompName, v.Name), + componentName: componentName, + secretName: constant.GenerateAccountSecretName(o.clusterName, componentName, v.Name), username: v.Name, }) } return nil } -func (o *ConnectOptions) getEndpointsFromNode() (string, string) { - var ( - internal string - external string - ) - for _, add := range o.node.Status.Addresses { - if add.Type == corev1.NodeInternalDNS || add.Type == corev1.NodeInternalIP { - internal = add.Address - } - if add.Type == corev1.NodeExternalDNS || add.Type == corev1.NodeExternalIP { - external = add.Address - } - } - return internal, external -} - func (o *ConnectOptions) showEndpoints() { tbl := newTbl(o.Out, "", "COMPONENT", "SERVICE-NAME", "TYPE", "PORT", "INTERNAL", "EXTERNAL") for _, svc := range o.services { @@ -393,6 +376,9 @@ func (o *ConnectOptions) showEndpoints() { if compName == "" { compName = svc.Spec.Selector[constant.KBAppComponentLabelKey] } + if shardingName, ok := o.shardingCompMap[compName]; ok { + compName = cluster.BuildShardingComponentName(shardingName, compName) + } internal := fmt.Sprintf("%s.%s.svc.cluster.local", svc.Name, svc.Namespace) if svc.Spec.ClusterIP == corev1.ClusterIPNone { podName := o.PodName @@ -403,7 +389,7 @@ func (o *ConnectOptions) showEndpoints() { } external := cluster.GetExternalAddr(&svc) if svc.Spec.Type == corev1.ServiceTypeNodePort && o.node != nil { - nodeInternal, nodeExternal := o.getEndpointsFromNode() + nodeInternal, nodeExternal := cluster.GetEndpointsFromNode(o.node) for _, p := range svc.Spec.Ports { ports = append(ports, fmt.Sprintf("%s(nodePort: %s)", strconv.Itoa(int(p.Port)), strconv.Itoa(int(p.NodePort)))) @@ -439,7 +425,11 @@ func (o *ConnectOptions) showEndpoints() { func (o *ConnectOptions) showAccounts() { tbl := newTbl(o.Out, "\nAccount Secrets:", "COMPONENT", "SECRET-NAME", "USERNAME", "PASSWORD-KEY") for _, account := range o.accounts { - tbl.AddRow(account.componentName, account.secretName, account.username, "") + compName := account.componentName + if shardingName, ok := o.shardingCompMap[compName]; ok { + compName = cluster.BuildShardingComponentName(shardingName, compName) + } + tbl.AddRow(compName, account.secretName, account.username, "") } tbl.Print() } diff --git a/pkg/cmd/cluster/connect_test.go b/pkg/cmd/cluster/connect_test.go index a8164b812..f8f8ac9c8 100644 --- a/pkg/cmd/cluster/connect_test.go +++ b/pkg/cmd/cluster/connect_test.go @@ -72,7 +72,8 @@ var _ = Describe("connection", func() { } tf.Client = tf.UnstructuredClient - tf.FakeDynamicClient = testing.FakeDynamicClient(cluster, testing.FakeCompDef(), &services.Items[0], &pods.Items[0], &pods.Items[1], &pods.Items[2]) + tf.FakeDynamicClient = testing.FakeDynamicClient(cluster, testing.FakeCompDef(), + &services.Items[0], &pods.Items[0], &pods.Items[1], &pods.Items[2]) streams = genericiooptions.NewTestIOStreamsDiscard() }) @@ -99,14 +100,14 @@ var _ = Describe("connection", func() { // set instance name and cluster name, should fail o.PodName = "test-pod-0" Expect(o.Validate([]string{testing.ClusterName})).Should(HaveOccurred()) - o.componentName = "test-component" + o.clusterComponentName = "test-component" Expect(o.Validate([]string{})).Should(HaveOccurred()) // unset pod name o.PodName = "" Expect(o.Validate([]string{testing.ClusterName})).Should(Succeed()) // unset component name - o.componentName = "" + o.clusterComponentName = "" Expect(o.Validate([]string{testing.ClusterName})).Should(Succeed()) }) @@ -143,11 +144,11 @@ var _ = Describe("connection", func() { o := initOption(nil) Expect(o.runShowExample()).Should(Succeed()) Expect(o.services).Should(HaveLen(4)) - Expect(o.accounts).Should(HaveLen(1)) + Expect(o.accounts).Should(HaveLen(2)) By("specify one component") o = initOption(func(o *ConnectOptions) { - o.componentName = testing.ComponentName + o.clusterComponentName = testing.ComponentName }) Expect(o.runShowExample()).Should(Succeed()) Expect(o.services).Should(HaveLen(4)) diff --git a/pkg/cmd/cluster/describe.go b/pkg/cmd/cluster/describe.go index 5128c55b0..c58c9a6d9 100644 --- a/pkg/cmd/cluster/describe.go +++ b/pkg/cmd/cluster/describe.go @@ -158,7 +158,9 @@ func (o *describeOptions) describeCluster(name string) error { showCluster(o.Cluster, o.Out) // show endpoints - showEndpoints(o.Cluster, o.Services, o.Out) + if err = showEndpoints(o.dynamic, o.Nodes, o.Cluster, o.Services, o.Out); err != nil { + return err + } // topology showTopology(o.ClusterObjects.GetInstanceInfo(), o.Out) @@ -210,31 +212,32 @@ func showCluster(c *appsv1alpha1.Cluster, out io.Writer) { return } title := fmt.Sprintf("Name: %s\t Created Time: %s", c.Name, util.TimeFormat(&c.CreationTimestamp)) - tbl := newTbl(out, title, "NAMESPACE", "CLUSTER-DEFINITION", "VERSION", "STATUS", "TERMINATION-POLICY") - tbl.AddRow(c.Namespace, c.Spec.ClusterDefRef, c.Spec.ClusterVersionRef, string(c.Status.Phase), string(c.Spec.TerminationPolicy)) + tbl := newTbl(out, title, "NAMESPACE", "CLUSTER-DEFINITION", "TOPOLOGY", "STATUS", "TERMINATION-POLICY") + tbl.AddRow(c.Namespace, c.Spec.ClusterDefRef, c.Spec.Topology, string(c.Status.Phase), string(c.Spec.TerminationPolicy)) tbl.Print() } func showTopology(instances []*cluster.InstanceInfo, out io.Writer) { - tbl := newTbl(out, "\nTopology:", "COMPONENT", "INSTANCE", "ROLE", "STATUS", "AZ", "NODE", "CREATED-TIME") + tbl := newTbl(out, "\nTopology:", "COMPONENT", "SERVICE-VERSION", "INSTANCE", "ROLE", "STATUS", "AZ", "NODE", "CREATED-TIME") for _, ins := range instances { - tbl.AddRow(ins.Component, ins.Name, ins.Role, ins.Status, ins.AZ, ins.Node, ins.CreatedTime) + tbl.AddRow(ins.Component, ins.ServiceVersion, ins.Name, ins.Role, ins.Status, ins.AZ, ins.Node, ins.CreatedTime) } tbl.Print() } func showResource(comps []*cluster.ComponentInfo, out io.Writer) { - tbl := newTbl(out, "\nResources Allocation:", "COMPONENT", "DEDICATED", "CPU(REQUEST/LIMIT)", "MEMORY(REQUEST/LIMIT)", "STORAGE-SIZE", "STORAGE-CLASS") + tbl := newTbl(out, "\nResources Allocation:", "COMPONENT", "INSTANCE-TEMPLATE", "CPU(REQUEST/LIMIT)", "MEMORY(REQUEST/LIMIT)", "STORAGE-SIZE", "STORAGE-CLASS") for _, c := range comps { - tbl.AddRow(c.Name, "false", c.CPU, c.Memory, cluster.BuildStorageSize(c.Storage), cluster.BuildStorageClass(c.Storage)) + tbl.AddRow(c.Name, c.InstanceTemplateName, + c.CPU, c.Memory, cluster.BuildStorageSize(c.Storage), cluster.BuildStorageClass(c.Storage)) } tbl.Print() } func showImages(comps []*cluster.ComponentInfo, out io.Writer) { - tbl := newTbl(out, "\nImages:", "COMPONENT", "TYPE", "IMAGE") + tbl := newTbl(out, "\nImages:", "COMPONENT", "COMPONENT-DEFINITION", "IMAGE") for _, c := range comps { - tbl.AddRow(c.Name, c.Type, c.Image) + tbl.AddRow(c.Name, c.ComponentDef, c.Image) } tbl.Print() } @@ -244,21 +247,26 @@ func showEvents(name string, namespace string, out io.Writer) { fmt.Fprintf(out, "\nShow cluster events: kbcli cluster list-events -n %s %s", namespace, name) } -func showEndpoints(c *appsv1alpha1.Cluster, svcList *corev1.ServiceList, out io.Writer) { +func showEndpoints(dynamic dynamic.Interface, nodes []*corev1.Node, c *appsv1alpha1.Cluster, svcList *corev1.ServiceList, out io.Writer) error { if c == nil { - return + return nil } - tbl := newTbl(out, "\nEndpoints:", "COMPONENT", "MODE", "INTERNAL", "EXTERNAL") - for _, comp := range c.Spec.ComponentSpecs { - internalEndpoints, externalEndpoints := cluster.GetComponentEndpoints(svcList, &comp) + tbl := newTbl(out, "\nEndpoints:", "COMPONENT", "INTERNAL", "EXTERNAL") + componentPairs, err := cluster.GetClusterComponentPairs(dynamic, c) + if err != nil { + return err + } + for _, componentPair := range componentPairs { + internalEndpoints, externalEndpoints := cluster.GetComponentEndpoints(nodes, svcList, componentPair.ComponentName) if len(internalEndpoints) == 0 && len(externalEndpoints) == 0 { continue } - tbl.AddRow(comp.Name, "ReadWrite", util.CheckEmpty(strings.Join(internalEndpoints, "\n")), - util.CheckEmpty(strings.Join(externalEndpoints, "\n"))) + tbl.AddRow(cluster.BuildShardingComponentName(componentPair.ShardingName, componentPair.ComponentName), + util.CheckEmpty(strings.Join(internalEndpoints, "\n")), util.CheckEmpty(strings.Join(externalEndpoints, "\n"))) } tbl.Print() + return nil } func showDataProtection(backupPolicies []dpv1alpha1.BackupPolicy, backupSchedules []dpv1alpha1.BackupSchedule, defaultBackupRepo, continuousMethod, recoverableTimeRange string, out io.Writer) { diff --git a/pkg/testing/fake.go b/pkg/testing/fake.go index 3fd3fc17e..132628194 100644 --- a/pkg/testing/fake.go +++ b/pkg/testing/fake.go @@ -159,6 +159,7 @@ func FakeCluster(name, namespace string, conditions ...metav1.Condition) *appsv1 { Name: ComponentName + "-1", ComponentDefRef: ComponentDefName, + ComponentDef: CompDefName, Replicas: replicas, Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{