From 81f3bbdd84b6abd4b91ca2146287520f07419ba0 Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Wed, 24 Jul 2024 11:51:17 +0200
Subject: [PATCH 1/2] [KNI][REVERT][downstream] nrt: add optional kni-specific informer

This reverts commit 0338ae3e095a45efd9c76ec43cc235e662b17eec.
---
 cmd/noderesourcetopology-plugin/main.go   |  3 -
 pkg-kni/podinformer/podinformer.go        | 89 -----------------------
 pkg/noderesourcetopology/cache/store.go   |  4 +-
 pkg/noderesourcetopology/pluginhelpers.go |  4 +-
 4 files changed, 2 insertions(+), 98 deletions(-)
 delete mode 100644 pkg-kni/podinformer/podinformer.go

diff --git a/cmd/noderesourcetopology-plugin/main.go b/cmd/noderesourcetopology-plugin/main.go
index d32e55676..372297a1b 100644
--- a/cmd/noderesourcetopology-plugin/main.go
+++ b/cmd/noderesourcetopology-plugin/main.go
@@ -36,7 +36,6 @@ import (
 	_ "sigs.k8s.io/scheduler-plugins/apis/config/scheme"
 
 	knifeatures "sigs.k8s.io/scheduler-plugins/pkg-kni/features"
-	kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer"
 
 	"github.com/k8stopologyawareschedwg/podfingerprint"
 )
@@ -66,8 +65,6 @@ func main() {
 
 	rand.Seed(time.Now().UnixNano())
 
-	kniinformer.Setup()
-
 	// Register custom plugins to the scheduler framework.
 	// Later they can consist of scheduler profile(s) and hence
 	// used by various kinds of workloads.
diff --git a/pkg-kni/podinformer/podinformer.go b/pkg-kni/podinformer/podinformer.go
deleted file mode 100644
index 10e9bbe54..000000000
--- a/pkg-kni/podinformer/podinformer.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2023 Red Hat, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package podinformer
-
-import (
-	"context"
-	"os"
-	"strconv"
-
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	coreinformers "k8s.io/client-go/informers/core/v1"
-	podlisterv1 "k8s.io/client-go/listers/core/v1"
-	"k8s.io/client-go/tools/cache"
-	k8scache "k8s.io/client-go/tools/cache"
-	"k8s.io/klog/v2"
-	"k8s.io/kubernetes/pkg/scheduler/framework"
-)
-
-const (
-	nrtInformerEnvVar string = "NRT_ENABLE_INFORMER"
-)
-
-var (
-	enabled bool
-)
-
-func IsEnabled() bool {
-	return enabled
-}
-
-func Setup() {
-	hasNRTInf, ok := os.LookupEnv(nrtInformerEnvVar)
-	if !ok || hasNRTInf == "" {
-		klog.InfoS("NRT specific informer disabled", "variableFound", ok, "valueGiven", hasNRTInf != "")
-		return
-	}
-	val, err := strconv.ParseBool(hasNRTInf)
-	if err != nil {
-		klog.Error(err, "NRT specific informer disabled")
-		return
-	}
-	klog.InfoS("NRT specific informer status", "value", val)
-	enabled = val
-}
-
-func FromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) {
-	if !IsEnabled() {
-		podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut
-		return podHandle.Informer(), podHandle.Lister()
-	}
-
-	podInformer := coreinformers.NewFilteredPodInformer(handle.ClientSet(), metav1.NamespaceAll, 0, cache.Indexers{}, nil)
-	podLister := podlisterv1.NewPodLister(podInformer.GetIndexer())
-
-	klog.V(5).InfoS("Start custom pod informer")
-	ctx := context.Background()
-	go podInformer.Run(ctx.Done())
-
-	klog.V(5).InfoS("Syncing custom pod informer")
-	cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)
-	klog.V(5).InfoS("Synced custom pod informer")
-
-	return podInformer, podLister
-}
-
-func IsPodRelevantForState(pod *corev1.Pod) bool {
-	if pod == nil {
-		return false // should never happen
-	}
-	if IsEnabled() {
-		return true // consider all pods including ones in terminal phase
-	}
-	return pod.Status.Phase == corev1.PodRunning // we are interested only about nodes which consume resources
-}
diff --git a/pkg/noderesourcetopology/cache/store.go b/pkg/noderesourcetopology/cache/store.go
index 2741017c5..74534c7db 100644
--- a/pkg/noderesourcetopology/cache/store.go
+++ b/pkg/noderesourcetopology/cache/store.go
@@ -33,8 +33,6 @@ import (
 	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests"
 	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
 	"sigs.k8s.io/scheduler-plugins/pkg/util"
-
-	kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer"
 )
 
 // nrtStore maps the NRT data by node name. It is not thread safe and needs to be protected by a lock.
@@ -255,7 +253,7 @@ func makeNodeToPodDataMap(podLister podlisterv1.PodLister, logID string) (map[st
 		return nodeToObjsMap, err
 	}
 	for _, pod := range pods {
-		if !kniinformer.IsPodRelevantForState(pod) {
+		if pod.Status.Phase != corev1.PodRunning {
 			// we are interested only about nodes which consume resources
 			continue
 		}
diff --git a/pkg/noderesourcetopology/pluginhelpers.go b/pkg/noderesourcetopology/pluginhelpers.go
index 4e334fb2d..9a9fea9bf 100644
--- a/pkg/noderesourcetopology/pluginhelpers.go
+++ b/pkg/noderesourcetopology/pluginhelpers.go
@@ -33,8 +33,6 @@ import (
 	apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
 	nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache"
 	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify"
-
-	kniinformer "sigs.k8s.io/scheduler-plugins/pkg-kni/podinformer"
 )
 
 const (
@@ -56,7 +54,7 @@ func initNodeTopologyInformer(tcfg *apiconfig.NodeResourceTopologyMatchArgs, han
 		return nrtcache.NewPassthrough(client), nil
 	}
 
-	podSharedInformer, podLister := kniinformer.FromHandle(handle)
+	podSharedInformer, podLister := nrtcache.InformerFromHandle(handle)
 
 	nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister)
 	if err != nil {

From ba793a9e0cd5f98189c9df5d2c81af70e8e4d38d Mon Sep 17 00:00:00 2001
From: Francesco Romani
Date: Tue, 27 Jun 2023 18:58:15 +0200
Subject: [PATCH 2/2] [KNI][CARRY] nrt: cache: add support for dedicated informer

Add an option to create and use a separate informer to get an unfiltered
pod listing from the apiserver. Because the plugin's view can be out of
sync with the kubelet's, it needs access to pods in terminal phase which
have not yet been deleted, to make sure the reconciliation is done
correctly.

xref: https://github.com/kubernetes-sigs/scheduler-plugins/issues/598
xref: https://github.com/kubernetes/kubernetes/issues/119423

Signed-off-by: Francesco Romani
(cherry picked from commit e466bc41e31315ca9f9391c8752dc7eae42b4599)
---
 apis/config/types.go                          |  13 +
 apis/config/v1/defaults.go                    |   5 +
 apis/config/v1/defaults_test.go               |   1 +
 apis/config/v1/types.go                       |  13 +
 apis/config/v1/zz_generated.conversion.go     |   2 +
 apis/config/v1/zz_generated.deepcopy.go       |   5 +
 apis/config/v1beta3/defaults.go               |   5 +
 apis/config/v1beta3/defaults_test.go          |   1 +
 apis/config/v1beta3/types.go                  |  13 +
 .../config/v1beta3/zz_generated.conversion.go |   2 +
 apis/config/v1beta3/zz_generated.deepcopy.go  |   5 +
 apis/config/zz_generated.deepcopy.go          |   5 +
 pkg/noderesourcetopology/cache/overreserve.go |  37 ++-
 .../cache/overreserve_test.go                 | 248 +++++++++++++++++-
 pkg/noderesourcetopology/cache/store.go       |  25 --
 pkg/noderesourcetopology/cache/store_test.go  | 129 ---------
 pkg/noderesourcetopology/pluginhelpers.go     |   5 +-
 .../podprovider/podprovider.go                |  92 +++++++
 18 files changed, 437 insertions(+), 169 deletions(-)
 create mode 100644 pkg/noderesourcetopology/podprovider/podprovider.go

diff --git a/apis/config/types.go b/apis/config/types.go
index 959c80fd8..5c6778508 100644
--- a/apis/config/types.go
+++ b/apis/config/types.go
@@ -176,6 +176,14 @@ const (
 	CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources"
 )
 
+// CacheInformerMode is a "string" type
+type CacheInformerMode string
+
+const (
+	CacheInformerShared    CacheInformerMode = "Shared"
+	CacheInformerDedicated CacheInformerMode = "Dedicated"
+)
+
 // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache.
 type NodeResourceTopologyCache struct {
 	// ForeignPodsDetect sets how foreign pods should be handled.
@@ -192,6 +200,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Dedicated" + InformerMode *CacheInformerMode } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/defaults.go b/apis/config/v1/defaults.go index 6f1b4df5b..175f7aeb2 100644 --- a/apis/config/v1/defaults.go +++ b/apis/config/v1/defaults.go @@ -89,6 +89,8 @@ var ( defaultResyncMethod = CacheResyncAutodetect + defaultInformerMode = CacheInformerDedicated + // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } + if obj.Cache.InformerMode == nil { + obj.Cache.InformerMode = &defaultInformerMode + } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1/defaults_test.go b/apis/config/v1/defaults_test.go index f964eab8d..6f42a78db 100644 --- a/apis/config/v1/defaults_test.go +++ b/apis/config/v1/defaults_test.go @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, + InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1/types.go b/apis/config/v1/types.go index a0e92d4bb..be95eb08e 100644 --- a/apis/config/v1/types.go +++ b/apis/config/v1/types.go @@ -174,6 +174,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. 
+ // If unspecified, default is "Dedicated" + InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1/zz_generated.conversion.go b/apis/config/v1/zz_generated.conversion.go index 4c8388b74..90f58e791 100644 --- a/apis/config/v1/zz_generated.conversion.go +++ b/apis/config/v1/zz_generated.conversion.go @@ -344,6 +344,7 @@ func Convert_config_NetworkOverheadArgs_To_v1_NetworkOverheadArgs(in *config.Net func autoConvert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -355,6 +356,7 @@ func Convert_v1_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in func autoConvert_config_NodeResourceTopologyCache_To_v1_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1/zz_generated.deepcopy.go b/apis/config/v1/zz_generated.deepcopy.go index be4875698..0553952b4 100644 --- a/apis/config/v1/zz_generated.deepcopy.go +++ b/apis/config/v1/zz_generated.deepcopy.go @@ -220,6 +220,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/apis/config/v1beta3/defaults.go b/apis/config/v1beta3/defaults.go index eed0492ff..c2207717c 100644 --- a/apis/config/v1beta3/defaults.go +++ b/apis/config/v1beta3/defaults.go @@ -89,6 +89,8 @@ var ( defaultResyncMethod = CacheResyncAutodetect + defaultInformerMode = CacheInformerDedicated + // Defaults for NetworkOverhead // DefaultWeightsName contains the default costs to be used by networkAware plugins DefaultWeightsName = "UserDefined" @@ -200,6 +202,9 @@ func SetDefaults_NodeResourceTopologyMatchArgs(obj *NodeResourceTopologyMatchArg if obj.Cache.ResyncMethod == nil { obj.Cache.ResyncMethod = &defaultResyncMethod } + if obj.Cache.InformerMode == nil { + obj.Cache.InformerMode = &defaultInformerMode + } } // SetDefaults_PreemptionTolerationArgs reuses SetDefaults_DefaultPreemptionArgs diff --git a/apis/config/v1beta3/defaults_test.go b/apis/config/v1beta3/defaults_test.go index 9813b416d..90aa95df7 100644 --- a/apis/config/v1beta3/defaults_test.go +++ b/apis/config/v1beta3/defaults_test.go @@ -205,6 +205,7 @@ func TestSchedulingDefaults(t *testing.T) { Cache: &NodeResourceTopologyCache{ ForeignPodsDetect: &defaultForeignPodsDetect, ResyncMethod: &defaultResyncMethod, + InformerMode: &defaultInformerMode, }, }, }, diff --git a/apis/config/v1beta3/types.go b/apis/config/v1beta3/types.go index 4c3b2fc9f..659b0241a 100644 --- a/apis/config/v1beta3/types.go +++ b/apis/config/v1beta3/types.go @@ -174,6 +174,14 @@ const ( CacheResyncOnlyExclusiveResources CacheResyncMethod = "OnlyExclusiveResources" ) +// 
CacheInformerMode is a "string" type +type CacheInformerMode string + +const ( + CacheInformerShared CacheInformerMode = "Shared" + CacheInformerDedicated CacheInformerMode = "Dedicated" +) + // NodeResourceTopologyCache define configuration details for the NodeResourceTopology cache. type NodeResourceTopologyCache struct { // ForeignPodsDetect sets how foreign pods should be handled. @@ -190,6 +198,11 @@ type NodeResourceTopologyCache struct { // Has no effect if caching is disabled (CacheResyncPeriod is zero) or if DiscardReservedNodes // is enabled. "Autodetect" is the default, reads hint from NRT objects. Fallback is "All". ResyncMethod *CacheResyncMethod `json:"resyncMethod,omitempty"` + // InformerMode controls the channel the cache uses to get updates about pods. + // "Shared" uses the default settings; "Dedicated" creates a specific subscription which is + // guaranteed to best suit the cache needs, at cost of one extra connection. + // If unspecified, default is "Dedicated" + InformerMode *CacheInformerMode `json:"informerMode,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/apis/config/v1beta3/zz_generated.conversion.go b/apis/config/v1beta3/zz_generated.conversion.go index 5ed3b6a64..2a7c9eb8b 100644 --- a/apis/config/v1beta3/zz_generated.conversion.go +++ b/apis/config/v1beta3/zz_generated.conversion.go @@ -344,6 +344,7 @@ func Convert_config_NetworkOverheadArgs_To_v1beta3_NetworkOverheadArgs(in *confi func autoConvert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCache(in *NodeResourceTopologyCache, out *config.NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*config.ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*config.CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*config.CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } @@ -355,6 +356,7 @@ func Convert_v1beta3_NodeResourceTopologyCache_To_config_NodeResourceTopologyCac func autoConvert_config_NodeResourceTopologyCache_To_v1beta3_NodeResourceTopologyCache(in *config.NodeResourceTopologyCache, out *NodeResourceTopologyCache, s conversion.Scope) error { out.ForeignPodsDetect = (*ForeignPodsDetectMode)(unsafe.Pointer(in.ForeignPodsDetect)) out.ResyncMethod = (*CacheResyncMethod)(unsafe.Pointer(in.ResyncMethod)) + out.InformerMode = (*CacheInformerMode)(unsafe.Pointer(in.InformerMode)) return nil } diff --git a/apis/config/v1beta3/zz_generated.deepcopy.go b/apis/config/v1beta3/zz_generated.deepcopy.go index 377aef4c9..3773eb083 100644 --- a/apis/config/v1beta3/zz_generated.deepcopy.go +++ b/apis/config/v1beta3/zz_generated.deepcopy.go @@ -220,6 +220,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git a/apis/config/zz_generated.deepcopy.go b/apis/config/zz_generated.deepcopy.go index a783ada93..c2db32ee0 100644 --- a/apis/config/zz_generated.deepcopy.go +++ b/apis/config/zz_generated.deepcopy.go @@ -170,6 +170,11 @@ func (in *NodeResourceTopologyCache) DeepCopyInto(out *NodeResourceTopologyCache *out = new(CacheResyncMethod) **out = **in } + if in.InformerMode != nil { + in, out := &in.InformerMode, &out.InformerMode + *out = new(CacheInformerMode) + **out = **in + } return } diff --git 
a/pkg/noderesourcetopology/cache/overreserve.go b/pkg/noderesourcetopology/cache/overreserve.go index 68f754f55..3c373e9f2 100644 --- a/pkg/noderesourcetopology/cache/overreserve.go +++ b/pkg/noderesourcetopology/cache/overreserve.go @@ -27,15 +27,16 @@ import ( "github.com/k8stopologyawareschedwg/podfingerprint" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" podlisterv1 "k8s.io/client-go/listers/core/v1" - k8scache "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/scheduler/framework" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" ) @@ -50,9 +51,10 @@ type OverReserve struct { nodesWithForeignPods counter podLister podlisterv1.PodLister resyncMethod apiconfig.CacheResyncMethod + isPodRelevant podprovider.PodFilterFunc } -func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister) (*OverReserve, error) { +func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient.Client, podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc) (*OverReserve, error) { if client == nil || podLister == nil { return nil, fmt.Errorf("nrtcache: received nil references") } @@ -74,6 +76,7 @@ func NewOverReserve(cfg *apiconfig.NodeResourceTopologyCache, client ctrlclient. nodesWithForeignPods: newCounter(), podLister: podLister, resyncMethod: resyncMethod, + isPodRelevant: isPodRelevant, } return obj, nil } @@ -199,7 +202,7 @@ func (ov *OverReserve) Resync() { } // node -> pod identifier (namespace, name) - nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, logID) + nodeToObjsMap, err := makeNodeToPodDataMap(ov.podLister, ov.isPodRelevant, logID) if err != nil { klog.ErrorS(err, "cannot find the mapping between running pods and nodes") return @@ -267,16 +270,32 @@ func (ov *OverReserve) FlushNodes(logID string, nrts ...*topologyv1alpha2.NodeRe } } -func InformerFromHandle(handle framework.Handle) (k8scache.SharedIndexInformer, podlisterv1.PodLister) { - podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut - return podHandle.Informer(), podHandle.Lister() -} - // to be used only in tests func (ov *OverReserve) Store() *nrtStore { return ov.nrts } +func makeNodeToPodDataMap(podLister podlisterv1.PodLister, isPodRelevant podprovider.PodFilterFunc, logID string) (map[string][]podData, error) { + nodeToObjsMap := make(map[string][]podData) + pods, err := podLister.List(labels.Everything()) + if err != nil { + return nodeToObjsMap, err + } + for _, pod := range pods { + if !isPodRelevant(pod, logID) { + continue + } + nodeObjs := nodeToObjsMap[pod.Spec.NodeName] + nodeObjs = append(nodeObjs, podData{ + Namespace: pod.Namespace, + Name: pod.Name, + HasExclusiveResources: resourcerequests.AreExclusiveForPod(pod), + }) + nodeToObjsMap[pod.Spec.NodeName] = nodeObjs + } + return nodeToObjsMap, nil +} + func logIDFromTime() string { return fmt.Sprintf("resync%v", time.Now().UnixMilli()) } diff --git a/pkg/noderesourcetopology/cache/overreserve_test.go b/pkg/noderesourcetopology/cache/overreserve_test.go index 9d0599ed8..ec48e2bae 100644 --- a/pkg/noderesourcetopology/cache/overreserve_test.go +++ b/pkg/noderesourcetopology/cache/overreserve_test.go @@ -22,6 
+22,7 @@ import ( "sort" "testing" + "github.com/google/go-cmp/cmp" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" "github.com/k8stopologyawareschedwg/podfingerprint" @@ -34,6 +35,7 @@ import ( ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" tu "sigs.k8s.io/scheduler-plugins/test/util" ) @@ -102,12 +104,12 @@ func TestInitEmptyLister(t *testing.T) { fakePodLister := &fakePodLister{} - _, err = NewOverReserve(nil, nil, fakePodLister) + _, err = NewOverReserve(nil, nil, fakePodLister, podprovider.IsPodRelevantAlways) if err == nil { t.Fatalf("accepted nil lister") } - _, err = NewOverReserve(nil, fakeClient, nil) + _, err = NewOverReserve(nil, fakeClient, nil, podprovider.IsPodRelevantAlways) if err == nil { t.Fatalf("accepted nil indexer") } @@ -226,7 +228,7 @@ func TestOverreserveGetCachedNRTCopy(t *testing.T) { checkGetCachedNRTCopy( t, func(client ctrlclient.Client, podLister podlisterv1.PodLister) (Interface, error) { - return NewOverReserve(nil, client, podLister) + return NewOverReserve(nil, client, podLister, podprovider.IsPodRelevantAlways) }, testCases..., ) @@ -724,9 +726,247 @@ func TestNodeWithForeignPods(t *testing.T) { } func mustOverReserve(t *testing.T, client ctrlclient.Client, podLister podlisterv1.PodLister) *OverReserve { - obj, err := NewOverReserve(nil, client, podLister) + obj, err := NewOverReserve(nil, client, podLister, podprovider.IsPodRelevantAlways) if err != nil { t.Fatalf("unexpected error creating cache: %v", err) } return obj } + +func TestMakeNodeToPodDataMap(t *testing.T) { + tcases := []struct { + description string + pods []*corev1.Pod + isPodRelevant podprovider.PodFilterFunc + err error + expected map[string][]podData + expectedErr error + }{ + { + description: "empty pod list - shared", + isPodRelevant: podprovider.IsPodRelevantShared, + expected: make(map[string][]podData), + }, + { + description: "empty pod list - dedicated", + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: make(map[string][]podData), + }, + { + description: "single pod NOT running - succeeded (kubernetes jobs) - dedicated", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "single pod NOT running - failed", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "single pod NOT running - succeeded (kubernetes jobs) - shared", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodSucceeded, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantShared, + expected: map[string][]podData{}, + }, + { + description: "single 
pod NOT running - failed - shared", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantShared, + expected: map[string][]podData{}, + }, + { + description: "single pod running - dedicated", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "single pod running - shared", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + }, + }, + }, + { + description: "few pods, single node running - dedicated", + pods: []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace1", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace2", + Name: "pod2", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "namespace2", + Name: "pod3", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + }, + isPodRelevant: podprovider.IsPodRelevantDedicated, + expected: map[string][]podData{ + "node1": { + { + Namespace: "namespace1", + Name: "pod1", + }, + { + Namespace: "namespace2", + Name: "pod2", + }, + { + Namespace: "namespace2", + Name: "pod3", + }, + }, + }, + }, + } + + for _, tcase := range tcases { + t.Run(tcase.description, func(t *testing.T) { + podLister := &fakePodLister{ + pods: tcase.pods, + err: tcase.err, + } + got, err := makeNodeToPodDataMap(podLister, tcase.isPodRelevant, tcase.description) + if err != tcase.expectedErr { + t.Errorf("error mismatch: got %v expected %v", err, tcase.expectedErr) + } + if diff := cmp.Diff(got, tcase.expected); diff != "" { + t.Errorf("unexpected result: %v", diff) + } + }) + } +} diff --git a/pkg/noderesourcetopology/cache/store.go b/pkg/noderesourcetopology/cache/store.go index 74534c7db..8ff78b321 100644 --- a/pkg/noderesourcetopology/cache/store.go +++ b/pkg/noderesourcetopology/cache/store.go @@ -21,8 +21,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/labels" - podlisterv1 "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" @@ -30,7 +28,6 @@ import ( "github.com/k8stopologyawareschedwg/podfingerprint" apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" - "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/resourcerequests" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" "sigs.k8s.io/scheduler-plugins/pkg/util" ) @@ -245,25 +242,3 @@ func 
checkPodFingerprintForNode(logID string, objs []podData, nodeName, pfpExpec podfingerprint.MarkCompleted(st) return err } - -func makeNodeToPodDataMap(podLister podlisterv1.PodLister, logID string) (map[string][]podData, error) { - nodeToObjsMap := make(map[string][]podData) - pods, err := podLister.List(labels.Everything()) - if err != nil { - return nodeToObjsMap, err - } - for _, pod := range pods { - if pod.Status.Phase != corev1.PodRunning { - // we are interested only about nodes which consume resources - continue - } - nodeObjs := nodeToObjsMap[pod.Spec.NodeName] - nodeObjs = append(nodeObjs, podData{ - Namespace: pod.Namespace, - Name: pod.Name, - HasExclusiveResources: resourcerequests.AreExclusiveForPod(pod), - }) - nodeToObjsMap[pod.Spec.NodeName] = nodeObjs - } - return nodeToObjsMap, nil -} diff --git a/pkg/noderesourcetopology/cache/store_test.go b/pkg/noderesourcetopology/cache/store_test.go index 7f915af45..30871d450 100644 --- a/pkg/noderesourcetopology/cache/store_test.go +++ b/pkg/noderesourcetopology/cache/store_test.go @@ -23,7 +23,6 @@ import ( "sort" "testing" - "github.com/google/go-cmp/cmp" topologyv1alpha2 "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha2" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -565,134 +564,6 @@ func TestResourceStoreUpdate(t *testing.T) { } } -func TestMakeNodeToPodDataMap(t *testing.T) { - tcases := []struct { - description string - pods []*corev1.Pod - err error - expected map[string][]podData - expectedErr error - }{ - { - description: "empty pod list", - expected: make(map[string][]podData), - }, - { - description: "single pod NOT running", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }, - }, - expected: make(map[string][]podData), - }, - { - description: "single pod running", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - }, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - }, - }, - }, - { - description: "few pods, single node running", - pods: []*corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace1", - Name: "pod1", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace2", - Name: "pod2", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "namespace2", - Name: "pod3", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - }, - }, - expected: map[string][]podData{ - "node1": { - { - Namespace: "namespace1", - Name: "pod1", - }, - { - Namespace: "namespace2", - Name: "pod2", - }, - { - Namespace: "namespace2", - Name: "pod3", - }, - }, - }, - }, - } - - for _, tcase := range tcases { - t.Run(tcase.description, func(t *testing.T) { - podLister := &fakePodLister{ - pods: tcase.pods, - err: tcase.err, - } - got, err := makeNodeToPodDataMap(podLister, tcase.description) - if err != tcase.expectedErr { - t.Errorf("error mismatch: got %v expected %v", err, tcase.expectedErr) - } - if diff := cmp.Diff(got, 
tcase.expected); diff != "" { - t.Errorf("unexpected result: %v", diff) - } - }) - } -} - func TestCheckPodFingerprintForNode(t *testing.T) { tcases := []struct { description string diff --git a/pkg/noderesourcetopology/pluginhelpers.go b/pkg/noderesourcetopology/pluginhelpers.go index 9a9fea9bf..832c95433 100644 --- a/pkg/noderesourcetopology/pluginhelpers.go +++ b/pkg/noderesourcetopology/pluginhelpers.go @@ -32,6 +32,7 @@ import ( apiconfig "sigs.k8s.io/scheduler-plugins/apis/config" nrtcache "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/cache" + "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider" "sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/stringify" ) @@ -54,9 +55,9 @@ func initNodeTopologyInformer(tcfg *apiconfig.NodeResourceTopologyMatchArgs, han return nrtcache.NewPassthrough(client), nil } - podSharedInformer, podLister := nrtcache.InformerFromHandle(handle) + podSharedInformer, podLister, isPodRelevant := podprovider.NewFromHandle(handle, tcfg.Cache) - nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister) + nrtCache, err := nrtcache.NewOverReserve(tcfg.Cache, client, podLister, isPodRelevant) if err != nil { return nil, err } diff --git a/pkg/noderesourcetopology/podprovider/podprovider.go b/pkg/noderesourcetopology/podprovider/podprovider.go new file mode 100644 index 000000000..7aa6ec208 --- /dev/null +++ b/pkg/noderesourcetopology/podprovider/podprovider.go @@ -0,0 +1,92 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package podprovider
+
+import (
+	"context"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	coreinformers "k8s.io/client-go/informers/core/v1"
+	podlisterv1 "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/tools/cache"
+	k8scache "k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+
+	apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
+)
+
+type PodFilterFunc func(pod *corev1.Pod, logID string) bool
+
+func NewFromHandle(handle framework.Handle, cacheConf *apiconfig.NodeResourceTopologyCache) (k8scache.SharedIndexInformer, podlisterv1.PodLister, PodFilterFunc) {
+	dedicated := wantsDedicatedInformer(cacheConf)
+	if !dedicated {
+		podHandle := handle.SharedInformerFactory().Core().V1().Pods() // shortcut
+		return podHandle.Informer(), podHandle.Lister(), IsPodRelevantShared
+	}
+
+	podInformer := coreinformers.NewFilteredPodInformer(handle.ClientSet(), metav1.NamespaceAll, 0, cache.Indexers{}, nil)
+	podLister := podlisterv1.NewPodLister(podInformer.GetIndexer())
+
+	klog.V(5).InfoS("Start custom pod informer")
+	ctx := context.Background()
+	go podInformer.Run(ctx.Done())
+
+	klog.V(5).InfoS("Syncing custom pod informer")
+	cache.WaitForCacheSync(ctx.Done(), podInformer.HasSynced)
+	klog.V(5).InfoS("Synced custom pod informer")
+
+	return podInformer, podLister, IsPodRelevantDedicated
+}
+
+// IsPodRelevantAlways is meant to be used in tests only
+func IsPodRelevantAlways(pod *corev1.Pod, logID string) bool {
+	return true
+}
+
+func IsPodRelevantShared(pod *corev1.Pod, logID string) bool {
+	// we are interested only in pods which consume node resources
+	return pod.Status.Phase == corev1.PodRunning
+}
+
+func IsPodRelevantDedicated(pod *corev1.Pod, logID string) bool {
+	// Every other phase we're interested in (see https://github.com/kubernetes-sigs/scheduler-plugins/pull/599).
+	// Note PodUnknown is deprecated and reportedly no longer set since 2015 (!!)
+	if pod.Status.Phase == corev1.PodPending {
+		// this is unexpected, so we're loud about it
+		klog.V(2).InfoS("nrtcache: Listed pod in Pending phase, ignored", "logID", logID, "podUID", pod.UID)
+		return false
+	}
+	if pod.Spec.NodeName == "" {
+		// this is very unexpected, so we're louder about it
+		klog.InfoS("nrtcache: Listed pod unbound, ignored", "logID", logID, "podUID", pod.UID)
+		return false
+	}
+	return true
+}
+
+func wantsDedicatedInformer(cacheConf *apiconfig.NodeResourceTopologyCache) bool {
+	if cacheConf == nil {
+		return false
+	}
+	if cacheConf.InformerMode == nil {
+		return false
+	}
+	infMode := *cacheConf.InformerMode
+	return infMode == apiconfig.CacheInformerDedicated
+}
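
The behavioural difference the second patch describes can be seen directly from the two filter functions added in podprovider.go. The following standalone sketch is illustrative only; it assumes the import path and exported names introduced by this series. It shows that a Succeeded pod is skipped by the shared-informer filter but kept by the dedicated one, which is what lets the resync logic keep accounting for terminal pods the kubelet still tracks:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/scheduler-plugins/pkg/noderesourcetopology/podprovider"
)

func main() {
	// A pod that already completed (e.g. a finished Job) but has not been deleted yet.
	succeeded := &corev1.Pod{
		Spec:   corev1.PodSpec{NodeName: "node1"},
		Status: corev1.PodStatus{Phase: corev1.PodSucceeded},
	}

	// Shared informer filter: only Running pods are considered.
	fmt.Println(podprovider.IsPodRelevantShared(succeeded, "demo")) // false

	// Dedicated informer filter: terminal pods still count until they are deleted.
	fmt.Println(podprovider.IsPodRelevantDedicated(succeeded, "demo")) // true
}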
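
For completeness, here is a minimal sketch of how a consumer could opt into the dedicated informer through the new cache arguments. The type, field, and constant names are the ones added in apis/config/types.go; the surrounding program is hypothetical. In the versioned (v1 and v1beta3) plugin arguments the same knob is exposed as the optional "informerMode" field.

package main

import (
	"fmt"

	apiconfig "sigs.k8s.io/scheduler-plugins/apis/config"
)

func main() {
	// Request the dedicated (unfiltered) pod informer. CacheInformerShared keeps
	// the previous behaviour; per the defaults added in this series, an unset
	// InformerMode is defaulted to Dedicated.
	mode := apiconfig.CacheInformerDedicated

	cacheArgs := &apiconfig.NodeResourceTopologyCache{
		InformerMode: &mode,
	}

	fmt.Println(*cacheArgs.InformerMode) // Dedicated
}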