mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
endpointslice controller: set new conditions 'accepting' and 'terminating'
Signed-off-by: Andrew Sy Kim <kim.andrewsy@gmail.com>
This commit is contained in:
parent
9a7c3c4c34
commit
1c603e90ef
@ -19,6 +19,7 @@ go_library(
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/endpointslice/metrics:go_default_library",
|
||||
"//pkg/controller/util/endpoint:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
@ -31,6 +32,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/discovery/v1beta1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
@ -61,6 +63,7 @@ go_test(
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/endpointslice/metrics:go_default_library",
|
||||
"//pkg/controller/util/endpoint:go_default_library",
|
||||
"//pkg/features:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
@ -71,11 +74,13 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/rand:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
|
||||
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/utils/pointer:go_default_library",
|
||||
|
@ -186,11 +186,11 @@ func TestSyncServicePodSelection(t *testing.T) {
|
||||
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
||||
ns := metav1.NamespaceDefault
|
||||
|
||||
pod1 := newPod(1, ns, true, 0)
|
||||
pod1 := newPod(1, ns, true, 0, false)
|
||||
esController.podStore.Add(pod1)
|
||||
|
||||
// ensure this pod will not match the selector
|
||||
pod2 := newPod(2, ns, true, 0)
|
||||
pod2 := newPod(2, ns, true, 0, false)
|
||||
pod2.Labels["foo"] = "boo"
|
||||
esController.podStore.Add(pod2)
|
||||
|
||||
@ -341,11 +341,11 @@ func TestSyncServiceFull(t *testing.T) {
|
||||
namespace := metav1.NamespaceDefault
|
||||
serviceName := "all-the-protocols"
|
||||
|
||||
pod1 := newPod(1, namespace, true, 0)
|
||||
pod1 := newPod(1, namespace, true, 0, false)
|
||||
pod1.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}}
|
||||
esController.podStore.Add(pod1)
|
||||
|
||||
pod2 := newPod(2, namespace, true, 0)
|
||||
pod2 := newPod(2, namespace, true, 0, false)
|
||||
pod2.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.5"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
|
||||
esController.podStore.Add(pod2)
|
||||
|
||||
@ -499,7 +499,7 @@ func TestPodAddsBatching(t *testing.T) {
|
||||
for i, add := range tc.adds {
|
||||
time.Sleep(add.delay)
|
||||
|
||||
p := newPod(i, ns, true, 0)
|
||||
p := newPod(i, ns, true, 0, false)
|
||||
esController.podStore.Add(p)
|
||||
esController.addPod(p)
|
||||
}
|
||||
@ -789,7 +789,7 @@ func TestPodDeleteBatching(t *testing.T) {
|
||||
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
|
||||
t.Helper()
|
||||
for i := 0; i < podsCount; i++ {
|
||||
pod := newPod(i, namespace, true, 0)
|
||||
pod := newPod(i, namespace, true, 0, false)
|
||||
esController.podStore.Add(pod)
|
||||
}
|
||||
}
|
||||
|
@ -29,10 +29,12 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
|
||||
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
)
|
||||
|
||||
// reconciler is responsible for transforming current EndpointSlice state into
|
||||
@ -136,7 +138,8 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
|
||||
numDesiredEndpoints := 0
|
||||
|
||||
for _, pod := range pods {
|
||||
if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
|
||||
includeTerminating := service.Spec.PublishNotReadyAddresses || utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition)
|
||||
if !endpointutil.ShouldPodBeInEndpointSlice(pod, includeTerminating) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -32,13 +32,16 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
k8stesting "k8s.io/client-go/testing"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
@ -95,7 +98,7 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
dualStackSvc.Spec.ClusterIP = "10.0.0.10"
|
||||
dualStackSvc.Spec.ClusterIPs = []string{"10.0.0.10", "2000::1"}
|
||||
|
||||
pod1 := newPod(1, namespace, true, 1)
|
||||
pod1 := newPod(1, namespace, true, 1, false)
|
||||
pod1.Status.PodIPs = []corev1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
|
||||
pod1.Spec.Hostname = "example-hostname"
|
||||
node1 := &corev1.Node{
|
||||
@ -114,6 +117,7 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
expectedEndpoint discovery.Endpoint
|
||||
expectedLabels map[string]string
|
||||
expectedEndpointPerSlice map[discovery.AddressType][]discovery.Endpoint
|
||||
terminatingGateEnabled bool
|
||||
}{
|
||||
"no-family-service": {
|
||||
service: noFamilyService,
|
||||
@ -140,7 +144,6 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
discovery.LabelServiceName: "foo",
|
||||
},
|
||||
},
|
||||
|
||||
"ipv4": {
|
||||
service: svcv4,
|
||||
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
|
||||
@ -167,6 +170,37 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
corev1.IsHeadlessService: "",
|
||||
},
|
||||
},
|
||||
"ipv4-with-terminating-gate-enabled": {
|
||||
service: svcv4,
|
||||
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
|
||||
discovery.AddressTypeIPv4: {
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Accepting: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
Name: "pod1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedLabels: map[string]string{
|
||||
discovery.LabelManagedBy: controllerName,
|
||||
discovery.LabelServiceName: "foo",
|
||||
corev1.IsHeadlessService: "",
|
||||
},
|
||||
terminatingGateEnabled: true,
|
||||
},
|
||||
"ipv4-clusterip": {
|
||||
service: svcv4ClusterIP,
|
||||
expectedEndpointPerSlice: map[discovery.AddressType][]discovery.Endpoint{
|
||||
@ -389,6 +423,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
|
||||
for name, testCase := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testCase.terminatingGateEnabled)()
|
||||
|
||||
client := newClientset()
|
||||
setupMetrics()
|
||||
triggerTime := time.Now()
|
||||
@ -496,7 +532,7 @@ func TestReconcile1EndpointSlicePublishNotReadyAddresses(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 50; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
@ -530,7 +566,7 @@ func TestReconcileManyPods(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 250; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
@ -563,7 +599,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 250; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
// have approximately 1/4 in first slice
|
||||
@ -619,7 +655,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 300; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
// have approximately 1/4 in first slice
|
||||
@ -665,7 +701,7 @@ func TestReconcileEndpointSlicesUpdating(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 250; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
@ -698,7 +734,7 @@ func TestReconcileEndpointSlicesServicesLabelsUpdating(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 250; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
@ -742,7 +778,7 @@ func TestReconcileEndpointSlicesServicesReservedLabels(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 250; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
@ -776,7 +812,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 300; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
// generate 10 existing slices with 30 pods/endpoints each
|
||||
@ -827,7 +863,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
|
||||
|
||||
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
for i := 0; i < 80; i++ {
|
||||
pod := newPod(i, namespace, true, 1)
|
||||
pod := newPod(i, namespace, true, 1, false)
|
||||
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
@ -835,7 +871,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
|
||||
|
||||
slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
|
||||
for i := 100; i < 120; i++ {
|
||||
pod := newPod(i, namespace, true, 1)
|
||||
pod := newPod(i, namespace, true, 1, false)
|
||||
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &svc, discovery.AddressTypeIPv4))
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
@ -856,7 +892,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
|
||||
|
||||
// add a few additional endpoints - no more than could fit in either slice.
|
||||
for i := 200; i < 215; i++ {
|
||||
pods = append(pods, newPod(i, namespace, true, 1))
|
||||
pods = append(pods, newPod(i, namespace, true, 1, false))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
@ -888,7 +924,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
|
||||
|
||||
slice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
for i := 0; i < 80; i++ {
|
||||
pod := newPod(i, namespace, true, 1)
|
||||
pod := newPod(i, namespace, true, 1, false)
|
||||
slice1.Endpoints = append(slice1.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4))
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
@ -896,7 +932,7 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
|
||||
|
||||
slice2 := newEmptyEndpointSlice(2, namespace, endpointMeta, svc)
|
||||
for i := 100; i < 150; i++ {
|
||||
pod := newPod(i, namespace, true, 1)
|
||||
pod := newPod(i, namespace, true, 1, false)
|
||||
slice2.Endpoints = append(slice2.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4))
|
||||
pods = append(pods, pod)
|
||||
}
|
||||
@ -955,7 +991,7 @@ func TestReconcileEndpointSlicesRecreation(t *testing.T) {
|
||||
svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
|
||||
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
|
||||
pod := newPod(1, namespace, true, 1)
|
||||
pod := newPod(1, namespace, true, 1, false)
|
||||
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}, discovery.AddressTypeIPv4))
|
||||
|
||||
if !tc.ownedByService {
|
||||
@ -1023,7 +1059,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
|
||||
for i := 0; i < 300; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
portOffset := i % 5
|
||||
pod := newPod(i, namespace, ready, 1)
|
||||
pod := newPod(i, namespace, ready, 1, false)
|
||||
pod.Spec.Containers[0].Ports = []corev1.ContainerPort{{
|
||||
Name: portNameIntStr.StrVal,
|
||||
ContainerPort: int32(8080 + portOffset),
|
||||
@ -1073,7 +1109,7 @@ func TestReconcileMaxEndpointsPerSlice(t *testing.T) {
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 250; i++ {
|
||||
ready := !(i%3 == 0)
|
||||
pods = append(pods, newPod(i, namespace, ready, 1))
|
||||
pods = append(pods, newPod(i, namespace, ready, 1, false))
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
@ -1125,7 +1161,7 @@ func TestReconcileEndpointSlicesMetrics(t *testing.T) {
|
||||
// start with 20 pods
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 20; i++ {
|
||||
pods = append(pods, newPod(i, namespace, true, 1))
|
||||
pods = append(pods, newPod(i, namespace, true, 1, false))
|
||||
}
|
||||
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/klog/v2"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
@ -34,6 +35,7 @@ import (
|
||||
helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
"k8s.io/kubernetes/pkg/apis/discovery/validation"
|
||||
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
utilnet "k8s.io/utils/net"
|
||||
)
|
||||
|
||||
@ -60,7 +62,11 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service,
|
||||
}
|
||||
}
|
||||
|
||||
ready := service.Spec.PublishNotReadyAddresses || podutil.IsPodReady(pod)
|
||||
accepting := podutil.IsPodReady(pod)
|
||||
terminating := pod.DeletionTimestamp != nil
|
||||
// For compatibility reasons, "ready" should never be "true" if a pod is terminatng, unless
|
||||
// publishNotReadyAddresses was set.
|
||||
ready := service.Spec.PublishNotReadyAddresses || (accepting && !terminating)
|
||||
ep := discovery.Endpoint{
|
||||
Addresses: getEndpointAddresses(pod.Status, service, addressType),
|
||||
Conditions: discovery.EndpointConditions{
|
||||
@ -76,6 +82,11 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service,
|
||||
},
|
||||
}
|
||||
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) {
|
||||
ep.Conditions.Accepting = &accepting
|
||||
ep.Conditions.Terminating = &terminating
|
||||
}
|
||||
|
||||
if endpointutil.ShouldSetHostname(pod, service) {
|
||||
ep.Hostname = &pod.Spec.Hostname
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
@ -30,8 +31,11 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
k8stesting "k8s.io/client-go/testing"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
@ -219,13 +223,15 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
svcPublishNotReady, _ := newServiceAndEndpointMeta("publishnotready", ns)
|
||||
svcPublishNotReady.Spec.PublishNotReadyAddresses = true
|
||||
|
||||
readyPod := newPod(1, ns, true, 1)
|
||||
readyPodHostname := newPod(1, ns, true, 1)
|
||||
readyPod := newPod(1, ns, true, 1, false)
|
||||
readyTerminatingPod := newPod(1, ns, true, 1, true)
|
||||
readyPodHostname := newPod(1, ns, true, 1, false)
|
||||
readyPodHostname.Spec.Subdomain = svc.Name
|
||||
readyPodHostname.Spec.Hostname = "example-hostname"
|
||||
|
||||
unreadyPod := newPod(1, ns, false, 1)
|
||||
multiIPPod := newPod(1, ns, true, 1)
|
||||
unreadyPod := newPod(1, ns, false, 1, false)
|
||||
unreadyTerminatingPod := newPod(1, ns, false, 1, true)
|
||||
multiIPPod := newPod(1, ns, true, 1, false)
|
||||
multiIPPod.Status.PodIPs = []v1.PodIP{{IP: "1.2.3.4"}, {IP: "1234::5678:0000:0000:9abc:def0"}}
|
||||
|
||||
node1 := &v1.Node{
|
||||
@ -245,6 +251,7 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
svc *v1.Service
|
||||
expectedEndpoint discovery.Endpoint
|
||||
publishNotReadyAddresses bool
|
||||
terminatingGateEnabled bool
|
||||
}{
|
||||
{
|
||||
name: "Ready pod",
|
||||
@ -381,13 +388,121 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Ready pod, terminating gate enabled",
|
||||
pod: readyPod,
|
||||
svc: &svc,
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(true),
|
||||
Accepting: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
Name: readyPod.Name,
|
||||
UID: readyPod.UID,
|
||||
ResourceVersion: readyPod.ResourceVersion,
|
||||
},
|
||||
},
|
||||
terminatingGateEnabled: true,
|
||||
},
|
||||
{
|
||||
name: "Ready terminating pod, terminating gate disabled",
|
||||
pod: readyTerminatingPod,
|
||||
svc: &svc,
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
Name: readyPod.Name,
|
||||
UID: readyPod.UID,
|
||||
ResourceVersion: readyPod.ResourceVersion,
|
||||
},
|
||||
},
|
||||
terminatingGateEnabled: false,
|
||||
},
|
||||
{
|
||||
name: "Ready terminating pod, terminating gate enabled",
|
||||
pod: readyTerminatingPod,
|
||||
svc: &svc,
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Accepting: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
Name: readyPod.Name,
|
||||
UID: readyPod.UID,
|
||||
ResourceVersion: readyPod.ResourceVersion,
|
||||
},
|
||||
},
|
||||
terminatingGateEnabled: true,
|
||||
},
|
||||
{
|
||||
name: "Not ready terminating pod, terminating gate disabled",
|
||||
pod: unreadyTerminatingPod,
|
||||
svc: &svc,
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
Name: readyPod.Name,
|
||||
UID: readyPod.UID,
|
||||
ResourceVersion: readyPod.ResourceVersion,
|
||||
},
|
||||
},
|
||||
terminatingGateEnabled: false,
|
||||
},
|
||||
{
|
||||
name: "Not ready terminating pod, terminating gate enabled",
|
||||
pod: unreadyTerminatingPod,
|
||||
svc: &svc,
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
Accepting: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
Name: readyPod.Name,
|
||||
UID: readyPod.UID,
|
||||
ResourceVersion: readyPod.ResourceVersion,
|
||||
},
|
||||
},
|
||||
terminatingGateEnabled: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceTerminatingCondition, testCase.terminatingGateEnabled)()
|
||||
|
||||
endpoint := podToEndpoint(testCase.pod, testCase.node, testCase.svc, discovery.AddressTypeIPv4)
|
||||
if !reflect.DeepEqual(testCase.expectedEndpoint, endpoint) {
|
||||
t.Errorf("Expected endpoint: %v, got: %v", testCase.expectedEndpoint, endpoint)
|
||||
t.Errorf("Expected endpoint: %+v, got: %+v", testCase.expectedEndpoint, endpoint)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -811,18 +926,26 @@ func TestSetEndpointSliceLabels(t *testing.T) {
|
||||
|
||||
// Test helpers
|
||||
|
||||
func newPod(n int, namespace string, ready bool, nPorts int) *v1.Pod {
|
||||
func newPod(n int, namespace string, ready bool, nPorts int, terminating bool) *v1.Pod {
|
||||
status := v1.ConditionTrue
|
||||
if !ready {
|
||||
status = v1.ConditionFalse
|
||||
}
|
||||
|
||||
var deletionTimestamp *metav1.Time
|
||||
if terminating {
|
||||
deletionTimestamp = &metav1.Time{
|
||||
Time: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
p := &v1.Pod{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: "v1"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
Name: fmt.Sprintf("pod%d", n),
|
||||
Labels: map[string]string{"foo": "bar"},
|
||||
Namespace: namespace,
|
||||
Name: fmt.Sprintf("pod%d", n),
|
||||
Labels: map[string]string{"foo": "bar"},
|
||||
DeletionTimestamp: deletionTimestamp,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{{
|
||||
|
@ -122,14 +122,14 @@ func DeepHashObjectToString(objectToWrite interface{}) string {
|
||||
return hex.EncodeToString(hasher.Sum(nil)[0:])
|
||||
}
|
||||
|
||||
// ShouldPodBeInEndpoints returns true if a specified pod should be in an
|
||||
// endpoints object. Terminating pods are only included if publishNotReady is true.
|
||||
func ShouldPodBeInEndpoints(pod *v1.Pod, publishNotReady bool) bool {
|
||||
// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an EndpointSlice object.
|
||||
// Terminating pods are only included if includeTerminating is true
|
||||
func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool {
|
||||
if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
if !publishNotReady && pod.DeletionTimestamp != nil {
|
||||
if !includeTerminating && pod.DeletionTimestamp != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -100,12 +100,12 @@ func TestDetermineNeededServiceUpdates(t *testing.T) {
|
||||
// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase).
|
||||
// Not listing them all here. Just listing all of the 3 false cases and 3 of the
|
||||
// 12 true cases.
|
||||
func TestShouldPodBeInEndpoints(t *testing.T) {
|
||||
func TestShouldPodBeInEndpointSlice(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
publishNotReady bool
|
||||
expected bool
|
||||
name string
|
||||
pod *v1.Pod
|
||||
expected bool
|
||||
includeTerminating bool
|
||||
}{
|
||||
// Pod should not be in endpoints:
|
||||
{
|
||||
@ -162,7 +162,7 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "Terminating Pod",
|
||||
name: "Terminating Pod with includeTerminating=false",
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
DeletionTimestamp: &metav1.Time{
|
||||
@ -175,8 +175,7 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
|
||||
PodIP: "1.2.3.4",
|
||||
},
|
||||
},
|
||||
publishNotReady: false,
|
||||
expected: false,
|
||||
expected: false,
|
||||
},
|
||||
// Pod should be in endpoints:
|
||||
{
|
||||
@ -245,7 +244,7 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "Terminating Pod with publish not ready",
|
||||
name: "Terminating Pod with includeTerminating=true",
|
||||
pod: &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
DeletionTimestamp: &metav1.Time{
|
||||
@ -258,14 +257,14 @@ func TestShouldPodBeInEndpoints(t *testing.T) {
|
||||
PodIP: "1.2.3.4",
|
||||
},
|
||||
},
|
||||
publishNotReady: true,
|
||||
expected: true,
|
||||
expected: true,
|
||||
includeTerminating: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
result := ShouldPodBeInEndpoints(test.pod, test.publishNotReady)
|
||||
result := ShouldPodBeInEndpointSlice(test.pod, test.includeTerminating)
|
||||
if result != test.expected {
|
||||
t.Errorf("expected: %t, got: %t", test.expected, result)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user