Mirror of https://github.com/k3s-io/kubernetes.git
endpointslices: node missing on Pod scenario
When a Pod references a Node that doesn't exist in the local informer cache, the previous behavior was to return an error so the Service would be retried later, and to stop processing. However, this can leave an EndpointSlice stuck: a single missing Node prevents the slice from reflecting other changes, or from being created at all. It also ignores the publishNotReadyAddresses option on Services, which explicitly allows publishing Pod addresses that are known not to be ready.

The new behavior still requeues the problematic Service for retry, but keeps processing the update so the EndpointSlice reflects the current state. If publishNotReadyAddresses is set, a Pod referencing a missing Node is not treated as an error and its address is published.
parent baecb1981e
commit b8ba6ab005
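The behavior described above boils down to the following control flow, shown here as a minimal, self-contained Go sketch (hypothetical Pod/Service types and a hypothetical lookupNode helper stand in for the real Kubernetes objects and the node lister; the standard library's errors.Join stands in for utilerrors.NewAggregate): a Pod whose Node is missing from the cache is skipped and the error recorded unless the Service sets publishNotReadyAddresses, and the recorded errors are aggregated at the end so the Service is retried without blocking the rest of the update.

// Simplified model of the new per-pod handling: a missing Node no longer
// aborts the whole sync; the Pod is skipped (or published, if the Service
// sets publishNotReadyAddresses) and the error is aggregated at the end.
// Pod, Service, and lookupNode are hypothetical stand-ins, not the real
// Kubernetes types; errors.Join stands in for utilerrors.NewAggregate.
package main

import (
	"errors"
	"fmt"
)

type Pod struct {
	Name     string
	NodeName string
}

type Service struct {
	Namespace                string
	Name                     string
	PublishNotReadyAddresses bool
}

var errNodeNotFound = errors.New("node not found")

// lookupNode mimics the informer-backed node lister used by the reconciler.
func lookupNode(known map[string]bool, name string) error {
	if !known[name] {
		return errNodeNotFound
	}
	return nil
}

func reconcileEndpoints(svc Service, pods []Pod, knownNodes map[string]bool) ([]string, error) {
	var errs []error
	var endpoints []string
	for _, pod := range pods {
		if err := lookupNode(knownNodes, pod.NodeName); err != nil {
			if !errors.Is(err, errNodeNotFound) {
				// Unexpected lister error: stop processing, as before.
				return nil, err
			}
			if !svc.PublishNotReadyAddresses {
				// Record the problem so the Service is retried, but keep
				// going so the slice still reflects the remaining pods.
				errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found",
					pod.Name, svc.Namespace, svc.Name, pod.NodeName))
				continue
			}
			// publishNotReadyAddresses is set: publish the address anyway.
		}
		endpoints = append(endpoints, pod.Name)
	}
	return endpoints, errors.Join(errs...)
}

func main() {
	svc := Service{Namespace: "test", Name: "foo"}
	pods := []Pod{{Name: "pod-0", NodeName: "node-0"}, {Name: "pod-1", NodeName: "node-1"}}
	eps, err := reconcileEndpoints(svc, pods, map[string]bool{"node-0": true})
	fmt.Println(eps, err) // [pod-0] skipping Pod pod-1 for Service test/foo: Node node-1 Not Found
}

Running it with only node-0 known prints the surviving endpoint plus the aggregated skip error, mirroring the new reconciler behavior.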
@@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
 // slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
 // to ensure the desired set of pods are represented by endpoint slices.
 func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
+	errs := []error{}
 
 	slicesToCreate := []*discovery.EndpointSlice{}
 	slicesToUpdate := []*discovery.EndpointSlice{}
@@ -173,7 +174,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 
 		node, err := r.nodeLister.Get(pod.Spec.NodeName)
 		if err != nil {
-			return err
+			// we are getting the information from the local informer,
+			// an error different than IsNotFound should not happen
+			if !errors.IsNotFound(err) {
+				return err
+			}
+			// If the Node specified by the Pod doesn't exist we want to requeue the Service so we
+			// retry later, but also update the EndpointSlice without the problematic Pod.
+			// Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
+			// situations where a reference from a Pod to a missing node can leave the EndpointSlice
+			// stuck forever.
+			// On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
+			// Pod, since the user is explicitly indicating that the Pod address should be published.
+			if !service.Spec.PublishNotReadyAddresses {
+				klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
+				errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
+				continue
+			}
 		}
 		endpoint := podToEndpoint(pod, node, service, addressType)
 		if len(endpoint.Addresses) > 0 {
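The hunk above distinguishes a plain cache miss from an unexpected lister failure via errors.IsNotFound from k8s.io/apimachinery/pkg/api/errors. A standalone illustration of that check (a sketch assuming a module that pulls in k8s.io/apimachinery; the NewNotFound call is only used here to fabricate a NotFound error for the demo):

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// An informer-backed lister returns a NotFound StatusError when the
	// object is simply absent from the cache.
	notFound := apierrors.NewNotFound(schema.GroupResource{Group: "", Resource: "nodes"}, "node-1")
	fmt.Println(apierrors.IsNotFound(notFound)) // true

	// Any other error is unexpected for a cache read, so the new code path
	// still returns it immediately.
	fmt.Println(apierrors.IsNotFound(fmt.Errorf("connection refused"))) // false
}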
@@ -255,7 +272,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 		slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
 	}
 
-	return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+	if err != nil {
+		errs = append(errs, err)
+	}
+	return utilerrors.NewAggregate(errs)
+
 }
 
 // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.
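Appending finalize's error and returning utilerrors.NewAggregate(errs) is what lets a single problematic Pod trigger a retry without discarding the slice updates that did succeed: NewAggregate returns nil when no errors were collected and a non-nil aggregate otherwise. A minimal standalone illustration (a sketch assuming k8s.io/apimachinery is available):

package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	// No errors collected: NewAggregate returns nil, so a clean sync still
	// reports success to the caller.
	var err error = utilerrors.NewAggregate(nil)
	fmt.Println(err == nil) // true

	// At least one error: the aggregate is non-nil, so (per the commit
	// message) the Service is retried later, even though the slice changes
	// made during this sync were still applied.
	err = utilerrors.NewAggregate([]error{errors.New("skipping Pod pod-1 for Service test/foo: Node node-1 Not Found")})
	fmt.Println(err) // skipping Pod pod-1 for Service test/foo: Node node-1 Not Found
}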
@@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
 	}
 }
 
+// When a Pod references a Node that is not present in the informer cache the
+// EndpointSlices will continue to allow create, updates and deletes through.
+// The Pod with the missing reference will be published or retried depending of
+// the service.spec.PublishNotReadyAddresses.
+// The test considers two Pods on different nodes, one of the Nodes is not present.
+func TestReconcilerPodMissingNode(t *testing.T) {
+	namespace := "test"
+
+	nodes := make([]*corev1.Node, 2)
+	nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
+	nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
+
+	pods := make([]*corev1.Pod, 2)
+	pods[0] = newPod(0, namespace, true, 1, false)
+	pods[0].Spec.NodeName = nodes[0].Name
+	pods[1] = newPod(1, namespace, true, 1, false)
+	pods[1].Spec.NodeName = nodes[1].Name
+
+	service, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
+
+	testCases := []struct {
+		name            string
+		publishNotReady bool
+		existingNodes   []*corev1.Node
+		existingSlice   func() *discovery.EndpointSlice
+		expectedMetrics expectedMetrics
+		expectError     bool
+	}{{
+		name:            "Create and publishNotReady false",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     1,
+			addedPerSync:         1,
+			removedPerSync:       0,
+			numCreated:           1,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name:            "Create and publishNotReady true",
+		publishNotReady: true,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         2,
+			removedPerSync:       0,
+			numCreated:           1,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: false,
+	}, {
+		name:            "Update and publishNotReady false",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     1,
+			addedPerSync:         0,
+			removedPerSync:       1,
+			numCreated:           0,
+			numUpdated:           1,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	}, {
+		name:            "Update and publishNotReady true and all nodes missing",
+		publishNotReady: true,
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         0,
+			removedPerSync:       0,
+			numCreated:           0,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name:            "Update and publishNotReady true",
+		publishNotReady: true,
+		existingNodes:   []*corev1.Node{nodes[0]},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     2,
+			addedPerSync:         0,
+			removedPerSync:       0,
+			numCreated:           0,
+			numUpdated:           0,
+			numDeleted:           0,
+			slicesChangedPerSync: 0,
+		},
+		expectError: false,
+	}, {
+		name:            "Update if publishNotReady false and no nodes are present",
+		publishNotReady: false,
+		existingNodes:   []*corev1.Node{},
+		existingSlice: func() *discovery.EndpointSlice {
+			slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+			slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+			return slice
+		},
+		expectedMetrics: expectedMetrics{
+			desiredSlices:        1,
+			actualSlices:         1,
+			desiredEndpoints:     0,
+			addedPerSync:         0,
+			removedPerSync:       2,
+			numCreated:           0,
+			// the slice is updated not deleted
+			numUpdated:           1,
+			numDeleted:           0,
+			slicesChangedPerSync: 1,
+		},
+		expectError: true,
+	},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			client := newClientset()
+			setupMetrics()
+			r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice)
+
+			svc := service.DeepCopy()
+			svc.Spec.PublishNotReadyAddresses = tc.publishNotReady
+			existingSlices := []*discovery.EndpointSlice{}
+			if tc.existingSlice != nil {
+				slice := tc.existingSlice()
+				existingSlices = append(existingSlices, slice)
+				_, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
+				if createErr != nil {
+					t.Errorf("Expected no error creating endpoint slice")
+				}
+			}
+			err := r.reconcile(svc, pods, existingSlices, time.Now())
+			if err == nil && tc.expectError {
+				t.Errorf("Expected error but no error received")
+			}
+			if err != nil && !tc.expectError {
+				t.Errorf("Unexpected error: %v", err)
+			}
+
+			fetchedSlices := fetchEndpointSlices(t, client, namespace)
+			if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+				t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+			}
+			expectMetrics(t, tc.expectedMetrics)
+
+		})
+	}
+}
+
 func TestReconcileTopology(t *testing.T) {
 	ns := "testing"
 	svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
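For context, the publishNotReady toggle in the test cases corresponds to spec.publishNotReadyAddresses on a real Service. A minimal, hypothetical example of constructing such a Service object in Go (a sketch assuming k8s.io/api and k8s.io/apimachinery; names and ports are illustrative):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A Service that opts in to publishing not-ready addresses; with this
	// commit, a Pod whose Node is missing from the cache is still published
	// for such a Service instead of being skipped.
	svc := corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"},
		Spec: corev1.ServiceSpec{
			Selector:                 map[string]string{"app": "foo"},
			Ports:                    []corev1.ServicePort{{Port: 80}},
			PublishNotReadyAddresses: true,
		},
	}
	fmt.Printf("%s/%s publishNotReadyAddresses=%v\n",
		svc.Namespace, svc.Name, svc.Spec.PublishNotReadyAddresses)
}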