From 0aab44a00d8bdf3af728f76e7ccb16caff9296b4 Mon Sep 17 00:00:00 2001 From: mqliang Date: Tue, 16 Feb 2016 00:13:38 +0800 Subject: [PATCH] add ReplicaSet support in scheduler --- plugin/pkg/scheduler/algorithm/listers.go | 55 ++++++++++++++++ .../algorithm/priorities/priorities_test.go | 3 +- .../priorities/selector_spreading.go | 18 ++++-- .../priorities/selector_spreading_test.go | 62 ++++++++++++++++++- .../defaults/compatibility_test.go | 17 ++++- .../algorithmprovider/defaults/defaults.go | 4 +- plugin/pkg/scheduler/factory/factory.go | 19 +++++- plugin/pkg/scheduler/factory/plugins.go | 1 + 8 files changed, 167 insertions(+), 12 deletions(-) diff --git a/plugin/pkg/scheduler/algorithm/listers.go b/plugin/pkg/scheduler/algorithm/listers.go index 4bf01b7671b..7db0e8bebc7 100644 --- a/plugin/pkg/scheduler/algorithm/listers.go +++ b/plugin/pkg/scheduler/algorithm/listers.go @@ -20,6 +20,8 @@ import ( "fmt" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/labels" ) @@ -140,3 +142,56 @@ func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api return } + +// ReplicaSetLister interface represents anything that can produce a list of ReplicaSet; the list is consumed by a scheduler. 
+type ReplicaSetLister interface { + // Lists all the replicasets + List() ([]extensions.ReplicaSet, error) + // Gets the replicasets for the given pod + GetPodReplicaSets(*api.Pod) ([]extensions.ReplicaSet, error) +} + +// EmptyReplicaSetLister implements ReplicaSetLister on []extensions.ReplicaSet returning empty data +type EmptyReplicaSetLister struct{} + +// List returns nil +func (f EmptyReplicaSetLister) List() ([]extensions.ReplicaSet, error) { + return nil, nil +} + +// GetPodReplicaSets returns nil +func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) { + return nil, nil +} + +// FakeReplicaSetLister implements ReplicaSetLister on []extensions.ReplicaSet for test purposes. +type FakeReplicaSetLister []extensions.ReplicaSet + +// List returns []extensions.ReplicaSet, the list of all ReplicaSets. +func (f FakeReplicaSetLister) List() ([]extensions.ReplicaSet, error) { + return f, nil +} + +// GetPodReplicaSets gets the ReplicaSets whose selectors match the labels on the given pod +func (f FakeReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) { + var selector labels.Selector + + for _, rs := range f { + if rs.Namespace != pod.Namespace { + continue + } + selector, err = unversioned.LabelSelectorAsSelector(rs.Spec.Selector) + if err != nil { + return + } + + if selector.Matches(labels.Set(pod.Labels)) { + rss = append(rss, rs) + } + } + if len(rss) == 0 { + err = fmt.Errorf("Could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + + return +} diff --git a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go index 6cfb7c0130f..eda51aab5bc 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/priorities_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" 
"k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/plugin/pkg/scheduler" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util" @@ -141,7 +142,7 @@ func TestZeroRequest(t *testing.T) { // This should match the configuration in defaultPriorities() in // plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want // to test what's actually in production. - []algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{})), Weight: 1}}, + []algorithm.PriorityConfig{{Function: LeastRequestedPriority, Weight: 1}, {Function: BalancedResourceAllocation, Weight: 1}, {Function: NewSelectorSpreadPriority(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]api.Service{}), algorithm.FakeControllerLister([]api.ReplicationController{}), algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})), Weight: 1}}, algorithm.FakeNodeLister(api.NodeList{Items: test.nodes}), []algorithm.SchedulerExtender{}) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go index 0facd305585..b651001c8f6 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading.go @@ -38,13 +38,15 @@ type SelectorSpread struct { podLister algorithm.PodLister serviceLister algorithm.ServiceLister controllerLister algorithm.ControllerLister + replicaSetLister algorithm.ReplicaSetLister } -func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister 
algorithm.ControllerLister) algorithm.PriorityFunction { +func NewSelectorSpreadPriority(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, controllerLister algorithm.ControllerLister, replicaSetLister algorithm.ReplicaSetLister) algorithm.PriorityFunction { selectorSpread := &SelectorSpread{ podLister: podLister, serviceLister: serviceLister, controllerLister: controllerLister, + replicaSetLister: replicaSetLister, } return selectorSpread.CalculateSpreadPriority } @@ -86,10 +88,18 @@ func (s *SelectorSpread) CalculateSpreadPriority(pod *api.Pod, nodeNameToInfo ma selectors = append(selectors, labels.SelectorFromSet(service.Spec.Selector)) } } - controllers, err := s.controllerLister.GetPodControllers(pod) + rcs, err := s.controllerLister.GetPodControllers(pod) if err == nil { - for _, controller := range controllers { - selectors = append(selectors, labels.SelectorFromSet(controller.Spec.Selector)) + for _, rc := range rcs { + selectors = append(selectors, labels.SelectorFromSet(rc.Spec.Selector)) + } + } + rss, err := s.replicaSetLister.GetPodReplicaSets(pod) + if err == nil { + for _, rs := range rss { + if selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector); err == nil { + selectors = append(selectors, selector) + } } } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index 9fad1eb9177..49bdcb0b024 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -22,7 +22,9 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" wellknownlabels "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" @@ 
-48,6 +50,7 @@ func TestSelectorSpreadPriority(t *testing.T) { pods []*api.Pod nodes []string rcs []api.ReplicationController + rss []extensions.ReplicaSet services []api.Service expectedList schedulerapi.HostPriorityList test string @@ -177,6 +180,20 @@ func TestSelectorSpreadPriority(t *testing.T) { expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, test: "service with partial pod label matches with service and replication controller", }, + { + pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []*api.Pod{ + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, + rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, + // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. 
+ expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, + test: "service with partial pod label matches with service and replication controller", + }, { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}}, pods: []*api.Pod{ @@ -191,6 +208,20 @@ func TestSelectorSpreadPriority(t *testing.T) { expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, test: "disjoined service and replication controller should be treated equally", }, + { + pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: map[string]string{"foo": "bar", "bar": "foo"}}}, + pods: []*api.Pod{ + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: []string{"machine1", "machine2"}, + services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, + rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, + // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. 
+ expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, + test: "disjoined service and replication controller should be treated equally", + }, { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []*api.Pod{ @@ -204,6 +235,19 @@ func TestSelectorSpreadPriority(t *testing.T) { expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, test: "Replication controller with partial pod label matches", }, + { + pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []*api.Pod{ + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: []string{"machine1", "machine2"}, + rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}}, + // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. 
+ expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 0}}, + test: "Replication controller with partial pod label matches", + }, { pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, pods: []*api.Pod{ @@ -216,11 +260,24 @@ func TestSelectorSpreadPriority(t *testing.T) { expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, test: "Replication controller with partial pod label matches", }, + { + pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}}, + pods: []*api.Pod{ + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}, + {Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}}, + }, + nodes: []string{"machine1", "machine2"}, + rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}}, + // We use ReplicaSet, instead of ReplicationController. The result should be exactly as above. 
+ expectedList: []schedulerapi.HostPriority{{"machine1", 0}, {"machine2", 5}}, + test: "Replication controller with partial pod label matches", + }, } for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) - selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} + selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)} list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeNodeList(test.nodes))) if err != nil { t.Errorf("unexpected error: %v", err) @@ -273,6 +330,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { pods []*api.Pod nodes []string rcs []api.ReplicationController + rss []extensions.ReplicaSet services []api.Service expectedList schedulerapi.HostPriorityList test string @@ -420,7 +478,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { for _, test := range tests { nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods) - selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs)} + selectorSpread := SelectorSpread{podLister: algorithm.FakePodLister(test.pods), serviceLister: algorithm.FakeServiceLister(test.services), controllerLister: algorithm.FakeControllerLister(test.rcs), replicaSetLister: algorithm.FakeReplicaSetLister(test.rss)} list, err := selectorSpread.CalculateSpreadPriority(test.pod, nodeNameToInfo, algorithm.FakeNodeLister(makeLabeledNodeList(labeledNodes))) if err != nil { t.Errorf("unexpected error: %v", err) diff --git 
a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go index 009244fd75b..d981d032028 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/compatibility_test.go @@ -20,10 +20,14 @@ import ( "reflect" "testing" + "k8s.io/kubernetes/pkg/api/testapi" + client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/runtime" + utiltesting "k8s.io/kubernetes/pkg/util/testing" schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" "k8s.io/kubernetes/plugin/pkg/scheduler/factory" + "net/http/httptest" ) func TestCompatibility_v1_Scheduler(t *testing.T) { @@ -100,7 +104,18 @@ func TestCompatibility_v1_Scheduler(t *testing.T) { if !reflect.DeepEqual(policy, tc.ExpectedPolicy) { t.Errorf("%s: Expected:\n\t%#v\nGot:\n\t%#v", v, tc.ExpectedPolicy, policy) } - if _, err := factory.NewConfigFactory(nil, "some-scheduler-name").CreateFromConfig(policy); err != nil { + + handler := utiltesting.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + // TODO: Uncomment when fix #19254 + // defer server.Close() + client := client.NewOrDie(&client.Config{Host: server.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) + + if _, err := factory.NewConfigFactory(client, "some-scheduler-name").CreateFromConfig(policy); err != nil { t.Errorf("%s: Error constructing: %v", v, err) continue } diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 9f1b92cdcca..1959e1e794d 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -68,7 +68,7 @@ func init() { "ServiceSpreadingPriority", 
factory.PriorityConfigFactory{ Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { - return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{}) + return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, algorithm.EmptyControllerLister{}, algorithm.EmptyReplicaSetLister{}) }, Weight: 1, }, @@ -141,7 +141,7 @@ func defaultPriorities() sets.String { "SelectorSpreadPriority", factory.PriorityConfigFactory{ Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { - return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister) + return priorities.NewSelectorSpreadPriority(args.PodLister, args.ServiceLister, args.ControllerLister, args.ReplicaSetLister) }, Weight: 1, }, diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 1ecfbbaf15c..e9478f4785f 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/scheduler/api/validation" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/apis/extensions" ) const ( @@ -66,6 +67,8 @@ type ConfigFactory struct { ServiceLister *cache.StoreToServiceLister // a means to list all controllers ControllerLister *cache.StoreToReplicationControllerLister + // a means to list all replicasets + ReplicaSetLister *cache.StoreToReplicaSetLister // Close this to stop all reflectors StopEverything chan struct{} @@ -91,6 +94,7 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ReplicaSetLister: 
&cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, StopEverything: make(chan struct{}), SchedulerName: schedulerName, } @@ -188,6 +192,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, PodLister: f.PodLister, ServiceLister: f.ServiceLister, ControllerLister: f.ControllerLister, + ReplicaSetLister: f.ReplicaSetLister, // All fit predicates only need to consider schedulable nodes. NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), NodeInfo: &predicates.CachedNodeInfo{f.NodeLister}, @@ -220,15 +225,20 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything) // Watch and cache all service objects. Scheduler needs to find all pods - // created by the same services or ReplicationControllers, so that it can spread them correctly. + // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally. cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything) // Watch and cache all ReplicationController objects. Scheduler needs to find all pods - // created by the same services or ReplicationControllers, so that it can spread them correctly. + // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally. cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Store, 0).RunUntil(f.StopEverything) + // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods + // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. + // Cache this locally. 
+ cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything) + r := rand.New(rand.NewSource(time.Now().UnixNano())) algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r) @@ -332,6 +342,11 @@ func (factory *ConfigFactory) createControllerLW() *cache.ListWatch { return cache.NewListWatchFromClient(factory.Client, "replicationControllers", api.NamespaceAll, fields.ParseSelectorOrDie("")) } +// Returns a cache.ListWatch that gets all changes to replicasets. +func (factory *ConfigFactory) createReplicaSetLW() *cache.ListWatch { + return cache.NewListWatchFromClient(factory.Client.ExtensionsClient, "replicasets", api.NamespaceAll, fields.ParseSelectorOrDie("")) +} + func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { return func(pod *api.Pod, err error) { if err == scheduler.ErrNoNodesAvailable { diff --git a/plugin/pkg/scheduler/factory/plugins.go b/plugin/pkg/scheduler/factory/plugins.go index 06d3755ee44..93266100bf8 100644 --- a/plugin/pkg/scheduler/factory/plugins.go +++ b/plugin/pkg/scheduler/factory/plugins.go @@ -36,6 +36,7 @@ type PluginFactoryArgs struct { PodLister algorithm.PodLister ServiceLister algorithm.ServiceLister ControllerLister algorithm.ControllerLister + ReplicaSetLister algorithm.ReplicaSetLister NodeLister algorithm.NodeLister NodeInfo predicates.NodeInfo PVInfo predicates.PersistentVolumeInfo