From 2c348bf1863f3e1db9e0078151452dbad075cf01 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Apr 2024 16:41:33 -0400 Subject: [PATCH 1/3] Use a constructor properly in change tracker unit tests newFakeProxier was inlining the details of NewEndpointsChangeTracker so it could override trackerStartTime, but it would be better and more future-proof to just call NewEndpointsChangeTracker normally and then edit that one field. (Also remove an unused FakeProxier field.) --- pkg/proxy/endpointschangetracker_test.go | 1 - pkg/proxy/endpointslicecache_test.go | 2 -- pkg/proxy/servicechangetracker_test.go | 16 ++++++---------- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/pkg/proxy/endpointschangetracker_test.go b/pkg/proxy/endpointschangetracker_test.go index 89940b2912d..ab3235899b6 100644 --- a/pkg/proxy/endpointschangetracker_test.go +++ b/pkg/proxy/endpointschangetracker_test.go @@ -1037,7 +1037,6 @@ func TestUpdateEndpointsMap(t *testing.T) { for tci, tc := range testCases { t.Run(tc.name, func(t *testing.T) { fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) - fp.hostname = testHostname // First check that after adding all previous versions of endpoints, // the fp.previousEndpointsMap is as we expect. diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index f1afd63a53c..3f8317a1860 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -610,7 +610,6 @@ func TestEndpointSliceCacheClearedCorrectly(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) - fp.hostname = testHostname for _, epSlice := range tc.currEndpointSlices { fp.addEndpointSlice(epSlice) @@ -648,7 +647,6 @@ func TestSameServiceEndpointSliceCacheClearedCorrectly(t *testing.T) { } fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) - fp.hostname = testHostname for _, epSlice := range currEndpointSlices { fp.addEndpointSlice(epSlice) diff --git a/pkg/proxy/servicechangetracker_test.go b/pkg/proxy/servicechangetracker_test.go index 8f58424754b..b9858416f30 100644 --- a/pkg/proxy/servicechangetracker_test.go +++ b/pkg/proxy/servicechangetracker_test.go @@ -621,20 +621,16 @@ type FakeProxier struct { serviceChanges *ServiceChangeTracker svcPortMap ServicePortMap endpointsMap EndpointsMap - hostname string } func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { + ect := NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil) + ect.trackerStartTime = t return &FakeProxier{ - svcPortMap: make(ServicePortMap), - serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), - endpointsMap: make(EndpointsMap), - endpointsChanges: &EndpointsChangeTracker{ - lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), - trackerStartTime: t, - processEndpointsMapChange: nil, - endpointSliceCache: NewEndpointSliceCache(testHostname, ipFamily, nil, nil), - }, + svcPortMap: make(ServicePortMap), + serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), + endpointsMap: make(EndpointsMap), + endpointsChanges: ect, } } From 79d1c078bb1f93798647f744f306300e4f1b8260 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Mon, 29 Apr 2024 16:32:22 -0400 Subject: [PATCH 2/3] Make change trackers just ignore the "wrong" IP family Dual-stack clusters exist; ServiceChangeTracker does not need to log messages (even at V(4)) when it sees dual-stack Services, and EndpointsChangeTracker does not need to emit Events(!) when it sees EndpointSlices of the wrong AddressType. (Though in most cases the EndpointsChangeTracker Events would not get emitted anyway, since the MetaProxier would ensure that only the v4 tracker saw v4 slices, and only the v6 tracker saw v6 slices.) Also remove a nil check labeled "This should never happen" which, in fact, we know *didn't* happen, since the function has already dereferenced the value before it checking it against nil. --- pkg/proxy/endpointschangetracker.go | 28 ++++++++--------- pkg/proxy/endpointslicecache.go | 18 +---------- pkg/proxy/endpointslicecache_test.go | 18 +++++------ pkg/proxy/serviceport.go | 47 ++++++++-------------------- pkg/proxy/util/utils.go | 17 ---------- 5 files changed, 36 insertions(+), 92 deletions(-) diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 3918aae24c8..1d3b13e1f00 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -29,11 +29,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" ) -var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType]( - discovery.AddressTypeIPv4, - discovery.AddressTypeIPv6, -) - // EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointsChangeTracker struct { @@ -45,6 +40,9 @@ type EndpointsChangeTracker struct { // any Proxier-specific cleanup. processEndpointsMapChange processEndpointsMapChangeFunc + // addressType is the type of EndpointSlice this proxy tracks + addressType discovery.AddressType + // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache @@ -62,12 +60,18 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) // NewEndpointsChangeTracker initializes an EndpointsChangeTracker -func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { +func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, _ events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { + addressType := discovery.AddressTypeIPv4 + if ipFamily == v1.IPv6Protocol { + addressType = discovery.AddressTypeIPv6 + } + return &EndpointsChangeTracker{ + addressType: addressType, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, - endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo), + endpointSliceCache: NewEndpointSliceCache(hostname, makeEndpointInfo), } } @@ -76,14 +80,8 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun // change that needs to be synced; note that this is different from the return value of // ServiceChangeTracker.Update(). func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { - if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) { - klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) - return false - } - - // This should never happen - if endpointSlice == nil { - klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate") + if endpointSlice.AddressType != ect.addressType { + klog.V(4).InfoS("Ignoring unsupported EndpointSlice", "endpointSlice", klog.KObj(endpointSlice), "type", endpointSlice.AddressType, "expected", ect.addressType) return false } diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 5671404fcd3..85d62c74eed 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -22,15 +22,12 @@ import ( "sort" "sync" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" - proxyutil "k8s.io/kubernetes/pkg/proxy/util" utilnet "k8s.io/utils/net" ) @@ -49,8 +46,6 @@ type EndpointSliceCache struct { makeEndpointInfo makeEndpointFunc hostname string - ipFamily v1.IPFamily - recorder events.EventRecorder } // endpointSliceTracker keeps track of EndpointSlices as they have been applied @@ -72,16 +67,14 @@ type endpointSliceData struct { } // NewEndpointSliceCache initializes an EndpointSliceCache. -func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { +func NewEndpointSliceCache(hostname string, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { if makeEndpointInfo == nil { makeEndpointInfo = standardEndpointInfo } return &EndpointSliceCache{ trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{}, hostname: hostname, - ipFamily: ipFamily, makeEndpointInfo: makeEndpointInfo, - recorder: recorder, } } @@ -213,15 +206,6 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port continue } - // Filter out the incorrect IP version case. Any endpoint port that - // contains incorrect IP version will be ignored. - if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) { - // Emit event on the corresponding service which had a different IP - // version than the endpoint. - proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "") - continue - } - isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 3f8317a1860..610bb271c16 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -205,7 +205,7 @@ func TestEndpointsMapFromESC(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) cmc := newCacheMutationCheck(tc.endpointSlices) for _, endpointSlice := range tc.endpointSlices { @@ -315,7 +315,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) for _, endpointSlice := range tc.endpointSlices { esCache.updatePending(endpointSlice, false) @@ -350,7 +350,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged bool }{ "identical slices, ports only": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port80}, @@ -362,7 +362,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical slices, ports out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -374,7 +374,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port removed": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -386,7 +386,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -398,7 +398,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoints": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -412,7 +412,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical with endpoints out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -426,7 +426,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoint added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, diff --git a/pkg/proxy/serviceport.go b/pkg/proxy/serviceport.go index fa3f21758cc..cafda3b8af3 100644 --- a/pkg/proxy/serviceport.go +++ b/pkg/proxy/serviceport.go @@ -19,6 +19,7 @@ package proxy import ( "fmt" "net" + "strings" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -207,56 +208,34 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode] } - // filter external ips, source ranges and ingress ips - // prior to dual stack services, this was considered an error, but with dual stack - // services, this is actually expected. Hence we downgraded from reporting by events - // to just log lines with high verbosity + // Filter ExternalIPs to correct IP family ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs) info.externalIPs = ipFamilyMap[ipFamily] - // Log the IPs not matching the ipFamily - if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family", - "ipFamily", ipFamily, "externalIPs", ips, "service", klog.KObj(service)) + // Filter source ranges to correct IP family. Also deal with the fact that + // LoadBalancerSourceRanges validation mistakenly allows whitespace padding + loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges)) + for i, sourceRange := range service.Spec.LoadBalancerSourceRanges { + loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange) } - cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(service.Spec.LoadBalancerSourceRanges) + cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges) info.loadBalancerSourceRanges = cidrFamilyMap[ipFamily] - // Log the CIDRs not matching the ipFamily - if cidrs, ok := cidrFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family", - "ipFamily", ipFamily, "loadBalancerSourceRanges", cidrs, "service", klog.KObj(service)) - } - // Obtain Load Balancer Ingress - var invalidIPs []net.IP + // Filter Load Balancer Ingress IPs to correct IP family. While proxying load + // balancers might choose to proxy connections from an LB IP of one family to a + // service IP of another family, that's irrelevant to kube-proxy, which only + // creates rules for VIP-style load balancers. for _, ing := range service.Status.LoadBalancer.Ingress { - if ing.IP == "" { + if ing.IP == "" || !proxyutil.IsVIPMode(ing) { continue } - // proxy mode load balancers do not need to track the IPs in the service cache - // and they can also implement IP family translation, so no need to check if - // the status ingress.IP and the ClusterIP belong to the same family. - if !proxyutil.IsVIPMode(ing) { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode", - "ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service)) - continue - } - - // kube-proxy does not implement IP family translation, skip addresses with - // different IP family ip := netutils.ParseIPSloppy(ing.IP) // (already verified as an IP-address) if ingFamily := proxyutil.GetIPFamilyFromIP(ip); ingFamily == ipFamily { info.loadBalancerVIPs = append(info.loadBalancerVIPs, ip) - } else { - invalidIPs = append(invalidIPs, ip) } } - if len(invalidIPs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family", - "ipFamily", ipFamily, "loadBalancerIngressIPs", invalidIPs, "service", klog.KObj(service)) - } if apiservice.NeedsHealthCheck(service) { p := service.Spec.HealthCheckNodePort diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index 320d19bc49b..d9b13f9a18e 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -22,10 +22,8 @@ import ( "strings" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -86,21 +84,6 @@ func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string] return ips } -// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event. -func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) { - errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName) - klog.ErrorS(nil, "Incorrect IP version", "service", klog.KRef(svcNamespace, svcName), "field", fieldName, "value", fieldValue) - if recorder != nil { - recorder.Eventf( - &v1.ObjectReference{ - Kind: "Service", - Name: svcName, - Namespace: svcNamespace, - UID: svcUID, - }, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg) - } -} - // MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6) func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]net.IP { ipFamilyMap := map[v1.IPFamily][]net.IP{} From f5969adb14ab419885625e480fe1a78ecf9953d1 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 6 Jun 2023 22:03:46 -0400 Subject: [PATCH 3/3] Clean up NewServiceChangeTracker/NewEndpointsChangeTracker args Remove the now-unused event recorders, and put the remaining args into a sensible order, and consistent between the two. --- pkg/proxy/conntrack/cleanup_test.go | 4 ++-- pkg/proxy/endpointschangetracker.go | 3 +-- pkg/proxy/endpointschangetracker_test.go | 28 ++++++++++++------------ pkg/proxy/iptables/proxier.go | 4 ++-- pkg/proxy/iptables/proxier_test.go | 4 ++-- pkg/proxy/ipvs/proxier.go | 4 ++-- pkg/proxy/ipvs/proxier_test.go | 4 ++-- pkg/proxy/nftables/proxier.go | 4 ++-- pkg/proxy/nftables/proxier_test.go | 4 ++-- pkg/proxy/servicechangetracker.go | 5 +---- pkg/proxy/servicechangetracker_test.go | 6 ++--- pkg/proxy/winkernel/proxier.go | 4 ++-- pkg/proxy/winkernel/proxier_test.go | 6 ++--- 13 files changed, 38 insertions(+), 42 deletions(-) diff --git a/pkg/proxy/conntrack/cleanup_test.go b/pkg/proxy/conntrack/cleanup_test.go index d6232a02b3a..68c8a8f1e2a 100644 --- a/pkg/proxy/conntrack/cleanup_test.go +++ b/pkg/proxy/conntrack/cleanup_test.go @@ -62,7 +62,7 @@ func TestCleanStaleEntries(t *testing.T) { // interface, or else use a proxy.ServiceChangeTracker and proxy.NewEndpointsChangeTracker // to construct them and fill in the maps for us. - sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil) + sct := proxy.NewServiceChangeTracker(v1.IPv4Protocol, nil, nil) svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: testServiceName, @@ -104,7 +104,7 @@ func TestCleanStaleEntries(t *testing.T) { svcPortMap := make(proxy.ServicePortMap) _ = svcPortMap.Update(sct) - ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil) + ect := proxy.NewEndpointsChangeTracker(v1.IPv4Protocol, "test-worker", nil, nil) eps := &discovery.EndpointSlice{ TypeMeta: metav1.TypeMeta{}, AddressType: discovery.AddressTypeIPv4, diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 1d3b13e1f00..3bb113c5b16 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -24,7 +24,6 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/metrics" ) @@ -60,7 +59,7 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) // NewEndpointsChangeTracker initializes an EndpointsChangeTracker -func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, _ events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { +func NewEndpointsChangeTracker(ipFamily v1.IPFamily, hostname string, makeEndpointInfo makeEndpointFunc, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { addressType := discovery.AddressTypeIPv4 if ipFamily == v1.IPv6Protocol { addressType = discovery.AddressTypeIPv6 diff --git a/pkg/proxy/endpointschangetracker_test.go b/pkg/proxy/endpointschangetracker_test.go index ab3235899b6..9b43638d9bd 100644 --- a/pkg/proxy/endpointschangetracker_test.go +++ b/pkg/proxy/endpointschangetracker_test.go @@ -1244,7 +1244,7 @@ func TestEndpointSliceUpdate(t *testing.T) { // test starting from an empty state "add a simple slice that doesn't already exist": { startingSlices: []*discovery.EndpointSlice{}, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1267,7 +1267,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1279,7 +1279,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: fqdnSlice, paramRemoveSlice: false, @@ -1292,7 +1292,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1324,7 +1324,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80)}), paramRemoveSlice: false, @@ -1354,7 +1354,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: true, @@ -1376,7 +1376,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: true, @@ -1388,7 +1388,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1411,7 +1411,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1433,7 +1433,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1461,7 +1461,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1521,13 +1521,13 @@ func TestCheckoutChanges(t *testing.T) { pendingSlices []*discovery.EndpointSlice }{ "empty slices": { - endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil), expectedChanges: []*endpointsChange{}, appliedSlices: []*discovery.EndpointSlice{}, pendingSlices: []*discovery.EndpointSlice{}, }, "adding initial slice": { - endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{}, current: EndpointsMap{ @@ -1544,7 +1544,7 @@ func TestCheckoutChanges(t *testing.T) { }, }, "removing port in update": { - endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ svcPortName0: []Endpoint{ diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 4feba919afa..da61d131248 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -286,9 +286,9 @@ func NewProxier(ctx context.Context, proxier := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, newEndpointInfo, nil), needFullSync: true, syncPeriod: syncPeriod, iptables: ipt, diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 3d7bb161b22..bcbdbe0f9da 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -116,9 +116,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { p := &Proxier{ ipFamily: ipfamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipfamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipfamily, testHostname, newEndpointInfo, nil), needFullSync: true, iptables: ipt, masqueradeMark: "0x4000", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 43bb1776387..b3d0dd4f81b 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -375,9 +375,9 @@ func NewProxier( proxier := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, nil, nil), initialSync: true, syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 004b54f95ba..b28ab05f4e3 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -140,9 +140,9 @@ func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilip } p := &Proxier{ svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, testHostname, nil, nil), excludeCIDRs: excludeCIDRs, iptables: ipt, ipvs: ipvs, diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 7845a375816..db993dab5ad 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -250,9 +250,9 @@ func NewProxier(ctx context.Context, proxier := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, newEndpointInfo, nil), needFullSync: true, syncPeriod: syncPeriod, nftables: nft, diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index ddb52e37a51..f770cca7591 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -118,9 +118,9 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) { p := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, testHostname, newEndpointInfo, nil), needFullSync: true, nftables: nft, masqueradeMark: "0x4000", diff --git a/pkg/proxy/servicechangetracker.go b/pkg/proxy/servicechangetracker.go index 8c42917e44a..e6abea247d7 100644 --- a/pkg/proxy/servicechangetracker.go +++ b/pkg/proxy/servicechangetracker.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" @@ -46,7 +45,6 @@ type ServiceChangeTracker struct { processServiceMapChange processServiceMapChangeFunc ipFamily v1.IPFamily - recorder events.EventRecorder } type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort @@ -61,11 +59,10 @@ type serviceChange struct { } // NewServiceChangeTracker initializes a ServiceChangeTracker -func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { +func NewServiceChangeTracker(ipFamily v1.IPFamily, makeServiceInfo makeServicePortFunc, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { return &ServiceChangeTracker{ items: make(map[types.NamespacedName]*serviceChange), makeServiceInfo: makeServiceInfo, - recorder: recorder, ipFamily: ipFamily, processServiceMapChange: processServiceMapChange, } diff --git a/pkg/proxy/servicechangetracker_test.go b/pkg/proxy/servicechangetracker_test.go index b9858416f30..df52618c419 100644 --- a/pkg/proxy/servicechangetracker_test.go +++ b/pkg/proxy/servicechangetracker_test.go @@ -577,7 +577,7 @@ func TestServiceToServiceMap(t *testing.T) { featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.31")) } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, tc.ipModeEnabled) - svcTracker := NewServiceChangeTracker(nil, tc.ipFamily, nil, nil) + svcTracker := NewServiceChangeTracker(tc.ipFamily, nil, nil) // outputs newServices := svcTracker.serviceToServiceMap(tc.service) @@ -624,11 +624,11 @@ type FakeProxier struct { } func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { - ect := NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil) + ect := NewEndpointsChangeTracker(ipFamily, testHostname, nil, nil) ect.trackerStartTime = t return &FakeProxier{ svcPortMap: make(ServicePortMap), - serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), + serviceChanges: NewServiceChangeTracker(ipFamily, nil, nil), endpointsMap: make(EndpointsMap), endpointsChanges: ect, } diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index d50b4b39291..fbd3562e6fa 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -796,8 +796,8 @@ func NewProxier( terminatedEndpoints: make(map[string]bool), } - serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange) - endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange) + serviceChanges := proxy.NewServiceChangeTracker(ipFamily, proxier.newServiceInfo, proxier.serviceMapChange) + endPointChangeTracker := proxy.NewEndpointsChangeTracker(ipFamily, hostname, proxier.newEndpointInfo, proxier.endpointsMapChange) proxier.endpointsChanges = endPointChangeTracker proxier.serviceChanges = serviceChanges diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 293cce56dc4..faefb236278 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -121,9 +121,9 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostn terminatedEndpoints: make(map[string]bool), } - serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange) - endpointsChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange) - proxier.endpointsChanges = endpointsChangeTracker + serviceChanges := proxy.NewServiceChangeTracker(v1.IPv4Protocol, proxier.newServiceInfo, proxier.serviceMapChange) + endpointChangeTracker := proxy.NewEndpointsChangeTracker(v1.IPv4Protocol, hostname, proxier.newEndpointInfo, proxier.endpointsMapChange) + proxier.endpointsChanges = endpointChangeTracker proxier.serviceChanges = serviceChanges return proxier