mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 21:17:23 +00:00
Ensuring kube-proxy does not mutate shared EndpointSlices
This commit is contained in:
parent
55f81314cc
commit
49e4bd137b
@ -70,6 +70,7 @@ go_test(
|
|||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
@ -108,11 +108,13 @@ func newEndpointSliceTracker() *endpointSliceTracker {
|
|||||||
// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
|
// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
|
||||||
func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
|
func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
|
||||||
esInfo := &endpointSliceInfo{
|
esInfo := &endpointSliceInfo{
|
||||||
Ports: endpointSlice.Ports,
|
Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)),
|
||||||
Endpoints: []*endpointInfo{},
|
Endpoints: []*endpointInfo{},
|
||||||
Remove: remove,
|
Remove: remove,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copy here to avoid mutating shared EndpointSlice object.
|
||||||
|
copy(esInfo.Ports, endpointSlice.Ports)
|
||||||
sort.Sort(byPort(esInfo.Ports))
|
sort.Sort(byPort(esInfo.Ports))
|
||||||
|
|
||||||
if !remove {
|
if !remove {
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1beta1"
|
discovery "k8s.io/api/discovery/v1beta1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
utilpointer "k8s.io/utils/pointer"
|
utilpointer "k8s.io/utils/pointer"
|
||||||
)
|
)
|
||||||
@ -152,11 +153,13 @@ func TestEndpointsMapFromESC(t *testing.T) {
|
|||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
||||||
|
|
||||||
|
cmc := newCacheMutationCheck(tc.endpointSlices)
|
||||||
for _, endpointSlice := range tc.endpointSlices {
|
for _, endpointSlice := range tc.endpointSlices {
|
||||||
esCache.updatePending(endpointSlice, false)
|
esCache.updatePending(endpointSlice, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap)
|
compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap)
|
||||||
|
cmc.Check(t)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -315,6 +318,8 @@ func TestEsInfoChanged(t *testing.T) {
|
|||||||
|
|
||||||
for name, tc := range testCases {
|
for name, tc := range testCases {
|
||||||
t.Run(name, func(t *testing.T) {
|
t.Run(name, func(t *testing.T) {
|
||||||
|
cmc := newCacheMutationCheck([]*discovery.EndpointSlice{tc.initialSlice})
|
||||||
|
|
||||||
if tc.initialSlice != nil {
|
if tc.initialSlice != nil {
|
||||||
tc.cache.updatePending(tc.initialSlice, false)
|
tc.cache.updatePending(tc.initialSlice, false)
|
||||||
tc.cache.checkoutChanges()
|
tc.cache.checkoutChanges()
|
||||||
@ -331,6 +336,8 @@ func TestEsInfoChanged(t *testing.T) {
|
|||||||
if tc.expectChanged != changed {
|
if tc.expectChanged != changed {
|
||||||
t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
|
t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cmc.Check(t)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -378,3 +385,45 @@ func generateEndpointSliceWithOffset(serviceName, namespace string, sliceNum, of
|
|||||||
func generateEndpointSlice(serviceName, namespace string, sliceNum, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
|
func generateEndpointSlice(serviceName, namespace string, sliceNum, numEndpoints, unreadyMod int, hosts []string, portNums []*int32) *discovery.EndpointSlice {
|
||||||
return generateEndpointSliceWithOffset(serviceName, namespace, sliceNum, sliceNum, numEndpoints, unreadyMod, hosts, portNums)
|
return generateEndpointSliceWithOffset(serviceName, namespace, sliceNum, sliceNum, numEndpoints, unreadyMod, hosts, portNums)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cacheMutationCheck helps ensure that cached objects have not been changed
|
||||||
|
// in any way throughout a test run.
|
||||||
|
type cacheMutationCheck struct {
|
||||||
|
objects []cacheObject
|
||||||
|
}
|
||||||
|
|
||||||
|
// cacheObject stores a reference to an original object as well as a deep copy
|
||||||
|
// of that object to track any mutations in the original object.
|
||||||
|
type cacheObject struct {
|
||||||
|
original runtime.Object
|
||||||
|
deepCopy runtime.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
|
||||||
|
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
|
||||||
|
cmc := cacheMutationCheck{}
|
||||||
|
for _, endpointSlice := range endpointSlices {
|
||||||
|
cmc.Add(endpointSlice)
|
||||||
|
}
|
||||||
|
return cmc
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add appends a runtime.Object and a deep copy of that object into the
|
||||||
|
// cacheMutationCheck.
|
||||||
|
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
|
||||||
|
cmc.objects = append(cmc.objects, cacheObject{
|
||||||
|
original: o,
|
||||||
|
deepCopy: o.DeepCopyObject(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check verifies that no objects in the cacheMutationCheck have been mutated.
|
||||||
|
func (cmc *cacheMutationCheck) Check(t *testing.T) {
|
||||||
|
for _, o := range cmc.objects {
|
||||||
|
if !reflect.DeepEqual(o.original, o.deepCopy) {
|
||||||
|
// Cached objects can't be safely mutated and instead should be deep
|
||||||
|
// copied before changed in any way.
|
||||||
|
t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user