mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
Merge pull request #15797 from ArtfulCoder/udpTimeoutFlag
Auto commit by PR queue bot
This commit is contained in:
commit
29c2706ba0
@ -67,6 +67,7 @@ type ProxyServerConfig struct {
|
|||||||
CleanupAndExit bool
|
CleanupAndExit bool
|
||||||
KubeApiQps float32
|
KubeApiQps float32
|
||||||
KubeApiBurst int
|
KubeApiBurst int
|
||||||
|
UDPIdleTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
type ProxyServer struct {
|
type ProxyServer struct {
|
||||||
@ -94,6 +95,7 @@ func (s *ProxyServerConfig) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.BoolVar(&s.CleanupAndExit, "cleanup-iptables", false, "If true cleanup iptables rules and exit.")
|
fs.BoolVar(&s.CleanupAndExit, "cleanup-iptables", false, "If true cleanup iptables rules and exit.")
|
||||||
fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver")
|
fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver")
|
||||||
fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver")
|
fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver")
|
||||||
|
fs.DurationVar(&s.UDPIdleTimeout, "udp-timeout", s.UDPIdleTimeout, "How long an idle UDP connection will be kept open (e.g. '250ms', '2s'). Must be greater than 0. Only applicable for proxy-mode=userspace")
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -122,6 +124,7 @@ func NewProxyConfig() *ProxyServerConfig {
|
|||||||
ConfigSyncPeriod: 15 * time.Minute,
|
ConfigSyncPeriod: 15 * time.Minute,
|
||||||
KubeApiQps: 5.0,
|
KubeApiQps: 5.0,
|
||||||
KubeApiBurst: 10,
|
KubeApiBurst: 10,
|
||||||
|
UDPIdleTimeout: 250 * time.Millisecond,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,7 +241,7 @@ func NewProxyServerDefault(config *ProxyServerConfig) (*ProxyServer, error) {
|
|||||||
// set EndpointsConfigHandler to our loadBalancer
|
// set EndpointsConfigHandler to our loadBalancer
|
||||||
endpointsHandler = loadBalancer
|
endpointsHandler = loadBalancer
|
||||||
|
|
||||||
proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.IptablesSyncPeriod)
|
proxierUserspace, err := userspace.NewProxier(loadBalancer, config.BindAddress, iptInterface, config.PortRange, config.IptablesSyncPeriod, config.UDPIdleTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Unable to create proxier: %v", err)
|
glog.Fatalf("Unable to create proxier: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -294,6 +294,7 @@ token-auth-file
|
|||||||
ttl-secs
|
ttl-secs
|
||||||
type-src
|
type-src
|
||||||
udp-port
|
udp-port
|
||||||
|
udp-timeout
|
||||||
unix-socket
|
unix-socket
|
||||||
update-period
|
update-period
|
||||||
upgrade-target
|
upgrade-target
|
||||||
|
@ -81,17 +81,18 @@ func logTimeout(err error) bool {
|
|||||||
// Proxier is a simple proxy for TCP connections between a localhost:lport
|
// Proxier is a simple proxy for TCP connections between a localhost:lport
|
||||||
// and services that provide the actual implementations.
|
// and services that provide the actual implementations.
|
||||||
type Proxier struct {
|
type Proxier struct {
|
||||||
loadBalancer LoadBalancer
|
loadBalancer LoadBalancer
|
||||||
mu sync.Mutex // protects serviceMap
|
mu sync.Mutex // protects serviceMap
|
||||||
serviceMap map[proxy.ServicePortName]*serviceInfo
|
serviceMap map[proxy.ServicePortName]*serviceInfo
|
||||||
syncPeriod time.Duration
|
syncPeriod time.Duration
|
||||||
portMapMutex sync.Mutex
|
udpIdleTimeout time.Duration
|
||||||
portMap map[portMapKey]*portMapValue
|
portMapMutex sync.Mutex
|
||||||
numProxyLoops int32 // use atomic ops to access this; mostly for testing
|
portMap map[portMapKey]*portMapValue
|
||||||
listenIP net.IP
|
numProxyLoops int32 // use atomic ops to access this; mostly for testing
|
||||||
iptables iptables.Interface
|
listenIP net.IP
|
||||||
hostIP net.IP
|
iptables iptables.Interface
|
||||||
proxyPorts PortAllocator
|
hostIP net.IP
|
||||||
|
proxyPorts PortAllocator
|
||||||
}
|
}
|
||||||
|
|
||||||
// assert Proxier is a ProxyProvider
|
// assert Proxier is a ProxyProvider
|
||||||
@ -136,7 +137,7 @@ func IsProxyLocked(err error) bool {
|
|||||||
// if iptables fails to update or acquire the initial lock. Once a proxier is
|
// if iptables fails to update or acquire the initial lock. Once a proxier is
|
||||||
// created, it will keep iptables up to date in the background and will not
|
// created, it will keep iptables up to date in the background and will not
|
||||||
// terminate if a particular iptables call fails.
|
// terminate if a particular iptables call fails.
|
||||||
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange, syncPeriod time.Duration) (*Proxier, error) {
|
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr util.PortRange, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
|
||||||
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
|
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
|
||||||
return nil, ErrProxyOnLocalhost
|
return nil, ErrProxyOnLocalhost
|
||||||
}
|
}
|
||||||
@ -154,10 +155,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
|
|||||||
proxyPorts := newPortAllocator(pr)
|
proxyPorts := newPortAllocator(pr)
|
||||||
|
|
||||||
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
|
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
|
||||||
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod)
|
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, udpIdleTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod time.Duration) (*Proxier, error) {
|
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
|
||||||
// convenient to pass nil for tests..
|
// convenient to pass nil for tests..
|
||||||
if proxyPorts == nil {
|
if proxyPorts == nil {
|
||||||
proxyPorts = newPortAllocator(util.PortRange{})
|
proxyPorts = newPortAllocator(util.PortRange{})
|
||||||
@ -172,14 +173,15 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
|
|||||||
return nil, fmt.Errorf("failed to flush iptables: %v", err)
|
return nil, fmt.Errorf("failed to flush iptables: %v", err)
|
||||||
}
|
}
|
||||||
return &Proxier{
|
return &Proxier{
|
||||||
loadBalancer: loadBalancer,
|
loadBalancer: loadBalancer,
|
||||||
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
|
||||||
portMap: make(map[portMapKey]*portMapValue),
|
portMap: make(map[portMapKey]*portMapValue),
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
listenIP: listenIP,
|
udpIdleTimeout: udpIdleTimeout,
|
||||||
iptables: iptables,
|
listenIP: listenIP,
|
||||||
hostIP: hostIP,
|
iptables: iptables,
|
||||||
proxyPorts: proxyPorts,
|
hostIP: hostIP,
|
||||||
|
proxyPorts: proxyPorts,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,9 +342,6 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
|
|||||||
return si, nil
|
return si, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// How long we leave idle UDP connections open.
|
|
||||||
const udpIdleTimeout = 250 * time.Millisecond
|
|
||||||
|
|
||||||
// OnServiceUpdate manages the active set of service proxies.
|
// OnServiceUpdate manages the active set of service proxies.
|
||||||
// Active service proxies are reinitialized if found in the update set or
|
// Active service proxies are reinitialized if found in the update set or
|
||||||
// shutdown if missing from the update set.
|
// shutdown if missing from the update set.
|
||||||
@ -388,7 +387,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
|
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
|
||||||
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, udpIdleTimeout)
|
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
|
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
|
||||||
continue
|
continue
|
||||||
|
@ -35,6 +35,10 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/util/iptables"
|
"k8s.io/kubernetes/pkg/util/iptables"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
udpIdleTimeoutForTest = 250 * time.Millisecond
|
||||||
|
)
|
||||||
|
|
||||||
func joinHostPort(host string, port int) string {
|
func joinHostPort(host string, port int) string {
|
||||||
return net.JoinHostPort(host, fmt.Sprintf("%d", port))
|
return net.JoinHostPort(host, fmt.Sprintf("%d", port))
|
||||||
}
|
}
|
||||||
@ -245,7 +249,7 @@ func TestTCPProxy(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -272,7 +276,7 @@ func TestUDPProxy(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -299,7 +303,7 @@ func TestUDPProxyTimeout(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -335,7 +339,7 @@ func TestMultiPortProxy(t *testing.T) {
|
|||||||
}},
|
}},
|
||||||
}})
|
}})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -362,7 +366,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
|
|||||||
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
|
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
|
||||||
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
|
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -425,7 +429,7 @@ func TestTCPProxyStop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -469,7 +473,7 @@ func TestUDPProxyStop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -507,7 +511,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -544,7 +548,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -581,7 +585,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -633,7 +637,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -685,7 +689,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -733,7 +737,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -778,7 +782,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -830,7 +834,7 @@ func TestProxyUpdatePortal(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"), nil, time.Minute, udpIdleTimeoutForTest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user