diff --git a/pkg/api/v1/pod/util.go b/pkg/api/v1/pod/util.go index 9560121bbe5..4ae77d543a2 100644 --- a/pkg/api/v1/pod/util.go +++ b/pkg/api/v1/pod/util.go @@ -301,6 +301,16 @@ func IsPodReady(pod *v1.Pod) bool { return IsPodReadyConditionTrue(pod.Status) } +// IsPodTerminal returns true if a pod is terminal, all containers are stopped and cannot ever regress. +func IsPodTerminal(pod *v1.Pod) bool { + return IsPodPhaseTerminal(pod.Status.Phase) +} + +// IsPhaseTerminal returns true if the pod's phase is terminal. +func IsPodPhaseTerminal(phase v1.PodPhase) bool { + return phase == v1.PodFailed || phase == v1.PodSucceeded +} + // IsPodReadyConditionTrue returns true if a pod is ready; false otherwise. func IsPodReadyConditionTrue(status v1.PodStatus) bool { condition := GetPodReadyCondition(status) diff --git a/pkg/api/v1/pod/util_test.go b/pkg/api/v1/pod/util_test.go index eb0425cf44a..78b1b92f376 100644 --- a/pkg/api/v1/pod/util_test.go +++ b/pkg/api/v1/pod/util_test.go @@ -749,6 +749,48 @@ func TestIsPodAvailable(t *testing.T) { } } +func TestIsPodTerminal(t *testing.T) { + now := metav1.Now() + + tests := []struct { + podPhase v1.PodPhase + expected bool + }{ + { + podPhase: v1.PodFailed, + expected: true, + }, + { + podPhase: v1.PodSucceeded, + expected: true, + }, + { + podPhase: v1.PodUnknown, + expected: false, + }, + { + podPhase: v1.PodPending, + expected: false, + }, + { + podPhase: v1.PodRunning, + expected: false, + }, + { + expected: false, + }, + } + + for i, test := range tests { + pod := newPod(now, true, 0) + pod.Status.Phase = test.podPhase + isTerminal := IsPodTerminal(pod) + if isTerminal != test.expected { + t.Errorf("[tc #%d] expected terminal pod: %t, got: %t", i, test.expected, isTerminal) + } + } +} + func TestGetContainerStatus(t *testing.T) { type ExpectedStruct struct { status v1.ContainerStatus diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 209cad19cfe..61df9eac35a 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -402,9 +402,6 @@ func (e *Controller) syncService(ctx context.Context, key string) error { return err } - // If the user specified the older (deprecated) annotation, we have to respect it. - tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses - // We call ComputeEndpointLastChangeTriggerTime here to make sure that the // state of the trigger time tracker gets updated even if the sync turns out // to be no-op and we don't update the endpoints object. @@ -416,12 +413,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error { var totalNotReadyEps int for _, pod := range pods { - if len(pod.Status.PodIP) == 0 { - klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) - continue - } - if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil { - klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name) + if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) { + klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name) continue } @@ -441,7 +434,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // Allow headless service not to have ports. 
if len(service.Spec.Ports) == 0 { if service.Spec.ClusterIP == api.ClusterIPNone { - subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints) + subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) // No need to repack subsets for headless service without ports. } } else { @@ -455,7 +448,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { epp := endpointPortFromServicePort(servicePort, portNum) var readyEps, notReadyEps int - subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints) + subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) totalReadyEps = totalReadyEps + readyEps totalNotReadyEps = totalNotReadyEps + notReadyEps } @@ -591,6 +584,10 @@ func (e *Controller) checkLeftoverEndpoints() { } } +// addEndpointSubset add the endpoints addresses and ports to the EndpointSubset. +// The addresses are added to the corresponding field, ready or not ready, depending +// on the pod status and the Service PublishNotReadyAddresses field value. +// The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints. func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) { var readyEps int @@ -605,7 +602,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint Ports: ports, }) readyEps++ - } else if shouldPodBeInEndpoints(pod) { + } else { // if it is not a ready address it has to be not ready klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name) subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, @@ -616,17 +613,6 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint return subsets, readyEps, notReadyEps } -func shouldPodBeInEndpoints(pod *v1.Pod) bool { - switch pod.Spec.RestartPolicy { - case v1.RestartPolicyNever: - return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded - case v1.RestartPolicyOnFailure: - return pod.Status.Phase != v1.PodSucceeded - default: - return true - } -} - func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort { epp := &v1.EndpointPort{ Name: servicePort.Name, diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 72b7f6813dd..a6514701ab4 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -1155,97 +1155,6 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data) } -// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here. -// Just list all of the 3 false cases and 3 of the 12 true cases. 
-func TestShouldPodBeInEndpoints(t *testing.T) { - testCases := []struct { - name string - pod *v1.Pod - expected bool - }{ - // Pod should not be in endpoints cases: - { - name: "Failed pod with Never RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - }, - Status: v1.PodStatus{ - Phase: v1.PodFailed, - }, - }, - expected: false, - }, - { - name: "Succeeded pod with Never RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - }, - Status: v1.PodStatus{ - Phase: v1.PodSucceeded, - }, - }, - expected: false, - }, - { - name: "Succeeded pod with OnFailure RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - }, - Status: v1.PodStatus{ - Phase: v1.PodSucceeded, - }, - }, - expected: false, - }, - // Pod should be in endpoints cases: - { - name: "Failed pod with Always RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyAlways, - }, - Status: v1.PodStatus{ - Phase: v1.PodFailed, - }, - }, - expected: true, - }, - { - name: "Pending pod with Never RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - }, - Status: v1.PodStatus{ - Phase: v1.PodPending, - }, - }, - expected: true, - }, - { - name: "Unknown pod with OnFailure RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - }, - Status: v1.PodStatus{ - Phase: v1.PodUnknown, - }, - }, - expected: true, - }, - } - for _, test := range testCases { - result := shouldPodBeInEndpoints(test.pod) - if result != test.expected { - t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result) - } - } -} - func TestPodToEndpointAddressForService(t *testing.T) { ipv4 := v1.IPv4Protocol ipv6 := v1.IPv6Protocol @@ -2313,6 +2222,235 @@ func TestMultipleServiceChanges(t *testing.T) { close(stopChan) } +func TestSyncServiceAddresses(t *testing.T) { + makeService := func(tolerateUnready bool) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + PublishNotReadyAddresses: tolerateUnready, + Type: v1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []v1.ServicePort{{Port: 80}}, + }, + } + } + + makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod { + statusCondition := v1.ConditionFalse + if isReady { + statusCondition = v1.ConditionTrue + } + + now := metav1.Now() + deletionTimestamp := &now + if !terminating { + deletionTimestamp = nil + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "fakepod", + DeletionTimestamp: deletionTimestamp, + Labels: map[string]string{"foo": "bar"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{Ports: []v1.ContainerPort{ + {Name: "port1", ContainerPort: int32(8080)}, + }}}, + }, + Status: v1.PodStatus{ + Phase: phase, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: statusCondition, + }, + }, + PodIP: "10.1.1.1", + PodIPs: []v1.PodIP{ + {IP: "10.1.1.1"}, + }, + }, + } + } + + testCases := []struct { + name string + pod *v1.Pod + service *v1.Service + expectedReady int + expectedUnready int + }{ + { + name: "pod running phase", + pod: makePod(v1.PodRunning, true, false), + service: makeService(false), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod running phase being deleted", + pod: makePod(v1.PodRunning, true, true), + service: makeService(false), + expectedReady: 0, + 
expectedUnready: 0, + }, + { + name: "pod unknown phase container ready", + pod: makePod(v1.PodUnknown, true, false), + service: makeService(false), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod unknown phase container ready being deleted", + pod: makePod(v1.PodUnknown, true, true), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod pending phase container ready", + pod: makePod(v1.PodPending, true, false), + service: makeService(false), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod pending phase container ready being deleted", + pod: makePod(v1.PodPending, true, true), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod unknown phase container not ready", + pod: makePod(v1.PodUnknown, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 1, + }, + { + name: "pod pending phase container not ready", + pod: makePod(v1.PodPending, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 1, + }, + { + name: "pod failed phase", + pod: makePod(v1.PodFailed, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod succeeded phase", + pod: makePod(v1.PodSucceeded, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod running phase and tolerate unready", + pod: makePod(v1.PodRunning, false, false), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod running phase and tolerate unready being deleted", + pod: makePod(v1.PodRunning, false, true), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod unknown phase and tolerate unready", + pod: makePod(v1.PodUnknown, false, false), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod unknown phase and tolerate unready being deleted", + pod: makePod(v1.PodUnknown, false, true), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod pending phase and tolerate unready", + pod: makePod(v1.PodPending, false, false), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod pending phase and tolerate unready being deleted", + pod: makePod(v1.PodPending, false, true), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod failed phase and tolerate unready", + pod: makePod(v1.PodFailed, false, false), + service: makeService(true), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod succeeded phase and tolerate unready endpoints", + pod: makePod(v1.PodSucceeded, false, false), + service: makeService(true), + expectedReady: 0, + expectedUnready: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ns := tc.service.Namespace + client, c := newFakeController(0 * time.Second) + + err := c.podStore.Add(tc.pod) + if err != nil { + t.Errorf("Unexpected error adding pod %v", err) + } + err = c.serviceStore.Add(tc.service) + if err != nil { + t.Errorf("Unexpected error adding service %v", err) + } + err = c.syncService(context.TODO(), fmt.Sprintf("%s/%s", ns, tc.service.Name)) + if err != nil { + t.Errorf("Unexpected error syncing service %v", err) + } + + endpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), tc.service.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Unexpected error 
%v", err) + } + + readyEndpoints := 0 + unreadyEndpoints := 0 + for _, subset := range endpoints.Subsets { + readyEndpoints += len(subset.Addresses) + unreadyEndpoints += len(subset.NotReadyAddresses) + } + + if tc.expectedReady != readyEndpoints { + t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints) + } + + if tc.expectedUnready != unreadyEndpoints { + t.Errorf("Expected %d ready endpoints, got %d", tc.expectedUnready, unreadyEndpoints) + } + }) + } +} + func TestEndpointsDeletionEvents(t *testing.T) { ns := metav1.NamespaceDefault testServer, _ := makeTestServer(t, ns) diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index 4756a0d3448..9b91566f853 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -154,7 +154,7 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor for _, pod := range pods { includeTerminating := service.Spec.PublishNotReadyAddresses || utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceTerminatingCondition) - if !endpointutil.ShouldPodBeInEndpointSlice(pod, includeTerminating) { + if !endpointutil.ShouldPodBeInEndpoints(pod, includeTerminating) { continue } diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index 58c38b3c3c1..addaccc9d68 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -135,9 +135,17 @@ func DeepHashObjectToString(objectToWrite interface{}) string { return hex.EncodeToString(hasher.Sum(nil)[0:]) } -// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an EndpointSlice object. -// Terminating pods are only included if includeTerminating is true -func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool { +// ShouldPodBeInEndpoints returns true if a specified pod should be in an +// Endpoints or EndpointSlice resource. Terminating pods are only included if +// includeTerminating is true. +func ShouldPodBeInEndpoints(pod *v1.Pod, includeTerminating bool) bool { + // "Terminal" describes when a Pod is complete (in a succeeded or failed phase). + // This is distinct from the "Terminating" condition which represents when a Pod + // is being terminated (metadata.deletionTimestamp is non nil). + if podutil.IsPodTerminal(pod) { + return false + } + if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 { return false } @@ -146,14 +154,6 @@ func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool { return false } - if pod.Spec.RestartPolicy == v1.RestartPolicyNever { - return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded - } - - if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure { - return pod.Status.Phase != v1.PodSucceeded - } - return true } diff --git a/pkg/controller/util/endpoint/controller_utils_test.go b/pkg/controller/util/endpoint/controller_utils_test.go index bf4d199dd95..e9656b0a8dd 100644 --- a/pkg/controller/util/endpoint/controller_utils_test.go +++ b/pkg/controller/util/endpoint/controller_utils_test.go @@ -99,10 +99,7 @@ func TestDetermineNeededServiceUpdates(t *testing.T) { } } -// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). -// Not listing them all here. Just listing all of the 3 false cases and 3 of the -// 12 true cases. 
-func TestShouldPodBeInEndpointSlice(t *testing.T) { +func TestShouldPodBeInEndpoints(t *testing.T) { testCases := []struct { name string pod *v1.Pod @@ -179,7 +176,6 @@ func TestShouldPodBeInEndpointSlice(t *testing.T) { }, expected: false, }, - // Pod should be in endpoints: { name: "Failed pod with Always RestartPolicy", pod: &v1.Pod{ @@ -191,8 +187,9 @@ func TestShouldPodBeInEndpointSlice(t *testing.T) { PodIP: "1.2.3.4", }, }, - expected: true, + expected: false, }, + // Pod should be in endpoints: { name: "Pending pod with Never RestartPolicy", pod: &v1.Pod{ @@ -266,7 +263,7 @@ func TestShouldPodBeInEndpointSlice(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - result := ShouldPodBeInEndpointSlice(test.pod, test.includeTerminating) + result := ShouldPodBeInEndpoints(test.pod, test.includeTerminating) if result != test.expected { t.Errorf("expected: %t, got: %t", test.expected, result) } diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 377d44b4718..845137e0656 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -859,7 +859,7 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon // the Kubelet exclusively owns must be released prior to a pod being reported terminal, // while resources that have participanting components above the API use the pod's // transition to a terminal phase (or full deletion) to release those resources. - if !isPhaseTerminal(oldPodStatus.Phase) && isPhaseTerminal(newPodStatus.Phase) { + if !podutil.IsPodPhaseTerminal(oldPodStatus.Phase) && podutil.IsPodPhaseTerminal(newPodStatus.Phase) { if couldHaveRunningContainers { newPodStatus.Phase = oldPodStatus.Phase newPodStatus.Reason = oldPodStatus.Reason @@ -870,11 +870,6 @@ func mergePodStatus(oldPodStatus, newPodStatus v1.PodStatus, couldHaveRunningCon return newPodStatus } -// isPhaseTerminal returns true if the pod's phase is terminal. 
-func isPhaseTerminal(phase v1.PodPhase) bool { - return phase == v1.PodFailed || phase == v1.PodSucceeded -} - // NeedToReconcilePodReadiness returns if the pod "Ready" condition need to be reconcile func NeedToReconcilePodReadiness(pod *v1.Pod) bool { if len(pod.Spec.ReadinessGates) == 0 { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index e0d7e600984..fffef5398a2 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -33,6 +33,7 @@ import ( v1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -2040,6 +2041,97 @@ var _ = common.SIGDescribe("Services", func() { } }) + // regression test for https://issues.k8s.io/109414 and https://issues.k8s.io/109718 + ginkgo.It("should be rejected for evicted pods (no endpoints exist)", func() { + namespace := f.Namespace.Name + serviceName := "evicted-pods" + jig := e2eservice.NewTestJig(cs, namespace, serviceName) + nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, e2eservice.MaxNodesForEndpointsTests) + framework.ExpectNoError(err) + nodeName := nodes.Items[0].Name + + port := 80 + + ginkgo.By("creating a service with no endpoints") + _, err = jig.CreateTCPServiceWithPort(func(s *v1.Service) { + // set publish not ready addresses to cover edge cases too + s.Spec.PublishNotReadyAddresses = true + }, int32(port)) + framework.ExpectNoError(err) + + // Create a pod in one node to get evicted + ginkgo.By("creating a client pod that is going to be evicted for the service " + serviceName) + evictedPod := e2epod.NewAgnhostPod(namespace, "evicted-pod", nil, nil, nil) + evictedPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "sleep 10; fallocate -l 10M file; sleep 10000"} + evictedPod.Spec.Containers[0].Name = "evicted-pod" + evictedPod.Spec.Containers[0].Resources = v1.ResourceRequirements{ + Limits: v1.ResourceList{"ephemeral-storage": resource.MustParse("5Mi")}, + } + f.PodClient().Create(evictedPod) + err = e2epod.WaitForPodTerminatedInNamespace(f.ClientSet, evictedPod.Name, "Evicted", f.Namespace.Name) + if err != nil { + framework.Failf("error waiting for pod to be evicted: %v", err) + } + + podName := "execpod-evictedpods" + ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName)) + execPod := e2epod.CreateExecPodOrFail(f.ClientSet, namespace, podName, func(pod *v1.Pod) { + nodeSelection := e2epod.NodeSelection{Name: nodeName} + e2epod.SetNodeSelection(&pod.Spec, nodeSelection) + }) + + if epErr := wait.PollImmediate(framework.Poll, e2eservice.ServiceEndpointsTimeout, func() (bool, error) { + endpoints, err := cs.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) + if err != nil { + framework.Logf("error fetching '%s/%s' Endpoints: %s", namespace, serviceName, err.Error()) + return false, err + } + if len(endpoints.Subsets) > 0 { + framework.Logf("expected '%s/%s' Endpoints to be empty, got: %v", namespace, serviceName, endpoints.Subsets) + return false, nil + } + epsList, err := cs.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)}) + if err != nil { + framework.Logf("error fetching '%s/%s' EndpointSlices: %s", namespace, serviceName, err.Error()) + return false, err + } + if len(epsList.Items) != 1 { + framework.Logf("expected exactly 1 EndpointSlice, got: %d", 
len(epsList.Items)) + return false, nil + } + endpointSlice := epsList.Items[0] + if len(endpointSlice.Endpoints) > 0 { + framework.Logf("expected EndpointSlice to be empty, got %d endpoints", len(endpointSlice.Endpoints)) + return false, nil + } + return true, nil + }); epErr != nil { + framework.ExpectNoError(epErr) + } + + serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port)) + framework.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress) + cmd := fmt.Sprintf("/agnhost connect --timeout=3s %s", serviceAddress) + + ginkgo.By(fmt.Sprintf("hitting service %v from pod %v on node %v expected to be refused", serviceAddress, podName, nodeName)) + expectedErr := "REFUSED" + if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) { + _, err := framework.RunHostCmd(execPod.Namespace, execPod.Name, cmd) + + if err != nil { + if strings.Contains(err.Error(), expectedErr) { + framework.Logf("error contained '%s', as expected: %s", expectedErr, err.Error()) + return true, nil + } + framework.Logf("error didn't contain '%s', keep trying: %s", expectedErr, err.Error()) + return false, nil + } + return true, errors.New("expected connect call to fail") + }); pollErr != nil { + framework.ExpectNoError(pollErr) + } + }) + ginkgo.It("should respect internalTrafficPolicy=Local Pod to Pod [Feature:ServiceInternalTrafficPolicy]", func() { // windows kube-proxy does not support this feature yet // TODO: remove this skip when windows-based proxies implement internalTrafficPolicy diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go index 9fdfa935eb3..557b3a786dc 100644 --- a/test/e2e/node/pods.go +++ b/test/e2e/node/pods.go @@ -298,7 +298,49 @@ var _ = SIGDescribe("Pods Extended", func() { } } }) + + ginkgo.It("evicted pods should be terminal", func() { + ginkgo.By("creating the pod that should be evicted") + + name := "pod-should-be-evicted" + string(uuid.NewUUID()) + image := imageutils.GetE2EImage(imageutils.BusyBox) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyOnFailure, + Containers: []v1.Container{ + { + Name: "bar", + Image: image, + Command: []string{ + "/bin/sh", "-c", "sleep 10; fallocate -l 10M file; sleep 10000", + }, + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + "ephemeral-storage": resource.MustParse("5Mi"), + }, + }}, + }, + }, + } + + ginkgo.By("submitting the pod to kubernetes") + podClient.Create(pod) + defer func() { + ginkgo.By("deleting the pod") + podClient.Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) + }() + + err := e2epod.WaitForPodTerminatedInNamespace(f.ClientSet, pod.Name, "Evicted", f.Namespace.Name) + if err != nil { + framework.Failf("error waiting for pod to be evicted: %v", err) + } + + }) }) + }) func createAndTestPodRepeatedly(workers, iterations int, scenario podScenario, podClient v1core.PodInterface) {
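
Note, not part of the patch above: the behavioral core of this change is that both the Endpoints and EndpointSlice controllers now exclude pods in a terminal phase (Succeeded or Failed) through the shared ShouldPodBeInEndpoints helper, replacing the old per-RestartPolicy checks. Below is a minimal, self-contained Go sketch of that selection logic, provided only as a reading aid; the package and function names in the sketch are hypothetical, and it merely restates what the patched helper in pkg/controller/util/endpoint/controller_utils.go does.

// Illustrative sketch only; not part of the patch above.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// shouldPodBeInEndpointsSketch returns true if the pod may contribute an
// address to Endpoints or EndpointSlices. Terminal pods (Succeeded or Failed)
// are always excluded, regardless of RestartPolicy; terminating pods
// (DeletionTimestamp set) are excluded unless includeTerminating is true.
func shouldPodBeInEndpointsSketch(pod *v1.Pod, includeTerminating bool) bool {
	// Terminal: all containers are stopped and the phase can never regress.
	if pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded {
		return false
	}
	// Without an assigned IP there is nothing to publish yet.
	if len(pod.Status.PodIP) == 0 && len(pod.Status.PodIPs) == 0 {
		return false
	}
	// Terminating pods are kept only when the caller tolerates them
	// (PublishNotReadyAddresses, or the EndpointSliceTerminatingCondition gate
	// in the EndpointSlice reconciler).
	if !includeTerminating && pod.DeletionTimestamp != nil {
		return false
	}
	return true
}

func main() {
	evicted := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodFailed, PodIP: "10.1.1.1"}}
	running := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodRunning, PodIP: "10.1.1.1"}}

	// An evicted (Failed) pod is dropped even if it still reports an IP and
	// the Service publishes not-ready addresses, which is the regression the
	// new e2e tests above cover.
	fmt.Println(shouldPodBeInEndpointsSketch(evicted, true))  // false
	fmt.Println(shouldPodBeInEndpointsSketch(running, false)) // true
}

The same Succeeded-or-Failed phase test backs the new IsPodPhaseTerminal helper in pkg/api/v1/pod/util.go, which the kubelet status manager now reuses in mergePodStatus instead of its private isPhaseTerminal copy.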