diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 6631a84c98b..efea3697fe9 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -504,8 +504,6 @@ type ProxyServer struct { OOMScoreAdj *int32 ResourceContainer string ConfigSyncPeriod time.Duration - ServiceEventHandler config.ServiceHandler - EndpointsEventHandler config.EndpointsHandler HealthzServer *healthcheck.HealthzServer } @@ -660,11 +658,11 @@ func (s *ProxyServer) Run() error { // only notify on changes, and the initial update (on process start) may be lost if no handlers // are registered yet. serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod) - serviceConfig.RegisterEventHandler(s.ServiceEventHandler) + serviceConfig.RegisterEventHandler(s.Proxier) go serviceConfig.Run(wait.NeverStop) endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod) - endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler) + endpointsConfig.RegisterEventHandler(s.Proxier) go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 84d5ea0d07d..0f0832c9bc4 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/iptables" "k8s.io/kubernetes/pkg/proxy/ipvs" @@ -135,8 +134,6 @@ func newProxyServer( } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{}) nodeIP := net.ParseIP(config.BindAddress) @@ -151,7 +148,7 @@ func newProxyServer( } // TODO this has side effects that should only happen when Run() is invoked. - proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, utilsysctl.New(), execer, @@ -170,12 +167,9 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPTables - serviceEventHandler = proxierIPTables - endpointsEventHandler = proxierIPTables } else if proxyMode == proxyModeIPVS { klog.V(0).Info("Using ipvs Proxier.") - proxierIPVS, err := ipvs.NewProxier( + proxier, err = ipvs.NewProxier( iptInterface, ipvsInterface, ipsetInterface, @@ -199,20 +193,12 @@ func newProxyServer( return nil, fmt.Errorf("unable to create proxier: %v", err) } metrics.RegisterMetrics() - proxier = proxierIPVS - serviceEventHandler = proxierIPVS - endpointsEventHandler = proxierIPVS } else { klog.V(0).Info("Using userspace Proxier.") - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := userspace.NewLoadBalancerRR() - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer // TODO this has side effects that should only happen when Run() is invoked. - proxierUserspace, err := userspace.NewProxier( - loadBalancer, + proxier, err = userspace.NewProxier( + userspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), iptInterface, execer, @@ -225,8 +211,6 @@ func newProxyServer( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - serviceEventHandler = proxierUserspace - proxier = proxierUserspace } iptInterface.AddReloadFunc(proxier.Sync) @@ -250,8 +234,6 @@ func newProxyServer( OOMScoreAdj: config.OOMScoreAdj, ResourceContainer: config.ResourceContainer, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, HealthzServer: healthzServer, }, nil } diff --git a/cmd/kube-proxy/app/server_windows.go b/cmd/kube-proxy/app/server_windows.go index def8639adc9..56a4a3b61f8 100644 --- a/cmd/kube-proxy/app/server_windows.go +++ b/cmd/kube-proxy/app/server_windows.go @@ -34,7 +34,6 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/proxy" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/winkernel" "k8s.io/kubernetes/pkg/proxy/winuserspace" @@ -95,13 +94,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi } var proxier proxy.ProxyProvider - var serviceEventHandler proxyconfig.ServiceHandler - var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), winkernel.WindowsKernelCompatTester{}) if proxyMode == proxyModeKernelspace { klog.V(0).Info("Using Kernelspace Proxier.") - proxierKernelspace, err := winkernel.NewProxier( + proxier, err = winkernel.NewProxier( config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration, config.IPTables.MasqueradeAll, @@ -116,23 +113,14 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierKernelspace - endpointsEventHandler = proxierKernelspace - serviceEventHandler = proxierKernelspace } else { klog.V(0).Info("Using userspace Proxier.") execer := exec.New() var netshInterface utilnetsh.Interface netshInterface = utilnetsh.New(execer) - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsConfigHandler. - loadBalancer := winuserspace.NewLoadBalancerRR() - - // set EndpointsConfigHandler to our loadBalancer - endpointsEventHandler = loadBalancer - proxierUserspace, err := winuserspace.NewProxier( - loadBalancer, + proxier, err = winuserspace.NewProxier( + winuserspace.NewLoadBalancerRR(), net.ParseIP(config.BindAddress), netshInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), @@ -143,26 +131,22 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierUserspace - serviceEventHandler = proxierUserspace } return &ProxyServer{ - Client: client, - EventClient: eventClient, - Proxier: proxier, - Broadcaster: eventBroadcaster, - Recorder: recorder, - ProxyMode: proxyMode, - NodeRef: nodeRef, - MetricsBindAddress: config.MetricsBindAddress, - EnableProfiling: config.EnableProfiling, - OOMScoreAdj: config.OOMScoreAdj, - ResourceContainer: config.ResourceContainer, - ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, - ServiceEventHandler: serviceEventHandler, - EndpointsEventHandler: endpointsEventHandler, - HealthzServer: healthzServer, + Client: client, + EventClient: eventClient, + Proxier: proxier, + Broadcaster: eventBroadcaster, + Recorder: recorder, + ProxyMode: proxyMode, + NodeRef: nodeRef, + MetricsBindAddress: config.MetricsBindAddress, + EnableProfiling: config.EnableProfiling, + OOMScoreAdj: config.OOMScoreAdj, + ResourceContainer: config.ResourceContainer, + ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, + HealthzServer: healthzServer, }, nil } diff --git a/pkg/kubemark/BUILD b/pkg/kubemark/BUILD index ced440d7bc6..1b689f13157 100644 --- a/pkg/kubemark/BUILD +++ b/pkg/kubemark/BUILD @@ -26,7 +26,6 @@ go_library( "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/proxy:go_default_library", - "//pkg/proxy/config:go_default_library", "//pkg/proxy/iptables:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 5e4a7ec8897..600a97b295b 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -27,7 +27,6 @@ import ( "k8s.io/client-go/tools/record" proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app" "k8s.io/kubernetes/pkg/proxy" - proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "k8s.io/kubernetes/pkg/proxy/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -72,13 +71,12 @@ func NewHollowProxyOrDie( ) (*HollowProxy, error) { // Create proxier and service/endpoint handlers. var proxier proxy.ProxyProvider - var serviceHandler proxyconfig.ServiceHandler - var endpointsHandler proxyconfig.EndpointsHandler + var err error if useRealProxier { // Real proxier with fake iptables, sysctl, etc underneath it. //var err error - proxierIPTables, err := iptables.NewProxier( + proxier, err = iptables.NewProxier( iptInterface, sysctl, execer, @@ -96,13 +94,8 @@ func NewHollowProxyOrDie( if err != nil { return nil, fmt.Errorf("unable to create proxier: %v", err) } - proxier = proxierIPTables - serviceHandler = proxierIPTables - endpointsHandler = proxierIPTables } else { proxier = &FakeProxier{} - serviceHandler = &FakeProxier{} - endpointsHandler = &FakeProxier{} } // Create a Hollow Proxy instance. @@ -114,19 +107,17 @@ func NewHollowProxyOrDie( } return &HollowProxy{ ProxyServer: &proxyapp.ProxyServer{ - Client: client, - EventClient: eventClient, - IptInterface: iptInterface, - Proxier: proxier, - Broadcaster: broadcaster, - Recorder: recorder, - ProxyMode: "fake", - NodeRef: nodeRef, - OOMScoreAdj: utilpointer.Int32Ptr(0), - ResourceContainer: "", - ConfigSyncPeriod: 30 * time.Second, - ServiceEventHandler: serviceHandler, - EndpointsEventHandler: endpointsHandler, + Client: client, + EventClient: eventClient, + IptInterface: iptInterface, + Proxier: proxier, + Broadcaster: broadcaster, + Recorder: recorder, + ProxyMode: "fake", + NodeRef: nodeRef, + OOMScoreAdj: utilpointer.Int32Ptr(0), + ResourceContainer: "", + ConfigSyncPeriod: 30 * time.Second, }, }, nil } diff --git a/pkg/proxy/BUILD b/pkg/proxy/BUILD index a7969c25c86..b43afc322c3 100644 --- a/pkg/proxy/BUILD +++ b/pkg/proxy/BUILD @@ -17,6 +17,7 @@ go_library( importpath = "k8s.io/kubernetes/pkg/proxy", deps = [ "//pkg/api/v1/service:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index b7d8b1654a9..410603db195 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -21,10 +21,14 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/proxy/config" ) // ProxyProvider is the interface provided by proxier implementations. type ProxyProvider interface { + config.EndpointsHandler + config.ServiceHandler + // Sync immediately synchronizes the ProxyProvider's current state to proxy rules. Sync() // SyncLoop runs periodic work. diff --git a/pkg/proxy/userspace/BUILD b/pkg/proxy/userspace/BUILD index b7887c85b5d..87e3da69e96 100644 --- a/pkg/proxy/userspace/BUILD +++ b/pkg/proxy/userspace/BUILD @@ -21,7 +21,9 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/util/async:go_default_library", "//pkg/util/conntrack:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/slice:go_default_library", @@ -85,6 +87,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/utils/exec:go_default_library", "//vendor/k8s.io/utils/exec/testing:go_default_library", ], diff --git a/pkg/proxy/userspace/loadbalancer.go b/pkg/proxy/userspace/loadbalancer.go index f0e5bcaa694..0b853477851 100644 --- a/pkg/proxy/userspace/loadbalancer.go +++ b/pkg/proxy/userspace/loadbalancer.go @@ -19,6 +19,7 @@ package userspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -31,4 +32,6 @@ type LoadBalancer interface { DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) ServiceHasEndpoints(service proxy.ServicePortName) bool + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 661092b1b2b..ae55842b302 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -19,6 +19,7 @@ package userspace import ( "fmt" "net" + "reflect" "strconv" "strings" "sync" @@ -35,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/proxy" utilproxy "k8s.io/kubernetes/pkg/proxy/util" + "k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/conntrack" "k8s.io/kubernetes/pkg/util/iptables" utilexec "k8s.io/utils/exec" @@ -91,6 +93,19 @@ func logTimeout(err error) bool { // ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port type ProxySocketFunc func(protocol v1.Protocol, ip net.IP, port int) (ProxySocket, error) +const numBurstSyncs int = 2 + +type serviceChange struct { + current *v1.Service + previous *v1.Service +} + +// Interface for async runner; abstracted for testing +type asyncRunnerInterface interface { + Run() + Loop(<-chan struct{}) +} + // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { @@ -98,7 +113,7 @@ type Proxier struct { mu sync.Mutex // protects serviceMap serviceMap map[proxy.ServicePortName]*ServiceInfo syncPeriod time.Duration - minSyncPeriod time.Duration // unused atm, but plumbed through + minSyncPeriod time.Duration udpIdleTimeout time.Duration portMapMutex sync.Mutex portMap map[portMapKey]*portMapValue @@ -109,6 +124,18 @@ type Proxier struct { proxyPorts PortAllocator makeProxySocket ProxySocketFunc exec utilexec.Interface + // endpointsSynced and servicesSynced are set to 1 when the corresponding + // objects are synced after startup. This is used to avoid updating iptables + // with some partial data after kube-proxy restart. + endpointsSynced int32 + servicesSynced int32 + initialized int32 + // protects serviceChanges + serviceChangesLock sync.Mutex + serviceChanges map[types.NamespacedName]*serviceChange // map of service changes + syncRunner asyncRunnerInterface // governs calls to syncProxyRules + + stopChan chan struct{} } // assert Proxier is a ProxyProvider @@ -202,12 +229,12 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables if err := iptablesFlush(iptables); err != nil { return nil, fmt.Errorf("failed to flush iptables: %v", err) } - return &Proxier{ - loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortName]*ServiceInfo), - portMap: make(map[portMapKey]*portMapValue), - syncPeriod: syncPeriod, - // plumbed through if needed, not used atm. + proxier := &Proxier{ + loadBalancer: loadBalancer, + serviceMap: make(map[proxy.ServicePortName]*ServiceInfo), + serviceChanges: make(map[types.NamespacedName]*serviceChange), + portMap: make(map[portMapKey]*portMapValue), + syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, udpIdleTimeout: udpIdleTimeout, listenIP: listenIP, @@ -216,7 +243,11 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables proxyPorts: proxyPorts, makeProxySocket: makeProxySocket, exec: exec, - }, nil + stopChan: make(chan struct{}), + } + klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, numBurstSyncs) + proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs) + return proxier, nil } // CleanupLeftovers removes all iptables rules and chains created by the Proxier @@ -287,30 +318,69 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) { return encounteredError } -// Sync is called to immediately synchronize the proxier state to iptables +// shutdown closes all service port proxies and returns from the proxy's +// sync loop. Used from testcases. +func (proxier *Proxier) shutdown() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + + for serviceName, info := range proxier.serviceMap { + proxier.stopProxy(serviceName, info) + } + proxier.cleanupStaleStickySessions() + close(proxier.stopChan) +} + +func (proxier *Proxier) isInitialized() bool { + return atomic.LoadInt32(&proxier.initialized) > 0 +} + +// Sync is called to synchronize the proxier state to iptables as soon as possible. func (proxier *Proxier) Sync() { + proxier.syncRunner.Run() +} + +func (proxier *Proxier) syncProxyRules() { + start := time.Now() + defer func() { + klog.V(2).Infof("userspace syncProxyRules took %v", time.Since(start)) + }() + + // don't sync rules till we've received services and endpoints + if !proxier.isInitialized() { + klog.V(2).Info("Not syncing userspace proxy until Services and Endpoints have been received from master") + return + } + if err := iptablesInit(proxier.iptables); err != nil { klog.Errorf("Failed to ensure iptables: %v", err) } + + proxier.serviceChangesLock.Lock() + changes := proxier.serviceChanges + proxier.serviceChanges = make(map[types.NamespacedName]*serviceChange) + proxier.serviceChangesLock.Unlock() + + proxier.mu.Lock() + defer proxier.mu.Unlock() + + klog.V(2).Infof("userspace proxy: processing %d service events", len(changes)) + for _, change := range changes { + existingPorts := proxier.mergeService(change.current) + proxier.unmergeService(change.previous, existingPorts) + } + proxier.ensurePortals() proxier.cleanupStaleStickySessions() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { - t := time.NewTicker(proxier.syncPeriod) - defer t.Stop() - for { - <-t.C - klog.V(6).Infof("Periodic sync") - proxier.Sync() - } + proxier.syncRunner.Loop(proxier.stopChan) } // Ensure that portals exist for all services. func (proxier *Proxier) ensurePortals() { - proxier.mu.Lock() - defer proxier.mu.Unlock() // NB: This does not remove rules that should not be present. for name, info := range proxier.serviceMap { err := proxier.openPortal(name, info) @@ -322,22 +392,12 @@ func (proxier *Proxier) ensurePortals() { // clean up any stale sticky session records in the hash map. func (proxier *Proxier) cleanupStaleStickySessions() { - proxier.mu.Lock() - defer proxier.mu.Unlock() for name := range proxier.serviceMap { proxier.loadBalancer.CleanupStaleStickySessions(name) } } -// This assumes proxier.mu is not locked. func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error { - proxier.mu.Lock() - defer proxier.mu.Unlock() - return proxier.stopProxyInternal(service, info) -} - -// This assumes proxier.mu is locked. -func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error { delete(proxier.serviceMap, service) info.setAlive(false) err := info.socket.Close() @@ -353,16 +413,18 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI return info, ok } -func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) { +// addServiceOnPort lockes the proxy before calling addServiceOnPortInternal. +// Used from testcases. +func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { proxier.mu.Lock() defer proxier.mu.Unlock() - proxier.serviceMap[service] = info + return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout) } -// addServiceOnPort starts listening for a new service, returning the ServiceInfo. +// addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo. // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. -func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { +func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err @@ -386,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol socket: sock, sessionAffinityType: v1.ServiceAffinityNone, // default } - proxier.setServiceInfo(service, si) + proxier.serviceMap[service] = si klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) go func(service proxy.ServicePortName, proxier *Proxier) { @@ -399,12 +461,22 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol return si, nil } +func (proxier *Proxier) cleanupPortalAndProxy(serviceName proxy.ServicePortName, info *ServiceInfo) error { + if err := proxier.closePortal(serviceName, info); err != nil { + return fmt.Errorf("Failed to close portal for %q: %v", serviceName, err) + } + if err := proxier.stopProxy(serviceName, info); err != nil { + return fmt.Errorf("Failed to stop service %q: %v", serviceName, err) + } + return nil +} + func (proxier *Proxier) mergeService(service *v1.Service) sets.String { if service == nil { return nil } svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if !helper.IsServiceIPSet(service) { + if utilproxy.ShouldSkipService(svcName, service) { klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) return nil } @@ -413,7 +485,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { servicePort := &service.Spec.Ports[i] serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name} existingPorts.Insert(servicePort.Name) - info, exists := proxier.getServiceInfo(serviceName) + info, exists := proxier.serviceMap[serviceName] // TODO: check health of the socket? What if ProxyLoop exited? if exists && sameConfig(info, service, servicePort) { // Nothing changed. @@ -421,11 +493,8 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { } if exists { klog.V(4).Infof("Something changed for service %q: stopping it", serviceName) - if err := proxier.closePortal(serviceName, info); err != nil { - klog.Errorf("Failed to close portal for %q: %v", serviceName, err) - } - if err := proxier.stopProxy(serviceName, info); err != nil { - klog.Errorf("Failed to stop service %q: %v", serviceName, err) + if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil { + klog.Error(err) } } proxyPort, err := proxier.proxyPorts.AllocateNext() @@ -436,7 +505,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String { serviceIP := net.ParseIP(service.Spec.ClusterIP) klog.V(1).Infof("Adding new service %q at %s/%s", serviceName, net.JoinHostPort(serviceIP.String(), strconv.Itoa(int(servicePort.Port))), servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) + info, err = proxier.addServiceOnPortInternal(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) if err != nil { klog.Errorf("Failed to start proxy for %q: %v", serviceName, err) continue @@ -469,14 +538,11 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S return } svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if !helper.IsServiceIPSet(service) { + if utilproxy.ShouldSkipService(svcName, service) { klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) return } - staleUDPServices := sets.NewString() - proxier.mu.Lock() - defer proxier.mu.Unlock() for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] if existingPorts.Has(servicePort.Name) { @@ -495,11 +561,8 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String()) } - if err := proxier.closePortal(serviceName, info); err != nil { - klog.Errorf("Failed to close portal for %q: %v", serviceName, err) - } - if err := proxier.stopProxyInternal(serviceName, info); err != nil { - klog.Errorf("Failed to stop service %q: %v", serviceName, err) + if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil { + klog.Error(err) } proxier.loadBalancer.DeleteService(serviceName) } @@ -510,20 +573,95 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S } } +func (proxier *Proxier) serviceChange(previous, current *v1.Service, detail string) { + var svcName types.NamespacedName + if current != nil { + svcName = types.NamespacedName{Namespace: current.Namespace, Name: current.Name} + } else { + svcName = types.NamespacedName{Namespace: previous.Namespace, Name: previous.Name} + } + klog.V(4).Infof("userspace proxy: %s for %s", detail, svcName) + + proxier.serviceChangesLock.Lock() + defer proxier.serviceChangesLock.Unlock() + + change, exists := proxier.serviceChanges[svcName] + if !exists { + // change.previous is only set for new changes. We must keep + // the oldest service info (or nil) because correct unmerging + // depends on the next update/del after a merge, not subsequent + // updates. + change = &serviceChange{previous: previous} + proxier.serviceChanges[svcName] = change + } + + // Always use the most current service (or nil) as change.current + change.current = current + + if reflect.DeepEqual(change.previous, change.current) { + // collapsed change had no effect + delete(proxier.serviceChanges, svcName) + } else if proxier.isInitialized() { + // change will have an effect, ask the proxy to sync + proxier.syncRunner.Run() + } +} + func (proxier *Proxier) OnServiceAdd(service *v1.Service) { - _ = proxier.mergeService(service) + proxier.serviceChange(nil, service, "OnServiceAdd") } func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) { - existingPorts := proxier.mergeService(service) - proxier.unmergeService(oldService, existingPorts) + proxier.serviceChange(oldService, service, "OnServiceUpdate") } func (proxier *Proxier) OnServiceDelete(service *v1.Service) { - proxier.unmergeService(service, sets.NewString()) + proxier.serviceChange(service, nil, "OnServiceDelete") } func (proxier *Proxier) OnServiceSynced() { + klog.V(2).Infof("userspace OnServiceSynced") + + // Mark services as initialized and (if endpoints are already + // initialized) the entire proxy as initialized + atomic.StoreInt32(&proxier.servicesSynced, 1) + if atomic.LoadInt32(&proxier.endpointsSynced) > 0 { + atomic.StoreInt32(&proxier.initialized, 1) + } + + // Must sync from a goroutine to avoid blocking the + // service event handler on startup with large numbers + // of initial objects + go proxier.syncProxyRules() +} + +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + klog.V(2).Infof("userspace OnEndpointsSynced") + proxier.loadBalancer.OnEndpointsSynced() + + // Mark endpoints as initialized and (if services are already + // initialized) the entire proxy as initialized + atomic.StoreInt32(&proxier.endpointsSynced, 1) + if atomic.LoadInt32(&proxier.servicesSynced) > 0 { + atomic.StoreInt32(&proxier.initialized, 1) + } + + // Must sync from a goroutine to avoid blocking the + // service event handler on startup with large numbers + // of initial objects + go proxier.syncProxyRules() } func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool { diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index cee7eec411b..e76400d114a 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -24,6 +24,7 @@ import ( "net/http/httptest" "net/url" "os" + "reflect" "strconv" "sync/atomic" "testing" @@ -33,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/proxy" ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" "k8s.io/utils/exec" @@ -86,6 +88,16 @@ func waitForClosedPortUDP(p *Proxier, proxyPort int) error { return fmt.Errorf("port %d still open", proxyPort) } +func waitForServiceInfo(p *Proxier, service proxy.ServicePortName) (*ServiceInfo, bool) { + var svcInfo *ServiceInfo + var exists bool + wait.PollImmediate(50*time.Millisecond, 3*time.Second, func() (bool, error) { + svcInfo, exists = p.getServiceInfo(service) + return exists, nil + }) + return svcInfo, exists +} + // udpEchoServer is a simple echo server in UDP, intended for testing the proxy. type udpEchoServer struct { net.PacketConn @@ -225,6 +237,15 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time t.Errorf("expected %d ProxyClients live, got %d", want, got) } +func startProxier(p *Proxier, t *testing.T) { + go func() { + p.SyncLoop() + }() + waitForNumProxyLoops(t, p, 0) + p.OnServiceSynced() + p.OnEndpointsSynced() +} + func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} @@ -242,7 +263,8 @@ func TestTCPProxy(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -269,7 +291,8 @@ func TestUDPProxy(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -296,7 +319,8 @@ func TestUDPProxyTimeout(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -335,7 +359,8 @@ func TestMultiPortProxy(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) if err != nil { @@ -364,7 +389,8 @@ func TestMultiPortOnServiceAdd(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() p.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, @@ -379,7 +405,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { }}}, }) waitForNumProxyLoops(t, p, 2) - svcInfo, exists := p.getServiceInfo(serviceP) + svcInfo, exists := waitForServiceInfo(p, serviceP) if !exists { t.Fatalf("can't find serviceInfo for %s", serviceP) } @@ -387,7 +413,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo) } - svcInfo, exists = p.getServiceInfo(serviceQ) + svcInfo, exists = waitForServiceInfo(p, serviceQ) if !exists { t.Fatalf("can't find serviceInfo for %s", serviceQ) } @@ -403,7 +429,9 @@ func TestMultiPortOnServiceAdd(t *testing.T) { // Helper: Stops the proxy for the named service. func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { - info, found := proxier.getServiceInfo(service) + proxier.mu.Lock() + defer proxier.mu.Unlock() + info, found := proxier.serviceMap[service] if !found { return fmt.Errorf("unknown service: %s", service) } @@ -427,7 +455,8 @@ func TestTCPProxyStop(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -471,7 +500,8 @@ func TestUDPProxyStop(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -494,9 +524,9 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() - service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} + servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} lb.OnEndpointsAdd(&v1.Endpoints{ - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + ObjectMeta: metav1.ObjectMeta{Namespace: servicePortName.Namespace, Name: servicePortName.Name}, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}}, @@ -509,28 +539,22 @@ func TestTCPProxyUpdateDelete(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() - svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) - if err != nil { - t.Fatalf("error adding new service: %#v", err) - } - conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) - if err != nil { - t.Fatalf("error connecting to proxy: %v", err) - } - conn.Close() - waitForNumProxyLoops(t, p, 1) - - p.OnServiceDelete(&v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: servicePortName.Name, Namespace: servicePortName.Namespace}, Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ Name: "p", - Port: int32(svcInfo.proxyPort), + Port: 9997, Protocol: "TCP", }}}, - }) - if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { + } + + p.OnServiceAdd(service) + waitForNumProxyLoops(t, p, 1) + p.OnServiceDelete(service) + if err := waitForClosedPortTCP(p, int(service.Spec.Ports[0].Port)); err != nil { t.Fatalf(err.Error()) } waitForNumProxyLoops(t, p, 0) @@ -553,7 +577,8 @@ func TestUDPProxyUpdateDelete(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -598,7 +623,8 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -634,7 +660,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { Protocol: "TCP", }}}, }) - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo for %s", service) } @@ -660,7 +686,8 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -696,7 +723,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { Protocol: "UDP", }}}, }) - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -721,7 +748,8 @@ func TestTCPProxyUpdatePort(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -742,7 +770,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -769,7 +797,8 @@ func TestUDPProxyUpdatePort(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -789,7 +818,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -814,7 +843,8 @@ func TestProxyUpdatePublicIPs(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -839,7 +869,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - svcInfo, exists := p.getServiceInfo(service) + svcInfo, exists := waitForServiceInfo(p, service) if !exists { t.Fatalf("can't find serviceInfo") } @@ -867,7 +897,8 @@ func TestProxyUpdatePortal(t *testing.T) { if err != nil { t.Fatal(err) } - waitForNumProxyLoops(t, p, 0) + startProxier(p, t) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -894,7 +925,16 @@ func TestProxyUpdatePortal(t *testing.T) { }}}, } p.OnServiceUpdate(svcv0, svcv1) - _, exists := p.getServiceInfo(service) + + // Wait for the service to be removed because it had an empty ClusterIP + var exists bool + for i := 0; i < 50; i++ { + _, exists = p.getServiceInfo(service) + if !exists { + break + } + time.Sleep(50 * time.Millisecond) + } if exists { t.Fatalf("service with empty ClusterIP should not be included in the proxy") } @@ -923,7 +963,7 @@ func TestProxyUpdatePortal(t *testing.T) { } p.OnServiceUpdate(svcv2, svcv3) lb.OnEndpointsAdd(endpoint) - svcInfo, exists = p.getServiceInfo(service) + svcInfo, exists = waitForServiceInfo(p, service) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") } @@ -931,6 +971,172 @@ func TestProxyUpdatePortal(t *testing.T) { waitForNumProxyLoops(t, p, 1) } +type fakeRunner struct{} + +// assert fakeAsyncRunner is a ProxyProvider +var _ asyncRunnerInterface = &fakeRunner{} + +func (f fakeRunner) Run() { +} + +func (f fakeRunner) Loop(stop <-chan struct{}) { +} + +func TestOnServiceAddChangeMap(t *testing.T) { + fexec := makeFakeExec() + + // Use long minSyncPeriod so we can test that immediate syncs work + p, err := createProxier(NewLoadBalancerRR(), net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Minute, udpIdleTimeoutForTest, newProxySocket) + if err != nil { + t.Fatal(err) + } + + // Fake out sync runner + p.syncRunner = fakeRunner{} + + serviceMeta := metav1.ObjectMeta{Namespace: "testnamespace", Name: "testname"} + service := &v1.Service{ + ObjectMeta: serviceMeta, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{ + Name: "p", + Port: 99, + Protocol: "TCP", + }}}, + } + + serviceUpdate := &v1.Service{ + ObjectMeta: serviceMeta, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.5", Ports: []v1.ServicePort{{ + Name: "p", + Port: 100, + Protocol: "TCP", + }}}, + } + + serviceUpdate2 := &v1.Service{ + ObjectMeta: serviceMeta, + Spec: v1.ServiceSpec{ClusterIP: "1.2.3.6", Ports: []v1.ServicePort{{ + Name: "p", + Port: 101, + Protocol: "TCP", + }}}, + } + + type onServiceTest struct { + detail string + changes []serviceChange + expectedChange *serviceChange + } + + tests := []onServiceTest{ + { + detail: "add", + changes: []serviceChange{ + {current: service}, + }, + expectedChange: &serviceChange{ + current: service, + }, + }, + { + detail: "add+update=add", + changes: []serviceChange{ + {current: service}, + { + previous: service, + current: serviceUpdate, + }, + }, + expectedChange: &serviceChange{ + current: serviceUpdate, + }, + }, + { + detail: "add+del=none", + changes: []serviceChange{ + {current: service}, + {previous: service}, + }, + }, + { + detail: "update+update=update", + changes: []serviceChange{ + { + previous: service, + current: serviceUpdate, + }, + { + previous: serviceUpdate, + current: serviceUpdate2, + }, + }, + expectedChange: &serviceChange{ + previous: service, + current: serviceUpdate2, + }, + }, + { + detail: "update+del=del", + changes: []serviceChange{ + { + previous: service, + current: serviceUpdate, + }, + {previous: serviceUpdate}, + }, + // change collapsing always keeps the oldest service + // info since correct unmerging depends on the least + // recent update, not the most current. + expectedChange: &serviceChange{ + previous: service, + }, + }, + { + detail: "del+add=update", + changes: []serviceChange{ + {previous: service}, + {current: serviceUpdate}, + }, + expectedChange: &serviceChange{ + previous: service, + current: serviceUpdate, + }, + }, + } + + for _, test := range tests { + for _, change := range test.changes { + p.serviceChange(change.previous, change.current, test.detail) + } + + if test.expectedChange != nil { + if len(p.serviceChanges) != 1 { + t.Fatalf("[%s] expected 1 service change but found %d", test.detail, len(p.serviceChanges)) + } + expectedService := test.expectedChange.current + if expectedService == nil { + expectedService = test.expectedChange.previous + } + svcName := types.NamespacedName{Namespace: expectedService.Namespace, Name: expectedService.Name} + + change, ok := p.serviceChanges[svcName] + if !ok { + t.Fatalf("[%s] did not find service change for %v", test.detail, svcName) + } + if !reflect.DeepEqual(change.previous, test.expectedChange.previous) { + t.Fatalf("[%s] change previous service and expected previous service don't match\nchange: %+v\nexp: %+v", test.detail, change.previous, test.expectedChange.previous) + } + if !reflect.DeepEqual(change.current, test.expectedChange.current) { + t.Fatalf("[%s] change current service and expected current service don't match\nchange: %+v\nexp: %+v", test.detail, change.current, test.expectedChange.current) + } + } else { + if len(p.serviceChanges) != 0 { + t.Fatalf("[%s] expected no service changes but found %d", test.detail, len(p.serviceChanges)) + } + } + } +} + func makeFakeExec() *fakeexec.FakeExec { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ diff --git a/pkg/proxy/winuserspace/BUILD b/pkg/proxy/winuserspace/BUILD index 3925dc96d72..8bad75c386b 100644 --- a/pkg/proxy/winuserspace/BUILD +++ b/pkg/proxy/winuserspace/BUILD @@ -19,6 +19,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/proxy:go_default_library", + "//pkg/proxy/config:go_default_library", "//pkg/util/ipconfig:go_default_library", "//pkg/util/netsh:go_default_library", "//pkg/util/slice:go_default_library", diff --git a/pkg/proxy/winuserspace/loadbalancer.go b/pkg/proxy/winuserspace/loadbalancer.go index e7cb770e930..d96a2951f48 100644 --- a/pkg/proxy/winuserspace/loadbalancer.go +++ b/pkg/proxy/winuserspace/loadbalancer.go @@ -19,6 +19,7 @@ package winuserspace import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/proxy" + proxyconfig "k8s.io/kubernetes/pkg/proxy/config" "net" ) @@ -30,4 +31,6 @@ type LoadBalancer interface { NewService(service proxy.ServicePortName, sessionAffinityType v1.ServiceAffinity, stickyMaxAgeMinutes int) error DeleteService(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName) + + proxyconfig.EndpointsHandler } diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 4b5a218cfc0..91cbd6337c8 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -444,6 +444,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) { func (proxier *Proxier) OnServiceSynced() { } +func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsAdd(endpoints) +} + +func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints) +} + +func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { + proxier.loadBalancer.OnEndpointsDelete(endpoints) +} + +func (proxier *Proxier) OnEndpointsSynced() { + proxier.loadBalancer.OnEndpointsSynced() +} + func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool { return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity }