diff --git a/pkg/controller/endpointslice/BUILD b/pkg/controller/endpointslice/BUILD index 6bbe6bb7065..42288c69b9c 100644 --- a/pkg/controller/endpointslice/BUILD +++ b/pkg/controller/endpointslice/BUILD @@ -19,6 +19,7 @@ go_library( "//pkg/controller:go_default_library", "//pkg/controller/endpointslice/metrics:go_default_library", "//pkg/controller/util/endpoint:go_default_library", + "//pkg/features:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", @@ -31,6 +32,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/informers/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", @@ -61,6 +63,7 @@ go_test( "//pkg/controller:go_default_library", "//pkg/controller/endpointslice/metrics:go_default_library", "//pkg/controller/util/endpoint:go_default_library", + "//pkg/features:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", @@ -71,11 +74,13 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go index 9c17c499c6d..bc2d113cb5e 100644 --- a/pkg/controller/endpointslice/endpointslice_controller_test.go +++ b/pkg/controller/endpointslice/endpointslice_controller_test.go @@ -186,11 +186,11 @@ func TestSyncServicePodSelection(t *testing.T) { client, esController := newController([]string{"node-1"}, time.Duration(0)) ns := metav1.NamespaceDefault - pod1 := newPod(1, ns, true, 0) + pod1 := newPod(1, ns, true, 0, false) esController.podStore.Add(pod1) // ensure this pod will not match the selector - pod2 := newPod(2, ns, true, 0) + pod2 := newPod(2, ns, true, 0, false) pod2.Labels["foo"] = "boo" esController.podStore.Add(pod2) @@ -341,11 +341,11 @@ func TestSyncServiceFull(t *testing.T) { namespace := metav1.NamespaceDefault serviceName := "all-the-protocols" - pod1 := newPod(1, namespace, true, 0) + pod1 := newPod(1, namespace, true, 0, false) pod1.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}} esController.podStore.Add(pod1) - pod2 := newPod(2, namespace, true, 0) + pod2 := newPod(2, namespace, true, 0, false) pod2.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.5"}, {IP: "1234::5678:0000:0000:9abc:def0"}} esController.podStore.Add(pod2) @@ -499,7 +499,7 @@ func TestPodAddsBatching(t *testing.T) { for i, add := range tc.adds { time.Sleep(add.delay) - p := newPod(i, ns, true, 0) + p := newPod(i, ns, true, 0, false) esController.podStore.Add(p) esController.addPod(p) } @@ -789,7 +789,7 @@ func TestPodDeleteBatching(t *testing.T) { func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) { t.Helper() for i := 0; i < podsCount; i++ { - pod := newPod(i, namespace, true, 0) + pod := newPod(i, namespace, true, 0, false) esController.podStore.Add(pod) } } diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index 147c62d4c8b..1694844dd30 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -29,10 +29,12 @@ import ( "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" + "k8s.io/kubernetes/pkg/features" ) // reconciler is responsible for transforming current EndpointSlice state into @@ -136,7 +138,8 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor numDesiredEndpoints := 0 for _, pod := range pods { - if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) { + includeTerminating := service.Spec.PublishNotReadyAddresses || utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) + if !endpointutil.ShouldPodBeInEndpointSlice(pod, includeTerminating) { continue } diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index 4710348712b..608b7314450 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -32,13 +32,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" corelisters "k8s.io/client-go/listers/core/v1" k8stesting "k8s.io/client-go/testing" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" + "k8s.io/kubernetes/pkg/features" utilpointer "k8s.io/utils/pointer" ) @@ -95,7 +98,7 @@ func TestReconcile1Pod(t *testing.T) { dualStackSvc.Spec.ClusterIP = "10.0.0.10" dualStackSvc.Spec.ClusterIPs = []string{"10.0.0.10", "2000::1"} - pod1 := newPod(1, namespace, true, 1) + pod1 := newPod(1, namespace, true, 1, false) pod1.Status.PodIPs = []corev1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}} pod1.Spec.Hostname = "example-hostname" node1 := &corev1.Node{ @@ -114,6 +117,7 @@ func TestReconcile1Pod(t *testing.T) { expectedEndpoint discovery.Endpoint expectedLabels map[string]string expectedEndpointPerSlice map[discovery.AddressType][]discovery.Endpoint + terminatingGateEnabled bool }{ "no-family-service": { service: noFamilyService, @@ -140,7 +144,6 @@ func TestReconcile1Pod(t *testing.T) { discovery.LabelServiceName: "foo", }, }, - "ipv4": { service: svcv4, expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{ @@ -167,6 +170,37 @@ func TestReconcile1Pod(t *testing.T) { corev1.IsHeadlessService: "", }, }, + "ipv4-with-terminating-gate-enabled": { + service: svcv4, + expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{ + discovery.AddressTypeIPv4: { + { + Addresses: []string{"1.2.3.4"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Accepting: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{ + "kubernetes.io/hostname": "node-1", + "topology.kubernetes.io/zone": "us-central1-a", + "topology.kubernetes.io/region": "us-central1", + }, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Namespace: namespace, + Name: "pod1", + }, + }, + }, + }, + expectedLabels: map[string]string{ + discovery.LabelManagedBy: controllerName, + discovery.LabelServiceName: "foo", + corev1.IsHeadlessService: "", + }, + terminatingGateEnabled: true, + }, "ipv4-clusterip": { service: svcv4ClusterIP, expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{ @@ -389,6 +423,8 @@ func TestReconcile1Pod(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testCase.terminatingGateEnabled)() + client := newClientset() setupMetrics() triggerTime := time.Now() @@ -496,7 +532,7 @@ func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 50; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) @@ -530,7 +566,7 @@ func TestReconcileManyPods(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 250; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) @@ -563,7 +599,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 250; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } // have approximately 1/4 in first slice @@ -619,7 +655,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 300; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } // have approximately 1/4 in first slice @@ -665,7 +701,7 @@ func TestReconcileEndpointSlicesUpdating(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 250; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) @@ -698,7 +734,7 @@ func TestReconcileEndpointSlicesServicesLabelsUpdating(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 250; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) @@ -742,7 +778,7 @@ func TestReconcileEndpointSlicesServicesReservedLabels(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 250; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) @@ -776,7 +812,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 300; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } // generate 10 existing slices with 30 pods/endpoints each @@ -827,7 +863,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc) for i := 0; i < 80; i++ { - pod := newPod(i, namespace, true, 1) + pod := newPod(i, namespace, true, 1, false) slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc, discovery.AddressTypeIPv4)) pods = append(pods, pod) } @@ -835,7 +871,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc) for i := 100; i < 120; i++ { - pod := newPod(i, namespace, true, 1) + pod := newPod(i, namespace, true, 1, false) slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc, discovery.AddressTypeIPv4)) pods = append(pods, pod) } @@ -856,7 +892,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { // add a few additional endpoints - no more than could fit in either slice. for i := 200; i < 215; i++ { - pods = append(pods, newPod(i, namespace, true, 1)) + pods = append(pods, newPod(i, namespace, true, 1, false)) } r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) @@ -888,7 +924,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) { slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc) for i := 0; i < 80; i++ { - pod := newPod(i, namespace, true, 1) + pod := newPod(i, namespace, true, 1, false) slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4)) pods = append(pods, pod) } @@ -896,7 +932,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) { slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc) for i := 100; i < 150; i++ { - pod := newPod(i, namespace, true, 1) + pod := newPod(i, namespace, true, 1, false) slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4)) pods = append(pods, pod) } @@ -955,7 +991,7 @@ func TestReconcileEndpointSlicesRecreation(t *testing.T) { svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace) slice := newEmptyEndpointSlice(1, namespace, endpointMeta, svc) - pod := newPod(1, namespace, true, 1) + pod := newPod(1, namespace, true, 1, false) slice.Endpoints = append(slice.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4)) if !tc.ownedByService { @@ -1023,7 +1059,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { for i := 0; i < 300; i++ { ready := !(i%3 == 0) portOffset := i % 5 - pod := newPod(i, namespace, ready, 1) + pod := newPod(i, namespace, ready, 1, false) pod.Spec.Containers[0].Ports = []corev1.ContainerPort{{ Name: portNameIntStr.StrVal, ContainerPort: int32(8080 + portOffset), @@ -1073,7 +1109,7 @@ func TestReconcileMaxEndpointsPerSlice(t *testing.T) { pods := []*corev1.Pod{} for i := 0; i < 250; i++ { ready := !(i%3 == 0) - pods = append(pods, newPod(i, namespace, ready, 1)) + pods = append(pods, newPod(i, namespace, ready, 1, false)) } testCases := []struct { @@ -1125,7 +1161,7 @@ func TestReconcileEndpointSlicesMetrics(t *testing.T) { // start with 20 pods pods := []*corev1.Pod{} for i := 0; i < 20; i++ { - pods = append(pods, newPod(i, namespace, true, 1)) + pods = append(pods, newPod(i, namespace, true, 1, false)) } r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index 83cfc028633..03aa71da8dc 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -34,6 +35,7 @@ import ( helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/apis/discovery/validation" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" + "k8s.io/kubernetes/pkg/features" utilnet "k8s.io/utils/net" ) @@ -60,7 +62,11 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service, } } - ready := service.Spec.PublishNotReadyAddresses || podutil.IsPodReady(pod) + accepting := podutil.IsPodReady(pod) + terminating := pod.DeletionTimestamp != nil + // For compatibility reasons, "ready" should never be "true" if a pod is terminatng, unless + // publishNotReadyAddresses was set. + ready := service.Spec.PublishNotReadyAddresses || (accepting && !terminating) ep := discovery.Endpoint{ Addresses: getEndpointAddresses(pod.Status, service, addressType), Conditions: discovery.EndpointConditions{ @@ -76,6 +82,11 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service, }, } + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) { + ep.Conditions.Accepting = &accepting + ep.Conditions.Terminating = &terminating + } + if endpointutil.ShouldSetHostname(pod, service) { ep.Hostname = &pod.Spec.Hostname } diff --git a/pkg/controller/endpointslice/utils_test.go b/pkg/controller/endpointslice/utils_test.go index a84c8caa37d..e42fb53c280 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/pkg/controller/endpointslice/utils_test.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -30,8 +31,11 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" utilpointer "k8s.io/utils/pointer" ) @@ -219,13 +223,15 @@ func TestPodToEndpoint(t *testing.T) { svcPublishNotReady, _ := newServiceAndEndpointMeta("publishnotready", ns) svcPublishNotReady.Spec.PublishNotReadyAddresses = true - readyPod := newPod(1, ns, true, 1) - readyPodHostname := newPod(1, ns, true, 1) + readyPod := newPod(1, ns, true, 1, false) + readyTerminatingPod := newPod(1, ns, true, 1, true) + readyPodHostname := newPod(1, ns, true, 1, false) readyPodHostname.Spec.Subdomain = svc.Name readyPodHostname.Spec.Hostname = "example-hostname" - unreadyPod := newPod(1, ns, false, 1) - multiIPPod := newPod(1, ns, true, 1) + unreadyPod := newPod(1, ns, false, 1, false) + unreadyTerminatingPod := newPod(1, ns, false, 1, true) + multiIPPod := newPod(1, ns, true, 1, false) multiIPPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}} node1 := &v1.Node{ @@ -245,6 +251,7 @@ func TestPodToEndpoint(t *testing.T) { svc *v1.Service expectedEndpoint discovery.Endpoint publishNotReadyAddresses bool + terminatingGateEnabled bool }{ { name: "Ready pod", @@ -381,13 +388,121 @@ func TestPodToEndpoint(t *testing.T) { }, }, }, + { + name: "Ready pod, terminating gate enabled", + pod: readyPod, + svc: &svc, + expectedEndpoint: discovery.Endpoint{ + Addresses: []string{"1.2.3.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Accepting: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "node-1"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: ns, + Name: readyPod.Name, + UID: readyPod.UID, + ResourceVersion: readyPod.ResourceVersion, + }, + }, + terminatingGateEnabled: true, + }, + { + name: "Ready terminating pod, terminating gate disabled", + pod: readyTerminatingPod, + svc: &svc, + expectedEndpoint: discovery.Endpoint{ + Addresses: []string{"1.2.3.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "node-1"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: ns, + Name: readyPod.Name, + UID: readyPod.UID, + ResourceVersion: readyPod.ResourceVersion, + }, + }, + terminatingGateEnabled: false, + }, + { + name: "Ready terminating pod, terminating gate enabled", + pod: readyTerminatingPod, + svc: &svc, + expectedEndpoint: discovery.Endpoint{ + Addresses: []string{"1.2.3.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Accepting: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": "node-1"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: ns, + Name: readyPod.Name, + UID: readyPod.UID, + ResourceVersion: readyPod.ResourceVersion, + }, + }, + terminatingGateEnabled: true, + }, + { + name: "Not ready terminating pod, terminating gate disabled", + pod: unreadyTerminatingPod, + svc: &svc, + expectedEndpoint: discovery.Endpoint{ + Addresses: []string{"1.2.3.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + }, + Topology: map[string]string{"kubernetes.io/hostname": "node-1"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: ns, + Name: readyPod.Name, + UID: readyPod.UID, + ResourceVersion: readyPod.ResourceVersion, + }, + }, + terminatingGateEnabled: false, + }, + { + name: "Not ready terminating pod, terminating gate enabled", + pod: unreadyTerminatingPod, + svc: &svc, + expectedEndpoint: discovery.Endpoint{ + Addresses: []string{"1.2.3.5"}, + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(false), + Accepting: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(true), + }, + Topology: map[string]string{"kubernetes.io/hostname": "node-1"}, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: ns, + Name: readyPod.Name, + UID: readyPod.UID, + ResourceVersion: readyPod.ResourceVersion, + }, + }, + terminatingGateEnabled: true, + }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testCase.terminatingGateEnabled)() + endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.svc, discovery.AddressTypeIPv4) if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) { - t.Errorf("Expected endpoint: %v, got: %v", testCase.expectedEndpoint, endpoint) + t.Errorf("Expected endpoint: %+v, got: %+v", testCase.expectedEndpoint, endpoint) } }) } @@ -811,18 +926,26 @@ func TestSetEndpointSliceLabels(t *testing.T) { // Test helpers -func newPod(n int, namespace string, ready bool, nPorts int) *v1.Pod { +func newPod(n int, namespace string, ready bool, nPorts int, terminating bool) *v1.Pod { status := v1.ConditionTrue if !ready { status = v1.ConditionFalse } + var deletionTimestamp *metav1.Time + if terminating { + deletionTimestamp = &metav1.Time{ + Time: time.Now(), + } + } + p := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ - Namespace: namespace, - Name: fmt.Sprintf("pod%d", n), - Labels: map[string]string{"foo": "bar"}, + Namespace: namespace, + Name: fmt.Sprintf("pod%d", n), + Labels: map[string]string{"foo": "bar"}, + DeletionTimestamp: deletionTimestamp, }, Spec: v1.PodSpec{ Containers: []v1.Container{{ diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index d1744441bec..bd57d46a76a 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -122,14 +122,14 @@ func DeepHashObjectToString(objectToWrite interface{}) string { return hex.EncodeToString(hasher.Sum(nil)[0:]) } -// ShouldPodBeInEndpoints returns true if a specified pod should be in an -// endpoints object. Terminating pods are only included if publishNotReady is true. -func ShouldPodBeInEndpoints(pod *v1.Pod, publishNotReady bool) bool { +// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an EndpointSlice object. +// Terminating pods are only included if includeTerminating is true +func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool { if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 { return false } - if !publishNotReady && pod.DeletionTimestamp != nil { + if !includeTerminating && pod.DeletionTimestamp != nil { return false } diff --git a/pkg/controller/util/endpoint/controller_utils_test.go b/pkg/controller/util/endpoint/controller_utils_test.go index df118e0a7af..85d20544dd3 100644 --- a/pkg/controller/util/endpoint/controller_utils_test.go +++ b/pkg/controller/util/endpoint/controller_utils_test.go @@ -100,12 +100,12 @@ func TestDetermineNeededServiceUpdates(t *testing.T) { // There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). // Not listing them all here. Just listing all of the 3 false cases and 3 of the // 12 true cases. -func TestShouldPodBeInEndpoints(t *testing.T) { +func TestShouldPodBeInEndpointSlice(t *testing.T) { testCases := []struct { - name string - pod *v1.Pod - publishNotReady bool - expected bool + name string + pod *v1.Pod + expected bool + includeTerminating bool }{ // Pod should not be in endpoints: { @@ -162,7 +162,7 @@ func TestShouldPodBeInEndpoints(t *testing.T) { expected: false, }, { - name: "Terminating Pod", + name: "Terminating Pod with includeTerminating=false", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &metav1.Time{ @@ -175,8 +175,7 @@ func TestShouldPodBeInEndpoints(t *testing.T) { PodIP: "1.2.3.4", }, }, - publishNotReady: false, - expected: false, + expected: false, }, // Pod should be in endpoints: { @@ -245,7 +244,7 @@ func TestShouldPodBeInEndpoints(t *testing.T) { expected: true, }, { - name: "Terminating Pod with publish not ready", + name: "Terminating Pod with includeTerminating=true", pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ DeletionTimestamp: &metav1.Time{ @@ -258,14 +257,14 @@ func TestShouldPodBeInEndpoints(t *testing.T) { PodIP: "1.2.3.4", }, }, - publishNotReady: true, - expected: true, + expected: true, + includeTerminating: true, }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - result := ShouldPodBeInEndpoints(test.pod, test.publishNotReady) + result := ShouldPodBeInEndpointSlice(test.pod, test.includeTerminating) if result != test.expected { t.Errorf("expected: %t, got: %t", test.expected, result) }