Merge pull request #34024 from deads2k/controller-06-deployment-controller

Automatic merge from submit-queue

update deployment and replicaset listers

Updates the deployment lister to avoid copies and updates the deployment controller to use shared informers.

Pushing WIP to see which tests are broken.
This commit is contained in:
Kubernetes Submit Queue 2016-10-06 00:02:34 -07:00 committed by GitHub
commit 42e5f95a6b
23 changed files with 485 additions and 385 deletions

View File

@ -169,7 +169,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
}
frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})
frsc.replicaSetStore.Store, frsc.replicaSetController = cache.NewInformer(
frsc.replicaSetStore.Indexer, frsc.replicaSetController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
versionedOptions := fedutil.VersionizeV1ListOptions(options)
@ -185,6 +185,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
fedutil.NewTriggerOnMetaAndSpecChanges(
func(obj runtime.Object) { frsc.deliverFedReplicaSetObj(obj, replicaSetReviewDelay) },
),
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
frsc.fedUpdater = fedutil.NewFederatedUpdater(frsc.fedReplicaSetInformer,
@ -291,7 +292,7 @@ func (frsc *ReplicaSetController) deliverLocalReplicaSet(obj interface{}, durati
glog.Errorf("Couldn't get key for object %v: %v", obj, err)
return
}
_, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
_, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
if err != nil {
glog.Errorf("Couldn't get federation replicaset %v: %v", key, err)
return
@ -419,7 +420,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSet(key string) (reconciliatio
startTime := time.Now()
defer glog.V(4).Infof("Finished reconcile replicaset %q (%v)", key, time.Now().Sub(startTime))
obj, exists, err := frsc.replicaSetStore.Store.GetByKey(key)
obj, exists, err := frsc.replicaSetStore.Indexer.GetByKey(key)
if err != nil {
return statusError, err
}
@ -536,7 +537,7 @@ func (frsc *ReplicaSetController) reconcileReplicaSetsOnClusterChange() {
if !frsc.isSynced() {
frsc.clusterDeliverer.DeliverAfter(allClustersKey, nil, clusterAvailableDelay)
}
rss := frsc.replicaSetStore.Store.List()
rss := frsc.replicaSetStore.Indexer.List()
for _, rs := range rss {
key, _ := controller.KeyFunc(rs)
frsc.deliverReplicaSetByKey(key, 0, false)

View File

@ -21,7 +21,6 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
@ -137,221 +136,6 @@ func (s storeToNodeConditionLister) List() (nodes []*api.Node, err error) {
return
}
// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments.
type StoreToDeploymentLister struct {
Indexer
}
// Exists checks if the given deployment exists in the store.
func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) {
_, exists, err := s.Indexer.Get(deployment)
if err != nil {
return false, err
}
return exists, nil
}
// StoreToDeploymentLister lists all deployments in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) {
for _, c := range s.Indexer.List() {
deployments = append(deployments, *(c.(*extensions.Deployment)))
}
return deployments, nil
}
// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found.
func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []extensions.Deployment, err error) {
if len(rs.Labels) == 0 {
err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)
return
}
// TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label
dList, err := s.Deployments(rs.Namespace).List(labels.Everything())
if err != nil {
return
}
for _, d := range dList {
selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) {
continue
}
deployments = append(deployments, d)
}
if len(deployments) == 0 {
err = fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels)
}
return
}
type storeToDeploymentNamespacer struct {
indexer Indexer
namespace string
}
// storeToDeploymentNamespacer lists deployments under its namespace in the store.
func (s storeToDeploymentNamespacer) List(selector labels.Selector) (deployments []extensions.Deployment, err error) {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
d := *(m.(*extensions.Deployment))
if selector.Matches(labels.Set(d.Labels)) {
deployments = append(deployments, d)
}
}
return
}
key := &extensions.Deployment{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
// Ignore error; do slow search without index.
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
d := *(m.(*extensions.Deployment))
if s.namespace == d.Namespace && selector.Matches(labels.Set(d.Labels)) {
deployments = append(deployments, d)
}
}
return deployments, nil
}
for _, m := range items {
d := *(m.(*extensions.Deployment))
if selector.Matches(labels.Set(d.Labels)) {
deployments = append(deployments, d)
}
}
return
}
func (s *StoreToDeploymentLister) Deployments(namespace string) storeToDeploymentNamespacer {
return storeToDeploymentNamespacer{s.Indexer, namespace}
}
// GetDeploymentsForPods returns a list of deployments managing a pod. Returns an error only if no matching deployments are found.
func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []extensions.Deployment, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name)
return
}
if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
return
}
dList, err := s.Deployments(pod.Namespace).List(labels.Everything())
if err != nil {
return
}
for _, d := range dList {
selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
deployments = append(deployments, d)
}
if len(deployments) == 0 {
err = fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets.
type StoreToReplicaSetLister struct {
Store
}
// Exists checks if the given ReplicaSet exists in the store.
func (s *StoreToReplicaSetLister) Exists(rs *extensions.ReplicaSet) (bool, error) {
_, exists, err := s.Store.Get(rs)
if err != nil {
return false, err
}
return exists, nil
}
// List lists all ReplicaSets in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToReplicaSetLister) List() (rss []extensions.ReplicaSet, err error) {
for _, rs := range s.Store.List() {
rss = append(rss, *(rs.(*extensions.ReplicaSet)))
}
return rss, nil
}
type storeReplicaSetsNamespacer struct {
store Store
namespace string
}
func (s storeReplicaSetsNamespacer) List(selector labels.Selector) (rss []extensions.ReplicaSet, err error) {
for _, c := range s.store.List() {
rs := *(c.(*extensions.ReplicaSet))
if s.namespace == api.NamespaceAll || s.namespace == rs.Namespace {
if selector.Matches(labels.Set(rs.Labels)) {
rss = append(rss, rs)
}
}
}
return
}
func (s storeReplicaSetsNamespacer) Get(name string) (*extensions.ReplicaSet, error) {
obj, exists, err := s.store.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(extensions.Resource("replicaset"), name)
}
return obj.(*extensions.ReplicaSet), nil
}
func (s *StoreToReplicaSetLister) ReplicaSets(namespace string) storeReplicaSetsNamespacer {
return storeReplicaSetsNamespacer{s.Store, namespace}
}
// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found.
func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
var selector labels.Selector
var rs extensions.ReplicaSet
if len(pod.Labels) == 0 {
err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name)
return
}
for _, m := range s.Store.List() {
rs = *m.(*extensions.ReplicaSet)
if rs.Namespace != pod.Namespace {
continue
}
selector, err = unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
err = fmt.Errorf("invalid selector: %v", err)
return
}
// If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
rss = append(rss, rs)
}
if len(rss) == 0 {
err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// StoreToDaemonSetLister gives a store List and Exists methods. The store must contain only DaemonSets.
type StoreToDaemonSetLister struct {
Store

210
pkg/client/cache/listers_extensions.go vendored Normal file
View File

@ -0,0 +1,210 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/labels"
)
// TODO: generate these classes and methods for all resources of interest using
// a script. Can use "go generate" once 1.4 is supported by all users.
// Lister makes an Index have the List method. The Stores must contain only the expected type
// Example:
// s := cache.NewStore()
// lw := cache.ListWatch{Client: c, FieldSelector: sel, Resource: "pods"}
// r := cache.NewReflector(lw, &extensions.Deployment{}, s).Run()
// l := StoreToDeploymentLister{s}
// l.List()
// StoreToDeploymentLister helps list deployments
type StoreToDeploymentLister struct {
Indexer Indexer
}
func (s *StoreToDeploymentLister) List(selector labels.Selector) (ret []*extensions.Deployment, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*extensions.Deployment))
})
return ret, err
}
func (s *StoreToDeploymentLister) Deployments(namespace string) storeDeploymentsNamespacer {
return storeDeploymentsNamespacer{Indexer: s.Indexer, namespace: namespace}
}
type storeDeploymentsNamespacer struct {
Indexer Indexer
namespace string
}
func (s storeDeploymentsNamespacer) List(selector labels.Selector) (ret []*extensions.Deployment, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*extensions.Deployment))
})
return ret, err
}
func (s storeDeploymentsNamespacer) Get(name string) (*extensions.Deployment, error) {
obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(extensions.Resource("deployment"), name)
}
return obj.(*extensions.Deployment), nil
}
// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found.
func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []*extensions.Deployment, err error) {
if len(rs.Labels) == 0 {
err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)
return
}
// TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label
dList, err := s.Deployments(rs.Namespace).List(labels.Everything())
if err != nil {
return
}
for _, d := range dList {
selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(rs.Labels)) {
continue
}
deployments = append(deployments, d)
}
if len(deployments) == 0 {
err = fmt.Errorf("could not find deployments set for ReplicaSet %s in namespace %s with labels: %v", rs.Name, rs.Namespace, rs.Labels)
}
return
}
// GetDeploymentsForDeployments returns a list of deployments managing a pod. Returns an error only if no matching deployments are found.
// TODO eliminate shallow copies
func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []*extensions.Deployment, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name)
return
}
if len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
return
}
dList, err := s.Deployments(pod.Namespace).List(labels.Everything())
if err != nil {
return
}
for _, d := range dList {
selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
}
// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
deployments = append(deployments, d)
}
if len(deployments) == 0 {
err = fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
// StoreToReplicaSetLister helps list replicasets
type StoreToReplicaSetLister struct {
Indexer Indexer
}
func (s *StoreToReplicaSetLister) List(selector labels.Selector) (ret []*extensions.ReplicaSet, err error) {
err = ListAll(s.Indexer, selector, func(m interface{}) {
ret = append(ret, m.(*extensions.ReplicaSet))
})
return ret, err
}
func (s *StoreToReplicaSetLister) ReplicaSets(namespace string) storeReplicaSetsNamespacer {
return storeReplicaSetsNamespacer{Indexer: s.Indexer, namespace: namespace}
}
type storeReplicaSetsNamespacer struct {
Indexer Indexer
namespace string
}
func (s storeReplicaSetsNamespacer) List(selector labels.Selector) (ret []*extensions.ReplicaSet, err error) {
err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*extensions.ReplicaSet))
})
return ret, err
}
func (s storeReplicaSetsNamespacer) Get(name string) (*extensions.ReplicaSet, error) {
obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(extensions.Resource("replicaset"), name)
}
return obj.(*extensions.ReplicaSet), nil
}
// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found.
func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []*extensions.ReplicaSet, err error) {
if len(pod.Labels) == 0 {
err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name)
return
}
list, err := s.ReplicaSets(pod.Namespace).List(labels.Everything())
if err != nil {
return
}
for _, rs := range list {
if rs.Namespace != pod.Namespace {
continue
}
selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid selector: %v", err)
}
// If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
rss = append(rss, rs)
}
if len(rss) == 0 {
err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}

