mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
wrap host ports metadata in a prefilter.
This commit is contained in:
parent
7e01fe12bf
commit
c54dbd85e0
@ -304,26 +304,10 @@ func (m *podFitsResourcesMetadata) clone() *podFitsResourcesMetadata {
|
||||
return ©
|
||||
}
|
||||
|
||||
type podFitsHostPortsMetadata struct {
|
||||
podPorts []*v1.ContainerPort
|
||||
}
|
||||
|
||||
func (m *podFitsHostPortsMetadata) clone() *podFitsHostPortsMetadata {
|
||||
if m == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
copy := podFitsHostPortsMetadata{}
|
||||
copy.podPorts = append([]*v1.ContainerPort(nil), m.podPorts...)
|
||||
|
||||
return ©
|
||||
}
|
||||
|
||||
// NOTE: When new fields are added/removed or logic is changed, please make sure that
|
||||
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
|
||||
type predicateMetadata struct {
|
||||
pod *v1.Pod
|
||||
podBestEffort bool
|
||||
pod *v1.Pod
|
||||
|
||||
// evenPodsSpreadMetadata holds info of the minimum match number on each topology spread constraint,
|
||||
// and the match number of all valid topology pairs.
|
||||
@ -331,7 +315,6 @@ type predicateMetadata struct {
|
||||
|
||||
serviceAffinityMetadata *serviceAffinityMetadata
|
||||
podFitsResourcesMetadata *podFitsResourcesMetadata
|
||||
podFitsHostPortsMetadata *podFitsHostPortsMetadata
|
||||
}
|
||||
|
||||
// Ensure that predicateMetadata implements algorithm.Metadata.
|
||||
@ -396,7 +379,6 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister
|
||||
pod: pod,
|
||||
evenPodsSpreadMetadata: evenPodsSpreadMetadata,
|
||||
podFitsResourcesMetadata: getPodFitsResourcesMetedata(pod),
|
||||
podFitsHostPortsMetadata: getPodFitsHostPortsMetadata(pod),
|
||||
}
|
||||
for predicateName, precomputeFunc := range predicateMetadataProducers {
|
||||
klog.V(10).Infof("Precompute: %v", predicateName)
|
||||
@ -405,12 +387,6 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister
|
||||
return predicateMetadata
|
||||
}
|
||||
|
||||
func getPodFitsHostPortsMetadata(pod *v1.Pod) *podFitsHostPortsMetadata {
|
||||
return &podFitsHostPortsMetadata{
|
||||
podPorts: schedutil.GetContainerPorts(pod),
|
||||
}
|
||||
}
|
||||
|
||||
func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata {
|
||||
return &podFitsResourcesMetadata{
|
||||
podRequest: GetResourceRequest(pod),
|
||||
@ -638,10 +614,8 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
|
||||
// its maps and slices, but it does not copy the contents of pointer values.
|
||||
func (meta *predicateMetadata) ShallowCopy() Metadata {
|
||||
newPredMeta := &predicateMetadata{
|
||||
pod: meta.pod,
|
||||
podBestEffort: meta.podBestEffort,
|
||||
pod: meta.pod,
|
||||
}
|
||||
newPredMeta.podFitsHostPortsMetadata = meta.podFitsHostPortsMetadata.clone()
|
||||
newPredMeta.evenPodsSpreadMetadata = meta.evenPodsSpreadMetadata.clone()
|
||||
newPredMeta.serviceAffinityMetadata = meta.serviceAffinityMetadata.clone()
|
||||
newPredMeta.podFitsResourcesMetadata = meta.podFitsResourcesMetadata.clone()
|
||||
|
@ -61,16 +61,6 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
|
||||
if !reflect.DeepEqual(meta1.pod, meta2.pod) {
|
||||
return fmt.Errorf("pods are not the same")
|
||||
}
|
||||
if meta1.podBestEffort != meta2.podBestEffort {
|
||||
return fmt.Errorf("podBestEfforts are not equal")
|
||||
}
|
||||
if len(meta1.podFitsHostPortsMetadata.podPorts) != len(meta2.podFitsHostPortsMetadata.podPorts) {
|
||||
return fmt.Errorf("podPorts are not equal")
|
||||
}
|
||||
for !reflect.DeepEqual(meta1.podFitsHostPortsMetadata.podPorts, meta2.podFitsHostPortsMetadata.podPorts) {
|
||||
return fmt.Errorf("podPorts are not equal")
|
||||
}
|
||||
|
||||
if meta1.serviceAffinityMetadata != nil {
|
||||
sortablePods1 := sortablePods(meta1.serviceAffinityMetadata.matchingPodList)
|
||||
sort.Sort(sortablePods1)
|
||||
@ -238,7 +228,6 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
|
||||
Namespace: "testns",
|
||||
},
|
||||
},
|
||||
podBestEffort: true,
|
||||
podFitsResourcesMetadata: &podFitsResourcesMetadata{
|
||||
podRequest: &schedulernodeinfo.Resource{
|
||||
MilliCPU: 1000,
|
||||
@ -246,17 +235,6 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
|
||||
AllowedPodNumber: 4,
|
||||
},
|
||||
},
|
||||
podFitsHostPortsMetadata: &podFitsHostPortsMetadata{
|
||||
podPorts: []*v1.ContainerPort{
|
||||
{
|
||||
Name: "name",
|
||||
HostPort: 10,
|
||||
ContainerPort: 20,
|
||||
Protocol: "TCP",
|
||||
HostIP: "1.2.3.4",
|
||||
},
|
||||
},
|
||||
},
|
||||
evenPodsSpreadMetadata: &evenPodsSpreadMetadata{
|
||||
tpKeyToCriticalPaths: map[string]*criticalPaths{
|
||||
"name": {{"nodeA", 1}, {"nodeC", 2}},
|
||||
|
@ -1097,13 +1097,18 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta Metadata, nodeI
|
||||
return false, []PredicateFailureReason{ErrServiceAffinityViolated}, nil
|
||||
}
|
||||
|
||||
// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
|
||||
func PodFitsHostPorts(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
|
||||
var wantPorts []*v1.ContainerPort
|
||||
if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsHostPortsMetadata != nil {
|
||||
wantPorts = predicateMeta.podFitsHostPortsMetadata.podPorts
|
||||
} else {
|
||||
// We couldn't parse metadata - fallback to computing it.
|
||||
// PodFitsHostPorts is a wrapper around PodFitsHostPortsPredicate. This is needed until
|
||||
// we are able to get rid of the FitPredicate function signature.
|
||||
// TODO(#85822): remove this function once predicate registration logic is deleted.
|
||||
func PodFitsHostPorts(pod *v1.Pod, _ Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
|
||||
return PodFitsHostPortsPredicate(pod, nil, nodeInfo)
|
||||
}
|
||||
|
||||
// PodFitsHostPortsPredicate checks if a node has free ports for the requested pod ports.
|
||||
func PodFitsHostPortsPredicate(pod *v1.Pod, meta []*v1.ContainerPort, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
|
||||
wantPorts := meta
|
||||
if wantPorts == nil {
|
||||
// Fallback to computing it.
|
||||
wantPorts = schedutil.GetContainerPorts(pod)
|
||||
}
|
||||
if len(wantPorts) == 0 {
|
||||
|
@ -10,8 +10,10 @@ go_library(
|
||||
"//pkg/scheduler/framework/plugins/migration:go_default_library",
|
||||
"//pkg/scheduler/framework/v1alpha1:go_default_library",
|
||||
"//pkg/scheduler/nodeinfo:go_default_library",
|
||||
"//pkg/scheduler/util:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -18,13 +18,17 @@ package nodeports
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
|
||||
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
// NodePorts is a plugin that checks if a node has free ports for the requested pod ports.
|
||||
@ -32,17 +36,67 @@ type NodePorts struct{}
|
||||
|
||||
var _ framework.FilterPlugin = &NodePorts{}
|
||||
|
||||
// Name is the name of the plugin used in the plugin registry and configurations.
|
||||
const Name = "NodePorts"
|
||||
const (
|
||||
// Name is the name of the plugin used in the plugin registry and configurations.
|
||||
Name = "NodePorts"
|
||||
|
||||
// preFilterStateKey is the key in CycleState to InterPodAffinity pre-computed data.
|
||||
// Using the name of the plugin will likely help us avoid collisions with other plugins.
|
||||
preFilterStateKey = "PreFilter" + Name
|
||||
)
|
||||
|
||||
type preFilterState []*v1.ContainerPort
|
||||
|
||||
// Clone the prefilter state.
|
||||
func (s preFilterState) Clone() framework.StateData {
|
||||
// The state is not impacted by adding/removing existing pods, hence we don't need to make a deep copy.
|
||||
return s
|
||||
}
|
||||
|
||||
// Name returns name of the plugin. It is used in logs, etc.
|
||||
func (pl *NodePorts) Name() string {
|
||||
return Name
|
||||
}
|
||||
|
||||
// PreFilter invoked at the prefilter extension point.
|
||||
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
|
||||
s := schedutil.GetContainerPorts(pod)
|
||||
cycleState.Write(preFilterStateKey, preFilterState(s))
|
||||
return nil
|
||||
}
|
||||
|
||||
// PreFilterExtensions do not exist for this plugin.
|
||||
func (pl *NodePorts) PreFilterExtensions() framework.PreFilterExtensions {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) {
|
||||
if cycleState == nil {
|
||||
return nil, fmt.Errorf("invalid nil CycleState")
|
||||
}
|
||||
|
||||
c, err := cycleState.Read(preFilterStateKey)
|
||||
if err != nil {
|
||||
// The metadata wasn't pre-computed in prefilter. We ignore the error for now since
|
||||
// Filter is able to handle that by computing it again.
|
||||
klog.Error(err)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
s, ok := c.(preFilterState)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%+v convert to nodeports.preFilterState error", c)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Filter invoked at the filter extension point.
|
||||
func (pl *NodePorts) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
_, reasons, err := predicates.PodFitsHostPorts(pod, nil, nodeInfo)
|
||||
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
|
||||
state, err := getPreFilterState(cycleState)
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, err.Error())
|
||||
}
|
||||
_, reasons, err := predicates.PodFitsHostPortsPredicate(pod, state, nodeInfo)
|
||||
return migration.PredicateResultToFrameworkStatus(reasons, err)
|
||||
}
|
||||
|
||||
|
@ -150,7 +150,12 @@ func TestNodePorts(t *testing.T) {
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
p, _ := New(nil, nil)
|
||||
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
|
||||
cycleState := framework.NewCycleState()
|
||||
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
|
||||
if !preFilterStatus.IsSuccess() {
|
||||
t.Errorf("prefilter failed with status: %v", preFilterStatus)
|
||||
}
|
||||
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo)
|
||||
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
|
||||
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user