From f7c06ad23cff8145ad81848542ddab4966b45725 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 4 Apr 2017 13:00:58 +0200 Subject: [PATCH] Support endpoints event handlers in kube-proxy --- cmd/kube-proxy/app/server.go | 12 +++++- pkg/proxy/config/config.go | 79 +++++++++++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 8 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 6d486943f58..9131fe39622 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -220,7 +220,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err var proxier proxy.ProxyProvider var servicesHandler proxyconfig.ServiceConfigHandler + // TODO: Migrate all handlers to EndpointsHandler type and + // get rid of this one. var endpointsHandler proxyconfig.EndpointsConfigHandler + var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) if proxyMode == proxyModeIPTables { @@ -257,7 +260,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // our config.EndpointsConfigHandler. loadBalancer := winuserspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer + // set EndpointsHandler to our loadBalancer endpointsHandler = loadBalancer proxierUserspace, err := winuserspace.NewProxier( loadBalancer, @@ -318,7 +321,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) - endpointsConfig.RegisterHandler(endpointsHandler) + if endpointsHandler != nil { + endpointsConfig.RegisterHandler(endpointsHandler) + } + if endpointsEventHandler != nil { + endpointsConfig.RegisterEventHandler(endpointsEventHandler) + } go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 375c26fa488..73bfae3caf8 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -60,12 +60,31 @@ type EndpointsConfigHandler interface { OnEndpointsUpdate(endpoints []*api.Endpoints) } +// EndpointsHandler is an abstract interface o objects which receive +// notifications about endpoints object changes. +type EndpointsHandler interface { + // OnEndpointsAdd is called whenever creation of new endpoints object + // is observed. + OnEndpointsAdd(endpoints *api.Endpoints) + // OnEndpointsUpdate is called whenever modification of an existing + // endpoints object is observed. + OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) + // OnEndpointsDelete is called whever deletion of an existing endpoints + // object is observed. + OnEndpointsDelete(endpoints *api.Endpoints) + // OnEndpointsSynced is called once all the initial event handlers were + // called and the state is fully propagated to local cache. + OnEndpointsSynced() +} + // EndpointsConfig tracks a set of endpoints configurations. // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change. type EndpointsConfig struct { - lister listers.EndpointsLister - listerSynced cache.InformerSynced - handlers []EndpointsConfigHandler + lister listers.EndpointsLister + listerSynced cache.InformerSynced + eventHandlers []EndpointsHandler + // TODO: Remove handlers by switching them to eventHandlers. + handlers []EndpointsConfigHandler // updates channel is used to trigger registered handlers. updates chan struct{} stop chan struct{} @@ -101,6 +120,11 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.handlers = append(c.handlers, handler) } +// RegisterEventHandler registers a handler which is called on every endpoints change. +func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) { + c.eventHandlers = append(c.eventHandlers, handler) +} + // Run starts the goroutine responsible for calling registered handlers. func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { if !cache.WaitForCacheSync(stopCh, c.listerSynced) { @@ -111,6 +135,10 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { // We have synced informers. Now we can start delivering updates // to the registered handler. go func() { + for i := range c.eventHandlers { + glog.V(3).Infof("Calling handler.OnEndpointsSynced()") + c.eventHandlers[i].OnEndpointsSynced() + } for { select { case <-c.updates: @@ -140,15 +168,54 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { }() } -func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) { +func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) { + endpoints, ok := obj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnEndpointsAdd") + c.eventHandlers[i].OnEndpointsAdd(endpoints) + } c.dispatchUpdate() } -func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) { +func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) { + oldEndpoints, ok := oldObj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj)) + return + } + endpoints, ok := newObj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj)) + return + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnEndpointsUpdate") + c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints) + } c.dispatchUpdate() } -func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) { +func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { + endpoints, ok := obj.(*api.Endpoints) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + if endpoints, ok = tombstone.Obj.(*api.Endpoints); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj)) + return + } + } + for i := range c.eventHandlers { + glog.V(4).Infof("Calling handler.OnEndpointsUpdate") + c.eventHandlers[i].OnEndpointsDelete(endpoints) + } c.dispatchUpdate() }