Deep copying EndpointSlices in reconciler before modifying them.

This commit is contained in:
Rob Scott 2019-11-15 15:28:05 -08:00
parent 65aefd3def
commit 4229b99203
No known key found for this signature in database
GPG Key ID: 53504C654CF4B3EE
3 changed files with 83 additions and 15 deletions

View File

@ -18,6 +18,7 @@ package endpointslice
import ( import (
"fmt" "fmt"
"reflect"
"testing" "testing"
"time" "time"
@ -25,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1" discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
@ -239,6 +241,8 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
AddressType: discovery.AddressTypeIPv4, AddressType: discovery.AddressTypeIPv4,
}} }}
cmc := newCacheMutationCheck(endpointSlices)
// need to add them to both store and fake clientset // need to add them to both store and fake clientset
for _, endpointSlice := range endpointSlices { for _, endpointSlice := range endpointSlices {
err := esController.endpointSliceStore.Add(endpointSlice) err := esController.endpointSliceStore.Add(endpointSlice)
@ -262,6 +266,9 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
// only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder // only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder
expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices") expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices")
expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices") expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices")
// ensure cache mutation has not occurred
cmc.Check(t)
} }
// Ensure SyncService handles a variety of protocols and IPs appropriately. // Ensure SyncService handles a variety of protocols and IPs appropriately.
@ -316,17 +323,17 @@ func TestSyncServiceFull(t *testing.T) {
assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoints in first slice") assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoints in first slice")
assert.Equal(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"], serviceCreateTime.Format(time.RFC3339Nano)) assert.Equal(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"], serviceCreateTime.Format(time.RFC3339Nano))
assert.EqualValues(t, []discovery.EndpointPort{{ assert.EqualValues(t, []discovery.EndpointPort{{
Name: strPtr("sctp-example"), Name: utilpointer.StringPtr("sctp-example"),
Protocol: protoPtr(v1.ProtocolSCTP), Protocol: protoPtr(v1.ProtocolSCTP),
Port: int32Ptr(int32(3456)), Port: utilpointer.Int32Ptr(int32(3456)),
}, { }, {
Name: strPtr("udp-example"), Name: utilpointer.StringPtr("udp-example"),
Protocol: protoPtr(v1.ProtocolUDP), Protocol: protoPtr(v1.ProtocolUDP),
Port: int32Ptr(int32(161)), Port: utilpointer.Int32Ptr(int32(161)),
}, { }, {
Name: strPtr("tcp-example"), Name: utilpointer.StringPtr("tcp-example"),
Protocol: protoPtr(v1.ProtocolTCP), Protocol: protoPtr(v1.ProtocolTCP),
Port: int32Ptr(int32(80)), Port: utilpointer.Int32Ptr(int32(80)),
}}, slice.Ports) }}, slice.Ports)
assert.ElementsMatch(t, []discovery.Endpoint{{ assert.ElementsMatch(t, []discovery.Endpoint{{
@ -382,14 +389,49 @@ func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, re
} }
} }
func strPtr(str string) *string { // protoPtr takes a Protocol and returns a pointer to it.
return &str
}
func protoPtr(proto v1.Protocol) *v1.Protocol { func protoPtr(proto v1.Protocol) *v1.Protocol {
return &proto return &proto
} }
func int32Ptr(num int32) *int32 { // cacheMutationCheck helps ensure that cached objects have not been changed
return &num // in any way throughout a test run.
type cacheMutationCheck struct {
objects []cacheObject
}
// cacheObject stores a reference to an original object as well as a deep copy
// of that object to track any mutations in the original object.
type cacheObject struct {
original runtime.Object
deepCopy runtime.Object
}
// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck {
cmc := cacheMutationCheck{}
for _, endpointSlice := range endpointSlices {
cmc.Add(endpointSlice)
}
return cmc
}
// Add appends a runtime.Object and a deep copy of that object into the
// cacheMutationCheck.
func (cmc *cacheMutationCheck) Add(o runtime.Object) {
cmc.objects = append(cmc.objects, cacheObject{
original: o,
deepCopy: o.DeepCopyObject(),
})
}
// Check verifies that no objects in the cacheMutationCheck have been mutated.
func (cmc *cacheMutationCheck) Check(t *testing.T) {
for _, o := range cmc.objects {
if !reflect.DeepEqual(o.original, o.deepCopy) {
// Cached objects can't be safely mutated and instead should be deep
// copied before changed in any way.
t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
}
}
} }

View File

@ -290,9 +290,11 @@ func (r *reconciler) reconcileByPortMapping(
// if no endpoints desired in this slice, mark for deletion // if no endpoints desired in this slice, mark for deletion
sliceNamesToDelete.Insert(existingSlice.Name) sliceNamesToDelete.Insert(existingSlice.Name)
} else { } else {
// otherwise, mark for update // otherwise, copy and mark for update
existingSlice.Endpoints = newEndpoints epSlice := existingSlice.DeepCopy()
sliceNamesToUpdate.Insert(existingSlice.Name) epSlice.Endpoints = newEndpoints
slicesByName[existingSlice.Name] = epSlice
sliceNamesToUpdate.Insert(epSlice.Name)
} }
} else { } else {
// slices with no changes will be useful if there are leftover endpoints // slices with no changes will be useful if there are leftover endpoints
@ -344,6 +346,10 @@ func (r *reconciler) reconcileByPortMapping(
// If we didn't find a sliceToFill, generate a new empty one. // If we didn't find a sliceToFill, generate a new empty one.
if sliceToFill == nil { if sliceToFill == nil {
sliceToFill = newEndpointSlice(service, endpointMeta) sliceToFill = newEndpointSlice(service, endpointMeta)
} else {
// deep copy required to modify this slice.
sliceToFill = sliceToFill.DeepCopy()
slicesByName[sliceToFill.Name] = sliceToFill
} }
// Fill the slice up with remaining endpoints. // Fill the slice up with remaining endpoints.

View File

@ -318,6 +318,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
} }
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2} existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
cmc := newCacheMutationCheck(existingSlices)
createEndpointSlices(t, client, namespace, existingSlices) createEndpointSlices(t, client, namespace, existingSlices)
numActionsBefore := len(client.Actions()) numActionsBefore := len(client.Actions())
@ -332,6 +333,9 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// 1 new slice (0->100) + 1 updated slice (62->89) // 1 new slice (0->100) + 1 updated slice (62->89)
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100}) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0}) expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0})
// ensure cache mutation has not occurred
cmc.Check(t)
} }
// now with preexisting slices, we have 300 pods matching a service // now with preexisting slices, we have 300 pods matching a service
@ -370,6 +374,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
} }
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2} existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
cmc := newCacheMutationCheck(existingSlices)
createEndpointSlices(t, client, namespace, existingSlices) createEndpointSlices(t, client, namespace, existingSlices)
numActionsBefore := len(client.Actions()) numActionsBefore := len(client.Actions())
@ -383,6 +388,9 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
// 2 new slices (100, 52) in addition to existing slices (74, 74) // 2 new slices (100, 52) in addition to existing slices (74, 74)
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52}) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0}) expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0})
// ensure cache mutation has not occurred
cmc.Check(t)
} }
// In some cases, such as a service port change, all slices for that service will require a change // In some cases, such as a service port change, all slices for that service will require a change
@ -445,6 +453,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc)) existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc))
} }
cmc := newCacheMutationCheck(existingSlices)
createEndpointSlices(t, client, namespace, existingSlices) createEndpointSlices(t, client, namespace, existingSlices)
numActionsBefore := len(client.Actions()) numActionsBefore := len(client.Actions())
@ -463,6 +472,9 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
// thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices // thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100}) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7}) expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7})
// ensure cache mutation has not occurred
cmc.Check(t)
} }
// In this test, we want to verify that endpoints are added to a slice that will // In this test, we want to verify that endpoints are added to a slice that will
@ -493,6 +505,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
} }
existingSlices = append(existingSlices, slice2) existingSlices = append(existingSlices, slice2)
cmc := newCacheMutationCheck(existingSlices)
createEndpointSlices(t, client, namespace, existingSlices) createEndpointSlices(t, client, namespace, existingSlices)
// ensure that endpoints in each slice will be marked for update. // ensure that endpoints in each slice will be marked for update.
@ -519,6 +532,9 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
// additional pods should get added to fuller slice // additional pods should get added to fuller slice
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20}) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})
// ensure cache mutation has not occurred
cmc.Check(t)
} }
// In this test, we want to verify that old EndpointSlices with a deprecated IP // In this test, we want to verify that old EndpointSlices with a deprecated IP
@ -552,6 +568,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
createEndpointSlices(t, client, namespace, existingSlices) createEndpointSlices(t, client, namespace, existingSlices)
cmc := newCacheMutationCheck(existingSlices)
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &svc, pods, existingSlices, time.Now()) reconcileHelper(t, r, &svc, pods, existingSlices, time.Now())
@ -569,6 +586,9 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType) t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType)
} }
} }
// ensure cache mutation has not occurred
cmc.Check(t)
} }
// Named ports can map to different port numbers on different pods. // Named ports can map to different port numbers on different pods.