From d2bc4d0b2e7f8e593fc388fe3de5aeea496aff20 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Mon, 3 Apr 2017 14:34:29 -0400 Subject: [PATCH] Use shared informers for proxy endpoints and service configs Use shared informers instead of creating local controllers/reflectors for the proxy's endpoints and service configs. This allows downstream integrators to pass in preexisting shared informers to save on memory & cpu usage. This also enables the cache mutation detector for kube-proxy for those presubmit jobs that already turn it on. --- .../gce/container-linux/configure-helper.sh | 5 + cluster/gce/gci/configure-helper.sh | 5 + .../salt/kube-proxy/kube-proxy.manifest | 3 + cmd/kube-proxy/app/BUILD | 1 + cmd/kube-proxy/app/server.go | 11 +- cmd/kubemark/BUILD | 1 + cmd/kubemark/hollow-node.go | 12 +- pkg/kubemark/BUILD | 1 + pkg/kubemark/hollow_proxy.go | 3 + pkg/proxy/config/BUILD | 8 +- pkg/proxy/config/api_test.go | 59 ++---- pkg/proxy/config/config.go | 172 +++++++++--------- pkg/proxy/config/config_test.go | 63 ++++--- 13 files changed, 186 insertions(+), 158 deletions(-) diff --git a/cluster/gce/container-linux/configure-helper.sh b/cluster/gce/container-linux/configure-helper.sh index 88e597cfcab..3caf4ff6c04 100755 --- a/cluster/gce/container-linux/configure-helper.sh +++ b/cluster/gce/container-linux/configure-helper.sh @@ -619,10 +619,15 @@ function start-kube-proxy { if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then params+=" ${KUBEPROXY_TEST_ARGS}" fi + local container_env="" + if [[ -n "${ENABLE_CACHE_MUTATION_DETECTOR:-}" ]]; then + container_env="env:\n - name: KUBE_CACHE_MUTATION_DETECTOR\n value: \"${ENABLE_CACHE_MUTATION_DETECTOR}\"" + fi sed -i -e "s@{{kubeconfig}}@${kubeconfig}@g" ${src_file} sed -i -e "s@{{pillar\['kube_docker_registry'\]}}@${kube_docker_registry}@g" ${src_file} sed -i -e "s@{{pillar\['kube-proxy_docker_tag'\]}}@${kube_proxy_docker_tag}@g" ${src_file} sed -i -e "s@{{params}}@${params}@g" ${src_file} + sed -i -e "s@{{container_env}}@${container_env}@g" ${src_file} sed -i -e "s@{{ cpurequest }}@100m@g" ${src_file} sed -i -e "s@{{api_servers_with_port}}@${api_servers}@g" ${src_file} if [[ -n "${CLUSTER_IP_RANGE:-}" ]]; then diff --git a/cluster/gce/gci/configure-helper.sh b/cluster/gce/gci/configure-helper.sh index 5b74a3e18bd..64de6526085 100644 --- a/cluster/gce/gci/configure-helper.sh +++ b/cluster/gce/gci/configure-helper.sh @@ -814,10 +814,15 @@ function start-kube-proxy { if [[ -n "${KUBEPROXY_TEST_ARGS:-}" ]]; then params+=" ${KUBEPROXY_TEST_ARGS}" fi + local container_env="" + if [[ -n "${ENABLE_CACHE_MUTATION_DETECTOR:-}" ]]; then + container_env="env:\n - name: KUBE_CACHE_MUTATION_DETECTOR\n value: \"${ENABLE_CACHE_MUTATION_DETECTOR}\"" + fi sed -i -e "s@{{kubeconfig}}@${kubeconfig}@g" ${src_file} sed -i -e "s@{{pillar\['kube_docker_registry'\]}}@${kube_docker_registry}@g" ${src_file} sed -i -e "s@{{pillar\['kube-proxy_docker_tag'\]}}@${kube_proxy_docker_tag}@g" ${src_file} sed -i -e "s@{{params}}@${params}@g" ${src_file} + sed -i -e "s@{{container_env}}@${container_env}@g" ${src_file} sed -i -e "s@{{ cpurequest }}@100m@g" ${src_file} sed -i -e "s@{{api_servers_with_port}}@${api_servers}@g" ${src_file} if [[ -n "${CLUSTER_IP_RANGE:-}" ]]; then diff --git a/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest b/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest index 1891491030e..7818472d293 100644 --- a/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest +++ b/cluster/saltbase/salt/kube-proxy/kube-proxy.manifest @@ -32,6 +32,8 @@ # test_args should always go last to overwrite prior configuration {% set params = log_level + " " + feature_gates + " " + test_args -%} +{% set container_env = "" -%} + # kube-proxy podspec apiVersion: v1 kind: Pod @@ -60,6 +62,7 @@ spec: - /bin/sh - -c - echo -998 > /proc/$$$/oom_score_adj && kube-proxy {{api_servers_with_port}} {{kubeconfig}} {{cluster_cidr}} --resource-container="" {{params}} 1>>/var/log/kube-proxy.log 2>&1 + {{container_env}} securityContext: privileged: true volumeMounts: diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index 43917685fc1..85deaa7ef26 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -19,6 +19,7 @@ go_library( "//cmd/kube-proxy/app/options:go_default_library", "//pkg/api:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/proxy:go_default_library", "//pkg/proxy/config:go_default_library", "//pkg/proxy/iptables:go_default_library", diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 2466bc4a561..6d486943f58 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -41,6 +41,7 @@ import ( "k8s.io/kubernetes/cmd/kube-proxy/app/options" "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/proxy" proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" @@ -306,18 +307,24 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err iptInterface.AddReloadFunc(proxier.Sync) } + informerFactory := informers.NewSharedInformerFactory(client, config.ConfigSyncPeriod) + // Create configs (i.e. Watches for Services and Endpoints) // Note: RegisterHandler() calls need to happen before creation of Sources because sources // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. - serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) + serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod) serviceConfig.RegisterHandler(servicesHandler) go serviceConfig.Run(wait.NeverStop) - endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod) + endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) endpointsConfig.RegisterHandler(endpointsHandler) go endpointsConfig.Run(wait.NeverStop) + // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those + // functions must configure their shared informer event handlers first. + go informerFactory.Start(wait.NeverStop) + config.NodeRef = &clientv1.ObjectReference{ Kind: "Node", Name: hostname, diff --git a/cmd/kubemark/BUILD b/cmd/kubemark/BUILD index 5ff40589133..b602d9b830f 100644 --- a/cmd/kubemark/BUILD +++ b/cmd/kubemark/BUILD @@ -22,6 +22,7 @@ go_library( "//pkg/api:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/client/metrics/prometheus:go_default_library", "//pkg/kubelet/cadvisor/testing:go_default_library", "//pkg/kubelet/cm:go_default_library", diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index d55fe31a592..2b9cfa669ec 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" _ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" @@ -54,8 +55,9 @@ type HollowNodeConfig struct { } const ( - maxPods = 110 - podsPerCore = 0 + maxPods = 110 + podsPerCore = 0 + configResyncPeriod = 15 * time.Minute ) var knownMorphs = sets.NewString("kubelet", "proxy") @@ -136,10 +138,11 @@ func main() { iptInterface := fakeiptables.NewFake() - serviceConfig := proxyconfig.NewServiceConfig(internalClientset.Core().RESTClient(), 15*time.Minute) + informerFactory := informers.NewSharedInformerFactory(internalClientset, configResyncPeriod) + serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), configResyncPeriod) serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) - endpointsConfig := proxyconfig.NewEndpointsConfig(internalClientset.Core().RESTClient(), 15*time.Minute) + endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), configResyncPeriod) endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) eventClient, err := clientgoclientset.NewForConfig(clientConfig) @@ -153,6 +156,7 @@ func main() { eventClient, endpointsConfig, serviceConfig, + informerFactory, iptInterface, eventBroadcaster, recorder, diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index 99ec88dd81d..355f82c0d97 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -24,6 +24,7 @@ go_library( "//pkg/apis/componentconfig/v1alpha1:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//pkg/kubelet:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/cm:go_default_library", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 7dba0d01fd0..ae738f88e47 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/cmd/kube-proxy/app/options" "k8s.io/kubernetes/pkg/api" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/util" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -56,6 +57,7 @@ func NewHollowProxyOrDie( eventClient v1core.EventsGetter, endpointsConfig *proxyconfig.EndpointsConfig, serviceConfig *proxyconfig.ServiceConfig, + informerFactory informers.SharedInformerFactory, iptInterface utiliptables.Interface, broadcaster record.EventBroadcaster, recorder record.EventRecorder, @@ -73,6 +75,7 @@ func NewHollowProxyOrDie( go endpointsConfig.Run(wait.NeverStop) go serviceConfig.Run(wait.NeverStop) + go informerFactory.Start(wait.NeverStop) hollowProxy, err := proxyapp.NewProxyServer(client, eventClient, config, iptInterface, &FakeProxier{}, broadcaster, recorder, nil, "fake") if err != nil { diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 52be6161d04..2adb49bc421 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -17,11 +17,10 @@ go_library( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/util/config:go_default_library", "//vendor:github.com/golang/glog", - "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/util/runtime", "//vendor:k8s.io/client-go/tools/cache", @@ -38,11 +37,12 @@ go_test( tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", + "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", + "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", - "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/testing", ], ) diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index b8b6265240b..11600eae8c3 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -24,27 +24,13 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" + ktesting "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" ) -type fakeLW struct { - listResp runtime.Object - watchResp watch.Interface -} - -func (lw fakeLW) List(options metav1.ListOptions) (runtime.Object, error) { - return lw.listResp, nil -} - -func (lw fakeLW) Watch(options metav1.ListOptions) (watch.Interface, error) { - return lw.watchResp, nil -} - -var _ cache.ListerWatcher = fakeLW{} - func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { service1v1 := &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"}, @@ -57,11 +43,9 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 30}}}} // Setup fake api client. + client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - lw := fakeLW{ - listResp: &api.ServiceList{Items: []api.Service{}}, - watchResp: fakeWatch, - } + client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) @@ -69,8 +53,11 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { ch := make(chan struct{}) handler := newSvcHandler(t, nil, func() { ch <- struct{}{} }) - serviceConfig := newServiceConfig(lw, time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + + serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) serviceConfig.RegisterHandler(handler) + go sharedInformers.Start(stopCh) go serviceConfig.Run(stopCh) // Add the first service @@ -130,11 +117,9 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { } // Setup fake api client. + client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - lw := fakeLW{ - listResp: &api.EndpointsList{Items: []api.Endpoints{}}, - watchResp: fakeWatch, - } + client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) stopCh := make(chan struct{}) defer close(stopCh) @@ -142,8 +127,11 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { ch := make(chan struct{}) handler := newEpsHandler(t, nil, func() { ch <- struct{}{} }) - endpointsConfig := newEndpointsConfig(lw, time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + + endpointsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) endpointsConfig.RegisterHandler(handler) + go sharedInformers.Start(stopCh) go endpointsConfig.Run(stopCh) // Add the first endpoints @@ -229,19 +217,11 @@ func TestInitialSync(t *testing.T) { wg.Add(2) // Setup fake api client. - fakeSvcWatch := watch.NewFake() - svcLW := fakeLW{ - listResp: &api.ServiceList{Items: []api.Service{*svc1, *svc2}}, - watchResp: fakeSvcWatch, - } - fakeEpsWatch := watch.NewFake() - epsLW := fakeLW{ - listResp: &api.EndpointsList{Items: []api.Endpoints{*eps2, *eps1}}, - watchResp: fakeEpsWatch, - } + client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1) + sharedInformers := informers.NewSharedInformerFactory(client, 0) - svcConfig := newServiceConfig(svcLW, time.Minute) - epsConfig := newEndpointsConfig(epsLW, time.Minute) + svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0) + epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0) svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done) svcConfig.RegisterHandler(svcHandler) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) @@ -249,6 +229,7 @@ func TestInitialSync(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) + go sharedInformers.Start(stopCh) go svcConfig.Run(stopCh) go epsConfig.Run(stopCh) wg.Wait() diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 3d1552add0f..375c26fa488 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -21,12 +21,11 @@ import ( "time" "github.com/golang/glog" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" + coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion" listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion" "k8s.io/kubernetes/pkg/util/config" ) @@ -64,35 +63,36 @@ type EndpointsConfigHandler interface { // EndpointsConfig tracks a set of endpoints configurations. // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change. type EndpointsConfig struct { - informer cache.Controller - lister listers.EndpointsLister - handlers []EndpointsConfigHandler + lister listers.EndpointsLister + listerSynced cache.InformerSynced + handlers []EndpointsConfigHandler // updates channel is used to trigger registered handlers. updates chan struct{} + stop chan struct{} } // NewEndpointsConfig creates a new EndpointsConfig. -func NewEndpointsConfig(c cache.Getter, period time.Duration) *EndpointsConfig { - endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) - return newEndpointsConfig(endpointsLW, period) -} +func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig { + result := &EndpointsConfig{ + lister: endpointsInformer.Lister(), + listerSynced: endpointsInformer.Informer().HasSynced, + // The updates channel is used to send interrupts to the Endpoints handler. + // It's buffered because we never want to block for as long as there is a + // pending interrupt, but don't want to drop them if the handler is doing + // work. + updates: make(chan struct{}, 1), + stop: make(chan struct{}), + } -func newEndpointsConfig(lw cache.ListerWatcher, period time.Duration) *EndpointsConfig { - result := &EndpointsConfig{} - - store, informer := cache.NewIndexerInformer( - lw, - &api.Endpoints{}, - period, + endpointsInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: result.handleAddEndpoints, UpdateFunc: result.handleUpdateEndpoints, DeleteFunc: result.handleDeleteEndpoints, }, - cache.Indexers{}, + resyncPeriod, ) - result.informer = informer - result.lister = listers.NewEndpointsLister(store) + return result } @@ -101,16 +101,9 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.handlers = append(c.handlers, handler) } -// Run starts the underlying informer and goroutine responsible for calling -// registered handlers. +// Run starts the goroutine responsible for calling registered handlers. func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { - // The updates channel is used to send interrupts to the Endpoints handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - c.updates = make(chan struct{}, 1) - go c.informer.Run(stopCh) - if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + if !cache.WaitForCacheSync(stopCh, c.listerSynced) { utilruntime.HandleError(fmt.Errorf("endpoint controller not synced")) return } @@ -118,27 +111,32 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { // We have synced informers. Now we can start delivering updates // to the registered handler. go func() { - for range c.updates { - endpoints, err := c.lister.List(labels.Everything()) - if err != nil { - glog.Errorf("Error while listing endpoints from cache: %v", err) - // This will cause a retry (if there isn't any other trigger in-flight). - c.dispatchUpdate() - continue - } - if endpoints == nil { - endpoints = []*api.Endpoints{} - } - for i := range c.handlers { - glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") - c.handlers[i].OnEndpointsUpdate(endpoints) + for { + select { + case <-c.updates: + endpoints, err := c.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("Error while listing endpoints from cache: %v", err) + // This will cause a retry (if there isn't any other trigger in-flight). + c.dispatchUpdate() + continue + } + if endpoints == nil { + endpoints = []*api.Endpoints{} + } + for i := range c.handlers { + glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") + c.handlers[i].OnEndpointsUpdate(endpoints) + } + case <-c.stop: + return } } }() // Close updates channel when stopCh is closed. go func() { <-stopCh - close(c.updates) + close(c.stop) }() } @@ -157,6 +155,9 @@ func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) { func (c *EndpointsConfig) dispatchUpdate() { select { case c.updates <- struct{}{}: + // Work enqueued successfully + case <-c.stop: + // We're shut down / avoid logging the message below default: glog.V(4).Infof("Endpoints handler already has a pending interrupt.") } @@ -165,35 +166,36 @@ func (c *EndpointsConfig) dispatchUpdate() { // ServiceConfig tracks a set of service configurations. // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change. type ServiceConfig struct { - informer cache.Controller - lister listers.ServiceLister - handlers []ServiceConfigHandler + lister listers.ServiceLister + listerSynced cache.InformerSynced + handlers []ServiceConfigHandler // updates channel is used to trigger registered handlers updates chan struct{} + stop chan struct{} } // NewServiceConfig creates a new ServiceConfig. -func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig { - servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) - return newServiceConfig(servicesLW, period) -} +func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig { + result := &ServiceConfig{ + lister: serviceInformer.Lister(), + listerSynced: serviceInformer.Informer().HasSynced, + // The updates channel is used to send interrupts to the Services handler. + // It's buffered because we never want to block for as long as there is a + // pending interrupt, but don't want to drop them if the handler is doing + // work. + updates: make(chan struct{}, 1), + stop: make(chan struct{}), + } -func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig { - result := &ServiceConfig{} - - store, informer := cache.NewIndexerInformer( - lw, - &api.Service{}, - period, + serviceInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: result.handleAddService, UpdateFunc: result.handleUpdateService, DeleteFunc: result.handleDeleteService, }, - cache.Indexers{}, + resyncPeriod, ) - result.informer = informer - result.lister = listers.NewServiceLister(store) + return result } @@ -202,16 +204,10 @@ func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { c.handlers = append(c.handlers, handler) } -// Run starts the underlying informer and goroutine responsible for calling +// Run starts the goroutine responsible for calling // registered handlers. func (c *ServiceConfig) Run(stopCh <-chan struct{}) { - // The updates channel is used to send interrupts to the Services handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - c.updates = make(chan struct{}, 1) - go c.informer.Run(stopCh) - if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { + if !cache.WaitForCacheSync(stopCh, c.listerSynced) { utilruntime.HandleError(fmt.Errorf("service controller not synced")) return } @@ -219,27 +215,32 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { // We have synced informers. Now we can start delivering updates // to the registered handler. go func() { - for range c.updates { - services, err := c.lister.List(labels.Everything()) - if err != nil { - glog.Errorf("Error while listing services from cache: %v", err) - // This will cause a retry (if there isn't any other trigger in-flight). - c.dispatchUpdate() - continue - } - if services == nil { - services = []*api.Service{} - } - for i := range c.handlers { - glog.V(3).Infof("Calling handler.OnServiceUpdate()") - c.handlers[i].OnServiceUpdate(services) + for { + select { + case <-c.updates: + services, err := c.lister.List(labels.Everything()) + if err != nil { + glog.Errorf("Error while listing services from cache: %v", err) + // This will cause a retry (if there isnt' any other trigger in-flight). + c.dispatchUpdate() + continue + } + if services == nil { + services = []*api.Service{} + } + for i := range c.handlers { + glog.V(3).Infof("Calling handler.OnServiceUpdate()") + c.handlers[i].OnServiceUpdate(services) + } + case <-c.stop: + return } } }() // Close updates channel when stopCh is closed. go func() { <-stopCh - close(c.updates) + close(c.stop) }() } @@ -258,8 +259,11 @@ func (c *ServiceConfig) handleDeleteService(_ interface{}) { func (c *ServiceConfig) dispatchUpdate() { select { case c.updates <- struct{}{}: + // Work enqueued successfully + case <-c.stop: + // We're shut down / avoid logging the message below default: - glog.V(4).Infof("Service handler alread has a pending interrupt.") + glog.V(4).Infof("Service handler already has a pending interrupt.") } } diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index e07fd603ac6..b5883211fdf 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -25,7 +25,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + ktesting "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" ) type sortedServices []*api.Service @@ -121,17 +124,19 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints } func TestNewServiceAddedAndNotified(t *testing.T) { + client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - lw := fakeLW{ - listResp: &api.ServiceList{Items: []api.Service{}}, - watchResp: fakeWatch, - } + client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) defer close(stopCh) - config := newServiceConfig(lw, time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + + config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() config.RegisterHandler(handler) + go sharedInformers.Start(stopCh) go config.Run(stopCh) service := &api.Service{ @@ -143,17 +148,19 @@ func TestNewServiceAddedAndNotified(t *testing.T) { } func TestServiceAddedRemovedSetAndNotified(t *testing.T) { + client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - lw := fakeLW{ - listResp: &api.ServiceList{Items: []api.Service{}}, - watchResp: fakeWatch, - } + client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) defer close(stopCh) - config := newServiceConfig(lw, time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + + config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() config.RegisterHandler(handler) + go sharedInformers.Start(stopCh) go config.Run(stopCh) service1 := &api.Service{ @@ -177,19 +184,21 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { } func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { + client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - lw := fakeLW{ - listResp: &api.ServiceList{Items: []api.Service{}}, - watchResp: fakeWatch, - } + client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) defer close(stopCh) - config := newServiceConfig(lw, time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + + config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) handler := NewServiceHandlerMock() handler2 := NewServiceHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) + go sharedInformers.Start(stopCh) go config.Run(stopCh) service1 := &api.Service{ @@ -209,19 +218,21 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) { } func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { + client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - lw := fakeLW{ - listResp: &api.EndpointsList{Items: []api.Endpoints{}}, - watchResp: fakeWatch, - } + client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) defer close(stopCh) - config := newEndpointsConfig(lw, time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + + config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) + go sharedInformers.Start(stopCh) go config.Run(stopCh) endpoints1 := &api.Endpoints{ @@ -247,19 +258,21 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { } func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { + client := fake.NewSimpleClientset() fakeWatch := watch.NewFake() - lw := fakeLW{ - listResp: &api.EndpointsList{Items: []api.Endpoints{}}, - watchResp: fakeWatch, - } + client.PrependWatchReactor("endpoints", ktesting.DefaultWatchReactor(fakeWatch, nil)) + stopCh := make(chan struct{}) defer close(stopCh) - config := newEndpointsConfig(lw, time.Minute) + sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) + + config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) + go sharedInformers.Start(stopCh) go config.Run(stopCh) endpoints1 := &api.Endpoints{