View File

@ -280,11 +280,11 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
}
func TestStoreToReplicaSetLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
lister := StoreToReplicaSetLister{store}
testCases := []struct {
inRSs []*extensions.ReplicaSet
list func() ([]extensions.ReplicaSet, error)
list func() ([]*extensions.ReplicaSet, error)
outRSNames sets.String
expectErr bool
}{
@ -293,8 +293,8 @@ func TestStoreToReplicaSetLister(t *testing.T) {
inRSs: []*extensions.ReplicaSet{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
},
list: func() ([]extensions.ReplicaSet, error) {
return lister.List()
list: func() ([]*extensions.ReplicaSet, error) {
return lister.List(labels.Everything())
},
outRSNames: sets.NewString("basic"),
},
@ -308,7 +308,7 @@ func TestStoreToReplicaSetLister(t *testing.T) {
},
},
},
list: func() ([]extensions.ReplicaSet, error) {
list: func() ([]*extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"},
}
@ -324,7 +324,7 @@ func TestStoreToReplicaSetLister(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
},
},
list: func() ([]extensions.ReplicaSet, error) {
list: func() ([]*extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
@ -353,7 +353,7 @@ func TestStoreToReplicaSetLister(t *testing.T) {
},
},
},
list: func() ([]extensions.ReplicaSet, error) {
list: func() ([]*extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",

View File

@ -67,27 +67,27 @@ type DeploymentController struct {
syncHandler func(dKey string) error
// A store of deployments, populated by the dController
dStore cache.StoreToDeploymentLister
dLister cache.StoreToDeploymentLister
// Watches changes to all deployments
dController *cache.Controller
// A store of ReplicaSets, populated by the rsController
rsStore cache.StoreToReplicaSetLister
rsLister cache.StoreToReplicaSetLister
// Watches changes to all ReplicaSets
rsController *cache.Controller
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
podLister cache.StoreToPodLister
// Watches changes to all pods
podController *cache.Controller
// dStoreSynced returns true if the Deployment store has been synced at least once.
// dListerSynced returns true if the Deployment store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
dStoreSynced func() bool
// rsStoreSynced returns true if the ReplicaSet store has been synced at least once.
dListerSynced func() bool
// rsListerSynced returns true if the ReplicaSet store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
rsStoreSynced func() bool
// podStoreSynced returns true if the pod store has been synced at least once.
rsListerSynced func() bool
// podListerSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
podListerSynced func() bool
// Deployments that need to be synced
queue workqueue.RateLimitingInterface
@ -109,7 +109,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
dc.dStore.Indexer, dc.dController = cache.NewIndexerInformer(
dc.dLister.Indexer, dc.dController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
@ -129,7 +129,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.rsStore.Store, dc.rsController = cache.NewInformer(
dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
@ -145,9 +145,10 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.podStore.Indexer, dc.podController = cache.NewIndexerInformer(
dc.podLister.Indexer, dc.podController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Core().Pods(api.NamespaceAll).List(options)
@ -167,9 +168,9 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
)
dc.syncHandler = dc.syncDeployment
dc.dStoreSynced = dc.dController.HasSynced
dc.rsStoreSynced = dc.rsController.HasSynced
dc.podStoreSynced = dc.podController.HasSynced
dc.dListerSynced = dc.dController.HasSynced
dc.rsListerSynced = dc.rsController.HasSynced
dc.podListerSynced = dc.podController.HasSynced
return dc
}
@ -183,7 +184,7 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
// Wait for the rc and dc stores to sync before starting any work in this controller.
ready := make(chan struct{})
go dc.waitForSyncedStores(ready, stopCh)
go dc.waitForSyncedListers(ready, stopCh)
select {
case <-ready:
case <-stopCh:
@ -199,10 +200,10 @@ func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
dc.queue.ShutDown()
}
func (dc *DeploymentController) waitForSyncedStores(ready chan<- struct{}, stopCh <-chan struct{}) {
func (dc *DeploymentController) waitForSyncedListers(ready chan<- struct{}, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for !dc.dStoreSynced() || !dc.rsStoreSynced() || !dc.podStoreSynced() {
for !dc.dListerSynced() || !dc.rsListerSynced() || !dc.podListerSynced() {
select {
case <-time.After(StoreSyncedPollPeriod):
case <-stopCh:
@ -255,7 +256,7 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment {
deployments, err := dc.dStore.GetDeploymentsForReplicaSet(rs)
deployments, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for ReplicaSet %v, deployment controller will avoid syncing.", err, rs.Name)
return nil
@ -268,7 +269,7 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
sort.Sort(util.BySelectorLastUpdateTime(deployments))
glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
}
return &deployments[0]
return deployments[0]
}
// updateReplicaSet figures out what deployment(s) manage a ReplicaSet when the ReplicaSet
@ -328,7 +329,7 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
// getDeploymentForPod returns the deployment that manages the given Pod.
// If there are multiple deployments for a given Pod, only return the oldest one.
func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
deployments, err := dc.dStore.GetDeploymentsForPod(pod)
deployments, err := dc.dLister.GetDeploymentsForPod(pod)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for Pod %v, deployment controller will avoid syncing.", err, pod.Name)
return nil
@ -338,7 +339,7 @@ func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.De
sort.Sort(util.BySelectorLastUpdateTime(deployments))
glog.Errorf("user error! more than one deployment is selecting pod %s/%s with labels: %#v, returning %s/%s", pod.Namespace, pod.Name, pod.Labels, deployments[0].Namespace, deployments[0].Name)
}
return &deployments[0]
return deployments[0]
}
// When a pod is created, ensure its controller syncs
@ -478,7 +479,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := dc.dStore.Indexer.GetByKey(key)
obj, exists, err := dc.dLister.Indexer.GetByKey(key)
if err != nil {
glog.Errorf("Unable to retrieve deployment %v from store: %v", key, err)
return err
@ -548,24 +549,27 @@ func (dc *DeploymentController) handleOverlap(d *extensions.Deployment) error {
if err != nil {
return fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
}
deployments, err := dc.dStore.Deployments(d.Namespace).List(labels.Everything())
deployments, err := dc.dLister.Deployments(d.Namespace).List(labels.Everything())
if err != nil {
return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)
}
overlapping := false
for i := range deployments {
other := &deployments[i]
for _, other := range deployments {
if !selector.Empty() && selector.Matches(labels.Set(other.Spec.Template.Labels)) && d.UID != other.UID {
deploymentCopy, err := util.DeploymentDeepCopy(other)
if err != nil {
return err
}
overlapping = true
// We don't care if the overlapping annotation update failed or not (we don't make decision on it)
d, _ = dc.markDeploymentOverlap(d, other.Name)
other, _ = dc.markDeploymentOverlap(other, d.Name)
deploymentCopy, _ = dc.markDeploymentOverlap(deploymentCopy, d.Name)
// Skip syncing this one if older overlapping one is found
// TODO: figure out a better way to determine which deployment to skip,
// either with controller reference, or with validation.
// Using oldest active replica set to determine which deployment to skip wouldn't make much difference,
// since new replica set hasn't been created after selector update
if util.SelectorUpdatedBefore(other, d) {
if util.SelectorUpdatedBefore(deploymentCopy, d) {
return fmt.Errorf("found deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, other.Namespace, other.Name)
}
}

View File

@ -157,9 +157,9 @@ type fixture struct {
client *fake.Clientset
// Objects to put in the store.
dStore []*exp.Deployment
rsStore []*exp.ReplicaSet
podStore []*api.Pod
dLister []*exp.Deployment
rsLister []*exp.ReplicaSet
podLister []*api.Pod
// Actions expected to happen on the client. Objects from here are also
// preloaded into NewSimpleFake.
@ -200,16 +200,16 @@ func (f *fixture) run(deploymentName string) {
f.client = fake.NewSimpleClientset(f.objects...)
c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
c.eventRecorder = &record.FakeRecorder{}
c.rsStoreSynced = alwaysReady
c.podStoreSynced = alwaysReady
for _, d := range f.dStore {
c.dStore.Indexer.Add(d)
c.rsListerSynced = alwaysReady
c.podListerSynced = alwaysReady
for _, d := range f.dLister {
c.dLister.Indexer.Add(d)
}
for _, rs := range f.rsStore {
c.rsStore.Store.Add(rs)
for _, rs := range f.rsLister {
c.rsLister.Indexer.Add(rs)
}
for _, pod := range f.podStore {
c.podStore.Indexer.Add(pod)
for _, pod := range f.podLister {
c.podLister.Indexer.Add(pod)
}
err := c.syncDeployment(deploymentName)
@ -240,7 +240,7 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) {
f := newFixture(t)
d := newDeployment(1, nil)
f.dStore = append(f.dStore, d)
f.dLister = append(f.dLister, d)
f.objects = append(f.objects, d)
rs := newReplicaSet(d, "deploymentrs-4186632231", 1)
@ -258,7 +258,7 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
d := newDeployment(1, nil)
now := unversioned.Now()
d.DeletionTimestamp = &now
f.dStore = append(f.dStore, d)
f.dLister = append(f.dLister, d)
f.run(getKey(d, t))
}
@ -269,13 +269,13 @@ func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing
controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
controller.eventRecorder = &record.FakeRecorder{}
controller.rsStoreSynced = alwaysReady
controller.podStoreSynced = alwaysReady
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady
d := newDeployment(1, nil)
empty := unversioned.LabelSelector{}
d.Spec.Selector = &empty
controller.dStore.Indexer.Add(d)
controller.dLister.Indexer.Add(d)
// We expect the deployment controller to not take action here since it's configuration
// is invalid, even though no replicasets exist that match it's selector.
controller.syncDeployment(fmt.Sprintf("%s/%s", d.ObjectMeta.Namespace, d.ObjectMeta.Name))

View File

@ -108,7 +108,7 @@ func (dc *DeploymentController) waitForInactiveReplicaSets(oldRSs []*extensions.
statusReplicas := rs.Status.Replicas
if err := wait.ExponentialBackoff(unversionedclient.DefaultRetry, func() (bool, error) {
replicaSet, err := dc.rsStore.ReplicaSets(rs.Namespace).Get(rs.Name)
replicaSet, err := dc.rsLister.ReplicaSets(rs.Namespace).Get(rs.Name)
if err != nil {
return false, err
}

View File

@ -103,15 +103,15 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *ext
}
// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced.
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) {
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]*extensions.ReplicaSet, *api.PodList, error) {
rsList, err := deploymentutil.ListReplicaSets(deployment,
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
func(namespace string, options api.ListOptions) ([]*extensions.ReplicaSet, error) {
return dc.rsLister.ReplicaSets(namespace).List(options.LabelSelector)
})
if err != nil {
return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
}
syncedRSList := []extensions.ReplicaSet{}
syncedRSList := []*extensions.ReplicaSet{}
for _, rs := range rsList {
// Add pod-template-hash information if it's not in the RS.
// Otherwise, new RS produced by Deployment will overlap with pre-existing ones
@ -120,7 +120,7 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension
if err != nil {
return nil, nil, err
}
syncedRSList = append(syncedRSList, *syncedRS)
syncedRSList = append(syncedRSList, syncedRS)
}
syncedPodList, err := dc.listPods(deployment)
if err != nil {
@ -133,8 +133,13 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension
// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created
// 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas
// 3. Add hash label to the rs's label and selector
func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) {
updatedRS = &rs
func (dc *DeploymentController) addHashKeyToRSAndPods(rs *extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) {
objCopy, err := api.Scheme.Copy(rs)
if err != nil {
return nil, err
}
updatedRS = objCopy.(*extensions.ReplicaSet)
// If the rs already has the new hash label in its selector, it's done syncing
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
return
@ -158,7 +163,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
if !rsUpdated {
// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
// Return here and retry in the next sync loop.
return &rs, nil
return rs, nil
}
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
@ -174,7 +179,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
}
options := api.ListOptions{LabelSelector: selector}
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector)
if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
}
@ -228,7 +233,7 @@ func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet)
func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
return deploymentutil.ListPods(deployment,
func(namespace string, options api.ListOptions) (*api.PodList, error) {
pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
pods, err := dc.podLister.Pods(namespace).List(options.LabelSelector)
result := api.PodList{Items: make([]api.Pod, 0, len(pods))}
for i := range pods {
result.Items = append(result.Items, *pods[i])
@ -242,7 +247,7 @@ func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*ap
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
// Calculate revision number for this new replica set
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
@ -250,11 +255,17 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
if err != nil {
return nil, err
} else if existingNewRS != nil {
// Set existing new replica set's annotation
if deploymentutil.SetNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) {
return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
objCopy, err := api.Scheme.Copy(existingNewRS)
if err != nil {
return nil, err
}
return existingNewRS, nil
rsCopy := objCopy.(*extensions.ReplicaSet)
// Set existing new replica set's annotation
if deploymentutil.SetNewReplicaSetAnnotations(deployment, rsCopy, newRevision, true) {
return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(rsCopy)
}
return rsCopy, nil
}
if !createIfNotExisted {

View File

@ -326,10 +326,10 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) {
controller := NewDeploymentController(fake, controller.NoResyncPeriodFunc)
controller.eventRecorder = &record.FakeRecorder{}
controller.rsStoreSynced = alwaysReady
controller.podStoreSynced = alwaysReady
controller.rsListerSynced = alwaysReady
controller.podListerSynced = alwaysReady
for _, rs := range test.oldRSs {
controller.rsStore.Add(rs)
controller.rsLister.Indexer.Add(rs)
}
d := newDeployment(1, &tests[i].revisionHistoryLimit)

View File

@ -368,11 +368,18 @@ func GetNewReplicaSet(deployment *extensions.Deployment, c clientset.Interface)
}
// listReplicaSets lists all RSes the given deployment targets with the given client interface.
func listReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]extensions.ReplicaSet, error) {
func listReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, error) {
return ListReplicaSets(deployment,
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
func(namespace string, options api.ListOptions) ([]*extensions.ReplicaSet, error) {
rsList, err := c.Extensions().ReplicaSets(namespace).List(options)
return rsList.Items, err
if err != nil {
return nil, err
}
ret := []*extensions.ReplicaSet{}
for i := range rsList.Items {
ret = append(ret, &rsList.Items[i])
}
return ret, err
})
}
@ -385,11 +392,11 @@ func listPods(deployment *extensions.Deployment, c clientset.Interface) (*api.Po
}
// TODO: switch this to full namespacers
type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error)
type rsListFunc func(string, api.ListOptions) ([]*extensions.ReplicaSet, error)
type podListFunc func(string, api.ListOptions) (*api.PodList, error)
// ListReplicaSets returns a slice of RSes the given deployment targets.
func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]extensions.ReplicaSet, error) {
func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]*extensions.ReplicaSet, error) {
// TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
// should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830;
// or use controllerRef, see https://github.com/kubernetes/kubernetes/issues/2210
@ -442,7 +449,7 @@ func equalIgnoreHash(template1, template2 api.PodTemplateSpec) (bool, error) {
}
// FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template).
func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
func FindNewReplicaSet(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
newRSTemplate := GetNewReplicaSetTemplate(deployment)
for i := range rsList {
equal, err := equalIgnoreHash(rsList[i].Spec.Template, newRSTemplate)
@ -451,7 +458,7 @@ func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.Re
}
if equal {
// This is the new ReplicaSet.
return &rsList[i], nil
return rsList[i], nil
}
}
// new ReplicaSet does not exist.
@ -460,11 +467,11 @@ func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.Re
// FindOldReplicaSets returns the old replica sets targeted by the given Deployment, with the given PodList and slice of RSes.
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, podList *api.PodList) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
func FindOldReplicaSets(deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, podList *api.PodList) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
// Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList.
// All pods and replica sets are labeled with pod-template-hash to prevent overlapping
oldRSs := map[string]extensions.ReplicaSet{}
allOldRSs := map[string]extensions.ReplicaSet{}
oldRSs := map[string]*extensions.ReplicaSet{}
allOldRSs := map[string]*extensions.ReplicaSet{}
newRSTemplate := GetNewReplicaSetTemplate(deployment)
for _, pod := range podList.Items {
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
@ -490,12 +497,12 @@ func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.R
requiredRSs := []*extensions.ReplicaSet{}
for key := range oldRSs {
value := oldRSs[key]
requiredRSs = append(requiredRSs, &value)
requiredRSs = append(requiredRSs, value)
}
allRSs := []*extensions.ReplicaSet{}
for key := range allOldRSs {
value := allOldRSs[key]
allRSs = append(allRSs, &value)
allRSs = append(allRSs, value)
}
return requiredRSs, allRSs, nil
}
@ -826,12 +833,12 @@ func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time {
// BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector,
// first using their creation timestamp and then their names as a tie breaker.
type BySelectorLastUpdateTime []extensions.Deployment
type BySelectorLastUpdateTime []*extensions.Deployment
func (o BySelectorLastUpdateTime) Len() int { return len(o) }
func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o BySelectorLastUpdateTime) Less(i, j int) bool {
ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j])
ti, tj := LastSelectorUpdate(o[i]), LastSelectorUpdate(o[j])
if ti.Equal(tj) {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name

View File

@ -512,19 +512,19 @@ func TestFindNewReplicaSet(t *testing.T) {
tests := []struct {
test string
deployment extensions.Deployment
rsList []extensions.ReplicaSet
rsList []*extensions.ReplicaSet
expected *extensions.ReplicaSet
}{
{
test: "Get new ReplicaSet with the same spec but different pod-template-hash value",
deployment: deployment,
rsList: []extensions.ReplicaSet{newRS, oldRS},
rsList: []*extensions.ReplicaSet{&newRS, &oldRS},
expected: &newRS,
},
{
test: "Get nil new ReplicaSet",
deployment: deployment,
rsList: []extensions.ReplicaSet{oldRS},
rsList: []*extensions.ReplicaSet{&oldRS},
expected: nil,
},
}
@ -550,14 +550,14 @@ func TestFindOldReplicaSets(t *testing.T) {
tests := []struct {
test string
deployment extensions.Deployment
rsList []extensions.ReplicaSet
rsList []*extensions.ReplicaSet
podList *api.PodList
expected []*extensions.ReplicaSet
}{
{
test: "Get old ReplicaSets",
deployment: deployment,
rsList: []extensions.ReplicaSet{newRS, oldRS},
rsList: []*extensions.ReplicaSet{&newRS, &oldRS},
podList: &api.PodList{
Items: []api.Pod{
newPod,
@ -569,7 +569,7 @@ func TestFindOldReplicaSets(t *testing.T) {
{
test: "Get old ReplicaSets with no new ReplicaSet",
deployment: deployment,
rsList: []extensions.ReplicaSet{oldRS},
rsList: []*extensions.ReplicaSet{&oldRS},
podList: &api.PodList{
Items: []api.Pod{
oldPod,
@ -580,7 +580,7 @@ func TestFindOldReplicaSets(t *testing.T) {
{
test: "Get empty old ReplicaSets",
deployment: deployment,
rsList: []extensions.ReplicaSet{newRS},
rsList: []*extensions.ReplicaSet{&newRS},
podList: &api.PodList{
Items: []api.Pod{
newPod,

View File

@ -142,7 +142,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i
dc.rcLister.Indexer = dc.rcIndexer
dc.rsStore, dc.rsController = cache.NewInformer(
dc.rsLister.Indexer, dc.rsController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
@ -154,9 +154,9 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i
&extensions.ReplicaSet{},
30*time.Second,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.rsLister.Store = dc.rsStore
dc.rsStore = dc.rsLister.Indexer
dc.dIndexer, dc.dController = cache.NewIndexerInformer(
&cache.ListWatch{
@ -202,7 +202,7 @@ func (dc *DisruptionController) getPodReplicaSets(pod *api.Pod) ([]controllerAnd
for _, rs := range rss {
// GetDeploymentsForReplicaSet returns an error only if no matching
// deployments are found.
_, err := dc.dLister.GetDeploymentsForReplicaSet(&rs)
_, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
if err == nil { // A deployment was found, so this finder will not count this RS.
continue
}
@ -227,7 +227,7 @@ func (dc *DisruptionController) getPodDeployments(pod *api.Pod) ([]controllerAnd
}
controllerScale := map[types.UID]int32{}
for _, rs := range rss {
ds, err := dc.dLister.GetDeploymentsForReplicaSet(&rs)
ds, err := dc.dLister.GetDeploymentsForReplicaSet(rs)
// GetDeploymentsForReplicaSet returns an error only if no matching
// deployments are found. In that case we skip this ReplicaSet.
if err != nil {

View File

@ -80,7 +80,7 @@ func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
pdbLister: cache.StoreToPodDisruptionBudgetLister{Store: cache.NewStore(controller.KeyFunc)},
podLister: cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
rsLister: cache.StoreToReplicaSetLister{Store: cache.NewStore(controller.KeyFunc)},
rsLister: cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
dLister: cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
getUpdater: func() updater { return ps.Set },
broadcaster: record.NewBroadcaster(),
@ -310,7 +310,7 @@ func TestReplicaSet(t *testing.T) {
add(t, dc.pdbLister.Store, pdb)
rs, _ := newReplicaSet(t, 10)
add(t, dc.rsLister.Store, rs)
add(t, dc.rsLister.Indexer, rs)
pod, _ := newPod(t, "pod")
add(t, dc.podLister.Indexer, pod)
@ -459,7 +459,7 @@ func TestTwoControllers(t *testing.T) {
rs, _ := newReplicaSet(t, collectionSize)
rs.Spec.Selector = newSel(dLabels)
rs.Labels = dLabels
add(t, dc.rsLister.Store, rs)
add(t, dc.rsLister.Indexer, rs)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, true, minimumOne+1, minimumOne, collectionSize)

View File

@ -68,3 +68,87 @@ func (f *daemonSetInformer) Lister() *cache.StoreToDaemonSetLister {
informer := f.Informer()
return &cache.StoreToDaemonSetLister{Store: informer.GetIndexer()}
}
// DeploymentInformer is a type of SharedIndexInformer which watches and lists all deployments.
type DeploymentInformer interface {
Informer() cache.SharedIndexInformer
Lister() *cache.StoreToDeploymentLister
}
type deploymentInformer struct {
*sharedInformerFactory
}
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&extensions.Deployment{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Extensions().Deployments(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Extensions().Deployments(api.NamespaceAll).Watch(options)
},
},
&extensions.Deployment{},
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
func (f *deploymentInformer) Lister() *cache.StoreToDeploymentLister {
informer := f.Informer()
return &cache.StoreToDeploymentLister{Indexer: informer.GetIndexer()}
}
// ReplicaSetInformer is a type of SharedIndexInformer which watches and lists all replicasets.
type ReplicaSetInformer interface {
Informer() cache.SharedIndexInformer
Lister() *cache.StoreToReplicaSetLister
}
type replicaSetInformer struct {
*sharedInformerFactory
}
func (f *replicaSetInformer) Informer() cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(&extensions.ReplicaSet{})
informer, exists := f.informers[informerType]
if exists {
return informer
}
informer = cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return f.client.Extensions().ReplicaSets(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return f.client.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
},
},
&extensions.ReplicaSet{},
f.defaultResync,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
f.informers[informerType] = informer
return informer
}
func (f *replicaSetInformer) Lister() *cache.StoreToReplicaSetLister {
informer := f.Informer()
return &cache.StoreToReplicaSetLister{Indexer: informer.GetIndexer()}
}

View File

@ -38,6 +38,8 @@ type SharedInformerFactory interface {
PersistentVolumes() PVInformer
DaemonSets() DaemonSetInformer
Deployments() DeploymentInformer
ReplicaSets() ReplicaSetInformer
}
type sharedInformerFactory struct {
@ -102,3 +104,11 @@ func (f *sharedInformerFactory) PersistentVolumes() PVInformer {
func (f *sharedInformerFactory) DaemonSets() DaemonSetInformer {
return &daemonSetInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) Deployments() DeploymentInformer {
return &deploymentInformer{sharedInformerFactory: f}
}
func (f *sharedInformerFactory) ReplicaSets() ReplicaSetInformer {
return &replicaSetInformer{sharedInformerFactory: f}
}

View File

@ -142,7 +142,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac
garbageCollectorEnabled: garbageCollectorEnabled,
}
rsc.rsStore.Store, rsc.rsController = cache.NewInformer(
rsc.rsStore.Indexer, rsc.rsController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rsc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
@ -162,6 +162,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer cac
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -251,9 +252,9 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.Repl
}
// update lookup cache
rsc.lookupCache.Update(pod, &rss[0])
rsc.lookupCache.Update(pod, rss[0])
return &rss[0]
return rss[0]
}
// callback when RS is updated
@ -296,9 +297,9 @@ func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
// isCacheValid check if the cache is valid
func (rsc *ReplicaSetController) isCacheValid(pod *api.Pod, cachedRS *extensions.ReplicaSet) bool {
_, exists, err := rsc.rsStore.Get(cachedRS)
_, err := rsc.rsStore.ReplicaSets(cachedRS.Namespace).Get(cachedRS.Name)
// rs has been deleted or updated, cache is invalid
if err != nil || !exists || !isReplicaSetMatch(pod, cachedRS) {
if err != nil || !isReplicaSetMatch(pod, cachedRS) {
return false
}
return true
@ -564,7 +565,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
return nil
}
obj, exists, err := rsc.rsStore.Store.GetByKey(key)
obj, exists, err := rsc.rsStore.Indexer.GetByKey(key)
if !exists {
glog.Infof("ReplicaSet has been deleted %v", key)
rsc.expectations.DeleteExpectations(key)

View File

@ -165,7 +165,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
// 2 running pods, a controller with 2 replicas, sync is a no-op
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
manager.podControl = &fakePodControl
@ -183,7 +183,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
// 2 running pods and a controller with 1 replica, one pod delete expected
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(1, labelMap)
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
manager.syncReplicaSet(getKey(rsSpec, t))
@ -207,7 +207,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
// the controller matching the selectors of the deleted pod into the work queue.
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(1, labelMap)
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod")
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
@ -232,7 +232,7 @@ func TestSyncReplicaSetCreates(t *testing.T) {
// A controller with 2 replicas and no pods in the store, 2 creates expected
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -256,7 +256,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
activePods := 5
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(activePods, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
@ -301,7 +301,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
rs := newReplicaSet(5, labelMap)
rs.Spec.Template.Labels = extraLabelMap
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0}
rs.Generation = 1
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
@ -344,7 +344,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod")
// Creates a replica and sets expectations
@ -439,7 +439,7 @@ func TestPodControllerLookup(t *testing.T) {
}
for _, c := range testCases {
for _, r := range c.inRSs {
manager.rsStore.Add(r)
manager.rsStore.Indexer.Add(r)
}
if rs := manager.getPodReplicaSet(c.pod); rs != nil {
if c.outRSName != rs.Name {
@ -471,7 +471,7 @@ func TestWatchControllers(t *testing.T) {
// and closes the received channel to indicate that the test can finish.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rsStore.Store.GetByKey(key)
obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find replica set under key %v", key)
}
@ -509,13 +509,13 @@ func TestWatchPods(t *testing.T) {
// Put one ReplicaSet and one pod into the controller's stores
labelMap := map[string]string{"foo": "bar"}
testRSSpec := newReplicaSet(1, labelMap)
manager.rsStore.Store.Add(testRSSpec)
manager.rsStore.Indexer.Add(testRSSpec)
received := make(chan string)
// The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and
// send it into the syncHandler.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rsStore.Store.GetByKey(key)
obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find replica set under key %v", key)
}
@ -553,7 +553,7 @@ func TestUpdatePods(t *testing.T) {
received := make(chan string)
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rsStore.Store.GetByKey(key)
obj, exists, err := manager.rsStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find replica set under key %v", key)
}
@ -568,12 +568,12 @@ func TestUpdatePods(t *testing.T) {
// Put 2 ReplicaSets and one pod into the controller's stores
labelMap1 := map[string]string{"foo": "bar"}
testRSSpec1 := newReplicaSet(1, labelMap1)
manager.rsStore.Store.Add(testRSSpec1)
manager.rsStore.Indexer.Add(testRSSpec1)
testRSSpec2 := *testRSSpec1
labelMap2 := map[string]string{"bar": "foo"}
testRSSpec2.Spec.Selector = &unversioned.LabelSelector{MatchLabels: labelMap2}
testRSSpec2.Name = "barfoo"
manager.rsStore.Store.Add(&testRSSpec2)
manager.rsStore.Indexer.Add(&testRSSpec2)
// case 1: We put in the podStore a pod with labels matching testRSSpec1,
// then update its labels to match testRSSpec2. We expect to receive a sync
@ -632,7 +632,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(1, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2}
newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod")
@ -714,7 +714,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(numReplicas, labelMap)
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
expectedPods := int32(0)
pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod")
@ -728,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
for _, replicas := range []int32{int32(numReplicas), 0} {
rsSpec.Spec.Replicas = replicas
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
for i := 0; i < numReplicas; i += burstReplicas {
manager.syncReplicaSet(getKey(rsSpec, t))
@ -866,7 +866,7 @@ func TestRSSyncExpectations(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod")
manager.podStore.Indexer.Add(&pods.Items[0])
postExpectationsPod := pods.Items[1]
@ -889,7 +889,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
manager.podStoreSynced = alwaysReady
rs := newReplicaSet(1, map[string]string{"foo": "bar"})
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -911,7 +911,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
if !exists || err != nil {
t.Errorf("No expectations found for ReplicaSet")
}
manager.rsStore.Delete(rs)
manager.rsStore.Indexer.Delete(rs)
manager.syncReplicaSet(getKey(rs, t))
if _, exists, err = manager.expectations.GetExpectations(rsKey); exists {
@ -936,7 +936,7 @@ func TestRSManagerNotReady(t *testing.T) {
// want to end up creating replicas in this case until the pod reflector
// has synced, so the ReplicaSet controller should just requeue the ReplicaSet.
rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"})
manager.rsStore.Store.Add(rsSpec)
manager.rsStore.Indexer.Add(rsSpec)
rsKey := getKey(rsSpec, t)
manager.syncReplicaSet(rsKey)
@ -980,7 +980,7 @@ func TestOverlappingRSs(t *testing.T) {
}
shuffledControllers := shuffle(controllers)
for j := range shuffledControllers {
manager.rsStore.Store.Add(shuffledControllers[j])
manager.rsStore.Indexer.Add(shuffledControllers[j])
}
// Add a pod and make sure only the oldest ReplicaSet is synced
pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod")
@ -1001,7 +1001,7 @@ func TestDeletionTimestamp(t *testing.T) {
manager.podStoreSynced = alwaysReady
rs := newReplicaSet(1, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
rsKey, err := controller.KeyFunc(rs)
if err != nil {
t.Errorf("Couldn't get key for object %#v: %v", rs, err)
@ -1101,7 +1101,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
var trueVar = true
otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
// add to podStore a matching Pod controlled by another controller. Expect no patch.
@ -1120,7 +1120,7 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
// add to podStore one more matching pod that doesn't have a controller
// ref, but has an owner ref pointing to other object. Expect a patch to
// take control of it.
@ -1141,7 +1141,7 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
// add to podStore a matching pod that has an ownerRef pointing to the rs,
// but ownerRef.Controller is false. Expect a patch to take control it.
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name}
@ -1161,7 +1161,7 @@ func TestPatchPodFails(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
// add to podStore two matching pods. Expect two patches to take control
// them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
@ -1181,7 +1181,7 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
// add to podStore three matching pods. Expect three patches to take control
// them, and later delete one of them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
@ -1199,7 +1199,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
// put one pod in the podStore
pod := newPod("pod", rs, api.PodRunning, nil)
pod.ResourceVersion = "1"
@ -1246,7 +1246,7 @@ func TestUpdateSelectorControllerRef(t *testing.T) {
// put the updatedRS into the store. This is consistent with the behavior of
// the Informer: Informer updates the store before call the handler
// (updateRS() in this case).
manager.rsStore.Store.Add(&updatedRS)
manager.rsStore.Indexer.Add(&updatedRS)
manager.updateRS(rs, &updatedRS)
// verifies that the rs is added to the queue
rsKey := getKey(rs, t)
@ -1274,7 +1274,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
rs := newReplicaSet(2, labelMap)
now := unversioned.Now()
rs.DeletionTimestamp = &now
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
pod1 := newPod("pod1", rs, api.PodRunning, nil)
manager.podStore.Indexer.Add(pod1)
@ -1304,7 +1304,7 @@ func TestReadyReplicas(t *testing.T) {
rs := newReplicaSet(2, labelMap)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1}
rs.Generation = 1
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
newPodList(manager.podStore.Indexer, 2, api.PodPending, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
@ -1346,7 +1346,7 @@ func TestAvailableReplicas(t *testing.T) {
rs.Generation = 1
// minReadySeconds set to 15s
rs.Spec.MinReadySeconds = 15
manager.rsStore.Store.Add(rs)
manager.rsStore.Indexer.Add(rs)
// First pod becomes ready 20s ago
moment := unversioned.Time{Time: time.Now().Add(-2e10)}

View File

@ -74,7 +74,7 @@ func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.Repli
}
// overlappingReplicaSets sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker.
type overlappingReplicaSets []extensions.ReplicaSet
type overlappingReplicaSets []*extensions.ReplicaSet
func (o overlappingReplicaSets) Len() int { return len(o) }
func (o overlappingReplicaSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

View File

@ -85,7 +85,7 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs
}
// GetPodTemplateSpecHash returns the pod template hash of a ReplicaSet's pod template space
func GetPodTemplateSpecHash(rs extensions.ReplicaSet) string {
func GetPodTemplateSpecHash(rs *extensions.ReplicaSet) string {
meta := rs.Spec.Template.ObjectMeta
meta.Labels = labelsutil.CloneAndRemoveLabel(meta.Labels, extensions.DefaultDeploymentUniqueLabelKey)
return fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(api.PodTemplateSpec{

View File

@ -150,35 +150,23 @@ func (f FakeControllerLister) GetPodControllers(pod *api.Pod) (controllers []*ap
// ReplicaSetLister interface represents anything that can produce a list of ReplicaSet; the list is consumed by a scheduler.
type ReplicaSetLister interface {
// Lists all the replicasets
List() ([]extensions.ReplicaSet, error)
// Gets the replicasets for the given pod
GetPodReplicaSets(*api.Pod) ([]extensions.ReplicaSet, error)
GetPodReplicaSets(*api.Pod) ([]*extensions.ReplicaSet, error)
}
// EmptyReplicaSetLister implements ReplicaSetLister on []extensions.ReplicaSet returning empty data
type EmptyReplicaSetLister struct{}
// List returns nil
func (f EmptyReplicaSetLister) List() ([]extensions.ReplicaSet, error) {
return nil, nil
}
// GetPodReplicaSets returns nil
func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
func (f EmptyReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []*extensions.ReplicaSet, err error) {
return nil, nil
}
// FakeReplicaSetLister implements ControllerLister on []extensions.ReplicaSet for test purposes.
type FakeReplicaSetLister []extensions.ReplicaSet
// List returns []extensions.ReplicaSet, the list of all ReplicaSets.
func (f FakeReplicaSetLister) List() ([]extensions.ReplicaSet, error) {
return f, nil
}
type FakeReplicaSetLister []*extensions.ReplicaSet
// GetPodReplicaSets gets the ReplicaSets that have the selector that match the labels on the given pod
func (f FakeReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
func (f FakeReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []*extensions.ReplicaSet, err error) {
var selector labels.Selector
for _, rs := range f {

View File

@ -58,7 +58,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
pods []*api.Pod
nodes []string
rcs []*api.ReplicationController
rss []extensions.ReplicaSet
rss []*extensions.ReplicaSet
services []*api.Service
expectedList schedulerapi.HostPriorityList
test string
@ -197,7 +197,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
},
nodes: []string{"machine1", "machine2"},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"baz": "blah"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "service with partial pod label matches with service and replica set",
@ -225,7 +225,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
},
nodes: []string{"machine1", "machine2"},
services: []*api.Service{{Spec: api.ServiceSpec{Selector: map[string]string{"bar": "foo"}}}},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "disjoined service and replica set should be treated equally",
@ -251,7 +251,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}},
test: "Replica set with partial pod label matches",
@ -276,7 +276,7 @@ func TestSelectorSpreadPriority(t *testing.T) {
{Spec: zone2Spec, ObjectMeta: api.ObjectMeta{Labels: labels1, OwnerReferences: controllerRef("ReplicaSet", "name", "abc123")}},
},
nodes: []string{"machine1", "machine2"},
rss: []extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}},
rss: []*extensions.ReplicaSet{{Spec: extensions.ReplicaSetSpec{Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"baz": "blah"}}}}},
// We use ReplicaSet, instead of ReplicationController. The result should be exactly as above.
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 5}},
test: "Another replication set with partial pod label matches",
@ -344,7 +344,7 @@ func TestZoneSelectorSpreadPriority(t *testing.T) {
pods []*api.Pod
nodes []string
rcs []*api.ReplicationController
rss []extensions.ReplicaSet
rss []*extensions.ReplicaSet
services []*api.Service
expectedList schedulerapi.HostPriorityList
test string

View File

@ -110,7 +110,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
schedulerCache: schedulerCache,
StopEverything: stopEverything,
SchedulerName: schedulerName,
@ -425,7 +425,7 @@ func (f *ConfigFactory) Run() {
// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Store, 0).RunUntil(f.StopEverything)
cache.NewReflector(f.createReplicaSetLW(), &extensions.ReplicaSet{}, f.ReplicaSetLister.Indexer, 0).RunUntil(f.StopEverything)
}
func (f *ConfigFactory) getNextPod() *api.Pod {

View File

@ -496,7 +496,7 @@ func TestZeroRequest(t *testing.T) {
Function: algorithmpriorities.NewSelectorSpreadPriority(
algorithm.FakeServiceLister([]*api.Service{}),
algorithm.FakeControllerLister([]*api.ReplicationController{}),
algorithm.FakeReplicaSetLister([]extensions.ReplicaSet{})),
algorithm.FakeReplicaSetLister([]*extensions.ReplicaSet{})),
Weight: 1,
},
}