add namespace index to rc and pod

This commit is contained in:
mqliang 2016-04-07 20:15:21 +08:00
parent 7e462c2310
commit 9011207f18
25 changed files with 263 additions and 168 deletions

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
@ -194,7 +195,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()
podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc())
podInformer := informers.CreateSharedIndexPodInformer(clientset, controller.NoResyncPeriodFunc(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpointcontroller.NewEndpointController(podInformer, clientset).

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record"
@ -194,8 +195,8 @@ func Run(s *options.CMServer) error {
}
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
podInformer := informers.CreateSharedPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedInformer{}
podInformer := informers.CreateSharedIndexPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
informers := map[reflect.Type]framework.SharedIndexInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer
go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).

View File

@ -76,7 +76,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
},
)
e.podStore.Store, e.podController = framework.NewInformer(
e.podStore.Indexer, e.podController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Pods(api.NamespaceAll).List(options)
@ -92,6 +92,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return e
}

View File

@ -53,6 +53,10 @@ func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc {
}
}
const (
NamespaceIndex string = "namespace"
)
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)

View File

@ -41,7 +41,7 @@ import (
// l := StoreToPodLister{s}
// l.List()
type StoreToPodLister struct {
Store
Indexer
}
// Please note that selector is filtering among the pods that have gotten into
@ -54,7 +54,7 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err
// s.Pods(api.NamespaceAll).List(selector), however then we'd have to
// remake the list.Items as a []*api.Pod. So leave this separate for
// now.
for _, m := range s.Store.List() {
for _, m := range s.Indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
pods = append(pods, pod)
@ -65,11 +65,11 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err
// Pods is taking baby steps to be more like the api in pkg/client
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
return storePodsNamespacer{s.Store, namespace}
return storePodsNamespacer{s.Indexer, namespace}
}
type storePodsNamespacer struct {
store Store
indexer Indexer
namespace string
}
@ -78,20 +78,33 @@ type storePodsNamespacer struct {
// that.
func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, err error) {
list := api.PodList{}
for _, m := range s.store.List() {
pod := m.(*api.Pod)
if s.namespace == api.NamespaceAll || s.namespace == pod.Namespace {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
}
}
return list, nil
}
key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
return api.PodList{}, err
}
for _, m := range items {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
}
}
return list, nil
}
// Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
_, exists, err := s.Store.Get(pod)
_, exists, err := s.Indexer.Get(pod)
if err != nil {
return false, err
}
@ -143,12 +156,12 @@ func (s storeToNodeConditionLister) List() (nodes api.NodeList, err error) {
// StoreToReplicationControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers.
type StoreToReplicationControllerLister struct {
Store
Indexer
}
// Exists checks if the given rc exists in the store.
func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationController) (bool, error) {
_, exists, err := s.Store.Get(controller)
_, exists, err := s.Indexer.Get(controller)
if err != nil {
return false, err
}
@ -158,29 +171,42 @@ func (s *StoreToReplicationControllerLister) Exists(controller *api.ReplicationC
// StoreToReplicationControllerLister lists all controllers in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToReplicationControllerLister) List() (controllers []api.ReplicationController, err error) {
for _, c := range s.Store.List() {
for _, c := range s.Indexer.List() {
controllers = append(controllers, *(c.(*api.ReplicationController)))
}
return controllers, nil
}
func (s *StoreToReplicationControllerLister) ReplicationControllers(namespace string) storeReplicationControllersNamespacer {
return storeReplicationControllersNamespacer{s.Store, namespace}
return storeReplicationControllersNamespacer{s.Indexer, namespace}
}
type storeReplicationControllersNamespacer struct {
store Store
indexer Indexer
namespace string
}
func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (controllers []api.ReplicationController, err error) {
for _, c := range s.store.List() {
rc := *(c.(*api.ReplicationController))
if s.namespace == api.NamespaceAll || s.namespace == rc.Namespace {
if s.namespace == api.NamespaceAll {
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return
}
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
return
}
for _, m := range items {
rc := *(m.(*api.ReplicationController))
if selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return
}
@ -195,11 +221,14 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
return
}
for _, m := range s.Store.List() {
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: pod.Namespace}}
items, err := s.Indexer.Index(NamespaceIndex, key)
if err != nil {
return
}
for _, m := range items {
rc = *m.(*api.ReplicationController)
if rc.Namespace != pod.Namespace {
continue
}
labelSet := labels.Set(rc.Spec.Selector)
selector = labels.Set(rc.Spec.Selector).AsSelector()

View File

@ -124,7 +124,7 @@ func TestStoreToNodeConditionLister(t *testing.T) {
}
func TestStoreToReplicationControllerLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
lister := StoreToReplicationControllerLister{store}
testCases := []struct {
inRCs []*api.ReplicationController
@ -645,7 +645,7 @@ func TestStoreToJobLister(t *testing.T) {
}
func TestStoreToPodLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
store := NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
ids := []string{"foo", "bar", "baz"}
for _, id := range ids {
store.Add(&api.Pod{

View File

@ -108,7 +108,7 @@ type DaemonSetsController struct {
queue *workqueue.Type
}
func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
@ -183,7 +183,7 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
})
dsc.podStore.Store = podInformer.GetStore()
dsc.podStore.Indexer = podInformer.GetIndexer()
dsc.podController = podInformer.GetController()
dsc.podStoreSynced = podInformer.HasSynced
@ -210,7 +210,7 @@ func NewDaemonSetsController(podInformer framework.SharedInformer, kubeClient cl
}
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
dsc.internalPodInformer = podInformer
@ -686,7 +686,7 @@ func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *exte
newPod.Spec.NodeName = node.Name
pods := []*api.Pod{newPod}
for _, m := range dsc.podStore.Store.List() {
for _, m := range dsc.podStore.Indexer.List() {
pod := m.(*api.Pod)
if pod.Spec.NodeName != node.Name {
continue

View File

@ -419,10 +419,10 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
func TestDealsWithExistingPods(t *testing.T) {
manager, podControl := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 2)
addPods(manager.podStore.Store, "node-3", simpleDaemonSetLabel, 5)
addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-2", simpleDaemonSetLabel, 2)
addPods(manager.podStore.Indexer, "node-3", simpleDaemonSetLabel, 5)
addPods(manager.podStore.Indexer, "node-4", simpleDaemonSetLabel2, 2)
ds := newDaemonSet("foo")
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5)
@ -444,10 +444,10 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
manager, podControl := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 1)
addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel2, 1)
addPods(manager.podStore.Indexer, "node-4", simpleDaemonSetLabel, 1)
daemon := newDaemonSet("foo")
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager.dsStore.Add(daemon)
@ -459,14 +459,14 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
manager, podControl := newTestController()
addNodes(manager.nodeStore.Store, 0, 5, nil)
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 4)
addPods(manager.podStore.Store, "node-6", simpleDaemonSetLabel, 13)
addPods(manager.podStore.Store, "node-7", simpleDaemonSetLabel2, 4)
addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel2, 1)
addPods(manager.podStore.Indexer, "node-0", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel, 3)
addPods(manager.podStore.Indexer, "node-1", simpleDaemonSetLabel2, 2)
addPods(manager.podStore.Indexer, "node-2", simpleDaemonSetLabel, 4)
addPods(manager.podStore.Indexer, "node-6", simpleDaemonSetLabel, 13)
addPods(manager.podStore.Indexer, "node-7", simpleDaemonSetLabel2, 4)
addPods(manager.podStore.Indexer, "node-9", simpleDaemonSetLabel, 1)
addPods(manager.podStore.Indexer, "node-9", simpleDaemonSetLabel2, 1)
ds := newDaemonSet("foo")
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager.dsStore.Add(ds)

View File

@ -144,7 +144,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
},
)
dc.podStore.Store, dc.podController = framework.NewInformer(
dc.podStore.Indexer, dc.podController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Core().Pods(api.NamespaceAll).List(options)
@ -160,6 +160,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
UpdateFunc: dc.updatePod,
DeleteFunc: dc.deletePod,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.syncHandler = dc.syncDeployment

View File

@ -746,7 +746,7 @@ func (f *fixture) run(deploymentName string) {
c.rsStore.Store.Add(rs)
}
for _, pod := range f.podStore {
c.podStore.Store.Add(pod)
c.podStore.Indexer.Add(pod)
}
err := c.syncDeployment(deploymentName)

View File

@ -60,7 +60,7 @@ var (
)
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController {
func NewEndpointController(podInformer framework.SharedIndexInformer, client *clientset.Clientset) *EndpointController {
if client != nil && client.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().GetRESTClient().GetRateLimiter())
}
@ -95,7 +95,7 @@ func NewEndpointController(podInformer framework.SharedInformer, client *clients
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
})
e.podStore.Store = podInformer.GetStore()
e.podStore.Indexer = podInformer.GetIndexer()
e.podController = podInformer.GetController()
e.podStoreSynced = podInformer.HasSynced
@ -104,7 +104,7 @@ func NewEndpointController(podInformer framework.SharedInformer, client *clients
// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
podInformer := informers.CreateSharedPodInformer(client, resyncPeriod())
podInformer := informers.CreateSharedIndexPodInformer(client, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
e := NewEndpointController(podInformer, client)
e.internalPodInformer = podInformer
@ -123,7 +123,7 @@ type EndpointController struct {
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewEndpointController(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer
internalPodInformer framework.SharedIndexInformer
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much

View File

@ -172,7 +172,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -214,7 +214,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -253,7 +253,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -291,7 +291,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
addPods(endpoints.podStore.Indexer, ns, 0, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -329,7 +329,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
addPods(endpoints.podStore.Indexer, ns, 1, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -371,7 +371,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -412,7 +412,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
@ -432,8 +432,8 @@ func TestSyncEndpointsItems(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found!
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -475,7 +475,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{
@ -536,7 +536,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{

View File

@ -42,3 +42,19 @@ func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Durat
return sharedInformer
}
// CreateSharedIndexPodInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedIndexPodInformer(client clientset.Interface, resyncPeriod time.Duration, indexers cache.Indexers) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{}, resyncPeriod, indexers)
return sharedIndexInformer
}

View File

@ -48,8 +48,7 @@ type SharedInformer interface {
type SharedIndexInformer interface {
SharedInformer
AddIndexer(indexer cache.Indexer) error
AddIndexers(indexers cache.Indexers) error
GetIndexer() cache.Indexer
}
@ -57,12 +56,12 @@ type SharedIndexInformer interface {
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
// be shared amongst all consumers.
func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
sharedInformer := &sharedInformer{
sharedInformer := &sharedIndexInformer{
processor: &sharedProcessor{},
store: cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc),
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}),
}
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.store)
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.indexer)
cfg := &Config{
Queue: fifo,
@ -78,8 +77,33 @@ func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPer
return sharedInformer
}
type sharedInformer struct {
store cache.Store
/// NewSharedIndexInformer creates a new instance for the listwatcher.
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
// be shared amongst all consumers.
func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{},
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
}
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedIndexInformer.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: sharedIndexInformer.HandleDeltas,
}
sharedIndexInformer.controller = New(cfg)
return sharedIndexInformer
}
type sharedIndexInformer struct {
indexer cache.Indexer
controller *Controller
processor *sharedProcessor
@ -94,7 +118,7 @@ type sharedInformer struct {
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
informer *sharedInformer
informer *sharedIndexInformer
}
func (v *dummyController) Run(stopCh <-chan struct{}) {
@ -117,7 +141,7 @@ type deleteNotification struct {
oldObj interface{}
}
func (s *sharedInformer) Run(stopCh <-chan struct{}) {
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
func() {
@ -130,25 +154,34 @@ func (s *sharedInformer) Run(stopCh <-chan struct{}) {
s.controller.Run(stopCh)
}
func (s *sharedInformer) isStarted() bool {
func (s *sharedIndexInformer) isStarted() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
return s.started
}
func (s *sharedInformer) HasSynced() bool {
func (s *sharedIndexInformer) HasSynced() bool {
return s.controller.HasSynced()
}
func (s *sharedInformer) GetStore() cache.Store {
return s.store
func (s *sharedIndexInformer) GetStore() cache.Store {
return s.indexer
}
func (s *sharedInformer) GetController() ControllerInterface {
func (s *sharedIndexInformer) GetIndexer() cache.Indexer {
return s.indexer
}
// TODO(mqliang): implement this
func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
panic("has not implemeted yet")
}
func (s *sharedIndexInformer) GetController() ControllerInterface {
return &dummyController{informer: s}
}
func (s *sharedInformer) AddEventHandler(handler ResourceEventHandler) error {
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
@ -161,24 +194,24 @@ func (s *sharedInformer) AddEventHandler(handler ResourceEventHandler) error {
return nil
}
func (s *sharedInformer) HandleDeltas(obj interface{}) error {
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := s.store.Get(d.Object); err == nil && exists {
if err := s.store.Update(d.Object); err != nil {
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
} else {
if err := s.store.Add(d.Object); err != nil {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object})
}
case cache.Deleted:
if err := s.store.Delete(d.Object); err != nil {
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object})

View File

@ -63,7 +63,7 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun
terminatedSelector := fields.ParseSelectorOrDie("status.phase!=" + string(api.PodPending) + ",status.phase!=" + string(api.PodRunning) + ",status.phase!=" + string(api.PodUnknown))
gcc.podStore.Store, gcc.podStoreSyncer = framework.NewInformer(
gcc.podStore.Indexer, gcc.podStoreSyncer = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = terminatedSelector
@ -77,6 +77,8 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{},
// We don't need to build a index for podStore here
cache.Indexers{},
)
return gcc
}

View File

@ -80,7 +80,7 @@ func TestGC(t *testing.T) {
creationTime := time.Unix(0, 0)
for _, pod := range test.pods {
creationTime = creationTime.Add(1 * time.Hour)
gcc.podStore.Store.Add(&api.Pod{
gcc.podStore.Indexer.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{Name: pod.name, CreationTimestamp: unversioned.Time{Time: creationTime}},
Status: api.PodStatus{Phase: pod.phase},
})

View File

@ -77,7 +77,7 @@ type JobController struct {
recorder record.EventRecorder
}
func NewJobController(podInformer framework.SharedInformer, kubeClient clientset.Interface) *JobController {
func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface) *JobController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
@ -126,7 +126,7 @@ func NewJobController(podInformer framework.SharedInformer, kubeClient clientset
UpdateFunc: jm.updatePod,
DeleteFunc: jm.deletePod,
})
jm.podStore.Store = podInformer.GetStore()
jm.podStore.Indexer = podInformer.GetIndexer()
jm.podStoreSynced = podInformer.HasSynced
jm.updateHandler = jm.updateJobStatus
@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedInformer, kubeClient clientset
}
func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
jm := NewJobController(podInformer, kubeClient)
jm.internalPodInformer = podInformer

View File

@ -221,13 +221,13 @@ func TestControllerSyncJob(t *testing.T) {
job := newJob(tc.parallelism, tc.completions)
manager.jobStore.Store.Add(job)
for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
manager.podStore.Store.Add(&pod)
manager.podStore.Indexer.Add(&pod)
}
for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
manager.podStore.Store.Add(&pod)
manager.podStore.Indexer.Add(&pod)
}
for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
manager.podStore.Store.Add(&pod)
manager.podStore.Indexer.Add(&pod)
}
// run
@ -319,13 +319,13 @@ func TestSyncJobPastDeadline(t *testing.T) {
job.Status.StartTime = &start
manager.jobStore.Store.Add(job)
for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
manager.podStore.Store.Add(&pod)
manager.podStore.Indexer.Add(&pod)
}
for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
manager.podStore.Store.Add(&pod)
manager.podStore.Indexer.Add(&pod)
}
for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
manager.podStore.Store.Add(&pod)
manager.podStore.Indexer.Add(&pod)
}
// run
@ -571,14 +571,14 @@ func TestSyncJobExpectations(t *testing.T) {
job := newJob(2, 2)
manager.jobStore.Store.Add(job)
pods := newPodList(2, api.PodPending, job)
manager.podStore.Store.Add(&pods[0])
manager.podStore.Indexer.Add(&pods[0])
manager.expectations = FakeJobExpectations{
controller.NewControllerExpectations(), true, func() {
// If we check active pods before checking expectataions, the job
// will create a new replica because it doesn't see this pod, but
// has fulfilled its expectations.
manager.podStore.Store.Add(&pods[1])
manager.podStore.Indexer.Add(&pods[1])
},
}
manager.syncJob(getKey(job, t))

View File

@ -178,7 +178,7 @@ func NewNodeController(
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
}
nc.podStore.Store, nc.podController = framework.NewInformer(
nc.podStore.Indexer, nc.podController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
@ -193,6 +193,8 @@ func NewNodeController(
AddFunc: nc.maybeDeleteTerminatingPod,
UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
},
// We don't need to build a index for podStore here
cache.Indexers{},
)
nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
&cache.ListWatch{

View File

@ -1139,7 +1139,7 @@ func TestCleanupOrphanedPods(t *testing.T) {
nc.nodeStore.Store.Add(newNode("bar"))
for _, pod := range pods {
p := pod
nc.podStore.Store.Add(&p)
nc.podStore.Indexer.Add(&p)
}
var deleteCalls int

View File

@ -173,7 +173,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
},
)
rsc.podStore.Store, rsc.podController = framework.NewInformer(
rsc.podStore.Indexer, rsc.podController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rsc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
@ -192,6 +192,7 @@ func NewReplicaSetController(kubeClient clientset.Interface, resyncPeriod contro
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
rsc.syncHandler = rsc.syncReplicaSet

View File

@ -146,7 +146,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rsSpec)
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
manager.podControl = &fakePodControl
manager.syncReplicaSet(getKey(rsSpec, t))
@ -164,7 +164,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(1, labelMap)
manager.rsStore.Store.Add(rsSpec)
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rsSpec, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 1)
@ -238,7 +238,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
rs := newReplicaSet(activePods, labelMap)
manager.rsStore.Store.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods)}
newPodList(manager.podStore.Store, activePods, api.PodRunning, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -284,8 +284,8 @@ func TestControllerUpdateReplicas(t *testing.T) {
manager.rsStore.Store.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0}
rs.Generation = 1
newPodList(manager.podStore.Store, 2, api.PodRunning, labelMap, rs, "pod")
newPodList(manager.podStore.Store, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
@ -325,7 +325,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rsSpec := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rsSpec)
newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rsSpec, "pod")
newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod")
// Creates a replica and sets expectations
rsSpec.Status.Replicas = 1
@ -548,7 +548,7 @@ func TestUpdatePods(t *testing.T) {
manager.rsStore.Store.Add(&testRSSpec2)
// Put one pod in the podStore
pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
pod2 := pod1
pod2.Labels = labelMap2
@ -587,7 +587,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
rs := newReplicaSet(1, labelMap)
manager.rsStore.Store.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2}
newPodList(manager.podStore.Store, 1, api.PodRunning, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod")
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -688,7 +688,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// The store accrues active pods. It's also used by the ReplicaSet to determine how many
// replicas to create.
activePods := int32(len(manager.podStore.Store.List()))
activePods := int32(len(manager.podStore.Indexer.List()))
if replicas != 0 {
// This is the number of pods currently "in flight". They were created by the
// ReplicaSet controller above, which then puts the ReplicaSet to sleep till
@ -703,7 +703,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// This simulates the watch events for all but 1 of the expected pods.
// None of these should wake the controller because it has expectations==BurstReplicas.
for i := int32(0); i < expectedPods-1; i++ {
manager.podStore.Store.Add(&pods.Items[i])
manager.podStore.Indexer.Add(&pods.Items[i])
manager.addPod(&pods.Items[i])
}
@ -760,7 +760,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// The last add pod will decrease the expectation of the ReplicaSet to 0,
// which will cause it to create/delete the remaining replicas up to burstReplicas.
if replicas != 0 {
manager.podStore.Store.Add(&pods.Items[expectedPods-1])
manager.podStore.Indexer.Add(&pods.Items[expectedPods-1])
manager.addPod(&pods.Items[expectedPods-1])
} else {
expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t))
@ -775,14 +775,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
Labels: rsSpec.Spec.Selector.MatchLabels,
},
}
manager.podStore.Store.Delete(lastPod)
manager.podStore.Indexer.Delete(lastPod)
manager.deletePod(lastPod)
}
pods.Items = pods.Items[expectedPods:]
}
// Confirm that we've created the right number of replicas
activePods := int32(len(manager.podStore.Store.List()))
activePods := int32(len(manager.podStore.Indexer.List()))
if activePods != rsSpec.Spec.Replicas {
t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods)
}
@ -821,7 +821,7 @@ func TestRSSyncExpectations(t *testing.T) {
rsSpec := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rsSpec)
pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod")
manager.podStore.Store.Add(&pods.Items[0])
manager.podStore.Indexer.Add(&pods.Items[0])
postExpectationsPod := pods.Items[1]
manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{
@ -829,7 +829,7 @@ func TestRSSyncExpectations(t *testing.T) {
// If we check active pods before checking expectataions, the
// ReplicaSet will create a new replica because it doesn't see
// this pod, but has fulfilled its expectations.
manager.podStore.Store.Add(&postExpectationsPod)
manager.podStore.Indexer.Add(&postExpectationsPod)
},
})
manager.syncReplicaSet(getKey(rsSpec, t))
@ -873,7 +873,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should have no effect, since we've deleted the ReplicaSet.
podExp.Add(-1, 0)
manager.podStore.Store.Replace(make([]interface{}, 0), "0")
manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
manager.syncReplicaSet(getKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
}

