diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index 8d9dcd1c55b..9baa2ada184 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis // slices (by address type) for the given service. It creates, updates, or deletes endpoint slices // to ensure the desired set of pods are represented by endpoint slices. func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error { + errs := []error{} slicesToCreate := []*discovery.EndpointSlice{} slicesToUpdate := []*discovery.EndpointSlice{} @@ -173,7 +174,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor node, err := r.nodeLister.Get(pod.Spec.NodeName) if err != nil { - return err + // we are getting the information from the local informer, + // an error different than IsNotFound should not happen + if !errors.IsNotFound(err) { + return err + } + // If the Node specified by the Pod doesn't exist we want to requeue the Service so we + // retry later, but also update the EndpointSlice without the problematic Pod. + // Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid + // situations where a reference from a Pod to a missing node can leave the EndpointSlice + // stuck forever. + // On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the + // Pod, since the user is explicitly indicating that the Pod address should be published. + if !service.Spec.PublishNotReadyAddresses { + klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName) + errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)) + continue + } } endpoint := podToEndpoint(pod, node, service, addressType) if len(endpoint.Addresses) > 0 { @@ -255,7 +272,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si) } - return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime) + err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime) + if err != nil { + errs = append(errs, err) + } + return utilerrors.NewAggregate(errs) + } // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices. diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index c1cfd46283b..ff5f99abe2e 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) { } } +// When a Pod references a Node that is not present in the informer cache the +// EndpointSlices will continue to allow create, updates and deletes through. +// The Pod with the missing reference will be published or retried depending of +// the service.spec.PublishNotReadyAddresses. +// The test considers two Pods on different nodes, one of the Nodes is not present. +func TestReconcilerPodMissingNode(t *testing.T) { + namespace := "test" + + nodes := make([]*corev1.Node, 2) + nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}} + nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}} + + pods := make([]*corev1.Pod, 2) + pods[0] = newPod(0, namespace, true, 1, false) + pods[0].Spec.NodeName = nodes[0].Name + pods[1] = newPod(1, namespace, true, 1, false) + pods[1].Spec.NodeName = nodes[1].Name + + service, endpointMeta := newServiceAndEndpointMeta("foo", namespace) + + testCases := []struct { + name string + publishNotReady bool + existingNodes []*corev1.Node + existingSlice func() *discovery.EndpointSlice + expectedMetrics expectedMetrics + expectError bool + }{{ + name: "Create and publishNotReady false", + publishNotReady: false, + existingNodes: []*corev1.Node{nodes[0]}, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 1, + addedPerSync: 1, + removedPerSync: 0, + numCreated: 1, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 1, + }, + expectError: true, + }, { + name: "Create and publishNotReady true", + publishNotReady: true, + existingNodes: []*corev1.Node{nodes[0]}, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 2, + addedPerSync: 2, + removedPerSync: 0, + numCreated: 1, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 1, + }, + expectError: false, + }, { + name: "Update and publishNotReady false", + publishNotReady: false, + existingNodes: []*corev1.Node{nodes[0]}, + existingSlice: func() *discovery.EndpointSlice { + slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4)) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4)) + return slice + }, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 1, + addedPerSync: 0, + removedPerSync: 1, + numCreated: 0, + numUpdated: 1, + numDeleted: 0, + slicesChangedPerSync: 1, + }, + expectError: true, + }, { + name: "Update and publishNotReady true and all nodes missing", + publishNotReady: true, + existingSlice: func() *discovery.EndpointSlice { + slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4)) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4)) + return slice + }, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 2, + addedPerSync: 0, + removedPerSync: 0, + numCreated: 0, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 0, + }, + expectError: false, + }, { + name: "Update and publishNotReady true", + publishNotReady: true, + existingNodes: []*corev1.Node{nodes[0]}, + existingSlice: func() *discovery.EndpointSlice { + slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4)) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4)) + return slice + }, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 2, + addedPerSync: 0, + removedPerSync: 0, + numCreated: 0, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 0, + }, + expectError: false, + }, { + name: "Update if publishNotReady false and no nodes are present", + publishNotReady: false, + existingNodes: []*corev1.Node{}, + existingSlice: func() *discovery.EndpointSlice { + slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4)) + slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4)) + return slice + }, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 0, + addedPerSync: 0, + removedPerSync: 2, + numCreated: 0, + // the slice is updated not deleted + numUpdated: 1, + numDeleted: 0, + slicesChangedPerSync: 1, + }, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + client := newClientset() + setupMetrics() + r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice) + + svc := service.DeepCopy() + svc.Spec.PublishNotReadyAddresses = tc.publishNotReady + existingSlices := []*discovery.EndpointSlice{} + if tc.existingSlice != nil { + slice := tc.existingSlice() + existingSlices = append(existingSlices, slice) + _, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{}) + if createErr != nil { + t.Errorf("Expected no error creating endpoint slice") + } + } + err := r.reconcile(svc, pods, existingSlices, time.Now()) + if err == nil && tc.expectError { + t.Errorf("Expected error but no error received") + } + if err != nil && !tc.expectError { + t.Errorf("Unexpected error: %v", err) + } + + fetchedSlices := fetchEndpointSlices(t, client, namespace) + if len(fetchedSlices) != tc.expectedMetrics.actualSlices { + t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices) + } + expectMetrics(t, tc.expectedMetrics) + + }) + } +} + func TestReconcileTopology(t *testing.T) { ns := "testing" svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)