Make change trackers just ignore the "wrong" IP family

Dual-stack clusters exist; ServiceChangeTracker does not need to log
messages (even at V(4)) when it sees dual-stack Services, and
EndpointsChangeTracker does not need to emit Events(!) when it sees
EndpointSlices of the wrong AddressType.

(Though in most cases the EndpointsChangeTracker Events would not get
emitted anyway, since the MetaProxier would ensure that only the v4
tracker saw v4 slices, and only the v6 tracker saw v6 slices.)

Also remove a nil check labeled "This should never happen" which, in
fact, we know *didn't* happen, since the function has already
dereferenced the value before it checking it against nil.
This commit is contained in:
Dan Winship 2024-04-29 16:32:22 -04:00
parent 2c348bf186
commit 79d1c078bb
5 changed files with 36 additions and 92 deletions

View File

@ -29,11 +29,6 @@ import (
"k8s.io/kubernetes/pkg/proxy/metrics"
)
var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
discovery.AddressTypeIPv4,
discovery.AddressTypeIPv6,
)
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointsChangeTracker struct {
@ -45,6 +40,9 @@ type EndpointsChangeTracker struct {
// any Proxier-specific cleanup.
processEndpointsMapChange processEndpointsMapChangeFunc
// addressType is the type of EndpointSlice this proxy tracks
addressType discovery.AddressType
// endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache
@ -62,12 +60,18 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName)
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, _ events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
addressType := discovery.AddressTypeIPv4
if ipFamily == v1.IPv6Protocol {
addressType = discovery.AddressTypeIPv6
}
return &EndpointsChangeTracker{
addressType: addressType,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: time.Now(),
processEndpointsMapChange: processEndpointsMapChange,
endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
endpointSliceCache: NewEndpointSliceCache(hostname, makeEndpointInfo),
}
}
@ -76,14 +80,8 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun
// change that needs to be synced; note that this is different from the return value of
// ServiceChangeTracker.Update().
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
return false
}
// This should never happen
if endpointSlice == nil {
klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
if endpointSlice.AddressType != ect.addressType {
klog.V(4).InfoS("Ignoring unsupported EndpointSlice", "endpointSlice", klog.KObj(endpointSlice), "type", endpointSlice.AddressType, "expected", ect.addressType)
return false
}

View File

@ -22,15 +22,12 @@ import (
"sort"
"sync"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
utilnet "k8s.io/utils/net"
)
@ -49,8 +46,6 @@ type EndpointSliceCache struct {
makeEndpointInfo makeEndpointFunc
hostname string
ipFamily v1.IPFamily
recorder events.EventRecorder
}
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
@ -72,16 +67,14 @@ type endpointSliceData struct {
}
// NewEndpointSliceCache initializes an EndpointSliceCache.
func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
func NewEndpointSliceCache(hostname string, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
if makeEndpointInfo == nil {
makeEndpointInfo = standardEndpointInfo
}
return &EndpointSliceCache{
trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
hostname: hostname,
ipFamily: ipFamily,
makeEndpointInfo: makeEndpointInfo,
recorder: recorder,
}
}
@ -213,15 +206,6 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port
continue
}
// Filter out the incorrect IP version case. Any endpoint port that
// contains incorrect IP version will be ignored.
if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
// Emit event on the corresponding service which had a different IP
// version than the endpoint.
proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "")
continue
}
isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName)
ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready

View File

@ -205,7 +205,7 @@ func TestEndpointsMapFromESC(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil)
esCache := NewEndpointSliceCache(tc.hostname, nil)
cmc := newCacheMutationCheck(tc.endpointSlices)
for _, endpointSlice := range tc.endpointSlices {
@ -315,7 +315,7 @@ func TestEndpointInfoByServicePort(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil)
esCache := NewEndpointSliceCache(tc.hostname, nil)
for _, endpointSlice := range tc.endpointSlices {
esCache.updatePending(endpointSlice, false)
@ -350,7 +350,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged bool
}{
"identical slices, ports only": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port80},
@ -362,7 +362,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: false,
},
"identical slices, ports out of order": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443, port80},
@ -374,7 +374,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"port removed": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443, port80},
@ -386,7 +386,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"port added": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},
@ -398,7 +398,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"identical with endpoints": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},
@ -412,7 +412,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: false,
},
"identical with endpoints out of order": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},
@ -426,7 +426,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"identical with endpoint added": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},

View File

@ -19,6 +19,7 @@ package proxy
import (
"fmt"
"net"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
@ -207,56 +208,34 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv
info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode]
}
// filter external ips, source ranges and ingress ips
// prior to dual stack services, this was considered an error, but with dual stack
// services, this is actually expected. Hence we downgraded from reporting by events
// to just log lines with high verbosity
// Filter ExternalIPs to correct IP family
ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs)
info.externalIPs = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family",
"ipFamily", ipFamily, "externalIPs", ips, "service", klog.KObj(service))
// Filter source ranges to correct IP family. Also deal with the fact that
// LoadBalancerSourceRanges validation mistakenly allows whitespace padding
loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges))
for i, sourceRange := range service.Spec.LoadBalancerSourceRanges {
loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange)
}
cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(service.Spec.LoadBalancerSourceRanges)
cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges)
info.loadBalancerSourceRanges = cidrFamilyMap[ipFamily]
// Log the CIDRs not matching the ipFamily
if cidrs, ok := cidrFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family",
"ipFamily", ipFamily, "loadBalancerSourceRanges", cidrs, "service", klog.KObj(service))
}
// Obtain Load Balancer Ingress
var invalidIPs []net.IP
// Filter Load Balancer Ingress IPs to correct IP family. While proxying load
// balancers might choose to proxy connections from an LB IP of one family to a
// service IP of another family, that's irrelevant to kube-proxy, which only
// creates rules for VIP-style load balancers.
for _, ing := range service.Status.LoadBalancer.Ingress {
if ing.IP == "" {
if ing.IP == "" || !proxyutil.IsVIPMode(ing) {
continue
}
// proxy mode load balancers do not need to track the IPs in the service cache
// and they can also implement IP family translation, so no need to check if
// the status ingress.IP and the ClusterIP belong to the same family.
if !proxyutil.IsVIPMode(ing) {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode",
"ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service))
continue
}
// kube-proxy does not implement IP family translation, skip addresses with
// different IP family
ip := netutils.ParseIPSloppy(ing.IP) // (already verified as an IP-address)
if ingFamily := proxyutil.GetIPFamilyFromIP(ip); ingFamily == ipFamily {
info.loadBalancerVIPs = append(info.loadBalancerVIPs, ip)
} else {
invalidIPs = append(invalidIPs, ip)
}
}
if len(invalidIPs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
"ipFamily", ipFamily, "loadBalancerIngressIPs", invalidIPs, "service", klog.KObj(service))
}
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort

View File

@ -22,10 +22,8 @@ import (
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
@ -86,21 +84,6 @@ func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string]
return ips
}
// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event.
func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) {
errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName)
klog.ErrorS(nil, "Incorrect IP version", "service", klog.KRef(svcNamespace, svcName), "field", fieldName, "value", fieldValue)
if recorder != nil {
recorder.Eventf(
&v1.ObjectReference{
Kind: "Service",
Name: svcName,
Namespace: svcNamespace,
UID: svcUID,
}, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg)
}
}
// MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6)
func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]net.IP {
ipFamilyMap := map[v1.IPFamily][]net.IP{}