From d172ca69864549f1b8a32c3e0e81d669dd4558f7 Mon Sep 17 00:00:00 2001 From: Abhishek Shah Date: Fri, 16 Oct 2015 14:52:58 -0700 Subject: [PATCH] Added UdpIdleTimeout flag --- cmd/kube-proxy/app/server.go | 5 ++- hack/verify-flags/known-flags.txt | 1 + pkg/proxy/userspace/proxier.go | 51 ++++++++++++++--------------- pkg/proxy/userspace/proxier_test.go | 34 ++++++++++--------- 4 files changed, 49 insertions(+), 42 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 2e110034fd7..019b6b9c151 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -67,6 +67,7 @@ type ProxyServerConfig struct { CleanupAndExit bool KubeApiQps float32 KubeApiBurst int + UDPIdleTimeout time.Duration } type ProxyServer struct { @@ -94,6 +95,7 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.CleanupAndExit, "cleanup-iptables", false, "If true cleanup iptables rules and exit.") fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver") + fs.DurationVar(&s.UDPIdleTimeout, "udp-timeout", s.UDPIdleTimeout, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace") } const ( @@ -122,6 +124,7 @@ func NewProxyConfig() *ProxyServerConfig { ConfigSyncPeriod: 15 * time.Minute, KubeApiQps: 5.0, KubeApiBurst: 10, + UDPIdleTimeout: 250 * time.Millisecond, } } @@ -238,7 +241,7 @@ func NewProxyServerDefault(config *ProxyServerConfig) (*ProxyServer, error) { // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer - proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.IptablesSyncPeriod) + proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.IptablesSyncPeriod, config.UDPIdleTimeout) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index b44930ec655..a1a0ca168a0 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -292,6 +292,7 @@ token-auth-file ttl-secs type-src udp-port +udp-timeout unix-socket update-period upgrade-target diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 108e6a43b18..fc74122fc67 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -81,17 +81,18 @@ func logTimeout(err error) bool { // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { - loadBalancer LoadBalancer - mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortName]*serviceInfo - syncPeriod time.Duration - portMapMutex sync.Mutex - portMap map[portMapKey]*portMapValue - numProxyLoops int32 // use atomic ops to access this; mostly for testing - listenIP net.IP - iptables iptables.Interface - hostIP net.IP - proxyPorts PortAllocator + loadBalancer LoadBalancer + mu sync.Mutex // protects serviceMap + serviceMap map[proxy.ServicePortName]*serviceInfo + syncPeriod time.Duration + udpIdleTimeout time.Duration + portMapMutex sync.Mutex + portMap map[portMapKey]*portMapValue + numProxyLoops int32 // use atomic ops to access this; mostly for testing + listenIP net.IP + iptables iptables.Interface + hostIP net.IP + proxyPorts PortAllocator } // assert Proxier is a ProxyProvider @@ -136,7 +137,7 @@ func IsProxyLocked(err error) bool { // if iptables fails to update or acquire the initial lock. Once a proxier is // created, it will keep iptables up to date in the background and will not // terminate if a particular iptables call fails. -func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange, syncPeriod time.Duration) (*Proxier, error) { +func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) { return nil, ErrProxyOnLocalhost } @@ -154,10 +155,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In proxyPorts := newPortAllocator(pr) glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP) - return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod) + return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, udpIdleTimeout) } -func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod time.Duration) (*Proxier, error) { +func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { // convenient to pass nil for tests.. if proxyPorts == nil { proxyPorts = newPortAllocator(util.PortRange{}) @@ -172,14 +173,15 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables return nil, fmt.Errorf("failed to flush iptables: %v", err) } return &Proxier{ - loadBalancer: loadBalancer, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - portMap: make(map[portMapKey]*portMapValue), - syncPeriod: syncPeriod, - listenIP: listenIP, - iptables: iptables, - hostIP: hostIP, - proxyPorts: proxyPorts, + loadBalancer: loadBalancer, + serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + portMap: make(map[portMapKey]*portMapValue), + syncPeriod: syncPeriod, + udpIdleTimeout: udpIdleTimeout, + listenIP: listenIP, + iptables: iptables, + hostIP: hostIP, + proxyPorts: proxyPorts, }, nil } @@ -340,9 +342,6 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol return si, nil } -// How long we leave idle UDP connections open. -const udpIdleTimeout = 250 * time.Millisecond - // OnServiceUpdate manages the active set of service proxies. // Active service proxies are reinitialized if found in the update set or // shutdown if missing from the update set. @@ -388,7 +387,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) { } glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol) - info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, udpIdleTimeout) + info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout) if err != nil { glog.Errorf("Failed to start proxy for %q: %v", serviceName, err) continue diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index b8d6ecd460f..3e10a797408 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -35,6 +35,10 @@ import ( "k8s.io/kubernetes/pkg/util/iptables" ) +const ( + udpIdleTimeoutForTest = 250 * time.Millisecond +) + func joinHostPort(host string, port int) string { return net.JoinHostPort(host, fmt.Sprintf("%d", port)) } @@ -245,7 +249,7 @@ func TestTCPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -272,7 +276,7 @@ func TestUDPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -299,7 +303,7 @@ func TestUDPProxyTimeout(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -335,7 +339,7 @@ func TestMultiPortProxy(t *testing.T) { }}, }}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -362,7 +366,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -425,7 +429,7 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -469,7 +473,7 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -507,7 +511,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -544,7 +548,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -581,7 +585,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -633,7 +637,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -685,7 +689,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -733,7 +737,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -778,7 +782,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) } @@ -830,7 +834,7 @@ func TestProxyUpdatePortal(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest) if err != nil { t.Fatal(err) }