From fbe671d3f0de52df8be1bd2cf6d70ef2fcc62d8b Mon Sep 17 00:00:00 2001 From: Lars Ekman Date: Sun, 19 Feb 2023 16:33:14 +0100 Subject: [PATCH] Use generic sets --- pkg/proxy/ipvs/proxier.go | 76 +++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index c619a451b6d..257a5512a79 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -286,16 +286,16 @@ type Proxier struct { // due to the absence of local endpoints when the internal traffic policy is "Local". // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "internal". - // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // A Set is used here since we end up calculating endpoint topology multiple times for the same Service // if it has multiple ports but each Service should only be counted once. - serviceNoLocalEndpointsInternal sets.String + serviceNoLocalEndpointsInternal sets.Set[string] // serviceNoLocalEndpointsExternal represents the set of services that couldn't be applied // due to the absence of any endpoints when the external traffic policy is "Local". // It is used to publish the sync_proxy_rules_no_endpoints_total // metric with the traffic_policy label set to "external". - // sets.String is used here since we end up calculating endpoint topology multiple times for the same Service + // A Set is used here since we end up calculating endpoint topology multiple times for the same Service // if it has multiple ports but each Service should only be counted once. - serviceNoLocalEndpointsExternal sets.String + serviceNoLocalEndpointsExternal sets.Set[string] } // NodeIPs returns all LOCAL type IP addresses from host which are @@ -990,8 +990,8 @@ func (proxier *Proxier) syncProxyRules() { klog.V(3).InfoS("Syncing ipvs proxier rules") - proxier.serviceNoLocalEndpointsInternal = sets.NewString() - proxier.serviceNoLocalEndpointsExternal = sets.NewString() + proxier.serviceNoLocalEndpointsInternal = sets.New[string]() + proxier.serviceNoLocalEndpointsExternal = sets.New[string]() // Begin install iptables // Reset all buffers used later. @@ -1023,11 +1023,11 @@ func (proxier *Proxier) syncProxyRules() { } // activeIPVSServices represents IPVS service successfully created in this round of sync - activeIPVSServices := map[string]bool{} + activeIPVSServices := sets.New[string]() // currentIPVSServices represent IPVS services listed from the system currentIPVSServices := make(map[string]*utilipvs.VirtualServer) // activeBindAddrs represents ip address successfully bind to defaultDummyDevice in this round of sync - activeBindAddrs := map[string]bool{} + activeBindAddrs := sets.New[string]() bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) if err != nil { @@ -1166,8 +1166,8 @@ func (proxier *Proxier) syncProxyRules() { } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true - activeBindAddrs[serv.Address.String()] = true + activeIPVSServices.Insert(serv.String()) + activeBindAddrs.Insert(serv.Address.String()) // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. internalNodeLocal := false @@ -1222,9 +1222,8 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagSourceHash } if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true - activeBindAddrs[serv.Address.String()] = true - + activeIPVSServices.Insert(serv.String()) + activeBindAddrs.Insert(serv.Address.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) } @@ -1326,8 +1325,8 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagSourceHash } if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true - activeBindAddrs[serv.Address.String()] = true + activeIPVSServices.Insert(serv.String()) + activeBindAddrs.Insert(serv.Address.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) } @@ -1474,7 +1473,7 @@ func (proxier *Proxier) syncProxyRules() { } // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil { - activeIPVSServices[serv.String()] = true + activeIPVSServices.Insert(serv.String()) if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) } @@ -1541,13 +1540,17 @@ func (proxier *Proxier) syncProxyRules() { } } - // Get legacy bind address - // currentBindAddrs represents ip addresses bind to defaultDummyDevice from the system - currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(defaultDummyDevice) - if err != nil { - klog.ErrorS(err, "Failed to get bind address") + // Remove superfluous addresses from the dummy device + alreadyBoundAddrs := sets.New(bindedAddresses.List()...) + superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs) + if superfluousAddresses.Len() > 0 { + klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses) + for adr := range superfluousAddresses { + if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil { + klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr) + } + } } - legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs) // Clean up legacy IPVS services and unbind addresses appliedSvcs, err := proxier.ipvs.GetVirtualServers() @@ -1558,7 +1561,7 @@ func (proxier *Proxier) syncProxyRules() { } else { klog.ErrorS(err, "Failed to get ipvs service") } - proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs) + proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices) if proxier.healthzServer != nil { proxier.healthzServer.Updated() @@ -2065,32 +2068,20 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode return nil } -func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) { - isIPv6 := netutils.IsIPv6(proxier.nodeIP) - for cs := range currentServices { - svc := currentServices[cs] +func (proxier *Proxier) cleanLegacyService(activeServices sets.Set[string], currentServices map[string]*utilipvs.VirtualServer) { + for cs, svc := range currentServices { if proxier.isIPInExcludeCIDRs(svc.Address) { continue } - if netutils.IsIPv6(svc.Address) != isIPv6 { + if getIPFamily(svc.Address) != proxier.ipFamily { // Not our family continue } - if _, ok := activeServices[cs]; !ok { + if !activeServices.Has(cs) { klog.V(4).InfoS("Delete service", "virtualServer", svc) if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil { klog.ErrorS(err, "Failed to delete service", "virtualServer", svc) } - addr := svc.Address.String() - if _, ok := legacyBindAddrs[addr]; ok { - klog.V(4).InfoS("Unbinding address", "address", addr) - if err := proxier.netlinkHandle.UnbindAddress(addr, defaultDummyDevice); err != nil { - klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", defaultDummyDevice, "address", addr) - } else { - // In case we delete a multi-port service, avoid trying to unbind multiple times - delete(legacyBindAddrs, addr) - } - } } } } @@ -2120,6 +2111,13 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre return legacyAddrs } +func getIPFamily(ip net.IP) v1.IPFamily { + if netutils.IsIPv4(ip) { + return v1.IPv4Protocol + } + return v1.IPv6Protocol +} + // ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets // It will only operate iptables *nat table. // Create and link the kube postrouting chain for SNAT packets.