From 0d9689a55d30858aab7807f9a50bdc578337c81c Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Fri, 17 Jun 2022 12:54:16 +0200
Subject: [PATCH 1/3] fix a bug in endpointslices tests comparing the wrong metrics

---
 pkg/controller/endpointslice/reconciler_test.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go
index 6c31a2145d1..5aadc5f3090 100644
--- a/pkg/controller/endpointslice/reconciler_test.go
+++ b/pkg/controller/endpointslice/reconciler_test.go
@@ -1594,7 +1594,7 @@ func TestReconcileTopology(t *testing.T) {
 			expectedCrossZoneHints: 1,
 			expectedMetrics: expectedMetrics{
 				desiredSlices:    1,
-				actualSlices:     1,
+				actualSlices:     2,
 				desiredEndpoints: 9,
 				addedPerSync:     0,
 				removedPerSync:   0,
@@ -1617,7 +1617,7 @@ func TestReconcileTopology(t *testing.T) {
 			expectedCrossZoneHints: 0,
 			expectedMetrics: expectedMetrics{
 				desiredSlices:    1,
-				actualSlices:     1,
+				actualSlices:     2,
 				desiredEndpoints: 9,
 				addedPerSync:     0,
 				removedPerSync:   0,
@@ -1650,6 +1650,9 @@ func TestReconcileTopology(t *testing.T) {
 			cmc.Check(t)
 			expectMetrics(t, tc.expectedMetrics)
 			fetchedSlices := fetchEndpointSlices(t, client, ns)
+			if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+				t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+			}

 			if tc.expectedHints == nil {
 				for _, slice := range fetchedSlices {
@@ -1864,7 +1867,7 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
 	actualNumSlices, err := testutil.GetGaugeMetricValue(metrics.NumEndpointSlices.WithLabelValues())
 	handleErr(t, err, "numEndpointSlices")

-	if actualDesiredSlices != float64(em.desiredSlices) {
+	if actualNumSlices != float64(em.actualSlices) {
 		t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices)
 	}

From baecb1981e085124acad7d44d6e38eacbfd05540 Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Wed, 22 Jun 2022 09:45:02 +0200
Subject: [PATCH 2/3] fix metrics for placeholder slice

There is always a placeholder slice.
The ServicePortCache logic always assumed one EndpointSlice per
Endpoint, but if there are multiple empty Endpoints we use just one
placeholder slice, not multiple placeholder slices.

---
 pkg/controller/endpointslice/metrics/cache.go   |  7 +++++++
 .../endpointslice/metrics/cache_test.go         | 17 +++++++++++++++++
 pkg/controller/endpointslice/reconciler.go      |  8 ++++----
 pkg/controller/endpointslice/reconciler_test.go |  2 +-
 4 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/pkg/controller/endpointslice/metrics/cache.go b/pkg/controller/endpointslice/metrics/cache.go
index 86a67f8a54b..2bd22316167 100644
--- a/pkg/controller/endpointslice/metrics/cache.go
+++ b/pkg/controller/endpointslice/metrics/cache.go
@@ -91,6 +91,10 @@ func (spc *ServicePortCache) totals(maxEndpointsPerSlice int) (int, int, int) {
 		actualSlices += eInfo.Slices
 		desiredSlices += numDesiredSlices(eInfo.Endpoints, maxEndpointsPerSlice)
 	}
+	// there is always a placeholder slice
+	if desiredSlices == 0 {
+		desiredSlices = 1
+	}
 	return actualSlices, desiredSlices, endpoints
 }

@@ -148,6 +152,9 @@ func (c *Cache) updateMetrics() {
 // numDesiredSlices calculates the number of EndpointSlices that would exist
 // with ideal endpoint distribution.
 func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
+	if numEndpoints == 0 {
+		return 0
+	}
 	if numEndpoints <= maxEndpointsPerSlice {
 		return 1
 	}

diff --git a/pkg/controller/endpointslice/metrics/cache_test.go b/pkg/controller/endpointslice/metrics/cache_test.go
index df88ec18325..5cfd58bad9e 100644
--- a/pkg/controller/endpointslice/metrics/cache_test.go
+++ b/pkg/controller/endpointslice/metrics/cache_test.go
@@ -60,6 +60,23 @@ func TestNumEndpointsAndSlices(t *testing.T) {
 	expectNumEndpointsAndSlices(t, c, 4, 4, 160)
 }

+func TestPlaceHolderSlice(t *testing.T) {
+	c := NewCache(int32(100))
+
+	p80 := int32(80)
+	p443 := int32(443)
+
+	pmKey80443 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}})
+	pmKey80 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}})
+
+	sp := NewServicePortCache()
+	sp.Set(pmKey80, EfficiencyInfo{Endpoints: 0, Slices: 1})
+	sp.Set(pmKey80443, EfficiencyInfo{Endpoints: 0, Slices: 1})
+
+	c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, sp)
+	expectNumEndpointsAndSlices(t, c, 1, 2, 0)
+}
+
 func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) {
 	t.Helper()
 	if c.numSlicesDesired != desired {

diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go
index 8c5204a52e6..8d9dcd1c55b 100644
--- a/pkg/controller/endpointslice/reconciler.go
+++ b/pkg/controller/endpointslice/reconciler.go
@@ -223,11 +223,11 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 			slicesToDelete = slicesToDelete[:0]
 		} else {
 			slicesToCreate = append(slicesToCreate, placeholderSlice)
-			spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
-				Endpoints: 0,
-				Slices:    1,
-			})
 		}
+		spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
+			Endpoints: 0,
+			Slices:    1,
+		})
 	}

 	metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))

diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go
index 5aadc5f3090..c1cfd46283b 100644
--- a/pkg/controller/endpointslice/reconciler_test.go
+++ b/pkg/controller/endpointslice/reconciler_test.go
@@ -481,7 +481,7 @@ func TestReconcile1EndpointSlice(t *testing.T) {
 		{
 			desc:     "Existing placeholder that's the same",
 			existing: newEndpointSlice(&svc, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: discovery.AddressTypeIPv4}),
-			wantMetrics: expectedMetrics{desiredSlices: 0, actualSlices: 0, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
+			wantMetrics: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
 		},
 		{
 			desc: "Existing placeholder that's different",

From b8ba6ab0057b63cc1cf64196b1c1761aa615bb08 Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Fri, 17 Jun 2022 12:47:39 +0200
Subject: [PATCH 3/3] endpointslices: node missing on Pod scenario

When a Pod references a Node that doesn't exist in the local informer
cache, the current behavior was to return an error, to retry later, and
to stop processing. However, this can cause scenarios where a missing
Node leaves a Slice stuck: it can neither reflect other changes nor be
created.
Also, this doesn't respect the publishNotReadyAddresses option on
Services, which considers it acceptable to publish Pod addresses that
are known to not be ready.

The new behavior keeps retrying the problematic Service, but it also
keeps processing the updates, reflecting the current state in the
EndpointSlice. If publishNotReadyAddresses is set, a missing Node on a
Pod is not treated as an error.

---
 pkg/controller/endpointslice/reconciler.go |  26 ++-
 .../endpointslice/reconciler_test.go       | 185 ++++++++++++++++++
 2 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go
index 8d9dcd1c55b..9baa2ada184 100644
--- a/pkg/controller/endpointslice/reconciler.go
+++ b/pkg/controller/endpointslice/reconciler.go
@@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
 // slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
 // to ensure the desired set of pods are represented by endpoint slices.
 func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
+	errs := []error{}

 	slicesToCreate := []*discovery.EndpointSlice{}
 	slicesToUpdate := []*discovery.EndpointSlice{}
@@ -173,7 +174,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor

 			node, err := r.nodeLister.Get(pod.Spec.NodeName)
 			if err != nil {
-				return err
+				// we are getting the information from the local informer;
+				// an error other than IsNotFound should not happen
+				if !errors.IsNotFound(err) {
+					return err
+				}
+				// If the Node specified by the Pod doesn't exist, we want to requeue the Service so we
+				// retry later, but also update the EndpointSlice without the problematic Pod.
+				// Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
+				// situations where a reference from a Pod to a missing Node can leave the EndpointSlice
+				// stuck forever.
+				// On the other hand, if service.Spec.PublishNotReadyAddresses is set we just add the
+				// Pod, since the user is explicitly indicating that the Pod address should be published.
+				if !service.Spec.PublishNotReadyAddresses {
+					klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
+					errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
+					continue
+				}
 			}
 			endpoint := podToEndpoint(pod, node, service, addressType)
 			if len(endpoint.Addresses) > 0 {
@@ -255,7 +272,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 		slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
 	}

-	return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	if err != nil {
+		errs = append(errs, err)
+	}
+	return utilerrors.NewAggregate(errs)
+
 }

 // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.
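The collect-and-continue flow above, reduced to a minimal standalone sketch (not part of the patch): processAll is a hypothetical helper, and utilerrors is the same k8s.io/apimachinery/pkg/util/errors aggregate helper the reconciler returns through. Failures are recorded per item, the loop keeps going, and one aggregate error drives the requeue:

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// processAll records each failure, keeps processing the remaining items,
// and returns a single aggregate error so the caller can requeue the work.
func processAll(items []string, process func(string) error) error {
	errs := []error{}
	for _, item := range items {
		if err := process(item); err != nil {
			// Do not return early: the remaining items are still processed.
			errs = append(errs, fmt.Errorf("item %q: %v", item, err))
			continue
		}
	}
	// NewAggregate returns nil for an empty list, so the happy path is unchanged.
	return utilerrors.NewAggregate(errs)
}

func main() {
	err := processAll([]string{"a", "b"}, func(s string) error {
		if s == "b" {
			return fmt.Errorf("not found")
		}
		return nil
	})
	fmt.Println(err) // item "b": not found
}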
diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go
index c1cfd46283b..ff5f99abe2e 100644
--- a/pkg/controller/endpointslice/reconciler_test.go
+++ b/pkg/controller/endpointslice/reconciler_test.go
@@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
 	}
 }

+// When a Pod references a Node that is not present in the informer cache, the
+// EndpointSlices will continue to allow creates, updates, and deletes.
+// The Pod with the missing reference will be published or retried, depending on
+// service.Spec.PublishNotReadyAddresses.
+// The test considers two Pods on different Nodes; one of the Nodes is not present.
+func TestReconcilerPodMissingNode(t *testing.T) {
+	namespace := "test"
+
+	nodes := make([]*corev1.Node, 2)
+	nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
+	nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
+
+	pods := make([]*corev1.Pod, 2)
+	pods[0] = newPod(0, namespace, true, 1, false)
+	pods[0].Spec.NodeName = nodes[0].Name
+	pods[1] = newPod(1, namespace, true, 1, false)
+	pods[1].Spec.NodeName = nodes[1].Name
+
+	service, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
+
+	testCases := []struct {
+		name            string
+		publishNotReady bool
+		existingNodes   []*corev1.Node
+		existingSlice   func() *discovery.EndpointSlice
+		expectedMetrics expectedMetrics
+		expectError     bool
+	}{{
+		name:            "Create and publishNotReady false",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     1,
+			addedPerSync:         1,
+			removedPerSync:       0,
+			numCreated:           1,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name:            "Create and publishNotReady true",
+		publishNotReady: true,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         2,
+			removedPerSync:       0,
+			numCreated:           1,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: false,
+	}, {
+		name:            "Update and publishNotReady false",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     1,
+			addedPerSync:         0,
+			removedPerSync:       1,
+			numCreated:           0,
+			numUpdated:           1,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name:            "Update and publishNotReady true and all nodes missing",
+		publishNotReady: true,
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         0,
+			removedPerSync:       0,
+			numCreated:           0,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name:            "Update and publishNotReady true",
+		publishNotReady: true,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         0,
+			removedPerSync:       0,
+			numCreated:           0,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name:            "Update if publishNotReady false and no nodes are present",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     0,
+			addedPerSync:         0,
+			removedPerSync:       2,
+			numCreated:           0,
+			// the slice is updated not deleted
+			numUpdated:           1,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			client := newClientset()
+			setupMetrics()
+			r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice)
+
+			svc := service.DeepCopy()
+			svc.Spec.PublishNotReadyAddresses = tc.publishNotReady
+			existingSlices := []*discovery.EndpointSlice{}
+			if tc.existingSlice != nil {
+				slice := tc.existingSlice()
+				existingSlices = append(existingSlices, slice)
+				_, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
+				if createErr != nil {
+					t.Errorf("Expected no error creating endpoint slice")
+				}
+			}
+			err := r.reconcile(svc, pods, existingSlices, time.Now())
+			if err == nil && tc.expectError {
+				t.Errorf("Expected error but no error received")
+			}
+			if err != nil && !tc.expectError {
+				t.Errorf("Unexpected error: %v", err)
+			}
+
+			fetchedSlices := fetchEndpointSlices(t, client, namespace)
+			if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+				t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+			}
+			expectMetrics(t, tc.expectedMetrics)
+
+		})
+	}
+}
+
 func TestReconcileTopology(t *testing.T) {
 	ns := "testing"
 	svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
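To make the placeholder-slice accounting from the second patch concrete, a minimal self-contained sketch follows (not part of the patches). The zero- and single-slice branches of numDesiredSlices are taken from the diff above; the rounding-up branch for the overflow case is an assumption here, since the patch does not show it:

package main

import "fmt"

// numDesiredSlices mirrors the patched helper: zero endpoints contribute
// zero desired slices for their port map.
func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
	if numEndpoints == 0 {
		return 0
	}
	if numEndpoints <= maxEndpointsPerSlice {
		return 1
	}
	// Assumed ceiling division for the overflow case (not shown in the patch).
	return (numEndpoints + maxEndpointsPerSlice - 1) / maxEndpointsPerSlice
}

func main() {
	maxPerSlice := 100
	// Two empty port maps, as in TestPlaceHolderSlice, contribute zero
	// desired slices each...
	desired := numDesiredSlices(0, maxPerSlice) + numDesiredSlices(0, maxPerSlice)
	// ...and the totals are then clamped to one placeholder slice, which is
	// why the test expects one desired slice against two actual slices.
	if desired == 0 {
		desired = 1 // there is always a placeholder slice
	}
	fmt.Println(desired) // prints 1
}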