Use shared informers for proxy endpoints and service configs

Use shared informers instead of creating local controllers/reflectors
for the proxy's endpoints and service configs. This allows downstream
integrators to pass in preexisting shared informers to save on memory
and CPU usage.

This also enables the cache mutation detector for kube-proxy for those
presubmit jobs that already turn it on.
Andy Goldstein
2017-04-03 14:34:29 -04:00
parent 46d4c621a8
commit d2bc4d0b2e
13 changed files with 186 additions and 158 deletions
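
The new constructors take informer interfaces rather than a cache.Getter, so a caller can hand in informers from a preexisting shared factory. A minimal sketch of the intended wiring, assuming the internal-version SharedInformerFactory generated in this tree (the helper name, import aliases, and the 15-minute resync value are illustrative, not part of this commit):

package example

import (
	"time"

	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
)

// wireProxyConfig is a hypothetical helper showing the intended call pattern:
// the caller owns the shared informer factory and hands individual informers
// to the config objects, which only register event handlers on them.
func wireProxyConfig(informerFactory informers.SharedInformerFactory, stopCh <-chan struct{}) {
	serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), 15*time.Minute)
	endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), 15*time.Minute)

	// serviceConfig.RegisterHandler(...) / endpointsConfig.RegisterHandler(...) would go here.

	// Run blocks on WaitForCacheSync, so each Run gets its own goroutine.
	go serviceConfig.Run(stopCh)
	go endpointsConfig.Run(stopCh)

	// Start every informer requested above; because they are shared, other
	// consumers of the same factory reuse the same watch and cache.
	informerFactory.Start(stopCh)
}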


@@ -21,12 +21,11 @@ import (
 	"time"

 	"github.com/golang/glog"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/kubernetes/pkg/api"
+	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
 	listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
 	"k8s.io/kubernetes/pkg/util/config"
 )
@@ -64,35 +63,36 @@ type EndpointsConfigHandler interface {
 // EndpointsConfig tracks a set of endpoints configurations.
 // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
 type EndpointsConfig struct {
-	informer cache.Controller
-	lister   listers.EndpointsLister
-	handlers []EndpointsConfigHandler
+	lister       listers.EndpointsLister
+	listerSynced cache.InformerSynced
+	handlers     []EndpointsConfigHandler
 	// updates channel is used to trigger registered handlers.
 	updates chan struct{}
+	stop    chan struct{}
 }

 // NewEndpointsConfig creates a new EndpointsConfig.
-func NewEndpointsConfig(c cache.Getter, period time.Duration) *EndpointsConfig {
-	endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
-	return newEndpointsConfig(endpointsLW, period)
-}
+func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
+	result := &EndpointsConfig{
+		lister:       endpointsInformer.Lister(),
+		listerSynced: endpointsInformer.Informer().HasSynced,
+		// The updates channel is used to send interrupts to the Endpoints handler.
+		// It's buffered because we never want to block for as long as there is a
+		// pending interrupt, but don't want to drop them if the handler is doing
+		// work.
+		updates: make(chan struct{}, 1),
+		stop:    make(chan struct{}),
+	}

-func newEndpointsConfig(lw cache.ListerWatcher, period time.Duration) *EndpointsConfig {
-	result := &EndpointsConfig{}
-
-	store, informer := cache.NewIndexerInformer(
-		lw,
-		&api.Endpoints{},
-		period,
+	endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc:    result.handleAddEndpoints,
 			UpdateFunc: result.handleUpdateEndpoints,
 			DeleteFunc: result.handleDeleteEndpoints,
 		},
-		cache.Indexers{},
+		resyncPeriod,
 	)
-	result.informer = informer
-	result.lister = listers.NewEndpointsLister(store)
+
 	return result
 }

@@ -101,16 +101,9 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
 	c.handlers = append(c.handlers, handler)
 }

-// Run starts the underlying informer and goroutine responsible for calling
-// registered handlers.
+// Run starts the goroutine responsible for calling registered handlers.
 func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
-	// The updates channel is used to send interrupts to the Endpoints handler.
-	// It's buffered because we never want to block for as long as there is a
-	// pending interrupt, but don't want to drop them if the handler is doing
-	// work.
-	c.updates = make(chan struct{}, 1)
-	go c.informer.Run(stopCh)
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
+	if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
 		utilruntime.HandleError(fmt.Errorf("endpoint controller not synced"))
 		return
 	}
@@ -118,27 +111,32 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
 	// We have synced informers. Now we can start delivering updates
 	// to the registered handler.
 	go func() {
-		for range c.updates {
-			endpoints, err := c.lister.List(labels.Everything())
-			if err != nil {
-				glog.Errorf("Error while listing endpoints from cache: %v", err)
-				// This will cause a retry (if there isn't any other trigger in-flight).
-				c.dispatchUpdate()
-				continue
-			}
-			if endpoints == nil {
-				endpoints = []*api.Endpoints{}
-			}
-			for i := range c.handlers {
-				glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
-				c.handlers[i].OnEndpointsUpdate(endpoints)
+		for {
+			select {
+			case <-c.updates:
+				endpoints, err := c.lister.List(labels.Everything())
+				if err != nil {
+					glog.Errorf("Error while listing endpoints from cache: %v", err)
+					// This will cause a retry (if there isn't any other trigger in-flight).
+					c.dispatchUpdate()
+					continue
+				}
+				if endpoints == nil {
+					endpoints = []*api.Endpoints{}
+				}
+				for i := range c.handlers {
+					glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
+					c.handlers[i].OnEndpointsUpdate(endpoints)
+				}
+			case <-c.stop:
+				return
 			}
 		}
 	}()
 	// Close updates channel when stopCh is closed.
 	go func() {
 		<-stopCh
-		close(c.updates)
+		close(c.stop)
 	}()
 }

@@ -157,6 +155,9 @@ func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) {
 func (c *EndpointsConfig) dispatchUpdate() {
 	select {
 	case c.updates <- struct{}{}:
+		// Work enqueued successfully
+	case <-c.stop:
+		// We're shut down / avoid logging the message below
 	default:
 		glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
 	}
@@ -165,35 +166,36 @@ func (c *EndpointsConfig) dispatchUpdate() {
 // ServiceConfig tracks a set of service configurations.
 // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
 type ServiceConfig struct {
-	informer cache.Controller
-	lister   listers.ServiceLister
-	handlers []ServiceConfigHandler
+	lister       listers.ServiceLister
+	listerSynced cache.InformerSynced
+	handlers     []ServiceConfigHandler
 	// updates channel is used to trigger registered handlers
 	updates chan struct{}
+	stop    chan struct{}
 }

 // NewServiceConfig creates a new ServiceConfig.
-func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig {
-	servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
-	return newServiceConfig(servicesLW, period)
-}
+func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
+	result := &ServiceConfig{
+		lister:       serviceInformer.Lister(),
+		listerSynced: serviceInformer.Informer().HasSynced,
+		// The updates channel is used to send interrupts to the Services handler.
+		// It's buffered because we never want to block for as long as there is a
+		// pending interrupt, but don't want to drop them if the handler is doing
+		// work.
+		updates: make(chan struct{}, 1),
+		stop:    make(chan struct{}),
+	}

-func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig {
-	result := &ServiceConfig{}
-
-	store, informer := cache.NewIndexerInformer(
-		lw,
-		&api.Service{},
-		period,
+	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc:    result.handleAddService,
 			UpdateFunc: result.handleUpdateService,
 			DeleteFunc: result.handleDeleteService,
 		},
-		cache.Indexers{},
+		resyncPeriod,
 	)
-	result.informer = informer
-	result.lister = listers.NewServiceLister(store)
+
 	return result
 }

@@ -202,16 +204,10 @@ func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
 	c.handlers = append(c.handlers, handler)
 }

-// Run starts the underlying informer and goroutine responsible for calling
+// Run starts the goroutine responsible for calling
 // registered handlers.
 func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
-	// The updates channel is used to send interrupts to the Services handler.
-	// It's buffered because we never want to block for as long as there is a
-	// pending interrupt, but don't want to drop them if the handler is doing
-	// work.
-	c.updates = make(chan struct{}, 1)
-	go c.informer.Run(stopCh)
-	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
+	if !cache.WaitForCacheSync(stopCh, c.listerSynced) {
 		utilruntime.HandleError(fmt.Errorf("service controller not synced"))
 		return
 	}
@@ -219,27 +215,32 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
 	// We have synced informers. Now we can start delivering updates
 	// to the registered handler.
 	go func() {
-		for range c.updates {
-			services, err := c.lister.List(labels.Everything())
-			if err != nil {
-				glog.Errorf("Error while listing services from cache: %v", err)
-				// This will cause a retry (if there isn't any other trigger in-flight).
-				c.dispatchUpdate()
-				continue
-			}
-			if services == nil {
-				services = []*api.Service{}
-			}
-			for i := range c.handlers {
-				glog.V(3).Infof("Calling handler.OnServiceUpdate()")
-				c.handlers[i].OnServiceUpdate(services)
+		for {
+			select {
+			case <-c.updates:
+				services, err := c.lister.List(labels.Everything())
+				if err != nil {
+					glog.Errorf("Error while listing services from cache: %v", err)
+					// This will cause a retry (if there isnt' any other trigger in-flight).
+					c.dispatchUpdate()
+					continue
+				}
+				if services == nil {
+					services = []*api.Service{}
+				}
+				for i := range c.handlers {
+					glog.V(3).Infof("Calling handler.OnServiceUpdate()")
+					c.handlers[i].OnServiceUpdate(services)
+				}
+			case <-c.stop:
+				return
 			}
 		}
 	}()
 	// Close updates channel when stopCh is closed.
 	go func() {
 		<-stopCh
-		close(c.updates)
+		close(c.stop)
 	}()
 }

@@ -258,8 +259,11 @@ func (c *ServiceConfig) handleDeleteService(_ interface{}) {
 func (c *ServiceConfig) dispatchUpdate() {
 	select {
 	case c.updates <- struct{}{}:
+		// Work enqueued successfully
+	case <-c.stop:
+		// We're shut down / avoid logging the message below
 	default:
-		glog.V(4).Infof("Service handler alread has a pending interrupt.")
+		glog.V(4).Infof("Service handler already has a pending interrupt.")
 	}
 }

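
Aside from the informer switch, the diff keeps kube-proxy's update coalescing: dispatchUpdate does a non-blocking send on a one-slot buffered channel, so a burst of informer events collapses into a single pending wakeup while the handlers relist from the cache. A self-contained sketch of that pattern, with illustrative names rather than code from this commit:

package main

import (
	"fmt"
	"time"
)

// notifier coalesces triggers: the one-slot buffered updates channel means any
// number of trigger() calls made while the worker is busy collapse into a
// single pending wakeup, and the worker always reads the freshest state.
type notifier struct {
	updates chan struct{}
	stop    chan struct{}
}

func newNotifier() *notifier {
	return &notifier{
		updates: make(chan struct{}, 1),
		stop:    make(chan struct{}),
	}
}

func (n *notifier) trigger() {
	select {
	case n.updates <- struct{}{}:
		// Work enqueued successfully.
	case <-n.stop:
		// Shutting down; drop the trigger.
	default:
		// A wakeup is already pending; nothing to do.
	}
}

func (n *notifier) run(handle func()) {
	for {
		select {
		case <-n.updates:
			handle()
		case <-n.stop:
			return
		}
	}
}

func main() {
	n := newNotifier()
	go n.run(func() { fmt.Println("relist from cache and call handlers") })

	for i := 0; i < 100; i++ {
		n.trigger() // many triggers, only a few handler runs
	}

	time.Sleep(50 * time.Millisecond) // let the worker drain the pending wakeup
	close(n.stop)
}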