Skip to content

Commit

Permalink
fix conversation
Browse files Browse the repository at this point in the history
Signed-off-by: AiRanthem <[email protected]>
  • Loading branch information
AiRanthem committed Sep 27, 2024
1 parent 378185c commit 55a55dd
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 56 deletions.
11 changes: 9 additions & 2 deletions apis/apps/v1alpha1/uniteddeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,15 @@ func (s *UnitedDeploymentStatus) GetSubsetStatus(subset string) *UnitedDeploymen
return &s.SubsetStatuses[i]
}
}
s.SubsetStatuses = append(s.SubsetStatuses, UnitedDeploymentSubsetStatus{Name: subset})
return &s.SubsetStatuses[len(s.SubsetStatuses)-1]
return nil
}

func (u *UnitedDeployment) InitSubsetStatuses() {
for _, subset := range u.Spec.Topology.Subsets {
if u.Status.GetSubsetStatus(subset.Name) == nil {
u.Status.SubsetStatuses = append(u.Status.SubsetStatuses, UnitedDeploymentSubsetStatus{Name: subset.Name})
}
}
}

// UnitedDeploymentCondition describes current state of a UnitedDeployment.
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/uniteddeployment/adapter/adapter_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package adapter
import (
"fmt"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"k8s.io/kubernetes/pkg/controller"
)

