diff --git a/examples/scheduler-policy-config-with-extender.json b/examples/scheduler-policy-config-with-extender.json
index 75a1d5bdab3..eea38f5c403 100644
--- a/examples/scheduler-policy-config-with-extender.json
+++ b/examples/scheduler-policy-config-with-extender.json
@@ -21,7 +21,8 @@
 			"filterVerb": "filter",
 			"prioritizeVerb": "prioritize",
 			"weight": 5,
-			"enableHttps": false
+			"enableHttps": false,
+			"nodeCacheCapable": false
 		}
 	]
 }
diff --git a/plugin/pkg/scheduler/algorithm/scheduler_interface.go b/plugin/pkg/scheduler/algorithm/scheduler_interface.go
index 761bce0985a..09ec312caf8 100644
--- a/plugin/pkg/scheduler/algorithm/scheduler_interface.go
+++ b/plugin/pkg/scheduler/algorithm/scheduler_interface.go
@@ -19,6 +19,7 @@ package algorithm
 import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 // SchedulerExtender is an interface for external processes to influence scheduling
@@ -28,7 +29,7 @@ type SchedulerExtender interface {
 	// Filter based on extender-implemented predicate functions. The filtered list is
 	// expected to be a subset of the supplied list. failedNodesMap optionally contains
 	// the list of failed nodes and failure reasons.
-	Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)
+	Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error)

 	// Prioritize based on extender-implemented priority functions. The returned scores & weight
 	// are used to compute the weighted score for an extender. The weighted scores are added to
diff --git a/plugin/pkg/scheduler/algorithm/scheduler_interface_test.go b/plugin/pkg/scheduler/algorithm/scheduler_interface_test.go
old mode 100755
new mode 100644
diff --git a/plugin/pkg/scheduler/api/types.go b/plugin/pkg/scheduler/api/types.go
index bad6abc640f..90d671e8eec 100644
--- a/plugin/pkg/scheduler/api/types.go
+++ b/plugin/pkg/scheduler/api/types.go
@@ -128,6 +128,10 @@ type ExtenderConfig struct {
 	// HTTPTimeout specifies the timeout duration for a call to the extender. Filter timeout fails the scheduling of the pod. Prioritize
 	// timeout is ignored, k8s/other extenders priorities are used to select the node.
 	HTTPTimeout time.Duration
+	// NodeCacheCapable specifies that the extender is capable of caching node information,
+	// so the scheduler should only send minimal information about the eligible nodes
+	// assuming that the extender already cached full details of all nodes in the cluster
+	NodeCacheCapable bool
 }

 // ExtenderArgs represents the arguments needed by the extender to filter/prioritize
@@ -135,8 +139,12 @@ type ExtenderConfig struct {
 type ExtenderArgs struct {
 	// Pod being scheduled
 	Pod v1.Pod
-	// List of candidate nodes where the pod can be scheduled
-	Nodes v1.NodeList
+	// List of candidate nodes where the pod can be scheduled; to be populated
+	// only if ExtenderConfig.NodeCacheCapable == false
+	Nodes *v1.NodeList
+	// List of candidate node names where the pod can be scheduled; to be
+	// populated only if ExtenderConfig.NodeCacheCapable == true
+	NodeNames *[]string
 }

 // FailedNodesMap represents the filtered out nodes, with node names and failure messages
@@ -144,8 +152,12 @@ type FailedNodesMap map[string]string

 // ExtenderFilterResult represents the results of a filter call to an extender
 type ExtenderFilterResult struct {
-	// Filtered set of nodes where the pod can be scheduled
-	Nodes v1.NodeList
+	// Filtered set of nodes where the pod can be scheduled; to be populated
+	// only if ExtenderConfig.NodeCacheCapable == false
+	Nodes *v1.NodeList
+	// Filtered set of nodes where the pod can be scheduled; to be populated
+	// only if ExtenderConfig.NodeCacheCapable == true
+	NodeNames *[]string
 	// Filtered out nodes where the pod can't be scheduled and the failure messages
 	FailedNodes FailedNodesMap
 	// Error message indicating failure
diff --git a/plugin/pkg/scheduler/api/v1/types.go b/plugin/pkg/scheduler/api/v1/types.go
index 3097d83b874..036b6c50252 100644
--- a/plugin/pkg/scheduler/api/v1/types.go
+++ b/plugin/pkg/scheduler/api/v1/types.go
@@ -128,6 +128,10 @@ type ExtenderConfig struct {
 	// HTTPTimeout specifies the timeout duration for a call to the extender. Filter timeout fails the scheduling of the pod. Prioritize
 	// timeout is ignored, k8s/other extenders priorities are used to select the node.
 	HTTPTimeout time.Duration `json:"httpTimeout,omitempty"`
+	// NodeCacheCapable specifies that the extender is capable of caching node information,
+	// so the scheduler should only send minimal information about the eligible nodes
+	// assuming that the extender already cached full details of all nodes in the cluster
+	NodeCacheCapable bool `json:"nodeCacheCapable,omitempty"`
 }

 // ExtenderArgs represents the arguments needed by the extender to filter/prioritize
@@ -135,8 +139,12 @@ type ExtenderConfig struct {
 type ExtenderArgs struct {
 	// Pod being scheduled
 	Pod apiv1.Pod `json:"pod"`
-	// List of candidate nodes where the pod can be scheduled
-	Nodes apiv1.NodeList `json:"nodes"`
+	// List of candidate nodes where the pod can be scheduled; to be populated
+	// only if ExtenderConfig.NodeCacheCapable == false
+	Nodes *apiv1.NodeList `json:"nodes,omitempty"`
+	// List of candidate node names where the pod can be scheduled; to be
+	// populated only if ExtenderConfig.NodeCacheCapable == true
+	NodeNames *[]string `json:"nodenames,omitempty"`
 }

 // FailedNodesMap represents the filtered out nodes, with node names and failure messages
@@ -144,8 +152,12 @@ type FailedNodesMap map[string]string

 // ExtenderFilterResult represents the results of a filter call to an extender
 type ExtenderFilterResult struct {
-	// Filtered set of nodes where the pod can be scheduled
-	Nodes apiv1.NodeList `json:"nodes,omitempty"`
+	// Filtered set of nodes where the pod can be scheduled; to be populated
+	// only if ExtenderConfig.NodeCacheCapable == false
+	Nodes *apiv1.NodeList `json:"nodes,omitempty"`
+	// Filtered set of nodes where the pod can be scheduled; to be populated
+	// only if ExtenderConfig.NodeCacheCapable == true
+	NodeNames *[]string `json:"nodenames,omitempty"`
 	// Filtered out nodes where the pod can't be scheduled and the failure messages
 	FailedNodes FailedNodesMap `json:"failedNodes,omitempty"`
 	// Error message indicating failure
diff --git a/plugin/pkg/scheduler/core/extender.go b/plugin/pkg/scheduler/core/extender.go
index b33aae28eb3..bcc42d774c7 100644
--- a/plugin/pkg/scheduler/core/extender.go
+++ b/plugin/pkg/scheduler/core/extender.go
@@ -29,6 +29,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )

 const (
@@ -37,12 +38,12 @@

 // HTTPExtender implements the algorithm.SchedulerExtender interface.
 type HTTPExtender struct {
-	extenderURL    string
-	filterVerb     string
-	prioritizeVerb string
-	weight         int
-	apiVersion     string
-	client         *http.Client
+	extenderURL      string
+	filterVerb       string
+	prioritizeVerb   string
+	weight           int
+	client           *http.Client
+	nodeCacheCapable bool
 }

 func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) {
@@ -68,7 +69,7 @@ func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, erro
 	return utilnet.SetTransportDefaults(&http.Transport{}), nil
 }

-func NewHTTPExtender(config *schedulerapi.ExtenderConfig, apiVersion string) (algorithm.SchedulerExtender, error) {
+func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerExtender, error) {
 	if config.HTTPTimeout.Nanoseconds() == 0 {
 		config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
 	}
@@ -82,45 +83,69 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig, apiVersion string) (al
 		Timeout: config.HTTPTimeout,
 	}
 	return &HTTPExtender{
-		extenderURL:    config.URLPrefix,
-		apiVersion:     apiVersion,
-		filterVerb:     config.FilterVerb,
-		prioritizeVerb: config.PrioritizeVerb,
-		weight:         config.Weight,
-		client:         client,
+		extenderURL:      config.URLPrefix,
+		filterVerb:       config.FilterVerb,
+		prioritizeVerb:   config.PrioritizeVerb,
+		weight:           config.Weight,
+		client:           client,
+		nodeCacheCapable: config.NodeCacheCapable,
 	}, nil
 }

 // Filter based on extender implemented predicate functions. The filtered list is
 // expected to be a subset of the supplied list. failedNodesMap optionally contains
 // the list of failed nodes and failure reasons.
-func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
-	var result schedulerapi.ExtenderFilterResult
+func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
+	var (
+		result     schedulerapi.ExtenderFilterResult
+		nodeList   *v1.NodeList
+		nodeNames  *[]string
+		nodeResult []*v1.Node
+		args       *schedulerapi.ExtenderArgs
+	)

 	if h.filterVerb == "" {
 		return nodes, schedulerapi.FailedNodesMap{}, nil
 	}

-	nodeItems := make([]v1.Node, 0, len(nodes))
-	for _, node := range nodes {
-		nodeItems = append(nodeItems, *node)
-	}
-	args := schedulerapi.ExtenderArgs{
-		Pod:   *pod,
-		Nodes: v1.NodeList{Items: nodeItems},
+	if h.nodeCacheCapable {
+		nodeNameSlice := make([]string, 0, len(nodes))
+		for _, node := range nodes {
+			nodeNameSlice = append(nodeNameSlice, node.Name)
+		}
+		nodeNames = &nodeNameSlice
+	} else {
+		nodeList = &v1.NodeList{}
+		for _, node := range nodes {
+			nodeList.Items = append(nodeList.Items, *node)
+		}
 	}

-	if err := h.send(h.filterVerb, &args, &result); err != nil {
+	args = &schedulerapi.ExtenderArgs{
+		Pod:       *pod,
+		Nodes:     nodeList,
+		NodeNames: nodeNames,
+	}
+
+	if err := h.send(h.filterVerb, args, &result); err != nil {
 		return nil, nil, err
 	}
 	if result.Error != "" {
 		return nil, nil, fmt.Errorf(result.Error)
 	}

-	nodeResult := make([]*v1.Node, 0, len(result.Nodes.Items))
-	for i := range result.Nodes.Items {
-		nodeResult = append(nodeResult, &result.Nodes.Items[i])
+	if h.nodeCacheCapable && result.NodeNames != nil {
+		nodeResult = make([]*v1.Node, 0, len(*result.NodeNames))
+		for i := range *result.NodeNames {
+			nodeResult = append(nodeResult, nodeNameToInfo[(*result.NodeNames)[i]].Node())
+		}
+	} else if result.Nodes != nil {
+		nodeResult = make([]*v1.Node, 0, len(result.Nodes.Items))
+		for i := range result.Nodes.Items {
+			nodeResult = append(nodeResult, &result.Nodes.Items[i])
+		}
 	}
+
 	return nodeResult, result.FailedNodes, nil
 }

@@ -128,7 +153,12 @@ func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, schedu
 // up for each such priority function. The returned score is added to the score computed
 // by Kubernetes scheduler. The total score is used to do the host selection.
 func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi.HostPriorityList, int, error) {
-	var result schedulerapi.HostPriorityList
+	var (
+		result    schedulerapi.HostPriorityList
+		nodeList  *v1.NodeList
+		nodeNames *[]string
+		args      *schedulerapi.ExtenderArgs
+	)

 	if h.prioritizeVerb == "" {
 		result := schedulerapi.HostPriorityList{}
@@ -138,16 +168,26 @@
 		return &result, 0, nil
 	}

-	nodeItems := make([]v1.Node, 0, len(nodes))
-	for _, node := range nodes {
-		nodeItems = append(nodeItems, *node)
-	}
-	args := schedulerapi.ExtenderArgs{
-		Pod:   *pod,
-		Nodes: v1.NodeList{Items: nodeItems},
+	if h.nodeCacheCapable {
+		nodeNameSlice := make([]string, 0, len(nodes))
+		for _, node := range nodes {
+			nodeNameSlice = append(nodeNameSlice, node.Name)
+		}
+		nodeNames = &nodeNameSlice
+	} else {
+		nodeList = &v1.NodeList{}
+		for _, node := range nodes {
+			nodeList.Items = append(nodeList.Items, *node)
+		}
 	}

-	if err := h.send(h.prioritizeVerb, &args, &result); err != nil {
+	args = &schedulerapi.ExtenderArgs{
+		Pod:       *pod,
+		Nodes:     nodeList,
+		NodeNames: nodeNames,
+	}
+
+	if err := h.send(h.prioritizeVerb, args, &result); err != nil {
 		return nil, 0, err
 	}
 	return &result, h.weight, nil
@@ -160,7 +200,7 @@ func (h *HTTPExtender) send(action string, args interface{}, result interface{})
 		return err
 	}

-	url := h.extenderURL + "/" + h.apiVersion + "/" + action
+	url := h.extenderURL + "/" + action

 	req, err := http.NewRequest("POST", url, bytes.NewReader(out))
 	if err != nil {
diff --git a/plugin/pkg/scheduler/core/extender_test.go b/plugin/pkg/scheduler/core/extender_test.go
index 0bcac4bfdc7..6e333044f6a 100644
--- a/plugin/pkg/scheduler/core/extender_test.go
+++ b/plugin/pkg/scheduler/core/extender_test.go
@@ -104,12 +104,13 @@ func machine2Prioritizer(_ *v1.Pod, nodeNameToInfo map[string]*schedulercache.No
 }

 type FakeExtender struct {
-	predicates   []fitPredicate
-	prioritizers []priorityConfig
-	weight       int
+	predicates       []fitPredicate
+	prioritizers     []priorityConfig
+	weight           int
+	nodeCacheCapable bool
 }

-func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
+func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
 	filtered := []*v1.Node{}
 	failedNodesMap := schedulerapi.FailedNodesMap{}
 	for _, node := range nodes {
@@ -130,6 +131,10 @@ func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, schedu
 			failedNodesMap[node.Name] = "FakeExtender failed"
 		}
 	}
+
+	if f.nodeCacheCapable {
+		return filtered, failedNodesMap, nil
+	}
 	return filtered, failedNodesMap, nil
 }
diff --git a/plugin/pkg/scheduler/core/generic_scheduler.go b/plugin/pkg/scheduler/core/generic_scheduler.go
index 1d6ed49a931..3d24aa8c515 100644
--- a/plugin/pkg/scheduler/core/generic_scheduler.go
+++ b/plugin/pkg/scheduler/core/generic_scheduler.go
@@ -200,7 +200,7 @@ func findNodesThatFit(

 	if len(filtered) > 0 && len(extenders) != 0 {
 		for _, extender := range extenders {
-			filteredList, failedMap, err := extender.Filter(pod, filtered)
+			filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
 			if err != nil {
 				return []*v1.Node{}, FailedPredicateMap{}, err
 			}
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 76f91cc0084..ae718ff8872 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -325,7 +325,7 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
 	if len(policy.ExtenderConfigs) != 0 {
 		for ii := range policy.ExtenderConfigs {
 			glog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
-			if extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii], policy.APIVersion); err != nil {
+			if extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii]); err != nil {
 				return nil, err
 			} else {
 				extenders = append(extenders, extender)
diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go
index a3257f5f459..0a7ae7a9345 100644
--- a/test/integration/scheduler/extender_test.go
+++ b/test/integration/scheduler/extender_test.go
@@ -62,9 +62,10 @@ type priorityConfig struct {
 }

 type Extender struct {
-	name         string
-	predicates   []fitPredicate
-	prioritizers []priorityConfig
+	name             string
+	predicates       []fitPredicate
+	prioritizers     []priorityConfig
+	nodeCacheCapable bool
 }

 func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) {
@@ -82,12 +83,9 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ
 	if strings.Contains(req.URL.Path, filter) {
 		resp := &schedulerapi.ExtenderFilterResult{}
-		nodes, failedNodes, err := e.Filter(&args.Pod, &args.Nodes)
+		resp, err := e.Filter(&args)
 		if err != nil {
 			resp.Error = err.Error()
-		} else {
-			resp.Nodes = *nodes
-			resp.FailedNodes = failedNodes
 		}

 		if err := encoder.Encode(resp); err != nil {
@@ -96,7 +94,7 @@
 	} else if strings.Contains(req.URL.Path, prioritize) {
 		// Prioritize errors are ignored. Default k8s priorities or another extender's
 		// priorities may be applied.
-		priorities, _ := e.Prioritize(&args.Pod, &args.Nodes)
+		priorities, _ := e.Prioritize(&args)

 		if err := encoder.Encode(priorities); err != nil {
 			t.Fatalf("Failed to encode %+v", priorities)
@@ -106,15 +104,21 @@
 	}
 }

-func (e *Extender) Filter(pod *v1.Pod, nodes *v1.NodeList) (*v1.NodeList, schedulerapi.FailedNodesMap, error) {
-	filtered := []v1.Node{}
+func (e *Extender) filterUsingNodeCache(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
+	nodeSlice := make([]string, 0)
 	failedNodesMap := schedulerapi.FailedNodesMap{}
-	for _, node := range nodes.Items {
+	for _, nodeName := range *args.NodeNames {
 		fits := true
 		for _, predicate := range e.predicates {
-			fit, err := predicate(pod, &node)
+			fit, err := predicate(&args.Pod,
+				&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
 			if err != nil {
-				return &v1.NodeList{}, schedulerapi.FailedNodesMap{}, err
+				return &schedulerapi.ExtenderFilterResult{
+					Nodes:       nil,
+					NodeNames:   nil,
+					FailedNodes: schedulerapi.FailedNodesMap{},
+					Error:       err.Error(),
+				}, err
 			}
 			if !fit {
 				fits = false
@@ -122,24 +126,78 @@ func (e *Extender) Filter(pod *v1.Pod, nodes *v1.NodeList) (*v1.NodeList, schedu
 			}
 		}
 		if fits {
-			filtered = append(filtered, node)
+			nodeSlice = append(nodeSlice, nodeName)
 		} else {
-			failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
+			failedNodesMap[nodeName] = fmt.Sprintf("extender failed: %s", e.name)
 		}
 	}
-	return &v1.NodeList{Items: filtered}, failedNodesMap, nil
+
+	return &schedulerapi.ExtenderFilterResult{
+		Nodes:       nil,
+		NodeNames:   &nodeSlice,
+		FailedNodes: failedNodesMap,
+	}, nil
 }

-func (e *Extender) Prioritize(pod *v1.Pod, nodes *v1.NodeList) (*schedulerapi.HostPriorityList, error) {
+func (e *Extender) Filter(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
+	filtered := []v1.Node{}
+	failedNodesMap := schedulerapi.FailedNodesMap{}
+
+	if e.nodeCacheCapable {
+		return e.filterUsingNodeCache(args)
+	} else {
+		for _, node := range args.Nodes.Items {
+			fits := true
+			for _, predicate := range e.predicates {
+				fit, err := predicate(&args.Pod, &node)
+				if err != nil {
+					return &schedulerapi.ExtenderFilterResult{
+						Nodes:       &v1.NodeList{},
+						NodeNames:   nil,
+						FailedNodes: schedulerapi.FailedNodesMap{},
+						Error:       err.Error(),
+					}, err
+				}
+				if !fit {
+					fits = false
+					break
+				}
+			}
+			if fits {
+				filtered = append(filtered, node)
+			} else {
+				failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
+			}
+		}
+
+		return &schedulerapi.ExtenderFilterResult{
+			Nodes:       &v1.NodeList{Items: filtered},
+			NodeNames:   nil,
+			FailedNodes: failedNodesMap,
+		}, nil
+	}
+}
+
+func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.HostPriorityList, error) {
 	result := schedulerapi.HostPriorityList{}
 	combinedScores := map[string]int{}
+	var nodes = &v1.NodeList{Items: []v1.Node{}}
+
+	if e.nodeCacheCapable {
+		for _, nodeName := range *args.NodeNames {
+			nodes.Items = append(nodes.Items, v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
+		}
+	} else {
+		nodes = args.Nodes
+	}
+
 	for _, prioritizer := range e.prioritizers {
 		weight := prioritizer.weight
 		if weight == 0 {
 			continue
 		}
 		priorityFunc := prioritizer.function
-		prioritizedList, err := priorityFunc(pod, nodes)
+		prioritizedList, err := priorityFunc(&args.Pod, nodes)
 		if err != nil {
 			return &schedulerapi.HostPriorityList{}, err
 		}
@@ -220,6 +278,17 @@ func TestSchedulerExtender(t *testing.T) {
 	}))
 	defer es2.Close()
+	extender3 := &Extender{
+		name:             "extender3",
+		predicates:       []fitPredicate{machine_1_2_3_Predicate},
+		prioritizers:     []priorityConfig{{machine_2_Prioritizer, 5}},
+		nodeCacheCapable: true,
+	}
+	es3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		extender3.serveHTTP(t, w, req)
+	}))
+	defer es3.Close()
+
 	policy := schedulerapi.Policy{
 		ExtenderConfigs: []schedulerapi.ExtenderConfig{
 			{
@@ -236,6 +305,14 @@
 				Weight:      4,
 				EnableHttps: false,
 			},
+			{
+				URLPrefix:        es3.URL,
+				FilterVerb:       filter,
+				PrioritizeVerb:   prioritize,
+				Weight:           10,
+				EnableHttps:      false,
+				NodeCacheCapable: true,
+			},
 		},
 	}
 	policy.APIVersion = api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()
@@ -313,10 +390,11 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface)
 		t.Fatalf("Failed to schedule pod: %v", err)
 	}

-	if myPod, err := cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{}); err != nil {
+	myPod, err = cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{})
+	if err != nil {
 		t.Fatalf("Failed to get pod: %v", err)
-	} else if myPod.Spec.NodeName != "machine3" {
-		t.Fatalf("Failed to schedule using extender, expected machine3, got %v", myPod.Spec.NodeName)
+	} else if myPod.Spec.NodeName != "machine2" {
+		t.Fatalf("Failed to schedule using extender, expected machine2, got %v", myPod.Spec.NodeName)
 	}
 	t.Logf("Scheduled pod using extenders")
 }
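
The sketch below (not part of the patch) shows what the filter endpoint of an out-of-tree extender registered with nodeCacheCapable: true might look like, mirroring the fake extender in the integration test above: it decodes ExtenderArgs, reads only NodeNames, and replies with NodeNames plus a FailedNodes map. The nodeSuitable helper, the /filter path, and the :8888 port are illustrative assumptions, not anything defined by this patch.

package main

import (
	"encoding/json"
	"log"
	"net/http"

	"k8s.io/kubernetes/pkg/api/v1"
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
)

// nodeSuitable is a hypothetical placeholder for the extender's real policy,
// which would be evaluated against the extender's own cache of node details.
func nodeSuitable(nodeName string, pod *v1.Pod) bool { return true }

// filterHandler serves the "filter" verb for an extender configured with
// nodeCacheCapable: true, so only node names travel over the wire in both
// directions.
func filterHandler(w http.ResponseWriter, r *http.Request) {
	var args schedulerapi.ExtenderArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	result := &schedulerapi.ExtenderFilterResult{
		NodeNames:   &[]string{},
		FailedNodes: schedulerapi.FailedNodesMap{},
	}
	// NodeNames is populated by the scheduler only when NodeCacheCapable == true.
	if args.NodeNames != nil {
		for _, name := range *args.NodeNames {
			if nodeSuitable(name, &args.Pod) {
				*result.NodeNames = append(*result.NodeNames, name)
			} else {
				result.FailedNodes[name] = "rejected by example extender"
			}
		}
	}

	if err := json.NewEncoder(w).Encode(result); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}

func main() {
	http.HandleFunc("/filter", filterHandler)
	log.Fatal(http.ListenAndServe(":8888", nil))
}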