diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index d15969124a7..3f0c43d9449 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -220,9 +220,6 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err var proxier proxy.ProxyProvider var servicesHandler proxyconfig.ServiceConfigHandler - // TODO: Migrate all handlers to EndpointsHandler type and - // get rid of this one. - var endpointsHandler proxyconfig.EndpointsConfigHandler var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) @@ -321,12 +318,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) - if endpointsHandler != nil { - endpointsConfig.RegisterHandler(endpointsHandler) - } - if endpointsEventHandler != nil { - endpointsConfig.RegisterEventHandler(endpointsEventHandler) - } + endpointsConfig.RegisterEventHandler(endpointsEventHandler) go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index 2b9cfa669ec..5c422d10dba 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -143,7 +143,7 @@ func main() { serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), configResyncPeriod) - endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) + endpointsConfig.RegisterEventHandler(&kubemark.FakeProxyHandler{}) eventClient, err := clientgoclientset.NewForConfig(clientConfig) if err != nil { diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index ae738f88e47..4e5fecb361b 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -40,8 +40,11 @@ type HollowProxy struct { type FakeProxyHandler struct{} -func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {} -func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {} +func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {} +func (*FakeProxyHandler) OnEndpointsAdd(endpoints *api.Endpoints) {} +func (*FakeProxyHandler) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {} +func (*FakeProxyHandler) OnEndpointsDelete(endpoints *api.Endpoints) {} +func (*FakeProxyHandler) OnEndpointsSynced() {} type FakeProxier struct{} diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 2adb49bc421..8c6967c1d2c 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -40,6 +40,7 @@ go_test( "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/testing", diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 11600eae8c3..6d8d0a68f0b 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -24,6 +24,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" ktesting "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api" @@ -124,40 +125,34 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - ch := make(chan struct{}) - handler := newEpsHandler(t, nil, func() { ch <- struct{}{} }) + handler := NewEndpointsHandlerMock() sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) endpointsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) - endpointsConfig.RegisterHandler(handler) + endpointsConfig.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go endpointsConfig.Run(stopCh) // Add the first endpoints - handler.expected = []*api.Endpoints{endpoints1v1} fakeWatch.Add(endpoints1v1) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1}) // Add another endpoints - handler.expected = []*api.Endpoints{endpoints1v1, endpoints2} fakeWatch.Add(endpoints2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1, endpoints2}) // Modify endpoints1 - handler.expected = []*api.Endpoints{endpoints1v2, endpoints2} fakeWatch.Modify(endpoints1v2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v2, endpoints2}) // Delete endpoints1 - handler.expected = []*api.Endpoints{endpoints2} fakeWatch.Delete(endpoints1v2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints2}) // Delete endpoints2 - handler.expected = []*api.Endpoints{} fakeWatch.Delete(endpoints2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{}) } type svcHandler struct { @@ -178,22 +173,17 @@ func (s *svcHandler) OnServiceUpdate(services []*api.Service) { } } -type epsHandler struct { - t *testing.T - expected []*api.Endpoints - done func() -} - -func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) *epsHandler { - return &epsHandler{t: t, expected: eps, done: done} -} - -func (e *epsHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) { - defer e.done() - sort.Sort(sortedEndpoints(endpoints)) - if !reflect.DeepEqual(e.expected, endpoints) { - e.t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, e.expected) +func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler { + ehm := &EndpointsHandlerMock{ + state: make(map[types.NamespacedName]*api.Endpoints), } + ehm.process = func(endpoints []*api.Endpoints) { + defer done() + if !reflect.DeepEqual(eps, endpoints) { + t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps) + } + } + return ehm } func TestInitialSync(t *testing.T) { @@ -225,7 +215,7 @@ func TestInitialSync(t *testing.T) { svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done) svcConfig.RegisterHandler(svcHandler) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) - epsConfig.RegisterHandler(epsHandler) + epsConfig.RegisterEventHandler(epsHandler) stopCh := make(chan struct{}) defer close(stopCh) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 73bfae3caf8..979733af993 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -45,21 +45,6 @@ type ServiceConfigHandler interface { OnServiceUpdate(services []*api.Service) } -// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. -type EndpointsConfigHandler interface { - // OnEndpointsUpdate gets called when endpoints configuration is changed for a given - // service on any of the configuration sources. An example is when a new - // service comes up, or when containers come up or down for an existing service. - // - // NOTE: For efficiency, endpoints are being passed by reference, thus, - // OnEndpointsUpdate should NOT modify pointers of a given slice. - // Those endpoints objects are shared with other layers of the system and - // are guaranteed to be immutable with the assumption that are also - // not mutated by those handlers. Make a deep copy if you need to modify - // them in your code. - OnEndpointsUpdate(endpoints []*api.Endpoints) -} - // EndpointsHandler is an abstract interface o objects which receive // notifications about endpoints object changes. type EndpointsHandler interface { @@ -83,11 +68,6 @@ type EndpointsConfig struct { lister listers.EndpointsLister listerSynced cache.InformerSynced eventHandlers []EndpointsHandler - // TODO: Remove handlers by switching them to eventHandlers. - handlers []EndpointsConfigHandler - // updates channel is used to trigger registered handlers. - updates chan struct{} - stop chan struct{} } // NewEndpointsConfig creates a new EndpointsConfig. @@ -95,12 +75,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn result := &EndpointsConfig{ lister: endpointsInformer.Lister(), listerSynced: endpointsInformer.Informer().HasSynced, - // The updates channel is used to send interrupts to the Endpoints handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates: make(chan struct{}, 1), - stop: make(chan struct{}), } endpointsInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -115,11 +89,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn return result } -// RegisterHandler registers a handler which is called on every endpoints change. -func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { - c.handlers = append(c.handlers, handler) -} - // RegisterEventHandler registers a handler which is called on every endpoints change. func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) { c.eventHandlers = append(c.eventHandlers, handler) @@ -132,40 +101,12 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { return } - // We have synced informers. Now we can start delivering updates - // to the registered handler. - go func() { - for i := range c.eventHandlers { - glog.V(3).Infof("Calling handler.OnEndpointsSynced()") - c.eventHandlers[i].OnEndpointsSynced() - } - for { - select { - case <-c.updates: - endpoints, err := c.lister.List(labels.Everything()) - if err != nil { - glog.Errorf("Error while listing endpoints from cache: %v", err) - // This will cause a retry (if there isn't any other trigger in-flight). - c.dispatchUpdate() - continue - } - if endpoints == nil { - endpoints = []*api.Endpoints{} - } - for i := range c.handlers { - glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") - c.handlers[i].OnEndpointsUpdate(endpoints) - } - case <-c.stop: - return - } - } - }() - // Close updates channel when stopCh is closed. - go func() { - <-stopCh - close(c.stop) - }() + for i := range c.eventHandlers { + glog.V(3).Infof("Calling handler.OnEndpointsSynced()") + c.eventHandlers[i].OnEndpointsSynced() + } + + <-stopCh } func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) { @@ -178,7 +119,6 @@ func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) { glog.V(4).Infof("Calling handler.OnEndpointsAdd") c.eventHandlers[i].OnEndpointsAdd(endpoints) } - c.dispatchUpdate() } func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) { @@ -196,7 +136,6 @@ func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) { glog.V(4).Infof("Calling handler.OnEndpointsUpdate") c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints) } - c.dispatchUpdate() } func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { @@ -216,18 +155,6 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { glog.V(4).Infof("Calling handler.OnEndpointsUpdate") c.eventHandlers[i].OnEndpointsDelete(endpoints) } - c.dispatchUpdate() -} - -func (c *EndpointsConfig) dispatchUpdate() { - select { - case c.updates <- struct{}{}: - // Work enqueued successfully - case <-c.stop: - // We're shut down / avoid logging the message below - default: - glog.V(4).Infof("Endpoints handler already has a pending interrupt.") - } } // ServiceConfig tracks a set of service configurations. diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index b5883211fdf..d4a9a370e07 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -19,10 +19,12 @@ package config import ( "reflect" "sort" + "sync" "testing" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" ktesting "k8s.io/client-go/testing" @@ -45,7 +47,6 @@ func (s sortedServices) Less(i, j int) bool { type ServiceHandlerMock struct { updated chan []*api.Service - waits int } func NewServiceHandlerMock() *ServiceHandlerMock { @@ -90,17 +91,66 @@ func (s sortedEndpoints) Less(i, j int) bool { } type EndpointsHandlerMock struct { + lock sync.Mutex + + state map[types.NamespacedName]*api.Endpoints + synced bool updated chan []*api.Endpoints - waits int + process func([]*api.Endpoints) } func NewEndpointsHandlerMock() *EndpointsHandlerMock { - return &EndpointsHandlerMock{updated: make(chan []*api.Endpoints, 5)} + ehm := &EndpointsHandlerMock{ + state: make(map[types.NamespacedName]*api.Endpoints), + updated: make(chan []*api.Endpoints, 5), + } + ehm.process = func(endpoints []*api.Endpoints) { + ehm.updated <- endpoints + } + return ehm } -func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []*api.Endpoints) { +func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *api.Endpoints) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + h.state[namespacedName] = endpoints + h.sendEndpoints() +} + +func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + h.state[namespacedName] = endpoints + h.sendEndpoints() +} + +func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *api.Endpoints) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + delete(h.state, namespacedName) + h.sendEndpoints() +} + +func (h *EndpointsHandlerMock) OnEndpointsSynced() { + h.lock.Lock() + defer h.lock.Unlock() + h.synced = true + h.sendEndpoints() +} + +func (h *EndpointsHandlerMock) sendEndpoints() { + if !h.synced { + return + } + endpoints := make([]*api.Endpoints, 0, len(h.state)) + for _, eps := range h.state { + endpoints = append(endpoints, eps) + } sort.Sort(sortedEndpoints(endpoints)) - h.updated <- endpoints + h.process(endpoints) } func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*api.Endpoints) { @@ -230,8 +280,8 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() - config.RegisterHandler(handler) - config.RegisterHandler(handler2) + config.RegisterEventHandler(handler) + config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) @@ -270,8 +320,8 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() - config.RegisterHandler(handler) - config.RegisterHandler(handler2) + config.RegisterEventHandler(handler) + config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh)