Skip to content

Commit

Permalink
feat: support to restart components with topology orders (#8076)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei authored Sep 23, 2024
1 parent 46e21b6 commit acdc2a7
Show file tree
Hide file tree
Showing 3 changed files with 271 additions and 38 deletions.
141 changes: 113 additions & 28 deletions pkg/operations/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ package operations

import (
"fmt"
"reflect"
"time"

appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -73,30 +71,38 @@ func (r restartOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clie
return err
}
r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
componentKindList := []client.ObjectList{
&appv1.StatefulSetList{},
&workloads.InstanceSetList{},
orderedComps, err := r.getComponentOrders(reqCtx, cli, opsRes)
if err != nil {
return err
}
for _, objectList := range componentKindList {
if err := r.restartComponent(reqCtx, cli, opsRes, objectList); err != nil {
return err
}
if len(orderedComps) > 0 {
// will restart components in "ReconcileAction"
return nil
}
return nil
return r.restartComponents(reqCtx, cli, opsRes, opsRes.OpsRequest.Spec.RestartList, false)
}

// ReconcileAction will be performed when action is done and loops till OpsRequest.status.phase is Succeed/Failed.
// the Reconcile function for restart opsRequest.
func (r restartOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) (opsv1alpha1.OpsPhase, time.Duration, error) {
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
r.compOpsHelper = newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)
handleRestartProgress := func(reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource,
pgRes *progressResource,
compStatus *opsv1alpha1.OpsRequestComponentStatus) (expectProgressCount int32, completedCount int32, err error) {
return handleComponentStatusProgress(reqCtx, cli, opsRes, pgRes, compStatus, r.podApplyCompOps)
}
return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes,
orderedComps, err := r.getComponentOrders(reqCtx, cli, opsRes)
if err != nil {
return "", 0, err
}
if len(orderedComps) > 0 {
if err = r.restartComponents(reqCtx, cli, opsRes, orderedComps, true); err != nil {
return "", 0, err
}
}
return r.compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes,
"restart", handleRestartProgress)
}

Expand All @@ -114,30 +120,109 @@ func (r restartOpsHandler) podApplyCompOps(
return !pod.CreationTimestamp.Before(&ops.Status.StartTimestamp)
}

// restartStatefulSet restarts statefulSet workload
func (r restartOpsHandler) restartComponent(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource, objList client.ObjectList) error {
if err := cli.List(reqCtx.Ctx, objList,
client.InNamespace(opsRes.Cluster.Namespace),
client.MatchingLabels{constant.AppInstanceLabelKey: opsRes.Cluster.Name}); err != nil {
return err
// getComponentOrders returns the components of the OpsRequest that must be
// restarted sequentially, sorted by the "update" orders of the topology the
// cluster uses.
//
// It returns a nil slice when the cluster does not reference both a
// ClusterDefinition and a topology, when the topology is not found, or when
// the topology declares no update orders. An error is returned only if the
// ClusterDefinition cannot be fetched.
func (r restartOpsHandler) getComponentOrders(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) ([]opsv1alpha1.ComponentOps, error) {
	clusterDefName, topologyName := opsRes.Cluster.Spec.ClusterDef, opsRes.Cluster.Spec.Topology
	if clusterDefName == "" || topologyName == "" {
		return nil, nil
	}
	clusterDef := &appsv1.ClusterDefinition{}
	if err := cli.Get(reqCtx.Ctx, client.ObjectKey{Name: clusterDefName}, clusterDef); err != nil {
		return nil, err
	}
	// components that require sequential restart
	var sequential []opsv1alpha1.ComponentOps
	for _, candidate := range clusterDef.Spec.Topologies {
		if candidate.Name != topologyName {
			continue
		}
		// Only the first topology with a matching name is considered.
		if candidate.Orders == nil {
			break
		}
		// when using clusterDef and topology, "update orders" includes all components;
		// keep only those that were requested to restart, preserving the order.
		for _, name := range candidate.Orders.Update {
			if requested, found := r.compOpsHelper.componentOpsSet[name]; found {
				sequential = append(sequential, requested.(opsv1alpha1.ComponentOps))
			}
		}
		break
	}
	return sequential, nil
}

items := reflect.ValueOf(objList).Elem().FieldByName("Items")
l := items.Len()
for i := 0; i < l; i++ {
// get the underlying object
object := items.Index(i).Addr().Interface().(client.Object)
template := items.Index(i).FieldByName("Spec").FieldByName("Template").Addr().Interface().(*corev1.PodTemplateSpec)
if r.isRestarted(opsRes, object, template) {
func (r restartOpsHandler) restartComponents(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource, comOpsList []opsv1alpha1.ComponentOps, inOrder bool) error {
for index, compOps := range comOpsList {
if !r.matchToRestart(opsRes, comOpsList, index, inOrder) {
continue
}
if err := cli.Update(reqCtx.Ctx, object); err != nil {
matchingLabels := client.MatchingLabels{constant.AppInstanceLabelKey: opsRes.Cluster.Name}
if opsRes.Cluster.Spec.GetShardingByName(compOps.ComponentName) != nil {
matchingLabels[constant.KBAppShardingNameLabelKey] = compOps.ComponentName
} else {
matchingLabels[constant.KBAppComponentLabelKey] = compOps.ComponentName
}
instanceSetList := &workloads.InstanceSetList{}
if err := cli.List(reqCtx.Ctx, instanceSetList,
client.InNamespace(opsRes.Cluster.Namespace), matchingLabels); err != nil {
return err
}
if len(instanceSetList.Items) == 0 {
return fmt.Errorf(`the instanceSet workloads are not exists for the component "%s"`, compOps.ComponentName)
}
for i := range instanceSetList.Items {
instanceSet := &instanceSetList.Items[i]
if r.isRestarted(opsRes, instanceSet, &instanceSet.Spec.Template) {
continue
}
if err := cli.Update(reqCtx.Ctx, instanceSet); err != nil {
return err
}
}
if inOrder {
// if a component has been restarted in order, break
break
}
}
return nil
}

// matchToRestart reports whether the component at position index of
// comOpsList should be restarted now.
//
// Without ordering (inOrder == false) every component matches. With ordering,
// a component matches only when its predecessor in the list (if any) has
// completed its restart and the component itself has not.
func (r restartOpsHandler) matchToRestart(opsRes *OpsResource, comOpsList []opsv1alpha1.ComponentOps, index int, inOrder bool) bool {
	if !inOrder {
		return true
	}
	// restartDone treats a component with zero replicas as completed; otherwise
	// it requires at least one progress detail and all of them to be completed.
	restartDone := func(name string) bool {
		if r.getCompReplicas(opsRes.Cluster, name) == 0 {
			return true
		}
		details := opsRes.OpsRequest.Status.Components[name].ProgressDetails
		for i := range details {
			if !isCompletedProgressStatus(details[i].Status) {
				return false
			}
		}
		return len(details) > 0
	}
	// The first component has no predecessor to wait for.
	predecessorDone := index <= 0 || restartDone(comOpsList[index-1].ComponentName)
	return predecessorDone && !restartDone(comOpsList[index].ComponentName)
}

// getCompReplicas returns the replicas declared in the cluster spec for
// compName. The name is looked up as a component first and as a sharding
// second (using the sharding template's replicas); 0 is returned when the
// name matches neither.
func (r restartOpsHandler) getCompReplicas(cluster *appsv1.Cluster, compName string) int32 {
	if comp := cluster.Spec.GetComponentByName(compName); comp != nil {
		return comp.Replicas
	}
	if sharding := cluster.Spec.GetShardingByName(compName); sharding != nil {
		return sharding.Template.Replicas
	}
	return 0
}

// isRestarted checks whether the component has been restarted
func (r restartOpsHandler) isRestarted(opsRes *OpsResource, object client.Object, podTemplate *corev1.PodTemplateSpec) bool {
cName := object.GetLabels()[constant.KBAppComponentLabelKey]
Expand All @@ -156,8 +241,8 @@ func (r restartOpsHandler) isRestarted(opsRes *OpsResource, object client.Object
}
hasRestarted := true
startTimestamp := opsRes.OpsRequest.Status.StartTimestamp
stsRestartTimeStamp := podTemplate.Annotations[constant.RestartAnnotationKey]
if res, _ := time.Parse(time.RFC3339, stsRestartTimeStamp); startTimestamp.After(res) {
workloadRestartTimeStamp := podTemplate.Annotations[constant.RestartAnnotationKey]
if res, _ := time.Parse(time.RFC3339, workloadRestartTimeStamp); startTimestamp.After(res) {
podTemplate.Annotations[constant.RestartAnnotationKey] = startTimestamp.Format(time.RFC3339)
hasRestarted = false
}
Expand Down
96 changes: 87 additions & 9 deletions pkg/operations/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package operations

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

Expand All @@ -28,6 +30,8 @@ import (

appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
opsv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
workloads "github.com/apecloud/kubeblocks/apis/workloads/v1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
Expand All @@ -37,9 +41,10 @@ import (
var _ = Describe("Restart OpsRequest", func() {

var (
randomStr = testCtx.GetRandomStr()
compDefName = "test-compdef-" + randomStr
clusterName = "test-cluster-" + randomStr
randomStr = testCtx.GetRandomStr()
compDefName = "test-compdef-" + randomStr
clusterName = "test-cluster-" + randomStr
clusterDefName = "test-clusterdef-" + randomStr
)

cleanEnv := func() {
Expand All @@ -57,6 +62,7 @@ var _ = Describe("Restart OpsRequest", func() {
ml := client.HasLabels{testCtx.TestObjLabelKey}
// namespaced
testapps.ClearResources(&testCtx, generics.OpsRequestSignature, inNS, ml)
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.InstanceSetSignature, true, inNS, ml)
}

BeforeEach(cleanEnv)
Expand All @@ -71,17 +77,17 @@ var _ = Describe("Restart OpsRequest", func() {
)

BeforeEach(func() {
By("init operations resources ")
opsRes, _, cluster = initOperationsResources(compDefName, clusterName)
reqCtx = intctrlutil.RequestCtx{Ctx: testCtx.Ctx}
})

It("Test restart OpsRequest", func() {
By("init operations resources ")
opsRes, _, cluster = initOperationsResources(compDefName, clusterName)
By("create Restart opsRequest")
opsRes.OpsRequest = createRestartOpsObj(clusterName, "restart-ops-"+randomStr)
mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingClusterCompPhase, defaultCompName)

By("mock restart OpsRequest is Running")
By("mock restart OpsRequest to Creating")
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase))
Expand All @@ -91,10 +97,74 @@ var _ = Describe("Restart OpsRequest", func() {
_ = rHandler.Action(reqCtx, k8sClient, opsRes)

_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
})

ExpectCompRestarted := func(opsRequest *opsv1alpha1.OpsRequest, compName string, expectRestarted bool) {
instanceSetName := constant.GenerateWorkloadNamePattern(clusterName, compName)
Eventually(testapps.CheckObj(&testCtx, client.ObjectKey{Name: instanceSetName, Namespace: testCtx.DefaultNamespace},
func(g Gomega, pobj *workloads.InstanceSet) {
startTimestamp := opsRes.OpsRequest.Status.StartTimestamp
workloadRestartTimeStamp := pobj.Spec.Template.Annotations[constant.RestartAnnotationKey]
res, _ := time.Parse(time.RFC3339, workloadRestartTimeStamp)
g.Expect(!startTimestamp.After(res)).Should(Equal(expectRestarted))
})).Should(Succeed())
}

It("Test restart OpsRequest with existing update orders", func() {
By("init operations resources")
opsRes, _, cluster = initOperationsResourcesWithTopology(clusterDefName, compDefName, clusterName)

By("create Restart opsRequest")
opsRes.OpsRequest = createRestartOpsObj(clusterName, "restart-ops-"+randomStr,
defaultCompName, secondaryCompName, thirdCompName)
mockComponentIsOperating(opsRes.Cluster, appsv1.UpdatingClusterCompPhase,
defaultCompName, secondaryCompName, thirdCompName)

By("mock restart OpsRequest to Creating")
_, err := GetOpsManager().Do(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
Eventually(testops.GetOpsRequestPhase(&testCtx, client.ObjectKeyFromObject(opsRes.OpsRequest))).Should(Equal(opsv1alpha1.OpsCreatingPhase))

By("test restart Action")
rHandler := restartOpsHandler{}
_ = rHandler.Action(reqCtx, k8sClient, opsRes)
ExpectCompRestarted(opsRes.OpsRequest, defaultCompName, false)
ExpectCompRestarted(opsRes.OpsRequest, secondaryCompName, false)
ExpectCompRestarted(opsRes.OpsRequest, thirdCompName, false)

By("test reconcile Action")
_, err = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err).ShouldNot(HaveOccurred())
ExpectCompRestarted(opsRes.OpsRequest, defaultCompName, true)
ExpectCompRestarted(opsRes.OpsRequest, secondaryCompName, false)
ExpectCompRestarted(opsRes.OpsRequest, thirdCompName, false)

By("mock restart secondary component completed")
setCompProgress := func(compName string, status opsv1alpha1.ProgressStatus) {
workloadName := constant.GenerateWorkloadNamePattern(clusterName, compName)
opsRes.OpsRequest.Status.Components[compName] = opsv1alpha1.OpsRequestComponentStatus{
ProgressDetails: []opsv1alpha1.ProgressStatusDetail{
{ObjectKey: getProgressObjectKey(constant.PodKind, workloadName+"-0"), Status: status},
{ObjectKey: getProgressObjectKey(constant.PodKind, workloadName+"-1"), Status: status},
{ObjectKey: getProgressObjectKey(constant.PodKind, workloadName+"-2"), Status: status},
},
}
}
setCompProgress(defaultCompName, opsv1alpha1.SucceedProgressStatus)
setCompProgress(secondaryCompName, opsv1alpha1.PendingProgressStatus)

By("test reconcile Action and expect to restart third component")
_, _ = GetOpsManager().Reconcile(reqCtx, k8sClient, opsRes)
Expect(err == nil).Should(BeTrue())
ExpectCompRestarted(opsRes.OpsRequest, defaultCompName, true)
ExpectCompRestarted(opsRes.OpsRequest, secondaryCompName, true)
ExpectCompRestarted(opsRes.OpsRequest, thirdCompName, false)
})

It("expect failed when cluster is stopped", func() {
By("init operations resources ")
opsRes, _, cluster = initOperationsResources(compDefName, clusterName)
By("mock cluster is stopped")
Expect(testapps.ChangeObjStatus(&testCtx, cluster, func() {
cluster.Status.Phase = appsv1.StoppedClusterPhase
Expand All @@ -113,11 +183,19 @@ var _ = Describe("Restart OpsRequest", func() {
})
})

func createRestartOpsObj(clusterName, restartOpsName string) *opsv1alpha1.OpsRequest {
func createRestartOpsObj(clusterName, restartOpsName string, compNames ...string) *opsv1alpha1.OpsRequest {
ops := testops.NewOpsRequestObj(restartOpsName, testCtx.DefaultNamespace,
clusterName, opsv1alpha1.RestartType)
ops.Spec.RestartList = []opsv1alpha1.ComponentOps{
{ComponentName: defaultCompName},
if len(compNames) == 0 {
ops.Spec.RestartList = []opsv1alpha1.ComponentOps{
{ComponentName: defaultCompName},
}
} else {
for _, compName := range compNames {
ops.Spec.RestartList = append(ops.Spec.RestartList, opsv1alpha1.ComponentOps{
ComponentName: compName,
})
}
}
opsRequest := testops.CreateOpsRequest(ctx, testCtx, ops)
opsRequest.Status.Phase = opsv1alpha1.OpsPendingPhase
Expand Down
Loading

0 comments on commit acdc2a7

Please sign in to comment.