proxier/ipvs: check already binded addresses in the IPVS dummy interface

Signed-off-by: Andrew Sy Kim <kim.andrewsy@gmail.com>
Co-authored-by: Laurent Bernaille <laurent.bernaille@gmail.com>
This commit is contained in:
Andrew Sy Kim 2020-06-29 11:18:06 -04:00
parent 6cedc0853f
commit de2ecd7e2f
2 changed files with 69 additions and 18 deletions

View File

@ -262,9 +262,10 @@ type Proxier struct {
gracefuldeleteManager *GracefulTerminationManager gracefuldeleteManager *GracefulTerminationManager
} }
// IPGetter helps get node network interface IP // IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface
type IPGetter interface { type IPGetter interface {
NodeIPs() ([]net.IP, error) NodeIPs() ([]net.IP, error)
BindedIPs() (sets.String, error)
} }
// realIPGetter is a real NodeIP handler, it implements IPGetter. // realIPGetter is a real NodeIP handler, it implements IPGetter.
@ -300,6 +301,11 @@ func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) {
return ips, nil return ips, nil
} }
// BindedIPs returns all addresses that are binded to the IPVS dummy interface kube-ipvs0
func (r *realIPGetter) BindedIPs() (sets.String, error) {
return r.nl.GetLocalAddresses(DefaultDummyDevice, "")
}
// Proxier implements proxy.Provider // Proxier implements proxy.Provider
var _ proxy.Provider = &Proxier{} var _ proxy.Provider = &Proxier{}
@ -1089,6 +1095,11 @@ func (proxier *Proxier) syncProxyRules() {
// activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync // activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
activeBindAddrs := map[string]bool{} activeBindAddrs := map[string]bool{}
bindedAddresses, err := proxier.ipGetter.BindedIPs()
if err != nil {
klog.Errorf("error listing addresses binded to dummy interface, error: %v", err)
}
hasNodePort := false hasNodePort := false
for _, svc := range proxier.serviceMap { for _, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo) svcInfo, ok := svc.(*serviceInfo)
@ -1197,7 +1208,7 @@ func (proxier *Proxier) syncProxyRules() {
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
@ -1278,7 +1289,7 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
@ -1384,7 +1395,7 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
if err := proxier.syncService(svcNameString, serv, true); err == nil { if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true activeBindAddrs[serv.Address.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
@ -1541,7 +1552,7 @@ func (proxier *Proxier) syncProxyRules() {
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
} }
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false); err == nil { if err := proxier.syncService(svcNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil { if err := proxier.syncEndpoint(svcName, svcInfo.OnlyNodeLocalEndpoints(), serv); err != nil {
klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
@ -1922,7 +1933,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
} }
} }
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error { func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error {
appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs) appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) { if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
if appliedVirtualServer == nil { if appliedVirtualServer == nil {
@ -1943,9 +1954,14 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
} }
} }
// bind service address to dummy interface even if service not changed, // bind service address to dummy interface
// in case that service IP was removed by other processes
if bindAddr { if bindAddr {
// always attempt to bind if bindedAddresses is nil,
// otherwise check if it's already binded and return early
if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) {
return nil
}
klog.V(4).Infof("Bind addr %s", vs.Address.String()) klog.V(4).Infof("Bind addr %s", vs.Address.String())
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice) _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
if err != nil { if err != nil {
@ -1953,6 +1969,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
return err return err
} }
} }
return nil return nil
} }

View File

