diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go index 8579b2feb6..b6ce13cefa 100644 --- a/pkg/controller/statefulset/stateful_set_control.go +++ b/pkg/controller/statefulset/stateful_set_control.go @@ -45,6 +45,7 @@ import ( imagejobutilfunc "github.com/openkruise/kruise/pkg/util/imagejob/utilfunction" "github.com/openkruise/kruise/pkg/util/inplaceupdate" "github.com/openkruise/kruise/pkg/util/lifecycle" + "github.com/openkruise/kruise/pkg/util/specifieddelete" ) // Realistic value for maximum in-flight requests when processing in parallel mode. @@ -622,11 +623,17 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods( } } + // handle specified deleted pod under maxUnavailable constrain + // NOTE: specified deletion is not constraint by partition setting + specifiedDeletedPods, err := ssc.handleSpecifiedDeletedPods(set, status, currentRevision, updateRevision, replicas, maxUnavailable, unavailablePods) + if err != nil { + return status, err + } + updateIndexes := sortPodsToUpdate(set.Spec.UpdateStrategy.RollingUpdate, updateRevision.Name, *set.Spec.Replicas, replicas) klog.V(3).InfoS("Prepare to update pods indexes for StatefulSet", "statefulSet", klog.KObj(set), "podIndexes", updateIndexes) // update pods in sequence for _, target := range updateIndexes { - // the target is already up-to-date, go to next if getPodRevision(replicas[target]) == updateRevision.Name { continue @@ -667,22 +674,26 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods( } // delete the Pod if it is not already terminating and does not match the update revision. - if !isTerminating(replicas[target]) { + if !specifiedDeletedPods.Has(replicas[target].Name) && !isTerminating(replicas[target]) { // todo validate in-place for pub inplacing, inplaceUpdateErr := ssc.inPlaceUpdatePod(set, replicas[target], updateRevision, revisions) if inplaceUpdateErr != nil { return status, inplaceUpdateErr } + // if pod is inplacing or actual deleting, decrease revision + revisionNeedDecrease := inplacing if !inplacing { klog.V(2).InfoS("StatefulSet terminating Pod for update", "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target])) - if _, err := ssc.deletePod(set, replicas[target]); err != nil { + if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil { return status, err + } else { + revisionNeedDecrease = actualDeleting } } // mark target as unavailable because it's updated unavailablePods.Insert(replicas[target].Name) - if getPodRevision(replicas[target]) == currentRevision.Name { + if revisionNeedDecrease && getPodRevision(replicas[target]) == currentRevision.Name { status.CurrentReplicas-- } } @@ -691,22 +702,63 @@ func (ssc *defaultStatefulSetControl) rollingUpdateStatefulsetPods( return status, nil } -func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (bool, error) { +func (ssc *defaultStatefulSetControl) handleSpecifiedDeletedPods( + set *appsv1beta1.StatefulSet, + status *appsv1beta1.StatefulSetStatus, + currentRevision *apps.ControllerRevision, + updateRevision *apps.ControllerRevision, + replicas []*v1.Pod, + maxUnavailable int, + unavailablePods sets.String) (sets.String, error) { + specifiedDeletedPods := sets.NewString() + for target := len(replicas) - 1; target >= 0; target-- { + if replicas[target] == nil || !specifieddelete.IsSpecifiedDelete(replicas[target]) { + continue + } + // the unavailable pods count exceed the maxUnavailable and the target is available, so we can't process it, + // why skip here rather than return? + // case: pod 0 ready, pod1 unready, pod 2 unready, pod3 ready, pod4 ready + // when maxUnavailable = 3, pod4 with specified deleted will be deleted but pod3 can't + // pod 2 and pod 1 can be deleted because they were unavailable + if len(unavailablePods) >= maxUnavailable && !unavailablePods.Has(replicas[target].Name) { + klog.V(4).InfoS("StatefulSet was waiting for unavailable Pods to update, blocked pod", + "statefulSet", klog.KObj(set), "unavailablePods", unavailablePods.List(), "blockedPod", klog.KObj(replicas[target])) + continue + } + + specifiedDeletedPods.Insert(replicas[target].Name) + if _, actualDeleting, err := ssc.deletePod(set, replicas[target]); err != nil { + return specifiedDeletedPods, err + } else if actualDeleting { + // if actual deleted, update revision count in status + if getPodRevision(replicas[target]) == currentRevision.Name { + status.CurrentReplicas-- + } else if getPodRevision(replicas[target]) == updateRevision.Name { + status.UpdatedReplicas-- + } + } + // mark target as unavailable because it's deleting or pre-deleting + unavailablePods.Insert(replicas[target].Name) + } + return specifiedDeletedPods, nil +} + +func (ssc *defaultStatefulSetControl) deletePod(set *appsv1beta1.StatefulSet, pod *v1.Pod) (modified, actualDeleting bool, err error) { if set.Spec.Lifecycle != nil && lifecycle.IsPodHooked(set.Spec.Lifecycle.PreDelete, pod) { markPodNotReady := set.Spec.Lifecycle.PreDelete.MarkPodNotReady if updated, _, err := ssc.lifecycleControl.UpdatePodLifecycle(pod, appspub.LifecycleStatePreparingDelete, markPodNotReady); err != nil { - return false, err + return false, false, err } else if updated { klog.V(3).InfoS("StatefulSet scaling update pod lifecycle to PreparingDelete", "statefulSet", klog.KObj(set), "pod", klog.KObj(pod)) - return true, nil + return true, false, nil } - return false, nil + return false, false, nil } if err := ssc.podControl.DeleteStatefulPod(set, pod); err != nil { ssc.recorder.Eventf(set, v1.EventTypeWarning, "FailedDelete", "failed to delete pod %s: %v", pod.Name, err) - return false, err + return false, false, err } - return true, nil + return true, true, nil } func (ssc *defaultStatefulSetControl) refreshPodState(set *appsv1beta1.StatefulSet, pod *v1.Pod, updateRevision string) (bool, time.Duration, error) { @@ -992,7 +1044,7 @@ func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set logger.V(2).Info("Pod of StatefulSet is terminating for scale down", "statefulSet", klog.KObj(set), "pod", klog.KObj(condemned[i])) - modified, err := ssc.deletePod(set, condemned[i]) + modified, _, err := ssc.deletePod(set, condemned[i]) if err != nil || (monotonic && modified) { return true, err } @@ -1035,7 +1087,7 @@ func (ssc *defaultStatefulSetControl) processReplica( // regardless of the exit code. if isFailed(replicas[i]) || isSucceeded(replicas[i]) { if replicas[i].DeletionTimestamp == nil { - if _, err := ssc.deletePod(set, replicas[i]); err != nil { + if _, _, err := ssc.deletePod(set, replicas[i]); err != nil { return true, false, err } } diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go index 01a2a4beba..4026004289 100644 --- a/pkg/controller/statefulset/stateful_set_control_test.go +++ b/pkg/controller/statefulset/stateful_set_control_test.go @@ -30,6 +30,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -54,6 +55,7 @@ import ( utilpointer "k8s.io/utils/pointer" appspub "github.com/openkruise/kruise/apis/apps/pub" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" kruisefake "github.com/openkruise/kruise/pkg/client/clientset/versioned/fake" @@ -2215,6 +2217,153 @@ func TestStatefulSetControlRollingUpdateBlockByMaxUnavailable(t *testing.T) { } } +func TestStatefulSetControlRollingUpdateWithSpecifiedDelete(t *testing.T) { + set := burst(newStatefulSet(6)) + var partition int32 = 3 + var maxUnavailable = intstr.FromInt(3) + set.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{ + Type: apps.RollingUpdateStatefulSetStrategyType, + RollingUpdate: func() *appsv1beta1.RollingUpdateStatefulSetStrategy { + return &appsv1beta1.RollingUpdateStatefulSetStrategy{ + Partition: &partition, + MaxUnavailable: &maxUnavailable, + PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType, + } + }(), + } + + client := fake.NewSimpleClientset() + kruiseClient := kruisefake.NewSimpleClientset(set) + spc, _, ssc, stop := setupController(client, kruiseClient) + defer close(stop) + if err := scaleUpStatefulSetControl(set, ssc, spc, assertBurstInvariants); err != nil { + t.Fatal(err) + } + set, err := spc.setsLister.StatefulSets(set.Namespace).Get(set.Name) + if err != nil { + t.Fatal(err) + } + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + t.Fatal(err) + } + + // set pod 0 to specified delete + originalPods, err := spc.setPodSpecifiedDelete(set, 0) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(originalPods)) + + // start to update + set.Spec.Template.Spec.Containers[0].Image = "foo" + + // first update pod 5 only because pod 0 is specified deleted + if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err := spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + // inplace update 5 and create 0 + if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + if len(pods) != 6 { + t.Fatalf("Expected create pods 5, got pods %v", pods) + } + sort.Sort(ascendingOrdinal(pods)) + _, exist := pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey] + assert.True(t, !exist) + // pod 0 is old image and pod 5/4 is new image + assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo") + assert.Equal(t, pods[4].Spec.Containers[0].Image, "foo") + assert.Equal(t, pods[0].Spec.Containers[0].Image, "nginx") + + // set pod 1/2/5 to specified deleted and pod 0/4/5 to ready + spc.setPodSpecifiedDelete(set, 0) + spc.setPodSpecifiedDelete(set, 1) + spc.setPodSpecifiedDelete(set, 2) + for i := 0; i < 6; i++ { + spc.setPodRunning(set, i) + spc.setPodReady(set, i) + } + originalPods, _ = spc.setPodSpecifiedDelete(set, 5) + sort.Sort(ascendingOrdinal(originalPods)) + + // create new pod for 1/2/5, do not update 3 + if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + // create new pods 5 and inplace update 3 + if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(pods)) + if len(pods) != 6 { + t.Fatalf("Expected create pods 5, got pods %v", pods) + } + + _, exist = pods[5].Labels[appsv1alpha1.SpecifiedDeleteKey] + assert.True(t, !exist) + _, exist = pods[2].Labels[appsv1alpha1.SpecifiedDeleteKey] + assert.True(t, !exist) + _, exist = pods[1].Labels[appsv1alpha1.SpecifiedDeleteKey] + assert.True(t, !exist) + // pod 0 still undeleted + _, exist = pods[0].Labels[appsv1alpha1.SpecifiedDeleteKey] + assert.True(t, exist) + assert.Equal(t, pods[5].Spec.Containers[0].Image, "foo") + assert.Equal(t, pods[3].Spec.Containers[0].Image, "nginx") + assert.Equal(t, pods[2].Spec.Containers[0].Image, "nginx") + assert.Equal(t, pods[1].Spec.Containers[0].Image, "nginx") + + // set pod 3 to specified deleted and all pod to ready => pod3 will be deleted and updated + for i := 0; i < 6; i++ { + spc.setPodRunning(set, i) + spc.setPodReady(set, i) + } + originalPods, _ = spc.setPodSpecifiedDelete(set, 3) + sort.Sort(ascendingOrdinal(originalPods)) + // create new pod for 3, do not inplace-update 3 + if err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + + // create new pods 5 and inplace update 3 + if err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil { + t.Fatal(err) + } + pods, err = spc.podsLister.Pods(set.Namespace).List(selector) + if err != nil { + t.Fatal(err) + } + sort.Sort(ascendingOrdinal(pods)) + if len(pods) != 6 { + t.Fatalf("Expected create pods 5, got pods %v", pods) + } + assert.Equal(t, pods[3].Spec.Containers[0].Image, "foo") +} + func TestStatefulSetControlInPlaceUpdate(t *testing.T) { set := burst(newStatefulSet(3)) var partition int32 = 1 @@ -3119,6 +3268,21 @@ func (om *fakeObjectManager) setPodTerminated(set *appsv1beta1.StatefulSet, ordi return om.podsLister.Pods(set.Namespace).List(selector) } +func (om *fakeObjectManager) setPodSpecifiedDelete(set *appsv1beta1.StatefulSet, ordinal int) ([]*v1.Pod, error) { + pod := newStatefulSetPod(set, ordinal) + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + pod.Labels[appsv1alpha1.SpecifiedDeleteKey] = "true" + fakeResourceVersion(pod) + om.podsIndexer.Update(pod) + selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector) + if err != nil { + return nil, err + } + return om.podsLister.Pods(set.Namespace).List(selector) +} + var _ StatefulPodControlObjectManager = &fakeObjectManager{} type fakeStatefulSetStatusUpdater struct { diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index c9b811fbfe..4f7522116d 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -348,6 +348,41 @@ var _ = SIGDescribe("AppStatefulSetStorage", func() { } validateExpandVCT(2, injectFn, updateFn, true) }) + + framework.ConformanceIt("should perform rolling updates with specified-deleted with pvc", func() { + ginkgo.By("Creating a new StatefulSet") + vms := []v1.VolumeMount{} + for i := 0; i < 1; i++ { + vms = append(vms, v1.VolumeMount{ + Name: fmt.Sprintf("data%d", i), + MountPath: fmt.Sprintf("/data%d", i), + }) + } + ss = framework.NewStatefulSet(ssName, ns, headlessSvcName, 4, vms, nil, labels) + injectSC(appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType, ss, appsv1beta1.OnPodRollingUpdateVolumeClaimUpdateStrategyType, canExpandSC) + + updateFn := func(update *appsv1beta1.StatefulSet) { + update.Spec.VolumeClaimTemplates[0].Spec.Resources.Requests[v1.ResourceStorage] = resource.MustParse("20") + update.Spec.VolumeClaimUpdateStrategy = appsv1beta1.VolumeClaimUpdateStrategy{ + Type: appsv1beta1.OnPodRollingUpdateVolumeClaimUpdateStrategyType, + } + resizeVCT(update, newSize, 2) + } + testWithSpecifiedDeleted(c, kc, ns, ss, updateFn) + notEqualPVCNum := 0 + sst := framework.NewStatefulSetTester(c, kc) + ss = sst.WaitForStatus(ss) + sst.WaitForStatusPVCReadyReplicas(ss, 2) + + ss = sst.WaitForStatus(ss) + waitForPVCCapacity(context.TODO(), c, kc, ss, func(pvc, template resource.Quantity) bool { + if pvc.Cmp(template) != 0 { + notEqualPVCNum++ + } + return true + }) + gomega.Expect(2).To(gomega.Equal(notEqualPVCNum)) + }) }) ginkgo.Describe("Resize PVC with rollback", func() { @@ -1659,6 +1694,16 @@ var _ = SIGDescribe("StatefulSet", func() { gomega.Expect(pods.Items[i].Labels["test-update"]).To(gomega.Equal("yes")) } }) + + /* + Testname: StatefulSet, Specified delete + Description: Specified delete pod MUST under maxUnavailable constrain. + */ + framework.ConformanceIt("should perform rolling updates with specified-deleted", func() { + ginkgo.By("Creating a new StatefulSet") + ss = framework.NewStatefulSet("ss2", ns, headlessSvcName, 3, nil, nil, labels) + testWithSpecifiedDeleted(c, kc, ns, ss) + }) }) //ginkgo.Describe("Deploy clustered applications [Feature:StatefulSet] [Slow]", func() { @@ -2799,3 +2844,101 @@ func waitForPVCCapacity(ctx context.Context, c clientset.Interface, kc kruisecli return true, nil }) } + +// This function is used by two tests to test StatefulSet rollbacks: one using +// PVCs and one using no storage. +func testWithSpecifiedDeleted(c clientset.Interface, kc kruiseclientset.Interface, ns string, ss *appsv1beta1.StatefulSet, + fns ...func(update *appsv1beta1.StatefulSet)) { + sst := framework.NewStatefulSetTester(c, kc) + *(ss.Spec.Replicas) = 4 + ss, err := kc.AppsV1beta1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss) + ss = sst.WaitForStatus(ss) + currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision + gomega.Expect(currentRevision).To(gomega.Equal(updateRevision), + fmt.Sprintf("StatefulSet %s/%s created with update revision %s not equal to current revision %s", + ss.Namespace, ss.Name, updateRevision, currentRevision)) + pods := sst.GetPodList(ss) + for i := range pods.Items { + gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(currentRevision), + fmt.Sprintf("Pod %s/%s revision %s is not equal to current revision %s", + pods.Items[i].Namespace, + pods.Items[i].Name, + pods.Items[i].Labels[apps.StatefulSetRevisionLabel], + currentRevision)) + } + specifiedDeletePod := func(idx int) { + sst.SortStatefulPods(pods) + oldUid := pods.Items[idx].UID + err = setPodSpecifiedDelete(c, ns, pods.Items[idx].Name) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ss = sst.WaitForStatus(ss) + name := pods.Items[idx].Name + // wait be deleted + sst.WaitForState(ss, func(set2 *appsv1beta1.StatefulSet, pods2 *v1.PodList) (bool, error) { + ss = set2 + pods = pods2 + for i := range pods.Items { + if pods.Items[i].Name == name { + return pods.Items[i].UID != oldUid, nil + } + } + return false, nil + }) + sst.WaitForPodReady(ss, pods.Items[idx].Name) + pods = sst.GetPodList(ss) + sst.SortStatefulPods(pods) + } + specifiedDeletePod(1) + newImage := NewNginxImage + oldImage := ss.Spec.Template.Spec.Containers[0].Image + + ginkgo.By(fmt.Sprintf("Updating StatefulSet template: update image from %s to %s", oldImage, newImage)) + gomega.Expect(oldImage).NotTo(gomega.Equal(newImage), "Incorrect test setup: should update to a different image") + var partition int32 = 2 + ss, err = framework.UpdateStatefulSetWithRetries(kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) { + update.Spec.Template.Spec.Containers[0].Image = newImage + if update.Spec.UpdateStrategy.RollingUpdate == nil { + update.Spec.UpdateStrategy.RollingUpdate = &appsv1beta1.RollingUpdateStatefulSetStrategy{} + } + update.Spec.UpdateStrategy.RollingUpdate = &appsv1beta1.RollingUpdateStatefulSetStrategy{ + Partition: &partition, + } + for _, fn := range fns { + fn(update) + } + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + specifiedDeletePod(2) + + ginkgo.By("Creating a new revision") + ss = sst.WaitForStatus(ss) + currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision + gomega.Expect(currentRevision).NotTo(gomega.Equal(updateRevision), + "Current revision should not equal update revision during rolling update") + specifiedDeletePod(1) + for i := range pods.Items { + if i >= int(partition) { + gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(updateRevision), + fmt.Sprintf("Pod %s/%s revision %s is not equal to updated revision %s", + pods.Items[i].Namespace, + pods.Items[i].Name, + pods.Items[i].Labels[apps.StatefulSetRevisionLabel], + updateRevision)) + } else { + gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(currentRevision), + fmt.Sprintf("Pod %s/%s revision %s is not equal to current revision %s", + pods.Items[i].Namespace, + pods.Items[i].Name, + pods.Items[i].Labels[apps.StatefulSetRevisionLabel], + currentRevision)) + } + } +} + +func setPodSpecifiedDelete(c clientset.Interface, ns, name string) error { + _, err := c.CoreV1().Pods(ns).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(`{"metadata":{"labels":{"apps.kruise.io/specified-delete":"true"}}}`), metav1.PatchOptions{}) + return err +} diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index e2186644ac..7dfbb159f1 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -23,7 +23,6 @@ import ( "strings" "time" - kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" v1 "k8s.io/api/core/v1" apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -37,6 +36,8 @@ import ( scaleclient "k8s.io/client-go/scale" "k8s.io/kubernetes/pkg/api/legacyscheme" + kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/onsi/ginkgo" "github.com/onsi/gomega" )