mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #100357 from Huang-Wei/enq-serviceaffinity
implement EnqueueExtensions interface in serviceaffinity
This commit is contained in:
commit
3c75651d15
@ -95,6 +95,7 @@ type ServiceAffinity struct {
|
||||
var _ framework.PreFilterPlugin = &ServiceAffinity{}
|
||||
var _ framework.FilterPlugin = &ServiceAffinity{}
|
||||
var _ framework.ScorePlugin = &ServiceAffinity{}
|
||||
var _ framework.EnqueueExtensions = &ServiceAffinity{}
|
||||
|
||||
// Name returns name of the plugin. It is used in logs, etc.
|
||||
func (pl *ServiceAffinity) Name() string {
|
||||
@ -208,6 +209,28 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// EventsToRegister returns the possible events that may make a Pod
|
||||
// failed by this plugin schedulable.
|
||||
func (pl *ServiceAffinity) EventsToRegister() []framework.ClusterEvent {
|
||||
if len(pl.args.AffinityLabels) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return []framework.ClusterEvent{
|
||||
// Suppose there is a running Pod backs a Service, and the unschedulable Pod subjects
|
||||
// to the same Service, but failed because of mis-matched affinity labels.
|
||||
// - if the running Pod's labels get updated, it may not back the Service anymore, and
|
||||
// hence make the unschedulable Pod schedulable.
|
||||
// - if the running Pod gets deleted, the unschedulable Pod may also become schedulable.
|
||||
{Resource: framework.Pod, ActionType: framework.Update | framework.Delete},
|
||||
// A new Node or updating a Node's labels may make a Pod schedulable.
|
||||
{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel},
|
||||
// Update or delete of a Service may break the correlation of the Pods that previously
|
||||
// backed it, and hence make a Pod schedulable.
|
||||
{Resource: framework.Service, ActionType: framework.Update | framework.Delete},
|
||||
}
|
||||
}
|
||||
|
||||
// Filter matches nodes in such a way to force that
|
||||
// ServiceAffinity.labels are homogeneous for pods that are scheduled to a node.
|
||||
// (i.e. it returns true IFF this pod can be added to this node such that all other pods in
|
||||
|
@ -66,7 +66,6 @@ var allClusterEvents = []framework.ClusterEvent{
|
||||
{Resource: framework.CSINode, ActionType: framework.All},
|
||||
{Resource: framework.PersistentVolume, ActionType: framework.All},
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All},
|
||||
{Resource: framework.Service, ActionType: framework.All},
|
||||
{Resource: framework.StorageClass, ActionType: framework.All},
|
||||
}
|
||||
|
||||
|
@ -686,7 +686,7 @@ func (*fakePodPlugin) EventsToRegister() []framework.ClusterEvent {
|
||||
return []framework.ClusterEvent{
|
||||
{Resource: framework.Pod, ActionType: framework.All},
|
||||
{Resource: framework.Node, ActionType: framework.Add | framework.Delete},
|
||||
{Resource: framework.Service, ActionType: framework.Delete},
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete},
|
||||
}
|
||||
}
|
||||
|
||||
@ -718,7 +718,6 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
|
||||
{Resource: framework.CSINode, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Service, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString("fakeNoop", bindPlugin, queueSortPlugin),
|
||||
},
|
||||
},
|
||||
@ -733,7 +732,6 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
|
||||
{Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
},
|
||||
},
|
||||
@ -741,15 +739,14 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
|
||||
name: "pod plugin",
|
||||
plugins: []framework.Plugin{&fakePodPlugin{}},
|
||||
want: map[framework.ClusterEvent]sets.String{
|
||||
{Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakePod", bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"),
|
||||
{Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Service, ActionType: framework.Delete}: sets.NewString("fakePod"),
|
||||
{Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakePod", bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"),
|
||||
{Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.NewString("fakePod"),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.StorageClass, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -760,9 +757,8 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
|
||||
{Resource: framework.Node, ActionType: framework.Add | framework.Delete}: sets.NewString("fakePod"),
|
||||
{Resource: framework.Pod, ActionType: framework.All}: sets.NewString("fakeNode", "fakePod", bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.CSINode, ActionType: framework.Update | framework.Delete}: sets.NewString("fakeNode"),
|
||||
{Resource: framework.Service, ActionType: framework.Delete}: sets.NewString("fakePod"),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.Delete}: sets.NewString("fakePod"),
|
||||
{Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
@ -775,7 +771,6 @@ func TestNewFrameworkFillEventToPluginMap(t *testing.T) {
|
||||
want: map[framework.ClusterEvent]sets.String{
|
||||
{Resource: framework.Pod, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Node, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.Service, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.CSINode, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolume, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
{Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: sets.NewString(bindPlugin, queueSortPlugin),
|
||||
|
150
test/integration/scheduler/queue_test.go
Normal file
150
test/integration/scheduler/queue_test.go
Normal file
@ -0,0 +1,150 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
schedapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/serviceaffinity"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
testutils "k8s.io/kubernetes/test/integration/util"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
)
|
||||
|
||||
func TestServiceAffinityEnqueue(t *testing.T) {
|
||||
profile := schedapi.KubeSchedulerProfile{
|
||||
SchedulerName: v1.DefaultSchedulerName,
|
||||
Plugins: &schedapi.Plugins{
|
||||
PreFilter: schedapi.PluginSet{
|
||||
Enabled: []schedapi.Plugin{
|
||||
{Name: serviceaffinity.Name},
|
||||
},
|
||||
},
|
||||
Filter: schedapi.PluginSet{
|
||||
Enabled: []schedapi.Plugin{
|
||||
{Name: serviceaffinity.Name},
|
||||
},
|
||||
},
|
||||
},
|
||||
PluginConfig: []schedapi.PluginConfig{
|
||||
{
|
||||
Name: serviceaffinity.Name,
|
||||
Args: &schedapi.ServiceAffinityArgs{
|
||||
AffinityLabels: []string{"hostname"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
// Use zero backoff seconds to bypass backoffQ.
|
||||
testCtx := testutils.InitTestSchedulerWithOptions(
|
||||
t,
|
||||
testutils.InitTestMaster(t, "serviceaffinity-enqueue", nil),
|
||||
nil,
|
||||
scheduler.WithProfiles(profile),
|
||||
scheduler.WithPodInitialBackoffSeconds(0),
|
||||
scheduler.WithPodMaxBackoffSeconds(0),
|
||||
)
|
||||
testutils.SyncInformerFactory(testCtx)
|
||||
// It's intended to not start the scheduler's queue, and hence to
|
||||
// not start any flushing logic. We will pop and schedule the Pods manually later.
|
||||
defer testutils.CleanupTest(t, testCtx)
|
||||
|
||||
cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
|
||||
// Create two Nodes.
|
||||
for i := 1; i <= 2; i++ {
|
||||
nodeName := fmt.Sprintf("node%d", i)
|
||||
capacity := map[v1.ResourceName]string{v1.ResourcePods: "1"}
|
||||
node := st.MakeNode().Name(nodeName).Label("hostname", nodeName).Capacity(capacity).Obj()
|
||||
if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("Failed to create Node %q: %v", nodeName, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create a Service.
|
||||
svc := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "svc",
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Ports: []v1.ServicePort{{Port: int32(80)}},
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
},
|
||||
}
|
||||
if _, err := cs.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("Failed to create Service %q: %v", svc.Name, err)
|
||||
}
|
||||
|
||||
// Create two Pods.
|
||||
pause := imageutils.GetPauseImageName()
|
||||
for i := 1; i <= 2; i++ {
|
||||
podName := fmt.Sprintf("pod%d", i)
|
||||
pod := st.MakePod().Namespace(ns).Name(podName).Label("foo", "bar").Container(pause).Obj()
|
||||
// Make Pod1 an assigned Pod.
|
||||
if i == 1 {
|
||||
pod.Spec.NodeName = fmt.Sprintf("node%d", i)
|
||||
}
|
||||
if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
|
||||
t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for pod2 to be present in the scheduling queue.
|
||||
if err := wait.Poll(time.Millisecond*200, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
return len(testCtx.Scheduler.SchedulingQueue.PendingPods()) == 1, nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Pop Pod2 out. It should be unschedulable.
|
||||
podInfo := testCtx.Scheduler.NextPod()
|
||||
fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName]
|
||||
if !ok {
|
||||
t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
|
||||
}
|
||||
// Schedule the Pod manually.
|
||||
_, fitError := testCtx.Scheduler.Algorithm.Schedule(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
|
||||
// The fitError is expected to be:
|
||||
// 0/2 nodes are available: 1 Too many pods, 1 node(s) didn't match service affinity.
|
||||
if fitError == nil {
|
||||
t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
|
||||
}
|
||||
testCtx.Scheduler.Error(podInfo, fitError)
|
||||
|
||||
// Scheduling cycle is incremented from 0 to 1 after NextPod() is called, so
|
||||
// pass a number larger than 1 to move Pod to unschedulableQ.
|
||||
testCtx.Scheduler.SchedulingQueue.AddUnschedulableIfNotPresent(podInfo, 10)
|
||||
|
||||
// Trigger a Service event.
|
||||
// We expect this event to trigger moving the test Pod from unschedulableQ to activeQ.
|
||||
if err := cs.CoreV1().Services(ns).Delete(ctx, "svc", metav1.DeleteOptions{}); err != nil {
|
||||
t.Fatalf("Failed to delete service 'svc': %v", err)
|
||||
}
|
||||
|
||||
// Now we should be able to pop the Pod from activeQ again.
|
||||
podInfo = testCtx.Scheduler.NextPod()
|
||||
if podInfo.Attempts != 2 {
|
||||
t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user