From 7cba35f6519c6e0cf72848185ef3f9a524601467 Mon Sep 17 00:00:00 2001 From: carlory Date: Sat, 8 Jul 2023 20:05:54 +0800 Subject: [PATCH] nodeports: scheduler queueing hints Co-authored-by: Kensei Nakada Co-authored-by: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com> --- .../framework/plugins/nodeports/node_ports.go | 52 ++++++++++++++++- .../plugins/nodeports/node_ports_test.go | 57 +++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports.go b/pkg/scheduler/framework/plugins/nodeports/node_ports.go index 656c87495b2..96b32b087e2 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports.go @@ -22,8 +22,10 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/util" ) // NodePorts is a plugin that checks if a node has free ports for the requested pod ports. @@ -112,11 +114,59 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint { return []framework.ClusterEventWithHint{ // Due to immutable fields `spec.containers[*].ports`, pod update events are ignored. - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}}, + {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted}, + // TODO(#110175): Ideally, it's supposed to register only NodeCreated, because NodeUpdated event never means to have any free ports for the Pod. + // But, we may miss NodeCreated event due to preCheck. + // See: https://github.com/kubernetes/kubernetes/issues/109437 + // And, we can remove NodeUpdated event once https://github.com/kubernetes/kubernetes/issues/110175 is solved. + // We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens. + // (the same as QueueAfterBackoff) {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}}, } } +// isSchedulableAfterPodDeleted is invoked whenever a pod deleted. It checks whether +// that change made a previously unschedulable pod schedulable. +func (pl *NodePorts) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) { + deletedPod, _, err := util.As[*v1.Pod](oldObj, nil) + if err != nil { + return framework.QueueAfterBackoff, err + } + + // If the deleted pod is unscheduled, it doesn't make the target pod schedulable. + if deletedPod.Spec.NodeName == "" { + logger.V(4).Info("the deleted pod is unscheduled and it doesn't make the target pod schedulable", "pod", pod, "deletedPod", deletedPod) + return framework.QueueSkip, nil + } + + // Get the used host ports of the deleted pod. + usedPorts := make(framework.HostPortInfo) + for _, container := range deletedPod.Spec.Containers { + for _, podPort := range container.Ports { + if podPort.HostPort > 0 { + usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort) + } + } + } + + // If the deleted pod doesn't use any host ports, it doesn't make the target pod schedulable. + if len(usedPorts) == 0 { + return framework.QueueSkip, nil + } + + // Construct a fake NodeInfo that only has the deleted Pod. + // If we can schedule `pod` to this fake node, it means that `pod` and the deleted pod don't have any common port(s). + // So, deleting that pod couldn't make `pod` schedulable. + nodeInfo := framework.NodeInfo{UsedPorts: usedPorts} + if Fits(pod, &nodeInfo) { + logger.V(4).Info("the deleted pod and the target pod don't have any common port(s), returning QueueSkip as deleting this Pod won't make the Pod schedulable", "pod", pod, "deletedPod", deletedPod) + return framework.QueueSkip, nil + } + + logger.V(4).Info("the deleted pod and the target pod have any common port(s), returning QueueAfterBackoff as deleting this Pod may make the Pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod)) + return framework.QueueAfterBackoff, nil +} + // Filter invoked at the filter extension point. func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { wantPorts, err := getPreFilterState(cycleState) diff --git a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go index 92cd013614f..00774ae8db2 100644 --- a/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go +++ b/pkg/scheduler/framework/plugins/nodeports/node_ports_test.go @@ -24,8 +24,10 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2/ktesting" + _ "k8s.io/klog/v2/ktesting/init" "k8s.io/kubernetes/pkg/scheduler/framework" st "k8s.io/kubernetes/pkg/scheduler/testing" ) @@ -298,3 +300,58 @@ func TestGetContainerPorts(t *testing.T) { }) } } + +func Test_isSchedulableAfterPodDeleted(t *testing.T) { + podWithHostPort := st.MakePod().HostPort(8080) + + testcases := map[string]struct { + pod *v1.Pod + oldObj interface{} + expectedHint framework.QueueingHint + expectedErr bool + }{ + "backoff-wrong-old-object": { + pod: podWithHostPort.Obj(), + oldObj: "not-a-pod", + expectedHint: framework.QueueAfterBackoff, + expectedErr: true, + }, + "skip-queue-on-unscheduled": { + pod: podWithHostPort.Obj(), + oldObj: st.MakePod().Obj(), + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-non-hostport": { + pod: podWithHostPort.Obj(), + oldObj: st.MakePod().Node("fake-node").Obj(), + expectedHint: framework.QueueSkip, + }, + "skip-queue-on-unrelated-hostport": { + pod: podWithHostPort.Obj(), + oldObj: st.MakePod().Node("fake-node").HostPort(8081).Obj(), + expectedHint: framework.QueueSkip, + }, + "queue-on-released-hostport": { + pod: podWithHostPort.Obj(), + oldObj: st.MakePod().Node("fake-node").HostPort(8080).Obj(), + expectedHint: framework.QueueAfterBackoff, + }, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + p, err := New(ctx, nil, nil) + if err != nil { + t.Fatalf("Creating plugin: %v", err) + } + actualHint, err := p.(*NodePorts).isSchedulableAfterPodDeleted(logger, tc.pod, tc.oldObj, nil) + if tc.expectedErr { + require.Error(t, err) + return + } + require.NoError(t, err) + require.Equal(t, tc.expectedHint, actualHint) + }) + } +}