Use generic sets

This commit is contained in:
Lars Ekman 2023-02-19 16:33:14 +01:00
parent 547db63bdf
commit fbe671d3f0

View File

@ -286,16 +286,16 @@ type Proxier struct {
// due to the absence of local endpoints when the internal traffic policy is "Local". // due to the absence of local endpoints when the internal traffic policy is "Local".
// It is used to publish the sync_proxy_rules_no_endpoints_total // It is used to publish the sync_proxy_rules_no_endpoints_total
// metric with the traffic_policy label set to "internal". // metric with the traffic_policy label set to "internal".
// sets.String is used here since we end up calculating endpoint topology multiple times for the same Service // A Set is used here since we end up calculating endpoint topology multiple times for the same Service
// if it has multiple ports but each Service should only be counted once. // if it has multiple ports but each Service should only be counted once.
serviceNoLocalEndpointsInternal sets.String serviceNoLocalEndpointsInternal sets.Set[string]
// serviceNoLocalEndpointsExternal represents the set of services that couldn't be applied // serviceNoLocalEndpointsExternal represents the set of services that couldn't be applied
// due to the absence of any endpoints when the external traffic policy is "Local". // due to the absence of any endpoints when the external traffic policy is "Local".
// It is used to publish the sync_proxy_rules_no_endpoints_total // It is used to publish the sync_proxy_rules_no_endpoints_total
// metric with the traffic_policy label set to "external". // metric with the traffic_policy label set to "external".
// sets.String is used here since we end up calculating endpoint topology multiple times for the same Service // A Set is used here since we end up calculating endpoint topology multiple times for the same Service
// if it has multiple ports but each Service should only be counted once. // if it has multiple ports but each Service should only be counted once.
serviceNoLocalEndpointsExternal sets.String serviceNoLocalEndpointsExternal sets.Set[string]
} }
// NodeIPs returns all LOCAL type IP addresses from host which are // NodeIPs returns all LOCAL type IP addresses from host which are
@ -990,8 +990,8 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(3).InfoS("Syncing ipvs proxier rules") klog.V(3).InfoS("Syncing ipvs proxier rules")
proxier.serviceNoLocalEndpointsInternal = sets.NewString() proxier.serviceNoLocalEndpointsInternal = sets.New[string]()
proxier.serviceNoLocalEndpointsExternal = sets.NewString() proxier.serviceNoLocalEndpointsExternal = sets.New[string]()
// Begin install iptables // Begin install iptables
// Reset all buffers used later. // Reset all buffers used later.
@ -1023,11 +1023,11 @@ func (proxier *Proxier) syncProxyRules() {
} }
// activeIPVSServices represents IPVS service successfully created in this round of sync // activeIPVSServices represents IPVS service successfully created in this round of sync
activeIPVSServices := map[string]bool{} activeIPVSServices := sets.New[string]()
// currentIPVSServices represent IPVS services listed from the system // currentIPVSServices represent IPVS services listed from the system
currentIPVSServices := make(map[string]*utilipvs.VirtualServer) currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
// activeBindAddrs represents ip address successfully bind to defaultDummyDevice in this round of sync // activeBindAddrs represents ip address successfully bind to defaultDummyDevice in this round of sync
activeBindAddrs := map[string]bool{} activeBindAddrs := sets.New[string]()
bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice) bindedAddresses, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice)
if err != nil { if err != nil {
@ -1166,8 +1166,8 @@ func (proxier *Proxier) syncProxyRules() {
} }
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices.Insert(serv.String())
activeBindAddrs[serv.Address.String()] = true activeBindAddrs.Insert(serv.Address.String())
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode. // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
internalNodeLocal := false internalNodeLocal := false
@ -1222,9 +1222,8 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagSourceHash serv.Flags |= utilipvs.FlagSourceHash
} }
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices.Insert(serv.String())
activeBindAddrs[serv.Address.String()] = true activeBindAddrs.Insert(serv.Address.String())
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
} }
@ -1326,8 +1325,8 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagSourceHash serv.Flags |= utilipvs.FlagSourceHash
} }
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices.Insert(serv.String())
activeBindAddrs[serv.Address.String()] = true activeBindAddrs.Insert(serv.Address.String())
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
} }
@ -1474,7 +1473,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil { if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true activeIPVSServices.Insert(serv.String())
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil { if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv) klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
} }
@ -1541,13 +1540,17 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
// Get legacy bind address // Remove superfluous addresses from the dummy device
// currentBindAddrs represents ip addresses bind to defaultDummyDevice from the system alreadyBoundAddrs := sets.New(bindedAddresses.List()...)
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(defaultDummyDevice) superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs)
if err != nil { if superfluousAddresses.Len() > 0 {
klog.ErrorS(err, "Failed to get bind address") klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
for adr := range superfluousAddresses {
if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil {
klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr)
}
}
} }
legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
// Clean up legacy IPVS services and unbind addresses // Clean up legacy IPVS services and unbind addresses
appliedSvcs, err := proxier.ipvs.GetVirtualServers() appliedSvcs, err := proxier.ipvs.GetVirtualServers()
@ -1558,7 +1561,7 @@ func (proxier *Proxier) syncProxyRules() {
} else { } else {
klog.ErrorS(err, "Failed to get ipvs service") klog.ErrorS(err, "Failed to get ipvs service")
} }
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs) proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
if proxier.healthzServer != nil { if proxier.healthzServer != nil {
proxier.healthzServer.Updated() proxier.healthzServer.Updated()
@ -2065,32 +2068,20 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
return nil return nil
} }
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) { func (proxier *Proxier) cleanLegacyService(activeServices sets.Set[string], currentServices map[string]*utilipvs.VirtualServer) {
isIPv6 := netutils.IsIPv6(proxier.nodeIP) for cs, svc := range currentServices {
for cs := range currentServices {
svc := currentServices[cs]
if proxier.isIPInExcludeCIDRs(svc.Address) { if proxier.isIPInExcludeCIDRs(svc.Address) {
continue continue
} }
if netutils.IsIPv6(svc.Address) != isIPv6 { if getIPFamily(svc.Address) != proxier.ipFamily {
// Not our family // Not our family
continue continue
} }
if _, ok := activeServices[cs]; !ok { if !activeServices.Has(cs) {
klog.V(4).InfoS("Delete service", "virtualServer", svc) klog.V(4).InfoS("Delete service", "virtualServer", svc)
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil { if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
klog.ErrorS(err, "Failed to delete service", "virtualServer", svc) klog.ErrorS(err, "Failed to delete service", "virtualServer", svc)
} }
addr := svc.Address.String()
if _, ok := legacyBindAddrs[addr]; ok {
klog.V(4).InfoS("Unbinding address", "address", addr)
if err := proxier.netlinkHandle.UnbindAddress(addr, defaultDummyDevice); err != nil {
klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", defaultDummyDevice, "address", addr)
} else {
// In case we delete a multi-port service, avoid trying to unbind multiple times
delete(legacyBindAddrs, addr)
}
}
} }
} }
} }
@ -2120,6 +2111,13 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre
return legacyAddrs return legacyAddrs
} }
func getIPFamily(ip net.IP) v1.IPFamily {
if netutils.IsIPv4(ip) {
return v1.IPv4Protocol
}
return v1.IPv6Protocol
}
// ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets // ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets
// It will only operate iptables *nat table. // It will only operate iptables *nat table.
// Create and link the kube postrouting chain for SNAT packets. // Create and link the kube postrouting chain for SNAT packets.