diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 57bd77cd742..8925e5541a0 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -84,22 +84,26 @@ func logTimeout(err error) bool { return false } +// ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port +type ProxySocketFunc func(protocol api.Protocol, ip net.IP, port int) (ProxySocket, error) + // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { - loadBalancer LoadBalancer - mu sync.Mutex // protects serviceMap - serviceMap map[proxy.ServicePortName]*ServiceInfo - syncPeriod time.Duration - minSyncPeriod time.Duration // unused atm, but plumbed through - udpIdleTimeout time.Duration - portMapMutex sync.Mutex - portMap map[portMapKey]*portMapValue - numProxyLoops int32 // use atomic ops to access this; mostly for testing - listenIP net.IP - iptables iptables.Interface - hostIP net.IP - proxyPorts PortAllocator + loadBalancer LoadBalancer + mu sync.Mutex // protects serviceMap + serviceMap map[proxy.ServicePortName]*ServiceInfo + syncPeriod time.Duration + minSyncPeriod time.Duration // unused atm, but plumbed through + udpIdleTimeout time.Duration + portMapMutex sync.Mutex + portMap map[portMapKey]*portMapValue + numProxyLoops int32 // use atomic ops to access this; mostly for testing + listenIP net.IP + iptables iptables.Interface + hostIP net.IP + proxyPorts PortAllocator + makeProxySocket ProxySocketFunc } // assert Proxier is a ProxyProvider @@ -145,6 +149,14 @@ func IsProxyLocked(err error) bool { // created, it will keep iptables up to date in the background and will not // terminate if a particular iptables call fails. func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { + return NewCustomProxier(loadBalancer, listenIP, iptables, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket) +} + +// NewCustomProxier functions similarly to NewProxier, returing a new Proxier +// for the given LoadBalancer and address. The new proxier is constructed using +// the ProxySocket constructor provided, however, instead of constructing the +// default ProxySockets. +func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) { return nil, ErrProxyOnLocalhost } @@ -162,10 +174,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In proxyPorts := newPortAllocator(pr) glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP) - return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout) + return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket) } -func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) { +func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) { // convenient to pass nil for tests.. if proxyPorts == nil { proxyPorts = newPortAllocator(utilnet.PortRange{}) @@ -185,12 +197,13 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables portMap: make(map[portMapKey]*portMapValue), syncPeriod: syncPeriod, // plumbed through if needed, not used atm. - minSyncPeriod: minSyncPeriod, - udpIdleTimeout: udpIdleTimeout, - listenIP: listenIP, - iptables: iptables, - hostIP: hostIP, - proxyPorts: proxyPorts, + minSyncPeriod: minSyncPeriod, + udpIdleTimeout: udpIdleTimeout, + listenIP: listenIP, + iptables: iptables, + hostIP: hostIP, + proxyPorts: proxyPorts, + makeProxySocket: makeProxySocket, }, nil } @@ -338,7 +351,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv // Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) { - sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort) + sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort) if err != nil { return nil, err } @@ -593,7 +606,7 @@ func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol // it. Tools like 'ss' and 'netstat' do not show sockets that are // bind()ed but not listen()ed, and at least the default debian netcat // has no way to avoid about 10 seconds of retries. - socket, err := newProxySocket(protocol, ip, port) + socket, err := proxier.makeProxySocket(protocol, ip, port) if err != nil { return fmt.Errorf("can't open node port for %s: %v", key.String(), err) } diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index 8a6994fc620..66383e7a6be 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -211,7 +211,7 @@ func TestTCPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -238,7 +238,7 @@ func TestUDPProxy(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -265,7 +265,7 @@ func TestUDPProxyTimeout(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -301,7 +301,7 @@ func TestMultiPortProxy(t *testing.T) { }}, }}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -328,7 +328,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) { serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"} - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -391,7 +391,7 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -435,7 +435,7 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -473,7 +473,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -510,7 +510,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -546,7 +546,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -599,7 +599,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -653,7 +653,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -701,7 +701,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -746,7 +746,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { }, }) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) } @@ -797,7 +797,7 @@ func TestProxyUpdatePortal(t *testing.T) { } lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) - p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest) + p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket) if err != nil { t.Fatal(err) }