From 500959b70c671a802c0b13c256aabfc5bad81224 Mon Sep 17 00:00:00 2001 From: deads2k Date: Fri, 16 Sep 2016 13:19:58 -0400 Subject: [PATCH] fix RC lister --- pkg/client/cache/listers.go | 111 +----------------- pkg/client/cache/listers_core.go | 88 ++++++++++++-- pkg/client/cache/listers_test.go | 16 +-- .../replication/replication_controller.go | 8 +- .../replication_controller_utils.go | 2 +- plugin/pkg/scheduler/algorithm/listers.go | 17 +-- .../priorities/selector_spreading_test.go | 14 +-- .../pkg/scheduler/generic_scheduler_test.go | 2 +- 8 files changed, 110 insertions(+), 148 deletions(-) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 5139cb4b5e4..6bdf3e3b7d5 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/labels" ) +// AppendFunc is used to add a matching item to whatever list the caller is using type AppendFunc func(interface{}) func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error { @@ -136,116 +137,6 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) { return } -// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers. -type StoreToReplicationControllerLister struct { - Indexer -} - -// Exists checks if the given rc exists in the store. -func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) { - _, exists, err := s.Indexer.Get(controller) - if err != nil { - return false, err - } - return exists, nil -} - -// StoreToReplicationControllerLister lists all controllers in the store. -// TODO: converge on the interface in pkg/client -func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) { - for _, c := range s.Indexer.List() { - controllers = append(controllers, *(c.(*api.ReplicationController))) - } - return controllers, nil -} - -func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer { - return storeReplicationControllersNamespacer{s.Indexer, namespace} -} - -type storeReplicationControllersNamespacer struct { - indexer Indexer - namespace string -} - -func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) { - controllers := []api.ReplicationController{} - - if s.namespace == api.NamespaceAll { - for _, m := range s.indexer.List() { - rc := *(m.(*api.ReplicationController)) - if selector.Matches(labels.Set(rc.Labels)) { - controllers = append(controllers, rc) - } - } - return controllers, nil - } - - key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}} - items, err := s.indexer.Index(NamespaceIndex, key) - if err != nil { - // Ignore error; do slow search without index. - glog.Warningf("can not retrieve list of objects using index : %v", err) - for _, m := range s.indexer.List() { - rc := *(m.(*api.ReplicationController)) - if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) { - controllers = append(controllers, rc) - } - } - return controllers, nil - } - for _, m := range items { - rc := *(m.(*api.ReplicationController)) - if selector.Matches(labels.Set(rc.Labels)) { - controllers = append(controllers, rc) - } - } - return controllers, nil -} - -func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) { - obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) - if err != nil { - return nil, err - } - if !exists { - return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name) - } - return obj.(*api.ReplicationController), nil -} - -// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. -func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { - var selector labels.Selector - var rc api.ReplicationController - - if len(pod.Labels) == 0 { - err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) - return - } - - key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}} - items, err := s.Indexer.Index(NamespaceIndex, key) - if err != nil { - return - } - - for _, m := range items { - rc = *m.(*api.ReplicationController) - selector = labels.Set(rc.Spec.Selector).AsSelectorPreValidated() - - // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. - if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { - continue - } - controllers = append(controllers, rc) - } - if len(controllers) == 0 { - err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) - } - return -} - // StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments. type StoreToDeploymentLister struct { Indexer diff --git a/pkg/client/cache/listers_core.go b/pkg/client/cache/listers_core.go index 11c850ff799..5a137773b85 100644 --- a/pkg/client/cache/listers_core.go +++ b/pkg/client/cache/listers_core.go @@ -17,6 +17,8 @@ limitations under the License. package cache import ( + "fmt" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/labels" @@ -25,24 +27,24 @@ import ( // TODO: generate these classes and methods for all resources of interest using // a script. Can use "go generate" once 1.4 is supported by all users. -// StoreToPodLister makes a Store have the List method of the client.PodInterface -// The Store must contain (only) Pods. -// +// Lister makes an Index have the List method. The Stores must contain only the expected type // Example: // s := cache.NewStore() // lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"} // r := cache.NewReflector(lw, &api.Pod{}, s).Run() // l := StoreToPodLister{s} // l.List() + +// StoreToPodLister helps list pods type StoreToPodLister struct { Indexer Indexer } -func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { +func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api.Pod, err error) { err = ListAll(s.Indexer, selector, func(m interface{}) { - pods = append(pods, m.(*api.Pod)) + ret = append(ret, m.(*api.Pod)) }) - return pods, err + return ret, err } func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { @@ -54,11 +56,11 @@ type storePodsNamespacer struct { namespace string } -func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) { +func (s storePodsNamespacer) List(selector labels.Selector) (ret []*api.Pod, err error) { err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) { - pods = append(pods, m.(*api.Pod)) + ret = append(ret, m.(*api.Pod)) }) - return pods, err + return ret, err } func (s storePodsNamespacer) Get(name string) (*api.Pod, error) { @@ -133,3 +135,71 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []*api.Ser return services, nil } + +// StoreToReplicationControllerLister helps list rcs +type StoreToReplicationControllerLister struct { + Indexer Indexer +} + +func (s *StoreToReplicationControllerLister) List(selector labels.Selector) (ret []*api.ReplicationController, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*api.ReplicationController)) + }) + return ret, err +} + +func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer { + return storeReplicationControllersNamespacer{s.Indexer, namespace} +} + +type storeReplicationControllersNamespacer struct { + indexer Indexer + namespace string +} + +func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (ret []*api.ReplicationController, err error) { + err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*api.ReplicationController)) + }) + return ret, err +} + +func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name) + } + return obj.(*api.ReplicationController), nil +} + +// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found. +func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) { + if len(pod.Labels) == 0 { + err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name) + return + } + + key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}} + items, err := s.Indexer.Index(NamespaceIndex, key) + if err != nil { + return + } + + for _, m := range items { + rc := m.(*api.ReplicationController) + selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated() + + // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. + if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + controllers = append(controllers, rc) + } + if len(controllers) == 0 { + err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index 89367adf32d..36223eabd96 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -128,7 +128,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { testCases := []struct { description string inRCs []*api.ReplicationController - list func(StoreToReplicationControllerLister) ([]api.ReplicationController, error) + list func(StoreToReplicationControllerLister) ([]*api.ReplicationController, error) outRCNames sets.String expectErr bool onlyIfIndexedByNamespace bool @@ -143,7 +143,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"}, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { return lister.ReplicationControllers(api.NamespaceAll).List(labels.Set{}.AsSelectorPreValidated()) }, outRCNames: sets.NewString("hmm", "foo"), @@ -158,7 +158,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"}, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { return lister.ReplicationControllers("hmm").List(labels.Set{}.AsSelectorPreValidated()) }, outRCNames: sets.NewString("hmm"), @@ -168,8 +168,8 @@ func TestStoreToReplicationControllerLister(t *testing.T) { inRCs: []*api.ReplicationController{ {ObjectMeta: api.ObjectMeta{Name: "basic"}}, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { - return lister.List() + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { + return lister.List(labels.Everything()) }, outRCNames: sets.NewString("basic"), }, @@ -183,7 +183,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { }, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"}, } @@ -199,7 +199,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", @@ -228,7 +228,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) { }, }, }, - list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) { + list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) { pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod1", diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index fafbad77e11..121ccb4e784 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -269,16 +269,16 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon } // update lookup cache - rm.lookupCache.Update(pod, &controllers[0]) + rm.lookupCache.Update(pod, controllers[0]) - return &controllers[0] + return controllers[0] } // isCacheValid check if the cache is valid func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool { - exists, err := rm.rcStore.Exists(cachedRC) + _, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name) // rc has been deleted or updated, cache is invalid - if err != nil || !exists || !isControllerMatch(pod, cachedRC) { + if err != nil || !isControllerMatch(pod, cachedRC) { return false } return true diff --git a/pkg/controller/replication/replication_controller_utils.go b/pkg/controller/replication/replication_controller_utils.go index e7e22795a06..a5ea91304da 100644 --- a/pkg/controller/replication/replication_controller_utils.go +++ b/pkg/controller/replication/replication_controller_utils.go @@ -71,7 +71,7 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, } // OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker. -type OverlappingControllers []api.ReplicationController +type OverlappingControllers []*api.ReplicationController func (o OverlappingControllers) Len() int { return len(o) } func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] } diff --git a/plugin/pkg/scheduler/algorithm/listers.go b/plugin/pkg/scheduler/algorithm/listers.go index 519becbaf69..066fe305997 100644 --- a/plugin/pkg/scheduler/algorithm/listers.go +++ b/plugin/pkg/scheduler/algorithm/listers.go @@ -101,37 +101,38 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service // ControllerLister interface represents anything that can produce a list of ReplicationController; the list is consumed by a scheduler. type ControllerLister interface { // Lists all the replication controllers - List() ([]api.ReplicationController, error) + List(labels.Selector) ([]*api.ReplicationController, error) // Gets the services for the given pod - GetPodControllers(*api.Pod) ([]api.ReplicationController, error) + GetPodControllers(*api.Pod) ([]*api.ReplicationController, error) } // EmptyControllerLister implements ControllerLister on []api.ReplicationController returning empty data type EmptyControllerLister struct{} // List returns nil -func (f EmptyControllerLister) List() ([]api.ReplicationController, error) { +func (f EmptyControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) { return nil, nil } // GetPodControllers returns nil -func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { +func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) { return nil, nil } // FakeControllerLister implements ControllerLister on []api.ReplicationController for test purposes. -type FakeControllerLister []api.ReplicationController +type FakeControllerLister []*api.ReplicationController // List returns []api.ReplicationController, the list of all ReplicationControllers. -func (f FakeControllerLister) List() ([]api.ReplicationController, error) { +func (f FakeControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) { return f, nil } // GetPodControllers gets the ReplicationControllers that have the selector that match the labels on the given pod -func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { +func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) { var selector labels.Selector - for _, controller := range f { + for i := range f { + controller := f[i] if controller.Namespace != pod.Namespace { continue } diff --git a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go index e72717d035d..c0e0135b809 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go +++ b/plugin/pkg/scheduler/algorithm/priorities/selector_spreading_test.go @@ -57,7 +57,7 @@ func TestSelectorSpreadPriority(t *testing.T) { pod *api.Pod pods []*api.Pod nodes []string - rcs []api.ReplicationController + rcs []*api.ReplicationController rss []extensions.ReplicaSet services []*api.Service expectedList schedulerapi.HostPriorityList @@ -181,7 +181,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}}, // "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to // do spreading between all pods. The result should be exactly as above. @@ -210,7 +210,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}}, // Taken together Service and Replication Controller should match all Pods, hence result should be equal to one above. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, @@ -238,7 +238,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}}, // Both Nodes have one pod from the given RC, hence both get 0 score. expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}}, test: "Replication controller with partial pod label matches", @@ -264,7 +264,7 @@ func TestSelectorSpreadPriority(t *testing.T) { {Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}}, }, nodes: []string{"machine1", "machine2"}, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}}, expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}}, test: "Another replication controller with partial pod label matches", }, @@ -344,7 +344,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { pod *api.Pod pods []*api.Pod nodes []string - rcs []api.ReplicationController + rcs []*api.ReplicationController rss []extensions.ReplicaSet services []*api.Service expectedList schedulerapi.HostPriorityList @@ -471,7 +471,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) { buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")), buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")), }, - rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}}, + rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}}, expectedList: []schedulerapi.HostPriority{ // Note that because we put two pods on the same node (nodeMachine1Zone3), // the values here are questionable for zone2, in particular for nodeMachine1Zone2. diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go index 2bc5eb05ddd..c47064ba7d0 100644 --- a/plugin/pkg/scheduler/generic_scheduler_test.go +++ b/plugin/pkg/scheduler/generic_scheduler_test.go @@ -494,7 +494,7 @@ func TestZeroRequest(t *testing.T) { Function: algorithmpriorities.NewSelectorSpreadPriority( algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister([]*api.Service{}), - algorithm.FakeControllerLister([]api.ReplicationController{}), + algorithm.FakeControllerLister([]*api.ReplicationController{}), algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})), Weight: 1, },