mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Merge pull request #115073 from Nordix/proxy-ipvs-cleanup1
proxy/ipvs: Use generic Sets and sanitize code
This commit is contained in:
commit
cd228fa551
@ -93,7 +93,7 @@ type IPSetVersioner interface {
|
||||
type IPSet struct {
|
||||
utilipset.IPSet
|
||||
// activeEntries is the current active entries of the ipset.
|
||||
activeEntries sets.String
|
||||
activeEntries sets.Set[string]
|
||||
// handle is the util ipset interface handle.
|
||||
handle utilipset.Interface
|
||||
}
|
||||
@ -125,7 +125,7 @@ func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, i
|
||||
HashFamily: hashFamily,
|
||||
Comment: comment,
|
||||
},
|
||||
activeEntries: sets.NewString(),
|
||||
activeEntries: sets.New[string](),
|
||||
handle: handle,
|
||||
}
|
||||
return set
|
||||
@ -144,7 +144,7 @@ func (set *IPSet) getComment() string {
|
||||
}
|
||||
|
||||
func (set *IPSet) resetEntries() {
|
||||
set.activeEntries = sets.NewString()
|
||||
set.activeEntries = sets.New[string]()
|
||||
}
|
||||
|
||||
func (set *IPSet) syncIPSetEntries() {
|
||||
@ -155,14 +155,14 @@ func (set *IPSet) syncIPSetEntries() {
|
||||
}
|
||||
|
||||
// currentIPSetEntries represents Endpoints watched from API Server.
|
||||
currentIPSetEntries := sets.NewString()
|
||||
currentIPSetEntries := sets.New[string]()
|
||||
for _, appliedEntry := range appliedEntries {
|
||||
currentIPSetEntries.Insert(appliedEntry)
|
||||
}
|
||||
|
||||
if !set.activeEntries.Equal(currentIPSetEntries) {
|
||||
// Clean legacy entries
|
||||
for _, entry := range currentIPSetEntries.Difference(set.activeEntries).List() {
|
||||
for _, entry := range currentIPSetEntries.Difference(set.activeEntries).UnsortedList() {
|
||||
if err := set.handle.DelEntry(entry, set.Name); err != nil {
|
||||
if !utilipset.IsNotFoundError(err) {
|
||||
klog.ErrorS(err, "Failed to delete ip set entry from ip set", "ipSetEntry", entry, "ipSet", set.Name)
|
||||
@ -172,7 +172,7 @@ func (set *IPSet) syncIPSetEntries() {
|
||||
}
|
||||
}
|
||||
// Create active entries
|
||||
for _, entry := range set.activeEntries.Difference(currentIPSetEntries).List() {
|
||||
for _, entry := range set.activeEntries.Difference(currentIPSetEntries).UnsortedList() {
|
||||
if err := set.handle.AddEntry(entry, &set.IPSet, true); err != nil {
|
||||
klog.ErrorS(err, "Failed to add ip set entry to ip set", "ipSetEntry", entry, "ipSet", set.Name)
|
||||
} else {
|
||||
|
@ -35,9 +35,9 @@ type NetLinkHandle interface {
|
||||
// GetAllLocalAddresses return all local addresses on the node.
|
||||
// Only the addresses of the current family are returned.
|
||||
// IPv6 link-local and loopback addresses are excluded.
|
||||
GetAllLocalAddresses() (sets.String, error)
|
||||
GetAllLocalAddresses() (sets.Set[string], error)
|
||||
// GetLocalAddresses return all local addresses for an interface.
|
||||
// Only the addresses of the current family are returned.
|
||||
// IPv6 link-local and loopback addresses are excluded.
|
||||
GetLocalAddresses(dev string) (sets.String, error)
|
||||
GetLocalAddresses(dev string) (sets.Set[string], error)
|
||||
}
|
||||
|
@ -129,7 +129,7 @@ func (h *netlinkHandle) ListBindAddress(devName string) ([]string, error) {
|
||||
// GetAllLocalAddresses return all local addresses on the node.
|
||||
// Only the addresses of the current family are returned.
|
||||
// IPv6 link-local and loopback addresses are excluded.
|
||||
func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) {
|
||||
func (h *netlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) {
|
||||
addr, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Could not get addresses: %v", err)
|
||||
@ -140,7 +140,7 @@ func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) {
|
||||
// GetLocalAddresses return all local addresses for an interface.
|
||||
// Only the addresses of the current family are returned.
|
||||
// IPv6 link-local and loopback addresses are excluded.
|
||||
func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.String, error) {
|
||||
func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) {
|
||||
ifi, err := net.InterfaceByName(dev)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Could not get interface %s: %v", dev, err)
|
||||
|
@ -62,12 +62,12 @@ func (h *netlinkHandle) ListBindAddress(devName string) ([]string, error) {
|
||||
}
|
||||
|
||||
// GetAllLocalAddresses is part of interface.
|
||||
func (h *netlinkHandle) GetAllLocalAddresses() (sets.String, error) {
|
||||
func (h *netlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) {
|
||||
return nil, fmt.Errorf("netlink is not supported in this platform")
|
||||
}
|
||||
|
||||
// GetLocalAddresses is part of interface.
|
||||
func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.String, error) {
|
||||
func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) {
|
||||
return nil, fmt.Errorf("netlink is not supported in this platform")
|
||||
}
|
||||
|
||||
|
@ -264,8 +264,6 @@ type Proxier struct {
|
||||
healthzServer healthcheck.ProxierHealthUpdater
|
||||
|
||||
ipvsScheduler string
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
ipGetter IPGetter
|
||||
// The following buffers are used to reuse memory and avoid allocations
|
||||
// that are significantly impacting performance.
|
||||
iptablesData *bytes.Buffer
|
||||
@ -288,62 +286,16 @@ type Proxier struct {
|
||||
// due to the absence of local endpoints when the internal traffic policy is "Local".
|
||||
// It is used to publish the sync_proxy_rules_no_endpoints_total
|
||||
// metric with the traffic_policy label set to "internal".
|
||||
// sets.String is used here since we end up calculating endpoint topology multiple times for the same Service
|
||||
// A Set is used here since we end up calculating endpoint topology multiple times for the same Service
|
||||
// if it has multiple ports but each Service should only be counted once.
|
||||
serviceNoLocalEndpointsInternal sets.String
|
||||
serviceNoLocalEndpointsInternal sets.Set[string]
|
||||
// serviceNoLocalEndpointsExternal represents the set of services that couldn't be applied
|
||||
// due to the absence of any endpoints when the external traffic policy is "Local".
|
||||
// It is used to publish the sync_proxy_rules_no_endpoints_total
|
||||
// metric with the traffic_policy label set to "external".
|
||||
// sets.String is used here since we end up calculating endpoint topology multiple times for the same Service
|
||||
// A Set is used here since we end up calculating endpoint topology multiple times for the same Service
|
||||
// if it has multiple ports but each Service should only be counted once.
|
||||
serviceNoLocalEndpointsExternal sets.String
|
||||
}
|
||||
|
||||
// IPGetter helps get node network interface IP and IPs binded to the IPVS dummy interface
|
||||
type IPGetter interface {
|
||||
NodeIPs() ([]net.IP, error)
|
||||
BindedIPs() (sets.String, error)
|
||||
}
|
||||
|
||||
// realIPGetter is a real NodeIP handler, it implements IPGetter.
|
||||
type realIPGetter struct {
|
||||
// nl is a handle for revoking netlink interface
|
||||
nl NetLinkHandle
|
||||
}
|
||||
|
||||
// NodeIPs returns all LOCAL type IP addresses from host which are
|
||||
// taken as the Node IPs of NodePort service. Filtered addresses:
|
||||
//
|
||||
// - Loopback addresses
|
||||
// - Addresses of the "other" family (not handled by this proxier instance)
|
||||
// - Link-local IPv6 addresses
|
||||
// - Addresses on the created dummy device `kube-ipvs0`
|
||||
func (r *realIPGetter) NodeIPs() (ips []net.IP, err error) {
|
||||
|
||||
nodeAddress, err := r.nl.GetAllLocalAddresses()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error listing LOCAL type addresses from host, error: %v", err)
|
||||
}
|
||||
|
||||
// We must exclude the addresses on the IPVS dummy interface
|
||||
bindedAddress, err := r.BindedIPs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ipset := nodeAddress.Difference(bindedAddress)
|
||||
|
||||
// translate ip string to IP
|
||||
for _, ipStr := range ipset.UnsortedList() {
|
||||
a := netutils.ParseIPSloppy(ipStr)
|
||||
ips = append(ips, a)
|
||||
}
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
// BindedIPs returns all addresses that are binded to the IPVS dummy interface kube-ipvs0
|
||||
func (r *realIPGetter) BindedIPs() (sets.String, error) {
|
||||
return r.nl.GetLocalAddresses(defaultDummyDevice)
|
||||
serviceNoLocalEndpointsExternal sets.Set[string]
|
||||
}
|
||||
|
||||
// Proxier implements proxy.Provider
|
||||
@ -484,7 +436,6 @@ func NewProxier(ipFamily v1.IPFamily,
|
||||
healthzServer: healthzServer,
|
||||
ipvs: ipvs,
|
||||
ipvsScheduler: scheduler,
|
||||
ipGetter: &realIPGetter{nl: NewNetLinkHandle(ipFamily == v1.IPv6Protocol)},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
filterChainsData: bytes.NewBuffer(nil),
|
||||
natChains: utilproxy.LineBuffer{},
|
||||
@ -1010,8 +961,8 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
|
||||
klog.V(3).InfoS("Syncing ipvs proxier rules")
|
||||
|
||||
proxier.serviceNoLocalEndpointsInternal = sets.NewString()
|
||||
proxier.serviceNoLocalEndpointsExternal = sets.NewString()
|
||||
proxier.serviceNoLocalEndpointsInternal = sets.New[string]()
|
||||
proxier.serviceNoLocalEndpointsExternal = sets.New[string]()
|
||||
// Begin install iptables
|
||||
|
||||
// Reset all buffers used later.
|
||||
@ -1043,16 +994,20 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// activeIPVSServices represents IPVS service successfully created in this round of sync
|
||||
activeIPVSServices := map[string]bool{}
|
||||
// currentIPVSServices represent IPVS services listed from the system
|
||||
currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
|
||||
// activeBindAddrs represents ip address successfully bind to defaultDummyDevice in this round of sync
|
||||
activeBindAddrs := map[string]bool{}
|
||||
|
||||
bindedAddresses, err := proxier.ipGetter.BindedIPs()
|
||||
activeIPVSServices := sets.New[string]()
|
||||
// activeBindAddrs Represents addresses we want on the defaultDummyDevice after this round of sync
|
||||
activeBindAddrs := sets.New[string]()
|
||||
// alreadyBoundAddrs Represents addresses currently assigned to the dummy interface
|
||||
alreadyBoundAddrs, err := proxier.netlinkHandle.GetLocalAddresses(defaultDummyDevice)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Error listing addresses binded to dummy interface")
|
||||
}
|
||||
// nodeAddressSet All addresses *except* those on the dummy interface
|
||||
nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddresses()
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Error listing node addresses")
|
||||
}
|
||||
nodeAddressSet = nodeAddressSet.Difference(alreadyBoundAddrs)
|
||||
|
||||
hasNodePort := false
|
||||
for _, svc := range proxier.svcPortMap {
|
||||
@ -1084,28 +1039,19 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
if utilproxy.IsZeroCIDR(address) {
|
||||
nodeIPs, err = proxier.ipGetter.NodeIPs()
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to list all node IPs from host")
|
||||
nodeIPs = nil
|
||||
for _, ipStr := range nodeAddressSet.UnsortedList() {
|
||||
nodeIPs = append(nodeIPs, netutils.ParseIPSloppy(ipStr))
|
||||
}
|
||||
break
|
||||
}
|
||||
nodeIPs = append(nodeIPs, a)
|
||||
if getIPFamily(a) == proxier.ipFamily {
|
||||
nodeIPs = append(nodeIPs, a)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// filter node IPs by proxier ipfamily
|
||||
idx := 0
|
||||
for _, nodeIP := range nodeIPs {
|
||||
if (proxier.ipFamily == v1.IPv6Protocol) == netutils.IsIPv6(nodeIP) {
|
||||
nodeIPs[idx] = nodeIP
|
||||
idx++
|
||||
}
|
||||
}
|
||||
// reset slice to filtered entries
|
||||
nodeIPs = nodeIPs[:idx]
|
||||
|
||||
// Build IPVS rules for each service.
|
||||
for svcPortName, svcPort := range proxier.svcPortMap {
|
||||
svcInfo, ok := svcPort.(*servicePortInfo)
|
||||
@ -1185,9 +1131,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
serv.Flags |= utilipvs.FlagSourceHash
|
||||
}
|
||||
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
|
||||
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
|
||||
activeIPVSServices.Insert(serv.String())
|
||||
activeBindAddrs.Insert(serv.Address.String())
|
||||
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
|
||||
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
|
||||
internalNodeLocal := false
|
||||
@ -1241,10 +1187,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
if proxier.ipvsScheduler == "mh" {
|
||||
serv.Flags |= utilipvs.FlagSourceHash
|
||||
}
|
||||
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
|
||||
if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
|
||||
activeIPVSServices.Insert(serv.String())
|
||||
activeBindAddrs.Insert(serv.Address.String())
|
||||
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
|
||||
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
|
||||
}
|
||||
@ -1345,9 +1290,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
if proxier.ipvsScheduler == "mh" {
|
||||
serv.Flags |= utilipvs.FlagSourceHash
|
||||
}
|
||||
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
|
||||
activeIPVSServices.Insert(serv.String())
|
||||
activeBindAddrs.Insert(serv.Address.String())
|
||||
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
|
||||
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
|
||||
}
|
||||
@ -1493,8 +1438,8 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
serv.Flags |= utilipvs.FlagSourceHash
|
||||
}
|
||||
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
|
||||
if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
if err := proxier.syncService(svcPortNameString, serv, false, alreadyBoundAddrs); err == nil {
|
||||
activeIPVSServices.Insert(serv.String())
|
||||
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
|
||||
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
|
||||
}
|
||||
@ -1522,7 +1467,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
|
||||
// Set the KUBE-IPVS-IPS set to the "activeBindAddrs"
|
||||
proxier.ipsetList[kubeIPVSSet].activeEntries = sets.StringKeySet(activeBindAddrs)
|
||||
proxier.ipsetList[kubeIPVSSet].activeEntries = activeBindAddrs
|
||||
|
||||
// sync ipset entries
|
||||
for _, set := range proxier.ipsetList {
|
||||
@ -1561,15 +1506,20 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
|
||||
// Get legacy bind address
|
||||
// currentBindAddrs represents ip addresses bind to defaultDummyDevice from the system
|
||||
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(defaultDummyDevice)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "Failed to get bind address")
|
||||
// Remove superfluous addresses from the dummy device
|
||||
superfluousAddresses := alreadyBoundAddrs.Difference(activeBindAddrs)
|
||||
if superfluousAddresses.Len() > 0 {
|
||||
klog.V(2).InfoS("Removing addresses", "interface", defaultDummyDevice, "addresses", superfluousAddresses)
|
||||
for adr := range superfluousAddresses {
|
||||
if err := proxier.netlinkHandle.UnbindAddress(adr, defaultDummyDevice); err != nil {
|
||||
klog.ErrorS(err, "UnbindAddress", "interface", defaultDummyDevice, "address", adr)
|
||||
}
|
||||
}
|
||||
}
|
||||
legacyBindAddrs := proxier.getLegacyBindAddr(activeBindAddrs, currentBindAddrs)
|
||||
|
||||
// Clean up legacy IPVS services and unbind addresses
|
||||
// currentIPVSServices represent IPVS services listed from the system
|
||||
// (including any we have created in this sync)
|
||||
currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
|
||||
appliedSvcs, err := proxier.ipvs.GetVirtualServers()
|
||||
if err == nil {
|
||||
for _, appliedSvc := range appliedSvcs {
|
||||
@ -1578,7 +1528,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
} else {
|
||||
klog.ErrorS(err, "Failed to get ipvs service")
|
||||
}
|
||||
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices, legacyBindAddrs)
|
||||
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
|
||||
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.Updated()
|
||||
@ -1900,7 +1850,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, bindedAddresses sets.String) error {
|
||||
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool, alreadyBoundAddrs sets.Set[string]) error {
|
||||
appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
|
||||
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
|
||||
if appliedVirtualServer == nil {
|
||||
@ -1923,9 +1873,9 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
|
||||
|
||||
// bind service address to dummy interface
|
||||
if bindAddr {
|
||||
// always attempt to bind if bindedAddresses is nil,
|
||||
// always attempt to bind if alreadyBoundAddrs is nil,
|
||||
// otherwise check if it's already binded and return early
|
||||
if bindedAddresses != nil && bindedAddresses.Has(vs.Address.String()) {
|
||||
if alreadyBoundAddrs != nil && alreadyBoundAddrs.Has(vs.Address.String()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -2085,32 +2035,20 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
||||
return nil
|
||||
}
|
||||
|
||||
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer, legacyBindAddrs map[string]bool) {
|
||||
isIPv6 := netutils.IsIPv6(proxier.nodeIP)
|
||||
for cs := range currentServices {
|
||||
svc := currentServices[cs]
|
||||
func (proxier *Proxier) cleanLegacyService(activeServices sets.Set[string], currentServices map[string]*utilipvs.VirtualServer) {
|
||||
for cs, svc := range currentServices {
|
||||
if proxier.isIPInExcludeCIDRs(svc.Address) {
|
||||
continue
|
||||
}
|
||||
if netutils.IsIPv6(svc.Address) != isIPv6 {
|
||||
if getIPFamily(svc.Address) != proxier.ipFamily {
|
||||
// Not our family
|
||||
continue
|
||||
}
|
||||
if _, ok := activeServices[cs]; !ok {
|
||||
if !activeServices.Has(cs) {
|
||||
klog.V(4).InfoS("Delete service", "virtualServer", svc)
|
||||
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
|
||||
klog.ErrorS(err, "Failed to delete service", "virtualServer", svc)
|
||||
}
|
||||
addr := svc.Address.String()
|
||||
if _, ok := legacyBindAddrs[addr]; ok {
|
||||
klog.V(4).InfoS("Unbinding address", "address", addr)
|
||||
if err := proxier.netlinkHandle.UnbindAddress(addr, defaultDummyDevice); err != nil {
|
||||
klog.ErrorS(err, "Failed to unbind service from dummy interface", "interface", defaultDummyDevice, "address", addr)
|
||||
} else {
|
||||
// In case we delete a multi-port service, avoid trying to unbind multiple times
|
||||
delete(legacyBindAddrs, addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2125,19 +2063,11 @@ func (proxier *Proxier) isIPInExcludeCIDRs(ip net.IP) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) map[string]bool {
|
||||
legacyAddrs := make(map[string]bool)
|
||||
isIPv6 := netutils.IsIPv6(proxier.nodeIP)
|
||||
for _, addr := range currentBindAddrs {
|
||||
addrIsIPv6 := netutils.IsIPv6(netutils.ParseIPSloppy(addr))
|
||||
if addrIsIPv6 && !isIPv6 || !addrIsIPv6 && isIPv6 {
|
||||
continue
|
||||
}
|
||||
if _, ok := activeBindAddrs[addr]; !ok {
|
||||
legacyAddrs[addr] = true
|
||||
}
|
||||
func getIPFamily(ip net.IP) v1.IPFamily {
|
||||
if netutils.IsIPv4(ip) {
|
||||
return v1.IPv4Protocol
|
||||
}
|
||||
return legacyAddrs
|
||||
return v1.IPv6Protocol
|
||||
}
|
||||
|
||||
// ipvs Proxier fall back on iptables when it needs to do SNAT for engress packets
|
||||
|
@ -62,19 +62,6 @@ import (
|
||||
|
||||
const testHostname = "test-hostname"
|
||||
|
||||
type fakeIPGetter struct {
|
||||
nodeIPs []net.IP
|
||||
bindedIPs sets.String
|
||||
}
|
||||
|
||||
func (f *fakeIPGetter) NodeIPs() ([]net.IP, error) {
|
||||
return f.nodeIPs, nil
|
||||
}
|
||||
|
||||
func (f *fakeIPGetter) BindedIPs() (sets.String, error) {
|
||||
return f.bindedIPs, nil
|
||||
}
|
||||
|
||||
// fakeIpvs implements utilipvs.Interface
|
||||
type fakeIpvs struct {
|
||||
ipvsErr string
|
||||
@ -139,21 +126,10 @@ func (fake *fakeIPSetVersioner) GetVersion() (string, error) {
|
||||
return fake.version, fake.err
|
||||
}
|
||||
|
||||
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier {
|
||||
// unlike actual proxier, this fake proxier does not filter node IPs per family requested
|
||||
// which can lead to false postives.
|
||||
func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []string, excludeCIDRs []*net.IPNet, ipFamily v1.IPFamily) *Proxier {
|
||||
|
||||
// filter node IPs by proxier ipfamily
|
||||
idx := 0
|
||||
for _, nodeIP := range nodeIPs {
|
||||
if (ipFamily == v1.IPv6Protocol) == netutils.IsIPv6(nodeIP) {
|
||||
nodeIPs[idx] = nodeIP
|
||||
idx++
|
||||
}
|
||||
}
|
||||
|
||||
// reset slice to filtered entries
|
||||
nodeIPs = nodeIPs[:idx]
|
||||
netlinkHandle := netlinktest.NewFakeNetlinkHandle(ipFamily == v1.IPv6Protocol)
|
||||
netlinkHandle.SetLocalAddresses("eth0", nodeIPs...)
|
||||
|
||||
fcmd := fakeexec.FakeCmd{
|
||||
CombinedOutputScript: []fakeexec.FakeAction{
|
||||
@ -188,14 +164,13 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
||||
hostname: testHostname,
|
||||
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
|
||||
ipvsScheduler: defaultScheduler,
|
||||
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
filterChainsData: bytes.NewBuffer(nil),
|
||||
natChains: utilproxy.LineBuffer{},
|
||||
natRules: utilproxy.LineBuffer{},
|
||||
filterChains: utilproxy.LineBuffer{},
|
||||
filterRules: utilproxy.LineBuffer{},
|
||||
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
|
||||
netlinkHandle: netlinkHandle,
|
||||
ipsetList: ipsetList,
|
||||
nodePortAddresses: make([]string, 0),
|
||||
networkInterfacer: proxyutiltest.NewFakeNetwork(),
|
||||
@ -438,23 +413,22 @@ func TestGetNodeIPs(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
for i := range testCases {
|
||||
fake := netlinktest.NewFakeNetlinkHandle()
|
||||
fake.IsIPv6 = testCases[i].isIPv6
|
||||
for i, tc := range testCases {
|
||||
fake := netlinktest.NewFakeNetlinkHandle(tc.isIPv6)
|
||||
for dev, addresses := range testCases[i].devAddresses {
|
||||
fake.SetLocalAddresses(dev, addresses...)
|
||||
}
|
||||
r := realIPGetter{nl: fake}
|
||||
ips, err := r.NodeIPs()
|
||||
ips, err := fake.GetAllLocalAddresses()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
ipStrs := sets.NewString()
|
||||
for _, ip := range ips {
|
||||
ipStrs.Insert(ip.String())
|
||||
devIps, err := fake.GetLocalAddresses("kube-ipvs0")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if !ipStrs.Equal(sets.NewString(testCases[i].expectIPs...)) {
|
||||
t.Errorf("case[%d], unexpected mismatch, expected: %v, got: %v", i, testCases[i].expectIPs, ips)
|
||||
ips = ips.Difference(devIps)
|
||||
if !ips.Equal(sets.New(tc.expectIPs...)) {
|
||||
t.Errorf("case[%d], unexpected mismatch, expected: %v, got: %v", i, tc.expectIPs, ips)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -467,7 +441,7 @@ func TestNodePortIPv4(t *testing.T) {
|
||||
name string
|
||||
services []*v1.Service
|
||||
endpoints []*discovery.EndpointSlice
|
||||
nodeIPs []net.IP
|
||||
nodeIPs []string
|
||||
nodePortAddresses []string
|
||||
expectedIPVS *ipvstest.FakeIPVS
|
||||
expectedIPSets netlinktest.ExpectedIPSet
|
||||
@ -511,10 +485,7 @@ func TestNodePortIPv4(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
netutils.ParseIPSloppy("2001:db8::1:1"),
|
||||
},
|
||||
nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"},
|
||||
nodePortAddresses: []string{},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
|
||||
@ -592,9 +563,7 @@ func TestNodePortIPv4(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
},
|
||||
nodeIPs: []string{"100.101.102.103"},
|
||||
nodePortAddresses: []string{"0.0.0.0/0"},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
|
||||
@ -682,10 +651,8 @@ func TestNodePortIPv4(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
endpoints: []*discovery.EndpointSlice{},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
},
|
||||
endpoints: []*discovery.EndpointSlice{},
|
||||
nodeIPs: []string{"100.101.102.103"},
|
||||
nodePortAddresses: []string{},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
|
||||
@ -751,13 +718,13 @@ func TestNodePortIPv4(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
netutils.ParseIPSloppy("100.101.102.104"),
|
||||
netutils.ParseIPSloppy("100.101.102.105"),
|
||||
netutils.ParseIPSloppy("2001:db8::1:1"),
|
||||
netutils.ParseIPSloppy("2001:db8::1:2"),
|
||||
netutils.ParseIPSloppy("2001:db8::1:3"),
|
||||
nodeIPs: []string{
|
||||
"100.101.102.103",
|
||||
"100.101.102.104",
|
||||
"100.101.102.105",
|
||||
"2001:db8::1:1",
|
||||
"2001:db8::1:2",
|
||||
"2001:db8::1:3",
|
||||
},
|
||||
nodePortAddresses: []string{},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
@ -905,9 +872,7 @@ func TestNodePortIPv4(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
},
|
||||
nodeIPs: []string{"100.101.102.103"},
|
||||
nodePortAddresses: []string{},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
|
||||
@ -1030,7 +995,7 @@ func TestNodePortIPv6(t *testing.T) {
|
||||
name string
|
||||
services []*v1.Service
|
||||
endpoints []*discovery.EndpointSlice
|
||||
nodeIPs []net.IP
|
||||
nodeIPs []string
|
||||
nodePortAddresses []string
|
||||
expectedIPVS *ipvstest.FakeIPVS
|
||||
expectedIPSets netlinktest.ExpectedIPSet
|
||||
@ -1074,10 +1039,7 @@ func TestNodePortIPv6(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
netutils.ParseIPSloppy("2001:db8::1:1"),
|
||||
},
|
||||
nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"},
|
||||
nodePortAddresses: []string{},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
|
||||
@ -1157,9 +1119,7 @@ func TestNodePortIPv6(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
},
|
||||
nodeIPs: []string{"100.101.102.103"},
|
||||
nodePortAddresses: []string{"0.0.0.0/0"},
|
||||
/*since this is a node with only IPv4, proxier should not do anything */
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
@ -1184,11 +1144,8 @@ func TestNodePortIPv6(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
endpoints: []*discovery.EndpointSlice{},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("100.101.102.103"),
|
||||
netutils.ParseIPSloppy("2001:db8::1:1"),
|
||||
},
|
||||
endpoints: []*discovery.EndpointSlice{},
|
||||
nodeIPs: []string{"100.101.102.103", "2001:db8::1:1"},
|
||||
nodePortAddresses: []string{},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
|
||||
@ -1255,10 +1212,7 @@ func TestNodePortIPv6(t *testing.T) {
|
||||
}}
|
||||
}),
|
||||
},
|
||||
nodeIPs: []net.IP{
|
||||
netutils.ParseIPSloppy("2001:db8::1:1"),
|
||||
netutils.ParseIPSloppy("2001:db8::1:2"),
|
||||
},
|
||||
nodeIPs: []string{"2001:db8::1:1", "2001:db8::1:2"},
|
||||
nodePortAddresses: []string{},
|
||||
expectedIPVS: &ipvstest.FakeIPVS{
|
||||
Services: map[ipvstest.ServiceKey]*utilipvs.VirtualServer{
|
||||
@ -2794,8 +2748,8 @@ func TestSessionAffinity(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
nodeIP := netutils.ParseIPSloppy("100.101.102.103")
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}, nil, v1.IPv4Protocol)
|
||||
nodeIP := "100.101.102.103"
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, []string{nodeIP}, nil, v1.IPv4Protocol)
|
||||
svcIP := "10.20.30.41"
|
||||
svcPort := 80
|
||||
svcNodePort := 3001
|
||||
@ -3706,11 +3660,11 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe
|
||||
|
||||
func Test_syncService(t *testing.T) {
|
||||
testCases := []struct {
|
||||
oldVirtualServer *utilipvs.VirtualServer
|
||||
svcName string
|
||||
newVirtualServer *utilipvs.VirtualServer
|
||||
bindAddr bool
|
||||
bindedAddrs sets.String
|
||||
oldVirtualServer *utilipvs.VirtualServer
|
||||
svcName string
|
||||
newVirtualServer *utilipvs.VirtualServer
|
||||
bindAddr bool
|
||||
alreadyBoundAddrs sets.Set[string]
|
||||
}{
|
||||
{
|
||||
// case 0, old virtual server is same as new virtual server
|
||||
@ -3729,8 +3683,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
},
|
||||
bindAddr: false,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: false,
|
||||
alreadyBoundAddrs: nil,
|
||||
},
|
||||
{
|
||||
// case 1, old virtual server is different from new virtual server
|
||||
@ -3749,8 +3703,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagPersistent,
|
||||
},
|
||||
bindAddr: false,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: false,
|
||||
alreadyBoundAddrs: nil,
|
||||
},
|
||||
{
|
||||
// case 2, old virtual server is different from new virtual server
|
||||
@ -3769,8 +3723,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "wlc",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
},
|
||||
bindAddr: false,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: false,
|
||||
alreadyBoundAddrs: nil,
|
||||
},
|
||||
{
|
||||
// case 3, old virtual server is nil, and create new virtual server
|
||||
@ -3783,8 +3737,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
},
|
||||
bindAddr: true,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: true,
|
||||
alreadyBoundAddrs: nil,
|
||||
},
|
||||
{
|
||||
// case 4, SCTP, old virtual server is same as new virtual server
|
||||
@ -3803,8 +3757,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
},
|
||||
bindAddr: false,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: false,
|
||||
alreadyBoundAddrs: nil,
|
||||
},
|
||||
{
|
||||
// case 5, old virtual server is different from new virtual server
|
||||
@ -3823,8 +3777,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagPersistent,
|
||||
},
|
||||
bindAddr: false,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: false,
|
||||
alreadyBoundAddrs: nil,
|
||||
},
|
||||
{
|
||||
// case 6, old virtual server is different from new virtual server
|
||||
@ -3843,8 +3797,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "wlc",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
},
|
||||
bindAddr: false,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: false,
|
||||
alreadyBoundAddrs: nil,
|
||||
},
|
||||
{
|
||||
// case 7, old virtual server is nil, and create new virtual server
|
||||
@ -3857,8 +3811,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
},
|
||||
bindAddr: true,
|
||||
bindedAddrs: sets.NewString(),
|
||||
bindAddr: true,
|
||||
alreadyBoundAddrs: sets.New[string](),
|
||||
},
|
||||
{
|
||||
// case 8, virtual server address already binded, skip sync
|
||||
@ -3877,8 +3831,8 @@ func Test_syncService(t *testing.T) {
|
||||
Scheduler: "rr",
|
||||
Flags: utilipvs.FlagHashed,
|
||||
},
|
||||
bindAddr: true,
|
||||
bindedAddrs: sets.NewString("1.2.3.4"),
|
||||
bindAddr: true,
|
||||
alreadyBoundAddrs: sets.New("1.2.3.4"),
|
||||
},
|
||||
}
|
||||
|
||||
@ -3894,7 +3848,7 @@ func Test_syncService(t *testing.T) {
|
||||
t.Errorf("Case [%d], unexpected add IPVS virtual server error: %v", i, err)
|
||||
}
|
||||
}
|
||||
if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr, testCases[i].bindedAddrs); err != nil {
|
||||
if err := proxier.syncService(testCases[i].svcName, testCases[i].newVirtualServer, testCases[i].bindAddr, testCases[i].alreadyBoundAddrs); err != nil {
|
||||
t.Errorf("Case [%d], unexpected sync IPVS virtual server error: %v", i, err)
|
||||
}
|
||||
// check
|
||||
@ -4012,7 +3966,7 @@ func TestCleanLegacyService(t *testing.T) {
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
|
||||
|
||||
// All ipvs services that were processed in the latest sync loop.
|
||||
activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
|
||||
activeServices := sets.New("ipvs0", "ipvs1")
|
||||
// All ipvs services in the system.
|
||||
currentServices := map[string]*utilipvs.VirtualServer{
|
||||
// Created by kube-proxy.
|
||||
@ -4068,15 +4022,7 @@ func TestCleanLegacyService(t *testing.T) {
|
||||
fp.ipvs.AddVirtualServer(currentServices[v])
|
||||
}
|
||||
|
||||
fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
|
||||
activeBindAddrs := map[string]bool{"1.1.1.1": true, "2.2.2.2": true, "3.3.3.3": true, "4.4.4.4": true}
|
||||
// This is ipv4-only so ipv6 addresses should be ignored
|
||||
currentBindAddrs := []string{"1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", "5.5.5.5", "6.6.6.6", "fd80::1:2:3", "fd80::1:2:4"}
|
||||
for i := range currentBindAddrs {
|
||||
fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], defaultDummyDevice)
|
||||
}
|
||||
|
||||
fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"5.5.5.5": true, "6.6.6.6": true})
|
||||
fp.cleanLegacyService(activeServices, currentServices)
|
||||
// ipvs4 and ipvs5 should have been cleaned.
|
||||
remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
|
||||
if len(remainingVirtualServers) != 4 {
|
||||
@ -4091,24 +4037,6 @@ func TestCleanLegacyService(t *testing.T) {
|
||||
t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
|
||||
}
|
||||
}
|
||||
|
||||
// Addresses 5.5.5.5 and 6.6.6.6 should not be bound any more, but the ipv6 addresses should remain
|
||||
remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice)
|
||||
if len(remainingAddrs) != 6 {
|
||||
t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 6, len(remainingAddrs))
|
||||
}
|
||||
// check that address "1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4" are bound, ignore ipv6 addresses
|
||||
remainingAddrsMap := make(map[string]bool)
|
||||
for _, a := range remainingAddrs {
|
||||
if netutils.ParseIPSloppy(a).To4() == nil {
|
||||
continue
|
||||
}
|
||||
remainingAddrsMap[a] = true
|
||||
}
|
||||
if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) {
|
||||
t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestCleanLegacyServiceWithRealServers(t *testing.T) {
|
||||
@ -4118,7 +4046,7 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) {
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, nil, v1.IPv4Protocol)
|
||||
|
||||
// all deleted expect ipvs2
|
||||
activeServices := map[string]bool{"ipvs2": true}
|
||||
activeServices := sets.New("ipvs2")
|
||||
// All ipvs services in the system.
|
||||
currentServices := map[string]*utilipvs.VirtualServer{
|
||||
"ipvs0": { // deleted with real servers
|
||||
@ -4167,14 +4095,7 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) {
|
||||
fp.ipvs.AddRealServer(v, r)
|
||||
}
|
||||
|
||||
fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
|
||||
activeBindAddrs := map[string]bool{"3.3.3.3": true}
|
||||
currentBindAddrs := []string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}
|
||||
for i := range currentBindAddrs {
|
||||
fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], defaultDummyDevice)
|
||||
}
|
||||
|
||||
fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"1.1.1.1": true, "2.2.2.2": true})
|
||||
fp.cleanLegacyService(activeServices, currentServices)
|
||||
remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
|
||||
if len(remainingVirtualServers) != 1 {
|
||||
t.Errorf("Expected number of remaining IPVS services after cleanup to be %v. Got %v", 1, len(remainingVirtualServers))
|
||||
@ -4185,23 +4106,6 @@ func TestCleanLegacyServiceWithRealServers(t *testing.T) {
|
||||
t.Logf("expected virtual server: %v", currentServices["ipvs0"])
|
||||
t.Errorf("unexpected IPVS service")
|
||||
}
|
||||
|
||||
remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice)
|
||||
if len(remainingAddrs) != 1 {
|
||||
t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 1, len(remainingAddrs))
|
||||
}
|
||||
// check that address is "3.3.3.3"
|
||||
remainingAddrsMap := make(map[string]bool)
|
||||
for _, a := range remainingAddrs {
|
||||
if netutils.ParseIPSloppy(a).To4() == nil {
|
||||
continue
|
||||
}
|
||||
remainingAddrsMap[a] = true
|
||||
}
|
||||
if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) {
|
||||
t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) {
|
||||
@ -4245,11 +4149,7 @@ func TestCleanLegacyRealServersExcludeCIDRs(t *testing.T) {
|
||||
|
||||
fp.netlinkHandle.EnsureAddressBind("4.4.4.4", defaultDummyDevice)
|
||||
|
||||
fp.cleanLegacyService(
|
||||
map[string]bool{},
|
||||
map[string]*utilipvs.VirtualServer{"ipvs0": vs},
|
||||
map[string]bool{"4.4.4.4": true},
|
||||
)
|
||||
fp.cleanLegacyService(nil, map[string]*utilipvs.VirtualServer{"ipvs0": vs})
|
||||
|
||||
fp.gracefuldeleteManager.tryDeleteRs()
|
||||
|
||||
@ -4265,11 +4165,11 @@ func TestCleanLegacyService6(t *testing.T) {
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
excludeCIDRs, _ := netutils.ParseCIDRs([]string{"3000::/64", "4000::/64"})
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv4Protocol)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil, excludeCIDRs, v1.IPv6Protocol)
|
||||
fp.nodeIP = netutils.ParseIPSloppy("::1")
|
||||
|
||||
// All ipvs services that were processed in the latest sync loop.
|
||||
activeServices := map[string]bool{"ipvs0": true, "ipvs1": true}
|
||||
activeServices := sets.New("ipvs0", "ipvs1")
|
||||
// All ipvs services in the system.
|
||||
currentServices := map[string]*utilipvs.VirtualServer{
|
||||
// Created by kube-proxy.
|
||||
@ -4325,15 +4225,7 @@ func TestCleanLegacyService6(t *testing.T) {
|
||||
fp.ipvs.AddVirtualServer(currentServices[v])
|
||||
}
|
||||
|
||||
fp.netlinkHandle.EnsureDummyDevice(defaultDummyDevice)
|
||||
activeBindAddrs := map[string]bool{"1000::1": true, "1000::2": true, "3000::1": true, "4000::1": true}
|
||||
// This is ipv6-only so ipv4 addresses should be ignored
|
||||
currentBindAddrs := []string{"1000::1", "1000::2", "3000::1", "4000::1", "5000::1", "1000::6", "1.1.1.1", "2.2.2.2"}
|
||||
for i := range currentBindAddrs {
|
||||
fp.netlinkHandle.EnsureAddressBind(currentBindAddrs[i], defaultDummyDevice)
|
||||
}
|
||||
|
||||
fp.cleanLegacyService(activeServices, currentServices, map[string]bool{"5000::1": true, "1000::6": true})
|
||||
fp.cleanLegacyService(activeServices, currentServices)
|
||||
// ipvs4 and ipvs5 should have been cleaned.
|
||||
remainingVirtualServers, _ := fp.ipvs.GetVirtualServers()
|
||||
if len(remainingVirtualServers) != 4 {
|
||||
@ -4348,24 +4240,6 @@ func TestCleanLegacyService6(t *testing.T) {
|
||||
t.Errorf("Expected ipvs5 to be removed after cleanup. It still remains")
|
||||
}
|
||||
}
|
||||
|
||||
// Addresses 5000::1 and 1000::6 should not be bound any more, but the ipv4 addresses should remain
|
||||
remainingAddrs, _ := fp.netlinkHandle.ListBindAddress(defaultDummyDevice)
|
||||
if len(remainingAddrs) != 6 {
|
||||
t.Errorf("Expected number of remaining bound addrs after cleanup to be %v. Got %v", 6, len(remainingAddrs))
|
||||
}
|
||||
// check that address "1000::1", "1000::2", "3000::1", "4000::1" are still bound, ignore ipv4 addresses
|
||||
remainingAddrsMap := make(map[string]bool)
|
||||
for _, a := range remainingAddrs {
|
||||
if netutils.ParseIPSloppy(a).To4() != nil {
|
||||
continue
|
||||
}
|
||||
remainingAddrsMap[a] = true
|
||||
}
|
||||
if !reflect.DeepEqual(activeBindAddrs, remainingAddrsMap) {
|
||||
t.Errorf("Expected remainingAddrsMap %v, got %v", activeBindAddrs, remainingAddrsMap)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMultiPortServiceBindAddr(t *testing.T) {
|
||||
@ -6011,7 +5885,7 @@ func TestNoEndpointsMetric(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{netutils.ParseIPSloppy("10.0.0.1")}, nil, v1.IPv4Protocol)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, []string{"10.0.0.1"}, nil, v1.IPv4Protocol)
|
||||
fp.servicesSynced = true
|
||||
// fp.endpointsSynced = true
|
||||
fp.endpointSlicesSynced = true
|
||||
|
@ -33,9 +33,10 @@ type FakeNetlinkHandle struct {
|
||||
}
|
||||
|
||||
// NewFakeNetlinkHandle will create a new FakeNetlinkHandle
|
||||
func NewFakeNetlinkHandle() *FakeNetlinkHandle {
|
||||
func NewFakeNetlinkHandle(isIPv6 bool) *FakeNetlinkHandle {
|
||||
fake := &FakeNetlinkHandle{
|
||||
localAddresses: make(map[string][]string),
|
||||
IsIPv6: isIPv6,
|
||||
}
|
||||
return fake
|
||||
}
|
||||
@ -115,8 +116,8 @@ func (h *FakeNetlinkHandle) ListBindAddress(devName string) ([]string, error) {
|
||||
}
|
||||
|
||||
// GetLocalAddresses is a mock implementation
|
||||
func (h *FakeNetlinkHandle) GetLocalAddresses(dev string) (sets.String, error) {
|
||||
res := sets.NewString()
|
||||
func (h *FakeNetlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error) {
|
||||
res := sets.New[string]()
|
||||
// list all addresses from a given network interface.
|
||||
for _, addr := range h.localAddresses[dev] {
|
||||
if h.isValidForSet(addr) {
|
||||
@ -125,8 +126,8 @@ func (h *FakeNetlinkHandle) GetLocalAddresses(dev string) (sets.String, error) {
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
func (h *FakeNetlinkHandle) GetAllLocalAddresses() (sets.String, error) {
|
||||
res := sets.NewString()
|
||||
func (h *FakeNetlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) {
|
||||
res := sets.New[string]()
|
||||
// List all addresses from all available network interfaces.
|
||||
for linkName := range h.localAddresses {
|
||||
// list all addresses from a given network interface.
|
||||
|
@ -17,33 +17,36 @@ limitations under the License.
|
||||
package testing
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/proxy/ipvs"
|
||||
)
|
||||
|
||||
func TestSetGetLocalAddresses(t *testing.T) {
|
||||
fake := NewFakeNetlinkHandle()
|
||||
fake := NewFakeNetlinkHandle(false)
|
||||
_ = ipvs.NetLinkHandle(fake) // Ensure that the interface is honored
|
||||
fake.SetLocalAddresses("eth0", "1.2.3.4")
|
||||
expected := sets.NewString("1.2.3.4")
|
||||
addr, _ := fake.GetLocalAddresses("eth0")
|
||||
if !reflect.DeepEqual(expected, addr) {
|
||||
var expected, addr sets.Set[string]
|
||||
expected = sets.New("1.2.3.4")
|
||||
addr, _ = fake.GetLocalAddresses("eth0")
|
||||
if !addr.Equal(expected) {
|
||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
||||
}
|
||||
list, _ := fake.GetAllLocalAddresses()
|
||||
if !reflect.DeepEqual(expected, list) {
|
||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, list)
|
||||
addr, _ = fake.GetAllLocalAddresses()
|
||||
if !addr.Equal(expected) {
|
||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
||||
}
|
||||
fake.SetLocalAddresses("lo", "127.0.0.1")
|
||||
expected = sets.NewString()
|
||||
expected = nil
|
||||
addr, _ = fake.GetLocalAddresses("lo")
|
||||
if !reflect.DeepEqual(expected, addr) {
|
||||
if !addr.Equal(expected) {
|
||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
||||
}
|
||||
list, _ = fake.GetAllLocalAddresses()
|
||||
expected = sets.NewString("1.2.3.4")
|
||||
if !reflect.DeepEqual(expected, list) {
|
||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, list)
|
||||
fake.SetLocalAddresses("kube-ipvs0", "4.3.2.1")
|
||||
addr, _ = fake.GetAllLocalAddresses()
|
||||
expected = sets.New("1.2.3.4", "4.3.2.1")
|
||||
if !addr.Equal(expected) {
|
||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
||||
}
|
||||
}
|
||||
|
@ -199,8 +199,8 @@ func ShouldSkipService(service *v1.Service) bool {
|
||||
|
||||
// AddressSet validates the addresses in the slice using the "isValid" function.
|
||||
// Addresses that pass the validation are returned as a string Set.
|
||||
func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.String {
|
||||
ips := sets.NewString()
|
||||
func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string] {
|
||||
ips := sets.New[string]()
|
||||
for _, a := range addrs {
|
||||
var ip net.IP
|
||||
switch v := a.(type) {
|
||||
|
@ -1112,13 +1112,13 @@ func TestAddressSet(t *testing.T) {
|
||||
name string
|
||||
validator func(ip net.IP) bool
|
||||
input []net.Addr
|
||||
expected sets.String
|
||||
expected sets.Set[string]
|
||||
}{
|
||||
{
|
||||
"Empty",
|
||||
func(ip net.IP) bool { return false },
|
||||
nil,
|
||||
sets.NewString(),
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"Reject IPAddr x 2",
|
||||
@ -1127,7 +1127,7 @@ func TestAddressSet(t *testing.T) {
|
||||
mustParseIPAddr("8.8.8.8"),
|
||||
mustParseIPAddr("1000::"),
|
||||
},
|
||||
sets.NewString(),
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"Accept IPAddr x 2",
|
||||
@ -1136,7 +1136,7 @@ func TestAddressSet(t *testing.T) {
|
||||
mustParseIPAddr("8.8.8.8"),
|
||||
mustParseIPAddr("1000::"),
|
||||
},
|
||||
sets.NewString("8.8.8.8", "1000::"),
|
||||
sets.New("8.8.8.8", "1000::"),
|
||||
},
|
||||
{
|
||||
"Accept IPNet x 2",
|
||||
@ -1145,7 +1145,7 @@ func TestAddressSet(t *testing.T) {
|
||||
mustParseIPNet("8.8.8.8/32"),
|
||||
mustParseIPNet("1000::/128"),
|
||||
},
|
||||
sets.NewString("8.8.8.8", "1000::"),
|
||||
sets.New("8.8.8.8", "1000::"),
|
||||
},
|
||||
{
|
||||
"Accept Unix x 2",
|
||||
@ -1154,7 +1154,7 @@ func TestAddressSet(t *testing.T) {
|
||||
mustParseUnix("/tmp/sock1"),
|
||||
mustParseUnix("/tmp/sock2"),
|
||||
},
|
||||
sets.NewString(),
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"Cidr IPv4",
|
||||
@ -1164,7 +1164,7 @@ func TestAddressSet(t *testing.T) {
|
||||
mustParseIPAddr("1000::"),
|
||||
mustParseIPAddr("192.168.1.1"),
|
||||
},
|
||||
sets.NewString("192.168.1.1"),
|
||||
sets.New("192.168.1.1"),
|
||||
},
|
||||
{
|
||||
"Cidr IPv6",
|
||||
@ -1174,7 +1174,7 @@ func TestAddressSet(t *testing.T) {
|
||||
mustParseIPAddr("1000::"),
|
||||
mustParseIPAddr("192.168.1.1"),
|
||||
},
|
||||
sets.NewString("1000::"),
|
||||
sets.New("1000::"),
|
||||
},
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user