Merge pull request #23575 from deads2k/shared-cache

Automatic merge from submit-queue

shared controller informers

Related to https://github.com/kubernetes/kubernetes/issues/14978

This demonstrates how controllers that use an `Informer` can share the same watch and store.  A similar "setup and run" approach could be used for an `IndexInformer` to share that cache.  I found adding listeners here easier than intercepting at the watch interface (problems with resourceVersion) or at the reflector (same plumbing, but you have to fan out to multiple stores).

We could also use the cache built here to back several of the admission plugins that currently run their own lookup caches.

If there's interest, I can finish out the `SharedInformer` and switch the low-hanging fruit over.
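
For reference, a minimal sketch of the intended usage pattern (not code from this PR). It only uses constructors that this diff adds; the `runSharedControllers` wrapper and its clientset argument are illustrative:

```go
package sharedexample

import (
	"time"

	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller"
	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
	"k8s.io/kubernetes/pkg/controller/framework/informers"
	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
	"k8s.io/kubernetes/pkg/util/wait"
)

// runSharedControllers is a hypothetical wrapper: two controllers driven by a
// single pod list/watch and a single backing store.
func runSharedControllers(cs *clientset.Clientset) {
	// One shared pod informer; each constructor registers its own handlers on it.
	podInformer := informers.CreateSharedPodInformer(cs, 30*time.Second)

	go endpointcontroller.NewEndpointController(podInformer, cs).
		Run(3, wait.NeverStop)
	go replicationcontroller.NewReplicationManager(podInformer, cs,
		controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
		Run(3, wait.NeverStop)

	// Start the shared informer last, once every handler has been registered.
	go podInformer.Run(wait.NeverStop)
}
```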

@kubernetes/rh-cluster-infra @smarterclayton @liggitt @wojtek-t
k8s-merge-robot 2016-04-18 07:48:29 -07:00
commit 5ad27f2720
11 changed files with 472 additions and 82 deletions


@ -44,6 +44,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework/informers"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
@ -194,14 +195,18 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()
podInformer := informers.CreateSharedPodInformer(clientset, controller.NoResyncPeriodFunc())
// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpointcontroller.NewEndpointController(clientset, controller.NoResyncPeriodFunc).
go endpointcontroller.NewEndpointController(podInformer, clientset).
Run(3, wait.NeverStop)
// TODO: Write an integration test for the replication controllers watch.
go replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
go replicationcontroller.NewReplicationManager(podInformer, clientset, controller.NoResyncPeriodFunc, replicationcontroller.BurstReplicas, 4096).
Run(3, wait.NeverStop)
go podInformer.Run(wait.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, clientset, 5*time.Minute, flowcontrol.NewFakeAlwaysRateLimiter(), flowcontrol.NewFakeAlwaysRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)


@ -30,6 +30,7 @@ import (
"net/http"
"net/http/pprof"
"os"
"reflect"
"strconv"
"time"
@ -48,6 +49,8 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/gc"
"k8s.io/kubernetes/pkg/controller/job"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
@ -189,11 +192,16 @@ func Run(s *options.CMServer) error {
}
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
go endpointcontroller.NewEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")), ResyncPeriod(s)).
podInformer := informers.CreateSharedPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer
go endpointcontroller.NewEndpointController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
go replicationcontroller.NewReplicationManager(
podInformer,
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
ResyncPeriod(s),
replicationcontroller.BurstReplicas,
@ -410,6 +418,11 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
).Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
// run the shared informers
for _, informer := range informers {
go informer.Run(wait.NeverStop)
}
select {}
}
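
A note on the loop above (my reading of the new code, not text from the PR): the shared informers are run only after every controller has been constructed, because `SharedInformer.AddEventHandler` rejects registrations once the informer has started (see shared_informer.go below). A self-contained sketch of that shape, with a hypothetical `startSharedInformers` helper:

```go
package sharedexample

import (
	"reflect"

	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/util/wait"
)

// startSharedInformers mirrors the inlined loop in StartControllers: call it only
// after every controller constructor has attached its event handlers, since
// AddEventHandler returns an error once an informer's Run has begun.
func startSharedInformers(informers map[reflect.Type]framework.SharedInformer) {
	for _, informer := range informers {
		go informer.Run(wait.NeverStop)
	}
}
```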


@ -136,7 +136,7 @@ func (s *CMServer) Run(_ []string) error {
endpoints := s.createEndpointController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller")))
go endpoints.Run(s.ConcurrentEndpointSyncs, wait.NeverStop)
go replicationcontroller.NewReplicationManager(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC).
go replicationcontroller.NewReplicationManagerFromClient(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")), s.resyncPeriod, replicationcontroller.BurstReplicas, s.LookupCacheSizeForRC).
Run(s.ConcurrentRCSyncs, wait.NeverStop)
if s.TerminatedPodGCThreshold > 0 {
@ -346,7 +346,7 @@ func (s *CMServer) createEndpointController(client *clientset.Clientset) kmendpo
return kmendpoint.NewEndpointController(client)
}
glog.V(2).Infof("Creating podIP:containerPort endpoint controller")
stockEndpointController := endpointcontroller.NewEndpointController(client, s.resyncPeriod)
stockEndpointController := endpointcontroller.NewEndpointControllerFromClient(client, s.resyncPeriod)
return stockEndpointController
}


@ -33,6 +33,7 @@ import (
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -58,7 +59,7 @@ var (
)
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
func NewEndpointController(podInformer framework.SharedInformer, client *clientset.Clientset) *EndpointController {
e := &EndpointController{
client: client,
queue: workqueue.New(),
@ -85,24 +86,23 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.
},
)
e.podStore.Store, e.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return e.client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
},
)
e.podStoreSynced = e.podController.HasSynced
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
})
e.podStore.Store = podInformer.GetStore()
e.podController = podInformer.GetController()
e.podStoreSynced = podInformer.HasSynced
return e
}
// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
podInformer := informers.CreateSharedPodInformer(client, resyncPeriod())
e := NewEndpointController(podInformer, client)
e.internalPodInformer = podInformer
return e
}
@ -114,6 +114,13 @@ type EndpointController struct {
serviceStore cache.StoreToServiceLister
podStore cache.StoreToPodLister
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewEndpointController(passing SharedInformer), this
// will be nil
internalPodInformer framework.SharedInformer
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
@ -124,7 +131,7 @@ type EndpointController struct {
// Since we join two objects, we'll watch both of them with
// controllers.
serviceController *framework.Controller
podController *framework.Controller
podController framework.ControllerInterface
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
@ -144,6 +151,11 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
time.Sleep(5 * time.Minute) // give time for our cache to fill
e.checkLeftoverEndpoints()
}()
if e.internalPodInformer != nil {
go e.internalPodInformer.Run(stopCh)
}
<-stopCh
e.queue.ShutDown()
}
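
To make the internalPodInformer comment above concrete, here is a hedged sketch (not from this diff) contrasting the two construction paths; the wrapper function and its clientset argument are assumptions:

```go
package sharedexample

import (
	"time"

	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/controller"
	endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
	"k8s.io/kubernetes/pkg/controller/framework/informers"
	"k8s.io/kubernetes/pkg/util/wait"
)

// startEndpointControllers is a hypothetical wrapper contrasting the two constructors.
func startEndpointControllers(cs *clientset.Clientset) {
	// (a) Caller-owned shared informer: the constructor only registers handlers,
	// so the caller must run the informer (typically once, shared with others).
	podInformer := informers.CreateSharedPodInformer(cs, 30*time.Second)
	shared := endpointcontroller.NewEndpointController(podInformer, cs)
	go shared.Run(3, wait.NeverStop)
	go podInformer.Run(wait.NeverStop)

	// (b) Private informer: the FromClient constructor builds its own informer and
	// records it in internalPodInformer, so Run starts it automatically.
	private := endpointcontroller.NewEndpointControllerFromClient(cs, controller.NoResyncPeriodFunc)
	go private.Run(3, wait.NeverStop)
}
```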


@ -108,7 +108,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@ -142,7 +142,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
endpoints.checkLeftoverEndpoints()
@ -172,7 +172,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
@ -216,7 +216,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
@ -256,7 +256,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
@ -295,7 +295,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
@ -334,7 +334,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
@ -377,7 +377,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
@ -419,7 +419,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
@ -440,7 +440,7 @@ func TestSyncEndpointsItems(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
@ -484,7 +484,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
@ -546,7 +546,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}


@ -68,6 +68,12 @@ type Controller struct {
reflectorMutex sync.RWMutex
}
// TODO make the "Controller" private, and convert all references to use ControllerInterface instead
type ControllerInterface interface {
Run(stopCh <-chan struct{})
HasSynced() bool
}
// New makes a new Controller from the given Config.
func New(c *Config) *Controller {
ctlr := &Controller{


@ -0,0 +1,44 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package informers
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// CreateSharedPodInformer returns a SharedInformer that lists and watches all pods
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedInformer {
sharedInformer := framework.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{}, resyncPeriod)
return sharedInformer
}


@ -0,0 +1,297 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)
// if you use this, there is one behavior change compared to a standard Informer.
// When you receive a notification, the cache will be AT LEAST as fresh as the
// notification, but it MAY be more fresh. You should NOT depend on the contents
// of the cache exactly matching the notification you've received in handler
// functions. If there was a create, followed by a delete, the cache may NOT
// have your item. This has advantages over the broadcaster since it allows us
// to share a common cache across many controllers. Extending the broadcaster
would have required us to keep duplicate caches for each watch.
type SharedInformer interface {
// Events to a single handler are delivered sequentially, but there is no coordination between different handlers.
// You may NOT add a handler *after* the SharedInformer is running. That will result in an error being returned.
// TODO we should try to remove this restriction eventually.
AddEventHandler(handler ResourceEventHandler) error
GetStore() cache.Store
// GetController gives back a synthetic interface that "votes" to start the informer
GetController() ControllerInterface
Run(stopCh <-chan struct{})
HasSynced() bool
}
type SharedIndexInformer interface {
SharedInformer
AddIndexer(indexer cache.Indexer) error
GetIndexer() cache.Indexer
}
// NewSharedInformer creates a new instance for the listwatcher.
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
// be shared amongst all consumers.
func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
sharedInformer := &sharedInformer{
processor: &sharedProcessor{},
store: cache.NewStore(DeletionHandlingMetaNamespaceKeyFunc),
}
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.store)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: sharedInformer.HandleDeltas,
}
sharedInformer.controller = New(cfg)
return sharedInformer
}
type sharedInformer struct {
store cache.Store
controller *Controller
processor *sharedProcessor
started bool
startedLock sync.Mutex
}
// dummyController hides the fact that a SharedInformer is different from a dedicated one
where a caller can `Run`. The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
informer *sharedInformer
}
func (v *dummyController) Run(stopCh <-chan struct{}) {
}
func (v *dummyController) HasSynced() bool {
return v.informer.HasSynced()
}
type updateNotification struct {
oldObj interface{}
newObj interface{}
}
type addNotification struct {
newObj interface{}
}
type deleteNotification struct {
oldObj interface{}
}
func (s *sharedInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.started = true
}()
s.processor.run(stopCh)
s.controller.Run(stopCh)
}
func (s *sharedInformer) isStarted() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
return s.started
}
func (s *sharedInformer) HasSynced() bool {
return s.controller.HasSynced()
}
func (s *sharedInformer) GetStore() cache.Store {
return s.store
}
func (s *sharedInformer) GetController() ControllerInterface {
return &dummyController{informer: s}
}
func (s *sharedInformer) AddEventHandler(handler ResourceEventHandler) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
listener := newProcessListener(handler)
s.processor.listeners = append(s.processor.listeners, listener)
return nil
}
func (s *sharedInformer) HandleDeltas(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(cache.Deltas) {
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
if old, exists, err := s.store.Get(d.Object); err == nil && exists {
if err := s.store.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
} else {
if err := s.store.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object})
}
case cache.Deleted:
if err := s.store.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object})
}
}
return nil
}
type sharedProcessor struct {
listeners []*processorListener
}
func (p *sharedProcessor) distribute(obj interface{}) {
for _, listener := range p.listeners {
listener.add(obj)
}
}
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
for _, listener := range p.listeners {
go listener.run(stopCh)
go listener.pop(stopCh)
}
}
type processorListener struct {
// lock/cond protects access to 'pendingNotifications'.
lock sync.RWMutex
cond sync.Cond
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better
pendingNotifications []interface{}
nextCh chan interface{}
handler ResourceEventHandler
}
func newProcessListener(handler ResourceEventHandler) *processorListener {
ret := &processorListener{
pendingNotifications: []interface{}{},
nextCh: make(chan interface{}),
handler: handler,
}
ret.cond.L = &ret.lock
return ret
}
func (p *processorListener) add(notification interface{}) {
p.lock.Lock()
defer p.lock.Unlock()
p.pendingNotifications = append(p.pendingNotifications, notification)
p.cond.Broadcast()
}
func (p *processorListener) pop(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
p.lock.Lock()
defer p.lock.Unlock()
for {
for len(p.pendingNotifications) == 0 {
// check if we're shut down
select {
case <-stopCh:
return
default:
}
p.cond.Wait()
}
notification := p.pendingNotifications[0]
p.pendingNotifications = p.pendingNotifications[1:]
select {
case <-stopCh:
return
case p.nextCh <- notification:
}
}
}
func (p *processorListener) run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
for {
var next interface{}
select {
case <-stopCh:
func() {
p.lock.Lock()
defer p.lock.Unlock()
p.cond.Broadcast()
}()
return
case next = <-p.nextCh:
}
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
}
}
}
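
Given the freshness caveat in this file's header comment, a handler attached to a SharedInformer should generally treat the notification as a hint and re-read the object from the shared store before acting, since the cache may already be newer than the event, or the object may already be gone. A sketch under that assumption; `process` and the incoming `podInformer` are placeholders, not code from this PR:

```go
package sharedexample

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/controller/framework"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

// watchPods registers an add handler that re-reads the live object from the
// shared store instead of trusting the notification payload.
func watchPods(podInformer framework.SharedInformer, process func(*api.Pod)) {
	handler := framework.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err != nil {
				utilruntime.HandleError(err)
				return
			}
			// Re-read the current state: the add may already have been superseded.
			current, exists, err := podInformer.GetStore().GetByKey(key)
			if err != nil || !exists {
				return // already deleted (or not yet visible); nothing to do
			}
			process(current.(*api.Pod))
		},
	}
	if err := podInformer.AddEventHandler(handler); err != nil {
		utilruntime.HandleError(err) // informer already running; registration rejected
	}
}
```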


@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -66,6 +67,13 @@ type ReplicationManager struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
// internalPodInformer is used to hold a personal informer. If we're using
// a normal shared informer, then the informer will be started for us. If
// we have a personal informer, we must start it ourselves. If you start
// the controller using NewReplicationManager(passing SharedInformer), this
// will be null
internalPodInformer framework.SharedInformer
// An rc is temporarily suspended after creating/deleting these many replicas.
// It resumes normal action after observing the watch events for them.
burstReplicas int
@ -82,7 +90,7 @@ type ReplicationManager struct {
// A store of pods, populated by the podController
podStore cache.StoreToPodLister
// Watches changes to all pods
podController *framework.Controller
podController framework.ControllerInterface
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
@ -93,8 +101,7 @@ type ReplicationManager struct {
queue *workqueue.Type
}
// NewReplicationManager creates a new ReplicationManager.
func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
func NewReplicationManager(podInformer framework.SharedInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
@ -167,31 +174,31 @@ func NewReplicationManager(kubeClient clientset.Interface, resyncPeriod controll
},
)
rm.podStore.Store, rm.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return rm.kubeClient.Core().Pods(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return rm.kubeClient.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
},
)
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill
// the most frequent pod update is status, and the associated rc will only list from local storage, so
// it should be ok.
UpdateFunc: rm.updatePod,
DeleteFunc: rm.deletePod,
})
rm.podStore.Store = podInformer.GetStore()
rm.podController = podInformer.GetController()
rm.syncHandler = rm.syncReplicationController
rm.podStoreSynced = rm.podController.HasSynced
rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rm
}
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedPodInformer(kubeClient, resyncPeriod())
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
rm.internalPodInformer = podInformer
return rm
}
// SetEventRecorder replaces the event recorder used by the replication manager
@ -211,6 +218,11 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(rm.worker, time.Second, stopCh)
}
if rm.internalPodInformer != nil {
go rm.internalPodInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down RC Manager")
rm.queue.ShutDown()
@ -478,7 +490,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(filteredPods[ix])
glog.V(2).Infof("Failed to delete %v, decrementing expectations for controller %q/%q", podKey, rc.Namespace, rc.Name)
glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
rm.expectations.DeletionObserved(rcKey, podKey)
utilruntime.HandleError(err)
}


@ -138,7 +138,7 @@ type serverResponse struct {
func TestSyncReplicationControllerDoesNothing(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
@ -154,7 +154,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
func TestSyncReplicationControllerDeletes(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -170,7 +170,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
func TestDeleteFinalStateUnknown(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -202,7 +202,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
func TestSyncReplicationControllerCreates(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
@ -225,7 +225,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Steady state for the replication controller, no Status.Replicas updates expected
@ -267,7 +267,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// TODO: Uncomment when fix #19254
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
@ -313,7 +313,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -359,7 +359,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
}
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicationManager(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRCs []*api.ReplicationController
@ -422,7 +422,7 @@ func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
var testControllerSpec api.ReplicationController
@ -465,7 +465,7 @@ func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
c := &fake.Clientset{}
c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Put one rc and one pod into the controller's stores
@ -492,6 +492,7 @@ func TestWatchPods(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go manager.podController.Run(stopCh)
go manager.internalPodInformer.Run(stopCh)
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
pods := newPodList(nil, 1, api.PodRunning, testControllerSpec, "pod")
@ -507,7 +508,7 @@ func TestWatchPods(t *testing.T) {
}
func TestUpdatePods(t *testing.T) {
manager := NewReplicationManager(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
received := make(chan string)
@ -567,7 +568,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
// defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@ -649,7 +650,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -799,7 +800,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool {
func TestRCSyncExpectations(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -823,7 +824,7 @@ func TestRCSyncExpectations(t *testing.T) {
func TestDeleteControllerAndExpectations(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@ -866,7 +867,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
func TestRCManagerNotReady(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 2, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
@ -904,7 +905,7 @@ func TestOverlappingRCs(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
for i := 0; i < 5; i++ {
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
// Create 10 rcs, shuffled them randomly and insert them into the rc manager's store
@ -933,7 +934,7 @@ func TestOverlappingRCs(t *testing.T) {
func TestDeletionTimestamp(t *testing.T) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, 10, 0)
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
manager.podStoreSynced = alwaysReady
controllerSpec := newReplicationController(1)
@ -1020,7 +1021,7 @@ func TestDeletionTimestamp(t *testing.T) {
func BenchmarkGetPodControllerMultiNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
const nsNum = 1000
@ -1066,7 +1067,7 @@ func BenchmarkGetPodControllerMultiNS(b *testing.B) {
func BenchmarkGetPodControllerSingleNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
const rcNum = 1000
const replicaNum = 3


@ -108,7 +108,7 @@ func NewMasterComponents(c *Config) *MasterComponents {
restClient := client.NewOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: c.QPS, Burst: c.Burst})
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: c.QPS, Burst: c.Burst})
rcStopCh := make(chan struct{})
controllerManager := replicationcontroller.NewReplicationManager(clientset, controller.NoResyncPeriodFunc, c.Burst, 4096)
controllerManager := replicationcontroller.NewReplicationManagerFromClient(clientset, controller.NoResyncPeriodFunc, c.Burst, 4096)
// TODO: Support events once we can cleanly shutdown an event recorder.
controllerManager.SetEventRecorder(&record.FakeRecorder{})