Generic sets in netlink and utils

This commit is contained in:
Lars Ekman 2023-02-19 18:08:57 +01:00
parent 17e2c7d535
commit 8d63750c35
6 changed files with 31 additions and 26 deletions

View File

@ -35,9 +35,9 @@ type NetLinkHandle interface {
// GetAllLocalAddresses return all local addresses on the node. // GetAllLocalAddresses return all local addresses on the node.
// Only the addresses of the current family are returned. // Only the addresses of the current family are returned.
// IPv6 link-local and loopback addresses are excluded. // IPv6 link-local and loopback addresses are excluded.
GetAllLocalAddresses() (sets.String, error) GetAllLocalAddresses() (sets.Set[string], error)
// GetLocalAddresses return all local addresses for an interface. // GetLocalAddresses return all local addresses for an interface.
// Only the addresses of the current family are returned. // Only the addresses of the current family are returned.
// IPv6 link-local and loopback addresses are excluded. // IPv6 link-local and loopback addresses are excluded.
GetLocalAddresses(dev string) (sets.String, error) GetLocalAddresses(dev string) (sets.Set[string], error)
} }

View File

@ -129,7 +129,7 @@ func (h *netlinkHandle) ListBindAddress(devName string) ([]string, error) {
// GetAllLocalAddresses return all local addresses on the node. // GetAllLocalAddresses return all local addresses on the node.
// Only the addresses of the current family are returned. // Only the addresses of the current family are returned.
// IPv6 link-local and loopback addresses are excluded. // IPv6 link-local and loopback addresses are excluded.
func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) { func (h *netlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) {
addr, err := net.InterfaceAddrs() addr, err := net.InterfaceAddrs()
if err != nil { if err != nil {
return nil, fmt.Errorf("Could not get addresses: %v", err) return nil, fmt.Errorf("Could not get addresses: %v", err)
@ -140,7 +140,7 @@ func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) {
// GetLocalAddresses return all local addresses for an interface. // GetLocalAddresses return all local addresses for an interface.
// Only the addresses of the current family are returned. // Only the addresses of the current family are returned.
// IPv6 link-local and loopback addresses are excluded. // IPv6 link-local and loopback addresses are excluded.
func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.String, error) { func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) {
ifi, err := net.InterfaceByName(dev) ifi, err := net.InterfaceByName(dev)
if err != nil { if err != nil {
return nil, fmt.Errorf("Could not get interface %s: %v", dev, err) return nil, fmt.Errorf("Could not get interface %s: %v", dev, err)

View File

@ -62,12 +62,12 @@ func (h *netlinkHandle) ListBindAddress(devName string) ([]string, error) {
} }
// GetAllLocalAddresses is part of interface. // GetAllLocalAddresses is part of interface.
func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) { func (h *netlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) {
return nil, fmt.Errorf("netlink is not supported in this platform") return nil, fmt.Errorf("netlink is not supported in this platform")
} }
// GetLocalAddresses is part of interface. // GetLocalAddresses is part of interface.
func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.String, error) { func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) {
return nil, fmt.Errorf("netlink is not supported in this platform") return nil, fmt.Errorf("netlink is not supported in this platform")
} }

View File

@ -1026,11 +1026,17 @@ func (proxier *Proxier) syncProxyRules() {
activeIPVSServices := sets.New[string]() activeIPVSServices := sets.New[string]()
// activeBindAddrs Represents addresses we want on the defaultDummyDevice after this round of sync // activeBindAddrs Represents addresses we want on the defaultDummyDevice after this round of sync
activeBindAddrs := sets.New[string]() activeBindAddrs := sets.New[string]()
// alreadyBoundAddrs Represents addresses currently assigned to the dummy interface
bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) alreadyBoundAddrs, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice)
if err != nil { if err != nil {
klog.ErrorS(err, "Error listing addresses binded to dummy interface") klog.ErrorS(err, "Error listing addresses binded to dummy interface")
} }
// nodeAddressSet All addresses *except* those on the dummy interface
nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddresses()
if err != nil {
klog.ErrorS(err, "Error listing node addresses")
}
nodeAddressSet = nodeAddressSet.Difference(alreadyBoundAddrs)
hasNodePort := false hasNodePort := false
for _, svc := range proxier.svcPortMap { for _, svc := range proxier.svcPortMap {
@ -1163,7 +1169,7 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagSourceHash serv.Flags |= utilipvs.FlagSourceHash
} }
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
activeIPVSServices.Insert(serv.String()) activeIPVSServices.Insert(serv.String())
activeBindAddrs.Insert(serv.Address.String()) activeBindAddrs.Insert(serv.Address.String())
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
@ -1219,7 +1225,7 @@ func (proxier *Proxier) syncProxyRules() {
if proxier.ipvsScheduler == "mh" { if proxier.ipvsScheduler == "mh" {
serv.Flags |= utilipvs.FlagSourceHash serv.Flags |= utilipvs.FlagSourceHash
} }
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
activeIPVSServices.Insert(serv.String()) activeIPVSServices.Insert(serv.String())
activeBindAddrs.Insert(serv.Address.String()) activeBindAddrs.Insert(serv.Address.String())
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
@ -1322,7 +1328,7 @@ func (proxier *Proxier) syncProxyRules() {
if proxier.ipvsScheduler == "mh" { if proxier.ipvsScheduler == "mh" {
serv.Flags |= utilipvs.FlagSourceHash serv.Flags |= utilipvs.FlagSourceHash
} }
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
activeIPVSServices.Insert(serv.String()) activeIPVSServices.Insert(serv.String())
activeBindAddrs.Insert(serv.Address.String()) activeBindAddrs.Insert(serv.Address.String())
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
@ -1470,7 +1476,7 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagSourceHash serv.Flags |= utilipvs.FlagSourceHash
} }
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, false, alreadyBoundAddrs); err == nil {
activeIPVSServices.Insert(serv.String()) activeIPVSServices.Insert(serv.String())
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
@ -1539,7 +1545,6 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Remove superfluous addresses from the dummy device // Remove superfluous addresses from the dummy device
alreadyBoundAddrs := sets.New(bindedAddresses.List()...)
superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs) superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs)
if superfluousAddresses.Len() > 0 { if superfluousAddresses.Len() > 0 {
klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses) klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
@ -1883,7 +1888,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
} }
} }
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error { func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, alreadyBoundAddrs sets.Set[string]) error {
appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs) appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) { if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
if appliedVirtualServer == nil { if appliedVirtualServer == nil {
@ -1906,9 +1911,9 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
// bind service address to dummy interface // bind service address to dummy interface
if bindAddr { if bindAddr {
// always attempt to bind if bindedAddresses is nil, // always attempt to bind if alreadyBoundAddrs is nil,
// otherwise check if it's already binded and return early // otherwise check if it's already binded and return early
if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) { if alreadyBoundAddrs != nil && alreadyBoundAddrs.Has(vs.Address.String()) {
return nil return nil
} }

View File

@ -199,8 +199,8 @@ func ShouldSkipService(service *v1.Service) bool {
// AddressSet validates the addresses in the slice using the "isValid" function. // AddressSet validates the addresses in the slice using the "isValid" function.
// Addresses that pass the validation are returned as a string Set. // Addresses that pass the validation are returned as a string Set.
func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.String { func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string] {
ips := sets.NewString() ips := sets.New[string]()
for _, a := range addrs { for _, a := range addrs {
var ip net.IP var ip net.IP
switch v := a.(type) { switch v := a.(type) {

View File

@ -1112,13 +1112,13 @@ func TestAddressSet(t *testing.T) {
name string name string
validator func(ip net.IP) bool validator func(ip net.IP) bool
input []net.Addr input []net.Addr
expected sets.String expected sets.Set[string]
}{ }{
{ {
"Empty", "Empty",
func(ip net.IP) bool { return false }, func(ip net.IP) bool { return false },
nil, nil,
sets.NewString(), nil,
}, },
{ {
"Reject IPAddr x 2", "Reject IPAddr x 2",
@ -1127,7 +1127,7 @@ func TestAddressSet(t *testing.T) {
mustParseIPAddr("8.8.8.8"), mustParseIPAddr("8.8.8.8"),
mustParseIPAddr("1000::"), mustParseIPAddr("1000::"),
}, },
sets.NewString(), nil,
}, },
{ {
"Accept IPAddr x 2", "Accept IPAddr x 2",
@ -1136,7 +1136,7 @@ func TestAddressSet(t *testing.T) {
mustParseIPAddr("8.8.8.8"), mustParseIPAddr("8.8.8.8"),
mustParseIPAddr("1000::"), mustParseIPAddr("1000::"),
}, },
sets.NewString("8.8.8.8", "1000::"), sets.New("8.8.8.8", "1000::"),
}, },
{ {
"Accept IPNet x 2", "Accept IPNet x 2",
@ -1145,7 +1145,7 @@ func TestAddressSet(t *testing.T) {
mustParseIPNet("8.8.8.8/32"), mustParseIPNet("8.8.8.8/32"),
mustParseIPNet("1000::/128"), mustParseIPNet("1000::/128"),
}, },
sets.NewString("8.8.8.8", "1000::"), sets.New("8.8.8.8", "1000::"),
}, },
{ {
"Accept Unix x 2", "Accept Unix x 2",
@ -1154,7 +1154,7 @@ func TestAddressSet(t *testing.T) {
mustParseUnix("/tmp/sock1"), mustParseUnix("/tmp/sock1"),
mustParseUnix("/tmp/sock2"), mustParseUnix("/tmp/sock2"),
}, },
sets.NewString(), nil,
}, },
{ {
"Cidr IPv4", "Cidr IPv4",
@ -1164,7 +1164,7 @@ func TestAddressSet(t *testing.T) {
mustParseIPAddr("1000::"), mustParseIPAddr("1000::"),
mustParseIPAddr("192.168.1.1"), mustParseIPAddr("192.168.1.1"),
}, },
sets.NewString("192.168.1.1"), sets.New("192.168.1.1"),
}, },
{ {
"Cidr IPv6", "Cidr IPv6",
@ -1174,7 +1174,7 @@ func TestAddressSet(t *testing.T) {
mustParseIPAddr("1000::"), mustParseIPAddr("1000::"),
mustParseIPAddr("192.168.1.1"), mustParseIPAddr("192.168.1.1"),
}, },
sets.NewString("1000::"), sets.New("1000::"),
}, },
} }