Sync ipttables only when reflectors are fully synced

This commit is contained in:
Wojciech Tyczynski 2017-02-27 17:38:59 +01:00
parent dac0296f0b
commit df9cc0a59f
5 changed files with 126 additions and 32 deletions

View File

@ -32,7 +32,10 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["api_test.go"],
srcs = [
"api_test.go",
"config_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
@ -46,18 +49,6 @@ go_test(
],
)
go_test(
name = "go_default_xtest",
srcs = ["config_test.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy/config:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),

View File

@ -30,19 +30,30 @@ import (
// NewSourceAPI creates config source that watches for changes to the services and endpoints.
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
stopCh := wait.NeverStop
servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
newSourceAPI(servicesLW, endpointsLW, period, servicesChan, endpointsChan, wait.NeverStop)
}
func newSourceAPI(
servicesLW cache.ListerWatcher,
endpointsLW cache.ListerWatcher,
period time.Duration,
servicesChan chan<- ServiceUpdate,
endpointsChan chan<- EndpointsUpdate,
stopCh <-chan struct{}) {
serviceController := NewServiceController(servicesLW, period, servicesChan)
go serviceController.Run(stopCh)
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan)
go endpointsController.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) {
utilruntime.HandleError(fmt.Errorf("source controllers not synced"))
return
}
servicesChan <- ServiceUpdate{Op: SYNCED}
endpointsChan <- EndpointsUpdate{Op: SYNCED}
}
func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {

View File

@ -17,6 +17,9 @@ limitations under the License.
package config
import (
"reflect"
"sort"
"sync"
"testing"
"time"
@ -226,3 +229,84 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
}
type svcHandler struct {
t *testing.T
expected []api.Service
done func()
}
func newSvcHandler(t *testing.T, svcs []api.Service, done func()) *svcHandler {
return &svcHandler{t: t, expected: svcs, done: done}
}
func (s *svcHandler) OnServiceUpdate(services []api.Service) {
defer s.done()
sort.Sort(sortedServices(services))
if !reflect.DeepEqual(s.expected, services) {
s.t.Errorf("Unexpected services: %#v, expected: %#v", services, s.expected)
}
}
type epsHandler struct {
t *testing.T
expected []api.Endpoints
done func()
}
func newEpsHandler(t *testing.T, eps []api.Endpoints, done func()) *epsHandler {
return &epsHandler{t: t, expected: eps, done: done}
}
func (e *epsHandler) OnEndpointsUpdate(endpoints []api.Endpoints) {
defer e.done()
sort.Sort(sortedEndpoints(endpoints))
if !reflect.DeepEqual(e.expected, endpoints) {
e.t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, e.expected)
}
}
func TestInitialSync(t *testing.T) {
svc1 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
}
svc2 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
}
eps1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
}
eps2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
}
var wg sync.WaitGroup
// Wait for both services and endpoints handler.
wg.Add(2)
svcConfig := NewServiceConfig()
epsConfig := NewEndpointsConfig()
svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler)
epsHandler := newEpsHandler(t, []api.Endpoints{*eps2, *eps1}, wg.Done)
epsConfig.RegisterHandler(epsHandler)
// Setup fake api client.
fakeSvcWatch := watch.NewFake()
svcLW := fakeLW{
listResp: &api.ServiceList{Items: []api.Service{*svc1, *svc2}},
watchResp: fakeSvcWatch,
}
fakeEpsWatch := watch.NewFake()
epsLW := fakeLW{
listResp: &api.EndpointsList{Items: []api.Endpoints{*eps2, *eps1}},
watchResp: fakeEpsWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
newSourceAPI(svcLW, epsLW, time.Minute, svcConfig.Channel("one"), epsConfig.Channel("two"), stopCh)
wg.Wait()
}

View File

@ -34,6 +34,7 @@ const (
ADD Operation = iota
UPDATE
REMOVE
SYNCED
)
// ServiceUpdate describes an operation of services, sent on the channel.
@ -88,6 +89,7 @@ func NewEndpointsConfig() *EndpointsConfig {
return &EndpointsConfig{mux, bcaster, store}
}
// RegisterHandler registers a handler which is called on every endpoints change.
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
@ -95,6 +97,7 @@ func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
}))
}
// Channel returns a channel to which endpoints updates should be delivered.
func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate {
ch := c.mux.Channel(source)
endpointsCh := make(chan EndpointsUpdate)
@ -106,6 +109,7 @@ func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate {
return endpointsCh
}
// Config returns list of all endpoints from underlying store.
func (c *EndpointsConfig) Config() []api.Endpoints {
return c.store.MergedState().([]api.Endpoints)
}
@ -113,6 +117,7 @@ func (c *EndpointsConfig) Config() []api.Endpoints {
type endpointsStore struct {
endpointLock sync.RWMutex
endpoints map[string]map[types.NamespacedName]*api.Endpoints
synced bool
updates chan<- struct{}
}
@ -132,18 +137,15 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update.Endpoints))
name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name}
delete(endpoints, name)
case SYNCED:
s.synced = true
default:
glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
}
s.endpoints[source] = endpoints
synced := s.synced
s.endpointLock.Unlock()
if s.updates != nil {
// TODO: We should not broadcase the signal, until the state is fully
// populated (i.e. until initial LIST of the underlying reflector is
// propagated here).
//
// Since we record the snapshot before sending this signal, it's
// possible that the consumer ends up performing an extra update.
if s.updates != nil && synced {
select {
case s.updates <- struct{}{}:
default:
@ -188,6 +190,7 @@ func NewServiceConfig() *ServiceConfig {
return &ServiceConfig{mux, bcaster, store}
}
// RegisterHandler registers a handler which is called on every services change.
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
@ -195,6 +198,7 @@ func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
}))
}
// Channel returns a channel to which services updates should be delivered.
func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
ch := c.mux.Channel(source)
serviceCh := make(chan ServiceUpdate)
@ -206,6 +210,7 @@ func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
return serviceCh
}
// Config returns list of all services from underlying store.
func (c *ServiceConfig) Config() []api.Service {
return c.store.MergedState().([]api.Service)
}
@ -213,6 +218,7 @@ func (c *ServiceConfig) Config() []api.Service {
type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[types.NamespacedName]*api.Service
synced bool
updates chan<- struct{}
}
@ -232,18 +238,15 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service))
name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name}
delete(services, name)
case SYNCED:
s.synced = true
default:
glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
}
s.services[source] = services
synced := s.synced
s.serviceLock.Unlock()
if s.updates != nil {
// TODO: We should not broadcase the signal, until the state is fully
// populated (i.e. until initial LIST of the underlying reflector is
// propagated here).
//
// Since we record the snapshot before sending this signal, it's
// possible that the consumer ends up performing an extra update.
if s.updates != nil && synced {
select {
case s.updates <- struct{}{}:
default:

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package config_test
package config
import (
"reflect"
@ -25,7 +25,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api"
. "k8s.io/kubernetes/pkg/proxy/config"
)
const TomcatPort int = 8080
@ -140,6 +139,7 @@ func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpda
func TestNewServiceAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
@ -153,6 +153,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
@ -181,6 +182,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
if channelOne == channelTwo {
@ -204,6 +206,7 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewServiceHandlerMock()
@ -227,6 +230,7 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewEndpointsConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewEndpointsHandlerMock()
@ -257,6 +261,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
config := NewEndpointsConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewEndpointsHandlerMock()