Merge pull request #44499 from wojtek-t/edge_based_services_in_proxy

Automatic merge from submit-queue

Edge based services in proxy

This is sibling effort to what I did for endpoints in KubeProxy.
This PR is first one (changing config & iptables) - userspace will follow.
This commit is contained in:
Kubernetes Submit Queue 2017-04-19 09:43:04 -07:00 committed by GitHub
commit 505ec43dab
7 changed files with 241 additions and 106 deletions

View File

@ -219,7 +219,10 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname}) recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "kube-proxy", Host: hostname})
var proxier proxy.ProxyProvider var proxier proxy.ProxyProvider
var servicesHandler proxyconfig.ServiceConfigHandler var serviceEventHandler proxyconfig.ServiceHandler
// TODO: Migrate all handlers to ServiceHandler types and
// get rid of this one.
var serviceHandler proxyconfig.ServiceConfigHandler
var endpointsEventHandler proxyconfig.EndpointsHandler var endpointsEventHandler proxyconfig.EndpointsHandler
proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
@ -246,7 +249,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
proxier = proxierIPTables proxier = proxierIPTables
servicesHandler = proxierIPTables serviceEventHandler = proxierIPTables
endpointsEventHandler = proxierIPTables endpointsEventHandler = proxierIPTables
// No turning back. Remove artifacts that might still exist from the userspace Proxier. // No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.") glog.V(0).Info("Tearing down userspace rules.")
@ -271,7 +274,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
servicesHandler = proxierUserspace serviceHandler = proxierUserspace
proxier = proxierUserspace proxier = proxierUserspace
} else { } else {
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
@ -292,7 +295,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
if err != nil { if err != nil {
glog.Fatalf("Unable to create proxier: %v", err) glog.Fatalf("Unable to create proxier: %v", err)
} }
servicesHandler = proxierUserspace serviceHandler = proxierUserspace
proxier = proxierUserspace proxier = proxierUserspace
} }
// Remove artifacts from the pure-iptables Proxier, if not on Windows. // Remove artifacts from the pure-iptables Proxier, if not on Windows.
@ -314,7 +317,12 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// only notify on changes, and the initial update (on process start) may be lost if no handlers // only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet. // are registered yet.
serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod) serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), config.ConfigSyncPeriod)
serviceConfig.RegisterHandler(servicesHandler) if serviceHandler != nil {
serviceConfig.RegisterHandler(serviceHandler)
}
if serviceEventHandler != nil {
serviceConfig.RegisterEventHandler(serviceEventHandler)
}
go serviceConfig.Run(wait.NeverStop) go serviceConfig.Run(wait.NeverStop)
endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)

View File

@ -20,7 +20,6 @@ go_library(
"//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library", "//pkg/client/informers/informers_generated/internalversion/core/internalversion:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library", "//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/util/config:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",

View File

@ -18,7 +18,6 @@ package config
import ( import (
"reflect" "reflect"
"sort"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -51,40 +50,34 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
defer close(stopCh) defer close(stopCh)
ch := make(chan struct{}) handler := NewServiceHandlerMock()
handler := newSvcHandler(t, nil, func() { ch <- struct{}{} })
sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) serviceConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
serviceConfig.RegisterHandler(handler) serviceConfig.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh) go sharedInformers.Start(stopCh)
go serviceConfig.Run(stopCh) go serviceConfig.Run(stopCh)
// Add the first service // Add the first service
handler.expected = []*api.Service{service1v1}
fakeWatch.Add(service1v1) fakeWatch.Add(service1v1)
<-ch handler.ValidateServices(t, []*api.Service{service1v1})
// Add another service // Add another service
handler.expected = []*api.Service{service1v1, service2}
fakeWatch.Add(service2) fakeWatch.Add(service2)
<-ch handler.ValidateServices(t, []*api.Service{service1v1, service2})
// Modify service1 // Modify service1
handler.expected = []*api.Service{service1v2, service2}
fakeWatch.Modify(service1v2) fakeWatch.Modify(service1v2)
<-ch handler.ValidateServices(t, []*api.Service{service1v2, service2})
// Delete service1 // Delete service1
handler.expected = []*api.Service{service2}
fakeWatch.Delete(service1v2) fakeWatch.Delete(service1v2)
<-ch handler.ValidateServices(t, []*api.Service{service2})
// Delete service2 // Delete service2
handler.expected = []*api.Service{}
fakeWatch.Delete(service2) fakeWatch.Delete(service2)
<-ch handler.ValidateServices(t, []*api.Service{})
} }
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
@ -155,22 +148,17 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
handler.ValidateEndpoints(t, []*api.Endpoints{}) handler.ValidateEndpoints(t, []*api.Endpoints{})
} }
type svcHandler struct { func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) ServiceHandler {
t *testing.T shm := &ServiceHandlerMock{
expected []*api.Service state: make(map[types.NamespacedName]*api.Service),
done func()
} }
shm.process = func(services []*api.Service) {
func newSvcHandler(t *testing.T, svcs []*api.Service, done func()) *svcHandler { defer done()
return &svcHandler{t: t, expected: svcs, done: done} if !reflect.DeepEqual(services, svcs) {
t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs)
} }
func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
defer s.done()
sort.Sort(sortedServices(services))
if !reflect.DeepEqual(s.expected, services) {
s.t.Errorf("Unexpected services: %#v, expected: %#v", services, s.expected)
} }
return shm
} }
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler { func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler {
@ -213,7 +201,7 @@ func TestInitialSync(t *testing.T) {
svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0) svcConfig := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), 0)
epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0) epsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), 0)
svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done) svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler) svcConfig.RegisterEventHandler(svcHandler)
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
epsConfig.RegisterEventHandler(epsHandler) epsConfig.RegisterEventHandler(epsHandler)

View File

@ -28,10 +28,10 @@ import (
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion" coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/core/internalversion"
listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion" listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/config"
) )
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. // ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
// DEPRECATED: Use ServiceHandler instead - this will be removed soon.
type ServiceConfigHandler interface { type ServiceConfigHandler interface {
// OnServiceUpdate gets called when a service is created, removed or changed // OnServiceUpdate gets called when a service is created, removed or changed
// on any of the configuration sources. An example is when a new service // on any of the configuration sources. An example is when a new service
@ -46,7 +46,24 @@ type ServiceConfigHandler interface {
OnServiceUpdate(services []*api.Service) OnServiceUpdate(services []*api.Service)
} }
// EndpointsHandler is an abstract interface o objects which receive // ServiceHandler is an abstract interface of objects which receive
// notifications about service object changes.
type ServiceHandler interface {
// OnServiceAdd is called whenever creation of new service object
// is observed.
OnServiceAdd(service *api.Service)
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
OnServiceUpdate(oldService, service *api.Service)
// OnServiceDelete is called whenever deletion of an existing service
// object is observed.
OnServiceDelete(service *api.Service)
// OnServiceSynced is called once all the initial even handlers were
// called and the state is fully propagated to local cache.
OnServiceSynced()
}
// EndpointsHandler is an abstract interface of objects which receive
// notifications about endpoints object changes. // notifications about endpoints object changes.
type EndpointsHandler interface { type EndpointsHandler interface {
// OnEndpointsAdd is called whenever creation of new endpoints object // OnEndpointsAdd is called whenever creation of new endpoints object
@ -157,7 +174,7 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
} }
} }
for i := range c.eventHandlers { for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnEndpointsUpdate") glog.V(4).Infof("Calling handler.OnEndpointsDelete")
c.eventHandlers[i].OnEndpointsDelete(endpoints) c.eventHandlers[i].OnEndpointsDelete(endpoints)
} }
} }
@ -167,6 +184,8 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
type ServiceConfig struct { type ServiceConfig struct {
lister listers.ServiceLister lister listers.ServiceLister
listerSynced cache.InformerSynced listerSynced cache.InformerSynced
eventHandlers []ServiceHandler
// TODO: Remove as soon as we migrate everything to event handlers.
handlers []ServiceConfigHandler handlers []ServiceConfigHandler
// updates channel is used to trigger registered handlers // updates channel is used to trigger registered handlers
updates chan struct{} updates chan struct{}
@ -199,10 +218,16 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio
} }
// RegisterHandler registers a handler which is called on every services change. // RegisterHandler registers a handler which is called on every services change.
// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon.
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.handlers = append(c.handlers, handler) c.handlers = append(c.handlers, handler)
} }
// RegisterEventHandler registers a handler which is called on every service change.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}
// Run starts the goroutine responsible for calling // Run starts the goroutine responsible for calling
// registered handlers. // registered handlers.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) { func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
@ -217,6 +242,10 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
// We have synced informers. Now we can start delivering updates // We have synced informers. Now we can start delivering updates
// to the registered handler. // to the registered handler.
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}
go func() { go func() {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
for { for {
@ -241,24 +270,60 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
} }
} }
}() }()
// Close updates channel when stopCh is closed. // Close updates channel when stopCh is closed.
go func() {
<-stopCh <-stopCh
close(c.stop) close(c.stop)
}()
<-stopCh
} }
func (c *ServiceConfig) handleAddService(_ interface{}) { func (c *ServiceConfig) handleAddService(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnServiceAdd")
c.eventHandlers[i].OnServiceAdd(service)
}
c.dispatchUpdate() c.dispatchUpdate()
} }
func (c *ServiceConfig) handleUpdateService(_, _ interface{}) { func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
oldService, ok := oldObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
return
}
service, ok := newObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnServiceUpdate")
c.eventHandlers[i].OnServiceUpdate(oldService, service)
}
c.dispatchUpdate() c.dispatchUpdate()
} }
func (c *ServiceConfig) handleDeleteService(_ interface{}) { func (c *ServiceConfig) handleDeleteService(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
if service, ok = tombstone.Obj.(*api.Service); !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
}
for i := range c.eventHandlers {
glog.V(4).Infof("Calling handler.OnServiceDelete")
c.eventHandlers[i].OnServiceDelete(service)
}
c.dispatchUpdate() c.dispatchUpdate()
} }
@ -272,12 +337,3 @@ func (c *ServiceConfig) dispatchUpdate() {
glog.V(4).Infof("Service handler already has a pending interrupt.") glog.V(4).Infof("Service handler already has a pending interrupt.")
} }
} }
// watchForUpdates invokes bcaster.Notify() with the latest version of an object
// when changes occur.
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
for true {
<-updates
bcaster.Notify(accessor.MergedState())
}
}

View File

@ -46,16 +46,66 @@ func (s sortedServices) Less(i, j int) bool {
} }
type ServiceHandlerMock struct { type ServiceHandlerMock struct {
lock sync.Mutex
state map[types.NamespacedName]*api.Service
synced bool
updated chan []*api.Service updated chan []*api.Service
process func([]*api.Service)
} }
func NewServiceHandlerMock() *ServiceHandlerMock { func NewServiceHandlerMock() *ServiceHandlerMock {
return &ServiceHandlerMock{updated: make(chan []*api.Service, 5)} shm := &ServiceHandlerMock{
state: make(map[types.NamespacedName]*api.Service),
updated: make(chan []*api.Service, 5),
}
shm.process = func(services []*api.Service) {
shm.updated <- services
}
return shm
} }
func (h *ServiceHandlerMock) OnServiceUpdate(services []*api.Service) { func (h *ServiceHandlerMock) OnServiceAdd(service *api.Service) {
h.lock.Lock()
defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
h.state[namespacedName] = service
h.sendServices()
}
func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *api.Service) {
h.lock.Lock()
defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
h.state[namespacedName] = service
h.sendServices()
}
func (h *ServiceHandlerMock) OnServiceDelete(service *api.Service) {
h.lock.Lock()
defer h.lock.Unlock()
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
delete(h.state, namespacedName)
h.sendServices()
}
func (h *ServiceHandlerMock) OnServiceSynced() {
h.lock.Lock()
defer h.lock.Unlock()
h.synced = true
h.sendServices()
}
func (h *ServiceHandlerMock) sendServices() {
if !h.synced {
return
}
services := make([]*api.Service, 0, len(h.state))
for _, svc := range h.state {
services = append(services, svc)
}
sort.Sort(sortedServices(services)) sort.Sort(sortedServices(services))
h.updated <- services h.process(services)
} }
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) { func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*api.Service) {
@ -185,7 +235,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
handler := NewServiceHandlerMock() handler := NewServiceHandlerMock()
config.RegisterHandler(handler) config.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh) go sharedInformers.Start(stopCh)
go config.Run(stopCh) go config.Run(stopCh)
@ -209,7 +259,7 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
handler := NewServiceHandlerMock() handler := NewServiceHandlerMock()
config.RegisterHandler(handler) config.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh) go sharedInformers.Start(stopCh)
go config.Run(stopCh) go config.Run(stopCh)
@ -246,8 +296,8 @@ func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute) config := NewServiceConfig(sharedInformers.Core().InternalVersion().Services(), time.Minute)
handler := NewServiceHandlerMock() handler := NewServiceHandlerMock()
handler2 := NewServiceHandlerMock() handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler) config.RegisterEventHandler(handler)
config.RegisterHandler(handler2) config.RegisterEventHandler(handler2)
go sharedInformers.Start(stopCh) go sharedInformers.Start(stopCh)
go config.Run(stopCh) go config.Run(stopCh)