View File

@ -73,7 +73,7 @@ type ReplicationManager struct {
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewReplicationManager(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer
internalPodInformer framework.SharedIndexInformer
// An rc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
@ -102,7 +102,7 @@ type ReplicationManager struct {
queue *workqueue.Type
}
func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
@ -122,7 +122,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie
queue: workqueue.New(),
}
rm.rcStore.Store, rm.rcController = framework.NewInformer(
rm.rcStore.Indexer, rm.rcController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rm.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options)
@ -177,6 +177,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie
// way of achieving this is by performing a `stop` operation on the controller.
DeleteFunc: rm.enqueueController,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
@ -187,7 +188,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
})
rm.podStore.Store = podInformer.GetStore()
rm.podStore.Indexer = podInformer.GetIndexer()
rm.podController = podInformer.GetController()
rm.syncHandler = rm.syncReplicationController
@ -199,7 +200,7 @@ func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clie
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
rm.internalPodInformer = podInformer
@ -276,7 +277,7 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon
// isCacheValid check if the cache is valid
func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool {
_, exists, err := rm.rcStore.Get(cachedRC)
exists, err := rm.rcStore.Exists(cachedRC)
// rc has been deleted or updated, cache is invalid
if err != nil || !exists || !isControllerMatch(pod, cachedRC) {
return false
@ -522,7 +523,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
return nil
}
obj, exists, err := rm.rcStore.Store.GetByKey(key)
obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
if !exists {
glog.Infof("Replication Controller has been deleted %v", key)
rm.expectations.DeleteExpectations(key)

View File

@ -143,8 +143,8 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
// 2 running pods, a controller with 2 replicas, sync is a no-op
controllerSpec := newReplicationController(2)
manager.rcStore.Store.Add(controllerSpec)
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec, "pod")
manager.rcStore.Indexer.Add(controllerSpec)
newPodList(manager.podStore.Indexer, 2, api.PodRunning, controllerSpec, "pod")
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(controllerSpec, t))
@ -160,8 +160,8 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
// 2 running pods and a controller with 1 replica, one pod delete expected
controllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(controllerSpec)
newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec, "pod")
manager.rcStore.Indexer.Add(controllerSpec)
newPodList(manager.podStore.Indexer, 2, api.PodRunning, controllerSpec, "pod")
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 1)
@ -183,7 +183,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
// The DeletedFinalStateUnknown object should cause the rc manager to insert
// the controller matching the selectors of the deleted pod into the work queue.
controllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(controllerSpec)
manager.rcStore.Indexer.Add(controllerSpec)
pods := newPodList(nil, 1, api.PodRunning, controllerSpec, "pod")
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
@ -207,7 +207,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) {
// A controller with 2 replicas and no pods in the store, 2 creates expected
rc := newReplicationController(2)
manager.rcStore.Store.Add(rc)
manager.rcStore.Indexer.Add(rc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -230,9 +230,9 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// Steady state for the replication controller, no Status.Replicas updates expected
activePods := 5
rc := newReplicationController(activePods)
manager.rcStore.Store.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods)}
newPodList(manager.podStore.Store, activePods, api.PodRunning, rc, "pod")
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: activePods}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, rc, "pod")
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -271,14 +271,14 @@ func TestControllerUpdateReplicas(t *testing.T) {
// Insufficient number of pods in the system, and Status.Replicas is wrong;
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(5)
manager.rcStore.Store.Add(rc)
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0}
rc.Generation = 1
newPodList(manager.podStore.Store, 2, api.PodRunning, rc, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
rcCopy := *rc
extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
rcCopy.Spec.Selector = extraLabelMap
newPodList(manager.podStore.Store, 2, api.PodRunning, &rcCopy, "podWithExtraLabel")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, &rcCopy, "podWithExtraLabel")
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{})
@ -315,8 +315,8 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
manager.rcStore.Store.Add(controllerSpec)
newPodList(manager.podStore.Store, 1, api.PodRunning, controllerSpec, "pod")
manager.rcStore.Indexer.Add(controllerSpec)
newPodList(manager.podStore.Indexer, 1, api.PodRunning, controllerSpec, "pod")
// Creates a replica and sets expectations
controllerSpec.Status.Replicas = 1
@ -403,7 +403,7 @@ func TestPodControllerLookup(t *testing.T) {
}
for _, c := range testCases {
for _, r := range c.inRCs {
manager.rcStore.Add(r)
manager.rcStore.Indexer.Add(r)
}
if rc := manager.getPodController(c.pod); rc != nil {
if c.outRCName != rc.Name {
@ -430,7 +430,7 @@ func TestWatchControllers(t *testing.T) {
// and closes the received channel to indicate that the test can finish.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Store.GetByKey(key)
obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
@ -467,13 +467,13 @@ func TestWatchPods(t *testing.T) {
// Put one rc and one pod into the controller's stores
testControllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(testControllerSpec)
manager.rcStore.Indexer.Add(testControllerSpec)
received := make(chan string)
// The pod update sent through the fakeWatcher should figure out the managing rc and
// send it into the syncHandler.
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Store.GetByKey(key)
obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
@ -511,7 +511,7 @@ func TestUpdatePods(t *testing.T) {
received := make(chan string)
manager.syncHandler = func(key string) error {
obj, exists, err := manager.rcStore.Store.GetByKey(key)
obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
if !exists || err != nil {
t.Errorf("Expected to find controller under key %v", key)
}
@ -525,14 +525,14 @@ func TestUpdatePods(t *testing.T) {
// Put 2 rcs and one pod into the controller's stores
testControllerSpec1 := newReplicationController(1)
manager.rcStore.Store.Add(testControllerSpec1)
manager.rcStore.Indexer.Add(testControllerSpec1)
testControllerSpec2 := *testControllerSpec1
testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"}
testControllerSpec2.Name = "barfoo"
manager.rcStore.Store.Add(&testControllerSpec2)
manager.rcStore.Indexer.Add(&testControllerSpec2)
// Put one pod in the podStore
pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, testControllerSpec1, "pod").Items[0]
pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, testControllerSpec1, "pod").Items[0]
pod2 := pod1
pod2.Labels = testControllerSpec2.Spec.Selector
@ -568,9 +568,9 @@ func TestControllerUpdateRequeue(t *testing.T) {
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
manager.rcStore.Store.Add(rc)
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: 2}
newPodList(manager.podStore.Store, 1, api.PodRunning, rc, "pod")
newPodList(manager.podStore.Indexer, 1, api.PodRunning, rc, "pod")
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -651,7 +651,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(numReplicas)
manager.rcStore.Store.Add(controllerSpec)
manager.rcStore.Indexer.Add(controllerSpec)
expectedPods := 0
pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec, "pod")
@ -665,14 +665,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
for _, replicas := range []int{numReplicas, 0} {
controllerSpec.Spec.Replicas = int32(replicas)
manager.rcStore.Store.Add(controllerSpec)
manager.rcStore.Indexer.Add(controllerSpec)
for i := 0; i < numReplicas; i += burstReplicas {
manager.syncReplicationController(getKey(controllerSpec, t))
// The store accrues active pods. It's also used by the rc to determine how many
// replicas to create.
activePods := len(manager.podStore.Store.List())
activePods := len(manager.podStore.Indexer.List())
if replicas != 0 {
// This is the number of pods currently "in flight". They were created by the rc manager above,
// which then puts the rc to sleep till all of them have been observed.
@ -686,7 +686,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// This simulates the watch events for all but 1 of the expected pods.
// None of these should wake the controller because it has expectations==BurstReplicas.
for i := 0; i < expectedPods-1; i++ {
manager.podStore.Store.Add(&pods.Items[i])
manager.podStore.Indexer.Add(&pods.Items[i])
manager.addPod(&pods.Items[i])
}
@ -722,7 +722,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// has exactly one expectation at the end, to verify that we
// don't double delete.
for i := range podsToDelete[1:] {
manager.podStore.Delete(podsToDelete[i])
manager.podStore.Indexer.Delete(podsToDelete[i])
manager.deletePod(podsToDelete[i])
}
podExp, exists, err := manager.expectations.GetExpectations(rcKey)
@ -743,7 +743,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// The last add pod will decrease the expectation of the rc to 0,
// which will cause it to create/delete the remaining replicas up to burstReplicas.
if replicas != 0 {
manager.podStore.Store.Add(&pods.Items[expectedPods-1])
manager.podStore.Indexer.Add(&pods.Items[expectedPods-1])
manager.addPod(&pods.Items[expectedPods-1])
} else {
expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t))
@ -758,14 +758,14 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
Labels: controllerSpec.Spec.Selector,
},
}
manager.podStore.Store.Delete(lastPod)
manager.podStore.Indexer.Delete(lastPod)
manager.deletePod(lastPod)
}
pods.Items = pods.Items[expectedPods:]
}
// Confirm that we've created the right number of replicas
activePods := int32(len(manager.podStore.Store.List()))
activePods := int32(len(manager.podStore.Indexer.List()))
if activePods != controllerSpec.Spec.Replicas {
t.Fatalf("Unexpected number of active pods, expected %d, got %d", controllerSpec.Spec.Replicas, activePods)
}
@ -801,9 +801,9 @@ func TestRCSyncExpectations(t *testing.T) {
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
manager.rcStore.Store.Add(controllerSpec)
manager.rcStore.Indexer.Add(controllerSpec)
pods := newPodList(nil, 2, api.PodPending, controllerSpec, "pod")
manager.podStore.Store.Add(&pods.Items[0])
manager.podStore.Indexer.Add(&pods.Items[0])
postExpectationsPod := pods.Items[1]
manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{
@ -811,7 +811,7 @@ func TestRCSyncExpectations(t *testing.T) {
// If we check active pods before checking expectataions, the rc
// will create a new replica because it doesn't see this pod, but
// has fulfilled its expectations.
manager.podStore.Store.Add(&postExpectationsPod)
manager.podStore.Indexer.Add(&postExpectationsPod)
},
})
manager.syncReplicationController(getKey(controllerSpec, t))
@ -824,7 +824,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
manager.rcStore.Store.Add(rc)
manager.rcStore.Indexer.Add(rc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@ -846,7 +846,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
if !exists || err != nil {
t.Errorf("No expectations found for rc")
}
manager.rcStore.Delete(rc)
manager.rcStore.Indexer.Delete(rc)
manager.syncReplicationController(getKey(rc, t))
if _, exists, err = manager.expectations.GetExpectations(rcKey); exists {
@ -855,7 +855,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should have no effect, since we've deleted the rc.
podExp.Add(-1, 0)
manager.podStore.Store.Replace(make([]interface{}, 0), "0")
manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
}
@ -871,7 +871,7 @@ func TestRCManagerNotReady(t *testing.T) {
// want to end up creating replicas in this case until the pod reflector
// has synced, so the rc manager should just requeue the rc.
controllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(controllerSpec)
manager.rcStore.Indexer.Add(controllerSpec)
rcKey := getKey(controllerSpec, t)
manager.syncReplicationController(rcKey)
@ -914,7 +914,7 @@ func TestOverlappingRCs(t *testing.T) {
}
shuffledControllers := shuffle(controllers)
for j := range shuffledControllers {
manager.rcStore.Store.Add(shuffledControllers[j])
manager.rcStore.Indexer.Add(shuffledControllers[j])
}
// Add a pod and make sure only the oldest rc is synced
pods := newPodList(nil, 1, api.PodPending, controllers[0], "pod")
@ -934,7 +934,7 @@ func TestDeletionTimestamp(t *testing.T) {
manager.podStoreSynced = alwaysReady
controllerSpec := newReplicationController(1)
manager.rcStore.Store.Add(controllerSpec)
manager.rcStore.Indexer.Add(controllerSpec)
rcKey, err := controller.KeyFunc(controllerSpec)
if err != nil {
t.Errorf("Couldn't get key for object %+v: %v", controllerSpec, err)
@ -1015,6 +1015,7 @@ func TestDeletionTimestamp(t *testing.T) {
}
}
/*
func BenchmarkGetPodControllerMultiNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
@ -1043,7 +1044,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) {
ns := fmt.Sprintf("ns-%d", i)
for j := 0; j < 10; j++ {
rcName := fmt.Sprintf("rc-%d", j)
manager.rcStore.Add(&api.ReplicationController{
manager.rcStore.Indexer.Add(&api.ReplicationController{
ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: ns},
Spec: api.ReplicationControllerSpec{
Selector: map[string]string{"rcName": rcName},
@ -1085,7 +1086,7 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) {
for i := 0; i < rcNum; i++ {
rcName := fmt.Sprintf("rc-%d", i)
manager.rcStore.Add(&api.ReplicationController{
manager.rcStore.Indexer.Add(&api.ReplicationController{
ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: "foo"},
Spec: api.ReplicationControllerSpec{
Selector: map[string]string{"rcName": rcName},
@ -1100,3 +1101,4 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) {
}
}
}
*/

View File

@ -109,7 +109,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini
PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
schedulerCache: schedulerCache,
StopEverything: stopEverything,
@ -124,7 +124,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini
// We construct this here instead of in CreateFromKeys because
// ScheduledPodLister is something we provide to plug in functions that
// they may need to call.
c.ScheduledPodLister.Store, c.scheduledPodPopulator = framework.NewInformer(
c.ScheduledPodLister.Indexer, c.scheduledPodPopulator = framework.NewIndexerInformer(
c.createAssignedNonTerminatedPodLW(),
&api.Pod{},
0,
@ -133,6 +133,7 @@ func NewConfigFactory(client *client.Client, schedulerName string, hardPodAffini
UpdateFunc: c.updatePodInCache,
DeleteFunc: c.deletePodFromCache,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
c.NodeLister.Store, c.nodePopulator = framework.NewInformer(
@ -356,7 +357,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Store, 0).RunUntil(f.StopEverything)
cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Indexer, 0).RunUntil(f.StopEverything)
// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.