Merge pull request #128819 from danwinship/tracker-cleanup

cleanups to proxy change trackers
This commit is contained in:
Kubernetes Prow Robot 2024-12-14 20:24:43 +01:00 committed by GitHub
commit be1c99186f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 77 additions and 144 deletions

View File

@ -62,7 +62,7 @@ func TestCleanStaleEntries(t *testing.T) {
// interface, or else use a proxy.ServiceChangeTracker and proxy.NewEndpointsChangeTracker
// to construct them and fill in the maps for us.
sct := proxy.NewServiceChangeTracker(nil, v1.IPv4Protocol, nil, nil)
sct := proxy.NewServiceChangeTracker(v1.IPv4Protocol, nil, nil)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: testServiceName,
@ -104,7 +104,7 @@ func TestCleanStaleEntries(t *testing.T) {
svcPortMap := make(proxy.ServicePortMap)
_ = svcPortMap.Update(sct)
ect := proxy.NewEndpointsChangeTracker("test-worker", nil, v1.IPv4Protocol, nil, nil)
ect := proxy.NewEndpointsChangeTracker(v1.IPv4Protocol, "test-worker", nil, nil)
eps := &discovery.EndpointSlice{
TypeMeta: metav1.TypeMeta{},
AddressType: discovery.AddressTypeIPv4,

View File

@ -24,16 +24,10 @@ import (
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/metrics"
)
var supportedEndpointSliceAddressTypes = sets.New[discovery.AddressType](
discovery.AddressTypeIPv4,
discovery.AddressTypeIPv6,
)
// EndpointsChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name.
type EndpointsChangeTracker struct {
@ -45,6 +39,9 @@ type EndpointsChangeTracker struct {
// any Proxier-specific cleanup.
processEndpointsMapChange processEndpointsMapChangeFunc
// addressType is the type of EndpointSlice this proxy tracks
addressType discovery.AddressType
// endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache
@ -62,12 +59,18 @@ type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName)
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// NewEndpointsChangeTracker initializes an EndpointsChangeTracker
func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
func NewEndpointsChangeTracker(ipFamily v1.IPFamily, hostname string, makeEndpointInfo makeEndpointFunc, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointsChangeTracker {
addressType := discovery.AddressTypeIPv4
if ipFamily == v1.IPv6Protocol {
addressType = discovery.AddressTypeIPv6
}
return &EndpointsChangeTracker{
addressType: addressType,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: time.Now(),
processEndpointsMapChange: processEndpointsMapChange,
endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
endpointSliceCache: NewEndpointSliceCache(hostname, makeEndpointInfo),
}
}
@ -76,14 +79,8 @@ func NewEndpointsChangeTracker(hostname string, makeEndpointInfo makeEndpointFun
// change that needs to be synced; note that this is different from the return value of
// ServiceChangeTracker.Update().
func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
if !supportedEndpointSliceAddressTypes.Has(endpointSlice.AddressType) {
klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
return false
}
// This should never happen
if endpointSlice == nil {
klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
if endpointSlice.AddressType != ect.addressType {
klog.V(4).InfoS("Ignoring unsupported EndpointSlice", "endpointSlice", klog.KObj(endpointSlice), "type", endpointSlice.AddressType, "expected", ect.addressType)
return false
}

View File

@ -1037,7 +1037,6 @@ func TestUpdateEndpointsMap(t *testing.T) {
for tci, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
fp.hostname = testHostname
// First check that after adding all previous versions of endpoints,
// the fp.previousEndpointsMap is as we expect.
@ -1245,7 +1244,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
// test starting from an empty state
"add a simple slice that doesn't already exist": {
startingSlices: []*discovery.EndpointSlice{},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
@ -1268,7 +1267,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
@ -1280,7 +1279,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
@ -1293,7 +1292,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
@ -1325,7 +1324,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80)}),
paramRemoveSlice: false,
@ -1355,7 +1354,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: true,
@ -1377,7 +1376,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: true,
@ -1389,7 +1388,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
@ -1412,7 +1411,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
@ -1434,7 +1433,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
@ -1462,7 +1461,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "host1", nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
@ -1522,13 +1521,13 @@ func TestCheckoutChanges(t *testing.T) {
pendingSlices []*discovery.EndpointSlice
}{
"empty slices": {
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil),
expectedChanges: []*endpointsChange{},
appliedSlices: []*discovery.EndpointSlice{},
pendingSlices: []*discovery.EndpointSlice{},
},
"adding initial slice": {
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{},
current: EndpointsMap{
@ -1545,7 +1544,7 @@ func TestCheckoutChanges(t *testing.T) {
},
},
"removing port in update": {
endpointsChangeTracker: NewEndpointsChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
endpointsChangeTracker: NewEndpointsChangeTracker(v1.IPv4Protocol, "", nil, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{
svcPortName0: []Endpoint{

View File

@ -22,15 +22,12 @@ import (
"sort"
"sync"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
utilnet "k8s.io/utils/net"
)
@ -49,8 +46,6 @@ type EndpointSliceCache struct {
makeEndpointInfo makeEndpointFunc
hostname string
ipFamily v1.IPFamily
recorder events.EventRecorder
}
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
@ -72,16 +67,14 @@ type endpointSliceData struct {
}
// NewEndpointSliceCache initializes an EndpointSliceCache.
func NewEndpointSliceCache(hostname string, ipFamily v1.IPFamily, recorder events.EventRecorder, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
func NewEndpointSliceCache(hostname string, makeEndpointInfo makeEndpointFunc) *EndpointSliceCache {
if makeEndpointInfo == nil {
makeEndpointInfo = standardEndpointInfo
}
return &EndpointSliceCache{
trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
hostname: hostname,
ipFamily: ipFamily,
makeEndpointInfo: makeEndpointInfo,
recorder: recorder,
}
}
@ -213,15 +206,6 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port
continue
}
// Filter out the incorrect IP version case. Any endpoint port that
// contains incorrect IP version will be ignored.
if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
// Emit event on the corresponding service which had a different IP
// version than the endpoint.
proxyutil.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "")
continue
}
isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName)
ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready

View File

@ -205,7 +205,7 @@ func TestEndpointsMapFromESC(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil)
esCache := NewEndpointSliceCache(tc.hostname, nil)
cmc := newCacheMutationCheck(tc.endpointSlices)
for _, endpointSlice := range tc.endpointSlices {
@ -315,7 +315,7 @@ func TestEndpointInfoByServicePort(t *testing.T) {
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
esCache := NewEndpointSliceCache(tc.hostname, v1.IPv4Protocol, nil, nil)
esCache := NewEndpointSliceCache(tc.hostname, nil)
for _, endpointSlice := range tc.endpointSlices {
esCache.updatePending(endpointSlice, false)
@ -350,7 +350,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged bool
}{
"identical slices, ports only": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port80},
@ -362,7 +362,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: false,
},
"identical slices, ports out of order": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443, port80},
@ -374,7 +374,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"port removed": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443, port80},
@ -386,7 +386,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"port added": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},
@ -398,7 +398,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"identical with endpoints": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},
@ -412,7 +412,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: false,
},
"identical with endpoints out of order": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},
@ -426,7 +426,7 @@ func TestEsDataChanged(t *testing.T) {
expectChanged: true,
},
"identical with endpoint added": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
cache: NewEndpointSliceCache("", nil),
initialSlice: &discovery.EndpointSlice{
ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port443},
@ -610,7 +610,6 @@ func TestEndpointSliceCacheClearedCorrectly(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
fp.hostname = testHostname
for _, epSlice := range tc.currEndpointSlices {
fp.addEndpointSlice(epSlice)
@ -648,7 +647,6 @@ func TestSameServiceEndpointSliceCacheClearedCorrectly(t *testing.T) {
}
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
fp.hostname = testHostname
for _, epSlice := range currEndpointSlices {
fp.addEndpointSlice(epSlice)

View File

@ -286,9 +286,9 @@ func NewProxier(ctx context.Context,
proxier := &Proxier{
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, newEndpointInfo, nil),
needFullSync: true,
syncPeriod: syncPeriod,
iptables: ipt,

View File

@ -116,9 +116,9 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
p := &Proxier{
ipFamily: ipfamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipfamily, nil, nil),
serviceChanges: proxy.NewServiceChangeTracker(ipfamily, newServiceInfo, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipfamily, nil, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(ipfamily, testHostname, newEndpointInfo, nil),
needFullSync: true,
iptables: ipt,
masqueradeMark: "0x4000",

View File

@ -375,9 +375,9 @@ func NewProxier(
proxier := &Proxier{
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, nil, ipFamily, recorder, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, nil, nil),
initialSync: true,
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,

View File

@ -140,9 +140,9 @@ func NewFakeProxier(ctx context.Context, ipt utiliptables.Interface, ipvs utilip
}
p := &Proxier{
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, nil, ipFamily, nil, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, testHostname, nil, nil),
excludeCIDRs: excludeCIDRs,
iptables: ipt,
ipvs: ipvs,

View File

@ -250,9 +250,9 @@ func NewProxier(ctx context.Context,
proxier := &Proxier{
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, hostname, newEndpointInfo, nil),
needFullSync: true,
syncPeriod: syncPeriod,
nftables: nft,

View File

@ -118,9 +118,9 @@ func NewFakeProxier(ipFamily v1.IPFamily) (*knftables.Fake, *Proxier) {
p := &Proxier{
ipFamily: ipFamily,
svcPortMap: make(proxy.ServicePortMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, nil, nil),
serviceChanges: proxy.NewServiceChangeTracker(ipFamily, newServiceInfo, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointsChangeTracker(testHostname, newEndpointInfo, ipFamily, nil, nil),
endpointsChanges: proxy.NewEndpointsChangeTracker(ipFamily, testHostname, newEndpointInfo, nil),
needFullSync: true,
nftables: nft,
masqueradeMark: "0x4000",

View File

@ -23,7 +23,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy/metrics"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
@ -46,7 +45,6 @@ type ServiceChangeTracker struct {
processServiceMapChange processServiceMapChangeFunc
ipFamily v1.IPFamily
recorder events.EventRecorder
}
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServicePortInfo) ServicePort
@ -61,11 +59,10 @@ type serviceChange struct {
}
// NewServiceChangeTracker initializes a ServiceChangeTracker
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
func NewServiceChangeTracker(ipFamily v1.IPFamily, makeServiceInfo makeServicePortFunc, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
return &ServiceChangeTracker{
items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo,
recorder: recorder,
ipFamily: ipFamily,
processServiceMapChange: processServiceMapChange,
}

View File

@ -577,7 +577,7 @@ func TestServiceToServiceMap(t *testing.T) {
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.31"))
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.LoadBalancerIPMode, tc.ipModeEnabled)
svcTracker := NewServiceChangeTracker(nil, tc.ipFamily, nil, nil)
svcTracker := NewServiceChangeTracker(tc.ipFamily, nil, nil)
// outputs
newServices := svcTracker.serviceToServiceMap(tc.service)
@ -621,20 +621,16 @@ type FakeProxier struct {
serviceChanges *ServiceChangeTracker
svcPortMap ServicePortMap
endpointsMap EndpointsMap
hostname string
}
func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
ect := NewEndpointsChangeTracker(ipFamily, testHostname, nil, nil)
ect.trackerStartTime = t
return &FakeProxier{
svcPortMap: make(ServicePortMap),
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: &EndpointsChangeTracker{
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: t,
processEndpointsMapChange: nil,
endpointSliceCache: NewEndpointSliceCache(testHostname, ipFamily, nil, nil),
},
svcPortMap: make(ServicePortMap),
serviceChanges: NewServiceChangeTracker(ipFamily, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: ect,
}
}

View File

@ -19,6 +19,7 @@ package proxy
import (
"fmt"
"net"
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
@ -207,56 +208,34 @@ func newBaseServiceInfo(service *v1.Service, ipFamily v1.IPFamily, port *v1.Serv
info.hintsAnnotation = service.Annotations[v1.AnnotationTopologyMode]
}
// filter external ips, source ranges and ingress ips
// prior to dual stack services, this was considered an error, but with dual stack
// services, this is actually expected. Hence we downgraded from reporting by events
// to just log lines with high verbosity
// Filter ExternalIPs to correct IP family
ipFamilyMap := proxyutil.MapIPsByIPFamily(service.Spec.ExternalIPs)
info.externalIPs = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family",
"ipFamily", ipFamily, "externalIPs", ips, "service", klog.KObj(service))
// Filter source ranges to correct IP family. Also deal with the fact that
// LoadBalancerSourceRanges validation mistakenly allows whitespace padding
loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges))
for i, sourceRange := range service.Spec.LoadBalancerSourceRanges {
loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange)
}
cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(service.Spec.LoadBalancerSourceRanges)
cidrFamilyMap := proxyutil.MapCIDRsByIPFamily(loadBalancerSourceRanges)
info.loadBalancerSourceRanges = cidrFamilyMap[ipFamily]
// Log the CIDRs not matching the ipFamily
if cidrs, ok := cidrFamilyMap[proxyutil.OtherIPFamily(ipFamily)]; ok && len(cidrs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family",
"ipFamily", ipFamily, "loadBalancerSourceRanges", cidrs, "service", klog.KObj(service))
}
// Obtain Load Balancer Ingress
var invalidIPs []net.IP
// Filter Load Balancer Ingress IPs to correct IP family. While proxying load
// balancers might choose to proxy connections from an LB IP of one family to a
// service IP of another family, that's irrelevant to kube-proxy, which only
// creates rules for VIP-style load balancers.
for _, ing := range service.Status.LoadBalancer.Ingress {
if ing.IP == "" {
if ing.IP == "" || !proxyutil.IsVIPMode(ing) {
continue
}
// proxy mode load balancers do not need to track the IPs in the service cache
// and they can also implement IP family translation, so no need to check if
// the status ingress.IP and the ClusterIP belong to the same family.
if !proxyutil.IsVIPMode(ing) {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IP for given Service as it using Proxy mode",
"ipFamily", ipFamily, "loadBalancerIngressIP", ing.IP, "service", klog.KObj(service))
continue
}
// kube-proxy does not implement IP family translation, skip addresses with
// different IP family
ip := netutils.ParseIPSloppy(ing.IP) // (already verified as an IP-address)
if ingFamily := proxyutil.GetIPFamilyFromIP(ip); ingFamily == ipFamily {
info.loadBalancerVIPs = append(info.loadBalancerVIPs, ip)
} else {
invalidIPs = append(invalidIPs, ip)
}
}
if len(invalidIPs) > 0 {
klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
"ipFamily", ipFamily, "loadBalancerIngressIPs", invalidIPs, "service", klog.KObj(service))
}
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort

View File

@ -22,10 +22,8 @@ import (
"strings"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
@ -86,21 +84,6 @@ func AddressSet(isValid func(ip net.IP) bool, addrs []net.Addr) sets.Set[string]
return ips
}
// LogAndEmitIncorrectIPVersionEvent logs and emits incorrect IP version event.
func LogAndEmitIncorrectIPVersionEvent(recorder events.EventRecorder, fieldName, fieldValue, svcNamespace, svcName string, svcUID types.UID) {
errMsg := fmt.Sprintf("%s in %s has incorrect IP version", fieldValue, fieldName)
klog.ErrorS(nil, "Incorrect IP version", "service", klog.KRef(svcNamespace, svcName), "field", fieldName, "value", fieldValue)
if recorder != nil {
recorder.Eventf(
&v1.ObjectReference{
Kind: "Service",
Name: svcName,
Namespace: svcNamespace,
UID: svcUID,
}, nil, v1.EventTypeWarning, "KubeProxyIncorrectIPVersion", "GatherEndpoints", errMsg)
}
}
// MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6)
func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]net.IP {
ipFamilyMap := map[v1.IPFamily][]net.IP{}

View File

@ -796,8 +796,8 @@ func NewProxier(
terminatedEndpoints: make(map[string]bool),
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
endPointChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
serviceChanges := proxy.NewServiceChangeTracker(ipFamily, proxier.newServiceInfo, proxier.serviceMapChange)
endPointChangeTracker := proxy.NewEndpointsChangeTracker(ipFamily, hostname, proxier.newEndpointInfo, proxier.endpointsMapChange)
proxier.endpointsChanges = endPointChangeTracker
proxier.serviceChanges = serviceChanges

View File

@ -121,9 +121,9 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, hostn
terminatedEndpoints: make(map[string]bool),
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange)
endpointsChangeTracker := proxy.NewEndpointsChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange)
proxier.endpointsChanges = endpointsChangeTracker
serviceChanges := proxy.NewServiceChangeTracker(v1.IPv4Protocol, proxier.newServiceInfo, proxier.serviceMapChange)
endpointChangeTracker := proxy.NewEndpointsChangeTracker(v1.IPv4Protocol, hostname, proxier.newEndpointInfo, proxier.endpointsMapChange)
proxier.endpointsChanges = endpointChangeTracker
proxier.serviceChanges = serviceChanges
return proxier