@ -57,13 +57,18 @@ import (
const testHostname = "test-hostname" const testHostname = "test-hostname"
type fakeIPGetter struct { type fakeIPGetter struct {
nodeIPs []net.IP nodeIPs []net.IP
bindedIPs sets.String
} }
func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) { func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) {
return f.nodeIPs, nil return f.nodeIPs, nil
} }
func (f *fakeIPGetter) BindedIPs() (sets.String, error) {
return f.bindedIPs, nil
}
// fakePortOpener implements portOpener. // fakePortOpener implements portOpener.
type fakePortOpener struct { type fakePortOpener struct {
openPorts []*utilproxy.LocalPort openPorts []*utilproxy.LocalPort
@ -3056,6 +3061,7 @@ func Test_syncService(t *testing.T) {
svcName string svcName string
newVirtualServer *utilipvs.VirtualServer newVirtualServer *utilipvs.VirtualServer
bindAddr bool bindAddr bool
bindedAddrs sets.String
}{ }{
{ {
// case 0, old virtual server is same as new virtual server // case 0, old virtual server is same as new virtual server
@ -3074,7 +3080,8 @@ func Test_syncService(t *testing.T) {
Scheduler: "rr", Scheduler: "rr",
Flags: utilipvs.FlagHashed, Flags: utilipvs.FlagHashed,
}, },
bindAddr: false, bindAddr: false,
bindedAddrs: sets.NewString(),
}, },
{ {
// case 1, old virtual server is different from new virtual server // case 1, old virtual server is different from new virtual server
@ -3093,7 +3100,8 @@ func Test_syncService(t *testing.T) {
Scheduler: "rr", Scheduler: "rr",
Flags: utilipvs.FlagPersistent, Flags: utilipvs.FlagPersistent,
}, },
bindAddr: false, bindAddr: false,
bindedAddrs: sets.NewString(),
}, },
{ {
// case 2, old virtual server is different from new virtual server // case 2, old virtual server is different from new virtual server
@ -3112,7 +3120,8 @@ func Test_syncService(t *testing.T) {
Scheduler: "wlc", Scheduler: "wlc",
Flags: utilipvs.FlagHashed, Flags: utilipvs.FlagHashed,
}, },
bindAddr: false, bindAddr: false,
bindedAddrs: sets.NewString(),
}, },
{ {
// case 3, old virtual server is nil, and create new virtual server // case 3, old virtual server is nil, and create new virtual server
@ -3125,7 +3134,8 @@ func Test_syncService(t *testing.T) {
Scheduler: "rr", Scheduler: "rr",
Flags: utilipvs.FlagHashed, Flags: utilipvs.FlagHashed,
}, },
bindAddr: true, bindAddr: true,
bindedAddrs: sets.NewString(),
}, },
{ {
// case 4, SCTP, old virtual server is same as new virtual server // case 4, SCTP, old virtual server is same as new virtual server
@ -3144,7 +3154,8 @@ func Test_syncService(t *testing.T) {
Scheduler: "rr", Scheduler: "rr",
Flags: utilipvs.FlagHashed, Flags: utilipvs.FlagHashed,
}, },
bindAddr: false, bindAddr: false,
bindedAddrs: sets.NewString(),
}, },
{ {
// case 5, old virtual server is different from new virtual server // case 5, old virtual server is different from new virtual server
@ -3163,7 +3174,8 @@ func Test_syncService(t *testing.T) {
Scheduler: "rr", Scheduler: "rr",
Flags: utilipvs.FlagPersistent, Flags: utilipvs.FlagPersistent,
}, },
bindAddr: false, bindAddr: false,
bindedAddrs: sets.NewString(),
}, },
{ {
// case 6, old virtual server is different from new virtual server // case 6, old virtual server is different from new virtual server
@ -3182,7 +3194,8 @@ func Test_syncService(t *testing.T) {
Scheduler: "wlc", Scheduler: "wlc",
Flags: utilipvs.FlagHashed, Flags: utilipvs.FlagHashed,
}, },
bindAddr: false, bindAddr: false,
bindedAddrs: sets.NewString(),
}, },
{ {
// case 7, old virtual server is nil, and create new virtual server // case 7, old virtual server is nil, and create new virtual server
@ -3195,7 +3208,28 @@ func Test_syncService(t *testing.T) {
Scheduler: "rr", Scheduler: "rr",
Flags: utilipvs.FlagHashed, Flags: utilipvs.FlagHashed,
}, },
bindAddr: true, bindAddr: true,
bindedAddrs: sets.NewString(),
},
{
// case 8, virtual server address already binded, skip sync
oldVirtualServer: &utilipvs.VirtualServer{
Address: net.ParseIP("1.2.3.4"),
Protocol: string(v1.ProtocolSCTP),
Port: 53,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
svcName: "baz",
newVirtualServer: &utilipvs.VirtualServer{
Address: net.ParseIP("1.2.3.4"),
Protocol: string(v1.ProtocolSCTP),
Port: 53,
Scheduler: "rr",
Flags: utilipvs.FlagHashed,
},
bindAddr: true,
bindedAddrs: sets.NewString("1.2.3.4"),
}, },
} }
@ -3211,7 +3245,7 @@ func Test_syncService(t *testing.T) {
t.Errorf("Case [%d], unexpected add IPVS virtual server error: %v", i, err) t.Errorf("Case [%d], unexpected add IPVS virtual server error: %v", i, err)
} }
} }
if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr); err != nil { if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr, testCases[i].bindedAddrs); err != nil {
t.Errorf("Case [%d], unexpected sync IPVS virtual server error: %v", i, err) t.Errorf("Case [%d], unexpected sync IPVS virtual server error: %v", i, err)
} }
// check // check