Updating EndpointSlices to use PublishNotReadyAddresses from Services.

The Service spec includes a PublishNotReadyAddresses field which has
been used by Endpoints to report all matching resources ready. This may
or may not have been the initial purpose of the field, but given the
desire to provide backwards compatibility with the Endpoints API here,
it seems to make sense to continue to provide the same functionality.
This commit is contained in:
Rob Scott 2019-10-30 14:05:05 -07:00
parent 1c974109b6
commit ff3cbdb0ad
No known key found for this signature in database
GPG Key ID: 53504C654CF4B3EE
4 changed files with 90 additions and 18 deletions

View File

@ -90,7 +90,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
if err != nil {
return err
}
endpoint := podToEndpoint(pod, node)
endpoint := podToEndpoint(pod, node, service.Spec.PublishNotReadyAddresses)
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
numDesiredEndpoints++
}

View File

@ -139,6 +139,40 @@ func TestReconcile1EndpointSlice(t *testing.T) {
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
}
// when a Service has PublishNotReadyAddresses set to true, corresponding
// Endpoints should be considered ready, even if the backing Pod is not.
func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) {
client := newClientset()
namespace := "test"
svc, _ := newServiceAndEndpointMeta("foo", namespace)
svc.Spec.PublishNotReadyAddresses = true
// start with 50 pods, 1/3 not ready
pods := []*corev1.Pod{}
for i := 0; i < 50; i++ {
ready := !(i%3 == 0)
pods = append(pods, newPod(i, namespace, ready, 1))
}
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
// Only 1 action, an EndpointSlice create
assert.Len(t, client.Actions(), 1, "Expected 1 additional clientset action")
expectActions(t, client.Actions(), 1, "create", "endpointslices")
// Two endpoint slices should be completely full, the remainder should be in another one
endpointSlices := fetchEndpointSlices(t, client, namespace)
for _, endpointSlice := range endpointSlices {
for i, endpoint := range endpointSlice.Endpoints {
if !*endpoint.Conditions.Ready {
t.Errorf("Expected endpoints[%d] to be ready", i)
}
}
}
expectUnorderedSlicesWithLengths(t, endpointSlices, []int{50})
}
// a simple use case with 250 pods matching a service and no existing slices
// reconcile should create 3 slices, completely filling 2 of them
func TestReconcileManyPods(t *testing.T) {
@ -190,13 +224,13 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}
// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
@ -242,13 +276,13 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
// have approximately 1/4 in first slice
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 1; i < len(pods)-4; i += 4 {
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice1.Endpoints = append(endpointSlice1.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}
// have approximately 1/4 in second slice
endpointSlice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 3; i < len(pods)-4; i += 4 {
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}))
endpointSlice2.Endpoints = append(endpointSlice2.Endpoints, podToEndpoint(pods[i], &corev1.Node{}, false))
}
existingSlices := []*discovery.EndpointSlice{endpointSlice1, endpointSlice2}
@ -324,7 +358,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
if i%30 == 0 {
existingSlices = append(existingSlices, newEmptyEndpointSlice(sliceNum, namespace, endpointMeta, svc))
}
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}))
existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
}
createEndpointSlices(t, client, namespace, existingSlices)
@ -362,7 +396,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
for i := 0; i < 80; i++ {
pod := newPod(i, namespace, true, 1)
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}))
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice1)
@ -370,7 +404,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
for i := 100; i < 120; i++ {
pod := newPod(i, namespace, true, 1)
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}))
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, false))
pods = append(pods, pod)
}
existingSlices = append(existingSlices, slice2)

View File

@ -35,8 +35,8 @@ import (
// podEndpointChanged returns true if the results of podToEndpoint are different
// for the pods passed to this function.
func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
endpoint1 := podToEndpoint(pod1, &corev1.Node{})
endpoint2 := podToEndpoint(pod2, &corev1.Node{})
endpoint1 := podToEndpoint(pod1, &corev1.Node{}, false)
endpoint2 := podToEndpoint(pod2, &corev1.Node{}, false)
endpoint1.TargetRef.ResourceVersion = ""
endpoint2.TargetRef.ResourceVersion = ""
@ -45,7 +45,7 @@ func podEndpointChanged(pod1, pod2 *corev1.Pod) bool {
}
// podToEndpoint returns an Endpoint object generated from a Pod and Node.
func podToEndpoint(pod *corev1.Pod, node *corev1.Node) discovery.Endpoint {
func podToEndpoint(pod *corev1.Pod, node *corev1.Node, publishNotReadyAddresses bool) discovery.Endpoint {
// Build out topology information. This is currently limited to hostname,
// zone, and region, but this will be expanded in the future.
topology := map[string]string{}
@ -67,7 +67,7 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node) discovery.Endpoint {
}
}
ready := podutil.IsPodReady(pod)
ready := publishNotReadyAddresses || podutil.IsPodReady(pod)
return discovery.Endpoint{
Addresses: getEndpointAddresses(pod.Status),
Conditions: discovery.EndpointConditions{

View File

@ -18,6 +18,7 @@ package endpointslice
import (
"fmt"
"reflect"
"testing"
"time"
@ -91,10 +92,11 @@ func TestPodToEndpoint(t *testing.T) {
}
testCases := []struct {
name string
pod *v1.Pod
node *v1.Node
expectedEndpoint discovery.Endpoint
name string
pod *v1.Pod
node *v1.Node
expectedEndpoint discovery.Endpoint
publishNotReadyAddresses bool
}{
{
name: "Ready pod",
@ -112,6 +114,23 @@ func TestPodToEndpoint(t *testing.T) {
},
},
},
{
name: "Ready pod + publishNotReadyAddresses",
pod: readyPod,
publishNotReadyAddresses: true,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: readyPod.Name,
UID: readyPod.UID,
ResourceVersion: readyPod.ResourceVersion,
},
},
},
{
name: "Unready pod",
pod: unreadyPod,
@ -128,6 +147,23 @@ func TestPodToEndpoint(t *testing.T) {
},
},
},
{
name: "Unready pod + publishNotReadyAddresses",
pod: unreadyPod,
publishNotReadyAddresses: true,
expectedEndpoint: discovery.Endpoint{
Addresses: []string{"1.2.3.5"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
TargetRef: &v1.ObjectReference{
Kind: "Pod",
Namespace: ns,
Name: readyPod.Name,
UID: readyPod.UID,
ResourceVersion: readyPod.ResourceVersion,
},
},
},
{
name: "Ready pod + node labels",
pod: readyPod,
@ -174,8 +210,10 @@ func TestPodToEndpoint(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
endpoint := podToEndpoint(testCase.pod, testCase.node)
assert.EqualValues(t, testCase.expectedEndpoint, endpoint, "Test case failed: %s", testCase.name)
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.publishNotReadyAddresses)
if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) {
t.Errorf("Expected endpoint: %v, got: %v", testCase.expectedEndpoint, endpoint)
}
})
}
}