From 547db63bdf63eb1e068a72d39d788bce39619c9f Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 15:05:02 +0100 Subject: [PATCH 1/8] Drop the IPGetter --- pkg/proxy/ipvs/proxier.go | 30 +++++------------------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 48c8c391b87..c619a451b6d 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -264,8 +264,6 @@ type Proxier struct { healthzServer healthcheck.ProxierHealthUpdater ipvsScheduler string - // Added as a member to the struct to allow injection for testing. - ipGetter IPGetter // The following buffers are used to reuse memory and avoid allocations // that are significantly impacting performance. iptablesData *bytes.Buffer @@ -300,18 +298,6 @@ type Proxier struct { serviceNoLocalEndpointsExternal sets.String } -// IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface -type IPGetter interface { - NodeIPs() ([]net.IP, error) - BindedIPs() (sets.String, error) -} - -// realIPGetter is a real NodeIP handler, it implements IPGetter. -type realIPGetter struct { - // nl is a handle for revoking netlink interface - nl NetLinkHandle -} - // NodeIPs returns all LOCAL type IP addresses from host which are // taken as the Node IPs of NodePort service. Filtered addresses: // @@ -319,15 +305,15 @@ type realIPGetter struct { // - Addresses of the "other" family (not handled by this proxier instance) // - Link-local IPv6 addresses // - Addresses on the created dummy device `kube-ipvs0` -func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) { +func (p *Proxier) ipGetterNodeIPs() (ips []net.IP, err error) { - nodeAddress, err := r.nl.GetAllLocalAddresses() + nodeAddress, err := p.netlinkHandle.GetAllLocalAddresses() if err != nil { return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %v", err) } // We must exclude the addresses on the IPVS dummy interface - bindedAddress, err := r.BindedIPs() + bindedAddress, err := p.netlinkHandle.GetLocalAddresses(defaultDummyDevice) if err != nil { return nil, err } @@ -341,11 +327,6 @@ func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) { return ips, nil } -// BindedIPs returns all addresses that are binded to the IPVS dummy interface kube-ipvs0 -func (r *realIPGetter) BindedIPs() (sets.String, error) { - return r.nl.GetLocalAddresses(defaultDummyDevice) -} - // Proxier implements proxy.Provider var _ proxy.Provider = &Proxier{} @@ -484,7 +465,6 @@ func NewProxier(ipFamily v1.IPFamily, healthzServer: healthzServer, ipvs: ipvs, ipvsScheduler: scheduler, - ipGetter: &realIPGetter{nl: NewNetLinkHandle(ipFamily == v1.IPv6Protocol)}, iptablesData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil), natChains: utilproxy.LineBuffer{}, @@ -1049,7 +1029,7 @@ func (proxier *Proxier) syncProxyRules() { // activeBindAddrs represents ip address successfully bind to defaultDummyDevice in this round of sync activeBindAddrs := map[string]bool{} - bindedAddresses, err := proxier.ipGetter.BindedIPs() + bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) if err != nil { klog.ErrorS(err, "Error listing addresses binded to dummy interface") } @@ -1084,7 +1064,7 @@ func (proxier *Proxier) syncProxyRules() { continue } if utilproxy.IsZeroCIDR(address) { - nodeIPs, err = proxier.ipGetter.NodeIPs() + nodeIPs, err = proxier.ipGetterNodeIPs() if err != nil { klog.ErrorS(err, "Failed to list all node IPs from host") } From fbe671d3f0de52df8be1bd2cf6d70ef2fcc62d8b Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 16:33:14 +0100 Subject: [PATCH 2/8] Use generic sets --- pkg/proxy/ipvs/proxier.go | 76 +++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index c619a451b6d..257a5512a79 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -286,16 +286,16 @@ type Proxier struct { // due to the absence of local endpoints when the internal traffic policy is "Local". // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "internal". - // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // A Set is used here since we end up calculating endpoint topology multiple times for the same Service // if it has multiple ports but each Service should only be counted once. - serviceNoLocalEndpointsInternal sets.String + serviceNoLocalEndpointsInternal sets.Set[string] // serviceNoLocalEndpointsExternal represents the set of services that couldn't be applied // due to the absence of any endpoints when the external traffic policy is "Local". // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "external". - // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // A Set is used here since we end up calculating endpoint topology multiple times for the same Service // if it has multiple ports but each Service should only be counted once. - serviceNoLocalEndpointsExternal sets.String + serviceNoLocalEndpointsExternal sets.Set[string] } // NodeIPs returns all LOCAL type IP addresses from host which are @@ -990,8 +990,8 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).InfoS("Syncing ipvs proxier rules") - proxier.serviceNoLocalEndpointsInternal = sets.NewString() - proxier.serviceNoLocalEndpointsExternal = sets.NewString() + proxier.serviceNoLocalEndpointsInternal = sets.New[string]() + proxier.serviceNoLocalEndpointsExternal = sets.New[string]() // Begin install iptables // Reset all buffers used later. @@ -1023,11 +1023,11 @@ func (proxier *Proxier) syncProxyRules() { } // activeIPVSServices represents IPVS service successfully created in this round of sync - activeIPVSServices := map[string]bool{} + activeIPVSServices := sets.New[string]() // currentIPVSServices represent IPVS services listed from the system currentIPVSServices := make(map[string]*utilipvs.VirtualServer) // activeBindAddrs represents ip address successfully bind to defaultDummyDevice in this round of sync - activeBindAddrs := map[string]bool{} + activeBindAddrs := sets.New[string]() bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) if err != nil { @@ -1166,8 +1166,8 @@ func (proxier *Proxier) syncProxyRules() { } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true - activeBindAddrs[serv.Address.String()] = true + activeIPVSServices.Insert(serv.String()) + activeBindAddrs.Insert(serv.Address.String()) // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. internalNodeLocal := false @@ -1222,9 +1222,8 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagSourceHash } if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true - activeBindAddrs[serv.Address.String()] = true - + activeIPVSServices.Insert(serv.String()) + activeBindAddrs.Insert(serv.Address.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) } @@ -1326,8 +1325,8 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagSourceHash } if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true - activeBindAddrs[serv.Address.String()] = true + activeIPVSServices.Insert(serv.String()) + activeBindAddrs.Insert(serv.Address.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) } @@ -1474,7 +1473,7 @@ func (proxier *Proxier) syncProxyRules() { } // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true + activeIPVSServices.Insert(serv.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) } @@ -1541,13 +1540,17 @@ func (proxier *Proxier) syncProxyRules() { } } - // Get legacy bind address - // currentBindAddrs represents ip addresses bind to defaultDummyDevice from the system - currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(defaultDummyDevice) - if err != nil { - klog.ErrorS(err, "Failed to get bind address") + // Remove superfluous addresses from the dummy device + alreadyBoundAddrs := sets.New(bindedAddresses.List()...) + superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs) + if superfluousAddresses.Len() > 0 { + klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses) + for adr := range superfluousAddresses { + if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil { + klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr) + } + } } - legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs) // Clean up legacy IPVS services and unbind addresses appliedSvcs, err := proxier.ipvs.GetVirtualServers() @@ -1558,7 +1561,7 @@ func (proxier *Proxier) syncProxyRules() { } else { klog.ErrorS(err, "Failed to get ipvs service") } - proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs) + proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices) if proxier.healthzServer != nil { proxier.healthzServer.Updated() @@ -2065,32 +2068,20 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode return nil } -func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) { - isIPv6 := netutils.IsIPv6(proxier.nodeIP) - for cs := range currentServices { - svc := currentServices[cs] +func (proxier *Proxier) cleanLegacyService(activeServices sets.Set[string], currentServices map[string]*utilipvs.VirtualServer) { + for cs, svc := range currentServices { if proxier.isIPInExcludeCIDRs(svc.Address) { continue } - if netutils.IsIPv6(svc.Address) != isIPv6 { + if getIPFamily(svc.Address) != proxier.ipFamily { // Not our family continue } - if _, ok := activeServices[cs]; !ok { + if !activeServices.Has(cs) { klog.V(4).InfoS("Delete service", "virtualServer", svc) if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil { klog.ErrorS(err, "Failed to delete service", "virtualServer", svc) } - addr := svc.Address.String() - if _, ok := legacyBindAddrs[addr]; ok { - klog.V(4).InfoS("Unbinding address", "address", addr) - if err := proxier.netlinkHandle.UnbindAddress(addr, defaultDummyDevice); err != nil { - klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", defaultDummyDevice, "address", addr) - } else { - // In case we delete a multi-port service, avoid trying to unbind multiple times - delete(legacyBindAddrs, addr) - } - } } } } @@ -2120,6 +2111,13 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre return legacyAddrs } +func getIPFamily(ip net.IP) v1.IPFamily { + if netutils.IsIPv4(ip) { + return v1.IPv4Protocol + } + return v1.IPv6Protocol +} + // ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets // It will only operate iptables *nat table. // Create and link the kube postrouting chain for SNAT packets. From 3325c7031deb6509c0b49db7972d7e5425aec076 Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 17:48:13 +0100 Subject: [PATCH 3/8] Generic sets in ipset.go --- pkg/proxy/ipvs/ipset.go | 12 ++++++------ pkg/proxy/ipvs/proxier.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go index 6cb4736f7a8..f77280d6bcf 100644 --- a/pkg/proxy/ipvs/ipset.go +++ b/pkg/proxy/ipvs/ipset.go @@ -93,7 +93,7 @@ type IPSetVersioner interface { type IPSet struct { utilipset.IPSet // activeEntries is the current active entries of the ipset. - activeEntries sets.String + activeEntries sets.Set[string] // handle is the util ipset interface handle. handle utilipset.Interface } @@ -125,7 +125,7 @@ func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, i HashFamily: hashFamily, Comment: comment, }, - activeEntries: sets.NewString(), + activeEntries: sets.New[string](), handle: handle, } return set @@ -144,7 +144,7 @@ func (set *IPSet) getComment() string { } func (set *IPSet) resetEntries() { - set.activeEntries = sets.NewString() + set.activeEntries = sets.New[string]() } func (set *IPSet) syncIPSetEntries() { @@ -155,14 +155,14 @@ func (set *IPSet) syncIPSetEntries() { } // currentIPSetEntries represents Endpoints watched from API Server. - currentIPSetEntries := sets.NewString() + currentIPSetEntries := sets.New[string]() for _, appliedEntry := range appliedEntries { currentIPSetEntries.Insert(appliedEntry) } if !set.activeEntries.Equal(currentIPSetEntries) { // Clean legacy entries - for _, entry := range currentIPSetEntries.Difference(set.activeEntries).List() { + for _, entry := range currentIPSetEntries.Difference(set.activeEntries).UnsortedList() { if err := set.handle.DelEntry(entry, set.Name); err != nil { if !utilipset.IsNotFoundError(err) { klog.ErrorS(err, "Failed to delete ip set entry from ip set", "ipSetEntry", entry, "ipSet", set.Name) @@ -172,7 +172,7 @@ func (set *IPSet) syncIPSetEntries() { } } // Create active entries - for _, entry := range set.activeEntries.Difference(currentIPSetEntries).List() { + for _, entry := range set.activeEntries.Difference(currentIPSetEntries).UnsortedList() { if err := set.handle.AddEntry(entry, &set.IPSet, true); err != nil { klog.ErrorS(err, "Failed to add ip set entry to ip set", "ipSetEntry", entry, "ipSet", set.Name) } else { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 257a5512a79..e9d073d09f8 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1501,7 +1501,7 @@ func (proxier *Proxier) syncProxyRules() { } // Set the KUBE-IPVS-IPS set to the "activeBindAddrs" - proxier.ipsetList[kubeIPVSSet].activeEntries = sets.StringKeySet(activeBindAddrs) + proxier.ipsetList[kubeIPVSSet].activeEntries = activeBindAddrs // sync ipset entries for _, set := range proxier.ipsetList { From 17e2c7d5351a93bc2a68063e0a6c27c8c73c3430 Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 17:54:00 +0100 Subject: [PATCH 4/8] Move variable closer to it's use --- pkg/proxy/ipvs/proxier.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index e9d073d09f8..0ce9a8609ff 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1024,9 +1024,7 @@ func (proxier *Proxier) syncProxyRules() { // activeIPVSServices represents IPVS service successfully created in this round of sync activeIPVSServices := sets.New[string]() - // currentIPVSServices represent IPVS services listed from the system - currentIPVSServices := make(map[string]*utilipvs.VirtualServer) - // activeBindAddrs represents ip address successfully bind to defaultDummyDevice in this round of sync + // activeBindAddrs Represents addresses we want on the defaultDummyDevice after this round of sync activeBindAddrs := sets.New[string]() bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) @@ -1552,7 +1550,9 @@ func (proxier *Proxier) syncProxyRules() { } } - // Clean up legacy IPVS services and unbind addresses + // currentIPVSServices represent IPVS services listed from the system + // (including any we have created in this sync) + currentIPVSServices := make(map[string]*utilipvs.VirtualServer) appliedSvcs, err := proxier.ipvs.GetVirtualServers() if err == nil { for _, appliedSvc := range appliedSvcs { From 8d63750c35dd3fd631a1c1f2f9680a903ea21b54 Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 18:08:57 +0100 Subject: [PATCH 5/8] Generic sets in netlink and utils --- pkg/proxy/ipvs/netlink.go | 4 ++-- pkg/proxy/ipvs/netlink_linux.go | 4 ++-- pkg/proxy/ipvs/netlink_unsupported.go | 4 ++-- pkg/proxy/ipvs/proxier.go | 25 +++++++++++++++---------- pkg/proxy/util/utils.go | 4 ++-- pkg/proxy/util/utils_test.go | 16 ++++++++-------- 6 files changed, 31 insertions(+), 26 deletions(-) diff --git a/pkg/proxy/ipvs/netlink.go b/pkg/proxy/ipvs/netlink.go index 2d77bb32044..ab0b9eaaa14 100644 --- a/pkg/proxy/ipvs/netlink.go +++ b/pkg/proxy/ipvs/netlink.go @@ -35,9 +35,9 @@ type NetLinkHandle interface { // GetAllLocalAddresses return all local addresses on the node. // Only the addresses of the current family are returned. // IPv6 link-local and loopback addresses are excluded. - GetAllLocalAddresses() (sets.String, error) + GetAllLocalAddresses() (sets.Set[string], error) // GetLocalAddresses return all local addresses for an interface. // Only the addresses of the current family are returned. // IPv6 link-local and loopback addresses are excluded. - GetLocalAddresses(dev string) (sets.String, error) + GetLocalAddresses(dev string) (sets.Set[string], error) } diff --git a/pkg/proxy/ipvs/netlink_linux.go b/pkg/proxy/ipvs/netlink_linux.go index ebaae4b4bee..f4d2368885d 100644 --- a/pkg/proxy/ipvs/netlink_linux.go +++ b/pkg/proxy/ipvs/netlink_linux.go @@ -129,7 +129,7 @@ func (h *netlinkHandle) ListBindAddress(devName string) ([]string, error) { // GetAllLocalAddresses return all local addresses on the node. // Only the addresses of the current family are returned. // IPv6 link-local and loopback addresses are excluded. -func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) { +func (h *netlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) { addr, err := net.InterfaceAddrs() if err != nil { return nil, fmt.Errorf("Could not get addresses: %v", err) @@ -140,7 +140,7 @@ func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) { // GetLocalAddresses return all local addresses for an interface. // Only the addresses of the current family are returned. // IPv6 link-local and loopback addresses are excluded. -func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.String, error) { +func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) { ifi, err := net.InterfaceByName(dev) if err != nil { return nil, fmt.Errorf("Could not get interface %s: %v", dev, err) diff --git a/pkg/proxy/ipvs/netlink_unsupported.go b/pkg/proxy/ipvs/netlink_unsupported.go index eec2f5bada7..31f3fb7406b 100644 --- a/pkg/proxy/ipvs/netlink_unsupported.go +++ b/pkg/proxy/ipvs/netlink_unsupported.go @@ -62,12 +62,12 @@ func (h *netlinkHandle) ListBindAddress(devName string) ([]string, error) { } // GetAllLocalAddresses is part of interface. -func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) { +func (h *netlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) { return nil, fmt.Errorf("netlink is not supported in this platform") } // GetLocalAddresses is part of interface. -func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.String, error) { +func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) { return nil, fmt.Errorf("netlink is not supported in this platform") } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 0ce9a8609ff..2c0f6c01176 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1026,11 +1026,17 @@ func (proxier *Proxier) syncProxyRules() { activeIPVSServices := sets.New[string]() // activeBindAddrs Represents addresses we want on the defaultDummyDevice after this round of sync activeBindAddrs := sets.New[string]() - - bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) + // alreadyBoundAddrs Represents addresses currently assigned to the dummy interface + alreadyBoundAddrs, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) if err != nil { klog.ErrorS(err, "Error listing addresses binded to dummy interface") } + // nodeAddressSet All addresses *except* those on the dummy interface + nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddresses() + if err != nil { + klog.ErrorS(err, "Error listing node addresses") + } + nodeAddressSet = nodeAddressSet.Difference(alreadyBoundAddrs) hasNodePort := false for _, svc := range proxier.svcPortMap { @@ -1163,7 +1169,7 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagSourceHash } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() - if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { + if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil { activeIPVSServices.Insert(serv.String()) activeBindAddrs.Insert(serv.Address.String()) // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP @@ -1219,7 +1225,7 @@ func (proxier *Proxier) syncProxyRules() { if proxier.ipvsScheduler == "mh" { serv.Flags |= utilipvs.FlagSourceHash } - if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { + if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil { activeIPVSServices.Insert(serv.String()) activeBindAddrs.Insert(serv.Address.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { @@ -1322,7 +1328,7 @@ func (proxier *Proxier) syncProxyRules() { if proxier.ipvsScheduler == "mh" { serv.Flags |= utilipvs.FlagSourceHash } - if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { + if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil { activeIPVSServices.Insert(serv.String()) activeBindAddrs.Insert(serv.Address.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { @@ -1470,7 +1476,7 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagSourceHash } // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. - if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil { + if err := proxier.syncService(svcPortNameString, serv, false, alreadyBoundAddrs); err == nil { activeIPVSServices.Insert(serv.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) @@ -1539,7 +1545,6 @@ func (proxier *Proxier) syncProxyRules() { } // Remove superfluous addresses from the dummy device - alreadyBoundAddrs := sets.New(bindedAddresses.List()...) superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs) if superfluousAddresses.Len() > 0 { klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses) @@ -1883,7 +1888,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE } } -func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error { +func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, alreadyBoundAddrs sets.Set[string]) error { appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs) if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) { if appliedVirtualServer == nil { @@ -1906,9 +1911,9 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, // bind service address to dummy interface if bindAddr { - // always attempt to bind if bindedAddresses is nil, + // always attempt to bind if alreadyBoundAddrs is nil, // otherwise check if it's already binded and return early - if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) { + if alreadyBoundAddrs != nil && alreadyBoundAddrs.Has(vs.Address.String()) { return nil } diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index b67a1c1de49..319daf27b4a 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -199,8 +199,8 @@ func ShouldSkipService(service *v1.Service) bool { // AddressSet validates the addresses in the slice using the "isValid" function. // Addresses that pass the validation are returned as a string Set. -func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.String { - ips := sets.NewString() +func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string] { + ips := sets.New[string]() for _, a := range addrs { var ip net.IP switch v := a.(type) { diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index 750fdf8a2d2..39e0ac573fb 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -1112,13 +1112,13 @@ func TestAddressSet(t *testing.T) { name string validator func(ip net.IP) bool input []net.Addr - expected sets.String + expected sets.Set[string] }{ { "Empty", func(ip net.IP) bool { return false }, nil, - sets.NewString(), + nil, }, { "Reject IPAddr x 2", @@ -1127,7 +1127,7 @@ func TestAddressSet(t *testing.T) { mustParseIPAddr("8.8.8.8"), mustParseIPAddr("1000::"), }, - sets.NewString(), + nil, }, { "Accept IPAddr x 2", @@ -1136,7 +1136,7 @@ func TestAddressSet(t *testing.T) { mustParseIPAddr("8.8.8.8"), mustParseIPAddr("1000::"), }, - sets.NewString("8.8.8.8", "1000::"), + sets.New("8.8.8.8", "1000::"), }, { "Accept IPNet x 2", @@ -1145,7 +1145,7 @@ func TestAddressSet(t *testing.T) { mustParseIPNet("8.8.8.8/32"), mustParseIPNet("1000::/128"), }, - sets.NewString("8.8.8.8", "1000::"), + sets.New("8.8.8.8", "1000::"), }, { "Accept Unix x 2", @@ -1154,7 +1154,7 @@ func TestAddressSet(t *testing.T) { mustParseUnix("/tmp/sock1"), mustParseUnix("/tmp/sock2"), }, - sets.NewString(), + nil, }, { "Cidr IPv4", @@ -1164,7 +1164,7 @@ func TestAddressSet(t *testing.T) { mustParseIPAddr("1000::"), mustParseIPAddr("192.168.1.1"), }, - sets.NewString("192.168.1.1"), + sets.New("192.168.1.1"), }, { "Cidr IPv6", @@ -1174,7 +1174,7 @@ func TestAddressSet(t *testing.T) { mustParseIPAddr("1000::"), mustParseIPAddr("192.168.1.1"), }, - sets.NewString("1000::"), + sets.New("1000::"), }, } From 32f80661199c34f5e891b86cb2c3abf47514db45 Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 18:12:10 +0100 Subject: [PATCH 6/8] Simplification and cleanup --- pkg/proxy/ipvs/proxier.go | 50 +++++---------------------------------- 1 file changed, 6 insertions(+), 44 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 2c0f6c01176..801bc220036 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -298,35 +298,6 @@ type Proxier struct { serviceNoLocalEndpointsExternal sets.Set[string] } -// NodeIPs returns all LOCAL type IP addresses from host which are -// taken as the Node IPs of NodePort service. Filtered addresses: -// -// - Loopback addresses -// - Addresses of the "other" family (not handled by this proxier instance) -// - Link-local IPv6 addresses -// - Addresses on the created dummy device `kube-ipvs0` -func (p *Proxier) ipGetterNodeIPs() (ips []net.IP, err error) { - - nodeAddress, err := p.netlinkHandle.GetAllLocalAddresses() - if err != nil { - return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %v", err) - } - - // We must exclude the addresses on the IPVS dummy interface - bindedAddress, err := p.netlinkHandle.GetLocalAddresses(defaultDummyDevice) - if err != nil { - return nil, err - } - ipset := nodeAddress.Difference(bindedAddress) - - // translate ip string to IP - for _, ipStr := range ipset.UnsortedList() { - a := netutils.ParseIPSloppy(ipStr) - ips = append(ips, a) - } - return ips, nil -} - // Proxier implements proxy.Provider var _ proxy.Provider = &Proxier{} @@ -1068,28 +1039,19 @@ func (proxier *Proxier) syncProxyRules() { continue } if utilproxy.IsZeroCIDR(address) { - nodeIPs, err = proxier.ipGetterNodeIPs() - if err != nil { - klog.ErrorS(err, "Failed to list all node IPs from host") + nodeIPs = nil + for _, ipStr := range nodeAddressSet.UnsortedList() { + nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr)) } break } - nodeIPs = append(nodeIPs, a) + if getIPFamily(a) == proxier.ipFamily { + nodeIPs = append(nodeIPs, a) + } } } } - // filter node IPs by proxier ipfamily - idx := 0 - for _, nodeIP := range nodeIPs { - if (proxier.ipFamily == v1.IPv6Protocol) == netutils.IsIPv6(nodeIP) { - nodeIPs[idx] = nodeIP - idx++ - } - } - // reset slice to filtered entries - nodeIPs = nodeIPs[:idx] - // Build IPVS rules for each service. for svcPortName, svcPort := range proxier.svcPortMap { svcInfo, ok := svcPort.(*servicePortInfo) From 6ad09dc418e5187bb4faf679a5ec3963ade763fa Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 18:14:38 +0100 Subject: [PATCH 7/8] Update unit-tests --- pkg/proxy/ipvs/proxier_test.go | 260 +++++++--------------------- pkg/proxy/ipvs/testing/fake.go | 11 +- pkg/proxy/ipvs/testing/fake_test.go | 31 ++-- 3 files changed, 90 insertions(+), 212 deletions(-) diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 0e1e8b5383e..889227ffc46 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -62,19 +62,6 @@ import ( const testHostname = "test-hostname" -type fakeIPGetter struct { - nodeIPs []net.IP - bindedIPs sets.String -} - -func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) { - return f.nodeIPs, nil -} - -func (f *fakeIPGetter) BindedIPs() (sets.String, error) { - return f.bindedIPs, nil -} - // fakeIpvs implements utilipvs.Interface type fakeIpvs struct { ipvsErr string @@ -139,21 +126,10 @@ func (fake *fakeIPSetVersioner) GetVersion() (string, error) { return fake.version, fake.err } -func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier { - // unlike actual proxier, this fake proxier does not filter node IPs per family requested - // which can lead to false postives. +func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []string, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier { - // filter node IPs by proxier ipfamily - idx := 0 - for _, nodeIP := range nodeIPs { - if (ipFamily == v1.IPv6Protocol) == netutils.IsIPv6(nodeIP) { - nodeIPs[idx] = nodeIP - idx++ - } - } - - // reset slice to filtered entries - nodeIPs = nodeIPs[:idx] + netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol) + netlinkHandle.SetLocalAddresses("eth0", nodeIPs...) fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ @@ -188,14 +164,13 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u hostname: testHostname, serviceHealthServer: healthcheck.NewFakeServiceHealthServer(), ipvsScheduler: defaultScheduler, - ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, iptablesData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil), natChains: utilproxy.LineBuffer{}, natRules: utilproxy.LineBuffer{}, filterChains: utilproxy.LineBuffer{}, filterRules: utilproxy.LineBuffer{}, - netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + netlinkHandle: netlinkHandle, ipsetList: ipsetList, nodePortAddresses: make([]string, 0), networkInterfacer: proxyutiltest.NewFakeNetwork(), @@ -438,23 +413,22 @@ func TestGetNodeIPs(t *testing.T) { }, } - for i := range testCases { - fake := netlinktest.NewFakeNetlinkHandle() - fake.IsIPv6 = testCases[i].isIPv6 + for i, tc := range testCases { + fake := netlinktest.NewFakeNetlinkHandle(tc.isIPv6) for dev, addresses := range testCases[i].devAddresses { fake.SetLocalAddresses(dev, addresses...) } - r := realIPGetter{nl: fake} - ips, err := r.NodeIPs() + ips, err := fake.GetAllLocalAddresses() if err != nil { t.Errorf("Unexpected error: %v", err) } - ipStrs := sets.NewString() - for _, ip := range ips { - ipStrs.Insert(ip.String()) + devIps, err := fake.GetLocalAddresses("kube-ipvs0") + if err != nil { + t.Errorf("Unexpected error: %v", err) } - if !ipStrs.Equal(sets.NewString(testCases[i].expectIPs...)) { - t.Errorf("case[%d], unexpected mismatch, expected: %v, got: %v", i, testCases[i].expectIPs, ips) + ips = ips.Difference(devIps) + if !ips.Equal(sets.New(tc.expectIPs...)) { + t.Errorf("case[%d], unexpected mismatch, expected: %v, got: %v", i, tc.expectIPs, ips) } } } @@ -467,7 +441,7 @@ func TestNodePortIPv4(t *testing.T) { name string services []*v1.Service endpoints []*discovery.EndpointSlice - nodeIPs []net.IP + nodeIPs []string nodePortAddresses []string expectedIPVS *ipvstest.FakeIPVS expectedIPSets netlinktest.ExpectedIPSet @@ -511,10 +485,7 @@ func TestNodePortIPv4(t *testing.T) { }} }), }, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - netutils.ParseIPSloppy("2001:db8::1:1"), - }, + nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ @@ -592,9 +563,7 @@ func TestNodePortIPv4(t *testing.T) { }} }), }, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - }, + nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{"0.0.0.0/0"}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ @@ -682,10 +651,8 @@ func TestNodePortIPv4(t *testing.T) { }} }), }, - endpoints: []*discovery.EndpointSlice{}, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - }, + endpoints: []*discovery.EndpointSlice{}, + nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ @@ -751,13 +718,13 @@ func TestNodePortIPv4(t *testing.T) { }} }), }, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - netutils.ParseIPSloppy("100.101.102.104"), - netutils.ParseIPSloppy("100.101.102.105"), - netutils.ParseIPSloppy("2001:db8::1:1"), - netutils.ParseIPSloppy("2001:db8::1:2"), - netutils.ParseIPSloppy("2001:db8::1:3"), + nodeIPs: []string{ + "100.101.102.103", + "100.101.102.104", + "100.101.102.105", + "2001:db8::1:1", + "2001:db8::1:2", + "2001:db8::1:3", }, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ @@ -905,9 +872,7 @@ func TestNodePortIPv4(t *testing.T) { }} }), }, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - }, + nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ @@ -1030,7 +995,7 @@ func TestNodePortIPv6(t *testing.T) { name string services []*v1.Service endpoints []*discovery.EndpointSlice - nodeIPs []net.IP + nodeIPs []string nodePortAddresses []string expectedIPVS *ipvstest.FakeIPVS expectedIPSets netlinktest.ExpectedIPSet @@ -1074,10 +1039,7 @@ func TestNodePortIPv6(t *testing.T) { }} }), }, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - netutils.ParseIPSloppy("2001:db8::1:1"), - }, + nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ @@ -1157,9 +1119,7 @@ func TestNodePortIPv6(t *testing.T) { }} }), }, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - }, + nodeIPs: []string{"100.101.102.103"}, nodePortAddresses: []string{"0.0.0.0/0"}, /*since this is a node with only IPv4, proxier should not do anything */ expectedIPVS: &ipvstest.FakeIPVS{ @@ -1184,11 +1144,8 @@ func TestNodePortIPv6(t *testing.T) { }} }), }, - endpoints: []*discovery.EndpointSlice{}, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("100.101.102.103"), - netutils.ParseIPSloppy("2001:db8::1:1"), - }, + endpoints: []*discovery.EndpointSlice{}, + nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ @@ -1255,10 +1212,7 @@ func TestNodePortIPv6(t *testing.T) { }} }), }, - nodeIPs: []net.IP{ - netutils.ParseIPSloppy("2001:db8::1:1"), - netutils.ParseIPSloppy("2001:db8::1:2"), - }, + nodeIPs: []string{"2001:db8::1:1", "2001:db8::1:2"}, nodePortAddresses: []string{}, expectedIPVS: &ipvstest.FakeIPVS{ Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{ @@ -2794,8 +2748,8 @@ func TestSessionAffinity(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - nodeIP := netutils.ParseIPSloppy("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil, v1.IPv4Protocol) + nodeIP := "100.101.102.103" + fp := NewFakeProxier(ipt, ipvs, ipset, []string{nodeIP}, nil, v1.IPv4Protocol) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -3706,11 +3660,11 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe func Test_syncService(t *testing.T) { testCases := []struct { - oldVirtualServer *utilipvs.VirtualServer - svcName string - newVirtualServer *utilipvs.VirtualServer - bindAddr bool - bindedAddrs sets.String + oldVirtualServer *utilipvs.VirtualServer + svcName string + newVirtualServer *utilipvs.VirtualServer + bindAddr bool + alreadyBoundAddrs sets.Set[string] }{ { // case 0, old virtual server is same as new virtual server @@ -3729,8 +3683,8 @@ func Test_syncService(t *testing.T) { Scheduler: "rr", Flags: utilipvs.FlagHashed, }, - bindAddr: false, - bindedAddrs: sets.NewString(), + bindAddr: false, + alreadyBoundAddrs: nil, }, { // case 1, old virtual server is different from new virtual server @@ -3749,8 +3703,8 @@ func Test_syncService(t *testing.T) { Scheduler: "rr", Flags: utilipvs.FlagPersistent, }, - bindAddr: false, - bindedAddrs: sets.NewString(), + bindAddr: false, + alreadyBoundAddrs: nil, }, { // case 2, old virtual server is different from new virtual server @@ -3769,8 +3723,8 @@ func Test_syncService(t *testing.T) { Scheduler: "wlc", Flags: utilipvs.FlagHashed, }, - bindAddr: false, - bindedAddrs: sets.NewString(), + bindAddr: false, + alreadyBoundAddrs: nil, }, { // case 3, old virtual server is nil, and create new virtual server @@ -3783,8 +3737,8 @@ func Test_syncService(t *testing.T) { Scheduler: "rr", Flags: utilipvs.FlagHashed, }, - bindAddr: true, - bindedAddrs: sets.NewString(), + bindAddr: true, + alreadyBoundAddrs: nil, }, { // case 4, SCTP, old virtual server is same as new virtual server @@ -3803,8 +3757,8 @@ func Test_syncService(t *testing.T) { Scheduler: "rr", Flags: utilipvs.FlagHashed, }, - bindAddr: false, - bindedAddrs: sets.NewString(), + bindAddr: false, + alreadyBoundAddrs: nil, }, { // case 5, old virtual server is different from new virtual server @@ -3823,8 +3777,8 @@ func Test_syncService(t *testing.T) { Scheduler: "rr", Flags: utilipvs.FlagPersistent, }, - bindAddr: false, - bindedAddrs: sets.NewString(), + bindAddr: false, + alreadyBoundAddrs: nil, }, { // case 6, old virtual server is different from new virtual server @@ -3843,8 +3797,8 @@ func Test_syncService(t *testing.T) { Scheduler: "wlc", Flags: utilipvs.FlagHashed, }, - bindAddr: false, - bindedAddrs: sets.NewString(), + bindAddr: false, + alreadyBoundAddrs: nil, }, { // case 7, old virtual server is nil, and create new virtual server @@ -3857,8 +3811,8 @@ func Test_syncService(t *testing.T) { Scheduler: "rr", Flags: utilipvs.FlagHashed, }, - bindAddr: true, - bindedAddrs: sets.NewString(), + bindAddr: true, + alreadyBoundAddrs: sets.New[string](), }, { // case 8, virtual server address already binded, skip sync @@ -3877,8 +3831,8 @@ func Test_syncService(t *testing.T) { Scheduler: "rr", Flags: utilipvs.FlagHashed, }, - bindAddr: true, - bindedAddrs: sets.NewString("1.2.3.4"), + bindAddr: true, + alreadyBoundAddrs: sets.New("1.2.3.4"), }, } @@ -3894,7 +3848,7 @@ func Test_syncService(t *testing.T) { t.Errorf("Case [%d], unexpected add IPVS virtual server error: %v", i, err) } } - if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr, testCases[i].bindedAddrs); err != nil { + if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr, testCases[i].alreadyBoundAddrs); err != nil { t.Errorf("Case [%d], unexpected sync IPVS virtual server error: %v", i, err) } // check @@ -4012,7 +3966,7 @@ func TestCleanLegacyService(t *testing.T) { fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol) // All ipvs services that were processed in the latest sync loop. - activeServices := map[string]bool{"ipvs0": true, "ipvs1": true} + activeServices := sets.New("ipvs0", "ipvs1") // All ipvs services in the system. currentServices := map[string]*utilipvs.VirtualServer{ // Created by kube-proxy. @@ -4068,15 +4022,7 @@ func TestCleanLegacyService(t *testing.T) { fp.ipvs.AddVirtualServer(currentServices[v]) } - fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice) - activeBindAddrs := map[string]bool{"1.1.1.1": true, "2.2.2.2": true, "3.3.3.3": true, "4.4.4.4": true} - // This is ipv4-only so ipv6 addresses should be ignored - currentBindAddrs := []string{"1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", "5.5.5.5", "6.6.6.6", "fd80::1:2:3", "fd80::1:2:4"} - for i := range currentBindAddrs { - fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], defaultDummyDevice) - } - - fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"5.5.5.5": true, "6.6.6.6": true}) + fp.cleanLegacyService(activeServices, currentServices) // ipvs4 and ipvs5 should have been cleaned. remainingVirtualServers, _ := fp.ipvs.GetVirtualServers() if len(remainingVirtualServers) != 4 { @@ -4091,24 +4037,6 @@ func TestCleanLegacyService(t *testing.T) { t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains") } } - - // Addresses 5.5.5.5 and 6.6.6.6 should not be bound any more, but the ipv6 addresses should remain - remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice) - if len(remainingAddrs) != 6 { - t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 6, len(remainingAddrs)) - } - // check that address "1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4" are bound, ignore ipv6 addresses - remainingAddrsMap := make(map[string]bool) - for _, a := range remainingAddrs { - if netutils.ParseIPSloppy(a).To4() == nil { - continue - } - remainingAddrsMap[a] = true - } - if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) { - t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap) - } - } func TestCleanLegacyServiceWithRealServers(t *testing.T) { @@ -4118,7 +4046,7 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) { fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol) // all deleted expect ipvs2 - activeServices := map[string]bool{"ipvs2": true} + activeServices := sets.New("ipvs2") // All ipvs services in the system. currentServices := map[string]*utilipvs.VirtualServer{ "ipvs0": { // deleted with real servers @@ -4167,14 +4095,7 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) { fp.ipvs.AddRealServer(v, r) } - fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice) - activeBindAddrs := map[string]bool{"3.3.3.3": true} - currentBindAddrs := []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"} - for i := range currentBindAddrs { - fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], defaultDummyDevice) - } - - fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"1.1.1.1": true, "2.2.2.2": true}) + fp.cleanLegacyService(activeServices, currentServices) remainingVirtualServers, _ := fp.ipvs.GetVirtualServers() if len(remainingVirtualServers) != 1 { t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 1, len(remainingVirtualServers)) @@ -4185,23 +4106,6 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) { t.Logf("expected virtual server: %v", currentServices["ipvs0"]) t.Errorf("unexpected IPVS service") } - - remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice) - if len(remainingAddrs) != 1 { - t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs)) - } - // check that address is "3.3.3.3" - remainingAddrsMap := make(map[string]bool) - for _, a := range remainingAddrs { - if netutils.ParseIPSloppy(a).To4() == nil { - continue - } - remainingAddrsMap[a] = true - } - if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) { - t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap) - } - } func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) { @@ -4245,11 +4149,7 @@ func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) { fp.netlinkHandle.EnsureAddressBind("4.4.4.4", defaultDummyDevice) - fp.cleanLegacyService( - map[string]bool{}, - map[string]*utilipvs.VirtualServer{"ipvs0": vs}, - map[string]bool{"4.4.4.4": true}, - ) + fp.cleanLegacyService(nil, map[string]*utilipvs.VirtualServer{"ipvs0": vs}) fp.gracefuldeleteManager.tryDeleteRs() @@ -4265,11 +4165,11 @@ func TestCleanLegacyService6(t *testing.T) { ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3000::/64", "4000::/64"}) - fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol) + fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv6Protocol) fp.nodeIP = netutils.ParseIPSloppy("::1") // All ipvs services that were processed in the latest sync loop. - activeServices := map[string]bool{"ipvs0": true, "ipvs1": true} + activeServices := sets.New("ipvs0", "ipvs1") // All ipvs services in the system. currentServices := map[string]*utilipvs.VirtualServer{ // Created by kube-proxy. @@ -4325,15 +4225,7 @@ func TestCleanLegacyService6(t *testing.T) { fp.ipvs.AddVirtualServer(currentServices[v]) } - fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice) - activeBindAddrs := map[string]bool{"1000::1": true, "1000::2": true, "3000::1": true, "4000::1": true} - // This is ipv6-only so ipv4 addresses should be ignored - currentBindAddrs := []string{"1000::1", "1000::2", "3000::1", "4000::1", "5000::1", "1000::6", "1.1.1.1", "2.2.2.2"} - for i := range currentBindAddrs { - fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], defaultDummyDevice) - } - - fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"5000::1": true, "1000::6": true}) + fp.cleanLegacyService(activeServices, currentServices) // ipvs4 and ipvs5 should have been cleaned. remainingVirtualServers, _ := fp.ipvs.GetVirtualServers() if len(remainingVirtualServers) != 4 { @@ -4348,24 +4240,6 @@ func TestCleanLegacyService6(t *testing.T) { t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains") } } - - // Addresses 5000::1 and 1000::6 should not be bound any more, but the ipv4 addresses should remain - remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice) - if len(remainingAddrs) != 6 { - t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 6, len(remainingAddrs)) - } - // check that address "1000::1", "1000::2", "3000::1", "4000::1" are still bound, ignore ipv4 addresses - remainingAddrsMap := make(map[string]bool) - for _, a := range remainingAddrs { - if netutils.ParseIPSloppy(a).To4() != nil { - continue - } - remainingAddrsMap[a] = true - } - if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) { - t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap) - } - } func TestMultiPortServiceBindAddr(t *testing.T) { @@ -6011,7 +5885,7 @@ func TestNoEndpointsMetric(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() ipset := ipsettest.NewFake(testIPSetVersion) - fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{netutils.ParseIPSloppy("10.0.0.1")}, nil, v1.IPv4Protocol) + fp := NewFakeProxier(ipt, ipvs, ipset, []string{"10.0.0.1"}, nil, v1.IPv4Protocol) fp.servicesSynced = true // fp.endpointsSynced = true fp.endpointSlicesSynced = true diff --git a/pkg/proxy/ipvs/testing/fake.go b/pkg/proxy/ipvs/testing/fake.go index 7ece461b436..3ae6c00750e 100644 --- a/pkg/proxy/ipvs/testing/fake.go +++ b/pkg/proxy/ipvs/testing/fake.go @@ -33,9 +33,10 @@ type FakeNetlinkHandle struct { } // NewFakeNetlinkHandle will create a new FakeNetlinkHandle -func NewFakeNetlinkHandle() *FakeNetlinkHandle { +func NewFakeNetlinkHandle(isIPv6 bool) *FakeNetlinkHandle { fake := &FakeNetlinkHandle{ localAddresses: make(map[string][]string), + IsIPv6: isIPv6, } return fake } @@ -115,8 +116,8 @@ func (h *FakeNetlinkHandle) ListBindAddress(devName string) ([]string, error) { } // GetLocalAddresses is a mock implementation -func (h *FakeNetlinkHandle) GetLocalAddresses(dev string) (sets.String, error) { - res := sets.NewString() +func (h *FakeNetlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) { + res := sets.New[string]() // list all addresses from a given network interface. for _, addr := range h.localAddresses[dev] { if h.isValidForSet(addr) { @@ -125,8 +126,8 @@ func (h *FakeNetlinkHandle) GetLocalAddresses(dev string) (sets.String, error) { } return res, nil } -func (h *FakeNetlinkHandle) GetAllLocalAddresses() (sets.String, error) { - res := sets.NewString() +func (h *FakeNetlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) { + res := sets.New[string]() // List all addresses from all available network interfaces. for linkName := range h.localAddresses { // list all addresses from a given network interface. diff --git a/pkg/proxy/ipvs/testing/fake_test.go b/pkg/proxy/ipvs/testing/fake_test.go index 1c7a16d97fd..a8cd2b5cb7e 100644 --- a/pkg/proxy/ipvs/testing/fake_test.go +++ b/pkg/proxy/ipvs/testing/fake_test.go @@ -17,33 +17,36 @@ limitations under the License. package testing import ( - "reflect" "testing" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/proxy/ipvs" ) func TestSetGetLocalAddresses(t *testing.T) { - fake := NewFakeNetlinkHandle() + fake := NewFakeNetlinkHandle(false) + _ = ipvs.NetLinkHandle(fake) // Ensure that the interface is honored fake.SetLocalAddresses("eth0", "1.2.3.4") - expected := sets.NewString("1.2.3.4") - addr, _ := fake.GetLocalAddresses("eth0") - if !reflect.DeepEqual(expected, addr) { + var expected, addr sets.Set[string] + expected = sets.New("1.2.3.4") + addr, _ = fake.GetLocalAddresses("eth0") + if !addr.Equal(expected) { t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr) } - list, _ := fake.GetAllLocalAddresses() - if !reflect.DeepEqual(expected, list) { - t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, list) + addr, _ = fake.GetAllLocalAddresses() + if !addr.Equal(expected) { + t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr) } fake.SetLocalAddresses("lo", "127.0.0.1") - expected = sets.NewString() + expected = nil addr, _ = fake.GetLocalAddresses("lo") - if !reflect.DeepEqual(expected, addr) { + if !addr.Equal(expected) { t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr) } - list, _ = fake.GetAllLocalAddresses() - expected = sets.NewString("1.2.3.4") - if !reflect.DeepEqual(expected, list) { - t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, list) + fake.SetLocalAddresses("kube-ipvs0", "4.3.2.1") + addr, _ = fake.GetAllLocalAddresses() + expected = sets.New("1.2.3.4", "4.3.2.1") + if !addr.Equal(expected) { + t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr) } } From a05b04ad96b9381dd242c6102fef87b3fcf045b3 Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Mon, 20 Feb 2023 07:26:45 +0100 Subject: [PATCH 8/8] Remove un-used function --- pkg/proxy/ipvs/proxier.go | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 801bc220036..da79d76ddbd 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -2063,21 +2063,6 @@ func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool { return false } -func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool { - legacyAddrs := make(map[string]bool) - isIPv6 := netutils.IsIPv6(proxier.nodeIP) - for _, addr := range currentBindAddrs { - addrIsIPv6 := netutils.IsIPv6(netutils.ParseIPSloppy(addr)) - if addrIsIPv6 && !isIPv6 || !addrIsIPv6 && isIPv6 { - continue - } - if _, ok := activeBindAddrs[addr]; !ok { - legacyAddrs[addr] = true - } - } - return legacyAddrs -} - func getIPFamily(ip net.IP) v1.IPFamily { if netutils.IsIPv4(ip) { return v1.IPv4Protocol