From b8ba6ab0057b63cc1cf64196b1c1761aa615bb08 Mon Sep 17 00:00:00 2001
From: Antonio Ojea
Date: Fri, 17 Jun 2022 12:47:39 +0200
Subject: [PATCH] endpointslices: node missing on Pod scenario

When a Pod references a Node that doesn't exist in the local informer
cache, the current behavior is to return an error, so the sync is
retried later, and to stop processing. However, this can leave an
EndpointSlice stuck on the missing Node: the slice can no longer
reflect other changes, or even be created in the first place. This also
doesn't respect the publishNotReadyAddresses option on Services, which
considers it acceptable to publish Pod addresses that are known to not
be ready.

The new behavior keeps retrying the problematic Service, but also keeps
processing updates, reflecting the current state in the EndpointSlice.
If publishNotReadyAddresses is set, a missing Node on a Pod is not
treated as an error.
---
 pkg/controller/endpointslice/reconciler.go    |  26 ++-
 .../endpointslice/reconciler_test.go          | 185 ++++++++++++++++++
 2 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go
index 8d9dcd1c55b..9baa2ada184 100644
--- a/pkg/controller/endpointslice/reconciler.go
+++ b/pkg/controller/endpointslice/reconciler.go
@@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
 // slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
 // to ensure the desired set of pods are represented by endpoint slices.
 func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
+	errs := []error{}
 
 	slicesToCreate := []*discovery.EndpointSlice{}
 	slicesToUpdate := []*discovery.EndpointSlice{}
@@ -173,7 +174,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 
 			node, err := r.nodeLister.Get(pod.Spec.NodeName)
 			if err != nil {
-				return err
+				// We get this information from the local informer,
+				// so an error other than IsNotFound should not happen.
+				if !errors.IsNotFound(err) {
+					return err
+				}
+				// If the Node specified by the Pod doesn't exist, we want to requeue the Service so we
+				// retry later, but also update the EndpointSlice without the problematic Pod.
+				// Theoretically, the Pod garbage collector will remove the Pod, but we want to avoid
+				// situations where a reference from a Pod to a missing Node can leave the EndpointSlice
+				// stuck forever.
+				// On the other hand, if service.Spec.PublishNotReadyAddresses is set, we just add the
+				// Pod, since the user is explicitly indicating that the Pod address should be published.
+				if !service.Spec.PublishNotReadyAddresses {
+					klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
+					errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
+					continue
+				}
 			}
 			endpoint := podToEndpoint(pod, node, service, addressType)
 			if len(endpoint.Addresses) > 0 {
@@ -255,7 +272,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 		slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
 	}
 
-	return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	if err != nil {
+		errs = append(errs, err)
+	}
+	return utilerrors.NewAggregate(errs)
+
 }
 
 // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.
diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go
index c1cfd46283b..ff5f99abe2e 100644
--- a/pkg/controller/endpointslice/reconciler_test.go
+++ b/pkg/controller/endpointslice/reconciler_test.go
@@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
 	}
 }
 
+// When a Pod references a Node that is not present in the informer cache, the
+// EndpointSlice reconciliation must continue to allow creates, updates, and
+// deletes to go through. The Pod with the missing Node reference is either
+// published or retried, depending on service.Spec.PublishNotReadyAddresses.
+// The test considers two Pods on different Nodes, one of which is not present.
+func TestReconcilerPodMissingNode(t *testing.T) {
+	namespace := "test"
+
+	nodes := make([]*corev1.Node, 2)
+	nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
+	nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
+
+	pods := make([]*corev1.Pod, 2)
+	pods[0] = newPod(0, namespace, true, 1, false)
+	pods[0].Spec.NodeName = nodes[0].Name
+	pods[1] = newPod(1, namespace, true, 1, false)
+	pods[1].Spec.NodeName = nodes[1].Name
+
+	service, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
+
+	testCases := []struct {
+		name            string
+		publishNotReady bool
+		existingNodes   []*corev1.Node
+		existingSlice   func() *discovery.EndpointSlice
+		expectedMetrics expectedMetrics
+		expectError     bool
+	}{{
+		name:            "Create and publishNotReady false",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     1,
+			addedPerSync:         1,
+			removedPerSync:       0,
+			numCreated:           1,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name:            "Create and publishNotReady true",
+		publishNotReady: true,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         2,
+			removedPerSync:       0,
+			numCreated:           1,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: false,
+	}, {
+		name:            "Update and publishNotReady false",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     1,
+			addedPerSync:         0,
+			removedPerSync:       1,
+			numCreated:           0,
+			numUpdated:           1,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name:            "Update and publishNotReady true and all nodes missing",
+		publishNotReady: true,
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         0,
+			removedPerSync:       0,
+			numCreated:           0,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name:            "Update and publishNotReady true",
+		publishNotReady: true,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         0,
+			removedPerSync:       0,
+			numCreated:           0,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name:            "Update if publishNotReady false and no nodes are present",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     0,
+			addedPerSync:         0,
+			removedPerSync:       2,
+			numCreated:           0,
+			// the slice is updated, not deleted
+			numUpdated:           1,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			client := newClientset()
+			setupMetrics()
+			r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice)
+
+			svc := service.DeepCopy()
+			svc.Spec.PublishNotReadyAddresses = tc.publishNotReady
+			existingSlices := []*discovery.EndpointSlice{}
+			if tc.existingSlice != nil {
+				slice := tc.existingSlice()
+				existingSlices = append(existingSlices, slice)
+				_, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
+				if createErr != nil {
+					t.Errorf("Expected no error creating endpoint slice")
+				}
+			}
+			err := r.reconcile(svc, pods, existingSlices, time.Now())
+			if err == nil && tc.expectError {
+				t.Errorf("Expected an error but none was received")
+			}
+			if err != nil && !tc.expectError {
+				t.Errorf("Unexpected error: %v", err)
+			}
+
+			fetchedSlices := fetchEndpointSlices(t, client, namespace)
+			if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+				t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+			}
+			expectMetrics(t, tc.expectedMetrics)
+		})
+	}
+}
+
 func TestReconcileTopology(t *testing.T) {
 	ns := "testing"
 	svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
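
The control-flow change above boils down to the "aggregate and keep going"
pattern: per-Pod errors are collected instead of aborting the sync, and the
aggregated error still requeues the Service. Below is a minimal, self-contained
sketch of that pattern, not the controller's real code: the pod and nodeGetter
types are hypothetical stand-ins for the controller's Pod objects and
informer-backed nodeLister; only the apimachinery helpers (errors.IsNotFound,
utilerrors.NewAggregate) are the APIs the patch actually uses.

// aggregate_sketch.go: a sketch of the error-handling pattern in the patch
// above. The pod and nodeGetter types are hypothetical stand-ins.
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

type pod struct{ name, nodeName string }

// nodeGetter stands in for the informer-backed nodeLister.
type nodeGetter map[string]bool

func (g nodeGetter) Get(name string) error {
	if !g[name] {
		return apierrors.NewNotFound(schema.GroupResource{Resource: "nodes"}, name)
	}
	return nil
}

// reconcileSketch mirrors the patched control flow: a missing Node no longer
// aborts the sync. The Pod is skipped and the error recorded (so the Service
// is requeued), unless publishNotReadyAddresses says to publish it anyway.
func reconcileSketch(pods []pod, nodes nodeGetter, publishNotReadyAddresses bool) error {
	errs := []error{}
	for _, p := range pods {
		if err := nodes.Get(p.nodeName); err != nil {
			// Cache-backed lookups should only fail with NotFound;
			// anything else is unexpected and aborts, as before.
			if !apierrors.IsNotFound(err) {
				return err
			}
			if !publishNotReadyAddresses {
				errs = append(errs, fmt.Errorf("skipping Pod %s: Node %s not found", p.name, p.nodeName))
				continue
			}
		}
		fmt.Printf("publishing endpoint for Pod %s\n", p.name)
	}
	return utilerrors.NewAggregate(errs)
}

func main() {
	pods := []pod{{"pod-0", "node-0"}, {"pod-1", "node-1"}}
	nodes := nodeGetter{"node-0": true} // node-1 is missing from the cache
	fmt.Println(reconcileSketch(pods, nodes, false)) // pod-1 skipped, aggregate error returned
	fmt.Println(reconcileSketch(pods, nodes, true))  // pod-1 published, nil
}

Since utilerrors.NewAggregate returns nil for an empty list, the happy path
still returns nil; only syncs that skipped a Pod report an error and get
requeued.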