From 6890868823cb8947c4aca64f4ec34eb500be662d Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Fri, 8 Jul 2016 08:12:44 +0200 Subject: [PATCH] Add meta field to predicates signature --- pkg/kubelet/kubelet.go | 2 +- .../algorithm/predicates/predicates.go | 43 +++++++++++-------- .../algorithm/predicates/predicates_test.go | 34 +++++++-------- plugin/pkg/scheduler/algorithm/types.go | 2 +- plugin/pkg/scheduler/generic_scheduler.go | 2 +- .../pkg/scheduler/generic_scheduler_test.go | 8 ++-- 6 files changed, 50 insertions(+), 41 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index bb5b2f87a25..0438b431382 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2411,7 +2411,7 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str } nodeInfo := schedulercache.NewNodeInfo(otherPods...) nodeInfo.SetNode(node) - fit, err := predicates.GeneralPredicates(pod, nodeInfo) + fit, err := predicates.GeneralPredicates(pod, nil, nodeInfo) if !fit { if re, ok := err.(*predicates.PredicateFailureError); ok { reason := re.PredicateName diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index e6983661191..03eb6afc265 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -65,6 +65,15 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) { return node.(*api.Node), nil } +// podMetadata defines a type, that is an expected type that is passed +// as metadata for predicate functions +type predicateMetadata struct { +} + +func PredicateMetadata(pod *api.Pod) interface{} { + return &predicateMetadata{} +} + func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { // fast path if there is no conflict checking targets. if volume.GCEPersistentDisk == nil && volume.AWSElasticBlockStore == nil && volume.RBD == nil { @@ -106,7 +115,7 @@ func isVolumeConflict(volume api.Volume, pod *api.Pod) bool { // - AWS EBS forbids any two pods mounting the same volume ID // - Ceph RBD forbids if any two pods share at least same monitor, and match pool and image. // TODO: migrate this into some per-volume specific code? -func NoDiskConflict(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func NoDiskConflict(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { for _, v := range pod.Spec.Volumes { for _, ev := range nodeInfo.Pods() { if isVolumeConflict(v, ev) { @@ -203,7 +212,7 @@ func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []api.Volume, namespace return nil } -func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func (c *MaxPDVolumeCountChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { // If a pod doesn't have any volume attached to it, the predicate will always be true. // Thus we make a fast path for it, to avoid unnecessary computations in this case. if len(pod.Spec.Volumes) == 0 { @@ -307,7 +316,7 @@ func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolum return c.predicate } -func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { // If a pod doesn't have any volume attached to it, the predicate will always be true. // Thus we make a fast path for it, to avoid unnecessary computations in this case. if len(pod.Spec.Volumes) == 0 { @@ -450,7 +459,7 @@ func podName(pod *api.Pod) string { return pod.Namespace + "/" + pod.Name } -func PodFitsResources(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { node := nodeInfo.Node() if node == nil { return false, fmt.Errorf("node not found") @@ -556,7 +565,7 @@ func PodMatchesNodeLabels(pod *api.Pod, node *api.Node) bool { return nodeAffinityMatches } -func PodSelectorMatches(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func PodSelectorMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { node := nodeInfo.Node() if node == nil { return false, fmt.Errorf("node not found") @@ -567,7 +576,7 @@ func PodSelectorMatches(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, return false, ErrNodeSelectorNotMatch } -func PodFitsHost(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func PodFitsHost(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { if len(pod.Spec.NodeName) == 0 { return true, nil } @@ -606,7 +615,7 @@ func NewNodeLabelPredicate(labels []string, presence bool) algorithm.FitPredicat // Alternately, eliminating nodes that have a certain label, regardless of value, is also useful // A node may have a label with "retiring" as key and the date as the value // and it may be desirable to avoid scheduling new pods on this node -func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { node := nodeInfo.Node() if node == nil { return false, fmt.Errorf("node not found") @@ -649,7 +658,7 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al // - L is listed in the ServiceAffinity object that is passed into the function // - the pod does not have any NodeSelector for L // - some other pod from the same service is already scheduled onto a node that has value V for label L -func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { var affinitySelector labels.Selector // check if the pod being scheduled has the affinity labels specified in its NodeSelector @@ -721,7 +730,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, nodeInfo *scheduler return false, ErrServiceAffinityViolated } -func PodFitsHostPorts(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { wantPorts := getUsedPorts(pod) if len(wantPorts) == 0 { return true, nil @@ -767,21 +776,21 @@ func haveSame(a1, a2 []string) bool { return false } -func GeneralPredicates(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { - fit, err := PodFitsResources(pod, nodeInfo) +func GeneralPredicates(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { + fit, err := PodFitsResources(pod, meta, nodeInfo) if !fit { return fit, err } - fit, err = PodFitsHost(pod, nodeInfo) + fit, err = PodFitsHost(pod, meta, nodeInfo) if !fit { return fit, err } - fit, err = PodFitsHostPorts(pod, nodeInfo) + fit, err = PodFitsHostPorts(pod, meta, nodeInfo) if !fit { return fit, err } - fit, err = PodSelectorMatches(pod, nodeInfo) + fit, err = PodSelectorMatches(pod, meta, nodeInfo) if !fit { return fit, err } @@ -803,7 +812,7 @@ func NewPodAffinityPredicate(info NodeInfo, podLister algorithm.PodLister, failu return checker.InterPodAffinityMatches } -func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func (checker *PodAffinityChecker) InterPodAffinityMatches(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { node := nodeInfo.Node() if node == nil { return false, fmt.Errorf("node not found") @@ -994,7 +1003,7 @@ func NewTolerationMatchPredicate(info NodeInfo) algorithm.FitPredicate { return tolerationMatch.PodToleratesNodeTaints } -func (t *TolerationMatch) PodToleratesNodeTaints(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func (t *TolerationMatch) PodToleratesNodeTaints(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { node := nodeInfo.Node() taints, err := api.GetTaintsFromNodeAnnotations(node.Annotations) @@ -1045,7 +1054,7 @@ func isPodBestEffort(pod *api.Pod) bool { // CheckNodeMemoryPressurePredicate checks if a pod can be scheduled on a node // reporting memory pressure condition. -func CheckNodeMemoryPressurePredicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { node := nodeInfo.Node() if node == nil { return false, fmt.Errorf("node not found") diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index 1a94b1767e7..f3f0d78b6b6 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -226,7 +226,7 @@ func TestPodFitsResources(t *testing.T) { node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}} test.nodeInfo.SetNode(&node) - fits, err := PodFitsResources(test.pod, test.nodeInfo) + fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } @@ -279,7 +279,7 @@ func TestPodFitsResources(t *testing.T) { node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}} test.nodeInfo.SetNode(&node) - fits, err := PodFitsResources(test.pod, test.nodeInfo) + fits, err := PodFitsResources(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } @@ -335,7 +335,7 @@ func TestPodFitsHost(t *testing.T) { for _, test := range tests { nodeInfo := schedulercache.NewNodeInfo() nodeInfo.SetNode(test.node) - result, err := PodFitsHost(test.pod, nodeInfo) + result, err := PodFitsHost(test.pod, PredicateMetadata(test.pod), nodeInfo) if !reflect.DeepEqual(err, ErrPodNotMatchHostName) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -408,7 +408,7 @@ func TestPodFitsHostPorts(t *testing.T) { }, } for _, test := range tests { - fits, err := PodFitsHostPorts(test.pod, test.nodeInfo) + fits, err := PodFitsHostPorts(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if !reflect.DeepEqual(err, ErrPodNotFitsHostPorts) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -493,7 +493,7 @@ func TestDiskConflicts(t *testing.T) { } for _, test := range tests { - ok, err := NoDiskConflict(test.pod, test.nodeInfo) + ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -545,7 +545,7 @@ func TestAWSDiskConflicts(t *testing.T) { } for _, test := range tests { - ok, err := NoDiskConflict(test.pod, test.nodeInfo) + ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -603,7 +603,7 @@ func TestRBDDiskConflicts(t *testing.T) { } for _, test := range tests { - ok, err := NoDiskConflict(test.pod, test.nodeInfo) + ok, err := NoDiskConflict(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if !reflect.DeepEqual(err, ErrDiskConflict) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -1082,7 +1082,7 @@ func TestPodFitsSelector(t *testing.T) { nodeInfo := schedulercache.NewNodeInfo() nodeInfo.SetNode(&node) - fits, err := PodSelectorMatches(test.pod, nodeInfo) + fits, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod), nodeInfo) if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -1147,7 +1147,7 @@ func TestNodeLabelPresence(t *testing.T) { nodeInfo.SetNode(&node) labelChecker := NodeLabelChecker{test.labels, test.presence} - fits, err := labelChecker.CheckNodeLabelPresence(test.pod, nodeInfo) + fits, err := labelChecker.CheckNodeLabelPresence(test.pod, PredicateMetadata(test.pod), nodeInfo) if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -1292,7 +1292,7 @@ func TestServiceAffinity(t *testing.T) { serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels} nodeInfo := schedulercache.NewNodeInfo() nodeInfo.SetNode(test.node) - fits, err := serviceAffinity.CheckServiceAffinity(test.pod, nodeInfo) + fits, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod), nodeInfo) if !reflect.DeepEqual(err, ErrServiceAffinityViolated) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -1573,7 +1573,7 @@ func TestEBSVolumeCountConflicts(t *testing.T) { for _, test := range tests { pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo) - fits, err := pred(test.newPod, schedulercache.NewNodeInfo(test.existingPods...)) + fits, err := pred(test.newPod, PredicateMetadata(test.newPod), schedulercache.NewNodeInfo(test.existingPods...)) if err != nil && !reflect.DeepEqual(err, ErrMaxVolumeCountExceeded) { t.Errorf("unexpected error: %v", err) } @@ -1734,7 +1734,7 @@ func TestRunGeneralPredicates(t *testing.T) { } for _, test := range resourceTests { test.nodeInfo.SetNode(test.node) - fits, err := GeneralPredicates(test.pod, test.nodeInfo) + fits, err := GeneralPredicates(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if !reflect.DeepEqual(err, test.wErr) { t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr) } @@ -2227,7 +2227,7 @@ func TestInterPodAffinity(t *testing.T) { } nodeInfo := schedulercache.NewNodeInfo(podsOnNode...) nodeInfo.SetNode(test.node) - fits, err := fit.InterPodAffinityMatches(test.pod, nodeInfo) + fits, err := fit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod), nodeInfo) if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil { t.Errorf("%s: unexpected error %v", test.test, err) } @@ -2393,7 +2393,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { } nodeInfo := schedulercache.NewNodeInfo(podsOnNode...) nodeInfo.SetNode(&node) - fits, err := testFit.InterPodAffinityMatches(test.pod, nodeInfo) + fits, err := testFit.InterPodAffinityMatches(test.pod, PredicateMetadata(test.pod), nodeInfo) if !reflect.DeepEqual(err, ErrPodAffinityNotMatch) && err != nil { t.Errorf("%s: unexpected error %v", test.test, err) } @@ -2404,7 +2404,7 @@ func TestInterPodAffinityWithMultipleNodes(t *testing.T) { if affinity.NodeAffinity != nil { nodeInfo := schedulercache.NewNodeInfo() nodeInfo.SetNode(&node) - fits2, err := PodSelectorMatches(test.pod, nodeInfo) + fits2, err := PodSelectorMatches(test.pod, PredicateMetadata(test.pod), nodeInfo) if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil { t.Errorf("unexpected error: %v", err) } @@ -2691,7 +2691,7 @@ func TestPodToleratesTaints(t *testing.T) { tolerationMatch := TolerationMatch{FakeNodeInfo(test.node)} nodeInfo := schedulercache.NewNodeInfo() nodeInfo.SetNode(&test.node) - fits, err := tolerationMatch.PodToleratesNodeTaints(test.pod, nodeInfo) + fits, err := tolerationMatch.PodToleratesNodeTaints(test.pod, PredicateMetadata(test.pod), nodeInfo) if fits == false && !reflect.DeepEqual(err, ErrTaintsTolerationsNotMatch) { t.Errorf("%s, unexpected error: %v", test.test, err) } @@ -2797,7 +2797,7 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) { } for _, test := range tests { - fits, err := CheckNodeMemoryPressurePredicate(test.pod, test.nodeInfo) + fits, err := CheckNodeMemoryPressurePredicate(test.pod, PredicateMetadata(test.pod), test.nodeInfo) if fits != test.fits { t.Errorf("%s: expected %v got %v", test.name, test.fits, fits) } diff --git a/plugin/pkg/scheduler/algorithm/types.go b/plugin/pkg/scheduler/algorithm/types.go index ac4ebcf1b5e..14959ccaf68 100644 --- a/plugin/pkg/scheduler/algorithm/types.go +++ b/plugin/pkg/scheduler/algorithm/types.go @@ -24,7 +24,7 @@ import ( // FitPredicate is a function that indicates if a pod fits into an existing node. // The failure information is given by the error. -type FitPredicate func(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) +type FitPredicate func(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) type PriorityFunction func(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodeLister NodeLister) (schedulerapi.HostPriorityList, error) diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go index a2799fad74d..d084f76850c 100644 --- a/plugin/pkg/scheduler/generic_scheduler.go +++ b/plugin/pkg/scheduler/generic_scheduler.go @@ -184,7 +184,7 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No // Checks whether node with a given name and NodeInfo satisfies all predicateFuncs. func podFitsOnNode(pod *api.Pod, info *schedulercache.NodeInfo, predicateFuncs map[string]algorithm.FitPredicate) (bool, string, error) { for _, predicate := range predicateFuncs { - fit, err := predicate(pod, info) + fit, err := predicate(pod, nil, info) if err != nil { switch e := err.(type) { case *predicates.InsufficientResourceError: diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index 68ceb53707c..50f04a1fc17 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -33,15 +33,15 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) -func falsePredicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func falsePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { return false, algorithmpredicates.ErrFakePredicate } -func truePredicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func truePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { return true, nil } -func matchesPredicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func matchesPredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { node := nodeInfo.Node() if node == nil { return false, fmt.Errorf("node not found") @@ -52,7 +52,7 @@ func matchesPredicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, er return false, algorithmpredicates.ErrFakePredicate } -func hasNoPodsPredicate(pod *api.Pod, nodeInfo *schedulercache.NodeInfo) (bool, error) { +func hasNoPodsPredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { if len(nodeInfo.Pods()) == 0 { return true, nil }