View File

@ -196,6 +196,7 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se
} }
type endpointsMap map[types.NamespacedName]*api.Endpoints type endpointsMap map[types.NamespacedName]*api.Endpoints
type serviceMap map[types.NamespacedName]*api.Service
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo
@ -211,13 +212,13 @@ type Proxier struct {
// to not be modified in the meantime, but also require to be not modified // to not be modified in the meantime, but also require to be not modified
// by Proxier. // by Proxier.
allEndpoints endpointsMap allEndpoints endpointsMap
// allServices is nil until we have seen an OnServiceUpdate event. allServices serviceMap
allServices []*api.Service
// endpointsSynced is set to true when endpoints are synced after startup. // endpointsSynced and servicesSynced are set to true when corresponding
// This is used to avoid updating iptables with some partial data after // objects are synced after startup. This is used to avoid updating iptables
// kube-proxy restart. // with some partial data after kube-proxy restart.
endpointsSynced bool endpointsSynced bool
servicesSynced bool
throttle flowcontrol.RateLimiter throttle flowcontrol.RateLimiter
@ -333,6 +334,7 @@ func NewProxier(ipt utiliptables.Interface,
endpointsMap: make(proxyEndpointMap), endpointsMap: make(proxyEndpointMap),
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
allEndpoints: make(endpointsMap), allEndpoints: make(endpointsMap),
allServices: make(serviceMap),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod, minSyncPeriod: minSyncPeriod,
throttle: throttle, throttle: throttle,
@ -457,7 +459,7 @@ func (proxier *Proxier) SyncLoop() {
// Accepts a list of Services and the existing service map. Returns the new // Accepts a list of Services and the existing service map. Returns the new
// service map, a map of healthcheck ports, and a set of stale UDP // service map, a map of healthcheck ports, and a set of stale UDP
// services. // services.
func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { func buildNewServiceMap(allServices serviceMap, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) {
newServiceMap := make(proxyServiceMap) newServiceMap := make(proxyServiceMap)
hcPorts := make(map[types.NamespacedName]uint16) hcPorts := make(map[types.NamespacedName]uint16)
@ -525,15 +527,37 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa
return newServiceMap, hcPorts, staleUDPServices return newServiceMap, hcPorts, staleUDPServices
} }
// OnServiceUpdate tracks the active set of service proxies. func (proxier *Proxier) OnServiceAdd(service *api.Service) {
// They will be synchronized using syncProxyRules() namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
if proxier.allServices == nil { proxier.allServices[namespacedName] = service
glog.V(2).Info("Received first Services update") proxier.syncProxyRules(syncReasonServices)
} }
proxier.allServices = allServices
func (proxier *Proxier) OnServiceUpdate(_, service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.allServices[namespacedName] = service
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
proxier.mu.Lock()
defer proxier.mu.Unlock()
delete(proxier.allServices, namespacedName)
proxier.syncProxyRules(syncReasonServices)
}
func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.servicesSynced = true
proxier.syncProxyRules(syncReasonServices) proxier.syncProxyRules(syncReasonServices)
} }

View File

