mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Revert "simplify RC and SVC listers"
This commit is contained in:
parent
ca00e596bd
commit
938872582e
@ -55,7 +55,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
|
||||
client: client,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
|
||||
}
|
||||
e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
|
||||
e.serviceStore.Store, e.serviceController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return e.client.Core().Services(api.NamespaceAll).List(options)
|
||||
@ -73,7 +73,6 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
|
||||
},
|
||||
DeleteFunc: e.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
e.podStore.Indexer, e.podController = cache.NewIndexerInformer(
|
||||
@ -263,7 +262,7 @@ func (e *endpointController) syncService(key string) error {
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
|
||||
obj, exists, err := e.serviceStore.Store.GetByKey(key)
|
||||
if err != nil || !exists {
|
||||
// Delete the corresponding endpoint, as the service has been deleted.
|
||||
// TODO: Please note that this will delete an endpoint when a
|
||||
|
@ -114,7 +114,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
|
||||
},
|
||||
)
|
||||
|
||||
cachedClusterClient.serviceStore.Indexer, cachedClusterClient.serviceController = cache.NewIndexerInformer(
|
||||
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return clientset.Core().Services(v1.NamespaceAll).List(options)
|
||||
@ -149,7 +149,6 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
|
||||
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
|
||||
},
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
cc.clientMap[clusterName] = cachedClusterClient
|
||||
go cachedClusterClient.serviceController.Run(wait.NeverStop)
|
||||
|
@ -63,7 +63,7 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
|
||||
// if serviceCache does not exists, that means the service is not created by federation, we should skip it
|
||||
return nil
|
||||
}
|
||||
serviceInterface, exists, err := clusterCache.serviceStore.Indexer.GetByKey(key)
|
||||
serviceInterface, exists, err := clusterCache.serviceStore.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("Did not successfully get %v from store: %v, will retry later", key, err)
|
||||
clusterCache.serviceQueue.Add(key)
|
||||
|
@ -144,7 +144,7 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
|
||||
queue: workqueue.New(),
|
||||
knownClusterSet: make(sets.String),
|
||||
}
|
||||
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
|
||||
s.serviceStore.Store, s.serviceController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return s.federationClient.Core().Services(v1.NamespaceAll).List(options)
|
||||
@ -165,7 +165,6 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
|
||||
},
|
||||
DeleteFunc: s.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
s.clusterStore.Store, s.clusterController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
@ -817,7 +816,7 @@ func (s *ServiceController) syncService(key string) error {
|
||||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
// obj holds the latest service info from apiserver
|
||||
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
|
||||
obj, exists, err := s.serviceStore.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to retrieve service %v from store: %v", key, err)
|
||||
s.queue.Add(key)
|
||||
|
152
pkg/client/cache/listers.go
vendored
152
pkg/client/cache/listers.go
vendored
@ -32,7 +32,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
// AppendFunc is used to add a matching item to whatever list the caller is using
|
||||
type AppendFunc func(interface{})
|
||||
|
||||
func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error {
|
||||
@ -137,6 +136,116 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
|
||||
type StoreToReplicationControllerLister struct {
|
||||
Indexer
|
||||
}
|
||||
|
||||
// Exists checks if the given rc exists in the store.
|
||||
func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) {
|
||||
_, exists, err := s.Indexer.Get(controller)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
// StoreToReplicationControllerLister lists all controllers in the store.
|
||||
// TODO: converge on the interface in pkg/client
|
||||
func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) {
|
||||
for _, c := range s.Indexer.List() {
|
||||
controllers = append(controllers, *(c.(*api.ReplicationController)))
|
||||
}
|
||||
return controllers, nil
|
||||
}
|
||||
|
||||
func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
|
||||
return storeReplicationControllersNamespacer{s.Indexer, namespace}
|
||||
}
|
||||
|
||||
type storeReplicationControllersNamespacer struct {
|
||||
indexer Indexer
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) ([]api.ReplicationController, error) {
|
||||
controllers := []api.ReplicationController{}
|
||||
|
||||
if s.namespace == api.NamespaceAll {
|
||||
for _, m := range s.indexer.List() {
|
||||
rc := *(m.(*api.ReplicationController))
|
||||
if selector.Matches(labels.Set(rc.Labels)) {
|
||||
controllers = append(controllers, rc)
|
||||
}
|
||||
}
|
||||
return controllers, nil
|
||||
}
|
||||
|
||||
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
|
||||
items, err := s.indexer.Index(NamespaceIndex, key)
|
||||
if err != nil {
|
||||
// Ignore error; do slow search without index.
|
||||
glog.Warningf("can not retrieve list of objects using index : %v", err)
|
||||
for _, m := range s.indexer.List() {
|
||||
rc := *(m.(*api.ReplicationController))
|
||||
if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) {
|
||||
controllers = append(controllers, rc)
|
||||
}
|
||||
}
|
||||
return controllers, nil
|
||||
}
|
||||
for _, m := range items {
|
||||
rc := *(m.(*api.ReplicationController))
|
||||
if selector.Matches(labels.Set(rc.Labels)) {
|
||||
controllers = append(controllers, rc)
|
||||
}
|
||||
}
|
||||
return controllers, nil
|
||||
}
|
||||
|
||||
func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) {
|
||||
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name)
|
||||
}
|
||||
return obj.(*api.ReplicationController), nil
|
||||
}
|
||||
|
||||
// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.
|
||||
func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
|
||||
var selector labels.Selector
|
||||
var rc api.ReplicationController
|
||||
|
||||
if len(pod.Labels) == 0 {
|
||||
err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
|
||||
return
|
||||
}
|
||||
|
||||
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
|
||||
items, err := s.Indexer.Index(NamespaceIndex, key)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, m := range items {
|
||||
rc = *m.(*api.ReplicationController)
|
||||
selector = labels.Set(rc.Spec.Selector).AsSelectorPreValidated()
|
||||
|
||||
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
|
||||
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
|
||||
continue
|
||||
}
|
||||
controllers = append(controllers, rc)
|
||||
}
|
||||
if len(controllers) == 0 {
|
||||
err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments.
|
||||
type StoreToDeploymentLister struct {
|
||||
Indexer
|
||||
@ -409,6 +518,47 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
|
||||
return
|
||||
}
|
||||
|
||||
// StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface
|
||||
// The Store must contain (only) Services.
|
||||
type StoreToServiceLister struct {
|
||||
Store
|
||||
}
|
||||
|
||||
func (s *StoreToServiceLister) List() (services api.ServiceList, err error) {
|
||||
for _, m := range s.Store.List() {
|
||||
services.Items = append(services.Items, *(m.(*api.Service)))
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// TODO: Move this back to scheduler as a helper function that takes a Store,
|
||||
// rather than a method of StoreToServiceLister.
|
||||
func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) {
|
||||
var selector labels.Selector
|
||||
var service api.Service
|
||||
|
||||
for _, m := range s.Store.List() {
|
||||
service = *m.(*api.Service)
|
||||
// consider only services that are in the same namespace as the pod
|
||||
if service.Namespace != pod.Namespace {
|
||||
continue
|
||||
}
|
||||
if service.Spec.Selector == nil {
|
||||
// services with nil selectors match nothing, not everything.
|
||||
continue
|
||||
}
|
||||
selector = labels.Set(service.Spec.Selector).AsSelectorPreValidated()
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
if len(services) == 0 {
|
||||
err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// StoreToEndpointsLister makes a Store that lists endpoints.
|
||||
type StoreToEndpointsLister struct {
|
||||
Store
|
||||
|
150
pkg/client/cache/listers_core.go
vendored
150
pkg/client/cache/listers_core.go
vendored
@ -17,8 +17,6 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
@ -27,24 +25,24 @@ import (
|
||||
// TODO: generate these classes and methods for all resources of interest using
|
||||
// a script. Can use "go generate" once 1.4 is supported by all users.
|
||||
|
||||
// Lister makes an Index have the List method. The Stores must contain only the expected type
|
||||
// StoreToPodLister makes a Store have the List method of the client.PodInterface
|
||||
// The Store must contain (only) Pods.
|
||||
//
|
||||
// Example:
|
||||
// s := cache.NewStore()
|
||||
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"}
|
||||
// r := cache.NewReflector(lw, &api.Pod{}, s).Run()
|
||||
// l := StoreToPodLister{s}
|
||||
// l.List()
|
||||
|
||||
// StoreToPodLister helps list pods
|
||||
type StoreToPodLister struct {
|
||||
Indexer Indexer
|
||||
}
|
||||
|
||||
func (s *StoreToPodLister) List(selector labels.Selector) (ret []*api.Pod, err error) {
|
||||
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
|
||||
err = ListAll(s.Indexer, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.Pod))
|
||||
pods = append(pods, m.(*api.Pod))
|
||||
})
|
||||
return ret, err
|
||||
return pods, err
|
||||
}
|
||||
|
||||
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
|
||||
@ -56,11 +54,11 @@ type storePodsNamespacer struct {
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (s storePodsNamespacer) List(selector labels.Selector) (ret []*api.Pod, err error) {
|
||||
func (s storePodsNamespacer) List(selector labels.Selector) (pods []*api.Pod, err error) {
|
||||
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.Pod))
|
||||
pods = append(pods, m.(*api.Pod))
|
||||
})
|
||||
return ret, err
|
||||
return pods, err
|
||||
}
|
||||
|
||||
func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
|
||||
@ -73,133 +71,3 @@ func (s storePodsNamespacer) Get(name string) (*api.Pod, error) {
|
||||
}
|
||||
return obj.(*api.Pod), nil
|
||||
}
|
||||
|
||||
// StoreToReplicationControllerLister helps list rcs
|
||||
type StoreToReplicationControllerLister struct {
|
||||
Indexer Indexer
|
||||
}
|
||||
|
||||
func (s *StoreToReplicationControllerLister) List(selector labels.Selector) (ret []*api.ReplicationController, err error) {
|
||||
err = ListAll(s.Indexer, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.ReplicationController))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
|
||||
return storeReplicationControllersNamespacer{s.Indexer, namespace}
|
||||
}
|
||||
|
||||
type storeReplicationControllersNamespacer struct {
|
||||
indexer Indexer
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (ret []*api.ReplicationController, err error) {
|
||||
err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.ReplicationController))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (s storeReplicationControllersNamespacer) Get(name string) (*api.ReplicationController, error) {
|
||||
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.NewNotFound(api.Resource("replicationcontroller"), name)
|
||||
}
|
||||
return obj.(*api.ReplicationController), nil
|
||||
}
|
||||
|
||||
// GetPodControllers returns a list of replication controllers managing a pod. Returns an error only if no matching controllers are found.
|
||||
func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
|
||||
if len(pod.Labels) == 0 {
|
||||
err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
|
||||
return
|
||||
}
|
||||
|
||||
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
|
||||
items, err := s.Indexer.Index(NamespaceIndex, key)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, m := range items {
|
||||
rc := m.(*api.ReplicationController)
|
||||
selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated()
|
||||
|
||||
// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
|
||||
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
|
||||
continue
|
||||
}
|
||||
controllers = append(controllers, rc)
|
||||
}
|
||||
if len(controllers) == 0 {
|
||||
err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// StoreToServiceLister helps list services
|
||||
type StoreToServiceLister struct {
|
||||
Indexer Indexer
|
||||
}
|
||||
|
||||
func (s *StoreToServiceLister) List(selector labels.Selector) (ret []*api.Service, err error) {
|
||||
err = ListAll(s.Indexer, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.Service))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (s *StoreToServiceLister) Services(namespace string) storeServicesNamespacer {
|
||||
return storeServicesNamespacer{s.Indexer, namespace}
|
||||
}
|
||||
|
||||
type storeServicesNamespacer struct {
|
||||
indexer Indexer
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (s storeServicesNamespacer) List(selector labels.Selector) (ret []*api.Service, err error) {
|
||||
err = ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
|
||||
ret = append(ret, m.(*api.Service))
|
||||
})
|
||||
return ret, err
|
||||
}
|
||||
|
||||
func (s storeServicesNamespacer) Get(name string) (*api.Service, error) {
|
||||
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !exists {
|
||||
return nil, errors.NewNotFound(api.Resource("service"), name)
|
||||
}
|
||||
return obj.(*api.Service), nil
|
||||
}
|
||||
|
||||
// TODO: Move this back to scheduler as a helper function that takes a Store,
|
||||
// rather than a method of StoreToServiceLister.
|
||||
func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
|
||||
allServices, err := s.Services(pod.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range allServices {
|
||||
service := allServices[i]
|
||||
if service.Spec.Selector == nil {
|
||||
// services with nil selectors match nothing, not everything.
|
||||
continue
|
||||
}
|
||||
selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
|
||||
if selector.Matches(labels.Set(pod.Labels)) {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
|
||||
return services, nil
|
||||
}
|
||||
|
18
pkg/client/cache/listers_test.go
vendored
18
pkg/client/cache/listers_test.go
vendored
@ -128,7 +128,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
|
||||
testCases := []struct {
|
||||
description string
|
||||
inRCs []*api.ReplicationController
|
||||
list func(StoreToReplicationControllerLister) ([]*api.ReplicationController, error)
|
||||
list func(StoreToReplicationControllerLister) ([]api.ReplicationController, error)
|
||||
outRCNames sets.String
|
||||
expectErr bool
|
||||
onlyIfIndexedByNamespace bool
|
||||
@ -143,7 +143,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
|
||||
ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"},
|
||||
},
|
||||
},
|
||||
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
|
||||
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
|
||||
return lister.ReplicationControllers(api.NamespaceAll).List(labels.Set{}.AsSelectorPreValidated())
|
||||
},
|
||||
outRCNames: sets.NewString("hmm", "foo"),
|
||||
@ -158,7 +158,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
|
||||
ObjectMeta: api.ObjectMeta{Name: "hmm", Namespace: "hmm"},
|
||||
},
|
||||
},
|
||||
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
|
||||
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
|
||||
return lister.ReplicationControllers("hmm").List(labels.Set{}.AsSelectorPreValidated())
|
||||
},
|
||||
outRCNames: sets.NewString("hmm"),
|
||||
@ -168,8 +168,8 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
|
||||
inRCs: []*api.ReplicationController{
|
||||
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
|
||||
},
|
||||
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
|
||||
return lister.List(labels.Everything())
|
||||
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
|
||||
return lister.List()
|
||||
},
|
||||
outRCNames: sets.NewString("basic"),
|
||||
},
|
||||
@ -183,7 +183,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
|
||||
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"},
|
||||
}
|
||||
@ -199,7 +199,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
|
||||
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
|
||||
},
|
||||
},
|
||||
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
|
||||
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod1",
|
||||
@ -228,7 +228,7 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
list: func(lister StoreToReplicationControllerLister) ([]*api.ReplicationController, error) {
|
||||
list: func(lister StoreToReplicationControllerLister) ([]api.ReplicationController, error) {
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod1",
|
||||
@ -754,7 +754,7 @@ func TestStoreToPodLister(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStoreToServiceLister(t *testing.T) {
|
||||
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
|
||||
store := NewStore(MetaNamespaceKeyFunc)
|
||||
store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo"},
|
||||
Spec: api.ServiceSpec{
|
||||
|
@ -80,7 +80,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
|
||||
}
|
||||
|
||||
e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
|
||||
e.serviceStore.Store, e.serviceController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return e.client.Core().Services(api.NamespaceAll).List(options)
|
||||
@ -99,7 +99,6 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
|
||||
},
|
||||
DeleteFunc: e.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
|
||||
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
@ -336,7 +335,7 @@ func (e *EndpointController) syncService(key string) error {
|
||||
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
|
||||
obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
|
||||
obj, exists, err := e.serviceStore.Store.GetByKey(key)
|
||||
if err != nil || !exists {
|
||||
// Delete the corresponding endpoint, as the service has been deleted.
|
||||
// TODO: Please note that this will delete an endpoint when a
|
||||
|
@ -110,7 +110,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
|
||||
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
|
||||
})
|
||||
@ -174,7 +174,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
@ -255,7 +255,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
@ -293,7 +293,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 0, 1, 1)
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
@ -331,7 +331,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 1)
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{},
|
||||
@ -373,7 +373,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
@ -414,7 +414,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
|
||||
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0)
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
@ -435,7 +435,7 @@ func TestSyncEndpointsItems(t *testing.T) {
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
|
||||
addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found!
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
@ -478,7 +478,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
|
||||
serviceLabels := map[string]string{"foo": "bar"}
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
@ -539,7 +539,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
|
||||
endpoints.podStoreSynced = alwaysReady
|
||||
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
|
||||
serviceLabels := map[string]string{"baz": "blah"}
|
||||
endpoints.serviceStore.Indexer.Add(&api.Service{
|
||||
endpoints.serviceStore.Store.Add(&api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: ns,
|
||||
|
@ -269,16 +269,16 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon
|
||||
}
|
||||
|
||||
// update lookup cache
|
||||
rm.lookupCache.Update(pod, controllers[0])
|
||||
rm.lookupCache.Update(pod, &controllers[0])
|
||||
|
||||
return controllers[0]
|
||||
return &controllers[0]
|
||||
}
|
||||
|
||||
// isCacheValid check if the cache is valid
|
||||
func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool {
|
||||
_, err := rm.rcStore.ReplicationControllers(cachedRC.Namespace).Get(cachedRC.Name)
|
||||
exists, err := rm.rcStore.Exists(cachedRC)
|
||||
// rc has been deleted or updated, cache is invalid
|
||||
if err != nil || !isControllerMatch(pod, cachedRC) {
|
||||
if err != nil || !exists || !isControllerMatch(pod, cachedRC) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
@ -71,7 +71,7 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface,
|
||||
}
|
||||
|
||||
// OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
|
||||
type OverlappingControllers []*api.ReplicationController
|
||||
type OverlappingControllers []api.ReplicationController
|
||||
|
||||
func (o OverlappingControllers) Len() int { return len(o) }
|
||||
func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
|
||||
|
@ -119,7 +119,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
||||
},
|
||||
workingQueue: workqueue.NewDelayingQueue(),
|
||||
}
|
||||
s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
|
||||
s.serviceStore.Store, s.serviceController = cache.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
|
||||
@ -141,7 +141,6 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
|
||||
},
|
||||
DeleteFunc: s.enqueueService,
|
||||
},
|
||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||
)
|
||||
if err := s.init(); err != nil {
|
||||
return nil, err
|
||||
@ -725,7 +724,7 @@ func (s *ServiceController) syncService(key string) error {
|
||||
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
// obj holds the latest service info from apiserver
|
||||
obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
|
||||
obj, exists, err := s.serviceStore.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
|
||||
s.workingQueue.Add(key)
|
||||
|
@ -27,10 +27,10 @@ import (
|
||||
// FromServices builds environment variables that a container is started with,
|
||||
// which tell the container where to find the services it may need, which are
|
||||
// provided as an argument.
|
||||
func FromServices(services []*api.Service) []api.EnvVar {
|
||||
func FromServices(services *api.ServiceList) []api.EnvVar {
|
||||
var result []api.EnvVar
|
||||
for i := range services {
|
||||
service := services[i]
|
||||
for i := range services.Items {
|
||||
service := &services.Items[i]
|
||||
|
||||
// ignore services where ClusterIP is "None" or empty
|
||||
// the services passed to this method should be pre-filtered
|
||||
|
@ -25,61 +25,63 @@ import (
|
||||
)
|
||||
|
||||
func TestFromServices(t *testing.T) {
|
||||
sl := []*api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo-bar"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "1.2.3.4",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8080, Protocol: "TCP"},
|
||||
sl := api.ServiceList{
|
||||
Items: []api.Service{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo-bar"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "1.2.3.4",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8080, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "abc-123"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "5.6.7.8",
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "u-d-p", Port: 8081, Protocol: "UDP"},
|
||||
{Name: "t-c-p", Port: 8081, Protocol: "TCP"},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "abc-123"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "5.6.7.8",
|
||||
Ports: []api.ServicePort{
|
||||
{Name: "u-d-p", Port: 8081, Protocol: "UDP"},
|
||||
{Name: "t-c-p", Port: 8081, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "9.8.7.6",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
{Name: "8083", Port: 8083, Protocol: "TCP"},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "q-u-u-x"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "9.8.7.6",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
{Name: "8083", Port: 8083, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "None",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-none"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "None",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{Name: "svrc-clusterip-empty"},
|
||||
Spec: api.ServiceSpec{
|
||||
Selector: map[string]string{"bar": "baz"},
|
||||
ClusterIP: "",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 8082, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
vars := envvars.FromServices(sl)
|
||||
vars := envvars.FromServices(&sl)
|
||||
expected := []api.EnvVar{
|
||||
{Name: "FOO_BAR_SERVICE_HOST", Value: "1.2.3.4"},
|
||||
{Name: "FOO_BAR_SERVICE_PORT", Value: "8080"},
|
||||
|
@ -77,7 +77,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
|
||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/security/apparmor"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
@ -372,7 +371,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||
dockerExecHandler = &dockertools.NativeExecHandler{}
|
||||
}
|
||||
|
||||
serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
if kubeClient != nil {
|
||||
// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
|
||||
// than an interface. There is no way to construct a list+watcher using resource name.
|
||||
@ -386,7 +385,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||
}
|
||||
cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
|
||||
}
|
||||
serviceLister := &cache.StoreToServiceLister{Indexer: serviceStore}
|
||||
serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
|
||||
|
||||
nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
if kubeClient != nil {
|
||||
@ -778,7 +777,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
||||
}
|
||||
|
||||
type serviceLister interface {
|
||||
List(labels.Selector) ([]*api.Service, error)
|
||||
List() (api.ServiceList, error)
|
||||
}
|
||||
|
||||
type nodeLister interface {
|
||||
@ -1448,7 +1447,7 @@ var masterServices = sets.NewString("kubernetes")
|
||||
// pod in namespace ns should see.
|
||||
func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
|
||||
var (
|
||||
serviceMap = make(map[string]*api.Service)
|
||||
serviceMap = make(map[string]api.Service)
|
||||
m = make(map[string]string)
|
||||
)
|
||||
|
||||
@ -1458,16 +1457,15 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
|
||||
// Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
|
||||
return m, nil
|
||||
}
|
||||
services, err := kl.serviceLister.List(labels.Everything())
|
||||
services, err := kl.serviceLister.List()
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("failed to list services when setting up env vars.")
|
||||
}
|
||||
|
||||
// project the services in namespace ns onto the master services
|
||||
for i := range services {
|
||||
service := services[i]
|
||||
for _, service := range services.Items {
|
||||
// ignore services where ClusterIP is "None" or empty
|
||||
if !api.IsServiceIPSet(service) {
|
||||
if !api.IsServiceIPSet(&service) {
|
||||
continue
|
||||
}
|
||||
serviceName := service.Name
|
||||
@ -1487,13 +1485,12 @@ func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mappedServices := []*api.Service{}
|
||||
for key := range serviceMap {
|
||||
mappedServices = append(mappedServices, serviceMap[key])
|
||||
services.Items = []api.Service{}
|
||||
for _, service := range serviceMap {
|
||||
services.Items = append(services.Items, service)
|
||||
}
|
||||
|
||||
for _, e := range envvars.FromServices(mappedServices) {
|
||||
for _, e := range envvars.FromServices(&services) {
|
||||
m[e.Name] = e.Value
|
||||
}
|
||||
return m, nil
|
||||
|
@ -61,7 +61,6 @@ import (
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
||||
kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/clock"
|
||||
@ -897,11 +896,13 @@ func TestDNSConfigurationParams(t *testing.T) {
|
||||
}
|
||||
|
||||
type testServiceLister struct {
|
||||
services []*api.Service
|
||||
services []api.Service
|
||||
}
|
||||
|
||||
func (ls testServiceLister) List(labels.Selector) ([]*api.Service, error) {
|
||||
return ls.services, nil
|
||||
func (ls testServiceLister) List() (api.ServiceList, error) {
|
||||
return api.ServiceList{
|
||||
Items: ls.services,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type testNodeLister struct {
|
||||
@ -937,8 +938,8 @@ func (e envs) Swap(i, j int) { e[i], e[j] = e[j], e[i] }
|
||||
|
||||
func (e envs) Less(i, j int) bool { return e[i].Name < e[j].Name }
|
||||
|
||||
func buildService(name, namespace, clusterIP, protocol string, port int) *api.Service {
|
||||
return &api.Service{
|
||||
func buildService(name, namespace, clusterIP, protocol string, port int) api.Service {
|
||||
return api.Service{
|
||||
ObjectMeta: api.ObjectMeta{Name: name, Namespace: namespace},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{{
|
||||
@ -951,7 +952,7 @@ func buildService(name, namespace, clusterIP, protocol string, port int) *api.Se
|
||||
}
|
||||
|
||||
func TestMakeEnvironmentVariables(t *testing.T) {
|
||||
services := []*api.Service{
|
||||
services := []api.Service{
|
||||
buildService("kubernetes", api.NamespaceDefault, "1.2.3.1", "TCP", 8081),
|
||||
buildService("test", "test1", "1.2.3.3", "TCP", 8083),
|
||||
buildService("kubernetes", "test2", "1.2.3.4", "TCP", 8084),
|
||||
|
@ -63,25 +63,24 @@ func (f FakePodLister) List(s labels.Selector) (selected []*api.Pod, err error)
|
||||
// ServiceLister interface represents anything that can produce a list of services; the list is consumed by a scheduler.
|
||||
type ServiceLister interface {
|
||||
// Lists all the services
|
||||
List(labels.Selector) ([]*api.Service, error)
|
||||
List() (api.ServiceList, error)
|
||||
// Gets the services for the given pod
|
||||
GetPodServices(*api.Pod) ([]*api.Service, error)
|
||||
GetPodServices(*api.Pod) ([]api.Service, error)
|
||||
}
|
||||
|
||||
// FakeServiceLister implements ServiceLister on []api.Service for test purposes.
|
||||
type FakeServiceLister []*api.Service
|
||||
type FakeServiceLister []api.Service
|
||||
|
||||
// List returns api.ServiceList, the list of all services.
|
||||
func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) {
|
||||
return f, nil
|
||||
func (f FakeServiceLister) List() (api.ServiceList, error) {
|
||||
return api.ServiceList{Items: f}, nil
|
||||
}
|
||||
|
||||
// GetPodServices gets the services that have the selector that match the labels on the given pod
|
||||
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
|
||||
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []api.Service, err error) {
|
||||
var selector labels.Selector
|
||||
|
||||
for i := range f {
|
||||
service := f[i]
|
||||
for _, service := range f {
|
||||
// consider only services that are in the same namespace as the pod
|
||||
if service.Namespace != pod.Namespace {
|
||||
continue
|
||||
@ -101,38 +100,37 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service
|
||||
// ControllerLister interface represents anything that can produce a list of ReplicationController; the list is consumed by a scheduler.
|
||||
type ControllerLister interface {
|
||||
// Lists all the replication controllers
|
||||
List(labels.Selector) ([]*api.ReplicationController, error)
|
||||
List() ([]api.ReplicationController, error)
|
||||
// Gets the services for the given pod
|
||||
GetPodControllers(*api.Pod) ([]*api.ReplicationController, error)
|
||||
GetPodControllers(*api.Pod) ([]api.ReplicationController, error)
|
||||
}
|
||||
|
||||
// EmptyControllerLister implements ControllerLister on []api.ReplicationController returning empty data
|
||||
type EmptyControllerLister struct{}
|
||||
|
||||
// List returns nil
|
||||
func (f EmptyControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) {
|
||||
func (f EmptyControllerLister) List() ([]api.ReplicationController, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetPodControllers returns nil
|
||||
func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
|
||||
func (f EmptyControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// FakeControllerLister implements ControllerLister on []api.ReplicationController for test purposes.
|
||||
type FakeControllerLister []*api.ReplicationController
|
||||
type FakeControllerLister []api.ReplicationController
|
||||
|
||||
// List returns []api.ReplicationController, the list of all ReplicationControllers.
|
||||
func (f FakeControllerLister) List(labels.Selector) ([]*api.ReplicationController, error) {
|
||||
func (f FakeControllerLister) List() ([]api.ReplicationController, error) {
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// GetPodControllers gets the ReplicationControllers that have the selector that match the labels on the given pod
|
||||
func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []*api.ReplicationController, err error) {
|
||||
func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) {
|
||||
var selector labels.Selector
|
||||
|
||||
for i := range f {
|
||||
controller := f[i]
|
||||
for _, controller := range f {
|
||||
if controller.Namespace != pod.Namespace {
|
||||
continue
|
||||
}
|
||||
|
@ -670,7 +670,7 @@ func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, n
|
||||
// skip looking at other pods in the service if the current pod defines all the required affinity labels
|
||||
if !labelsExist {
|
||||
services, err := s.serviceLister.GetPodServices(pod)
|
||||
if err == nil && len(services) > 0 {
|
||||
if err == nil {
|
||||
// just use the first service and get the other pods within the service
|
||||
// TODO: a separate predicate can be created that tries to handle all services for the pod
|
||||
selector := labels.SelectorFromSet(services[0].Spec.Selector)
|
||||
|
@ -1209,7 +1209,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
tests := []struct {
|
||||
pod *api.Pod
|
||||
pods []*api.Pod
|
||||
services []*api.Service
|
||||
services []api.Service
|
||||
node *api.Node
|
||||
labels []string
|
||||
fits bool
|
||||
@ -1240,7 +1240,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine1"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "service pod on same node",
|
||||
@ -1249,7 +1249,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "service pod on different node, region match",
|
||||
@ -1258,7 +1258,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: false,
|
||||
labels: []string{"region"},
|
||||
test: "service pod on different node, region mismatch",
|
||||
@ -1267,7 +1267,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
|
||||
node: &node1,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns2"}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "service in different namespace, region mismatch",
|
||||
@ -1276,7 +1276,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns2"}}},
|
||||
node: &node1,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
fits: true,
|
||||
labels: []string{"region"},
|
||||
test: "pod in different namespace, region mismatch",
|
||||
@ -1285,7 +1285,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine3"}, ObjectMeta: api.ObjectMeta{Labels: selector, Namespace: "ns1"}}},
|
||||
node: &node1,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
fits: false,
|
||||
labels: []string{"region"},
|
||||
test: "service and pod in same namespace, region mismatch",
|
||||
@ -1294,7 +1294,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine2"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node1,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: false,
|
||||
labels: []string{"region", "zone"},
|
||||
test: "service pod on different node, multiple labels, not all match",
|
||||
@ -1303,7 +1303,7 @@ func TestServiceAffinity(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: selector}},
|
||||
pods: []*api.Pod{{Spec: api.PodSpec{NodeName: "machine5"}, ObjectMeta: api.ObjectMeta{Labels: selector}}},
|
||||
node: &node4,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: selector}}},
|
||||
fits: true,
|
||||
labels: []string{"region", "zone"},
|
||||
test: "service pod on different node, multiple labels, all match",
|
||||
|
@ -194,7 +194,7 @@ func NewServiceAntiAffinityPriority(podLister algorithm.PodLister, serviceLister
|
||||
// The label to be considered is provided to the struct (ServiceAntiAffinity).
|
||||
func (s *ServiceAntiAffinity) CalculateAntiAffinityPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*api.Node) (schedulerapi.HostPriorityList, error) {
|
||||
var nsServicePods []*api.Pod
|
||||
if services, err := s.serviceLister.GetPodServices(pod); err == nil && len(services) > 0 {
|
||||
if services, err := s.serviceLister.GetPodServices(pod); err == nil {
|
||||
// just use the first service and get the other pods within the service
|
||||
// TODO: a separate predicate can be created that tries to handle all services for the pod
|
||||
selector := labels.SelectorFromSet(services[0].Spec.Selector)
|
||||
|
@ -57,9 +57,9 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
pod *api.Pod
|
||||
pods []*api.Pod
|
||||
nodes []string
|
||||
rcs []*api.ReplicationController
|
||||
rcs []api.ReplicationController
|
||||
rss []extensions.ReplicaSet
|
||||
services []*api.Service
|
||||
services []api.Service
|
||||
expectedList schedulerapi.HostPriorityList
|
||||
test string
|
||||
}{
|
||||
@ -80,7 +80,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 10}},
|
||||
test: "different services",
|
||||
},
|
||||
@ -91,7 +91,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "two pods, one service pod",
|
||||
},
|
||||
@ -105,7 +105,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "five pods, one service pod in no namespace",
|
||||
},
|
||||
@ -118,7 +118,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "four pods, one service pod in default namespace",
|
||||
},
|
||||
@ -132,7 +132,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: "ns1"}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 10}, {Host: "machine2", Score: 0}},
|
||||
test: "five pods, one service pod in specific namespace",
|
||||
},
|
||||
@ -144,7 +144,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
|
||||
test: "three pods, two service pods on different machines",
|
||||
},
|
||||
@ -157,7 +157,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 5}, {Host: "machine2", Score: 0}},
|
||||
test: "four pods, three service pods",
|
||||
},
|
||||
@ -169,7 +169,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
test: "service with partial pod label matches",
|
||||
},
|
||||
@ -181,8 +181,8 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
// "baz=blah" matches both labels1 and labels2, and "foo=bar" matches only labels 1. This means that we assume that we want to
|
||||
// do spreading between all pods. The result should be exactly as above.
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
@ -196,7 +196,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
|
||||
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
@ -210,8 +210,8 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
// Taken together Service and Replication Controller should match all Pods, hence result should be equal to one above.
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
test: "disjoined service and replication controller should be treated equally",
|
||||
@ -224,7 +224,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
|
||||
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
|
||||
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
@ -238,7 +238,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"foo": "bar"}}}},
|
||||
// Both Nodes have one pod from the given RC, hence both get 0 score.
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
|
||||
test: "Replication controller with partial pod label matches",
|
||||
@ -264,7 +264,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicationController", "name", "abc123")}},
|
||||
},
|
||||
nodes: []string{"machine1", "machine2"},
|
||||
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
|
||||
test: "Another replication controller with partial pod label matches",
|
||||
},
|
||||
@ -344,9 +344,9 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
pod *api.Pod
|
||||
pods []*api.Pod
|
||||
nodes []string
|
||||
rcs []*api.ReplicationController
|
||||
rcs []api.ReplicationController
|
||||
rss []extensions.ReplicaSet
|
||||
services []*api.Service
|
||||
services []api.Service
|
||||
expectedList schedulerapi.HostPriorityList
|
||||
test string
|
||||
}{
|
||||
@ -378,7 +378,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
{
|
||||
pod: buildPod("", labels1, nil),
|
||||
pods: []*api.Pod{buildPod(nodeMachine1Zone1, labels2, nil)},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 10},
|
||||
{Host: nodeMachine1Zone2, Score: 10},
|
||||
@ -395,7 +395,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
buildPod(nodeMachine1Zone1, labels2, nil),
|
||||
buildPod(nodeMachine1Zone2, labels1, nil),
|
||||
},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 10},
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Already have pod on machine
|
||||
@ -415,7 +415,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
buildPod(nodeMachine1Zone3, labels2, nil),
|
||||
buildPod(nodeMachine2Zone3, labels1, nil),
|
||||
},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 10},
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
|
||||
@ -434,7 +434,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
buildPod(nodeMachine2Zone2, labels2, nil),
|
||||
buildPod(nodeMachine1Zone3, labels1, nil),
|
||||
},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 0}, // Pod on node
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
|
||||
@ -453,7 +453,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
buildPod(nodeMachine1Zone3, labels1, nil),
|
||||
buildPod(nodeMachine2Zone2, labels2, nil),
|
||||
},
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
{Host: nodeMachine1Zone1, Score: 0}, // Pod on node
|
||||
{Host: nodeMachine1Zone2, Score: 0}, // Pod on node
|
||||
@ -471,7 +471,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
|
||||
buildPod(nodeMachine1Zone2, labels1, controllerRef("ReplicationController", "name", "abc123")),
|
||||
buildPod(nodeMachine1Zone3, labels1, controllerRef("ReplicationController", "name", "abc123")),
|
||||
},
|
||||
rcs: []*api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}},
|
||||
rcs: []api.ReplicationController{{Spec: api.ReplicationControllerSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{
|
||||
// Note that because we put two pods on the same node (nodeMachine1Zone3),
|
||||
// the values here are questionable for zone2, in particular for nodeMachine1Zone2.
|
||||
@ -548,7 +548,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
pod *api.Pod
|
||||
pods []*api.Pod
|
||||
nodes map[string]map[string]string
|
||||
services []*api.Service
|
||||
services []api.Service
|
||||
expectedList schedulerapi.HostPriorityList
|
||||
test string
|
||||
}{
|
||||
@ -573,7 +573,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
pod: &api.Pod{ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
pods: []*api.Pod{{Spec: zone1Spec, ObjectMeta: api.ObjectMeta{Labels: labels2}}},
|
||||
nodes: labeledNodes,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"key": "value"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10},
|
||||
{Host: "machine21", Score: 10}, {Host: "machine22", Score: 10},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
@ -587,7 +587,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 10}, {Host: "machine12", Score: 10},
|
||||
{Host: "machine21", Score: 0}, {Host: "machine22", Score: 0},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
@ -601,7 +601,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 5}, {Host: "machine12", Score: 5},
|
||||
{Host: "machine21", Score: 5}, {Host: "machine22", Score: 5},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
@ -616,7 +616,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, Namespace: "ns1"}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}, ObjectMeta: api.ObjectMeta{Namespace: api.NamespaceDefault}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 0}, {Host: "machine12", Score: 0},
|
||||
{Host: "machine21", Score: 10}, {Host: "machine22", Score: 10},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
@ -631,7 +631,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 6}, {Host: "machine12", Score: 6},
|
||||
{Host: "machine21", Score: 3}, {Host: "machine22", Score: 3},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
@ -645,7 +645,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 3}, {Host: "machine12", Score: 3},
|
||||
{Host: "machine21", Score: 6}, {Host: "machine22", Score: 6},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
@ -660,7 +660,7 @@ func TestZoneSpreadPriority(t *testing.T) {
|
||||
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1}},
|
||||
},
|
||||
nodes: labeledNodes,
|
||||
services: []*api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
services: []api.Service{{Spec: api.ServiceSpec{Selector: labels1}}},
|
||||
expectedList: []schedulerapi.HostPriority{{Host: "machine11", Score: 7}, {Host: "machine12", Score: 7},
|
||||
{Host: "machine21", Score: 5}, {Host: "machine22", Score: 5},
|
||||
{Host: "machine01", Score: 0}, {Host: "machine02", Score: 0}},
|
||||
|
@ -108,7 +108,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini
|
||||
NodeLister: &cache.StoreToNodeLister{},
|
||||
PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
||||
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
||||
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
schedulerCache: schedulerCache,
|
||||
@ -401,7 +401,7 @@ func (f *ConfigFactory) Run() {
|
||||
// Watch and cache all service objects. Scheduler needs to find all pods
|
||||
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
|
||||
// Cache this locally.
|
||||
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything)
|
||||
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Store, 0).RunUntil(f.StopEverything)
|
||||
|
||||
// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
|
||||
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
|
||||
|
@ -493,8 +493,8 @@ func TestZeroRequest(t *testing.T) {
|
||||
{
|
||||
Function: algorithmpriorities.NewSelectorSpreadPriority(
|
||||
algorithm.FakePodLister(test.pods),
|
||||
algorithm.FakeServiceLister([]*api.Service{}),
|
||||
algorithm.FakeControllerLister([]*api.ReplicationController{}),
|
||||
algorithm.FakeServiceLister([]api.Service{}),
|
||||
algorithm.FakeControllerLister([]api.ReplicationController{}),
|
||||
algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})),
|
||||
Weight: 1,
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user