diff --git a/pkg/proxy/conntrack/cleanup_test.go b/pkg/proxy/conntrack/cleanup_test.go index d6232a02b3a..68c8a8f1e2a 100644 --- a/pkg/proxy/conntrack/cleanup_test.go +++ b/pkg/proxy/conntrack/cleanup_test.go @@ -62,7 +62,7 @@ func TestCleanStaleEntries(t *testing.T) { // interface, or else use a proxy.ServiceChangeTracker and proxy.NewEndpointsChangeTracker // to construct them and fill in the maps for us. - sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil) + sct := proxy.NewServiceChangeTracker(v1.IPv4Protocol, nil, nil) svc := &v1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: testServiceName, @@ -104,7 +104,7 @@ func TestCleanStaleEntries(t *testing.T) { svcPortMap := make(proxy.ServicePortMap) _ = svcPortMap.Update(sct) - ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil) + ect := proxy.NewEndpointsChangeTracker(v1.IPv4Protocol, "test-worker", nil, nil) eps := &discovery.EndpointSlice{ TypeMeta: metav1.TypeMeta{}, AddressType: discovery.AddressTypeIPv4, diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 3918aae24c8..3bb113c5b16 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -24,16 +24,10 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/metrics" ) -var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType]( - discovery.AddressTypeIPv4, - discovery.AddressTypeIPv6, -) - // EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of // Endpoints, keyed by their namespace and name. type EndpointsChangeTracker struct { @@ -45,6 +39,9 @@ type EndpointsChangeTracker struct { // any Proxier-specific cleanup. processEndpointsMapChange processEndpointsMapChangeFunc + // addressType is the type of EndpointSlice this proxy tracks + addressType discovery.AddressType + // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache @@ -62,12 +59,18 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap) // NewEndpointsChangeTracker initializes an EndpointsChangeTracker -func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { +func NewEndpointsChangeTracker(ipFamily v1.IPFamily, hostname string, makeEndpointInfo makeEndpointFunc, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker { + addressType := discovery.AddressTypeIPv4 + if ipFamily == v1.IPv6Protocol { + addressType = discovery.AddressTypeIPv6 + } + return &EndpointsChangeTracker{ + addressType: addressType, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, - endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo), + endpointSliceCache: NewEndpointSliceCache(hostname, makeEndpointInfo), } } @@ -76,14 +79,8 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun // change that needs to be synced; note that this is different from the return value of // ServiceChangeTracker.Update(). func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { - if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) { - klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType) - return false - } - - // This should never happen - if endpointSlice == nil { - klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate") + if endpointSlice.AddressType != ect.addressType { + klog.V(4).InfoS("Ignoring unsupported EndpointSlice", "endpointSlice", klog.KObj(endpointSlice), "type", endpointSlice.AddressType, "expected", ect.addressType) return false } diff --git a/pkg/proxy/endpointschangetracker_test.go b/pkg/proxy/endpointschangetracker_test.go index 89940b2912d..9b43638d9bd 100644 --- a/pkg/proxy/endpointschangetracker_test.go +++ b/pkg/proxy/endpointschangetracker_test.go @@ -1037,7 +1037,6 @@ func TestUpdateEndpointsMap(t *testing.T) { for tci, tc := range testCases { t.Run(tc.name, func(t *testing.T) { fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) - fp.hostname = testHostname // First check that after adding all previous versions of endpoints, // the fp.previousEndpointsMap is as we expect. @@ -1245,7 +1244,7 @@ func TestEndpointSliceUpdate(t *testing.T) { // test starting from an empty state "add a simple slice that doesn't already exist": { startingSlices: []*discovery.EndpointSlice{}, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1268,7 +1267,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1280,7 +1279,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: fqdnSlice, paramRemoveSlice: false, @@ -1293,7 +1292,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1325,7 +1324,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80)}), paramRemoveSlice: false, @@ -1355,7 +1354,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: true, @@ -1377,7 +1376,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: true, @@ -1389,7 +1388,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1412,7 +1411,7 @@ func TestEndpointSliceUpdate(t *testing.T) { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1434,7 +1433,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1462,7 +1461,7 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil), namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), paramRemoveSlice: false, @@ -1522,13 +1521,13 @@ func TestCheckoutChanges(t *testing.T) { pendingSlices []*discovery.EndpointSlice }{ "empty slices": { - endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil), expectedChanges: []*endpointsChange{}, appliedSlices: []*discovery.EndpointSlice{}, pendingSlices: []*discovery.EndpointSlice{}, }, "adding initial slice": { - endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{}, current: EndpointsMap{ @@ -1545,7 +1544,7 @@ func TestCheckoutChanges(t *testing.T) { }, }, "removing port in update": { - endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil), + endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil), expectedChanges: []*endpointsChange{{ previous: EndpointsMap{ svcPortName0: []Endpoint{ diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 5671404fcd3..85d62c74eed 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -22,15 +22,12 @@ import ( "sort" "sync" - v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" - proxyutil "k8s.io/kubernetes/pkg/proxy/util" utilnet "k8s.io/utils/net" ) @@ -49,8 +46,6 @@ type EndpointSliceCache struct { makeEndpointInfo makeEndpointFunc hostname string - ipFamily v1.IPFamily - recorder events.EventRecorder } // endpointSliceTracker keeps track of EndpointSlices as they have been applied @@ -72,16 +67,14 @@ type endpointSliceData struct { } // NewEndpointSliceCache initializes an EndpointSliceCache. -func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { +func NewEndpointSliceCache(hostname string, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache { if makeEndpointInfo == nil { makeEndpointInfo = standardEndpointInfo } return &EndpointSliceCache{ trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{}, hostname: hostname, - ipFamily: ipFamily, makeEndpointInfo: makeEndpointInfo, - recorder: recorder, } } @@ -213,15 +206,6 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port continue } - // Filter out the incorrect IP version case. Any endpoint port that - // contains incorrect IP version will be ignored. - if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) { - // Emit event on the corresponding service which had a different IP - // version than the endpoint. - proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "") - continue - } - isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index f1afd63a53c..610bb271c16 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -205,7 +205,7 @@ func TestEndpointsMapFromESC(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) cmc := newCacheMutationCheck(tc.endpointSlices) for _, endpointSlice := range tc.endpointSlices { @@ -315,7 +315,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil) + esCache := NewEndpointSliceCache(tc.hostname, nil) for _, endpointSlice := range tc.endpointSlices { esCache.updatePending(endpointSlice, false) @@ -350,7 +350,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged bool }{ "identical slices, ports only": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port80}, @@ -362,7 +362,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical slices, ports out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -374,7 +374,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port removed": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443, port80}, @@ -386,7 +386,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "port added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -398,7 +398,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoints": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -412,7 +412,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: false, }, "identical with endpoints out of order": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -426,7 +426,7 @@ func TestEsDataChanged(t *testing.T) { expectChanged: true, }, "identical with endpoint added": { - cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), + cache: NewEndpointSliceCache("", nil), initialSlice: &discovery.EndpointSlice{ ObjectMeta: objMeta, Ports: []discovery.EndpointPort{port443}, @@ -610,7 +610,6 @@ func TestEndpointSliceCacheClearedCorrectly(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) - fp.hostname = testHostname for _, epSlice := range tc.currEndpointSlices { fp.addEndpointSlice(epSlice) @@ -648,7 +647,6 @@ func TestSameServiceEndpointSliceCacheClearedCorrectly(t *testing.T) { } fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) - fp.hostname = testHostname for _, epSlice := range currEndpointSlices { fp.addEndpointSlice(epSlice) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 4feba919afa..da61d131248 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -286,9 +286,9 @@ func NewProxier(ctx context.Context, proxier := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, newEndpointInfo, nil), needFullSync: true, syncPeriod: syncPeriod, iptables: ipt, diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 3d7bb161b22..bcbdbe0f9da 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -116,9 +116,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { p := &Proxier{ ipFamily: ipfamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipfamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipfamily, testHostname, newEndpointInfo, nil), needFullSync: true, iptables: ipt, masqueradeMark: "0x4000", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 43bb1776387..b3d0dd4f81b 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -375,9 +375,9 @@ func NewProxier( proxier := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, nil, nil), initialSync: true, syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 004b54f95ba..b28ab05f4e3 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -140,9 +140,9 @@ func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilip } p := &Proxier{ svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, testHostname, nil, nil), excludeCIDRs: excludeCIDRs, iptables: ipt, ipvs: ipvs, diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 7845a375816..db993dab5ad 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -250,9 +250,9 @@ func NewProxier(ctx context.Context, proxier := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, newEndpointInfo, nil), needFullSync: true, syncPeriod: syncPeriod, nftables: nft, diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index ddb52e37a51..f770cca7591 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -118,9 +118,9 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) { p := &Proxier{ ipFamily: ipFamily, svcPortMap: make(proxy.ServicePortMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil), + serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil), endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil), + endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, testHostname, newEndpointInfo, nil), needFullSync: true, nftables: nft, masqueradeMark: "0x4000", diff --git a/pkg/proxy/servicechangetracker.go b/pkg/proxy/servicechangetracker.go index 8c42917e44a..e6abea247d7 100644 --- a/pkg/proxy/servicechangetracker.go +++ b/pkg/proxy/servicechangetracker.go @@ -23,7 +23,6 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/events" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy/metrics" proxyutil "k8s.io/kubernetes/pkg/proxy/util" @@ -46,7 +45,6 @@ type ServiceChangeTracker struct { processServiceMapChange processServiceMapChangeFunc ipFamily v1.IPFamily - recorder events.EventRecorder } type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort @@ -61,11 +59,10 @@ type serviceChange struct { } // NewServiceChangeTracker initializes a ServiceChangeTracker -func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { +func NewServiceChangeTracker(ipFamily v1.IPFamily, makeServiceInfo makeServicePortFunc, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { return &ServiceChangeTracker{ items: make(map[types.NamespacedName]*serviceChange), makeServiceInfo: makeServiceInfo, - recorder: recorder, ipFamily: ipFamily, processServiceMapChange: processServiceMapChange, } diff --git a/pkg/proxy/servicechangetracker_test.go b/pkg/proxy/servicechangetracker_test.go index 8f58424754b..df52618c419 100644 --- a/pkg/proxy/servicechangetracker_test.go +++ b/pkg/proxy/servicechangetracker_test.go @@ -577,7 +577,7 @@ func TestServiceToServiceMap(t *testing.T) { featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.31")) } featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, tc.ipModeEnabled) - svcTracker := NewServiceChangeTracker(nil, tc.ipFamily, nil, nil) + svcTracker := NewServiceChangeTracker(tc.ipFamily, nil, nil) // outputs newServices := svcTracker.serviceToServiceMap(tc.service) @@ -621,20 +621,16 @@ type FakeProxier struct { serviceChanges *ServiceChangeTracker svcPortMap ServicePortMap endpointsMap EndpointsMap - hostname string } func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { + ect := NewEndpointsChangeTracker(ipFamily, testHostname, nil, nil) + ect.trackerStartTime = t return &FakeProxier{ - svcPortMap: make(ServicePortMap), - serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), - endpointsMap: make(EndpointsMap), - endpointsChanges: &EndpointsChangeTracker{ - lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), - trackerStartTime: t, - processEndpointsMapChange: nil, - endpointSliceCache: NewEndpointSliceCache(testHostname, ipFamily, nil, nil), - }, + svcPortMap: make(ServicePortMap), + serviceChanges: NewServiceChangeTracker(ipFamily, nil, nil), + endpointsMap: make(EndpointsMap), + endpointsChanges: ect, } } diff --git a/pkg/proxy/serviceport.go b/pkg/proxy/serviceport.go index fa3f21758cc..cafda3b8af3 100644 --- a/pkg/proxy/serviceport.go +++ b/pkg/proxy/serviceport.go @@ -19,6 +19,7 @@ package proxy import ( "fmt" "net" + "strings" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -207,56 +208,34 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode] } - // filter external ips, source ranges and ingress ips - // prior to dual stack services, this was considered an error, but with dual stack - // services, this is actually expected. Hence we downgraded from reporting by events - // to just log lines with high verbosity + // Filter ExternalIPs to correct IP family ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs) info.externalIPs = ipFamilyMap[ipFamily] - // Log the IPs not matching the ipFamily - if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family", - "ipFamily", ipFamily, "externalIPs", ips, "service", klog.KObj(service)) + // Filter source ranges to correct IP family. Also deal with the fact that + // LoadBalancerSourceRanges validation mistakenly allows whitespace padding + loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges)) + for i, sourceRange := range service.Spec.LoadBalancerSourceRanges { + loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange) } - cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(service.Spec.LoadBalancerSourceRanges) + cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges) info.loadBalancerSourceRanges = cidrFamilyMap[ipFamily] - // Log the CIDRs not matching the ipFamily - if cidrs, ok := cidrFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family", - "ipFamily", ipFamily, "loadBalancerSourceRanges", cidrs, "service", klog.KObj(service)) - } - // Obtain Load Balancer Ingress - var invalidIPs []net.IP + // Filter Load Balancer Ingress IPs to correct IP family. While proxying load + // balancers might choose to proxy connections from an LB IP of one family to a + // service IP of another family, that's irrelevant to kube-proxy, which only + // creates rules for VIP-style load balancers. for _, ing := range service.Status.LoadBalancer.Ingress { - if ing.IP == "" { + if ing.IP == "" || !proxyutil.IsVIPMode(ing) { continue } - // proxy mode load balancers do not need to track the IPs in the service cache - // and they can also implement IP family translation, so no need to check if - // the status ingress.IP and the ClusterIP belong to the same family. - if !proxyutil.IsVIPMode(ing) { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode", - "ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service)) - continue - } - - // kube-proxy does not implement IP family translation, skip addresses with - // different IP family ip := netutils.ParseIPSloppy(ing.IP) // (already verified as an IP-address) if ingFamily := proxyutil.GetIPFamilyFromIP(ip); ingFamily == ipFamily { info.loadBalancerVIPs = append(info.loadBalancerVIPs, ip) - } else { - invalidIPs = append(invalidIPs, ip) } } - if len(invalidIPs) > 0 { - klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family", - "ipFamily", ipFamily, "loadBalancerIngressIPs", invalidIPs, "service", klog.KObj(service)) - } if apiservice.NeedsHealthCheck(service) { p := service.Spec.HealthCheckNodePort diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index 320d19bc49b..d9b13f9a18e 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -22,10 +22,8 @@ import ( "strings" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/core/v1/helper" @@ -86,21 +84,6 @@ func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string] return ips } -// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event. -func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) { - errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName) - klog.ErrorS(nil, "Incorrect IP version", "service", klog.KRef(svcNamespace, svcName), "field", fieldName, "value", fieldValue) - if recorder != nil { - recorder.Eventf( - &v1.ObjectReference{ - Kind: "Service", - Name: svcName, - Namespace: svcNamespace, - UID: svcUID, - }, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg) - } -} - // MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6) func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]net.IP { ipFamilyMap := map[v1.IPFamily][]net.IP{} diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index d50b4b39291..fbd3562e6fa 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -796,8 +796,8 @@ func NewProxier( terminatedEndpoints: make(map[string]bool), } - serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange) - endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange) + serviceChanges := proxy.NewServiceChangeTracker(ipFamily, proxier.newServiceInfo, proxier.serviceMapChange) + endPointChangeTracker := proxy.NewEndpointsChangeTracker(ipFamily, hostname, proxier.newEndpointInfo, proxier.endpointsMapChange) proxier.endpointsChanges = endPointChangeTracker proxier.serviceChanges = serviceChanges diff --git a/pkg/proxy/winkernel/proxier_test.go b/pkg/proxy/winkernel/proxier_test.go index 293cce56dc4..faefb236278 100644 --- a/pkg/proxy/winkernel/proxier_test.go +++ b/pkg/proxy/winkernel/proxier_test.go @@ -121,9 +121,9 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostn terminatedEndpoints: make(map[string]bool), } - serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange) - endpointsChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange) - proxier.endpointsChanges = endpointsChangeTracker + serviceChanges := proxy.NewServiceChangeTracker(v1.IPv4Protocol, proxier.newServiceInfo, proxier.serviceMapChange) + endpointChangeTracker := proxy.NewEndpointsChangeTracker(v1.IPv4Protocol, hostname, proxier.newEndpointInfo, proxier.endpointsMapChange) + proxier.endpointsChanges = endpointChangeTracker proxier.serviceChanges = serviceChanges return proxier