@ -388,7 +388,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
iptables: ipt, iptables: ipt,
clusterCIDR: "10.0.0.0/24", clusterCIDR: "10.0.0.0/24",
allEndpoints: make(endpointsMap), allEndpoints: make(endpointsMap),
allServices: []*api.Service{}, allServices: make(serviceMap),
endpointsSynced: true, endpointsSynced: true,
hostname: testHostname, hostname: testHostname,
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
@ -567,7 +567,7 @@ func TestClusterIPReject(t *testing.T) {
Port: "p80", Port: "p80",
} }
fp.allServices = []*api.Service{ fp.allServices = makeServiceMap(
makeTestService(svcPortName.Namespace, svcPortName.Namespace, func(svc *api.Service) { makeTestService(svcPortName.Namespace, svcPortName.Namespace, func(svc *api.Service) {
svc.Spec.ClusterIP = svcIP svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []api.ServicePort{{ svc.Spec.Ports = []api.ServicePort{{
@ -576,7 +576,7 @@ func TestClusterIPReject(t *testing.T) {
Protocol: api.ProtocolTCP, Protocol: api.ProtocolTCP,
}} }}
}), }),
} )
fp.syncProxyRules(syncReasonForce) fp.syncProxyRules(syncReasonForce)
svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP)))) svcChain := string(servicePortChainName(svcPortName.String(), strings.ToLower(string(api.ProtocolTCP))))
@ -600,7 +600,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
Port: "p80", Port: "p80",
} }
fp.allServices = []*api.Service{ fp.allServices = makeServiceMap(
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
svc.Spec.ClusterIP = svcIP svc.Spec.ClusterIP = svcIP
svc.Spec.Ports = []api.ServicePort{{ svc.Spec.Ports = []api.ServicePort{{
@ -609,7 +609,7 @@ func TestClusterIPEndpointsJump(t *testing.T) {
Protocol: api.ProtocolTCP, Protocol: api.ProtocolTCP,
}} }}
}), }),
} )
epIP := "10.180.0.1" epIP := "10.180.0.1"
fp.allEndpoints = makeEndpointsMap( fp.allEndpoints = makeEndpointsMap(
@ -659,7 +659,7 @@ func TestLoadBalancer(t *testing.T) {
Port: "p80", Port: "p80",
} }
fp.allServices = []*api.Service{ fp.allServices = makeServiceMap(
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
svc.Spec.Type = "LoadBalancer" svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = svcIP svc.Spec.ClusterIP = svcIP
@ -673,7 +673,7 @@ func TestLoadBalancer(t *testing.T) {
IP: svcLBIP, IP: svcLBIP,
}} }}
}), }),
} )
epIP := "10.180.0.1" epIP := "10.180.0.1"
fp.allEndpoints = makeEndpointsMap( fp.allEndpoints = makeEndpointsMap(
@ -719,7 +719,7 @@ func TestNodePort(t *testing.T) {
Port: "p80", Port: "p80",
} }
fp.allServices = []*api.Service{ fp.allServices = makeServiceMap(
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
svc.Spec.Type = "NodePort" svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP svc.Spec.ClusterIP = svcIP
@ -730,7 +730,7 @@ func TestNodePort(t *testing.T) {
NodePort: int32(svcNodePort), NodePort: int32(svcNodePort),
}} }}
}), }),
} )
epIP := "10.180.0.1" epIP := "10.180.0.1"
fp.allEndpoints = makeEndpointsMap( fp.allEndpoints = makeEndpointsMap(
@ -769,7 +769,7 @@ func TestNodePortReject(t *testing.T) {
Port: "p80", Port: "p80",
} }
fp.allServices = []*api.Service{ fp.allServices = makeServiceMap(
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
svc.Spec.Type = "NodePort" svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP svc.Spec.ClusterIP = svcIP
@ -780,7 +780,7 @@ func TestNodePortReject(t *testing.T) {
NodePort: int32(svcNodePort), NodePort: int32(svcNodePort),
}} }}
}), }),
} )
fp.syncProxyRules(syncReasonForce) fp.syncProxyRules(syncReasonForce)
@ -806,7 +806,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
Port: "p80", Port: "p80",
} }
fp.allServices = []*api.Service{ fp.allServices = makeServiceMap(
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
svc.Spec.Type = "LoadBalancer" svc.Spec.Type = "LoadBalancer"
svc.Spec.ClusterIP = svcIP svc.Spec.ClusterIP = svcIP
@ -821,7 +821,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
}} }}
svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
}), }),
} )
epIP1 := "10.180.0.1" epIP1 := "10.180.0.1"
epIP2 := "10.180.2.1" epIP2 := "10.180.2.1"
@ -900,7 +900,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
Port: "p80", Port: "p80",
} }
fp.allServices = []*api.Service{ fp.allServices = makeServiceMap(
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) { makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *api.Service) {
svc.Spec.Type = "NodePort" svc.Spec.Type = "NodePort"
svc.Spec.ClusterIP = svcIP svc.Spec.ClusterIP = svcIP
@ -912,7 +912,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable
}} }}
svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal svc.Annotations[service.BetaAnnotationExternalTraffic] = service.AnnotationValueExternalTrafficLocal
}), }),
} )
epIP1 := "10.180.0.1" epIP1 := "10.180.0.1"
epIP2 := "10.180.2.1" epIP2 := "10.180.2.1"
@ -992,7 +992,7 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po
} }
func TestBuildServiceMapAddRemove(t *testing.T) { func TestBuildServiceMapAddRemove(t *testing.T) {
services := []*api.Service{ services := makeServiceMap(
makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.ClusterIP = "172.16.55.4"
@ -1033,7 +1033,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
}, },
} }
}), }),
} )
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
if len(serviceMap) != 8 { if len(serviceMap) != 8 {
@ -1056,8 +1056,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
} }
// Remove some stuff // Remove some stuff
services = []*api.Service{services[0]} oneService := services[makeNSN("somewhere-else", "cluster-ip")]
services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} oneService.Spec.Ports = []api.ServicePort{oneService.Spec.Ports[1]}
services = makeServiceMap(oneService)
serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap) serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap)
if len(serviceMap) != 1 { if len(serviceMap) != 1 {
t.Errorf("expected service map length 1, got %v", serviceMap) t.Errorf("expected service map length 1, got %v", serviceMap)
@ -1082,13 +1083,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
} }
func TestBuildServiceMapServiceHeadless(t *testing.T) { func TestBuildServiceMapServiceHeadless(t *testing.T) {
services := []*api.Service{ services := makeServiceMap(
makeTestService("somewhere-else", "headless", func(svc *api.Service) { makeTestService("somewhere-else", "headless", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = api.ClusterIPNone svc.Spec.ClusterIP = api.ClusterIPNone
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "rpc", "UDP", 1234, 0, 0)
}), }),
} )
// Headless service should be ignored // Headless service should be ignored
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
@ -1107,14 +1108,14 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
} }
func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
services := []*api.Service{ services := makeServiceMap(
makeTestService("somewhere-else", "external-name", func(svc *api.Service) { makeTestService("somewhere-else", "external-name", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeExternalName svc.Spec.Type = api.ServiceTypeExternalName
svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored svc.Spec.ClusterIP = "172.16.55.4" // Should be ignored
svc.Spec.ExternalName = "foo2.bar.com" svc.Spec.ExternalName = "foo2.bar.com"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "blah", "UDP", 1235, 5321, 0)
}), }),
} )
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap))
if len(serviceMap) != 0 { if len(serviceMap) != 0 {
@ -1130,16 +1131,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
} }
func TestBuildServiceMapServiceUpdate(t *testing.T) { func TestBuildServiceMapServiceUpdate(t *testing.T) {
first := []*api.Service{ first := makeServiceMap(
makeTestService("somewhere", "some-service", func(svc *api.Service) { makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.Spec.Type = api.ServiceTypeClusterIP svc.Spec.Type = api.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.16.55.4" svc.Spec.ClusterIP = "172.16.55.4"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "something", "UDP", 1234, 4321, 0)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0) svc.Spec.Ports = addTestPort(svc.Spec.Ports, "somethingelse", "TCP", 1235, 5321, 0)
}), }),
} )
second := []*api.Service{ second := makeServiceMap(
makeTestService("somewhere", "some-service", func(svc *api.Service) { makeTestService("somewhere", "some-service", func(svc *api.Service) {
svc.ObjectMeta.Annotations = map[string]string{ svc.ObjectMeta.Annotations = map[string]string{
service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal, service.BetaAnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal,
@ -1156,7 +1157,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
}, },
} }
}), }),
} )
serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap))
if len(serviceMap) != 2 { if len(serviceMap) != 2 {
@ -1426,6 +1427,15 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName {
} }
} }
func makeServiceMap(allServices ...*api.Service) serviceMap {
result := make(serviceMap)
for _, service := range allServices {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
result[namespacedName] = service
}
return result
}
func Test_buildNewEndpointsMap(t *testing.T) { func Test_buildNewEndpointsMap(t *testing.T) {
var nodeName = "host" var nodeName = "host"