mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #84573 from robscott/endpointslice-publishnotready
Updating EndpointSlices to use PublishNotReadyAddresses from Services.
This commit is contained in:
commit
ca836256c4
@ -90,7 +90,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
endpoint := podToEndpoint(pod, node)
|
||||
endpoint := podToEndpoint(pod, node, service.Spec.PublishNotReadyAddresses)
|
||||
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
|
||||
numDesiredEndpoints++
|
||||
}
|
||||
|
@ -139,6 +139,40 @@ func TestReconcile1EndpointSlice(t *testing.T) {
|
||||
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
|
||||
}
|
||||
|
||||
// when a Service has PublishNotReadyAddresses set to true, corresponding
|
||||
// Endpoints should be considered ready, even if the backing Pod is not.
|
||||
func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) {
|
||||
client := newClientset()
|
||||
namespace := "test"
|
||||
svc, _ := newServiceAndEndpointMeta("foo", namespace)
|
||||
svc.Spec.PublishNotReadyAddresses = true
|
||||
|
||||
// start with 50 pods, 1/3 not ready
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 50; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
|
||||
|
||||
// Only 1 action, an EndpointSlice create
|
||||
assert.Len(t, client.Actions(), 1, "Expected 1 additional clientset action")
|
||||
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
||||
|
||||
// Two endpoint slices should be completely full, the remainder should be in another one
|
||||
endpointSlices := fetchEndpointSlices(t, client, namespace)
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
for i, endpoint := range endpointSlice.Endpoints {
|
||||
if !*endpoint.Conditions.Ready {
|
||||
t.Errorf("Expected endpoints[%d] to be ready", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
expectUnorderedSlicesWithLengths(t, endpointSlices, []int{50})
|
||||
}
|
||||
|
||||
// a simple use case with 250 pods matching a service and no existing slices
|
||||
// reconcile should create 3 slices, completely filling 2 of them
|
||||
func TestReconcileManyPods(t *testing.T) {
|
||||
@ -190,13 +224,13 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
|
||||
// have approximately 1/4 in first slice
|
||||
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
for i := 1; i < len(pods)-4; i += 4 {
|
||||
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
|
||||
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
|
||||
}
|
||||
|
||||
// have approximately 1/4 in second slice
|
||||
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
|
||||
for i := 3; i < len(pods)-4; i += 4 {
|
||||
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
|
||||
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
|
||||
}
|
||||
|
||||
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
|
||||
@ -242,13 +276,13 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
|
||||
// have approximately 1/4 in first slice
|
||||
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
for i := 1; i < len(pods)-4; i += 4 {
|
||||
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
|
||||
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
|
||||
}
|
||||
|
||||
// have approximately 1/4 in second slice
|
||||
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
|
||||
for i := 3; i < len(pods)-4; i += 4 {
|
||||
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
|
||||
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
|
||||
}
|
||||
|
||||
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
|
||||
@ -324,7 +358,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
|
||||
if i%30 == 0 {
|
||||
existingSlices = append(existingSlices, newEmptyEndpointSlice(sliceNum, namespace, endpointMeta, svc))
|
||||
}
|
||||
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}))
|
||||
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
|
||||
}
|
||||
|
||||
createEndpointSlices(t, client, namespace, existingSlices)
|
||||
@ -362,7 +396,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
|
||||
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
for i := 0; i < 80; i++ {
|
||||
pod := newPod(i, namespace, true, 1)
|
||||
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}))
|
||||
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
existingSlices = append(existingSlices, slice1)
|
||||
@ -370,7 +404,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
|
||||
slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
|
||||
for i := 100; i < 120; i++ {
|
||||
pod := newPod(i, namespace, true, 1)
|
||||
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}))
|
||||
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
existingSlices = append(existingSlices, slice2)
|
||||
|
@ -35,8 +35,8 @@ import (
|
||||
// podEndpointChanged returns true if the results of podToEndpoint are different
|
||||
// for the pods passed to this function.
|
||||
func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
|
||||
endpoint1 := podToEndpoint(pod1, &corev1.Node{})
|
||||
endpoint2 := podToEndpoint(pod2, &corev1.Node{})
|
||||
endpoint1 := podToEndpoint(pod1, &corev1.Node{}, false)
|
||||
endpoint2 := podToEndpoint(pod2, &corev1.Node{}, false)
|
||||
|
||||
endpoint1.TargetRef.ResourceVersion = ""
|
||||
endpoint2.TargetRef.ResourceVersion = ""
|
||||
@ -45,7 +45,7 @@ func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
|
||||
}
|
||||
|
||||
// podToEndpoint returns an Endpoint object generated from a Pod and Node.
|
||||
func podToEndpoint(pod *corev1.Pod, node *corev1.Node) discovery.Endpoint {
|
||||
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, publishNotReadyAddresses bool) discovery.Endpoint {
|
||||
// Build out topology information. This is currently limited to hostname,
|
||||
// zone, and region, but this will be expanded in the future.
|
||||
topology := map[string]string{}
|
||||
@ -67,7 +67,7 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node) discovery.Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
ready := podutil.IsPodReady(pod)
|
||||
ready := publishNotReadyAddresses || podutil.IsPodReady(pod)
|
||||
return discovery.Endpoint{
|
||||
Addresses: getEndpointAddresses(pod.Status),
|
||||
Conditions: discovery.EndpointConditions{
|
||||
|
@ -18,6 +18,7 @@ package endpointslice
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -91,10 +92,11 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
node *v1.Node
|
||||
expectedEndpoint discovery.Endpoint
|
||||
name string
|
||||
pod *v1.Pod
|
||||
node *v1.Node
|
||||
expectedEndpoint discovery.Endpoint
|
||||
publishNotReadyAddresses bool
|
||||
}{
|
||||
{
|
||||
name: "Ready pod",
|
||||
@ -112,6 +114,23 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Ready pod + publishNotReadyAddresses",
|
||||
pod: readyPod,
|
||||
publishNotReadyAddresses: true,
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
Name: readyPod.Name,
|
||||
UID: readyPod.UID,
|
||||
ResourceVersion: readyPod.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Unready pod",
|
||||
pod: unreadyPod,
|
||||
@ -128,6 +147,23 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Unready pod + publishNotReadyAddresses",
|
||||
pod: unreadyPod,
|
||||
publishNotReadyAddresses: true,
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
Name: readyPod.Name,
|
||||
UID: readyPod.UID,
|
||||
ResourceVersion: readyPod.ResourceVersion,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Ready pod + node labels",
|
||||
pod: readyPod,
|
||||
@ -174,8 +210,10 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
endpoint := podToEndpoint(testCase.pod, testCase.node)
|
||||
assert.EqualValues(t, testCase.expectedEndpoint, endpoint, "Test case failed: %s", testCase.name)
|
||||
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.publishNotReadyAddresses)
|
||||
if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) {
|
||||
t.Errorf("Expected endpoint: %v, got: %v", testCase.expectedEndpoint, endpoint)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user