diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 76a4e0da9b6..52ae583f941 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -35,6 +35,12 @@ import ( utilnet "k8s.io/utils/net" ) +var supportedEndpointSliceAddressTypes = sets.NewString( + string(discovery.AddressTypeIP), // IP is a deprecated address type + string(discovery.AddressTypeIPv4), + string(discovery.AddressTypeIPv6), +) + // BaseEndpointInfo contains base information that defines an endpoint. // This could be used directly by proxier while processing endpoints, // or can be used for constructing a more specific EndpointInfo struct @@ -173,6 +179,11 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool { // It returns true if items changed, otherwise return false. Will add/update/delete items of EndpointsChangeMap. // If removeSlice is true, slice will be removed, otherwise it will be added or updated. func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { + if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) { + klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", endpointSlice.AddressType) + return false + } + // This should never happen if endpointSlice == nil { klog.Error("Nil endpointSlice passed to EndpointSliceUpdate") diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 4238aaaa299..8919d92f335 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -1431,6 +1431,9 @@ func TestLastChangeTriggerTime(t *testing.T) { } func TestEndpointSliceUpdate(t *testing.T) { + fqdnSlice := generateEndpointSlice("svc1", "ns1", 2, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}) + fqdnSlice.AddressType = discovery.AddressTypeFQDN + testCases := map[string]struct { startingSlices []*discovery.EndpointSlice endpointChangeTracker *EndpointChangeTracker @@ -1473,6 +1476,18 @@ func TestEndpointSliceUpdate(t *testing.T) { expectedReturnVal: false, expectedCurrentChange: nil, }, + // ensure that only valide address types are processed + "add an FQDN slice (invalid address type)": { + startingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: fqdnSlice, + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, + }, // test additions to existing state "add a slice that overlaps with existing state": { startingSlices: []*discovery.EndpointSlice{ diff --git a/pkg/proxy/ipvs/meta_proxier.go b/pkg/proxy/ipvs/meta_proxier.go index 9c567c26416..bef79df5984 100644 --- a/pkg/proxy/ipvs/meta_proxier.go +++ b/pkg/proxy/ipvs/meta_proxier.go @@ -153,25 +153,47 @@ func (proxier *metaProxier) OnEndpointsSynced() { // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object // is observed. func (proxier *metaProxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) { - // noop + switch endpointSlice.AddressType { + case discovery.AddressTypeIPv4: + proxier.ipv4Proxier.OnEndpointSliceAdd(endpointSlice) + case discovery.AddressTypeIPv6: + proxier.ipv6Proxier.OnEndpointSliceAdd(endpointSlice) + default: + klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", endpointSlice.AddressType) + } } // OnEndpointSliceUpdate is called whenever modification of an existing endpoint // slice object is observed. -func (proxier *metaProxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) { - //noop +func (proxier *metaProxier) OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discovery.EndpointSlice) { + switch newEndpointSlice.AddressType { + case discovery.AddressTypeIPv4: + proxier.ipv4Proxier.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice) + case discovery.AddressTypeIPv6: + proxier.ipv6Proxier.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice) + default: + klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", newEndpointSlice.AddressType) + } } // OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice // object is observed. func (proxier *metaProxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) { - //noop + switch endpointSlice.AddressType { + case discovery.AddressTypeIPv4: + proxier.ipv4Proxier.OnEndpointSliceDelete(endpointSlice) + case discovery.AddressTypeIPv6: + proxier.ipv6Proxier.OnEndpointSliceDelete(endpointSlice) + default: + klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", endpointSlice.AddressType) + } } // OnEndpointSlicesSynced is called once all the initial event handlers were // called and the state is fully propagated to local cache. func (proxier *metaProxier) OnEndpointSlicesSynced() { - //noop + proxier.ipv4Proxier.OnEndpointSlicesSynced() + proxier.ipv6Proxier.OnEndpointSlicesSynced() } // endpointsIPFamily that returns IPFamily of endpoints or error if