fix RC lister

This commit is contained in:
deads2k 2016-09-16 13:19:58 -04:00
parent 76d15d193d
commit 500959b70c
8 changed files with 110 additions and 148 deletions

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
)
// AppendFunc is used to add a matching item to whatever list the caller is using
type AppendFunc func(interface{})
func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
@ -136,116 +137,6 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) {
return
}
// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
type StoreToReplicationControllerLister struct {
Indexer
}
// Exists checks if the given rc exists in the store.
func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) {
_, exists, err := s.Indexer.Get(controller)
if err != nil {
return false, err
}
return exists, nil
}
// StoreToReplicationControllerLister lists all controllers in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) {
for _, c := range s.Indexer.List() {
controllers = append(controllers, *(c.(*api.ReplicationController)))
}
return controllers, nil
}
func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
return storeReplicationControllersNamespacer{s.Indexer, namespace}
}
type storeReplicationControllersNamespacer struct {
indexer Indexer
namespace string
}
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) {
controllers := []api.ReplicationController{}
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return controllers, nil
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return controllers, nil
}
for _, m := range items {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return controllers, nil
}
func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name)
}
return obj.(*api.ReplicationController), nil
}
// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.
func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
var selector labels.Selector
var rc api.ReplicationController
if len(pod.Labels) == 0 {
err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
return
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
items, err := s.Indexer.Index(NamespaceIndex, key)
if err != nil {
return
}
for _, m := range items {
rc = *m.(*api.ReplicationController)
selector = labels.Set(rc.Spec.Selector).AsSelectorPreValidated()
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
controllers = append(controllers, rc)
}
if len(controllers) == 0 {
err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments.
type StoreToDeploymentLister struct {
Indexer

View File

@ -17,6 +17,8 @@ limitations under the License.
package cache
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/labels"
@ -25,24 +27,24 @@ import (
// TODO: generate these classes and methods for all resources of interest using
// a script. Can use "go generate" once 1.4 is supported by all users.
// StoreToPodLister makes a Store have the List method of the client.PodInterface
// The Store must contain (only) Pods.
//
// Lister makes an Index have the List method. The Stores must contain only the expected type
// Example:
// s := cache.NewStore()
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"}
// r := cache.NewReflector(lw, &api.Pod{}, s).Run()
// l := StoreToPodLister{s}
// l.List()
// StoreToPodLister helps list pods
type StoreToPodLister struct {
Indexer Indexer
}
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api.Pod, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
ret = append(ret, m.(*api.Pod))
})
return pods, err
return ret, err
}
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
@ -54,11 +56,11 @@ type storePodsNamespacer struct {
namespace string
}
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
func (s storePodsNamespacer) List(selector labels.Selector) (ret []*api.Pod, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
pods = append(pods, m.(*api.Pod))
ret = append(ret, m.(*api.Pod))
})
return pods, err
return ret, err
}
func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
@ -133,3 +135,71 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []*api.Ser
return services, nil
}
// StoreToReplicationControllerLister helps list rcs
type StoreToReplicationControllerLister struct {
Indexer Indexer
}
func (s *StoreToReplicationControllerLister) List(selector labels.Selector) (ret []*api.ReplicationController, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*api.ReplicationController))
})
return ret, err
}
func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
return storeReplicationControllersNamespacer{s.Indexer, namespace}
}
type storeReplicationControllersNamespacer struct {
indexer Indexer
namespace string
}
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (ret []*api.ReplicationController, err error) {
err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*api.ReplicationController))
})
return ret, err
}
func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name)
}
return obj.(*api.ReplicationController), nil
}
// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.
func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
return
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
items, err := s.Indexer.Index(NamespaceIndex, key)
if err != nil {
return
}
for _, m := range items {
rc := m.(*api.ReplicationController)
selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated()
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
controllers = append(controllers, rc)
}
if len(controllers) == 0 {
err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}

View File

@ -128,7 +128,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
testCases := []struct {
description string
inRCs []*api.ReplicationController
list func(StoreToReplicationControllerLister) ([]api.ReplicationController, error)
list func(StoreToReplicationControllerLister) ([]*api.ReplicationController, error)
outRCNames sets.String
expectErr bool
onlyIfIndexedByNamespace bool
@ -143,7 +143,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
return lister.ReplicationControllers(api.NamespaceAll).List(labels.Set{}.AsSelectorPreValidated())
},
outRCNames: sets.NewString("hmm", "foo"),
@ -158,7 +158,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
return lister.ReplicationControllers("hmm").List(labels.Set{}.AsSelectorPreValidated())
},
outRCNames: sets.NewString("hmm"),
@ -168,8 +168,8 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
inRCs: []*api.ReplicationController{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
return lister.List()
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
return lister.List(labels.Everything())
},
outRCNames: sets.NewString("basic"),
},
@ -183,7 +183,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"},
}
@ -199,7 +199,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -228,7 +228,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
},
},
},
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",

