mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-13 13:55:41 +00:00
Userspace Proxy: Allow any ProxySocket in Proxier
This commit adds a new method for constructing userspace proxiers, `NewCustomProxier`. `NewCustomProxier` functions identically to `NewProxier`, except that it allows a custom constructor method to be passed in to construct instances of ProxySocket.
This commit is contained in:
parent
43c4d7ae23
commit
de2285ac7b
@ -84,22 +84,26 @@ func logTimeout(err error) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port
|
||||||
|
type ProxySocketFunc func(protocol api.Protocol, ip net.IP, port int) (ProxySocket, error)
|
||||||
|
|
||||||
// Proxier is a simple proxy for TCP connections between a localhost:lport
|
// Proxier is a simple proxy for TCP connections between a localhost:lport
|
||||||
// and services that provide the actual implementations.
|
// and services that provide the actual implementations.
|
||||||
type Proxier struct {
|
type Proxier struct {
|
||||||
loadBalancer LoadBalancer
|
loadBalancer LoadBalancer
|
||||||
mu sync.Mutex // protects serviceMap
|
mu sync.Mutex // protects serviceMap
|
||||||
serviceMap map[proxy.ServicePortName]*ServiceInfo
|
serviceMap map[proxy.ServicePortName]*ServiceInfo
|
||||||
syncPeriod time.Duration
|
syncPeriod time.Duration
|
||||||
minSyncPeriod time.Duration // unused atm, but plumbed through
|
minSyncPeriod time.Duration // unused atm, but plumbed through
|
||||||
udpIdleTimeout time.Duration
|
udpIdleTimeout time.Duration
|
||||||
portMapMutex sync.Mutex
|
portMapMutex sync.Mutex
|
||||||
portMap map[portMapKey]*portMapValue
|
portMap map[portMapKey]*portMapValue
|
||||||
numProxyLoops int32 // use atomic ops to access this; mostly for testing
|
numProxyLoops int32 // use atomic ops to access this; mostly for testing
|
||||||
listenIP net.IP
|
listenIP net.IP
|
||||||
iptables iptables.Interface
|
iptables iptables.Interface
|
||||||
hostIP net.IP
|
hostIP net.IP
|
||||||
proxyPorts PortAllocator
|
proxyPorts PortAllocator
|
||||||
|
makeProxySocket ProxySocketFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// assert Proxier is a ProxyProvider
|
// assert Proxier is a ProxyProvider
|
||||||
@ -145,6 +149,14 @@ func IsProxyLocked(err error) bool {
|
|||||||
// created, it will keep iptables up to date in the background and will not
|
// created, it will keep iptables up to date in the background and will not
|
||||||
// terminate if a particular iptables call fails.
|
// terminate if a particular iptables call fails.
|
||||||
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
|
func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
|
||||||
|
return NewCustomProxier(loadBalancer, listenIP, iptables, pr, syncPeriod, minSyncPeriod, udpIdleTimeout, newProxySocket)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCustomProxier functions similarly to NewProxier, returing a new Proxier
|
||||||
|
// for the given LoadBalancer and address. The new proxier is constructed using
|
||||||
|
// the ProxySocket constructor provided, however, instead of constructing the
|
||||||
|
// default ProxySockets.
|
||||||
|
func NewCustomProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, pr utilnet.PortRange, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
|
||||||
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
|
if listenIP.Equal(localhostIPv4) || listenIP.Equal(localhostIPv6) {
|
||||||
return nil, ErrProxyOnLocalhost
|
return nil, ErrProxyOnLocalhost
|
||||||
}
|
}
|
||||||
@ -162,10 +174,10 @@ func NewProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.In
|
|||||||
proxyPorts := newPortAllocator(pr)
|
proxyPorts := newPortAllocator(pr)
|
||||||
|
|
||||||
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
|
glog.V(2).Infof("Setting proxy IP to %v and initializing iptables", hostIP)
|
||||||
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout)
|
return createProxier(loadBalancer, listenIP, iptables, hostIP, proxyPorts, syncPeriod, minSyncPeriod, udpIdleTimeout, makeProxySocket)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration) (*Proxier, error) {
|
func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables.Interface, hostIP net.IP, proxyPorts PortAllocator, syncPeriod, minSyncPeriod, udpIdleTimeout time.Duration, makeProxySocket ProxySocketFunc) (*Proxier, error) {
|
||||||
// convenient to pass nil for tests..
|
// convenient to pass nil for tests..
|
||||||
if proxyPorts == nil {
|
if proxyPorts == nil {
|
||||||
proxyPorts = newPortAllocator(utilnet.PortRange{})
|
proxyPorts = newPortAllocator(utilnet.PortRange{})
|
||||||
@ -185,12 +197,13 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
|
|||||||
portMap: make(map[portMapKey]*portMapValue),
|
portMap: make(map[portMapKey]*portMapValue),
|
||||||
syncPeriod: syncPeriod,
|
syncPeriod: syncPeriod,
|
||||||
// plumbed through if needed, not used atm.
|
// plumbed through if needed, not used atm.
|
||||||
minSyncPeriod: minSyncPeriod,
|
minSyncPeriod: minSyncPeriod,
|
||||||
udpIdleTimeout: udpIdleTimeout,
|
udpIdleTimeout: udpIdleTimeout,
|
||||||
listenIP: listenIP,
|
listenIP: listenIP,
|
||||||
iptables: iptables,
|
iptables: iptables,
|
||||||
hostIP: hostIP,
|
hostIP: hostIP,
|
||||||
proxyPorts: proxyPorts,
|
proxyPorts: proxyPorts,
|
||||||
|
makeProxySocket: makeProxySocket,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -338,7 +351,7 @@ func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *Serv
|
|||||||
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
|
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
|
||||||
// connections, for now.
|
// connections, for now.
|
||||||
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
|
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol api.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
|
||||||
sock, err := newProxySocket(protocol, proxier.listenIP, proxyPort)
|
sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -593,7 +606,7 @@ func (proxier *Proxier) claimNodePort(ip net.IP, port int, protocol api.Protocol
|
|||||||
// it. Tools like 'ss' and 'netstat' do not show sockets that are
|
// it. Tools like 'ss' and 'netstat' do not show sockets that are
|
||||||
// bind()ed but not listen()ed, and at least the default debian netcat
|
// bind()ed but not listen()ed, and at least the default debian netcat
|
||||||
// has no way to avoid about 10 seconds of retries.
|
// has no way to avoid about 10 seconds of retries.
|
||||||
socket, err := newProxySocket(protocol, ip, port)
|
socket, err := proxier.makeProxySocket(protocol, ip, port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
|
return fmt.Errorf("can't open node port for %s: %v", key.String(), err)
|
||||||
}
|
}
|
||||||
|
@ -211,7 +211,7 @@ func TestTCPProxy(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -238,7 +238,7 @@ func TestUDPProxy(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -265,7 +265,7 @@ func TestUDPProxyTimeout(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -301,7 +301,7 @@ func TestMultiPortProxy(t *testing.T) {
|
|||||||
}},
|
}},
|
||||||
}})
|
}})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -328,7 +328,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
|
|||||||
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
|
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
|
||||||
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
|
serviceX := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "x"}
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -391,7 +391,7 @@ func TestTCPProxyStop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -435,7 +435,7 @@ func TestUDPProxyStop(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -473,7 +473,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -510,7 +510,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -546,7 +546,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
|
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -599,7 +599,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
|
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -653,7 +653,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -701,7 +701,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -746,7 +746,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -797,7 +797,7 @@ func TestProxyUpdatePortal(t *testing.T) {
|
|||||||
}
|
}
|
||||||
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
|
lb.OnEndpointsUpdate([]api.Endpoints{endpoint})
|
||||||
|
|
||||||
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest)
|
p, err := createProxier(lb, net.ParseIP("0.0.0.0"), ipttest.NewFake(), net.ParseIP("127.0.0.1"), nil, time.Minute, time.Second, udpIdleTimeoutForTest, newProxySocket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user