Merge pull request #86224 from ahg-g/ahg-prefilters

Wrap host ports metadata in a prefilter
Kubernetes Prow Robot authored 2019-12-13 16:19:58 -08:00, committed by GitHub
commit efe159e8d2
6 changed files with 80 additions and 62 deletions
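In the scheduler framework, a PreFilter plugin runs once per scheduling cycle and can stash pre-computed data in the cycle's CycleState, which every per-node Filter call then reads back; this PR moves the host-ports metadata into that pattern. Below is a minimal, self-contained sketch of the pattern; the CycleState and StateData types are simplified stand-ins for the framework's API, not the real implementation.

package main

import "fmt"

// StateData is a simplified stand-in for framework.StateData.
type StateData interface {
	Clone() StateData
}

// CycleState is a simplified stand-in for framework.CycleState:
// per-scheduling-cycle key/value storage shared across extension points.
type CycleState struct {
	storage map[string]StateData
}

func NewCycleState() *CycleState {
	return &CycleState{storage: map[string]StateData{}}
}

func (c *CycleState) Write(key string, data StateData) { c.storage[key] = data }

func (c *CycleState) Read(key string) (StateData, error) {
	if d, ok := c.storage[key]; ok {
		return d, nil
	}
	return nil, fmt.Errorf("%q not found in cycle state", key)
}

// hostPorts plays the role of preFilterState: data computed once per pod.
type hostPorts []int

// Clone returns the receiver unchanged; the slice is read-only for the rest
// of the cycle, so no deep copy is needed (the same reasoning as the real
// plugin's Clone).
func (p hostPorts) Clone() StateData { return p }

func main() {
	cycleState := NewCycleState()

	// "PreFilter": compute the pod's requested host ports once.
	cycleState.Write("PreFilterNodePorts", hostPorts{8080, 9090})

	// "Filter": called once per candidate node; it reads the precomputed
	// ports instead of walking the pod's containers every time.
	for _, node := range []string{"node-a", "node-b"} {
		data, err := cycleState.Read("PreFilterNodePorts")
		if err != nil {
			fmt.Println("no precomputed state; would recompute for", node)
			continue
		}
		fmt.Printf("checking %s against host ports %v\n", node, data.(hostPorts))
	}
}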

View File

@@ -304,26 +304,10 @@ func (m *podFitsResourcesMetadata) clone() *podFitsResourcesMetadata {
return &copy
}
type podFitsHostPortsMetadata struct {
podPorts []*v1.ContainerPort
}
func (m *podFitsHostPortsMetadata) clone() *podFitsHostPortsMetadata {
if m == nil {
return nil
}
copy := podFitsHostPortsMetadata{}
copy.podPorts = append([]*v1.ContainerPort(nil), m.podPorts...)
return &copy
}
// NOTE: When new fields are added/removed or logic is changed, please make sure that
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
type predicateMetadata struct {
pod *v1.Pod
podBestEffort bool
pod *v1.Pod
// evenPodsSpreadMetadata holds info of the minimum match number on each topology spread constraint,
// and the match number of all valid topology pairs.
@@ -331,7 +315,6 @@ type predicateMetadata struct {
serviceAffinityMetadata *serviceAffinityMetadata
podFitsResourcesMetadata *podFitsResourcesMetadata
podFitsHostPortsMetadata *podFitsHostPortsMetadata
}
// Ensure that predicateMetadata implements algorithm.Metadata.
@@ -396,7 +379,6 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister
pod: pod,
evenPodsSpreadMetadata: evenPodsSpreadMetadata,
podFitsResourcesMetadata: getPodFitsResourcesMetedata(pod),
podFitsHostPortsMetadata: getPodFitsHostPortsMetadata(pod),
}
for predicateName, precomputeFunc := range predicateMetadataProducers {
klog.V(10).Infof("Precompute: %v", predicateName)
@@ -405,12 +387,6 @@ func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister
return predicateMetadata
}
func getPodFitsHostPortsMetadata(pod *v1.Pod) *podFitsHostPortsMetadata {
return &podFitsHostPortsMetadata{
podPorts: schedutil.GetContainerPorts(pod),
}
}
func getPodFitsResourcesMetedata(pod *v1.Pod) *podFitsResourcesMetadata {
return &podFitsResourcesMetadata{
podRequest: GetResourceRequest(pod),
@@ -638,10 +614,8 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
// its maps and slices, but it does not copy the contents of pointer values.
func (meta *predicateMetadata) ShallowCopy() Metadata {
newPredMeta := &predicateMetadata{
pod: meta.pod,
podBestEffort: meta.podBestEffort,
pod: meta.pod,
}
newPredMeta.podFitsHostPortsMetadata = meta.podFitsHostPortsMetadata.clone()
newPredMeta.evenPodsSpreadMetadata = meta.evenPodsSpreadMetadata.clone()
newPredMeta.serviceAffinityMetadata = meta.serviceAffinityMetadata.clone()
newPredMeta.podFitsResourcesMetadata = meta.podFitsResourcesMetadata.clone()

View File

@@ -61,16 +61,6 @@ func predicateMetadataEquivalent(meta1, meta2 *predicateMetadata) error {
if !reflect.DeepEqual(meta1.pod, meta2.pod) {
return fmt.Errorf("pods are not the same")
}
if meta1.podBestEffort != meta2.podBestEffort {
return fmt.Errorf("podBestEfforts are not equal")
}
if len(meta1.podFitsHostPortsMetadata.podPorts) != len(meta2.podFitsHostPortsMetadata.podPorts) {
return fmt.Errorf("podPorts are not equal")
}
for !reflect.DeepEqual(meta1.podFitsHostPortsMetadata.podPorts, meta2.podFitsHostPortsMetadata.podPorts) {
return fmt.Errorf("podPorts are not equal")
}
if meta1.serviceAffinityMetadata != nil {
sortablePods1 := sortablePods(meta1.serviceAffinityMetadata.matchingPodList)
sort.Sort(sortablePods1)
@@ -238,7 +228,6 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
Namespace: "testns",
},
},
podBestEffort: true,
podFitsResourcesMetadata: &podFitsResourcesMetadata{
podRequest: &schedulernodeinfo.Resource{
MilliCPU: 1000,
@@ -246,17 +235,6 @@ func TestPredicateMetadata_ShallowCopy(t *testing.T) {
AllowedPodNumber: 4,
},
},
podFitsHostPortsMetadata: &podFitsHostPortsMetadata{
podPorts: []*v1.ContainerPort{
{
Name: "name",
HostPort: 10,
ContainerPort: 20,
Protocol: "TCP",
HostIP: "1.2.3.4",
},
},
},
evenPodsSpreadMetadata: &evenPodsSpreadMetadata{
tpKeyToCriticalPaths: map[string]*criticalPaths{
"name": {{"nodeA", 1}, {"nodeC", 2}},

View File

@@ -1097,13 +1097,18 @@ func (s *ServiceAffinity) checkServiceAffinity(pod *v1.Pod, meta Metadata, nodeI
return false, []PredicateFailureReason{ErrServiceAffinityViolated}, nil
}
// PodFitsHostPorts checks if a node has free ports for the requested pod ports.
func PodFitsHostPorts(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
var wantPorts []*v1.ContainerPort
if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsHostPortsMetadata != nil {
wantPorts = predicateMeta.podFitsHostPortsMetadata.podPorts
} else {
// We couldn't parse metadata - fallback to computing it.
// PodFitsHostPorts is a wrapper around PodFitsHostPortsPredicate. This is needed until
// we are able to get rid of the FitPredicate function signature.
// TODO(#85822): remove this function once predicate registration logic is deleted.
func PodFitsHostPorts(pod *v1.Pod, _ Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
return PodFitsHostPortsPredicate(pod, nil, nodeInfo)
}
// PodFitsHostPortsPredicate checks if a node has free ports for the requested pod ports.
func PodFitsHostPortsPredicate(pod *v1.Pod, meta []*v1.ContainerPort, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
wantPorts := meta
if wantPorts == nil {
// Fallback to computing it.
wantPorts = schedutil.GetContainerPorts(pod)
}
if len(wantPorts) == 0 {

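A hypothetical call site for the split API, not part of this commit; it assumes the import paths used elsewhere in this diff, and checkHostPorts is an illustrative helper.

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
	schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

// checkHostPorts exercises both entry points of the split API.
func checkHostPorts(pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) (bool, error) {
	// Old-style entry point: the Metadata argument is ignored and the pod's
	// ports are recomputed internally.
	if fits, _, err := predicates.PodFitsHostPorts(pod, nil, nodeInfo); err != nil || !fits {
		return fits, err
	}

	// New entry point: pass precomputed ports, as the NodePorts PreFilter does.
	wantPorts := schedutil.GetContainerPorts(pod)
	fits, _, err := predicates.PodFitsHostPortsPredicate(pod, wantPorts, nodeInfo)
	return fits, err
}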
View File

@@ -10,8 +10,10 @@ go_library(
"//pkg/scheduler/framework/plugins/migration:go_default_library",
"//pkg/scheduler/framework/v1alpha1:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@@ -18,13 +18,17 @@ package nodeports
import (
"context"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/migration"
framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)
// NodePorts is a plugin that checks if a node has free ports for the requested pod ports.
@@ -32,17 +36,67 @@ type NodePorts struct{}
var _ framework.FilterPlugin = &NodePorts{}
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = "NodePorts"
const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "NodePorts"
// preFilterStateKey is the key in CycleState to NodePorts pre-computed data.
// Using the name of the plugin will likely help us avoid collisions with other plugins.
preFilterStateKey = "PreFilter" + Name
)
type preFilterState []*v1.ContainerPort
// Clone the prefilter state.
func (s preFilterState) Clone() framework.StateData {
// The state is not impacted by adding/removing existing pods, hence we don't need to make a deep copy.
return s
}
// Name returns name of the plugin. It is used in logs, etc.
func (pl *NodePorts) Name() string {
return Name
}
// PreFilter invoked at the prefilter extension point.
func (pl *NodePorts) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
s := schedutil.GetContainerPorts(pod)
cycleState.Write(preFilterStateKey, preFilterState(s))
return nil
}
// PreFilterExtensions do not exist for this plugin.
func (pl *NodePorts) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}
func getPreFilterState(cycleState *framework.CycleState) (preFilterState, error) {
if cycleState == nil {
return nil, fmt.Errorf("invalid nil CycleState")
}
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
// The metadata wasn't pre-computed in prefilter. We ignore the error for now since
// Filter is able to handle that by computing it again.
klog.Error(err)
return nil, nil
}
s, ok := c.(preFilterState)
if !ok {
return nil, fmt.Errorf("%+v cannot be converted to nodeports.preFilterState", c)
}
return s, nil
}
// Filter invoked at the filter extension point.
func (pl *NodePorts) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
_, reasons, err := predicates.PodFitsHostPorts(pod, nil, nodeInfo)
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *nodeinfo.NodeInfo) *framework.Status {
state, err := getPreFilterState(cycleState)
if err != nil {
return framework.NewStatus(framework.Error, err.Error())
}
_, reasons, err := predicates.PodFitsHostPortsPredicate(pod, state, nodeInfo)
return migration.PredicateResultToFrameworkStatus(reasons, err)
}
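The lenient handling in getPreFilterState keeps Filter usable even when PreFilter never ran, e.g. when invoked outside the framework with a fresh CycleState. A sketch of that fallback path follows; filterWithoutPreFilter is a hypothetical helper, and the package paths are assumed from this repository's layout.

package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// filterWithoutPreFilter calls Filter with a fresh CycleState: the read in
// getPreFilterState fails, the error is only logged, and the predicate falls
// back to computing the pod's ports itself.
func filterWithoutPreFilter(pod *v1.Pod, ni *nodeinfo.NodeInfo) *framework.Status {
	p, _ := nodeports.New(nil, nil)
	return p.(framework.FilterPlugin).Filter(context.Background(), framework.NewCycleState(), pod, ni)
}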

View File

@@ -150,7 +150,12 @@ func TestNodePorts(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p, _ := New(nil, nil)
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo)
cycleState := framework.NewCycleState()
preFilterStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), cycleState, test.pod)
if !preFilterStatus.IsSuccess() {
t.Errorf("prefilter failed with status: %v", preFilterStatus)
}
gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), cycleState, test.pod, test.nodeInfo)
if !reflect.DeepEqual(gotStatus, test.wantStatus) {
t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus)
}