diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index a5c14390777..e7af2601b34 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -302,11 +302,8 @@ type ProxyServer struct { ResourceContainer string ConfigSyncPeriod time.Duration ServiceEventHandler proxyconfig.ServiceHandler - // TODO: Migrate all handlers to ServiceHandler types and - // get rid of this one. - ServiceHandler proxyconfig.ServiceConfigHandler - EndpointsEventHandler proxyconfig.EndpointsHandler - HealthzServer *healthcheck.HealthzServer + EndpointsEventHandler proxyconfig.EndpointsHandler + HealthzServer *healthcheck.HealthzServer } // createClients creates a kube client and an event client from the given config and masterOverride. @@ -397,9 +394,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx var proxier proxy.ProxyProvider var serviceEventHandler proxyconfig.ServiceHandler - // TODO: Migrate all handlers to ServiceHandler types and - // get rid of this one. - var serviceHandler proxyconfig.ServiceConfigHandler var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{}) @@ -517,7 +511,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, ServiceEventHandler: serviceEventHandler, - ServiceHandler: serviceHandler, EndpointsEventHandler: endpointsEventHandler, HealthzServer: healthzServer, }, nil @@ -621,12 +614,7 @@ func (s *ProxyServer) Run() error { // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod) - if s.ServiceHandler != nil { - serviceConfig.RegisterHandler(s.ServiceHandler) - } - if s.ServiceEventHandler != nil { - serviceConfig.RegisterEventHandler(s.ServiceEventHandler) - } + serviceConfig.RegisterEventHandler(s.ServiceEventHandler) go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod) diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index b68c78286d9..8048f6e9d67 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -21,7 +21,6 @@ go_library( "//pkg/client/listers/core/internalversion:go_default_library", "//pkg/controller:go_default_library", "//vendor/github.com/golang/glog:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 3e99d1f9fa6..1c79fdf6f24 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -21,7 +21,6 @@ import ( "time" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -30,22 +29,6 @@ import ( "k8s.io/kubernetes/pkg/controller" ) -// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. -// DEPRECATED: Use ServiceHandler instead - this will be removed soon. -type ServiceConfigHandler interface { - // OnServiceUpdate gets called when a service is created, removed or changed - // on any of the configuration sources. An example is when a new service - // comes up. - // - // NOTE: For efficiency, services are being passed by reference, thus, - // OnServiceUpdate should NOT modify pointers of a given slice. - // Those service objects are shared with other layers of the system and - // are guaranteed to be immutable with the assumption that are also - // not mutated by those handlers. Make a deep copy if you need to modify - // them in your code. - OnServiceUpdate(services []*api.Service) -} - // ServiceHandler is an abstract interface of objects which receive // notifications about service object changes. type ServiceHandler interface { @@ -185,11 +168,6 @@ type ServiceConfig struct { lister listers.ServiceLister listerSynced cache.InformerSynced eventHandlers []ServiceHandler - // TODO: Remove as soon as we migrate everything to event handlers. - handlers []ServiceConfigHandler - // updates channel is used to trigger registered handlers - updates chan struct{} - stop chan struct{} } // NewServiceConfig creates a new ServiceConfig. @@ -197,12 +175,6 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio result := &ServiceConfig{ lister: serviceInformer.Lister(), listerSynced: serviceInformer.Informer().HasSynced, - // The updates channel is used to send interrupts to the Services handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates: make(chan struct{}, 1), - stop: make(chan struct{}), } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -217,12 +189,6 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio return result } -// RegisterHandler registers a handler which is called on every services change. -// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon. -func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { - c.handlers = append(c.handlers, handler) -} - // RegisterEventHandler registers a handler which is called on every service change. func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) { c.eventHandlers = append(c.eventHandlers, handler) @@ -240,40 +206,12 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) { return } - // We have synced informers. Now we can start delivering updates - // to the registered handler. for i := range c.eventHandlers { glog.V(3).Infof("Calling handler.OnServiceSynced()") c.eventHandlers[i].OnServiceSynced() } - go func() { - defer utilruntime.HandleCrash() - for { - select { - case <-c.updates: - services, err := c.lister.List(labels.Everything()) - if err != nil { - glog.Errorf("Error while listing services from cache: %v", err) - // This will cause a retry (if there isnt' any other trigger in-flight). - c.dispatchUpdate() - continue - } - if services == nil { - services = []*api.Service{} - } - for i := range c.handlers { - glog.V(3).Infof("Calling handler.OnServiceUpdate()") - c.handlers[i].OnServiceUpdate(services) - } - case <-c.stop: - return - } - } - }() - // Close updates channel when stopCh is closed. <-stopCh - close(c.stop) } func (c *ServiceConfig) handleAddService(obj interface{}) { @@ -286,7 +224,6 @@ func (c *ServiceConfig) handleAddService(obj interface{}) { glog.V(4).Infof("Calling handler.OnServiceAdd") c.eventHandlers[i].OnServiceAdd(service) } - c.dispatchUpdate() } func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) { @@ -304,7 +241,6 @@ func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) { glog.V(4).Infof("Calling handler.OnServiceUpdate") c.eventHandlers[i].OnServiceUpdate(oldService, service) } - c.dispatchUpdate() } func (c *ServiceConfig) handleDeleteService(obj interface{}) { @@ -324,16 +260,4 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) { glog.V(4).Infof("Calling handler.OnServiceDelete") c.eventHandlers[i].OnServiceDelete(service) } - c.dispatchUpdate() -} - -func (c *ServiceConfig) dispatchUpdate() { - select { - case c.updates <- struct{}{}: - // Work enqueued successfully - case <-c.stop: - // We're shut down / avoid logging the message below - default: - glog.V(4).Infof("Service handler already has a pending interrupt.") - } }