View File

@ -269,16 +269,16 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon
}
// update lookup cache
rm.lookupCache.Update(pod, &controllers[0])
rm.lookupCache.Update(pod, controllers[0])
return &controllers[0]
return controllers[0]
}
// isCacheValid check if the cache is valid
func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool {
exists, err := rm.rcStore.Exists(cachedRC)
_, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
// rc has been deleted or updated, cache is invalid
if err != nil || !exists || !isControllerMatch(pod, cachedRC) {
if err != nil || !isControllerMatch(pod, cachedRC) {
return false
}
return true

View File

@ -71,7 +71,7 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface,
}
// OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
type OverlappingControllers []api.ReplicationController
type OverlappingControllers []*api.ReplicationController
func (o OverlappingControllers) Len() int { return len(o) }
func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

View File

@ -101,37 +101,38 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service
// ControllerLister interface represents anything that can produce a list of ReplicationController; the list is consumed by a scheduler.
type ControllerLister interface {
// Lists all the replication controllers
List() ([]api.ReplicationController, error)
List(labels.Selector) ([]*api.ReplicationController, error)
// Gets the services for the given pod
GetPodControllers(*api.Pod) ([]api.ReplicationController, error)
GetPodControllers(*api.Pod) ([]*api.ReplicationController, error)
}
// EmptyControllerLister implements ControllerLister on []api.ReplicationController returning empty data
type EmptyControllerLister struct{}
// List returns nil
func (f EmptyControllerLister) List() ([]api.ReplicationController, error) {
func (f EmptyControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) {
return nil, nil
}
// GetPodControllers returns nil
func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
return nil, nil
}
// FakeControllerLister implements ControllerLister on []api.ReplicationController for test purposes.
type FakeControllerLister []api.ReplicationController
type FakeControllerLister []*api.ReplicationController
// List returns []api.ReplicationController, the list of all ReplicationControllers.
func (f FakeControllerLister) List() ([]api.ReplicationController, error) {
func (f FakeControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) {
return f, nil
}
// GetPodControllers gets the ReplicationControllers that have the selector that match the labels on the given pod
func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
var selector labels.Selector
for _, controller := range f {
for i := range f {
controller := f[i]
if controller.Namespace != pod.Namespace {
continue
}

View File

@ -57,7 +57,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
pod *api.Pod
pods []*api.Pod
nodes []string
rcs []api.ReplicationController
rcs []*api.ReplicationController
rss []extensions.ReplicaSet
services []*api.Service
expectedList schedulerapi.HostPriorityList
@ -181,7 +181,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
// "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to
// do spreading between all pods. The result should be exactly as above.
@ -210,7 +210,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
// Taken together Service and Replication Controller should match all Pods, hence result should be equal to one above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
@ -238,7 +238,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
// Both Nodes have one pod from the given RC, hence both get 0 score.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
test: "Replication controller with partial pod label matches",
@ -264,7 +264,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "Another replication controller with partial pod label matches",
},
@ -344,7 +344,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
pod *api.Pod
pods []*api.Pod
nodes []string
rcs []api.ReplicationController
rcs []*api.ReplicationController
rss []extensions.ReplicaSet
services []*api.Service
expectedList schedulerapi.HostPriorityList
@ -471,7 +471,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")),
buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")),
},
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}},
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}},
expectedList: []schedulerapi.HostPriority{
// Note that because we put two pods on the same node (nodeMachine1Zone3),
// the values here are questionable for zone2, in particular for nodeMachine1Zone2.

View File

@ -494,7 +494,7 @@ func TestZeroRequest(t *testing.T) {
Function: algorithmpriorities.NewSelectorSpreadPriority(
algorithm.FakePodLister(test.pods),
algorithm.FakeServiceLister([]*api.Service{}),
algorithm.FakeControllerLister([]api.ReplicationController{}),
algorithm.FakeControllerLister([]*api.ReplicationController{}),
algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})),
Weight: 1,
},