From ed673ba0e978c8bb636a9d0e620bbef1f71a9b20 Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Wed, 17 Mar 2021 19:03:12 -0700 Subject: [PATCH 1/2] implement EnqueueExtensions interface in serviceaffinity --- .../serviceaffinity/service_affinity.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go index 4648d7d1e7d..400ab2675c5 100644 --- a/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go +++ b/pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go @@ -95,6 +95,7 @@ type ServiceAffinity struct { var _ framework.PreFilterPlugin = &ServiceAffinity{} var _ framework.FilterPlugin = &ServiceAffinity{} var _ framework.ScorePlugin = &ServiceAffinity{} +var _ framework.EnqueueExtensions = &ServiceAffinity{} // Name returns name of the plugin. It is used in logs, etc. func (pl *ServiceAffinity) Name() string { @@ -208,6 +209,28 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error return s, nil } +// EventsToRegister returns the possible events that may make a Pod +// failed by this plugin schedulable. +func (pl *ServiceAffinity) EventsToRegister() []framework.ClusterEvent { + if len(pl.args.AffinityLabels) == 0 { + return nil + } + + return []framework.ClusterEvent{ + // Suppose there is a running Pod that backs a Service, and the unschedulable Pod is subject + // to the same Service, but failed because of mismatched affinity labels. + // - if the running Pod's labels get updated, it may not back the Service anymore, and + // hence make the unschedulable Pod schedulable. + // - if the running Pod gets deleted, the unschedulable Pod may also become schedulable. + {Resource: framework.Pod, ActionType: framework.Update | framework.Delete}, + // A new Node or updating a Node's labels may make a Pod schedulable. 
+ {Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel}, + // Update or delete of a Service may break the correlation of the Pods that previously + // backed it, and hence make a Pod schedulable. + {Resource: framework.Service, ActionType: framework.Update | framework.Delete}, + } +} + // Filter matches nodes in such a way to force that // ServiceAffinity.labels are homogeneous for pods that are scheduled to a node. // (i.e. it returns true IFF this pod can be added to this node such that all other pods in From 9d1ef9f4c5f581c75515320914ef7732ef589bbe Mon Sep 17 00:00:00 2001 From: Wei Huang Date: Thu, 18 Mar 2021 10:28:17 -0700 Subject: [PATCH 2/2] unregister Service events if a plugin doesn't implement EventsToRegister() --- pkg/scheduler/framework/runtime/framework.go | 1 - .../framework/runtime/framework_test.go | 25 ++- test/integration/scheduler/queue_test.go | 150 ++++++++++++++++++ 3 files changed, 160 insertions(+), 16 deletions(-) create mode 100644 test/integration/scheduler/queue_test.go diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 7177d2ba342..4a7b68c0126 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -66,7 +66,6 @@ var allClusterEvents = []framework.ClusterEvent{ {Resource: framework.CSINode, ActionType: framework.All}, {Resource: framework.PersistentVolume, ActionType: framework.All}, {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}, - {Resource: framework.Service, ActionType: framework.All}, {Resource: framework.StorageClass, ActionType: framework.All}, } diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index b8edc68c94f..390adc58478 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -686,7 +686,7 @@ func (*fakePodPlugin) 
EventsToRegister() []framework.ClusterEvent { return []framework.ClusterEvent{ {Resource: framework.Pod, ActionType: framework.All}, {Resource: framework.Node, ActionType: framework.Add | framework.Delete}, - {Resource: framework.Service, ActionType: framework.Delete}, + {Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}, } } @@ -718,7 +718,6 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin), - {Resource: framework.Service, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin), {Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin), }, }, @@ -733,7 +732,6 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), }, }, @@ -741,15 +739,14 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { name: "pod plugin", plugins: []framework.Plugin{&fakePodPlugin{}}, want: map[framework.ClusterEvent]sets.String{ - {Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakePod", bindPlugin, queueSortPlugin), - {Resource: 
framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"), - {Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.Service, ActionType: framework.Delete}: sets.NewString("fakePod"), - {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakePod", bindPlugin, queueSortPlugin), + {Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"), + {Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.NewString("fakePod"), + {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), + {Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), }, }, { @@ -760,9 +757,8 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { {Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"), {Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakeNode", "fakePod", bindPlugin, 
queueSortPlugin), {Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}: sets.NewString("fakeNode"), - {Resource: framework.Service, ActionType: framework.Delete}: sets.NewString("fakePod"), + {Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.NewString("fakePod"), {Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), @@ -775,7 +771,6 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) { want: map[framework.ClusterEvent]sets.String{ {Resource: framework.Pod, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), - {Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin), diff --git a/test/integration/scheduler/queue_test.go b/test/integration/scheduler/queue_test.go new file mode 100644 index 00000000000..fd0e22e036d --- /dev/null +++ b/test/integration/scheduler/queue_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2021 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/kubernetes/pkg/scheduler" + schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity" + st "k8s.io/kubernetes/pkg/scheduler/testing" + testutils "k8s.io/kubernetes/test/integration/util" + imageutils "k8s.io/kubernetes/test/utils/image" +) + +func TestServiceAffinityEnqueue(t *testing.T) { + profile := schedapi.KubeSchedulerProfile{ + SchedulerName: v1.DefaultSchedulerName, + Plugins: &schedapi.Plugins{ + PreFilter: schedapi.PluginSet{ + Enabled: []schedapi.Plugin{ + {Name: serviceaffinity.Name}, + }, + }, + Filter: schedapi.PluginSet{ + Enabled: []schedapi.Plugin{ + {Name: serviceaffinity.Name}, + }, + }, + }, + PluginConfig: []schedapi.PluginConfig{ + { + Name: serviceaffinity.Name, + Args: &schedapi.ServiceAffinityArgs{ + AffinityLabels: []string{"hostname"}, + }, + }, + }, + } + // Use zero backoff seconds to bypass backoffQ. 
+ testCtx := testutils.InitTestSchedulerWithOptions( + t, + testutils.InitTestMaster(t, "serviceaffinity-enqueue", nil), + nil, + scheduler.WithProfiles(profile), + scheduler.WithPodInitialBackoffSeconds(0), + scheduler.WithPodMaxBackoffSeconds(0), + ) + testutils.SyncInformerFactory(testCtx) + // It's intended to not start the scheduler's queue, and hence to + // not start any flushing logic. We will pop and schedule the Pods manually later. + defer testutils.CleanupTest(t, testCtx) + + cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx + // Create two Nodes. + for i := 1; i <= 2; i++ { + nodeName := fmt.Sprintf("node%d", i) + capacity := map[v1.ResourceName]string{v1.ResourcePods: "1"} + node := st.MakeNode().Name(nodeName).Label("hostname", nodeName).Capacity(capacity).Obj() + if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Node %q: %v", nodeName, err) + } + } + + // Create a Service. + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{Port: int32(80)}}, + Selector: map[string]string{"foo": "bar"}, + }, + } + if _, err := cs.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Service %q: %v", svc.Name, err) + } + + // Create two Pods. + pause := imageutils.GetPauseImageName() + for i := 1; i <= 2; i++ { + podName := fmt.Sprintf("pod%d", i) + pod := st.MakePod().Namespace(ns).Name(podName).Label("foo", "bar").Container(pause).Obj() + // Make Pod1 an assigned Pod. + if i == 1 { + pod.Spec.NodeName = fmt.Sprintf("node%d", i) + } + if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create Pod %q: %v", pod.Name, err) + } + } + + // Wait for pod2 to be present in the scheduling queue. 
+ if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) { + return len(testCtx.Scheduler.SchedulingQueue.PendingPods()) == 1, nil + }); err != nil { + t.Fatal(err) + } + + // Pop Pod2 out. It should be unschedulable. + podInfo := testCtx.Scheduler.NextPod() + fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName] + if !ok { + t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name) + } + // Schedule the Pod manually. + _, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod) + // The fitError is expected to be: + // 0/2 nodes are available: 1 Too many pods, 1 node(s) didn't match service affinity. + if fitError == nil { + t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name) + } + testCtx.Scheduler.Error(podInfo, fitError) + + // Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so + // pass a number larger than 1 to move Pod to unschedulableQ. + testCtx.Scheduler.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, 10) + + // Trigger a Service event. + // We expect this event to trigger moving the test Pod from unschedulableQ to activeQ. + if err := cs.CoreV1().Services(ns).Delete(ctx, "svc", metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to delete service 'svc': %v", err) + } + + // Now we should be able to pop the Pod from activeQ again. + podInfo = testCtx.Scheduler.NextPod() + if podInfo.Attempts != 2 { + t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts) + } +}