From df9cc0a59f04b9cf275a3e7aa986e02b35d39eac Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Mon, 27 Feb 2017 17:38:59 +0100 Subject: [PATCH] Sync ipttables only when reflectors are fully synced --- pkg/proxy/config/BUILD | 17 ++----- pkg/proxy/config/api.go | 17 +++++-- pkg/proxy/config/api_test.go | 84 +++++++++++++++++++++++++++++++++ pkg/proxy/config/config.go | 31 ++++++------ pkg/proxy/config/config_test.go | 9 +++- 5 files changed, 126 insertions(+), 32 deletions(-) diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index cbc2acedfec..56e86e24e9d 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -32,7 +32,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["api_test.go"], + srcs = [ + "api_test.go", + "config_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ @@ -46,18 +49,6 @@ go_test( ], ) -go_test( - name = "go_default_xtest", - srcs = ["config_test.go"], - tags = ["automanaged"], - deps = [ - "//pkg/api:go_default_library", - "//pkg/proxy/config:go_default_library", - "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", - "//vendor:k8s.io/apimachinery/pkg/util/wait", - ], -) - filegroup( name = "package-srcs", srcs = glob(["**"]), diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 227785a4c27..02aff427768 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -30,19 +30,30 @@ import ( // NewSourceAPI creates config source that watches for changes to the services and endpoints. func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { - stopCh := wait.NeverStop - servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) + endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) + newSourceAPI(servicesLW, endpointsLW, period, servicesChan, endpointsChan, wait.NeverStop) +} + +func newSourceAPI( + servicesLW cache.ListerWatcher, + endpointsLW cache.ListerWatcher, + period time.Duration, + servicesChan chan<- ServiceUpdate, + endpointsChan chan<- EndpointsUpdate, + stopCh <-chan struct{}) { serviceController := NewServiceController(servicesLW, period, servicesChan) go serviceController.Run(stopCh) - endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan) go endpointsController.Run(stopCh) if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) { utilruntime.HandleError(fmt.Errorf("source controllers not synced")) + return } + servicesChan <- ServiceUpdate{Op: SYNCED} + endpointsChan <- EndpointsUpdate{Op: SYNCED} } func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 0ed99149329..b187b3d37e0 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -17,6 +17,9 @@ limitations under the License. package config import ( + "reflect" + "sort" + "sync" "testing" "time" @@ -226,3 +229,84 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { t.Errorf("Expected %#v, Got %#v", expected, got) } } + +type svcHandler struct { + t *testing.T + expected []api.Service + done func() +} + +func newSvcHandler(t *testing.T, svcs []api.Service, done func()) *svcHandler { + return &svcHandler{t: t, expected: svcs, done: done} +} + +func (s *svcHandler) OnServiceUpdate(services []api.Service) { + defer s.done() + sort.Sort(sortedServices(services)) + if !reflect.DeepEqual(s.expected, services) { + s.t.Errorf("Unexpected services: %#v, expected: %#v", services, s.expected) + } +} + +type epsHandler struct { + t *testing.T + expected []api.Endpoints + done func() +} + +func newEpsHandler(t *testing.T, eps []api.Endpoints, done func()) *epsHandler { + return &epsHandler{t: t, expected: eps, done: done} +} + +func (e *epsHandler) OnEndpointsUpdate(endpoints []api.Endpoints) { + defer e.done() + sort.Sort(sortedEndpoints(endpoints)) + if !reflect.DeepEqual(e.expected, endpoints) { + e.t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, e.expected) + } +} + +func TestInitialSync(t *testing.T) { + svc1 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, + } + svc2 := &api.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, + Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, + } + eps1 := &api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, + } + eps2 := &api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, + } + + var wg sync.WaitGroup + // Wait for both services and endpoints handler. + wg.Add(2) + + svcConfig := NewServiceConfig() + epsConfig := NewEndpointsConfig() + svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) + svcConfig.RegisterHandler(svcHandler) + epsHandler := newEpsHandler(t, []api.Endpoints{*eps2, *eps1}, wg.Done) + epsConfig.RegisterHandler(epsHandler) + + // Setup fake api client. + fakeSvcWatch := watch.NewFake() + svcLW := fakeLW{ + listResp: &api.ServiceList{Items: []api.Service{*svc1, *svc2}}, + watchResp: fakeSvcWatch, + } + fakeEpsWatch := watch.NewFake() + epsLW := fakeLW{ + listResp: &api.EndpointsList{Items: []api.Endpoints{*eps2, *eps1}}, + watchResp: fakeEpsWatch, + } + + stopCh := make(chan struct{}) + defer close(stopCh) + newSourceAPI(svcLW, epsLW, time.Minute, svcConfig.Channel("one"), epsConfig.Channel("two"), stopCh) + wg.Wait() +} diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 1e13acda23c..cdd894d71a6 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -34,6 +34,7 @@ const ( ADD Operation = iota UPDATE REMOVE + SYNCED ) // ServiceUpdate describes an operation of services, sent on the channel. @@ -88,6 +89,7 @@ func NewEndpointsConfig() *EndpointsConfig { return &EndpointsConfig{mux, bcaster, store} } +// RegisterHandler registers a handler which is called on every endpoints change. func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") @@ -95,6 +97,7 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { })) } +// Channel returns a channel to which endpoints updates should be delivered. func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate { ch := c.mux.Channel(source) endpointsCh := make(chan EndpointsUpdate) @@ -106,6 +109,7 @@ func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate { return endpointsCh } +// Config returns list of all endpoints from underlying store. func (c *EndpointsConfig) Config() []api.Endpoints { return c.store.MergedState().([]api.Endpoints) } @@ -113,6 +117,7 @@ func (c *EndpointsConfig) Config() []api.Endpoints { type endpointsStore struct { endpointLock sync.RWMutex endpoints map[string]map[types.NamespacedName]*api.Endpoints + synced bool updates chan<- struct{} } @@ -132,18 +137,15 @@ func (s *endpointsStore) Merge(source string, change interface{}) error { glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update.Endpoints)) name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name} delete(endpoints, name) + case SYNCED: + s.synced = true default: glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) } s.endpoints[source] = endpoints + synced := s.synced s.endpointLock.Unlock() - if s.updates != nil { - // TODO: We should not broadcase the signal, until the state is fully - // populated (i.e. until initial LIST of the underlying reflector is - // propagated here). - // - // Since we record the snapshot before sending this signal, it's - // possible that the consumer ends up performing an extra update. + if s.updates != nil && synced { select { case s.updates <- struct{}{}: default: @@ -188,6 +190,7 @@ func NewServiceConfig() *ServiceConfig { return &ServiceConfig{mux, bcaster, store} } +// RegisterHandler registers a handler which is called on every services change. func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnServiceUpdate()") @@ -195,6 +198,7 @@ func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { })) } +// Channel returns a channel to which services updates should be delivered. func (c *ServiceConfig) Channel(source string) chan ServiceUpdate { ch := c.mux.Channel(source) serviceCh := make(chan ServiceUpdate) @@ -206,6 +210,7 @@ func (c *ServiceConfig) Channel(source string) chan ServiceUpdate { return serviceCh } +// Config returns list of all services from underlying store. func (c *ServiceConfig) Config() []api.Service { return c.store.MergedState().([]api.Service) } @@ -213,6 +218,7 @@ func (c *ServiceConfig) Config() []api.Service { type serviceStore struct { serviceLock sync.RWMutex services map[string]map[types.NamespacedName]*api.Service + synced bool updates chan<- struct{} } @@ -232,18 +238,15 @@ func (s *serviceStore) Merge(source string, change interface{}) error { glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service)) name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name} delete(services, name) + case SYNCED: + s.synced = true default: glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) } s.services[source] = services + synced := s.synced s.serviceLock.Unlock() - if s.updates != nil { - // TODO: We should not broadcase the signal, until the state is fully - // populated (i.e. until initial LIST of the underlying reflector is - // propagated here). - // - // Since we record the snapshot before sending this signal, it's - // possible that the consumer ends up performing an extra update. + if s.updates != nil && synced { select { case s.updates <- struct{}{}: default: diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 74f0bc19c24..cdb2edad94e 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package config_test +package config import ( "reflect" @@ -25,7 +25,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/api" - . "k8s.io/kubernetes/pkg/proxy/config" ) const TomcatPort int = 8080 @@ -140,6 +139,7 @@ func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpda func TestNewServiceAddedAndNotified(t *testing.T) { config := NewServiceConfig() + config.store.synced = true channel := config.Channel("one") handler := NewServiceHandlerMock() config.RegisterHandler(handler) @@ -153,6 +153,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) { func TestServiceAddedRemovedSetAndNotified(t *testing.T) { config := NewServiceConfig() + config.store.synced = true channel := config.Channel("one") handler := NewServiceHandlerMock() config.RegisterHandler(handler) @@ -181,6 +182,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) { config := NewServiceConfig() + config.store.synced = true channelOne := config.Channel("one") channelTwo := config.Channel("two") if channelOne == channelTwo { @@ -204,6 +206,7 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) { func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) { config := NewServiceConfig() + config.store.synced = true channelOne := config.Channel("one") channelTwo := config.Channel("two") handler := NewServiceHandlerMock() @@ -227,6 +230,7 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { config := NewEndpointsConfig() + config.store.synced = true channelOne := config.Channel("one") channelTwo := config.Channel("two") handler := NewEndpointsHandlerMock() @@ -257,6 +261,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing. func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { config := NewEndpointsConfig() + config.store.synced = true channelOne := config.Channel("one") channelTwo := config.Channel("two") handler := NewEndpointsHandlerMock()