Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 19:31:44 +00:00)
Merge pull request #110639 from aojea/slice_no_node
EndpointSlice with Pods without an existing Node
Commit: ae3537120b
@@ -91,6 +91,10 @@ func (spc *ServicePortCache) totals(maxEndpointsPerSlice int) (int, int, int) {
         actualSlices += eInfo.Slices
         desiredSlices += numDesiredSlices(eInfo.Endpoints, maxEndpointsPerSlice)
     }
+    // there is always a placeholder slice
+    if desiredSlices == 0 {
+        desiredSlices = 1
+    }
     return actualSlices, desiredSlices, endpoints
 }
 
@@ -148,6 +152,9 @@ func (c *Cache) updateMetrics() {
 // numDesiredSlices calculates the number of EndpointSlices that would exist
 // with ideal endpoint distribution.
 func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
+    if numEndpoints == 0 {
+        return 0
+    }
     if numEndpoints <= maxEndpointsPerSlice {
         return 1
     }
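Taken together, the two metrics-cache hunks above make the cache count a Service with zero endpoints as owning exactly one placeholder slice: numDesiredSlices now returns 0 for an empty Service, and totals() then bumps the desired count to 1. A minimal standalone sketch of that arithmetic in Go; the ceiling-division branch at the end of numDesiredSlices is not shown in the hunk, so it is an assumption here:

package main

import "fmt"

// numDesiredSlices mirrors the helper above: zero endpoints need zero slices,
// otherwise endpoints are packed into as few slices as maxEndpointsPerSlice allows.
func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
    if numEndpoints == 0 {
        return 0
    }
    if numEndpoints <= maxEndpointsPerSlice {
        return 1
    }
    // assumed ceiling division for the branch not shown in the hunk
    return 1 + (numEndpoints-1)/maxEndpointsPerSlice
}

func main() {
    const max = 100
    for _, n := range []int{0, 1, 100, 101, 250} {
        desired := numDesiredSlices(n, max)
        if desired == 0 {
            desired = 1 // totals(): there is always a placeholder slice
        }
        fmt.Printf("endpoints=%d desiredSlices=%d\n", n, desired)
    }
}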
@@ -60,6 +60,23 @@ func TestNumEndpointsAndSlices(t *testing.T) {
     expectNumEndpointsAndSlices(t, c, 4, 4, 160)
 }
 
+func TestPlaceHolderSlice(t *testing.T) {
+    c := NewCache(int32(100))
+
+    p80 := int32(80)
+    p443 := int32(443)
+
+    pmKey80443 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}})
+    pmKey80 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}})
+
+    sp := NewServicePortCache()
+    sp.Set(pmKey80, EfficiencyInfo{Endpoints: 0, Slices: 1})
+    sp.Set(pmKey80443, EfficiencyInfo{Endpoints: 0, Slices: 1})
+
+    c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, sp)
+    expectNumEndpointsAndSlices(t, c, 1, 2, 0)
+}
+
 func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) {
     t.Helper()
     if c.numSlicesDesired != desired {
@@ -130,6 +130,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
 // slices (by address type) for the given service. It creates, updates, or deletes endpoint slices
 // to ensure the desired set of pods are represented by endpoint slices.
 func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time, addressType discovery.AddressType) error {
+    errs := []error{}
 
     slicesToCreate := []*discovery.EndpointSlice{}
     slicesToUpdate := []*discovery.EndpointSlice{}
@@ -173,7 +174,23 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
 
         node, err := r.nodeLister.Get(pod.Spec.NodeName)
         if err != nil {
-            return err
+            // we are getting the information from the local informer,
+            // an error different than IsNotFound should not happen
+            if !errors.IsNotFound(err) {
+                return err
+            }
+            // If the Node specified by the Pod doesn't exist we want to requeue the Service so we
+            // retry later, but also update the EndpointSlice without the problematic Pod.
+            // Theoretically, the pod Garbage Collector will remove the Pod, but we want to avoid
+            // situations where a reference from a Pod to a missing node can leave the EndpointSlice
+            // stuck forever.
+            // On the other side, if the service.Spec.PublishNotReadyAddresses is set we just add the
+            // Pod, since the user is explicitly indicating that the Pod address should be published.
+            if !service.Spec.PublishNotReadyAddresses {
+                klog.Warningf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName)
+                errs = append(errs, fmt.Errorf("skipping Pod %s for Service %s/%s: Node %s Not Found", pod.Name, service.Namespace, service.Name, pod.Spec.NodeName))
+                continue
+            }
         }
         endpoint := podToEndpoint(pod, node, service, addressType)
         if len(endpoint.Addresses) > 0 {
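The pattern introduced in this hunk — tolerate only NotFound from the local informer cache, skip or publish the affected Pod depending on service.Spec.PublishNotReadyAddresses, and record an error so the Service is requeued — can be sketched in isolation. getNode and missingNodeSkip below are hypothetical stand-ins for the reconciler's nodeLister.Get and loop body; only the apimachinery errors helpers are real API:

package main

import (
    "fmt"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime/schema"
)

// getNode is a hypothetical stand-in for r.nodeLister.Get; this stub always
// reports that the Node is missing from the informer cache.
func getNode(name string) error {
    return apierrors.NewNotFound(schema.GroupResource{Resource: "nodes"}, name)
}

// missingNodeSkip mirrors the control flow of the hunk above for one Pod.
// It returns a non-nil error only for unexpected lister failures.
func missingNodeSkip(nodeName string, publishNotReadyAddresses bool, errs *[]error) error {
    if err := getNode(nodeName); err != nil {
        // Anything other than NotFound should not come out of a local informer.
        if !apierrors.IsNotFound(err) {
            return err
        }
        // Node is gone: skip the Pod unless the user asked to publish it anyway,
        // and remember the error so the Service is requeued and retried later.
        if !publishNotReadyAddresses {
            *errs = append(*errs, fmt.Errorf("skipping Pod: Node %s not found", nodeName))
            return nil
        }
    }
    // the real reconciler would build the endpoint via podToEndpoint here
    return nil
}

func main() {
    var errs []error
    _ = missingNodeSkip("node-1", false, &errs)
    fmt.Println(errs) // one recorded error: the slice update proceeds, the Service is retried
}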
@@ -223,11 +240,11 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
             slicesToDelete = slicesToDelete[:0]
         } else {
             slicesToCreate = append(slicesToCreate, placeholderSlice)
+            spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
+                Endpoints: 0,
+                Slices: 1,
+            })
         }
-        spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
-            Endpoints: 0,
-            Slices: 1,
-        })
     }
 
     metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
@@ -255,7 +272,12 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
         slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
     }
 
-    return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+    err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
+    if err != nil {
+        errs = append(errs, err)
+    }
+    return utilerrors.NewAggregate(errs)
+
 }
 
 // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.
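The errs slice introduced in the @@ -130,6 +130,7 @@ hunk and the aggregated return above rely on a convenient property of apimachinery's error aggregation: utilerrors.NewAggregate returns nil for an empty list, so a clean reconcile still returns nil, while any skipped Pod or finalize failure surfaces as a single non-nil error that requeues the Service. A small sketch of that behavior:

package main

import (
    "errors"
    "fmt"

    utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
    // Clean pass: nothing was skipped and finalize succeeded, so the aggregate is nil.
    var errs []error
    fmt.Println(utilerrors.NewAggregate(errs)) // <nil>

    // A skipped Pod plus a finalize failure are reported together; the caller
    // treats the non-nil aggregate as a signal to requeue the Service.
    errs = append(errs,
        errors.New("skipping Pod pod-1 for Service ns/svc: Node node-1 Not Found"),
        errors.New("failed to update EndpointSlice: conflict"),
    )
    fmt.Println(utilerrors.NewAggregate(errs))
}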
@@ -481,7 +481,7 @@ func TestReconcile1EndpointSlice(t *testing.T) {
         {
             desc: "Existing placeholder that's the same",
             existing: newEndpointSlice(&svc, &endpointMeta{Ports: []discovery.EndpointPort{}, AddressType: discovery.AddressTypeIPv4}),
-            wantMetrics: expectedMetrics{desiredSlices: 0, actualSlices: 0, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
+            wantMetrics: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 0},
         },
         {
             desc: "Existing placeholder that's different",
@@ -1438,6 +1438,191 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
     }
 }
 
+// When a Pod references a Node that is not present in the informer cache the
+// EndpointSlices will continue to allow create, updates and deletes through.
+// The Pod with the missing reference will be published or retried depending of
+// the service.spec.PublishNotReadyAddresses.
+// The test considers two Pods on different nodes, one of the Nodes is not present.
+func TestReconcilerPodMissingNode(t *testing.T) {
+    namespace := "test"
+
+    nodes := make([]*corev1.Node, 2)
+    nodes[0] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-0"}}
+    nodes[1] = &corev1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
+
+    pods := make([]*corev1.Pod, 2)
+    pods[0] = newPod(0, namespace, true, 1, false)
+    pods[0].Spec.NodeName = nodes[0].Name
+    pods[1] = newPod(1, namespace, true, 1, false)
+    pods[1].Spec.NodeName = nodes[1].Name
+
+    service, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
+
+    testCases := []struct {
+        name string
+        publishNotReady bool
+        existingNodes []*corev1.Node
+        existingSlice func() *discovery.EndpointSlice
+        expectedMetrics expectedMetrics
+        expectError bool
+    }{{
+        name: "Create and publishNotReady false",
+        publishNotReady: false,
+        existingNodes: []*corev1.Node{nodes[0]},
+        expectedMetrics: expectedMetrics{
+            desiredSlices: 1,
+            actualSlices: 1,
+            desiredEndpoints: 1,
+            addedPerSync: 1,
+            removedPerSync: 0,
+            numCreated: 1,
+            numUpdated: 0,
+            numDeleted: 0,
+            slicesChangedPerSync: 1,
+        },
+        expectError: true,
+    }, {
+        name: "Create and publishNotReady true",
+        publishNotReady: true,
+        existingNodes: []*corev1.Node{nodes[0]},
+        expectedMetrics: expectedMetrics{
+            desiredSlices: 1,
+            actualSlices: 1,
+            desiredEndpoints: 2,
+            addedPerSync: 2,
+            removedPerSync: 0,
+            numCreated: 1,
+            numUpdated: 0,
+            numDeleted: 0,
+            slicesChangedPerSync: 1,
+        },
+        expectError: false,
+    }, {
+        name: "Update and publishNotReady false",
+        publishNotReady: false,
+        existingNodes: []*corev1.Node{nodes[0]},
+        existingSlice: func() *discovery.EndpointSlice {
+            slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+            return slice
+        },
+        expectedMetrics: expectedMetrics{
+            desiredSlices: 1,
+            actualSlices: 1,
+            desiredEndpoints: 1,
+            addedPerSync: 0,
+            removedPerSync: 1,
+            numCreated: 0,
+            numUpdated: 1,
+            numDeleted: 0,
+            slicesChangedPerSync: 1,
+        },
+        expectError: true,
+    }, {
+        name: "Update and publishNotReady true and all nodes missing",
+        publishNotReady: true,
+        existingSlice: func() *discovery.EndpointSlice {
+            slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+            return slice
+        },
+        expectedMetrics: expectedMetrics{
+            desiredSlices: 1,
+            actualSlices: 1,
+            desiredEndpoints: 2,
+            addedPerSync: 0,
+            removedPerSync: 0,
+            numCreated: 0,
+            numUpdated: 0,
+            numDeleted: 0,
+            slicesChangedPerSync: 0,
+        },
+        expectError: false,
+    }, {
+        name: "Update and publishNotReady true",
+        publishNotReady: true,
+        existingNodes: []*corev1.Node{nodes[0]},
+        existingSlice: func() *discovery.EndpointSlice {
+            slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+            return slice
+        },
+        expectedMetrics: expectedMetrics{
+            desiredSlices: 1,
+            actualSlices: 1,
+            desiredEndpoints: 2,
+            addedPerSync: 0,
+            removedPerSync: 0,
+            numCreated: 0,
+            numUpdated: 0,
+            numDeleted: 0,
+            slicesChangedPerSync: 0,
+        },
+        expectError: false,
+    }, {
+        name: "Update if publishNotReady false and no nodes are present",
+        publishNotReady: false,
+        existingNodes: []*corev1.Node{},
+        existingSlice: func() *discovery.EndpointSlice {
+            slice := newEmptyEndpointSlice(1, namespace, endpointMeta, service)
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[0], nodes[0], &service, discovery.AddressTypeIPv4))
+            slice.Endpoints = append(slice.Endpoints, podToEndpoint(pods[1], nodes[1], &service, discovery.AddressTypeIPv4))
+            return slice
+        },
+        expectedMetrics: expectedMetrics{
+            desiredSlices: 1,
+            actualSlices: 1,
+            desiredEndpoints: 0,
+            addedPerSync: 0,
+            removedPerSync: 2,
+            numCreated: 0,
+            // the slice is updated not deleted
+            numUpdated: 1,
+            numDeleted: 0,
+            slicesChangedPerSync: 1,
+        },
+        expectError: true,
+    },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            client := newClientset()
+            setupMetrics()
+            r := newReconciler(client, tc.existingNodes, defaultMaxEndpointsPerSlice)
+
+            svc := service.DeepCopy()
+            svc.Spec.PublishNotReadyAddresses = tc.publishNotReady
+            existingSlices := []*discovery.EndpointSlice{}
+            if tc.existingSlice != nil {
+                slice := tc.existingSlice()
+                existingSlices = append(existingSlices, slice)
+                _, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
+                if createErr != nil {
+                    t.Errorf("Expected no error creating endpoint slice")
+                }
+            }
+            err := r.reconcile(svc, pods, existingSlices, time.Now())
+            if err == nil && tc.expectError {
+                t.Errorf("Expected error but no error received")
+            }
+            if err != nil && !tc.expectError {
+                t.Errorf("Unexpected error: %v", err)
+            }
+
+            fetchedSlices := fetchEndpointSlices(t, client, namespace)
+            if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+                t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+            }
+            expectMetrics(t, tc.expectedMetrics)
+
+        })
+    }
+}
+
 func TestReconcileTopology(t *testing.T) {
     ns := "testing"
     svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
@@ -1594,7 +1779,7 @@ func TestReconcileTopology(t *testing.T) {
         expectedCrossZoneHints: 1,
         expectedMetrics: expectedMetrics{
             desiredSlices: 1,
-            actualSlices: 1,
+            actualSlices: 2,
             desiredEndpoints: 9,
             addedPerSync: 0,
             removedPerSync: 0,
@@ -1617,7 +1802,7 @@ func TestReconcileTopology(t *testing.T) {
         expectedCrossZoneHints: 0,
         expectedMetrics: expectedMetrics{
             desiredSlices: 1,
-            actualSlices: 1,
+            actualSlices: 2,
             desiredEndpoints: 9,
             addedPerSync: 0,
             removedPerSync: 0,
@@ -1650,6 +1835,9 @@ func TestReconcileTopology(t *testing.T) {
         cmc.Check(t)
         expectMetrics(t, tc.expectedMetrics)
         fetchedSlices := fetchEndpointSlices(t, client, ns)
+        if len(fetchedSlices) != tc.expectedMetrics.actualSlices {
+            t.Fatalf("Actual slices %d doesn't match metric %d", len(fetchedSlices), tc.expectedMetrics.actualSlices)
+        }
 
         if tc.expectedHints == nil {
             for _, slice := range fetchedSlices {
@@ -1864,7 +2052,7 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
 
     actualNumSlices, err := testutil.GetGaugeMetricValue(metrics.NumEndpointSlices.WithLabelValues())
     handleErr(t, err, "numEndpointSlices")
-    if actualDesiredSlices != float64(em.desiredSlices) {
+    if actualNumSlices != float64(em.actualSlices) {
         t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices)
     }
 