Merge pull request #29465 from DirectXMan12/feature/extensible-proxysocket

Automatic merge from submit-queue

Extensible Userspace Proxy

This PR refactors the userspace proxy to allow for custom ProxySocket implementations.
It changes the ProxySocket interface so that other packages can implement it properly (making sure all arguments are exported types, and so on), and adds a mechanism for an implementation to create an instance of the userspace proxy with a non-standard ProxySocket.
Custom ProxySockets are useful for injecting additional logic into the actual proxying. For example, our idling proxier uses a custom proxy socket to hold connections and notify the cluster that idled scalable resources need to be woken up.
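As a sketch of the extension point (not code from this PR), a TCP-only custom ProxySocket in the spirit of that idling proxier might look like the following; the idlingproxy package and the wakeUp helper are hypothetical, while ProxySocket, ServiceInfo, LoadBalancer.ServiceHasEndpoints, TryConnectEndpoints, and ProxyTCP are the names this PR exports:

package idlingproxy

import (
	"net"
	"strconv"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/proxy"
	"k8s.io/kubernetes/pkg/proxy/userspace"
)

// idlingProxySocket wraps a plain TCP listener; Addr and Close come from the
// embedded net.Listener, which satisfies those parts of the ProxySocket interface.
type idlingProxySocket struct {
	net.Listener
	port int
}

func (s *idlingProxySocket) ListenPort() int { return s.port }

func (s *idlingProxySocket) ProxyLoop(service proxy.ServicePortName, info *userspace.ServiceInfo, lb userspace.LoadBalancer) {
	for {
		if !info.IsAlive() {
			return // the service port was closed or replaced
		}
		inConn, err := s.Accept()
		if err != nil {
			if !info.IsAlive() {
				return // expected failure: the socket was closed under us
			}
			continue
		}
		// The custom bit: if nothing backs this service, ask the cluster to
		// wake the idled backends before proxying (hypothetical helper).
		if !lb.ServiceHasEndpoints(service) {
			wakeUp(info.ServiceRef)
		}
		outConn, err := userspace.TryConnectEndpoints(service, inConn.RemoteAddr(), "tcp", lb)
		if err != nil {
			inConn.Close()
			continue
		}
		go userspace.ProxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
	}
}

// wakeUp stands in for whatever mechanism notifies the cluster that the idled
// scalable resources behind ref need to be scaled back up.
func wakeUp(ref api.ObjectReference) {}

// NewIdlingProxySocket matches the ProxySocketFunc signature, so it can be
// passed to userspace.NewCustomProxier. A real implementation would also
// handle UDP and a nil ip, as the package's default newProxySocket does.
func NewIdlingProxySocket(protocol api.Protocol, ip net.IP, port int) (userspace.ProxySocket, error) {
	listener, err := net.Listen("tcp", net.JoinHostPort(ip.String(), strconv.Itoa(port)))
	if err != nil {
		return nil, err
	}
	return &idlingProxySocket{Listener: listener, port: listener.Addr().(*net.TCPAddr).Port}, nil
}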

Also-Authored-By: Ben Bennett bbennett@redhat.com
Kubernetes Submit Queue 2017-03-01 09:17:29 -08:00 committed by GitHub
commit 0796d5c0d8
15 changed files with 471 additions and 252 deletions


@ -278,6 +278,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
loadBalancer,
net.ParseIP(config.BindAddress),
iptInterface,
execer,
*utilnet.ParsePortRangeOrDie(config.PortRange),
config.IPTablesSyncPeriod.Duration,
config.IPTablesMinSyncPeriod.Duration,


@ -35,6 +35,7 @@ filegroup(
"//pkg/proxy/healthcheck:all-srcs",
"//pkg/proxy/iptables:all-srcs",
"//pkg/proxy/userspace:all-srcs",
"//pkg/proxy/util:all-srcs",
"//pkg/proxy/winuserspace:all-srcs",
],
tags = ["automanaged"],


@ -18,6 +18,7 @@ go_library(
"//pkg/features:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/sysctl:go_default_library",


@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
@ -552,7 +553,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
glog.V(4).Infof("Skipping proxy iptables rule sync on service update because nothing changed")
}
proxier.deleteServiceConnections(staleUDPServices.List())
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
}
// Reconstruct the list of endpoint infos from the endpointIP list
@ -792,7 +793,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := strings.Split(epSvcPair.endpoint, ":")[0]
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
@ -803,33 +804,6 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServ
}
}
// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip
func (proxier *Proxier) deleteServiceConnections(svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
}
}
}
//execConntrackTool executes conntrack tool using given parameters
func (proxier *Proxier) execConntrackTool(parameters ...string) error {
conntrackPath, err := proxier.exec.LookPath("conntrack")
if err != nil {
return fmt.Errorf("Error looking for path of conntrack: %v", err)
}
output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err)
}
return nil
}
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
@ -1392,10 +1366,9 @@ func (proxier *Proxier) syncProxyRules() {
// https://github.com/docker/docker/issues/8795
// https://github.com/kubernetes/kubernetes/issues/31983
func (proxier *Proxier) clearUdpConntrackForPort(port int) {
var err error = nil
glog.V(2).Infof("Deleting conntrack entries for udp connections")
if port > 0 {
err = proxier.execConntrackTool("-D", "-p", "udp", "--dport", strconv.Itoa(port))
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "-p", "udp", "--dport", strconv.Itoa(port))
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
glog.Errorf("conntrack return with error: %v", err)
}


@ -172,57 +172,6 @@ func TestGetChainLinesMultipleTables(t *testing.T) {
checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected)
}
func TestExecConntrackTool(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
fakeProxier := Proxier{exec: &fexec}
testCases := [][]string{
{"-L", "-p", "udp"},
{"-D", "-p", "udp", "-d", "10.0.240.1"},
{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
}
expectErr := []bool{false, false, true}
for i := range testCases {
err := fakeProxier.execConntrackTool(testCases[i]...)
if expectErr[i] {
if err == nil {
t.Errorf("expected err, got %v", err)
}
} else {
if err != nil {
t.Errorf("expected success, got %v", err)
}
}
execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ")
expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " "))
if execCmd != expectCmd {
t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd)
}
}
}
func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, port int, protocol api.Protocol, onlyNodeLocalEndpoints bool) *serviceInfo {
return &serviceInfo{
sessionAffinityType: api.ServiceAffinityNone, // default
@ -296,54 +245,6 @@ func TestDeleteEndpointConnections(t *testing.T) {
}
}
func TestDeleteServiceConnections(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
fakeProxier := Proxier{exec: &fexec}
testCases := [][]string{
{
"10.240.0.3",
"10.240.0.5",
},
{
"10.240.0.4",
},
}
svcCount := 0
for i := range testCases {
fakeProxier.deleteServiceConnections(testCases[i])
for _, ip := range testCases[i] {
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
}
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
}
type fakeClosable struct {
closed bool
}


@ -23,6 +23,8 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/slice:go_default_library",
"//vendor:github.com/golang/glog",
@ -30,6 +32,7 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/util/errors",
"//vendor:k8s.io/apimachinery/pkg/util/net",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
],
)
@ -46,6 +49,7 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/iptables/testing:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",


@ -31,4 +31,5 @@ type LoadBalancer interface {
NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
DeleteService(service proxy.ServicePortName)
CleanupStaleStickySessions(service proxy.ServicePortName)
ServiceHasEndpoints(service proxy.ServicePortName) bool
}


@ -33,6 +33,9 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/iptables"
)
@ -42,14 +45,20 @@ type portal struct {
isExternal bool
}
type serviceInfo struct {
// ServiceInfo contains information and state for a particular proxied service
type ServiceInfo struct {
// Timeout is the read/write timeout (used for UDP connections)
Timeout time.Duration
// ActiveClients is the cache of active UDP clients being proxied by this proxy for this service
ActiveClients *ClientCache
// ServiceRef is a full object reference to the service described by this ServiceInfo
ServiceRef api.ObjectReference
isAliveAtomic int32 // Only access this with atomic ops
portal portal
protocol api.Protocol
proxyPort int
socket proxySocket
timeout time.Duration
activeClients *clientCache
socket ProxySocket
nodePort int
loadBalancerStatus api.LoadBalancerStatus
sessionAffinityType api.ServiceAffinity
@ -58,7 +67,7 @@ type serviceInfo struct {
externalIPs []string
}
func (info *serviceInfo) setAlive(b bool) {
func (info *ServiceInfo) setAlive(b bool) {
var i int32
if b {
i = 1
@ -66,7 +75,7 @@ func (info *serviceInfo) setAlive(b bool) {
atomic.StoreInt32(&info.isAliveAtomic, i)
}
func (info *serviceInfo) isAlive() bool {
func (info *ServiceInfo) IsAlive() bool {
return atomic.LoadInt32(&info.isAliveAtomic) != 0
}
@ -80,22 +89,27 @@ func logTimeout(err error) bool {
return false
}
// ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port
type ProxySocketFunc func(protocol api.Protocol, ip net.IP, port int) (ProxySocket, error)
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortName]*serviceInfo
syncPeriod time.Duration
minSyncPeriod time.Duration // unused atm, but plumbed through
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
proxyPorts PortAllocator
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortName]*ServiceInfo
syncPeriod time.Duration
minSyncPeriod time.Duration // unused atm, but plumbed through
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
numProxyLoops int32 // use atomic ops to access this; mostly for testing
listenIP net.IP
iptables iptables.Interface
hostIP net.IP
proxyPorts PortAllocator
makeProxySocket ProxySocketFunc
exec utilexec.Interface
}
// assert Proxier is a ProxyProvider
@ -140,7 +154,15 @@ func IsProxyLocked(err error) bool {
// if iptables fails to update or acquire the initial lock. Once a proxier is
// created, it will keep iptables up to date in the background and will not
// terminate if a particular iptables call fails.
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
return NewCustomProxier(loadBalancer, listenIP, iptables, exec, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket)
}
// NewCustomProxier functions similarly to NewProxier, returning a new Proxier
// for the given LoadBalancer and address. The new proxier is constructed using
// the provided ProxySocket constructor instead of the default ProxySockets.
func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
return nil, ErrProxyOnLocalhost
}
@ -158,10 +180,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
proxyPorts := newPortAllocator(pr)
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout)
return createProxier(loadBalancer, listenIP, iptables, exec, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
}
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, exec utilexec.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
// convenient to pass nil for tests..
if proxyPorts == nil {
proxyPorts = newPortAllocator(utilnet.PortRange{})
@ -177,16 +199,18 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
serviceMap: make(map[proxy.ServicePortName]*ServiceInfo),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
// plumbed through if needed, not used atm.
minSyncPeriod: minSyncPeriod,
udpIdleTimeout: udpIdleTimeout,
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
proxyPorts: proxyPorts,
minSyncPeriod: minSyncPeriod,
udpIdleTimeout: udpIdleTimeout,
listenIP: listenIP,
iptables: iptables,
hostIP: hostIP,
proxyPorts: proxyPorts,
makeProxySocket: makeProxySocket,
exec: exec,
}, nil
}
@ -301,14 +325,14 @@ func (proxier *Proxier) cleanupStaleStickySessions() {
}
// This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *serviceInfo) error {
func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error {
proxier.mu.Lock()
defer proxier.mu.Unlock()
return proxier.stopProxyInternal(service, info)
}
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *serviceInfo) error {
func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error {
delete(proxier.serviceMap, service)
info.setAlive(false)
err := info.socket.Close()
@ -317,24 +341,24 @@ func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *s
return err
}
func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*serviceInfo, bool) {
func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceInfo, bool) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
info, ok := proxier.serviceMap[service]
return info, ok
}
func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *serviceInfo) {
func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.serviceMap[service] = info
}
// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// addServiceOnPort starts listening for a new service, returning the ServiceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, serviceRef api.ObjectReference, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
if err != nil {
return nil, err
}
@ -348,13 +372,15 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
sock.Close()
return nil, err
}
si := &serviceInfo{
si := &ServiceInfo{
Timeout: timeout,
ActiveClients: newClientCache(),
ServiceRef: serviceRef,
isAliveAtomic: 1,
proxyPort: portNum,
protocol: protocol,
socket: sock,
timeout: timeout,
activeClients: newClientCache(),
sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeMinutes: 180, // TODO: parameterize this in the API.
}
@ -364,7 +390,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
go func(service proxy.ServicePortName, proxier *Proxier) {
defer runtime.HandleCrash()
atomic.AddInt32(&proxier.numProxyLoops, 1)
sock.ProxyLoop(service, si, proxier)
sock.ProxyLoop(service, si, proxier.loadBalancer)
atomic.AddInt32(&proxier.numProxyLoops, -1)
}(service, proxier)
@ -386,6 +412,17 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
continue
}
// TODO: should this just be api.GetReference?
svcGVK := service.GetObjectKind().GroupVersionKind()
svcRef := api.ObjectReference{
Kind: svcGVK.Kind,
Namespace: service.Namespace,
Name: service.Name,
UID: service.UID,
APIVersion: svcGVK.GroupVersion().String(),
ResourceVersion: service.ResourceVersion,
}
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, Port: servicePort.Name}
@ -416,7 +453,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", serviceName, serviceIP, servicePort.Port, servicePort.Protocol)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
info, err = proxier.addServiceOnPort(serviceName, svcRef, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
@ -437,11 +474,18 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
}
staleUDPServices := sets.NewString()
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1).Infof("Stopping service %q", name)
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
staleUDPServices.Insert(proxier.serviceMap[name].portal.ip.String())
}
err := proxier.closePortal(name, info)
if err != nil {
glog.Errorf("Failed to close portal for %q: %v", name, err)
@ -453,9 +497,11 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
proxier.loadBalancer.DeleteService(name)
}
}
utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List())
}
func sameConfig(info *serviceInfo, service *api.Service, port *api.ServicePort) bool {
func sameConfig(info *ServiceInfo, service *api.Service, port *api.ServicePort) bool {
if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
return false
}
@ -486,7 +532,7 @@ func ipsEqual(lhs, rhs []string) bool {
return true
}
func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *serviceInfo) error {
func (proxier *Proxier) openPortal(service proxy.ServicePortName, info *ServiceInfo) error {
err := proxier.openOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
if err != nil {
return err
@ -588,7 +634,7 @@ func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol
// it. Tools like 'ss' and 'netstat' do not show sockets that are
// bind()ed but not listen()ed, and at least the default debian netcat
// has no way to avoid about 10 seconds of retries.
socket, err := newProxySocket(protocol, ip, port)
socket, err := proxier.makeProxySocket(protocol, ip, port)
if err != nil {
return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
}
@ -668,7 +714,7 @@ func (proxier *Proxier) openNodePort(nodePort int, protocol api.Protocol, proxyI
return nil
}
func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *serviceInfo) error {
func (proxier *Proxier) closePortal(service proxy.ServicePortName, info *ServiceInfo) error {
// Collect errors and report them all at the end.
el := proxier.closeOnePortal(info.portal, info.protocol, proxier.listenIP, info.proxyPort, service)
for _, publicIP := range info.externalIPs {
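
For comparison with the NewProxier call in cmd/kube-proxy/app/server.go above, a hedged sketch of how a downstream consumer might wire in a custom socket via NewCustomProxier; the idlingproxy factory is the hypothetical one sketched in the description, and the exact config field names are assumptions, not code from this PR:

proxier, err := userspace.NewCustomProxier(
	loadBalancer,
	net.ParseIP(config.BindAddress),
	iptInterface,
	execer,
	*utilnet.ParsePortRangeOrDie(config.PortRange),
	config.IPTablesSyncPeriod.Duration,
	config.IPTablesMinSyncPeriod.Duration,
	config.UDPIdleTimeout.Duration, // assumed field name for the UDP idle timeout
	idlingproxy.NewIdlingProxySocket, // any ProxySocketFunc
)
if err != nil {
	glog.Fatalf("Unable to create proxier: %v", err)
}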

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/util/exec"
ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
)
@ -182,14 +183,14 @@ func waitForNumProxyLoops(t *testing.T, p *Proxier, want int32) {
t.Errorf("expected %d ProxyLoops running, got %d", want, got)
}
func waitForNumProxyClients(t *testing.T, s *serviceInfo, want int, timeout time.Duration) {
func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time.Duration) {
var got int
now := time.Now()
deadline := now.Add(timeout)
for time.Now().Before(deadline) {
s.activeClients.mu.Lock()
got = len(s.activeClients.clients)
s.activeClients.mu.Unlock()
s.ActiveClients.Mu.Lock()
got = len(s.ActiveClients.Clients)
s.ActiveClients.Mu.Unlock()
if got == want {
return
}
@ -211,13 +212,16 @@ func TestTCPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -238,13 +242,16 @@ func TestUDPProxy(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -265,13 +272,16 @@ func TestUDPProxyTimeout(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -301,20 +311,24 @@ func TestMultiPortProxy(t *testing.T) {
}},
}})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
serviceRefP := api.ObjectReference{Name: serviceP.Name, Namespace: serviceP.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfoP, err := p.addServiceOnPort(serviceP, serviceRefP, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfoP.proxyPort)
waitForNumProxyLoops(t, p, 1)
svcInfoQ, err := p.addServiceOnPort(serviceQ, "UDP", 0, time.Second)
serviceRefQ := api.ObjectReference{Name: serviceQ.Name, Namespace: serviceQ.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfoQ, err := p.addServiceOnPort(serviceQ, serviceRefQ, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -328,7 +342,9 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
@ -391,18 +407,21 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
if !svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected true")
if !svcInfo.IsAlive() {
t.Fatalf("wrong value for IsAlive(): expected true")
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
@ -412,8 +431,8 @@ func TestTCPProxyStop(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
stopProxyByName(p, service)
if svcInfo.isAlive() {
t.Fatalf("wrong value for isAlive(): expected false")
if svcInfo.IsAlive() {
t.Fatalf("wrong value for IsAlive(): expected false")
}
// Wait for the port to really close.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
@ -435,13 +454,16 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -473,13 +495,16 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -510,13 +535,16 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -546,13 +574,16 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -599,13 +630,16 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -653,13 +687,16 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -701,13 +738,16 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -746,13 +786,16 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
},
})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -797,13 +840,16 @@ func TestProxyUpdatePortal(t *testing.T) {
}
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
fexec := makeFakeExec()
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
serviceRef := api.ObjectReference{Name: service.Name, Namespace: service.Namespace, Kind: "Service", APIVersion: "v1"}
svcInfo, err := p.addServiceOnPort(service, serviceRef, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
@ -853,4 +899,18 @@ func TestProxyUpdatePortal(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}
func makeFakeExec() *exec.FakeExec {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
},
}
return &exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
}
// TODO(justinsb): Add test for nodePort conflict detection, once we have nodePort wired in


@ -32,20 +32,20 @@ import (
)
// Abstraction over TCP/UDP sockets which are proxied.
type proxySocket interface {
// Addr gets the net.Addr for a proxySocket.
type ProxySocket interface {
// Addr gets the net.Addr for a ProxySocket.
Addr() net.Addr
// Close stops the proxySocket from accepting incoming connections.
// Close stops the ProxySocket from accepting incoming connections.
// Each implementation should comment on the impact of calling Close
// while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service proxy.ServicePortName, info *serviceInfo, proxier *Proxier)
// ListenPort returns the host port that the proxySocket is listening on
ProxyLoop(service proxy.ServicePortName, info *ServiceInfo, loadBalancer LoadBalancer)
// ListenPort returns the host port that the ProxySocket is listening on
ListenPort() int
}
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (ProxySocket, error) {
host := ""
if ip != nil {
host = ip.String()
@ -73,9 +73,9 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
}
// How long we wait for a connection to a backend
var endpointDialTimeout = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
var EndpointDialTimeouts = []time.Duration{250 * time.Millisecond, 500 * time.Millisecond, 1 * time.Second, 2 * time.Second}
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
// tcpProxySocket implements ProxySocket. Close() is implemented by net.Listener. When Close() is called,
// no new connections are allowed but existing connections are left untouched.
type tcpProxySocket struct {
net.Listener
@ -86,10 +86,12 @@ func (tcp *tcpProxySocket) ListenPort() int {
return tcp.port
}
func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
// TryConnectEndpoints attempts to connect to the next available endpoint for the given service, cycling
// through until it is able to successfully connect, or it has tried with all timeouts in EndpointDialTimeouts.
func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string, loadBalancer LoadBalancer) (out net.Conn, err error) {
sessionAffinityReset := false
for _, dialTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
for _, dialTimeout := range EndpointDialTimeouts {
endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err
@ -111,9 +113,9 @@ func tryConnect(service proxy.ServicePortName, srcAddr net.Addr, protocol string
return nil, fmt.Errorf("failed to connect to an endpoint.")
}
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) {
for {
if !myInfo.isAlive() {
if !myInfo.IsAlive() {
// The service port was closed or replaced.
return
}
@ -127,7 +129,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
if isClosedError(err) {
return
}
if !myInfo.isAlive() {
if !myInfo.IsAlive() {
// Then the service port was just closed so the accept failure is to be expected.
return
}
@ -135,19 +137,19 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
continue
}
glog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier)
outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer)
if err != nil {
glog.Errorf("Failed to connect to balancer: %v", err)
inConn.Close()
continue
}
// Spin up an async copy loop.
go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
go ProxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
}
}
// proxyTCP proxies data bi-directionally between in and out.
func proxyTCP(in, out *net.TCPConn) {
// ProxyTCP proxies data bi-directionally between in and out.
func ProxyTCP(in, out *net.TCPConn) {
var wg sync.WaitGroup
wg.Add(2)
glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
@ -171,7 +173,7 @@ func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
src.Close()
}
// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called,
// udpProxySocket implements ProxySocket. Close() is implemented by net.UDPConn. When Close() is called,
// no new connections are allowed and existing connections are broken.
// TODO: We could lame-duck this ourselves, if it becomes important.
type udpProxySocket struct {
@ -188,19 +190,19 @@ func (udp *udpProxySocket) Addr() net.Addr {
}
// Holds all the known UDP clients that have not timed out.
type clientCache struct {
mu sync.Mutex
clients map[string]net.Conn // addr string -> connection
type ClientCache struct {
Mu sync.Mutex
Clients map[string]net.Conn // addr string -> connection
}
func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
func newClientCache() *ClientCache {
return &ClientCache{Clients: map[string]net.Conn{}}
}
func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serviceInfo, proxier *Proxier) {
func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *ServiceInfo, loadBalancer LoadBalancer) {
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
if !myInfo.isAlive() {
if !myInfo.IsAlive() {
// The service port was closed or replaced.
break
}
@ -219,7 +221,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
break
}
// If this is a client we know already, reuse the connection and goroutine.
svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout)
svrConn, err := udp.getBackendConn(myInfo.ActiveClients, cliAddr, loadBalancer, service, myInfo.Timeout)
if err != nil {
continue
}
@ -233,7 +235,7 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
}
continue
}
err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout))
err = svrConn.SetDeadline(time.Now().Add(myInfo.Timeout))
if err != nil {
glog.Errorf("SetDeadline failed: %v", err)
continue
@ -241,17 +243,17 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *serv
}
}
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) {
activeClients.mu.Lock()
defer activeClients.mu.Unlock()
func (udp *udpProxySocket) getBackendConn(activeClients *ClientCache, cliAddr net.Addr, loadBalancer LoadBalancer, service proxy.ServicePortName, timeout time.Duration) (net.Conn, error) {
activeClients.Mu.Lock()
defer activeClients.Mu.Unlock()
svrConn, found := activeClients.clients[cliAddr.String()]
svrConn, found := activeClients.Clients[cliAddr.String()]
if !found {
// TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic.
glog.V(3).Infof("New UDP connection from %s", cliAddr)
var err error
svrConn, err = tryConnect(service, cliAddr, "udp", proxier)
svrConn, err = TryConnectEndpoints(service, cliAddr, "udp", loadBalancer)
if err != nil {
return nil, err
}
@ -259,8 +261,8 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne
glog.Errorf("SetDeadline failed: %v", err)
return nil, err
}
activeClients.clients[cliAddr.String()] = svrConn
go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
activeClients.Clients[cliAddr.String()] = svrConn
go func(cliAddr net.Addr, svrConn net.Conn, activeClients *ClientCache, timeout time.Duration) {
defer runtime.HandleCrash()
udp.proxyClient(cliAddr, svrConn, activeClients, timeout)
}(cliAddr, svrConn, activeClients, timeout)
@ -270,7 +272,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne
// This function is expected to be called as a goroutine.
// TODO: Track and log bytes copied, like TCP
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *ClientCache, timeout time.Duration) {
defer svrConn.Close()
var buffer [4096]byte
for {
@ -294,7 +296,7 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
break
}
}
activeClients.mu.Lock()
delete(activeClients.clients, cliAddr.String())
activeClients.mu.Unlock()
activeClients.Mu.Lock()
delete(activeClients.Clients, cliAddr.String())
activeClients.Mu.Unlock()
}
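
Because ClientCache and its Mu and Clients fields are now exported, a custom UDP ProxySocket can reuse the same per-client connection map that the default socket maintains. A sketch under that assumption (imports of net, time, the proxy package, and this userspace package are implied; the function itself is hypothetical, not part of this PR):

// getOrCreateBackendConn mirrors the package's unexported getBackendConn for
// use by an out-of-tree UDP ProxySocket implementation.
func getOrCreateBackendConn(info *userspace.ServiceInfo, cliAddr net.Addr, service proxy.ServicePortName, lb userspace.LoadBalancer) (net.Conn, error) {
	info.ActiveClients.Mu.Lock()
	defer info.ActiveClients.Mu.Unlock()
	// Reuse the existing backend connection for a client we already know.
	if conn, found := info.ActiveClients.Clients[cliAddr.String()]; found {
		return conn, nil
	}
	conn, err := userspace.TryConnectEndpoints(service, cliAddr, "udp", lb)
	if err != nil {
		return nil, err
	}
	// Expire idle connections using the service's configured UDP timeout.
	if err := conn.SetDeadline(time.Now().Add(info.Timeout)); err != nil {
		return nil, err
	}
	info.ActiveClients.Clients[cliAddr.String()] = conn
	return conn, nil
}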


@ -120,6 +120,16 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
return true
}
// ServiceHasEndpoints checks whether a service entry has endpoints.
func (lb *LoadBalancerRR) ServiceHasEndpoints(svcPort proxy.ServicePortName) bool {
lb.lock.Lock()
defer lb.lock.Unlock()
state, exists := lb.services[svcPort]
// TODO: while nothing ever assigns nil to the map, *some* of the code using the map
// checks for it. The code should all follow the same convention.
return exists && state != nil && len(state.endpoints) > 0
}
// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) {

pkg/proxy/util/BUILD (new file)

@ -0,0 +1,40 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["conntrack.go"],
tags = ["automanaged"],
deps = [
"//pkg/util/exec:go_default_library",
"//vendor:github.com/golang/glog",
],
)
go_test(
name = "go_default_test",
srcs = ["conntrack_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//pkg/util/exec:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)


@ -0,0 +1,58 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"strings"
"k8s.io/kubernetes/pkg/util/exec"
"github.com/golang/glog"
)
// Utilities for dealing with conntrack
const noConnectionToDelete = "0 flow entries have been deleted"
// DeleteServiceConnections uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IPs
func DeleteServiceConnections(execer exec.Interface, svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := ExecConntrackTool(execer, "-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failures occur, stale udp connections may not get flushed.
// These stale udp connections will black-hole traffic. Making this a best-effort operation for now, since it
// is expensive to baby-sit all udp connections to kubernetes services.
glog.Errorf("conntrack returned error: %v", err)
}
}
}
// ExecConntrackTool executes the conntrack tool using the given parameters
func ExecConntrackTool(execer exec.Interface, parameters ...string) error {
conntrackPath, err := execer.LookPath("conntrack")
if err != nil {
return fmt.Errorf("error looking for path of conntrack: %v", err)
}
output, err := execer.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("conntrack command returned: %q, error message: %s", string(output), err)
}
return nil
}
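
For reference, a short usage sketch of the relocated helpers with a real exec implementation; the service IPs here are placeholders, and the conntrack binary is assumed to be installed on the node:

package main

import (
	utilproxy "k8s.io/kubernetes/pkg/proxy/util"
	utilexec "k8s.io/kubernetes/pkg/util/exec"
)

func main() {
	execer := utilexec.New()

	// Best effort: flush UDP conntrack entries for two stale service IPs.
	utilproxy.DeleteServiceConnections(execer, []string{"10.0.0.10", "10.0.0.11"})

	// Or drive conntrack directly; an error containing
	// "0 flow entries have been deleted" just means there was nothing to remove.
	if err := utilproxy.ExecConntrackTool(execer, "-D", "-p", "udp", "--dport", "53"); err != nil {
		// Treat as best effort, as the callers above do.
	}
}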


@ -0,0 +1,120 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"strings"
"testing"
"k8s.io/kubernetes/pkg/util/exec"
)
func TestExecConntrackTool(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := [][]string{
{"-L", "-p", "udp"},
{"-D", "-p", "udp", "-d", "10.0.240.1"},
{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
}
expectErr := []bool{false, false, true}
for i := range testCases {
err := ExecConntrackTool(&fexec, testCases[i]...)
if expectErr[i] {
if err == nil {
t.Errorf("expected err, got %v", err)
}
} else {
if err != nil {
t.Errorf("expected success, got %v", err)
}
}
execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ")
expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " "))
if execCmd != expectCmd {
t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd)
}
}
}
func TestDeleteServiceConnections(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
testCases := [][]string{
{
"10.240.0.3",
"10.240.0.5",
},
{
"10.240.0.4",
},
}
svcCount := 0
for i := range testCases {
DeleteServiceConnections(&fexec, testCases[i])
for _, ip := range testCases[i] {
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
}
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
}


@ -768,6 +768,7 @@ k8s.io/kubernetes/pkg/proxy/config,ixdy,1,
k8s.io/kubernetes/pkg/proxy/healthcheck,rrati,0,
k8s.io/kubernetes/pkg/proxy/iptables,freehan,0,
k8s.io/kubernetes/pkg/proxy/userspace,luxas,1,
k8s.io/kubernetes/pkg/proxy/util,knobunc,0,
k8s.io/kubernetes/pkg/proxy/winuserspace,jbhurat,0,
k8s.io/kubernetes/pkg/quota,sttts,1,
k8s.io/kubernetes/pkg/quota/evaluator/core,yifan-gu,1,
