From cba277a4504077ea696eabfcafa318892027f8a0 Mon Sep 17 00:00:00 2001 From: AiRanthem Date: Wed, 28 Aug 2024 15:38:53 +0800 Subject: [PATCH] =?UTF-8?q?1.=20adapter.go:=20GetReplicaDetails=20returns?= =?UTF-8?q?=20pods=20in=20the=20subset=202.=20xxx=5Fadapter.go:=20return?= =?UTF-8?q?=20pods=20implementation=20=E2=AC=86=EF=B8=8F=203.=20allocator.?= =?UTF-8?q?go:=20about=20safeReplica=204.=20pod=5Fcondition=5Futils.go:=20?= =?UTF-8?q?extract=20PodUnscheduledTimeout=20function=20from=20workloadwpr?= =?UTF-8?q?ead=205.=20reschedule.go:=20PodUnscheduledTimeout=20function=20?= =?UTF-8?q?extracted=206.=20subset.go:=20add=20some=20field=20to=20Subset?= =?UTF-8?q?=20object=20to=20carry=20related=20information=207.=20subset=5F?= =?UTF-8?q?control.go:=20store=20subset=20pods=20to=20Subset=20object=208.?= =?UTF-8?q?=20uniteddeployment=5Fcontroller.go=20=20=20=201.=20add=20reque?= =?UTF-8?q?ue=20feature=20to=20check=20failed=20pods=20=20=20=202.=20subse?= =?UTF-8?q?t=20unschedulable=20status=20management=209.=20uniteddeployment?= =?UTF-8?q?=5Ftypes.go:=20API=20change=2010.=20uniteddeployment=5Fupdate.g?= =?UTF-8?q?o:=20sync=20unschedulable=20to=20CR?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: AiRanthem --- apis/apps/v1alpha1/uniteddeployment_types.go | 100 +++++++++- apis/apps/v1alpha1/zz_generated.deepcopy.go | 92 ++++++++++ .../apps.kruise.io_uniteddeployments.yaml | 55 +++++- .../uniteddeployment/adapter/adapter.go | 8 +- .../adapter/advanced_statefulset_adapter.go | 3 +- .../adapter/cloneset_adapter.go | 4 +- .../adapter/deployment_adapter.go | 3 +- .../adapter/statefulset_adapter.go | 3 +- pkg/controller/uniteddeployment/allocator.go | 58 +++++- .../uniteddeployment/allocator_test.go | 84 +++++++++ .../uniteddeployment/revision_test.go | 5 +- pkg/controller/uniteddeployment/subset.go | 5 +- .../uniteddeployment/subset_control.go | 6 +- .../uniteddeployment_controller.go | 172 ++++++++++++++++-- .../uniteddeployment_controller_test.go | 85 ++++++++- .../uniteddeployment_controller_utils.go | 23 ++- .../uniteddeployment_update.go | 22 +++ pkg/controller/util/pod_condition_utils.go | 22 +++ pkg/controller/workloadspread/reschedule.go | 26 +-- .../comparable_version_expectation.go | 107 +++++++++++ test/e2e/apps/sidecarset.go | 5 +- test/e2e/apps/uniteddeployment.go | 120 ++++++++++++ test/e2e/framework/uniteddeployment.go | 85 ++++++++- 23 files changed, 1017 insertions(+), 76 deletions(-) create mode 100644 pkg/util/expectations/comparable_version_expectation.go diff --git a/apis/apps/v1alpha1/uniteddeployment_types.go b/apis/apps/v1alpha1/uniteddeployment_types.go index 1e4c18c947..b0ceb130c4 100644 --- a/apis/apps/v1alpha1/uniteddeployment_types.go +++ b/apis/apps/v1alpha1/uniteddeployment_types.go @@ -17,6 +17,9 @@ limitations under the License. package v1alpha1 import ( + "strconv" + "time" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -165,6 +168,10 @@ type Topology struct { // +patchStrategy=merge // +optional Subsets []Subset `json:"subsets,omitempty" patchStrategy:"merge" patchMergeKey:"name"` + + // ScheduleStrategy indicates the strategy the UnitedDeployment used to preform the schedule between each of subsets. + // +optional + ScheduleStrategy UnitedDeploymentScheduleStrategy `json:"scheduleStrategy,omitempty"` } // Subset defines the detail of a subset. 
@@ -218,6 +225,69 @@ type Subset struct { Patch runtime.RawExtension `json:"patch,omitempty"` } +// UnitedDeploymentScheduleStrategyType is a string enumeration type that enumerates +// all possible schedule strategies for the UnitedDeployment controller. +// +kubebuilder:validation:Enum=Adaptive;Fixed;"" +type UnitedDeploymentScheduleStrategyType string + +const ( + // AdaptiveUnitedDeploymentScheduleStrategyType represents that when a pod is stuck in the pending status and cannot + // be scheduled, it is allowed to be rescheduled to another subset. + AdaptiveUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Adaptive" + // FixedUnitedDeploymentScheduleStrategyType represents that pods are strictly scheduled to the selected subset + // even if scheduling fails. + FixedUnitedDeploymentScheduleStrategyType UnitedDeploymentScheduleStrategyType = "Fixed" +) + +const ( + DefaultRescheduleCriticalDuration = 30 * time.Second + DefaultUnschedulableStatusLastDuration = 300 * time.Second +) + +// AdaptiveUnitedDeploymentStrategy is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType. +type AdaptiveUnitedDeploymentStrategy struct { + // RescheduleCriticalSeconds indicates how long the controller waits before rescheduling a Pod that failed to be scheduled + // in its subset to another subset that has redundant capacity. If a Pod failed to be scheduled and has stayed in an unschedulable status + // for longer than RescheduleCriticalSeconds, the controller will reschedule it to a suitable subset. Default is 30 seconds. + // +optional + RescheduleCriticalSeconds *int32 `json:"rescheduleCriticalSeconds,omitempty"` + + // UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state, + // with a default value of 300 seconds. + // +optional + UnschedulableLastSeconds *int32 `json:"unschedulableLastSeconds,omitempty"` +} + +// UnitedDeploymentScheduleStrategy defines the schedule performance of UnitedDeployment. +type UnitedDeploymentScheduleStrategy struct { + // Type indicates the type of the UnitedDeploymentScheduleStrategy. + // Default is Fixed + // +optional + Type UnitedDeploymentScheduleStrategyType `json:"type,omitempty"` + + // Adaptive is used to communicate parameters when Type is AdaptiveUnitedDeploymentScheduleStrategyType. + // +optional + Adaptive *AdaptiveUnitedDeploymentStrategy `json:"adaptive,omitempty"` +} + +func (s *UnitedDeploymentScheduleStrategy) IsAdaptive() bool { + return s.Type == AdaptiveUnitedDeploymentScheduleStrategyType +} + +func (s *UnitedDeploymentScheduleStrategy) GetRescheduleCriticalDuration() time.Duration { + if s.Adaptive == nil || s.Adaptive.RescheduleCriticalSeconds == nil { + return DefaultRescheduleCriticalDuration + } + return time.Duration(*s.Adaptive.RescheduleCriticalSeconds) * time.Second +} + +func (s *UnitedDeploymentScheduleStrategy) GetUnschedulableLastDuration() time.Duration { + if s.Adaptive == nil || s.Adaptive.UnschedulableLastSeconds == nil { + return DefaultUnschedulableStatusLastDuration + } + return time.Duration(*s.Adaptive.UnschedulableLastSeconds) * time.Second +} + // UnitedDeploymentStatus defines the observed state of UnitedDeployment. type UnitedDeploymentStatus struct { // ObservedGeneration is the most recent generation observed for this UnitedDeployment.
It corresponds to the @@ -252,6 +322,10 @@ type UnitedDeploymentStatus struct { // +optional SubsetReplicas map[string]int32 `json:"subsetReplicas,omitempty"` + // Record whether each subset is unschedulable. + // +optional + SubsetUnschedulable *SubsetUnschedulable `json:"subsetUnschedulable,omitempty"` + // Represents the latest available observations of a UnitedDeployment's current state. // +optional Conditions []UnitedDeploymentCondition `json:"conditions,omitempty"` @@ -278,7 +352,7 @@ type UnitedDeploymentCondition struct { // The reason for the condition's last transition. Reason string `json:"reason,omitempty"` - // A human readable message indicating details about the transition. + // A human-readable message indicating details about the transition. Message string `json:"message,omitempty"` } @@ -293,6 +367,30 @@ type UpdateStatus struct { CurrentPartitions map[string]int32 `json:"currentPartitions,omitempty"` } +type SubsetUnschedulable struct { + Version int `json:"version"` + Status map[string]UnschedulableStatus `json:"status"` +} + +func (s *SubsetUnschedulable) GetVersion() string { + return strconv.Itoa(s.Version) +} + +func (s *SubsetUnschedulable) NotOlderThan(version string) bool { + if v, err := strconv.Atoi(version); err == nil { + return s.Version >= v + } + return false +} + +type UnschedulableStatus struct { + Unschedulable bool `json:"unschedulable"` + // +optional + UnschedulableTimestamp *metav1.Time `json:"unschedulableTimestamp,omitempty"` + // +optional + PendingPods int32 `json:"-"` +} + // +genclient // +genclient:method=GetScale,verb=get,subresource=scale,result=k8s.io/api/autoscaling/v1.Scale // +genclient:method=UpdateScale,verb=update,subresource=scale,input=k8s.io/api/autoscaling/v1.Scale,result=k8s.io/api/autoscaling/v1.Scale diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 9440ee2715..b53b547180 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -30,6 +30,31 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdaptiveUnitedDeploymentStrategy) DeepCopyInto(out *AdaptiveUnitedDeploymentStrategy) { + *out = *in + if in.RescheduleCriticalSeconds != nil { + in, out := &in.RescheduleCriticalSeconds, &out.RescheduleCriticalSeconds + *out = new(int32) + **out = **in + } + if in.UnschedulableLastSeconds != nil { + in, out := &in.UnschedulableLastSeconds, &out.UnschedulableLastSeconds + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdaptiveUnitedDeploymentStrategy. +func (in *AdaptiveUnitedDeploymentStrategy) DeepCopy() *AdaptiveUnitedDeploymentStrategy { + if in == nil { + return nil + } + out := new(AdaptiveUnitedDeploymentStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AdaptiveWorkloadSpreadStrategy) DeepCopyInto(out *AdaptiveWorkloadSpreadStrategy) { *out = *in @@ -3227,6 +3252,28 @@ func (in *SubsetTemplate) DeepCopy() *SubsetTemplate { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *SubsetUnschedulable) DeepCopyInto(out *SubsetUnschedulable) { + *out = *in + if in.Status != nil { + in, out := &in.Status, &out.Status + *out = make(map[string]UnschedulableStatus, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubsetUnschedulable. +func (in *SubsetUnschedulable) DeepCopy() *SubsetUnschedulable { + if in == nil { + return nil + } + out := new(SubsetUnschedulable) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SyncStatus) DeepCopyInto(out *SyncStatus) { *out = *in @@ -3268,6 +3315,7 @@ func (in *Topology) DeepCopyInto(out *Topology) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + in.ScheduleStrategy.DeepCopyInto(&out.ScheduleStrategy) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Topology. @@ -3380,6 +3428,26 @@ func (in *UnitedDeploymentList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UnitedDeploymentScheduleStrategy) DeepCopyInto(out *UnitedDeploymentScheduleStrategy) { + *out = *in + if in.Adaptive != nil { + in, out := &in.Adaptive, &out.Adaptive + *out = new(AdaptiveUnitedDeploymentStrategy) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnitedDeploymentScheduleStrategy. +func (in *UnitedDeploymentScheduleStrategy) DeepCopy() *UnitedDeploymentScheduleStrategy { + if in == nil { + return nil + } + out := new(UnitedDeploymentScheduleStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UnitedDeploymentSpec) DeepCopyInto(out *UnitedDeploymentSpec) { *out = *in @@ -3428,6 +3496,11 @@ func (in *UnitedDeploymentStatus) DeepCopyInto(out *UnitedDeploymentStatus) { (*out)[key] = val } } + if in.SubsetUnschedulable != nil { + in, out := &in.SubsetUnschedulable, &out.SubsetUnschedulable + *out = new(SubsetUnschedulable) + (*in).DeepCopyInto(*out) + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]UnitedDeploymentCondition, len(*in)) @@ -3492,6 +3565,25 @@ func (in *UnorderedUpdateStrategy) DeepCopy() *UnorderedUpdateStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UnschedulableStatus) DeepCopyInto(out *UnschedulableStatus) { + *out = *in + if in.UnschedulableTimestamp != nil { + in, out := &in.UnschedulableTimestamp, &out.UnschedulableTimestamp + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UnschedulableStatus. +func (in *UnschedulableStatus) DeepCopy() *UnschedulableStatus { + if in == nil { + return nil + } + out := new(UnschedulableStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in UpdateScatterStrategy) DeepCopyInto(out *UpdateScatterStrategy) { { diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml index ccff1c0ea8..2454cb1ae4 100644 --- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml +++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml @@ -955,6 +955,38 @@ spec: description: Topology describes the pods distribution detail between each of subsets. properties: + scheduleStrategy: + description: ScheduleStrategy indicates the strategy the UnitedDeployment + used to perform the schedule between each of subsets. + properties: + adaptive: + description: Adaptive is used to communicate parameters when + Type is AdaptiveUnitedDeploymentScheduleStrategyType. + properties: + rescheduleCriticalSeconds: + description: |- + RescheduleCriticalSeconds indicates how long the controller waits before rescheduling a Pod that failed to be scheduled + in its subset to another subset that has redundant capacity. If a Pod failed to be scheduled and has stayed in an unschedulable status + for longer than RescheduleCriticalSeconds, the controller will reschedule it to a suitable subset. Default is 30 seconds. + format: int32 + type: integer + unschedulableLastSeconds: + description: |- + UnschedulableLastSeconds is used to set the number of seconds for a Subset to recover from an unschedulable state, + with a default value of 300 seconds. + format: int32 + type: integer + type: object + type: + description: |- + Type indicates the type of the UnitedDeploymentScheduleStrategy. + Default is Fixed + enum: + - Adaptive + - Fixed + - "" + type: string + type: object subsets: description: |- Contains the details of each subset. Each element in this array represents one subset @@ -1173,7 +1205,7 @@ spec: format: date-time type: string message: - description: A human readable message indicating details about + description: A human-readable message indicating details about the transition. type: string reason: @@ -1216,6 +1248,27 @@ spec: description: Records the topology detail information of the replicas of each subset. type: object + subsetUnschedulable: + description: Record whether each subset is unschedulable. + properties: + status: + additionalProperties: + properties: + unschedulable: + type: boolean + unschedulableTimestamp: + format: date-time + type: string + required: + - unschedulable + type: object + type: object + version: + type: integer + required: + - status + - version + type: object updateStatus: description: Records the information of update progress. properties: diff --git a/pkg/controller/uniteddeployment/adapter/adapter.go b/pkg/controller/uniteddeployment/adapter/adapter.go index d75c8c5e99..e2d66c5bc4 100644 --- a/pkg/controller/uniteddeployment/adapter/adapter.go +++ b/pkg/controller/uniteddeployment/adapter/adapter.go @@ -17,6 +17,7 @@ limitations under the License. package adapter import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -25,14 +26,15 @@ import ( ) type Adapter interface { - // NewResourceObject creates a empty subset object. + // NewResourceObject creates an empty subset object. NewResourceObject() client.Object - // NewResourceListObject creates a empty subset list object. + // NewResourceListObject creates an empty subset list object. NewResourceListObject() client.ObjectList // GetStatusObservedGeneration returns the observed generation of the subset.
GetStatusObservedGeneration(subset metav1.Object) int64 // GetReplicaDetails returns the replicas information of the subset status. - GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) + GetReplicaDetails(subset metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, + statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) // GetSubsetFailure returns failure information of the subset. GetSubsetFailure() *string // ApplySubsetTemplate updates the subset to the latest revision. diff --git a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go index 2242d2c751..24cd0b1d03 100644 --- a/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/advanced_statefulset_adapter.go @@ -63,9 +63,8 @@ func (a *AdvancedStatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Obje } // GetReplicaDetails returns the replicas detail the subset needs. -func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *AdvancedStatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { set := obj.(*v1beta1.StatefulSet) - var pods []*corev1.Pod pods, err = a.getStatefulSetPods(set) if err != nil { return diff --git a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go index bc80150858..a570d2b1cc 100644 --- a/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/cloneset_adapter.go @@ -41,12 +41,10 @@ func (a *CloneSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 { return obj.(*alpha1.CloneSet).Status.ObservedGeneration } -func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *CloneSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { set := obj.(*alpha1.CloneSet) - var pods []*corev1.Pod - pods, err = a.getCloneSetPods(set) if err != nil { diff --git a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go index d0978952f9..1ed391c5c8 100644 --- a/pkg/controller/uniteddeployment/adapter/deployment_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/deployment_adapter.go @@ -58,12 +58,11 @@ func (a *DeploymentAdapter) GetStatusObservedGeneration(obj metav1.Object) int64 } // GetReplicaDetails returns the replicas detail the subset needs. 
-func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *DeploymentAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { // Convert to Deployment Object set := obj.(*appsv1.Deployment) // Get all pods belonging to deployment - var pods []*corev1.Pod pods, err = a.getDeploymentPods(set) if err != nil { return diff --git a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go index a31e2ad0b0..eb4761d862 100644 --- a/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go +++ b/pkg/controller/uniteddeployment/adapter/statefulset_adapter.go @@ -59,9 +59,8 @@ func (a *StatefulSetAdapter) GetStatusObservedGeneration(obj metav1.Object) int6 } // GetReplicaDetails returns the replicas detail the subset needs. -func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, err error) { +func (a *StatefulSetAdapter) GetReplicaDetails(obj metav1.Object, updatedRevision string) (specReplicas, specPartition *int32, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas int32, pods []*corev1.Pod, err error) { set := obj.(*appsv1.StatefulSet) - var pods []*corev1.Pod pods, err = a.getStatefulSetPods(set) if err != nil { return diff --git a/pkg/controller/uniteddeployment/allocator.go b/pkg/controller/uniteddeployment/allocator.go index cf5a71ac47..9b19fc42fc 100644 --- a/pkg/controller/uniteddeployment/allocator.go +++ b/pkg/controller/uniteddeployment/allocator.go @@ -18,6 +18,7 @@ package uniteddeployment import ( "fmt" + "math" "sort" "strings" @@ -68,6 +69,30 @@ func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator { return &specificAllocator{UnitedDeployment: ud} } +// NotPendingReplicas refers to the number of Pods that an unschedulable subset can safely accommodate. +// Exceeding this number may lead to scheduling failures within that subset. +// This value is only effective in the Adaptive scheduling strategy. 
+func getNotPendingReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 { + if nameToSubset == nil { + return nil + } + var result = make(map[string]int32) + for name, subset := range *nameToSubset { + result[name] = subset.Status.Replicas - subset.Status.UnschedulableStatus.PendingPods + } + return result +} + +func getSubSetUnschedulable(name string, nameToSubset *map[string]*Subset) (unschedulable bool) { + if subsetObj, ok := (*nameToSubset)[name]; ok { + unschedulable = subsetObj.Status.UnschedulableStatus.Unschedulable + } else { + // newly created subsets are all schedulable + unschedulable = false + } + return +} + type specificAllocator struct { *appsv1alpha1.UnitedDeployment subsets *subsetInfos @@ -250,43 +275,58 @@ type elasticAllocator struct { // maxReplicas: nil # will be satisfied with 4th priority // // the results of map will be: {"subset-a": 3, "subset-b": 2} -func (ac *elasticAllocator) Alloc(_ *map[string]*Subset) (*map[string]int32, error) { +func (ac *elasticAllocator) Alloc(nameToSubset *map[string]*Subset) (*map[string]int32, error) { replicas := int32(1) if ac.Spec.Replicas != nil { replicas = *ac.Spec.Replicas } - minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas) + minReplicasMap, maxReplicasMap, err := ac.validateAndCalculateMinMaxMap(replicas, nameToSubset) if err != nil { return nil, err } return ac.alloc(replicas, minReplicasMap, maxReplicasMap), nil } -func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32) (map[string]int32, map[string]int32, error) { - totalMin, totalMax := int64(0), int64(0) +func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameToSubset *map[string]*Subset) (map[string]int32, map[string]int32, error) { numSubset := len(ac.Spec.Topology.Subsets) minReplicasMap := make(map[string]int32, numSubset) maxReplicasMap := make(map[string]int32, numSubset) + notPendingReplicasMap := getNotPendingReplicasMap(nameToSubset) for index, subset := range ac.Spec.Topology.Subsets { minReplicas := int32(0) + maxReplicas := int32(math.MaxInt32) if subset.MinReplicas != nil { minReplicas, _ = ParseSubsetReplicas(replicas, *subset.MinReplicas) } - totalMin += int64(minReplicas) - minReplicasMap[subset.Name] = minReplicas - - maxReplicas := int32(1000000) if subset.MaxReplicas != nil { maxReplicas, _ = ParseSubsetReplicas(replicas, *subset.MaxReplicas) } - totalMax += int64(maxReplicas) + if ac.Spec.Topology.ScheduleStrategy.IsAdaptive() { + unschedulable := getSubSetUnschedulable(subset.Name, nameToSubset) + // This means that in the Adaptive scheduling strategy, an unschedulable subset can only be scaled down, not scaled up. 
+ if notPendingReplicas, ok := notPendingReplicasMap[subset.Name]; unschedulable && ok { + klog.InfoS("Assign min(notPendingReplicas, minReplicas/maxReplicas) for unschedulable subset", + "subset", subset.Name) + minReplicas = integer.Int32Min(notPendingReplicas, minReplicas) + maxReplicas = integer.Int32Min(notPendingReplicas, maxReplicas) + } + // To prevent healthy pod from being deleted + if notPendingReplicas := notPendingReplicasMap[subset.Name]; !unschedulable && notPendingReplicas > minReplicas { + klog.InfoS("Assign min(notPendingReplicas, maxReplicas) to minReplicas to avoid deleting running pods", + "subset", subset.Name, "minReplicas", minReplicas, "notPendingReplicas", notPendingReplicas, "maxReplicas", maxReplicas) + minReplicas = integer.Int32Min(notPendingReplicas, maxReplicas) + } + } + + minReplicasMap[subset.Name] = minReplicas maxReplicasMap[subset.Name] = maxReplicas if minReplicas > maxReplicas { return nil, nil, fmt.Errorf("subset[%d].maxReplicas must be more than or equal to minReplicas", index) } } + klog.InfoS("elastic allocate maps calculated", "minReplicasMap", minReplicasMap, "maxReplicasMap", maxReplicasMap) return minReplicasMap, maxReplicasMap, nil } diff --git a/pkg/controller/uniteddeployment/allocator_test.go b/pkg/controller/uniteddeployment/allocator_test.go index cc50ab411b..8e3557c331 100644 --- a/pkg/controller/uniteddeployment/allocator_test.go +++ b/pkg/controller/uniteddeployment/allocator_test.go @@ -307,6 +307,90 @@ func TestCapacityAllocator(t *testing.T) { } } +func TestAdaptiveElasticAllocator(t *testing.T) { + getUnitedDeploymentAndSubsets := func(totalReplicas, minReplicas, maxReplicas, failedPods int32) ( + *appsv1alpha1.UnitedDeployment, map[string]*Subset) { + minR, maxR := intstr.FromInt32(minReplicas), intstr.FromInt32(maxReplicas) + return &appsv1alpha1.UnitedDeployment{ + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Replicas: &totalReplicas, + Topology: appsv1alpha1.Topology{ + Subsets: []appsv1alpha1.Subset{ + { + Name: "subset-1", + MinReplicas: &minR, + MaxReplicas: &maxR, + }, + { + Name: "subset-2", + }, + }, + ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + }, + }, + }, + }, map[string]*Subset{ + "subset-1": { + Status: SubsetStatus{ + UnschedulableStatus: appsv1alpha1.UnschedulableStatus{ + Unschedulable: true, + PendingPods: failedPods, + }, + Replicas: maxReplicas, + }, + Spec: SubsetSpec{Replicas: minReplicas}, + }, + "subset-2": { + Status: SubsetStatus{}, + }, + } + } + cases := []struct { + totalReplicas, minReplicas, maxReplicas, pendingPods int32 + subset1Replicas, subset2Replicas int32 + }{ + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 5, + subset1Replicas: 0, subset2Replicas: 10, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 4, + subset1Replicas: 0, subset2Replicas: 10, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 3, + subset1Replicas: 1, subset2Replicas: 9, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 2, + subset1Replicas: 2, subset2Replicas: 8, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 1, + subset1Replicas: 3, subset2Replicas: 7, + }, + { + totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 0, + subset1Replicas: 4, subset2Replicas: 6, + }, + } + for _, testCase := range cases { + ud, subsets := getUnitedDeploymentAndSubsets( + testCase.totalReplicas, testCase.minReplicas, 
testCase.maxReplicas, testCase.pendingPods) + alloc, err := NewReplicaAllocator(ud).Alloc(&subsets) + if err != nil { + t.Fatalf("unexpected alloc error %v", err) + } else { + subset1Replicas, subset2Replicas := (*alloc)["subset-1"], (*alloc)["subset-2"] + if subset1Replicas != testCase.subset1Replicas || subset2Replicas != testCase.subset2Replicas { + t.Fatalf("subset1Replicas = %d, subset2Replicas = %d, test case is %+v", + subset1Replicas, subset2Replicas, testCase) + } + } + } +} + func createSubset(name string, replicas int32) *nameToReplicas { return &nameToReplicas{ Replicas: replicas, diff --git a/pkg/controller/uniteddeployment/revision_test.go b/pkg/controller/uniteddeployment/revision_test.go index 894ce7910c..101b9683fe 100644 --- a/pkg/controller/uniteddeployment/revision_test.go +++ b/pkg/controller/uniteddeployment/revision_test.go @@ -41,8 +41,9 @@ func TestRevisionManage(t *testing.T) { instance := &appsv1alpha1.UnitedDeployment{ ObjectMeta: metav1.ObjectMeta{ - Name: "foo", - Namespace: "default", + Name: "foo", + Namespace: "default", + Finalizers: []string{UnitedDeploymentFinalizer}, }, Spec: appsv1alpha1.UnitedDeploymentSpec{ Replicas: &one, diff --git a/pkg/controller/uniteddeployment/subset.go b/pkg/controller/uniteddeployment/subset.go index 79b8f006c6..478d5e5b28 100644 --- a/pkg/controller/uniteddeployment/subset.go +++ b/pkg/controller/uniteddeployment/subset.go @@ -17,6 +17,7 @@ limitations under the License. package uniteddeployment import ( + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -36,6 +37,7 @@ type SubsetSpec struct { Replicas int32 UpdateStrategy SubsetUpdateStrategy SubsetRef ResourceRef + SubsetPods []*corev1.Pod } // SubsetStatus stores the observed state of the Subset. @@ -45,6 +47,7 @@ type SubsetStatus struct { ReadyReplicas int32 UpdatedReplicas int32 UpdatedReadyReplicas int32 + UnschedulableStatus appsv1alpha1.UnschedulableStatus } // SubsetUpdateStrategy stores the strategy detail of the Subset. @@ -72,7 +75,7 @@ type ControlInterface interface { CreateSubset(ud *appsv1alpha1.UnitedDeployment, unit string, revision string, replicas, partition int32) error // UpdateSubset updates the target subset with the input information. UpdateSubset(subSet *Subset, ud *appsv1alpha1.UnitedDeployment, revision string, replicas, partition int32) error - // UpdateSubset is used to delete the input subset. + // DeleteSubset is used to delete the input subset. DeleteSubset(*Subset) error // GetSubsetFailure extracts the subset failure message to expose on UnitedDeployment status. GetSubsetFailure(*Subset) *string diff --git a/pkg/controller/uniteddeployment/subset_control.go b/pkg/controller/uniteddeployment/subset_control.go index 9073125950..8a85a869e6 100644 --- a/pkg/controller/uniteddeployment/subset_control.go +++ b/pkg/controller/uniteddeployment/subset_control.go @@ -39,7 +39,7 @@ type SubsetControl struct { adapter adapter.Adapter } -// GetAllSubsets returns all of subsets owned by the UnitedDeployment. +// GetAllSubsets returns all subsets owned by the UnitedDeployment.
func (m *SubsetControl) GetAllSubsets(ud *alpha1.UnitedDeployment, updatedRevision string) (subSets []*Subset, err error) { selector, err := metav1.LabelSelectorAsSelector(ud.Spec.Selector) if err != nil { @@ -156,7 +156,8 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin } subset.Spec.SubsetName = subSetName - specReplicas, specPartition, statusReplicas, statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, err := m.adapter.GetReplicaDetails(set, updatedRevision) + specReplicas, specPartition, statusReplicas, + statusReadyReplicas, statusUpdatedReplicas, statusUpdatedReadyReplicas, pods, err := m.adapter.GetReplicaDetails(set, updatedRevision) if err != nil { return subset, err } @@ -176,6 +177,7 @@ func (m *SubsetControl) convertToSubset(set metav1.Object, updatedRevision strin subset.Status.UpdatedReadyReplicas = statusUpdatedReadyReplicas subset.Spec.SubsetRef.Resources = append(subset.Spec.SubsetRef.Resources, set) + subset.Spec.SubsetPods = pods return subset, nil } diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller.go b/pkg/controller/uniteddeployment/uniteddeployment_controller.go index a14f5269cd..a98eb58918 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller.go @@ -21,15 +21,18 @@ import ( "flag" "fmt" "reflect" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -38,10 +41,12 @@ import ( appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" "github.com/openkruise/kruise/pkg/controller/uniteddeployment/adapter" + utilcontroller "github.com/openkruise/kruise/pkg/controller/util" "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" "github.com/openkruise/kruise/pkg/util/ratelimiter" + "github.com/openkruise/kruise/pkg/util/requeueduration" ) func init() { @@ -51,16 +56,17 @@ func init() { var ( concurrentReconciles = 3 controllerKind = appsv1alpha1.SchemeGroupVersion.WithKind("UnitedDeployment") + durationStore = requeueduration.DurationStore{} ) const ( controllerName = "uniteddeployment-controller" - eventTypeRevisionProvision = "RevisionProvision" - eventTypeFindSubsets = "FindSubsets" - eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets" - eventTypeSubsetsUpdate = "UpdateSubset" - eventTypeSpecifySubbsetReplicas = "SpecifySubsetReplicas" + eventTypeRevisionProvision = "RevisionProvision" + eventTypeFindSubsets = "FindSubsets" + eventTypeDupSubsetsDelete = "DeleteDuplicatedSubsets" + eventTypeSubsetsUpdate = "UpdateSubset" + eventTypeSpecifySubsetReplicas = "SpecifySubsetReplicas" slowStartInitialBatchSize = 1 ) @@ -145,6 +151,8 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { var _ reconcile.Reconciler = &ReconcileUnitedDeployment{} +const UnitedDeploymentFinalizer = "apps.kruise.io/uniteddeployment-cleanup" + // ReconcileUnitedDeployment reconciles a 
UnitedDeployment object type ReconcileUnitedDeployment struct { client.Client @@ -177,9 +185,25 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci } return reconcile.Result{}, err } + if instance.DeletionTimestamp == nil && !controllerutil.ContainsFinalizer(instance, UnitedDeploymentFinalizer) { + klog.InfoS("adding UnitedDeploymentFinalizer") + controllerutil.AddFinalizer(instance, UnitedDeploymentFinalizer) + if err = r.updateUnitedDeploymentInstance(instance); err != nil { + klog.ErrorS(err, "Failed to add UnitedDeploymentFinalizer", "unitedDeployment", request) + } + return reconcile.Result{}, err + } if instance.DeletionTimestamp != nil { - return reconcile.Result{}, nil + if controllerutil.RemoveFinalizer(instance, UnitedDeploymentFinalizer) { + // to avoid memory leak + klog.InfoS("cleaning up UnitedDeployment", "unitedDeployment", request) + SubsetUnschedulableStatusExpectation.Delete(getUnitedDeploymentKey(instance)) + if err = r.updateUnitedDeploymentInstance(instance); err != nil { + klog.ErrorS(err, "Failed to remove UnitedDeploymentFinalizer", "unitedDeployment", request) + } + } + return reconcile.Result{}, err } oldStatus := instance.Status.DeepCopy() @@ -197,20 +221,24 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci if updatedRevision != nil { expectedRevision = updatedRevision.Name } - nameToSubset, err := r.getNameToSubset(instance, control, expectedRevision) + nameToSubset, upToDate, err := r.getNameToSubset(instance, control, expectedRevision) if err != nil { klog.ErrorS(err, "Failed to get Subsets of UnitedDeployment", "unitedDeployment", klog.KObj(instance)) r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s", eventTypeFindSubsets), err.Error()) return reconcile.Result{}, err } + if !upToDate { + klog.InfoS("status not up-to-date, requeue in 1s", "unitedDeployment", klog.KObj(instance)) + return reconcile.Result{RequeueAfter: time.Second}, nil + } nextReplicas, err := NewReplicaAllocator(instance).Alloc(nameToSubset) klog.V(4).InfoS("Got UnitedDeployment next replicas", "unitedDeployment", klog.KObj(instance), "nextReplicas", nextReplicas) if err != nil { klog.ErrorS(err, "UnitedDeployment specified subset replicas is ineffective", "unitedDeployment", klog.KObj(instance)) r.recorder.Eventf(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed %s", - eventTypeSpecifySubbsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) + eventTypeSpecifySubsetReplicas), "Specified subset replicas is ineffective: %s", err.Error()) return reconcile.Result{}, err } @@ -220,6 +248,7 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci newStatus, err := r.manageSubsets(instance, nameToSubset, nextUpdate, currentRevision, updatedRevision, subsetType) if err != nil { + SubsetUnschedulableStatusExpectation.Delete(getUnitedDeploymentKey(instance)) klog.ErrorS(err, "Failed to update UnitedDeployment", "unitedDeployment", klog.KObj(instance)) r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeSubsetsUpdate), err.Error()) return reconcile.Result{}, err @@ -233,26 +262,129 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci } newStatus.LabelSelector = selector.String() - return r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control) + requeueAfter := 
durationStore.Pop(getUnitedDeploymentKey(instance)) + if requeueAfter > 0 { + klog.InfoS("Requeue needed", "afterSeconds", requeueAfter.Seconds()) + } + return reconcile.Result{RequeueAfter: requeueAfter}, + r.updateStatus(instance, newStatus, oldStatus, nameToSubset, nextReplicas, nextPartitions, currentRevision, updatedRevision, collisionCount, control) +} + +func (r *ReconcileUnitedDeployment) updateUnitedDeploymentInstance(instance *appsv1alpha1.UnitedDeployment) error { + var err error + for i := 0; i < updateRetries; i++ { + if err = r.Update(context.Background(), instance); err == nil { + return nil + } + } + return err } -func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) (*map[string]*Subset, error) { +// getNameToSubset fetches all subset workloads in the cluster managed by this UnitedDeployment. +// If the adaptive scheduling strategy is used, the unschedulable status of existing subsets will be set to true here (newly created subsets default to false). +func (r *ReconcileUnitedDeployment) getNameToSubset(instance *appsv1alpha1.UnitedDeployment, control ControlInterface, expectedRevision string) ( + name2Subset *map[string]*Subset, upToDate bool, err error) { subSets, err := control.GetAllSubsets(instance, expectedRevision) if err != nil { r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeFindSubsets), err.Error()) - return nil, fmt.Errorf("fail to get all Subsets for UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err) + return nil, false, fmt.Errorf("fail to get all Subsets for UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err) } klog.V(4).InfoS("Classify UnitedDeployment by subSet name", "unitedDeployment", klog.KObj(instance)) nameToSubsets := r.classifySubsetBySubsetName(instance, subSets) - nameToSubset, err := r.deleteDupSubset(instance, nameToSubsets, control) + nameToSubset, err := r.deleteDupSubset(nameToSubsets, control) if err != nil { r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeDupSubsetsDelete), err.Error()) - return nil, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err) + return nil, false, fmt.Errorf("fail to manage duplicate Subset of UnitedDeployment %s/%s: %s", instance.Namespace, instance.Name, err) + } + + // If the Fixed scheduling strategy is used, the unschedulable state for all subsets remains false and + // the UnschedulableStatus of Subsets is not managed.
+ if instance.Spec.Topology.ScheduleStrategy.IsAdaptive() { + unitedDeploymentKey := getUnitedDeploymentKey(instance) + SubsetUnschedulableStatusExpectation.Observe(unitedDeploymentKey, instance.Status.SubsetUnschedulable) + if satisfied := SubsetUnschedulableStatusExpectation.Satisfied(unitedDeploymentKey); !satisfied { + return nil, false, nil + } + for name, subset := range *nameToSubset { + manageUnschedulableStatusForExistingSubset(name, subset, instance) + } } - return nameToSubset, nil + return nameToSubset, true, nil +} + +func buildUnschedulableStatus(nameToSubset *map[string]*Subset, ud *appsv1alpha1.UnitedDeployment) *appsv1alpha1.SubsetUnschedulable { + var status = appsv1alpha1.SubsetUnschedulable{ + Status: make(map[string]appsv1alpha1.UnschedulableStatus), + } + for name, subset := range *nameToSubset { + // ignore pending pods to reduce unnecessary versions + pendingPods := subset.Status.UnschedulableStatus.PendingPods + subset.Status.UnschedulableStatus.PendingPods = 0 + status.Status[name] = subset.Status.UnschedulableStatus + subset.Status.UnschedulableStatus.PendingPods = pendingPods + } + if old := ud.Status.SubsetUnschedulable; old != nil { + if !reflect.DeepEqual(old.Status, status.Status) { + status.Version = old.Version + 1 + } else { + status.Version = old.Version + } + } + return &status +} + +// manageUnschedulableStatusForExistingSubset manages the unschedulable status of a subset and stores it in the Subset.Status.UnschedulableStatus field. +func manageUnschedulableStatusForExistingSubset(name string, subset *Subset, ud *appsv1alpha1.UnitedDeployment) { + now := time.Now() + unitedDeploymentKey := getUnitedDeploymentKey(ud) + if ud.Status.SubsetUnschedulable == nil { + ud.Status.SubsetUnschedulable = &appsv1alpha1.SubsetUnschedulable{} + } + oldStatus, ok := ud.Status.SubsetUnschedulable.Status[name] + if ok && oldStatus.Unschedulable { + // The unschedulable state of a subset lasts for at least UnschedulableLastSeconds (5 minutes by default). + // During this period, even if ReadyReplicas == Replicas, the subset is still unschedulable. + if oldStatus.UnschedulableTimestamp == nil { + oldStatus.UnschedulableTimestamp = &metav1.Time{} + } + recoverTime := oldStatus.UnschedulableTimestamp.Add(ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration()) + klog.InfoS("existing unschedulable subset found", "subset", name, "recoverTime", recoverTime) + if now.Before(recoverTime) { + klog.InfoS("subset is still unschedulable", "subset", name) + subset.Status.UnschedulableStatus.Unschedulable = true + subset.Status.UnschedulableStatus.UnschedulableTimestamp = oldStatus.UnschedulableTimestamp + durationStore.Push(unitedDeploymentKey, recoverTime.Sub(now)) + } else { + klog.InfoS("unschedulable subset recovered", "subset", name) + } + } + // There may be some pending pods because the subset is unschedulable.
+ if subset.Status.ReadyReplicas < subset.Status.Replicas { + for _, pod := range subset.Spec.SubsetPods { + timeouted, checkAfter := utilcontroller.PodPendingTimeout(pod, ud.Spec.Topology.ScheduleStrategy.GetRescheduleCriticalDuration()) + if timeouted { + subset.Status.UnschedulableStatus.PendingPods++ + } + if checkAfter > 0 { + durationStore.Push(unitedDeploymentKey, checkAfter) + } + } + if subset.Status.UnschedulableStatus.PendingPods > 0 { + klog.InfoS("subset has pending pods", + "subset", subset.Name, "pendingPods", subset.Status.UnschedulableStatus.PendingPods) + if !subset.Status.UnschedulableStatus.Unschedulable { + subset.Status.UnschedulableStatus.Unschedulable = true + if subset.Status.UnschedulableStatus.UnschedulableTimestamp == nil { + subset.Status.UnschedulableStatus.UnschedulableTimestamp = &metav1.Time{Time: time.Now()} + } + durationStore.Push(unitedDeploymentKey, ud.Spec.Topology.ScheduleStrategy.GetUnschedulableLastDuration()) + } + } + } + klog.InfoS("unschedulable status", "subset", name, "unschedulableStatus", subset.Status.UnschedulableStatus) } func calcNextPartitions(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]int32) *map[string]int32 { @@ -288,7 +420,7 @@ func getNextUpdate(ud *appsv1alpha1.UnitedDeployment, nextReplicas *map[string]i return next } -func (r *ReconcileUnitedDeployment) deleteDupSubset(ud *appsv1alpha1.UnitedDeployment, nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) { +func (r *ReconcileUnitedDeployment) deleteDupSubset(nameToSubsets map[string][]*Subset, control ControlInterface) (*map[string]*Subset, error) { nameToSubset := map[string]*Subset{} for name, subsets := range nameToSubsets { if len(subsets) > 1 { @@ -348,10 +480,10 @@ func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1. 
return mapping } -func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) (reconcile.Result, error) { +func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) error { newStatus = r.calculateStatus(newStatus, nameToSubset, nextReplicas, nextPartition, currentRevision, updatedRevision, collisionCount, control) _, err := r.updateUnitedDeployment(instance, oldStatus, newStatus) - return reconcile.Result{}, err + return err } func (r *ReconcileUnitedDeployment) calculateStatus(newStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) *appsv1alpha1.UnitedDeploymentStatus { @@ -431,7 +563,8 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit ud.Generation == newStatus.ObservedGeneration && reflect.DeepEqual(oldStatus.SubsetReplicas, newStatus.SubsetReplicas) && reflect.DeepEqual(oldStatus.UpdateStatus, newStatus.UpdateStatus) && - reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) { + reflect.DeepEqual(oldStatus.Conditions, newStatus.Conditions) && + reflect.DeepEqual(oldStatus.SubsetUnschedulable, newStatus.SubsetUnschedulable) { return ud, nil } @@ -439,13 +572,14 @@ func (r *ReconcileUnitedDeployment) updateUnitedDeployment(ud *appsv1alpha1.Unit var getErr, updateErr error for i, obj := 0, ud; ; i++ { - klog.V(4).InfoS("Updating status", + klog.V(4).InfoS("updating UnitedDeployment status", "updateCount", i, "unitedDeployment", klog.KObj(obj), "replicasSpec", obj.Spec.Replicas, "oldReplicas", obj.Status.Replicas, "newReplicas", newStatus.Replicas, "readyReplicasSpec", obj.Spec.Replicas, "oldReadyReplicas", obj.Status.ReadyReplicas, "newReadyReplicas", newStatus.ReadyReplicas, "oldUpdatedReplicas", obj.Status.UpdatedReplicas, "newUpdatedReplicas", newStatus.UpdatedReplicas, "oldUpdatedReadyReplicas", obj.Status.UpdatedReadyReplicas, "newUpdatedReadyReplicas", newStatus.UpdatedReadyReplicas, "oldObservedGeneration", obj.Status.ObservedGeneration, "newObservedGeneration", newStatus.ObservedGeneration, + "oldSubsetUnschedulable", obj.Status.SubsetUnschedulable, "newSubsetUnschedulable", newStatus.SubsetUnschedulable, ) obj.Status = *newStatus diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go index 6d52c50d65..a1aea6c5af 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_test.go @@ -18,6 +18,7 @@ package uniteddeployment import ( "testing" + "time" "github.com/onsi/gomega" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -95,7 +96,7 @@ func TestReconcile(t *testing.T) { }, } - // Setup the Manager and Controller. 
Wrap the Controller Reconcile function so it writes each request to a + // Set up the Manager and Controller. Wrap the Controller Reconcile function so it writes each request to a // channel when it is finished. mgr, err := manager.New(cfg, manager.Options{}) g.Expect(err).NotTo(gomega.HaveOccurred()) @@ -124,3 +125,85 @@ func TestReconcile(t *testing.T) { defer c.Delete(context.TODO(), instance) g.Eventually(requests, timeout).Should(gomega.Receive(gomega.Equal(expectedRequest))) } + +func TestUnschedulableStatusManagement(t *testing.T) { + g := gomega.NewGomegaWithT(t) + ud := &appsv1alpha1.UnitedDeployment{ + Spec: appsv1alpha1.UnitedDeploymentSpec{ + Topology: appsv1alpha1.Topology{ + ScheduleStrategy: appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + }, + }, + }, + } + pod := corev1.Pod{ + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodScheduled, + Status: corev1.ConditionFalse, + Reason: corev1.PodReasonUnschedulable, + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: metav1.NewTime(time.Now().Add(-15 * time.Second)), + }, + } + subset := &Subset{ + Status: SubsetStatus{ + ReadyReplicas: 0, + Replicas: 1, + }, + Spec: SubsetSpec{ + SubsetPods: []*corev1.Pod{&pod}, + }, + } + + // CASE1: Not timeouted yet + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0)) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool { + return after < 15*time.Second && after > 14*time.Second + })) + + //// CASE2: Timeouted + pod.CreationTimestamp = metav1.NewTime(time.Now().Add(-31 * time.Second)) + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(1)) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(appsv1alpha1.DefaultUnschedulableStatusLastDuration)) + + // CASE3: Unschedulable status + ud.Status.SubsetUnschedulable = &appsv1alpha1.SubsetUnschedulable{ + Status: map[string]appsv1alpha1.UnschedulableStatus{ + subset.Name: { + Unschedulable: true, + UnschedulableTimestamp: &metav1.Time{Time: time.Now().Add(-time.Minute)}, + }, + }, + } + subset.Status.ReadyReplicas = 1 + subset.Status.UnschedulableStatus.PendingPods = 0 + manageUnschedulableStatusForExistingSubset(subset.Name, subset, ud) + g.Expect(g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0))) + g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.Satisfy(func(after time.Duration) bool { + return after < appsv1alpha1.DefaultUnschedulableStatusLastDuration-time.Minute && + after > 59*time.Second+appsv1alpha1.DefaultUnschedulableStatusLastDuration-2*time.Minute + })) + + // CASE4: Status Reset + ud.Status.SubsetUnschedulable = &appsv1alpha1.SubsetUnschedulable{ + Status: map[string]appsv1alpha1.UnschedulableStatus{ + subset.Name: { + Unschedulable: true, + UnschedulableTimestamp: &metav1.Time{Time: time.Now().Add(-time.Minute - appsv1alpha1.DefaultUnschedulableStatusLastDuration)}, + }, + }, + } + subset.Status.UnschedulableStatus.Unschedulable = false + g.Expect(g.Expect(subset.Status.UnschedulableStatus.PendingPods).To(gomega.BeEquivalentTo(0))) + g.Expect(subset.Status.UnschedulableStatus.Unschedulable).To(gomega.BeFalse()) + 
g.Expect(durationStore.Pop(getUnitedDeploymentKey(ud))).To(gomega.BeEquivalentTo(0)) +} diff --git a/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go b/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go index e26dea22fc..37f779b5f6 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_controller_utils.go @@ -23,11 +23,11 @@ import ( "strconv" "strings" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/util/expectations" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" ) const updateRetries = 5 @@ -132,3 +132,22 @@ func filterOutCondition(conditions []appsv1alpha1.UnitedDeploymentCondition, con } return newConditions } + +var SubsetUnschedulableStatusExpectation = expectations.NewComparableVersionExpectation() + +func getUnitedDeploymentKey(ud *appsv1alpha1.UnitedDeployment) string { + return ud.GetNamespace() + "/" + ud.GetName() +} + +//type subsetUnschedulableRevisionAdapter struct{} +// +//func (r *subsetUnschedulableRevisionAdapter) EqualToRevisionHash(_ string, obj metav1.Object, version string) bool { +// ud := obj.(*appsv1alpha1.UnitedDeployment) +// return getSubsetUnschedulableStatusVersion(ud) == version +//} +// +//func (r *subsetUnschedulableRevisionAdapter) WriteRevisionHash(obj metav1.Object, hash string) { +// // No need to implement yet. +//} +// +//var SubsetUnschedulableStatusUpdateExpectation = expectations.NewUpdateExpectations(&subsetUnschedulableRevisionAdapter{}) diff --git a/pkg/controller/uniteddeployment/uniteddeployment_update.go b/pkg/controller/uniteddeployment/uniteddeployment_update.go index 024945ce98..ebc140ec2e 100644 --- a/pkg/controller/uniteddeployment/uniteddeployment_update.go +++ b/pkg/controller/uniteddeployment/uniteddeployment_update.go @@ -32,6 +32,16 @@ import ( func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeployment, nameToSubset *map[string]*Subset, nextUpdate map[string]SubsetUpdate, currentRevision, updatedRevision *appsv1.ControllerRevision, subsetType subSetType) (newStatus *appsv1alpha1.UnitedDeploymentStatus, updateErr error) { newStatus = ud.Status.DeepCopy() + + if ud.Spec.Topology.ScheduleStrategy.IsAdaptive() { + newUnschedulableStatus := buildUnschedulableStatus(nameToSubset, ud) + if oldUnschedulableStatus := ud.Status.SubsetUnschedulable; oldUnschedulableStatus == nil || oldUnschedulableStatus.Version < newUnschedulableStatus.Version { + klog.InfoS("unschedulable status expected", "old", oldUnschedulableStatus, "new", newUnschedulableStatus) + SubsetUnschedulableStatusExpectation.Expect(getUnitedDeploymentKey(ud), newUnschedulableStatus) + } + newStatus.SubsetUnschedulable = newUnschedulableStatus + } + exists, provisioned, err := r.manageSubsetProvision(ud, nameToSubset, nextUpdate, currentRevision, updatedRevision, subsetType) if err != nil { SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetProvisioned, corev1.ConditionFalse, "Error", err.Error())) @@ -58,6 +68,7 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym } } + var newPodCreated = false if len(needUpdate) > 0 { _, updateErr = util.SlowStartBatch(len(needUpdate), slowStartInitialBatchSize, func(index int) error { cell := needUpdate[index] @@ -72,6 +83,7 @@ func (r 
*ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym
 			if updateSubsetErr != nil {
 				r.recorder.Event(ud.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeSubsetsUpdate), fmt.Sprintf("Error updating PodSet (%s) %s when updating: %s", subsetType, subset.Name, updateSubsetErr))
 			}
+			newPodCreated = newPodCreated || subset.Spec.Replicas < replicas
 			return updateSubsetErr
 		})
 	}
@@ -79,6 +91,11 @@ func (r *ReconcileUnitedDeployment) manageSubsets(ud *appsv1alpha1.UnitedDeploym
 	if updateErr == nil {
 		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionTrue, "", ""))
 	} else {
+		// With the Adaptive scheduling strategy, scaling out a subset creates new Pods whose scheduling may fail,
+		// so requeue after the reschedule-critical duration to check for Pods that need to be rescheduled.
+		if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() && newPodCreated {
+			durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration())
+		}
 		SetUnitedDeploymentCondition(newStatus, NewUnitedDeploymentCondition(appsv1alpha1.SubsetUpdated, corev1.ConditionFalse, "Error", updateErr.Error()))
 	}
 	return
@@ -132,6 +149,11 @@ func (r *ReconcileUnitedDeployment) manageSubsetProvision(ud *appsv1alpha1.Unite
 		return nil
 	})
 	if createdErr == nil {
+		// When a new subset is created, regardless of whether it contains newly created Pods, trigger a requeue
+		// so that the next reconcile treats it as an existing subset and refreshes its unschedulable status.
+		if strategy := ud.Spec.Topology.ScheduleStrategy; strategy.IsAdaptive() {
+			durationStore.Push(getUnitedDeploymentKey(ud), strategy.GetRescheduleCriticalDuration())
+		}
 		r.recorder.Eventf(ud.DeepCopy(), corev1.EventTypeNormal, fmt.Sprintf("Successful%s", eventTypeSubsetsUpdate), "Create %d Subset (%s)", createdNum, subsetType)
 	} else {
 		errs = append(errs, createdErr)
diff --git a/pkg/controller/util/pod_condition_utils.go b/pkg/controller/util/pod_condition_utils.go
index 44fa65a7aa..ad5ff48ae2 100644
--- a/pkg/controller/util/pod_condition_utils.go
+++ b/pkg/controller/util/pod_condition_utils.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"encoding/json"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
 )
@@ -22,3 +23,24 @@ func UpdateMessageKvCondition(kv map[string]interface{}, condition *v1.PodCondit
 	message, _ := json.Marshal(kv)
 	condition.Message = string(message)
 }
+
+// PodPendingTimeout returns true when the Pod failed to be scheduled and the timeout has expired.
+// nextCheckAfter > 0 means the Pod failed to schedule but has not timed out yet.
+func PodPendingTimeout(pod *v1.Pod, timeout time.Duration) (timedOut bool, nextCheckAfter time.Duration) {
+	if pod.DeletionTimestamp != nil || pod.Status.Phase != v1.PodPending || pod.Spec.NodeName != "" {
+		return false, -1
+	}
+	for _, condition := range pod.Status.Conditions {
+		if condition.Type == v1.PodScheduled && condition.Status == v1.ConditionFalse &&
+			condition.Reason == v1.PodReasonUnschedulable {
+			currentTime := time.Now()
+			expectSchedule := pod.CreationTimestamp.Add(timeout)
+			// schedule timeout
+			if expectSchedule.Before(currentTime) {
+				return true, -1
+			}
+			return false, expectSchedule.Sub(currentTime)
+		}
+	}
+	return false, -1
+}
diff --git a/pkg/controller/workloadspread/reschedule.go b/pkg/controller/workloadspread/reschedule.go
index 810a357c47..7ac8c30d97 100644
--- a/pkg/controller/workloadspread/reschedule.go
+++ b/pkg/controller/workloadspread/reschedule.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/openkruise/kruise/pkg/controller/util"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
 
@@ -111,26 +112,9 @@ func (r *ReconcileWorkloadSpread) deletePodsForSubset(ws *appsv1alpha1.WorkloadS
 // PodUnscheduledTimeout return true when Pod was scheduled failed and timeout.
 func PodUnscheduledTimeout(ws *appsv1alpha1.WorkloadSpread, pod *corev1.Pod) bool {
-	if pod.DeletionTimestamp != nil || pod.Status.Phase != corev1.PodPending || pod.Spec.NodeName != "" {
-		return false
+	timedOut, nextCheckAfter := util.PodPendingTimeout(pod, time.Second*time.Duration(*ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds))
+	if nextCheckAfter > 0 {
+		durationStore.Push(getWorkloadSpreadKey(ws), nextCheckAfter)
 	}
-	for _, condition := range pod.Status.Conditions {
-		if condition.Type == corev1.PodScheduled && condition.Status == corev1.ConditionFalse &&
-			condition.Reason == corev1.PodReasonUnschedulable {
-			currentTime := time.Now()
-			rescheduleCriticalSeconds := *ws.Spec.ScheduleStrategy.Adaptive.RescheduleCriticalSeconds
-
-			expectSchedule := pod.CreationTimestamp.Add(time.Second * time.Duration(rescheduleCriticalSeconds))
-			// schedule timeout
-			if expectSchedule.Before(currentTime) {
-				return true
-			}
-
-			// no timeout, requeue key when expectSchedule is equal to time.Now()
-			durationStore.Push(getWorkloadSpreadKey(ws), expectSchedule.Sub(currentTime))
-
-			return false
-		}
-	}
-	return false
+	return timedOut
 }
diff --git a/pkg/util/expectations/comparable_version_expectation.go b/pkg/util/expectations/comparable_version_expectation.go
new file mode 100644
index 0000000000..e1ad284fc2
--- /dev/null
+++ b/pkg/util/expectations/comparable_version_expectation.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2024 The Kruise Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package expectations
+
+import (
+	"reflect"
+	"sync"
+)
+
+type VersionComparable interface {
+	GetVersion() string
+	NotOlderThan(string) bool
+}
+
+// ComparableVersionExpectation is used to address synchronization issues between concurrent reconciles
+// that depend on resource states.
+// Before updating a resource, call Expect to declare that the resource will be updated.
+// Before reading a resource, use Observe and Satisfied to ensure the resource has been updated.
+// When a resource is deleted, make sure to call Delete to remove it from the cache.
+type ComparableVersionExpectation interface {
+	// Expect declares that the status is about to be updated to a newer version.
+	Expect(controllerKey string, obj VersionComparable)
+	// Observe checks whether the given version meets the expectation and clears it if so.
+	Observe(controllerKey string, obj VersionComparable)
+	// Satisfied tells whether the expectation is satisfied, which means a version
+	// not older than the one declared by Expect has been detected by Observe.
+	Satisfied(controllerKey string) bool
+	// Delete removes the expectation for the given controller key.
+	Delete(controllerKey string)
+}
+
+func NewComparableVersionExpectation() ComparableVersionExpectation {
+	return &comparableVersionExpectation{
+		objectVersions: make(map[string]*objectCacheVersions),
+	}
+}
+
+type comparableVersionExpectation struct {
+	sync.Mutex
+	objectVersions map[string]*objectCacheVersions
+}
+
+func isNil(i VersionComparable) bool {
+	return i == nil || (reflect.ValueOf(i).Kind() == reflect.Ptr && reflect.ValueOf(i).IsNil())
+}
+
+func (c *comparableVersionExpectation) Expect(controllerKey string, obj VersionComparable) {
+	c.Lock()
+	defer c.Unlock()
+
+	if isNil(obj) {
+		return
+	}
+	expectations := c.objectVersions[controllerKey]
+	if expectations == nil {
+		c.objectVersions[controllerKey] = &objectCacheVersions{
+			version: obj.GetVersion(),
+		}
+		return
+	}
+	if obj.NotOlderThan(c.objectVersions[controllerKey].version) {
+		c.objectVersions[controllerKey].version = obj.GetVersion()
+	}
+}
+
+func (c *comparableVersionExpectation) Observe(controllerKey string, obj VersionComparable) {
+	c.Lock()
+	defer c.Unlock()
+
+	if isNil(obj) {
+		return
+	}
+	expectations := c.objectVersions[controllerKey]
+	if expectations == nil {
+		return
+	}
+	if obj.NotOlderThan(expectations.version) {
+		delete(c.objectVersions, controllerKey)
+	}
+}
+
+func (c *comparableVersionExpectation) Satisfied(controllerKey string) bool {
+	c.Lock()
+	defer c.Unlock()
+
+	return c.objectVersions[controllerKey] == nil
+}
+
+func (c *comparableVersionExpectation) Delete(controllerKey string) {
+	c.Lock()
+	defer c.Unlock()
+	delete(c.objectVersions, controllerKey)
+}
diff --git a/test/e2e/apps/sidecarset.go b/test/e2e/apps/sidecarset.go
index d5dd53250a..d9a964e9d3 100644
--- a/test/e2e/apps/sidecarset.go
+++ b/test/e2e/apps/sidecarset.go
@@ -866,7 +866,10 @@ var _ = SIGDescribe("SidecarSet", func() {
 			gomega.Expect(upgradeSpec1.SidecarSetName).To(gomega.Equal(sidecarSetIn.Name))
 			gomega.Expect(upgradeSpec1.SidecarSetHash).To(gomega.Equal(sidecarcontrol.GetSidecarSetRevision(sidecarSetIn)))
 			target1 := sets.NewString(upgradeSpec1.SidecarList...)
- gomega.Expect(reflect.DeepEqual(origin.List(), target1.List())).To(gomega.Equal(true)) + if equal := reflect.DeepEqual(origin.List(), target1.List()); !equal { + fmt.Printf("origin.List() != target1.List()\norigin: %v, target: %v\n", origin.List(), target1.List()) + gomega.Expect(equal).To(gomega.Equal(true)) + } // SidecarSetHashWithoutImageAnnotation = "kruise.io/sidecarset-hash-without-image" upgradeSpec2 := sidecarcontrol.GetPodSidecarSetUpgradeSpecInAnnotations(sidecarSetIn.Name, sidecarcontrol.SidecarSetHashWithoutImageAnnotation, pod) gomega.Expect(upgradeSpec2.SidecarSetName).To(gomega.Equal(sidecarSetIn.Name)) diff --git a/test/e2e/apps/uniteddeployment.go b/test/e2e/apps/uniteddeployment.go index 9d41cd9d01..181ba34aac 100644 --- a/test/e2e/apps/uniteddeployment.go +++ b/test/e2e/apps/uniteddeployment.go @@ -1,13 +1,19 @@ package apps import ( + "context" "fmt" "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" "github.com/openkruise/kruise/test/e2e/framework" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" ) var _ = SIGDescribe("uniteddeployment", func() { @@ -77,4 +83,118 @@ var _ = SIGDescribe("uniteddeployment", func() { udManager.Scale(1) udManager.CheckSubsets(replicasMap([]int32{0, 0, 1})) }) + + ginkgo.It("adaptive united deployment with elastic allocator", func() { + replicas := func(r int) *intstr.IntOrString { p := intstr.FromInt32(int32(r)); return &p } + replicasMap := func(replicas []int32) map[string]int32 { + replicaMap := make(map[string]int32) + for i, r := range replicas { + replicaMap[fmt.Sprintf("subset-%d", i)] = r + } + return replicaMap + } + unschedulableMap := func(unschedulables []bool) map[string]bool { + resultMap := make(map[string]bool) + for i, r := range unschedulables { + resultMap[fmt.Sprintf("subset-%d", i)] = r + } + return resultMap + } + + udManager := tester.NewUnitedDeploymentManager("adaptive-ud-elastic-test") + // enable adaptive scheduling + udManager.UnitedDeployment.Spec.Topology.ScheduleStrategy = appsv1alpha1.UnitedDeploymentScheduleStrategy{ + Type: appsv1alpha1.AdaptiveUnitedDeploymentScheduleStrategyType, + Adaptive: &appsv1alpha1.AdaptiveUnitedDeploymentStrategy{ + RescheduleCriticalSeconds: ptr.To(int32(30)), + UnschedulableLastSeconds: ptr.To(int32(30)), + }, + } + udManager.AddSubset("subset-0", nil, nil, replicas(2)) + udManager.AddSubset("subset-1", nil, nil, replicas(2)) + udManager.AddSubset("subset-2", nil, nil, nil) + // make subset-1 unschedulable + nodeKey := "ud-e2e/to-make-a-bad-subset-elastic" + udManager.UnitedDeployment.Spec.Topology.Subsets[1].NodeSelectorTerm = corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: nodeKey, + Operator: corev1.NodeSelectorOpExists, + }, + }, + } + + ginkgo.By("creating united deployment") + udManager.Spec.Replicas = ptr.To(int32(3)) + _, err := f.KruiseClientSet.AppsV1alpha1().UnitedDeployments(udManager.Namespace).Create(context.Background(), + udManager.UnitedDeployment, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("wait for rescheduling, will take long") + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, true, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + fmt.Println() + + ginkgo.By("scale 
up while unschedulable") + udManager.Scale(4) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2})) + fmt.Println() + + ginkgo.By("scale down while unschedulable") + udManager.Scale(3) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 1})) + fmt.Println() + + ginkgo.By("wait subset recovery, will take long") + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + fmt.Println() + + ginkgo.By("scale up after recovery") + udManager.Scale(4) + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 1})) + fmt.Println() + + ginkgo.By("scale down after recovery") + udManager.Scale(3) + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0})) // even pods in subset-1 are not ready + fmt.Println() + + ginkgo.By("create new subset") + udManager.AddSubset("subset-3", nil, replicas(2), nil) + udManager.Update() + fmt.Println() + + ginkgo.By("waiting final status after scaling up to new subset, will take long") + udManager.Scale(6) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 0, 2, 2})) + fmt.Println() + + ginkgo.By("fix subset-1 and wait recover") + nodeList, err := f.ClientSet.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + someNode := nodeList.Items[1] + someNode.Labels[nodeKey] = "haha" + _, err = f.ClientSet.CoreV1().Nodes().Update(context.Background(), &someNode, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + + ginkgo.By("waiting final status after deleting new subset") + udManager.Spec.Topology.Subsets = udManager.Spec.Topology.Subsets[:3] + udManager.Update() + udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 2})) + fmt.Println() + + ginkgo.By("scale down after fixed") + udManager.Scale(3) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 1, 0})) + fmt.Println() + + ginkgo.By("scale up after fixed") + udManager.Scale(5) + udManager.CheckUnschedulableStatus(unschedulableMap([]bool{false, false, false})) + udManager.CheckSubsetPods(replicasMap([]int32{2, 2, 1})) + fmt.Println() + }) }) diff --git a/test/e2e/framework/uniteddeployment.go b/test/e2e/framework/uniteddeployment.go index 02bab7b662..26d3f9be85 100644 --- a/test/e2e/framework/uniteddeployment.go +++ b/test/e2e/framework/uniteddeployment.go @@ -3,6 +3,9 @@ package framework import ( "context" "fmt" + "reflect" + "time" + "github.com/onsi/gomega" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" @@ -13,8 +16,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" "k8s.io/utils/pointer" - "reflect" - "time" ) type UnitedDeploymentTester struct { @@ -31,6 +32,8 @@ func NewUnitedDeploymentTester(c clientset.Interface, kc kruiseclientset.Interfa } } +var zero = int64(0) + func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *UnitedDeploymentManager { return &UnitedDeploymentManager{ UnitedDeployment: &appsv1alpha1.UnitedDeployment{ @@ -64,6 +67,7 @@ func (t *UnitedDeploymentTester) NewUnitedDeploymentManager(name string) *United }, }, Spec: v1.PodSpec{ + TerminationGracePeriodSeconds: &zero, Containers: []v1.Container{ { Name: "busybox", @@ -81,12 +85,14 @@ func (t *UnitedDeploymentTester) 
NewUnitedDeploymentManager(name string) *United }, }, kc: t.kc, + c: t.c, } } type UnitedDeploymentManager struct { *appsv1alpha1.UnitedDeployment kc kruiseclientset.Interface + c clientset.Interface } func (m *UnitedDeploymentManager) AddSubset(name string, replicas, minReplicas, maxReplicas *intstr.IntOrString) { @@ -120,7 +126,12 @@ func (m *UnitedDeploymentManager) Create(replicas int32) { gomega.Eventually(func() bool { ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - return ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration + ok := ud.Status.Replicas == replicas && ud.Generation == ud.Status.ObservedGeneration + if !ok { + fmt.Printf("UnitedDeploymentManager.Create failed\nud.Status.Replicas: %d, ud.Generation: %d, ud.Status.ObservedGeneration: %d\n", + ud.Status.Replicas, ud.Generation, ud.Status.ObservedGeneration) + } + return ok }, time.Minute, time.Second).Should(gomega.BeTrue()) } @@ -128,6 +139,72 @@ func (m *UnitedDeploymentManager) CheckSubsets(replicas map[string]int32) { gomega.Eventually(func() bool { ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - return ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas) + ok := ud.GetGeneration() == ud.Status.ObservedGeneration && *ud.Spec.Replicas == ud.Status.Replicas && reflect.DeepEqual(replicas, ud.Status.SubsetReplicas) + if !ok { + fmt.Printf("UnitedDeploymentManager.CheckSubsets failed\nud.GetGeneration(): %d, ud.Status.ObservedGeneration: %d, *ud.Spec.Replicas: %d, ud.Status.Replicas: %d, ud.Status.SubsetReplicas: %v\n", ud.GetGeneration(), + ud.Status.ObservedGeneration, *ud.Spec.Replicas, ud.Status.Replicas, ud.Status.SubsetReplicas) + } + return ok + }, 3*time.Minute, time.Second).Should(gomega.BeTrue()) +} + +func (m *UnitedDeploymentManager) Update() { + gomega.Eventually(func(g gomega.Gomega) { + ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.Background(), m.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + ud.Spec = m.UnitedDeployment.DeepCopy().Spec + _, err = m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Update(context.Background(), ud, metav1.UpdateOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + }, time.Minute, time.Second).Should(gomega.Succeed()) +} + +func (m *UnitedDeploymentManager) CheckSubsetPods(expect map[string]int32) { + fmt.Print("CheckSubsetPods ") + ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func(g gomega.Gomega) { + actual := map[string]int32{} + for _, subset := range ud.Spec.Topology.Subsets { + podList, err := m.c.CoreV1().Pods(m.Namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("apps.kruise.io/subset-name=%s", subset.Name), + }) + g.Expect(err).NotTo(gomega.HaveOccurred()) + actual[subset.Name] = int32(len(podList.Items)) + } + g.Expect(expect).To(gomega.BeEquivalentTo(actual)) + }, 3*time.Minute, 500*time.Millisecond).Should(gomega.Succeed()) + fmt.Println("pass") +} + +func (m *UnitedDeploymentManager) CheckUnschedulableStatus(expect map[string]bool) { + fmt.Print("CheckUnschedulableStatus ") + 
gomega.Eventually(func(g gomega.Gomega) { + ud, err := m.kc.AppsV1alpha1().UnitedDeployments(m.Namespace).Get(context.TODO(), m.Name, metav1.GetOptions{}) + g.Expect(err).NotTo(gomega.HaveOccurred()) + g.Expect(ud.Status.SubsetUnschedulable != nil).To(gomega.BeTrue()) + actual := map[string]bool{} + for name := range expect { + actual[name] = ud.Status.SubsetUnschedulable.Status[name].Unschedulable + } + g.Expect(expect).To(gomega.BeEquivalentTo(actual)) + }, 3*time.Minute, 500*time.Millisecond).Should(gomega.Succeed()) + fmt.Println("pass") +} + +func (m *UnitedDeploymentManager) EnsureImage() { + job, err := m.kc.AppsV1alpha1().ImagePullJobs(m.Namespace).Create(context.Background(), &appsv1alpha1.ImagePullJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ud-test-" + m.Name, + Namespace: m.Namespace, + }, + Spec: appsv1alpha1.ImagePullJobSpec{ + Image: "busybox:1.32", + }, + }, metav1.CreateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Eventually(func() bool { + job, err := m.kc.AppsV1alpha1().ImagePullJobs(m.Namespace).Get(context.Background(), job.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + return !job.Status.CompletionTime.IsZero() && len(job.Status.FailedNodes) == 0 }, time.Minute, time.Second).Should(gomega.BeTrue()) }
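
The new ComparableVersionExpectation is only wired up in this patch for the subset-unschedulable status, but its doc comment describes a general Expect/Observe/Satisfied/Delete protocol. The following is a minimal, self-contained sketch of that protocol under stated assumptions: the statusVersion type and its integer versioning scheme are illustrative inventions, not part of this change; only the expectations package API comes from the patch.

// Hypothetical illustration of the Expect/Observe/Satisfied/Delete protocol;
// statusVersion is not part of this patch.
package main

import (
	"fmt"
	"strconv"

	"github.com/openkruise/kruise/pkg/util/expectations"
)

// statusVersion is a toy VersionComparable whose version is a monotonically
// increasing integer encoded as a string.
type statusVersion struct{ v int64 }

func (s statusVersion) GetVersion() string { return strconv.FormatInt(s.v, 10) }

func (s statusVersion) NotOlderThan(other string) bool {
	o, err := strconv.ParseInt(other, 10, 64)
	if err != nil {
		return true // treat unparsable versions as older
	}
	return s.v >= o
}

func main() {
	exp := expectations.NewComparableVersionExpectation()
	key := "default/demo"

	// Before writing the resource status, declare the version about to be written.
	exp.Expect(key, statusVersion{v: 2})

	// A reconcile that still sees the stale version (1) is not satisfied and
	// should requeue instead of acting on outdated status.
	exp.Observe(key, statusVersion{v: 1})
	fmt.Println("after observing v1:", exp.Satisfied(key)) // false

	// Once the informer cache catches up to a version not older than the
	// expected one, the expectation is cleared.
	exp.Observe(key, statusVersion{v: 2})
	fmt.Println("after observing v2:", exp.Satisfied(key)) // true

	// When the resource is deleted, drop any pending expectation.
	exp.Delete(key)
}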
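
PodPendingTimeout reports a (timedOut, nextCheckAfter) pair, and the reschedule.go change above shows how WorkloadSpread consumes it by pushing nextCheckAfter into its duration store. The sketch below spells out the same contract for a hypothetical caller that only needs to classify a Pod; classifyPendingPod and the sample Pod are assumptions for illustration, while util.PodPendingTimeout and its signature come from this patch.

// Hypothetical caller of util.PodPendingTimeout; the decision table it encodes
// follows the helper added in pkg/controller/util/pod_condition_utils.go.
package main

import (
	"fmt"
	"time"

	"github.com/openkruise/kruise/pkg/controller/util"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// classifyPendingPod interprets the (timedOut, nextCheckAfter) pair:
//   - timedOut == true: the Pod has been unschedulable longer than the timeout
//     and is a candidate for rescheduling now.
//   - nextCheckAfter > 0: the Pod is unschedulable but still within the timeout
//     window; requeue after that duration and check again.
//   - otherwise: the Pod is not a pending, unschedulable Pod at all.
func classifyPendingPod(pod *corev1.Pod, timeout time.Duration) string {
	timedOut, nextCheckAfter := util.PodPendingTimeout(pod, timeout)
	switch {
	case timedOut:
		return "reschedule now"
	case nextCheckAfter > 0:
		return fmt.Sprintf("requeue after %s", nextCheckAfter.Round(time.Second))
	default:
		return "nothing to do"
	}
}

func main() {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			// Created 10s ago, so a 30s timeout has not expired yet.
			CreationTimestamp: metav1.NewTime(time.Now().Add(-10 * time.Second)),
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionFalse,
				Reason: corev1.PodReasonUnschedulable,
			}},
		},
	}
	fmt.Println(classifyPendingPod(pod, 30*time.Second)) // requeue after ~20s
}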