func getSubsetPrefix(controllerName, subsetName string) string {
Expand Down Expand Up @@ -104,7 +104,7 @@ func CalculateUpdatedReplicas(podList []*corev1.Pod, updatedRevision string) (up
revision := getRevision(&pod.ObjectMeta)

// Only count pods that are updated and are not terminating
if revision == updatedRevision && pod.GetDeletionTimestamp() == nil {
if revision == updatedRevision && controller.IsPodActive(pod) {
updatedReplicas++
if podutil.IsPodReady(pod) {
updatedReadyReplicas++
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/uniteddeployment/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewReplicaAllocator(ud *appsv1alpha1.UnitedDeployment) ReplicaAllocator {
// NotPendingReplicas refers to the number of Pods that an unschedulable subset can safely accommodate.
// Exceeding this number may lead to scheduling failures within that subset.
// This value is only effective in the Adaptive scheduling strategy.
func getNotPendingReplicasMap(nameToSubset *map[string]*Subset) map[string]int32 {
func getSubsetRunningReplicas(nameToSubset *map[string]*Subset) map[string]int32 {
if nameToSubset == nil {
return nil
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func (ac *elasticAllocator) validateAndCalculateMinMaxMap(replicas int32, nameTo
numSubset := len(ac.Spec.Topology.Subsets)
minReplicasMap := make(map[string]int32, numSubset)
maxReplicasMap := make(map[string]int32, numSubset)
notPendingReplicasMap := getNotPendingReplicasMap(nameToSubset)
notPendingReplicasMap := getSubsetRunningReplicas(nameToSubset)
for index, subset := range ac.Spec.Topology.Subsets {
minReplicas := int32(0)
maxReplicas := int32(math.MaxInt32)
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/uniteddeployment/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,30 +348,37 @@ func TestAdaptiveElasticAllocator(t *testing.T) {
}
}
cases := []struct {
name string
totalReplicas, minReplicas, maxReplicas, pendingPods int32
subset1Replicas, subset2Replicas int32
}{
{
name: "5 pending pods > maxReplicas -> 0, 10",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 5,
subset1Replicas: 0, subset2Replicas: 10,
},
{
name: "4 pending pods = maxReplicas -> 0, 10",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 4,
subset1Replicas: 0, subset2Replicas: 10,
},
{
name: "3 pending pods < maxReplicas -> 1, 9",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 3,
subset1Replicas: 1, subset2Replicas: 9,
},
{
name: "2 pending pods = minReplicas -> 2, 8",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 2,
subset1Replicas: 2, subset2Replicas: 8,
},
{
name: "1 pending pods < minReplicas -> 3, 7",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 1,
subset1Replicas: 3, subset2Replicas: 7,
},
{
name: "no pending pods -> 2, 8",
totalReplicas: 10, minReplicas: 2, maxReplicas: 4, pendingPods: 0,
subset1Replicas: 4, subset2Replicas: 6,
},
Expand Down
13 changes: 7 additions & 6 deletions pkg/controller/uniteddeployment/uniteddeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,10 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
klog.InfoS("resource version not up-to-date, requeue in 1s", "resourceVersion", instance.GetResourceVersion())
return reconcile.Result{RequeueAfter: time.Second}, nil
}
klog.InfoS("Updated Resource observed", "unitedDeployment", klog.KObj(instance), "ResourceVersion", instance.GetResourceVersion())

oldStatus := instance.Status.DeepCopy()
for _, subset := range instance.Spec.Topology.Subsets {
instance.Status.GetSubsetStatus(subset.Name) // ensure subset status exists
}
instance.InitSubsetStatuses()
currentRevision, updatedRevision, _, collisionCount, err := r.constructUnitedDeploymentRevisions(instance)
if err != nil {
klog.ErrorS(err, "Failed to construct controller revision of UnitedDeployment", "unitedDeployment", klog.KObj(instance))
Expand Down Expand Up @@ -251,10 +250,8 @@ func (r *ReconcileUnitedDeployment) Reconcile(_ context.Context, request reconci
nextUpdate := getNextUpdate(instance, nextReplicas, nextPartitions)
klog.V(4).InfoS("Got UnitedDeployment next update", "unitedDeployment", klog.KObj(instance), "nextUpdate", nextUpdate)

ResourceVersionExpectation.Expect(instance)
newStatus, err := r.manageSubsets(instance, nameToSubset, nextUpdate, currentRevision, updatedRevision, subsetType)
if err != nil {
ResourceVersionExpectation.Delete(instance)
klog.ErrorS(err, "Failed to update UnitedDeployment", "unitedDeployment", klog.KObj(instance))
r.recorder.Event(instance.DeepCopy(), corev1.EventTypeWarning, fmt.Sprintf("Failed%s", eventTypeSubsetsUpdate), err.Error())
return reconcile.Result{}, err
Expand Down Expand Up @@ -454,7 +451,11 @@ func (r *ReconcileUnitedDeployment) classifySubsetBySubsetName(ud *appsv1alpha1.

func (r *ReconcileUnitedDeployment) updateStatus(instance *appsv1alpha1.UnitedDeployment, newStatus, oldStatus *appsv1alpha1.UnitedDeploymentStatus, nameToSubset *map[string]*Subset, nextReplicas, nextPartition *map[string]int32, currentRevision, updatedRevision *appsv1.ControllerRevision, collisionCount int32, control ControlInterface) error {
newStatus = r.calculateStatus(newStatus, nameToSubset, nextReplicas, nextPartition, currentRevision, updatedRevision, collisionCount, control)
_, err := r.updateUnitedDeployment(instance, oldStatus, newStatus)
newObj, err := r.updateUnitedDeployment(instance, oldStatus, newStatus)
if err == nil && newObj != nil {
ResourceVersionExpectation.Expect(newObj)
klog.InfoS("new resource version expected", "UnitedDeployment", klog.KObj(newObj), "ResourceVersion", newObj.GetResourceVersion())
}
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,4 @@ func getUnitedDeploymentKey(ud *appsv1alpha1.UnitedDeployment) string {
return ud.GetNamespace() + "/" + ud.GetName()
}

var ResourceVersionExpectation = expectations.NewReallyNewerResourceVersionExpectation()
var ResourceVersionExpectation = expectations.NewResourceVersionExpectation()
42 changes: 0 additions & 42 deletions pkg/util/expectations/resource_version_expectation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ func NewResourceVersionExpectation() ResourceVersionExpectation {
return &realResourceVersionExpectation{objectVersions: make(map[types.UID]*objectCacheVersions, 100)}
}

func NewReallyNewerResourceVersionExpectation() ResourceVersionExpectation {
return &reallyNewerResourceVersionExpectation{
realResourceVersionExpectation: realResourceVersionExpectation{
objectVersions: make(map[types.UID]*objectCacheVersions, 100),
}}
}

type realResourceVersionExpectation struct {
sync.Mutex
objectVersions map[types.UID]*objectCacheVersions
Expand Down Expand Up @@ -126,38 +119,3 @@ func isResourceVersionNewer(old, new string) bool {

return newCount >= oldCount
}

type reallyNewerResourceVersionExpectation struct {
realResourceVersionExpectation
}

func (r *reallyNewerResourceVersionExpectation) Observe(obj metav1.Object) {
r.Lock()
defer r.Unlock()

expectations := r.objectVersions[obj.GetUID()]
if expectations == nil {
return
}
if isResourceVersionReallyNewer(r.objectVersions[obj.GetUID()].version, obj.GetResourceVersion()) {
delete(r.objectVersions, obj.GetUID())
}
}

func isResourceVersionReallyNewer(old, new string) bool {
if len(old) == 0 {
return true
}

oldCount, err := strconv.ParseUint(old, 10, 64)
if err != nil {
return true
}

newCount, err := strconv.ParseUint(new, 10, 64)
if err != nil {
return false
}

return newCount > oldCount
}

0 comments on commit 55a55dd

Please sign in to comment.