From 4229b992032b3c671a95652a076175d1a6c49c6a Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Fri, 15 Nov 2019 15:28:05 -0800 Subject: [PATCH] Deep copying EndpointSlices in reconciler before modifying them. --- .../endpointslice_controller_test.go | 66 +++++++++++++++---- pkg/controller/endpointslice/reconciler.go | 12 +++- .../endpointslice/reconciler_test.go | 20 ++++++ 3 files changed, 83 insertions(+), 15 deletions(-) diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go index 98a51055d37..737f5b23915 100644 --- a/pkg/controller/endpointslice/endpointslice_controller_test.go +++ b/pkg/controller/endpointslice/endpointslice_controller_test.go @@ -18,6 +18,7 @@ package endpointslice import ( "fmt" + "reflect" "testing" "time" @@ -25,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" @@ -239,6 +241,8 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) { AddressType: discovery.AddressTypeIPv4, }} + cmc := newCacheMutationCheck(endpointSlices) + // need to add them to both store and fake clientset for _, endpointSlice := range endpointSlices { err := esController.endpointSliceStore.Add(endpointSlice) @@ -262,6 +266,9 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) { // only 2 slices should match, 2 should be deleted, 1 should be updated as a placeholder expectAction(t, client.Actions(), numActionsBefore, "update", "endpointslices") expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices") + + // ensure cache mutation has not occurred + cmc.Check(t) } // Ensure SyncService handles a variety of protocols and IPs appropriately. @@ -316,17 +323,17 @@ func TestSyncServiceFull(t *testing.T) { assert.Len(t, slice.Endpoints, 1, "Expected 1 endpoints in first slice") assert.Equal(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"], serviceCreateTime.Format(time.RFC3339Nano)) assert.EqualValues(t, []discovery.EndpointPort{{ - Name: strPtr("sctp-example"), + Name: utilpointer.StringPtr("sctp-example"), Protocol: protoPtr(v1.ProtocolSCTP), - Port: int32Ptr(int32(3456)), + Port: utilpointer.Int32Ptr(int32(3456)), }, { - Name: strPtr("udp-example"), + Name: utilpointer.StringPtr("udp-example"), Protocol: protoPtr(v1.ProtocolUDP), - Port: int32Ptr(int32(161)), + Port: utilpointer.Int32Ptr(int32(161)), }, { - Name: strPtr("tcp-example"), + Name: utilpointer.StringPtr("tcp-example"), Protocol: protoPtr(v1.ProtocolTCP), - Port: int32Ptr(int32(80)), + Port: utilpointer.Int32Ptr(int32(80)), }}, slice.Ports) assert.ElementsMatch(t, []discovery.Endpoint{{ @@ -382,14 +389,49 @@ func expectAction(t *testing.T, actions []k8stesting.Action, index int, verb, re } } -func strPtr(str string) *string { - return &str -} - +// protoPtr takes a Protocol and returns a pointer to it. func protoPtr(proto v1.Protocol) *v1.Protocol { return &proto } -func int32Ptr(num int32) *int32 { - return &num +// cacheMutationCheck helps ensure that cached objects have not been changed +// in any way throughout a test run. +type cacheMutationCheck struct { + objects []cacheObject +} + +// cacheObject stores a reference to an original object as well as a deep copy +// of that object to track any mutations in the original object. +type cacheObject struct { + original runtime.Object + deepCopy runtime.Object +} + +// newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices. +func newCacheMutationCheck(endpointSlices []*discovery.EndpointSlice) cacheMutationCheck { + cmc := cacheMutationCheck{} + for _, endpointSlice := range endpointSlices { + cmc.Add(endpointSlice) + } + return cmc +} + +// Add appends a runtime.Object and a deep copy of that object into the +// cacheMutationCheck. +func (cmc *cacheMutationCheck) Add(o runtime.Object) { + cmc.objects = append(cmc.objects, cacheObject{ + original: o, + deepCopy: o.DeepCopyObject(), + }) +} + +// Check verifies that no objects in the cacheMutationCheck have been mutated. +func (cmc *cacheMutationCheck) Check(t *testing.T) { + for _, o := range cmc.objects { + if !reflect.DeepEqual(o.original, o.deepCopy) { + // Cached objects can't be safely mutated and instead should be deep + // copied before changed in any way. + t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original) + } + } } diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index 4f6af23e2ee..ff0193058d5 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -290,9 +290,11 @@ func (r *reconciler) reconcileByPortMapping( // if no endpoints desired in this slice, mark for deletion sliceNamesToDelete.Insert(existingSlice.Name) } else { - // otherwise, mark for update - existingSlice.Endpoints = newEndpoints - sliceNamesToUpdate.Insert(existingSlice.Name) + // otherwise, copy and mark for update + epSlice := existingSlice.DeepCopy() + epSlice.Endpoints = newEndpoints + slicesByName[existingSlice.Name] = epSlice + sliceNamesToUpdate.Insert(epSlice.Name) } } else { // slices with no changes will be useful if there are leftover endpoints @@ -344,6 +346,10 @@ func (r *reconciler) reconcileByPortMapping( // If we didn't find a sliceToFill, generate a new empty one. if sliceToFill == nil { sliceToFill = newEndpointSlice(service, endpointMeta) + } else { + // deep copy required to modify this slice. + sliceToFill = sliceToFill.DeepCopy() + slicesByName[sliceToFill.Name] = sliceToFill } // Fill the slice up with remaining endpoints. diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index a8b0644e4bb..a746f8b42e1 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -318,6 +318,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) { } existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2} + cmc := newCacheMutationCheck(existingSlices) createEndpointSlices(t, client, namespace, existingSlices) numActionsBefore := len(client.Actions()) @@ -332,6 +333,9 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) { // 1 new slice (0->100) + 1 updated slice (62->89) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100}) expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0}) + + // ensure cache mutation has not occurred + cmc.Check(t) } // now with preexisting slices, we have 300 pods matching a service @@ -370,6 +374,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) { } existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2} + cmc := newCacheMutationCheck(existingSlices) createEndpointSlices(t, client, namespace, existingSlices) numActionsBefore := len(client.Actions()) @@ -383,6 +388,9 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) { // 2 new slices (100, 52) in addition to existing slices (74, 74) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52}) expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0}) + + // ensure cache mutation has not occurred + cmc.Check(t) } // In some cases, such as a service port change, all slices for that service will require a change @@ -445,6 +453,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) { existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc)) } + cmc := newCacheMutationCheck(existingSlices) createEndpointSlices(t, client, namespace, existingSlices) numActionsBefore := len(client.Actions()) @@ -463,6 +472,9 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) { // thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100}) expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7}) + + // ensure cache mutation has not occurred + cmc.Check(t) } // In this test, we want to verify that endpoints are added to a slice that will @@ -493,6 +505,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { } existingSlices = append(existingSlices, slice2) + cmc := newCacheMutationCheck(existingSlices) createEndpointSlices(t, client, namespace, existingSlices) // ensure that endpoints in each slice will be marked for update. @@ -519,6 +532,9 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { // additional pods should get added to fuller slice expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20}) + + // ensure cache mutation has not occurred + cmc.Check(t) } // In this test, we want to verify that old EndpointSlices with a deprecated IP @@ -552,6 +568,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) { createEndpointSlices(t, client, namespace, existingSlices) + cmc := newCacheMutationCheck(existingSlices) r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) reconcileHelper(t, r, &svc, pods, existingSlices, time.Now()) @@ -569,6 +586,9 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) { t.Errorf("Expected address type to be IPv4, got %s", endpointSlice.AddressType) } } + + // ensure cache mutation has not occurred + cmc.Check(t) } // Named ports can map to different port numbers on different pods.