Merge pull request #71735 from dcbw/userspace-proxy-ratelimiting

proxy/userspace: respect minSyncInterval
Committed by Kubernetes Prow Robot on 2019-04-16 19:14:03 -07:00 (merge commit 2490e035d7, committed via GitHub).
14 changed files with 507 additions and 178 deletions.
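What the PR does: the userspace proxier previously resynced only on a fixed ticker and applied every service event inline, so the kube-proxy minSyncPeriod setting was plumbed through but ignored. After this change all syncs are funneled through a pkg/util/async.BoundedFrequencyRunner, which coalesces bursts of events, enforces the minimum interval between syncs, and still guarantees a periodic full sync. Below is a minimal standalone sketch of the runner's behavior, runnable inside the kubernetes tree; the demo name and intervals are invented for illustration, while the constructor and its signature are the ones used in the diff further down.

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/async"
)

func main() {
	syncs := 0
	// The work function; in this PR it is the proxier's syncProxyRules.
	sync := func() {
		syncs++
		fmt.Printf("sync #%d at %s\n", syncs, time.Now().Format("15:04:05.000"))
	}

	// At most one sync per second (min interval), at least one sync every
	// ten seconds (max interval), with a burst budget of two back-to-back runs.
	runner := async.NewBoundedFrequencyRunner("demo-sync-runner", sync, time.Second, 10*time.Second, 2)

	stop := make(chan struct{})
	go runner.Loop(stop) // runs sync as permitted until stop is closed

	// Hammer Run() the way a flood of service events would; the runner
	// coalesces all of these calls into a handful of rate-limited syncs.
	for i := 0; i < 100; i++ {
		runner.Run()
		time.Sleep(20 * time.Millisecond)
	}
	time.Sleep(2 * time.Second)
	close(stop)
}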

View File

@@ -504,8 +504,6 @@ type ProxyServer struct {
 	OOMScoreAdj           *int32
 	ResourceContainer     string
 	ConfigSyncPeriod      time.Duration
-	ServiceEventHandler   config.ServiceHandler
-	EndpointsEventHandler config.EndpointsHandler
 	HealthzServer         *healthcheck.HealthzServer
 }
@@ -660,11 +658,11 @@ func (s *ProxyServer) Run() error {
 	// only notify on changes, and the initial update (on process start) may be lost if no handlers
 	// are registered yet.
 	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
-	serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
+	serviceConfig.RegisterEventHandler(s.Proxier)
 	go serviceConfig.Run(wait.NeverStop)
 	endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
-	endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
+	endpointsConfig.RegisterEventHandler(s.Proxier)
 	go endpointsConfig.Run(wait.NeverStop)
 	// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those

View File

@@ -33,7 +33,6 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/proxy"
 	proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
-	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"k8s.io/kubernetes/pkg/proxy/healthcheck"
 	"k8s.io/kubernetes/pkg/proxy/iptables"
 	"k8s.io/kubernetes/pkg/proxy/ipvs"
@@ -135,8 +134,6 @@ func newProxyServer(
 	}
 	var proxier proxy.ProxyProvider
-	var serviceEventHandler proxyconfig.ServiceHandler
-	var endpointsEventHandler proxyconfig.EndpointsHandler
 	proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
 	nodeIP := net.ParseIP(config.BindAddress)
@@ -151,7 +148,7 @@ func newProxyServer(
 		}
 		// TODO this has side effects that should only happen when Run() is invoked.
-		proxierIPTables, err := iptables.NewProxier(
+		proxier, err = iptables.NewProxier(
 			iptInterface,
 			utilsysctl.New(),
 			execer,
@@ -170,12 +167,9 @@ func newProxyServer(
 			return nil, fmt.Errorf("unable to create proxier: %v", err)
 		}
 		metrics.RegisterMetrics()
-		proxier = proxierIPTables
-		serviceEventHandler = proxierIPTables
-		endpointsEventHandler = proxierIPTables
 	} else if proxyMode == proxyModeIPVS {
 		klog.V(0).Info("Using ipvs Proxier.")
-		proxierIPVS, err := ipvs.NewProxier(
+		proxier, err = ipvs.NewProxier(
 			iptInterface,
 			ipvsInterface,
 			ipsetInterface,
@@ -199,20 +193,12 @@ func newProxyServer(
 			return nil, fmt.Errorf("unable to create proxier: %v", err)
 		}
 		metrics.RegisterMetrics()
-		proxier = proxierIPVS
-		serviceEventHandler = proxierIPVS
-		endpointsEventHandler = proxierIPVS
 	} else {
 		klog.V(0).Info("Using userspace Proxier.")
-		// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
-		// our config.EndpointsConfigHandler.
-		loadBalancer := userspace.NewLoadBalancerRR()
-		// set EndpointsConfigHandler to our loadBalancer
-		endpointsEventHandler = loadBalancer
 		// TODO this has side effects that should only happen when Run() is invoked.
-		proxierUserspace, err := userspace.NewProxier(
-			loadBalancer,
+		proxier, err = userspace.NewProxier(
+			userspace.NewLoadBalancerRR(),
 			net.ParseIP(config.BindAddress),
 			iptInterface,
 			execer,
@@ -225,8 +211,6 @@ func newProxyServer(
 		if err != nil {
 			return nil, fmt.Errorf("unable to create proxier: %v", err)
 		}
-		serviceEventHandler = proxierUserspace
-		proxier = proxierUserspace
 	}
 	iptInterface.AddReloadFunc(proxier.Sync)
@@ -250,8 +234,6 @@ func newProxyServer(
 		OOMScoreAdj:       config.OOMScoreAdj,
 		ResourceContainer: config.ResourceContainer,
 		ConfigSyncPeriod:  config.ConfigSyncPeriod.Duration,
-		ServiceEventHandler:   serviceEventHandler,
-		EndpointsEventHandler: endpointsEventHandler,
 		HealthzServer:     healthzServer,
 	}, nil
 }

View File

@@ -34,7 +34,6 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/proxy"
 	proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
-	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"k8s.io/kubernetes/pkg/proxy/healthcheck"
 	"k8s.io/kubernetes/pkg/proxy/winkernel"
 	"k8s.io/kubernetes/pkg/proxy/winuserspace"
@@ -95,13 +94,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
 	}
 	var proxier proxy.ProxyProvider
-	var serviceEventHandler proxyconfig.ServiceHandler
-	var endpointsEventHandler proxyconfig.EndpointsHandler
 	proxyMode := getProxyMode(string(config.Mode), winkernel.WindowsKernelCompatTester{})
 	if proxyMode == proxyModeKernelspace {
 		klog.V(0).Info("Using Kernelspace Proxier.")
-		proxierKernelspace, err := winkernel.NewProxier(
+		proxier, err = winkernel.NewProxier(
 			config.IPTables.SyncPeriod.Duration,
 			config.IPTables.MinSyncPeriod.Duration,
 			config.IPTables.MasqueradeAll,
@@ -116,23 +113,14 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
 		if err != nil {
 			return nil, fmt.Errorf("unable to create proxier: %v", err)
 		}
-		proxier = proxierKernelspace
-		endpointsEventHandler = proxierKernelspace
-		serviceEventHandler = proxierKernelspace
 	} else {
 		klog.V(0).Info("Using userspace Proxier.")
 		execer := exec.New()
 		var netshInterface utilnetsh.Interface
 		netshInterface = utilnetsh.New(execer)
-		// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
-		// our config.EndpointsConfigHandler.
-		loadBalancer := winuserspace.NewLoadBalancerRR()
-		// set EndpointsConfigHandler to our loadBalancer
-		endpointsEventHandler = loadBalancer
-		proxierUserspace, err := winuserspace.NewProxier(
-			loadBalancer,
+		proxier, err = winuserspace.NewProxier(
+			winuserspace.NewLoadBalancerRR(),
 			net.ParseIP(config.BindAddress),
 			netshInterface,
 			*utilnet.ParsePortRangeOrDie(config.PortRange),
@@ -143,26 +131,22 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
 		if err != nil {
 			return nil, fmt.Errorf("unable to create proxier: %v", err)
 		}
-		proxier = proxierUserspace
-		serviceEventHandler = proxierUserspace
 	}
 	return &ProxyServer{
 		Client:             client,
 		EventClient:        eventClient,
 		Proxier:            proxier,
 		Broadcaster:        eventBroadcaster,
 		Recorder:           recorder,
 		ProxyMode:          proxyMode,
 		NodeRef:            nodeRef,
 		MetricsBindAddress: config.MetricsBindAddress,
 		EnableProfiling:    config.EnableProfiling,
 		OOMScoreAdj:        config.OOMScoreAdj,
 		ResourceContainer:  config.ResourceContainer,
 		ConfigSyncPeriod:   config.ConfigSyncPeriod.Duration,
-		ServiceEventHandler:   serviceEventHandler,
-		EndpointsEventHandler: endpointsEventHandler,
 		HealthzServer:      healthzServer,
 	}, nil
 }

View File

@@ -26,7 +26,6 @@ go_library(
         "//pkg/kubelet/dockershim:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//pkg/proxy:go_default_library",
-        "//pkg/proxy/config:go_default_library",
         "//pkg/proxy/iptables:go_default_library",
         "//pkg/util/iptables:go_default_library",
         "//pkg/util/mount:go_default_library",

View File

@@ -27,7 +27,6 @@ import (
 	"k8s.io/client-go/tools/record"
 	proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app"
 	"k8s.io/kubernetes/pkg/proxy"
-	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"k8s.io/kubernetes/pkg/proxy/iptables"
 	utiliptables "k8s.io/kubernetes/pkg/util/iptables"
 	utilnode "k8s.io/kubernetes/pkg/util/node"
@@ -72,13 +71,12 @@ func NewHollowProxyOrDie(
 ) (*HollowProxy, error) {
 	// Create proxier and service/endpoint handlers.
 	var proxier proxy.ProxyProvider
-	var serviceHandler proxyconfig.ServiceHandler
-	var endpointsHandler proxyconfig.EndpointsHandler
+	var err error
 	if useRealProxier {
 		// Real proxier with fake iptables, sysctl, etc underneath it.
 		//var err error
-		proxierIPTables, err := iptables.NewProxier(
+		proxier, err = iptables.NewProxier(
 			iptInterface,
 			sysctl,
 			execer,
@@ -96,13 +94,8 @@ func NewHollowProxyOrDie(
 		if err != nil {
 			return nil, fmt.Errorf("unable to create proxier: %v", err)
 		}
-		proxier = proxierIPTables
-		serviceHandler = proxierIPTables
-		endpointsHandler = proxierIPTables
 	} else {
 		proxier = &FakeProxier{}
-		serviceHandler = &FakeProxier{}
-		endpointsHandler = &FakeProxier{}
 	}
 	// Create a Hollow Proxy instance.
@@ -114,19 +107,17 @@ func NewHollowProxyOrDie(
 	}
 	return &HollowProxy{
 		ProxyServer: &proxyapp.ProxyServer{
 			Client:            client,
 			EventClient:       eventClient,
 			IptInterface:      iptInterface,
 			Proxier:           proxier,
 			Broadcaster:       broadcaster,
 			Recorder:          recorder,
 			ProxyMode:         "fake",
 			NodeRef:           nodeRef,
 			OOMScoreAdj:       utilpointer.Int32Ptr(0),
 			ResourceContainer: "",
 			ConfigSyncPeriod:  30 * time.Second,
-			ServiceEventHandler:   serviceHandler,
-			EndpointsEventHandler: endpointsHandler,
 		},
 	}, nil
 }

View File

@@ -17,6 +17,7 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/proxy",
     deps = [
         "//pkg/api/v1/service:go_default_library",
+        "//pkg/proxy/config:go_default_library",
         "//pkg/proxy/util:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@@ -21,10 +21,14 @@ import (
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/proxy/config"
 )
 // ProxyProvider is the interface provided by proxier implementations.
 type ProxyProvider interface {
+	config.EndpointsHandler
+	config.ServiceHandler
+
 	// Sync immediately synchronizes the ProxyProvider's current state to proxy rules.
 	Sync()
 	// SyncLoop runs periodic work.
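With config.ServiceHandler and config.EndpointsHandler embedded here, anything satisfying ProxyProvider can be registered directly with the config layer, which is why server.go above now passes s.Proxier to RegisterEventHandler. For orientation, the two embedded interfaces looked roughly like this in pkg/proxy/config at the time of this PR; this is an abridged sketch, so check that package for the authoritative definitions.

package config

import "k8s.io/api/core/v1"

// ServiceHandler receives callbacks for service object lifecycle events.
type ServiceHandler interface {
	OnServiceAdd(service *v1.Service)
	OnServiceUpdate(oldService, service *v1.Service)
	OnServiceDelete(service *v1.Service)
	OnServiceSynced() // called once all initial services have been delivered
}

// EndpointsHandler is the analogous callback interface for endpoints objects.
type EndpointsHandler interface {
	OnEndpointsAdd(endpoints *v1.Endpoints)
	OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
	OnEndpointsDelete(endpoints *v1.Endpoints)
	OnEndpointsSynced()
}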

View File

@@ -21,7 +21,9 @@ go_library(
     deps = [
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/proxy:go_default_library",
+        "//pkg/proxy/config:go_default_library",
         "//pkg/proxy/util:go_default_library",
+        "//pkg/util/async:go_default_library",
         "//pkg/util/conntrack:go_default_library",
         "//pkg/util/iptables:go_default_library",
         "//pkg/util/slice:go_default_library",
@@ -85,6 +87,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/utils/exec:go_default_library",
         "//vendor/k8s.io/utils/exec/testing:go_default_library",
     ],

View File

@@ -19,6 +19,7 @@ package userspace
 import (
 	"k8s.io/api/core/v1"
 	"k8s.io/kubernetes/pkg/proxy"
+	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"net"
 )
@@ -31,4 +32,6 @@ type LoadBalancer interface {
 	DeleteService(service proxy.ServicePortName)
 	CleanupStaleStickySessions(service proxy.ServicePortName)
 	ServiceHasEndpoints(service proxy.ServicePortName) bool
+
+	proxyconfig.EndpointsHandler
 }
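Embedding proxyconfig.EndpointsHandler in LoadBalancer lets the userspace Proxier satisfy the endpoints half of ProxyProvider by simply forwarding each OnEndpoints* call to its load balancer, as the new forwarders added in proxier.go below show.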

View File

@@ -19,6 +19,7 @@ package userspace
 import (
 	"fmt"
 	"net"
+	"reflect"
 	"strconv"
 	"strings"
 	"sync"
@@ -35,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/proxy"
 	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
+	"k8s.io/kubernetes/pkg/util/async"
 	"k8s.io/kubernetes/pkg/util/conntrack"
 	"k8s.io/kubernetes/pkg/util/iptables"
 	utilexec "k8s.io/utils/exec"
@@ -91,6 +93,19 @@ func logTimeout(err error) bool {
 // ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port
 type ProxySocketFunc func(protocol v1.Protocol, ip net.IP, port int) (ProxySocket, error)
+const numBurstSyncs int = 2
+
+type serviceChange struct {
+	current  *v1.Service
+	previous *v1.Service
+}
+
+// Interface for the async runner; abstracted for testing
+type asyncRunnerInterface interface {
+	Run()
+	Loop(<-chan struct{})
+}
+
 // Proxier is a simple proxy for TCP connections between a localhost:lport
 // and services that provide the actual implementations.
 type Proxier struct {
@@ -98,7 +113,7 @@ type Proxier struct {
 	mu             sync.Mutex // protects serviceMap
 	serviceMap     map[proxy.ServicePortName]*ServiceInfo
 	syncPeriod     time.Duration
-	minSyncPeriod  time.Duration // unused atm, but plumbed through
+	minSyncPeriod  time.Duration
 	udpIdleTimeout time.Duration
 	portMapMutex   sync.Mutex
 	portMap        map[portMapKey]*portMapValue
@@ -109,6 +124,18 @@ type Proxier struct {
 	proxyPorts      PortAllocator
 	makeProxySocket ProxySocketFunc
 	exec            utilexec.Interface
+	// endpointsSynced and servicesSynced are set to 1 when the corresponding
+	// objects are synced after startup. This is used to avoid updating iptables
+	// with some partial data after kube-proxy restart.
+	endpointsSynced int32
+	servicesSynced  int32
+	initialized     int32
+	// protects serviceChanges
+	serviceChangesLock sync.Mutex
+	serviceChanges     map[types.NamespacedName]*serviceChange // map of service changes
+	syncRunner         asyncRunnerInterface                    // governs calls to syncProxyRules
+	stopChan           chan struct{}
 }
 // assert Proxier is a ProxyProvider
@@ -202,12 +229,12 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
 	if err := iptablesFlush(iptables); err != nil {
 		return nil, fmt.Errorf("failed to flush iptables: %v", err)
 	}
-	return &Proxier{
+	proxier := &Proxier{
 		loadBalancer:   loadBalancer,
 		serviceMap:     make(map[proxy.ServicePortName]*ServiceInfo),
+		serviceChanges: make(map[types.NamespacedName]*serviceChange),
 		portMap:        make(map[portMapKey]*portMapValue),
 		syncPeriod:     syncPeriod,
-		// plumbed through if needed, not used atm.
 		minSyncPeriod:  minSyncPeriod,
 		udpIdleTimeout: udpIdleTimeout,
 		listenIP:       listenIP,
@@ -216,7 +243,11 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
 		proxyPorts:      proxyPorts,
 		makeProxySocket: makeProxySocket,
 		exec:            exec,
-	}, nil
+		stopChan:        make(chan struct{}),
+	}
+	klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, numBurstSyncs)
+	proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs)
+	return proxier, nil
 }
 // CleanupLeftovers removes all iptables rules and chains created by the Proxier
@@ -287,30 +318,69 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
 	return encounteredError
 }
-// Sync is called to immediately synchronize the proxier state to iptables
+// shutdown closes all service port proxies and returns from the proxy's
+// sync loop. Used from testcases.
+func (proxier *Proxier) shutdown() {
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+
+	for serviceName, info := range proxier.serviceMap {
+		proxier.stopProxy(serviceName, info)
+	}
+	proxier.cleanupStaleStickySessions()
+	close(proxier.stopChan)
+}
+
+func (proxier *Proxier) isInitialized() bool {
+	return atomic.LoadInt32(&proxier.initialized) > 0
+}
+
+// Sync is called to synchronize the proxier state to iptables as soon as possible.
 func (proxier *Proxier) Sync() {
+	proxier.syncRunner.Run()
+}
+
+func (proxier *Proxier) syncProxyRules() {
+	start := time.Now()
+	defer func() {
+		klog.V(2).Infof("userspace syncProxyRules took %v", time.Since(start))
+	}()
+
+	// don't sync rules till we've received services and endpoints
+	if !proxier.isInitialized() {
+		klog.V(2).Info("Not syncing userspace proxy until Services and Endpoints have been received from master")
+		return
+	}
+
 	if err := iptablesInit(proxier.iptables); err != nil {
 		klog.Errorf("Failed to ensure iptables: %v", err)
 	}
+
+	proxier.serviceChangesLock.Lock()
+	changes := proxier.serviceChanges
+	proxier.serviceChanges = make(map[types.NamespacedName]*serviceChange)
+	proxier.serviceChangesLock.Unlock()
+
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+
+	klog.V(2).Infof("userspace proxy: processing %d service events", len(changes))
+	for _, change := range changes {
+		existingPorts := proxier.mergeService(change.current)
+		proxier.unmergeService(change.previous, existingPorts)
+	}
+
 	proxier.ensurePortals()
 	proxier.cleanupStaleStickySessions()
 }
 // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
 func (proxier *Proxier) SyncLoop() {
-	t := time.NewTicker(proxier.syncPeriod)
-	defer t.Stop()
-	for {
-		<-t.C
-		klog.V(6).Infof("Periodic sync")
-		proxier.Sync()
-	}
+	proxier.syncRunner.Loop(proxier.stopChan)
 }
 // Ensure that portals exist for all services.
 func (proxier *Proxier) ensurePortals() {
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
 	// NB: This does not remove rules that should not be present.
 	for name, info := range proxier.serviceMap {
 		err := proxier.openPortal(name, info)
@@ -322,22 +392,12 @@ func (proxier *Proxier) ensurePortals() {
 // clean up any stale sticky session records in the hash map.
 func (proxier *Proxier) cleanupStaleStickySessions() {
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
 	for name := range proxier.serviceMap {
 		proxier.loadBalancer.CleanupStaleStickySessions(name)
 	}
 }
-// This assumes proxier.mu is not locked.
 func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error {
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
-	return proxier.stopProxyInternal(service, info)
-}
-
-// This assumes proxier.mu is locked.
-func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error {
 	delete(proxier.serviceMap, service)
 	info.setAlive(false)
 	err := info.socket.Close()
@@ -353,16 +413,18 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI
 	return info, ok
 }
-func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) {
+// addServiceOnPort locks the proxy before calling addServiceOnPortInternal.
+// Used from testcases.
+func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
 	proxier.mu.Lock()
 	defer proxier.mu.Unlock()
-	proxier.serviceMap[service] = info
+	return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout)
 }
-// addServiceOnPort starts listening for a new service, returning the ServiceInfo.
+// addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo.
 // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
 // connections, for now.
-func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
+func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
 	sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
 	if err != nil {
 		return nil, err
@@ -386,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
 		socket:              sock,
 		sessionAffinityType: v1.ServiceAffinityNone, // default
 	}
-	proxier.setServiceInfo(service, si)
+	proxier.serviceMap[service] = si
 	klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
 	go func(service proxy.ServicePortName, proxier *Proxier) {
@@ -399,12 +461,22 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
 	return si, nil
 }
+func (proxier *Proxier) cleanupPortalAndProxy(serviceName proxy.ServicePortName, info *ServiceInfo) error {
+	if err := proxier.closePortal(serviceName, info); err != nil {
+		return fmt.Errorf("Failed to close portal for %q: %v", serviceName, err)
+	}
+	if err := proxier.stopProxy(serviceName, info); err != nil {
+		return fmt.Errorf("Failed to stop service %q: %v", serviceName, err)
+	}
+	return nil
+}
+
 func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
 	if service == nil {
 		return nil
 	}
 	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if !helper.IsServiceIPSet(service) {
+	if utilproxy.ShouldSkipService(svcName, service) {
 		klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
 		return nil
 	}
@@ -413,7 +485,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
 		servicePort := &service.Spec.Ports[i]
 		serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
 		existingPorts.Insert(servicePort.Name)
-		info, exists := proxier.getServiceInfo(serviceName)
+		info, exists := proxier.serviceMap[serviceName]
 		// TODO: check health of the socket? What if ProxyLoop exited?
 		if exists && sameConfig(info, service, servicePort) {
 			// Nothing changed.
@@ -421,11 +493,8 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
 		}
 		if exists {
 			klog.V(4).Infof("Something changed for service %q: stopping it", serviceName)
-			if err := proxier.closePortal(serviceName, info); err != nil {
-				klog.Errorf("Failed to close portal for %q: %v", serviceName, err)
-			}
-			if err := proxier.stopProxy(serviceName, info); err != nil {
-				klog.Errorf("Failed to stop service %q: %v", serviceName, err)
+			if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil {
+				klog.Error(err)
 			}
 		}
 		proxyPort, err := proxier.proxyPorts.AllocateNext()
@@ -436,7 +505,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
 		serviceIP := net.ParseIP(service.Spec.ClusterIP)
 		klog.V(1).Infof("Adding new service %q at %s/%s", serviceName, net.JoinHostPort(serviceIP.String(), strconv.Itoa(int(servicePort.Port))), servicePort.Protocol)
-		info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
+		info, err = proxier.addServiceOnPortInternal(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
 		if err != nil {
 			klog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
 			continue
@@ -469,14 +538,11 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
 		return
 	}
 	svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
-	if !helper.IsServiceIPSet(service) {
+	if utilproxy.ShouldSkipService(svcName, service) {
 		klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
 		return
 	}
 	staleUDPServices := sets.NewString()
-	proxier.mu.Lock()
-	defer proxier.mu.Unlock()
 	for i := range service.Spec.Ports {
 		servicePort := &service.Spec.Ports[i]
 		if existingPorts.Has(servicePort.Name) {
@@ -495,11 +561,8 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
 			staleUDPServices.Insert(proxier.serviceMap[serviceName].portal.ip.String())
 		}
-		if err := proxier.closePortal(serviceName, info); err != nil {
-			klog.Errorf("Failed to close portal for %q: %v", serviceName, err)
-		}
-		if err := proxier.stopProxyInternal(serviceName, info); err != nil {
-			klog.Errorf("Failed to stop service %q: %v", serviceName, err)
+		if err := proxier.cleanupPortalAndProxy(serviceName, info); err != nil {
+			klog.Error(err)
 		}
 		proxier.loadBalancer.DeleteService(serviceName)
 	}
@@ -510,20 +573,95 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
 	}
 }
+func (proxier *Proxier) serviceChange(previous, current *v1.Service, detail string) {
+	var svcName types.NamespacedName
+	if current != nil {
+		svcName = types.NamespacedName{Namespace: current.Namespace, Name: current.Name}
+	} else {
+		svcName = types.NamespacedName{Namespace: previous.Namespace, Name: previous.Name}
+	}
+	klog.V(4).Infof("userspace proxy: %s for %s", detail, svcName)
+
+	proxier.serviceChangesLock.Lock()
+	defer proxier.serviceChangesLock.Unlock()
+
+	change, exists := proxier.serviceChanges[svcName]
+	if !exists {
+		// change.previous is only set for new changes. We must keep
+		// the oldest service info (or nil) because correct unmerging
+		// depends on the next update/del after a merge, not subsequent
+		// updates.
+		change = &serviceChange{previous: previous}
+		proxier.serviceChanges[svcName] = change
+	}
+
+	// Always use the most current service (or nil) as change.current
+	change.current = current
+
+	if reflect.DeepEqual(change.previous, change.current) {
+		// collapsed change had no effect
+		delete(proxier.serviceChanges, svcName)
+	} else if proxier.isInitialized() {
+		// change will have an effect, ask the proxy to sync
+		proxier.syncRunner.Run()
+	}
+}
+
 func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
-	_ = proxier.mergeService(service)
+	proxier.serviceChange(nil, service, "OnServiceAdd")
 }
 func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
-	existingPorts := proxier.mergeService(service)
-	proxier.unmergeService(oldService, existingPorts)
+	proxier.serviceChange(oldService, service, "OnServiceUpdate")
 }
 func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
-	proxier.unmergeService(service, sets.NewString())
+	proxier.serviceChange(service, nil, "OnServiceDelete")
 }
 func (proxier *Proxier) OnServiceSynced() {
+	klog.V(2).Infof("userspace OnServiceSynced")
+
+	// Mark services as initialized and (if endpoints are already
+	// initialized) the entire proxy as initialized
+	atomic.StoreInt32(&proxier.servicesSynced, 1)
+	if atomic.LoadInt32(&proxier.endpointsSynced) > 0 {
+		atomic.StoreInt32(&proxier.initialized, 1)
+	}
+
+	// Must sync from a goroutine to avoid blocking the
+	// service event handler on startup with large numbers
+	// of initial objects
+	go proxier.syncProxyRules()
+}
+
+func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
+	proxier.loadBalancer.OnEndpointsAdd(endpoints)
+}
+
+func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
+	proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints)
+}
+
+func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
+	proxier.loadBalancer.OnEndpointsDelete(endpoints)
+}
+
+func (proxier *Proxier) OnEndpointsSynced() {
+	klog.V(2).Infof("userspace OnEndpointsSynced")
+	proxier.loadBalancer.OnEndpointsSynced()
+
+	// Mark endpoints as initialized and (if services are already
+	// initialized) the entire proxy as initialized
+	atomic.StoreInt32(&proxier.endpointsSynced, 1)
+	if atomic.LoadInt32(&proxier.servicesSynced) > 0 {
+		atomic.StoreInt32(&proxier.initialized, 1)
+	}
+
+	// Must sync from a goroutine to avoid blocking the
+	// service event handler on startup with large numbers
+	// of initial objects
+	go proxier.syncProxyRules()
 }
 func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {
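The rate limiting works because events are no longer applied inline: they are queued into the serviceChanges map above, and consecutive events for the same service collapse into one {previous, current} pair, keeping the oldest previous and the newest current and dropping the entry entirely when they cancel out. Below is a standalone sketch of just that collapsing rule, with *v1.Service replaced by *string for brevity; record, pendingChange, and ptr are illustrative names, not from the tree.

package main

import (
	"fmt"
	"reflect"
)

// pendingChange mirrors the proxier's serviceChange type: the oldest known
// state plus the most recent state for one service. A nil previous means the
// service did not exist before; a nil current means it was deleted.
type pendingChange struct {
	previous *string
	current  *string
}

var pending = map[string]*pendingChange{}

// record collapses events the same way Proxier.serviceChange does: keep the
// oldest previous, always overwrite current, and drop the entry when the
// collapsed change is a no-op (e.g. an add followed by a delete).
func record(name string, previous, current *string) {
	change, exists := pending[name]
	if !exists {
		change = &pendingChange{previous: previous}
		pending[name] = change
	}
	change.current = current
	if reflect.DeepEqual(change.previous, change.current) {
		delete(pending, name)
	}
}

func ptr(s string) *string { return &s }

func main() {
	record("svc", nil, ptr("v1"))        // add
	record("svc", ptr("v1"), ptr("v2"))  // update
	fmt.Println(*pending["svc"].current) // "v2": collapsed into a single add of v2

	record("svc2", nil, ptr("v1")) // add
	record("svc2", ptr("v1"), nil) // delete
	_, ok := pending["svc2"]
	fmt.Println(ok) // false: add+del collapsed to nothing
}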

View File

@@ -24,6 +24,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"os"
+	"reflect"
 	"strconv"
 	"sync/atomic"
 	"testing"
@@ -33,6 +34,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/proxy"
 	ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
 	"k8s.io/utils/exec"
@@ -86,6 +88,16 @@ func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
 	return fmt.Errorf("port %d still open", proxyPort)
 }
+func waitForServiceInfo(p *Proxier, service proxy.ServicePortName) (*ServiceInfo, bool) {
+	var svcInfo *ServiceInfo
+	var exists bool
+	wait.PollImmediate(50*time.Millisecond, 3*time.Second, func() (bool, error) {
+		svcInfo, exists = p.getServiceInfo(service)
+		return exists, nil
+	})
+	return svcInfo, exists
+}
+
 // udpEchoServer is a simple echo server in UDP, intended for testing the proxy.
 type udpEchoServer struct {
 	net.PacketConn
@@ -225,6 +237,15 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time
 	t.Errorf("expected %d ProxyClients live, got %d", want, got)
 }
+func startProxier(p *Proxier, t *testing.T) {
+	go func() {
+		p.SyncLoop()
+	}()
+	waitForNumProxyLoops(t, p, 0)
+	p.OnServiceSynced()
+	p.OnEndpointsSynced()
+}
+
 func TestTCPProxy(t *testing.T) {
 	lb := NewLoadBalancerRR()
 	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
@@ -242,7 +263,8 @@ func TestTCPProxy(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
 	if err != nil {
@@ -269,7 +291,8 @@ func TestUDPProxy(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
 	if err != nil {
@@ -296,7 +319,8 @@ func TestUDPProxyTimeout(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
 	if err != nil {
@@ -335,7 +359,8 @@ func TestMultiPortProxy(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
 	if err != nil {
@@ -364,7 +389,8 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	p.OnServiceAdd(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
@@ -379,7 +405,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 		}}},
 	})
 	waitForNumProxyLoops(t, p, 2)
-	svcInfo, exists := p.getServiceInfo(serviceP)
+	svcInfo, exists := waitForServiceInfo(p, serviceP)
 	if !exists {
 		t.Fatalf("can't find serviceInfo for %s", serviceP)
 	}
@@ -387,7 +413,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 		t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
 	}
-	svcInfo, exists = p.getServiceInfo(serviceQ)
+	svcInfo, exists = waitForServiceInfo(p, serviceQ)
 	if !exists {
 		t.Fatalf("can't find serviceInfo for %s", serviceQ)
 	}
@@ -403,7 +429,9 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
 // Helper: Stops the proxy for the named service.
 func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
-	info, found := proxier.getServiceInfo(service)
+	proxier.mu.Lock()
+	defer proxier.mu.Unlock()
+	info, found := proxier.serviceMap[service]
 	if !found {
 		return fmt.Errorf("unknown service: %s", service)
 	}
@@ -427,7 +455,8 @@ func TestTCPProxyStop(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
 	if err != nil {
@@ -471,7 +500,8 @@ func TestUDPProxyStop(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
 	if err != nil {
@@ -494,9 +524,9 @@ func TestUDPProxyStop(t *testing.T) {
 func TestTCPProxyUpdateDelete(t *testing.T) {
 	lb := NewLoadBalancerRR()
-	service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
+	servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
 	lb.OnEndpointsAdd(&v1.Endpoints{
-		ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
+		ObjectMeta: metav1.ObjectMeta{Namespace: servicePortName.Namespace, Name: servicePortName.Name},
 		Subsets: []v1.EndpointSubset{{
 			Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
 			Ports:     []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
@@ -509,28 +539,22 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
-	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
-	if err != nil {
-		t.Fatalf("error adding new service: %#v", err)
-	}
-	conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
-	if err != nil {
-		t.Fatalf("error connecting to proxy: %v", err)
-	}
-	conn.Close()
-	waitForNumProxyLoops(t, p, 1)
-
-	p.OnServiceDelete(&v1.Service{
-		ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
+	service := &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{Name: servicePortName.Name, Namespace: servicePortName.Namespace},
 		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
 			Name:     "p",
-			Port:     int32(svcInfo.proxyPort),
+			Port:     9997,
 			Protocol: "TCP",
 		}}},
-	})
-	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
+	}
+
+	p.OnServiceAdd(service)
+	waitForNumProxyLoops(t, p, 1)
+	p.OnServiceDelete(service)
+	if err := waitForClosedPortTCP(p, int(service.Spec.Ports[0].Port)); err != nil {
 		t.Fatalf(err.Error())
 	}
 	waitForNumProxyLoops(t, p, 0)
@@ -553,7 +577,8 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
 	if err != nil {
@@ -598,7 +623,8 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
 	if err != nil {
@@ -634,7 +660,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
 			Protocol: "TCP",
 		}}},
 	})
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo for %s", service)
 	}
@@ -660,7 +686,8 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
 	if err != nil {
@@ -696,7 +723,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
 			Protocol: "UDP",
 		}}},
 	})
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -721,7 +748,8 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
 	if err != nil {
@@ -742,7 +770,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
 	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 		t.Fatalf(err.Error())
 	}
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -769,7 +797,8 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
 	if err != nil {
@@ -789,7 +818,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
 	if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
 		t.Fatalf(err.Error())
 	}
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -814,7 +843,8 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
 	if err != nil {
@@ -839,7 +869,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
 	if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
 		t.Fatalf(err.Error())
 	}
-	svcInfo, exists := p.getServiceInfo(service)
+	svcInfo, exists := waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("can't find serviceInfo")
 	}
@@ -867,7 +897,8 @@ func TestProxyUpdatePortal(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	waitForNumProxyLoops(t, p, 0)
+	startProxier(p, t)
+	defer p.shutdown()
 	svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
 	if err != nil {
@@ -894,7 +925,16 @@ func TestProxyUpdatePortal(t *testing.T) {
 		}}},
 	}
 	p.OnServiceUpdate(svcv0, svcv1)
-	_, exists := p.getServiceInfo(service)
+
+	// Wait for the service to be removed because it had an empty ClusterIP
+	var exists bool
+	for i := 0; i < 50; i++ {
+		_, exists = p.getServiceInfo(service)
+		if !exists {
+			break
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
 	if exists {
 		t.Fatalf("service with empty ClusterIP should not be included in the proxy")
 	}
@@ -923,7 +963,7 @@ func TestProxyUpdatePortal(t *testing.T) {
 	}
 	p.OnServiceUpdate(svcv2, svcv3)
 	lb.OnEndpointsAdd(endpoint)
-	svcInfo, exists = p.getServiceInfo(service)
+	svcInfo, exists = waitForServiceInfo(p, service)
 	if !exists {
 		t.Fatalf("service with ClusterIP set not found in the proxy")
 	}
@@ -931,6 +971,172 @@ func TestProxyUpdatePortal(t *testing.T) {
 	waitForNumProxyLoops(t, p, 1)
 }
+type fakeRunner struct{}
+
+// assert fakeRunner implements asyncRunnerInterface
+var _ asyncRunnerInterface = &fakeRunner{}
+
+func (f fakeRunner) Run() {
+}
+
+func (f fakeRunner) Loop(stop <-chan struct{}) {
+}
+
+func TestOnServiceAddChangeMap(t *testing.T) {
+	fexec := makeFakeExec()
+
+	// Use long minSyncPeriod so we can test that immediate syncs work
+	p, err := createProxier(NewLoadBalancerRR(), net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Minute, udpIdleTimeoutForTest, newProxySocket)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Fake out sync runner
+	p.syncRunner = fakeRunner{}
+
+	serviceMeta := metav1.ObjectMeta{Namespace: "testnamespace", Name: "testname"}
+	service := &v1.Service{
+		ObjectMeta: serviceMeta,
+		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
+			Name:     "p",
+			Port:     99,
+			Protocol: "TCP",
+		}}},
+	}
+
+	serviceUpdate := &v1.Service{
+		ObjectMeta: serviceMeta,
+		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.5", Ports: []v1.ServicePort{{
+			Name:     "p",
+			Port:     100,
+			Protocol: "TCP",
+		}}},
+	}
+
+	serviceUpdate2 := &v1.Service{
+		ObjectMeta: serviceMeta,
+		Spec: v1.ServiceSpec{ClusterIP: "1.2.3.6", Ports: []v1.ServicePort{{
+			Name:     "p",
+			Port:     101,
+			Protocol: "TCP",
+		}}},
+	}
+
+	type onServiceTest struct {
+		detail         string
+		changes        []serviceChange
+		expectedChange *serviceChange
+	}
+
+	tests := []onServiceTest{
+		{
+			detail: "add",
+			changes: []serviceChange{
+				{current: service},
+			},
+			expectedChange: &serviceChange{
+				current: service,
+			},
+		},
+		{
+			detail: "add+update=add",
+			changes: []serviceChange{
+				{current: service},
+				{
+					previous: service,
+					current:  serviceUpdate,
+				},
+			},
+			expectedChange: &serviceChange{
+				current: serviceUpdate,
+			},
+		},
+		{
+			detail: "add+del=none",
+			changes: []serviceChange{
+				{current: service},
+				{previous: service},
+			},
+		},
+		{
+			detail: "update+update=update",
+			changes: []serviceChange{
+				{
+					previous: service,
+					current:  serviceUpdate,
+				},
+				{
+					previous: serviceUpdate,
+					current:  serviceUpdate2,
+				},
+			},
+			expectedChange: &serviceChange{
+				previous: service,
+				current:  serviceUpdate2,
+			},
+		},
+		{
+			detail: "update+del=del",
+			changes: []serviceChange{
+				{
+					previous: service,
+					current:  serviceUpdate,
+				},
+				{previous: serviceUpdate},
+			},
+			// change collapsing always keeps the oldest service
+			// info since correct unmerging depends on the least
+			// recent update, not the most current.
+			expectedChange: &serviceChange{
+				previous: service,
+			},
+		},
+		{
+			detail: "del+add=update",
+			changes: []serviceChange{
+				{previous: service},
+				{current: serviceUpdate},
+			},
+			expectedChange: &serviceChange{
+				previous: service,
+				current:  serviceUpdate,
+			},
+		},
+	}
+
+	for _, test := range tests {
+		for _, change := range test.changes {
+			p.serviceChange(change.previous, change.current, test.detail)
+		}
+
+		if test.expectedChange != nil {
+			if len(p.serviceChanges) != 1 {
+				t.Fatalf("[%s] expected 1 service change but found %d", test.detail, len(p.serviceChanges))
+			}
+			expectedService := test.expectedChange.current
+			if expectedService == nil {
+				expectedService = test.expectedChange.previous
+			}
+			svcName := types.NamespacedName{Namespace: expectedService.Namespace, Name: expectedService.Name}
+
+			change, ok := p.serviceChanges[svcName]
+			if !ok {
+				t.Fatalf("[%s] did not find service change for %v", test.detail, svcName)
+			}
+			if !reflect.DeepEqual(change.previous, test.expectedChange.previous) {
+				t.Fatalf("[%s] change previous service and expected previous service don't match\nchange: %+v\nexp: %+v", test.detail, change.previous, test.expectedChange.previous)
+			}
+			if !reflect.DeepEqual(change.current, test.expectedChange.current) {
+				t.Fatalf("[%s] change current service and expected current service don't match\nchange: %+v\nexp: %+v", test.detail, change.current, test.expectedChange.current)
+			}
+		} else {
+			if len(p.serviceChanges) != 0 {
+				t.Fatalf("[%s] expected no service changes but found %d", test.detail, len(p.serviceChanges))
+			}
+		}
+	}
+}
+
 func makeFakeExec() *fakeexec.FakeExec {
 	fcmd := fakeexec.FakeCmd{
 		CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{

View File

@@ -19,6 +19,7 @@ go_library(
     deps = [
         "//pkg/apis/core/v1/helper:go_default_library",
        "//pkg/proxy:go_default_library",
+        "//pkg/proxy/config:go_default_library",
         "//pkg/util/ipconfig:go_default_library",
         "//pkg/util/netsh:go_default_library",
         "//pkg/util/slice:go_default_library",

View File

@@ -19,6 +19,7 @@ package winuserspace
 import (
 	"k8s.io/api/core/v1"
 	"k8s.io/kubernetes/pkg/proxy"
+	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
 	"net"
 )
@@ -30,4 +31,6 @@ type LoadBalancer interface {
 	NewService(service proxy.ServicePortName, sessionAffinityType v1.ServiceAffinity, stickyMaxAgeMinutes int) error
 	DeleteService(service proxy.ServicePortName)
 	CleanupStaleStickySessions(service proxy.ServicePortName)
+
+	proxyconfig.EndpointsHandler
 }

View File

@@ -444,6 +444,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
 func (proxier *Proxier) OnServiceSynced() {
 }
+func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
+	proxier.loadBalancer.OnEndpointsAdd(endpoints)
+}
+
+func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
+	proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints)
+}
+
+func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
+	proxier.loadBalancer.OnEndpointsDelete(endpoints)
+}
+
+func (proxier *Proxier) OnEndpointsSynced() {
+	proxier.loadBalancer.OnEndpointsSynced()
+}
+
 func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool {
 	return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity
 }