mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Merge pull request #119176 from carlory/fix-118893-2
nodeports: scheduler queueing hints
This commit is contained in:
commit
46c307868f
@ -22,8 +22,10 @@ import (
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
|
||||
"k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
// NodePorts is a plugin that checks if a node has free ports for the requested pod ports.
|
||||
@ -112,11 +114,59 @@ func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error)
|
||||
func (pl *NodePorts) EventsToRegister() []framework.ClusterEventWithHint {
|
||||
return []framework.ClusterEventWithHint{
|
||||
// Due to immutable fields `spec.containers[*].ports`, pod update events are ignored.
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
|
||||
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
|
||||
// TODO(#110175): Ideally, it's supposed to register only NodeCreated, because NodeUpdated event never means to have any free ports for the Pod.
|
||||
// But, we may miss NodeCreated event due to preCheck.
|
||||
// See: https://github.com/kubernetes/kubernetes/issues/109437
|
||||
// And, we can remove NodeUpdated event once https://github.com/kubernetes/kubernetes/issues/110175 is solved.
|
||||
// We don't need the QueueingHintFn here because the scheduling of Pods will be always retried with backoff when this Event happens.
|
||||
// (the same as QueueAfterBackoff)
|
||||
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
|
||||
}
|
||||
}
|
||||
|
||||
// isSchedulableAfterPodDeleted is invoked whenever a pod deleted. It checks whether
|
||||
// that change made a previously unschedulable pod schedulable.
|
||||
func (pl *NodePorts) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
|
||||
deletedPod, _, err := util.As[*v1.Pod](oldObj, nil)
|
||||
if err != nil {
|
||||
return framework.QueueAfterBackoff, err
|
||||
}
|
||||
|
||||
// If the deleted pod is unscheduled, it doesn't make the target pod schedulable.
|
||||
if deletedPod.Spec.NodeName == "" {
|
||||
logger.V(4).Info("the deleted pod is unscheduled and it doesn't make the target pod schedulable", "pod", pod, "deletedPod", deletedPod)
|
||||
return framework.QueueSkip, nil
|
||||
}
|
||||
|
||||
// Get the used host ports of the deleted pod.
|
||||
usedPorts := make(framework.HostPortInfo)
|
||||
for _, container := range deletedPod.Spec.Containers {
|
||||
for _, podPort := range container.Ports {
|
||||
if podPort.HostPort > 0 {
|
||||
usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If the deleted pod doesn't use any host ports, it doesn't make the target pod schedulable.
|
||||
if len(usedPorts) == 0 {
|
||||
return framework.QueueSkip, nil
|
||||
}
|
||||
|
||||
// Construct a fake NodeInfo that only has the deleted Pod.
|
||||
// If we can schedule `pod` to this fake node, it means that `pod` and the deleted pod don't have any common port(s).
|
||||
// So, deleting that pod couldn't make `pod` schedulable.
|
||||
nodeInfo := framework.NodeInfo{UsedPorts: usedPorts}
|
||||
if Fits(pod, &nodeInfo) {
|
||||
logger.V(4).Info("the deleted pod and the target pod don't have any common port(s), returning QueueSkip as deleting this Pod won't make the Pod schedulable", "pod", pod, "deletedPod", deletedPod)
|
||||
return framework.QueueSkip, nil
|
||||
}
|
||||
|
||||
logger.V(4).Info("the deleted pod and the target pod have any common port(s), returning QueueAfterBackoff as deleting this Pod may make the Pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(deletedPod))
|
||||
return framework.QueueAfterBackoff, nil
|
||||
}
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
|
||||
wantPorts, err := getPreFilterState(cycleState)
|
||||
|
@ -24,8 +24,10 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/require"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
_ "k8s.io/klog/v2/ktesting/init"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
st "k8s.io/kubernetes/pkg/scheduler/testing"
|
||||
)
|
||||
@ -298,3 +300,58 @@ func TestGetContainerPorts(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_isSchedulableAfterPodDeleted(t *testing.T) {
|
||||
podWithHostPort := st.MakePod().HostPort(8080)
|
||||
|
||||
testcases := map[string]struct {
|
||||
pod *v1.Pod
|
||||
oldObj interface{}
|
||||
expectedHint framework.QueueingHint
|
||||
expectedErr bool
|
||||
}{
|
||||
"backoff-wrong-old-object": {
|
||||
pod: podWithHostPort.Obj(),
|
||||
oldObj: "not-a-pod",
|
||||
expectedHint: framework.QueueAfterBackoff,
|
||||
expectedErr: true,
|
||||
},
|
||||
"skip-queue-on-unscheduled": {
|
||||
pod: podWithHostPort.Obj(),
|
||||
oldObj: st.MakePod().Obj(),
|
||||
expectedHint: framework.QueueSkip,
|
||||
},
|
||||
"skip-queue-on-non-hostport": {
|
||||
pod: podWithHostPort.Obj(),
|
||||
oldObj: st.MakePod().Node("fake-node").Obj(),
|
||||
expectedHint: framework.QueueSkip,
|
||||
},
|
||||
"skip-queue-on-unrelated-hostport": {
|
||||
pod: podWithHostPort.Obj(),
|
||||
oldObj: st.MakePod().Node("fake-node").HostPort(8081).Obj(),
|
||||
expectedHint: framework.QueueSkip,
|
||||
},
|
||||
"queue-on-released-hostport": {
|
||||
pod: podWithHostPort.Obj(),
|
||||
oldObj: st.MakePod().Node("fake-node").HostPort(8080).Obj(),
|
||||
expectedHint: framework.QueueAfterBackoff,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testcases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
logger, ctx := ktesting.NewTestContext(t)
|
||||
p, err := New(ctx, nil, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Creating plugin: %v", err)
|
||||
}
|
||||
actualHint, err := p.(*NodePorts).isSchedulableAfterPodDeleted(logger, tc.pod, tc.oldObj, nil)
|
||||
if tc.expectedErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expectedHint, actualHint)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user