diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 89eff1e5cc7..c8640789ba0 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1713,6 +1713,9 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
 	// Fetch the pull secrets for the pod
 	pullSecrets := kl.getPullSecretsForPod(pod)
 
+	// Ensure the pod is being probed
+	kl.probeManager.AddPod(pod)
+
 	// Call the container runtime's SyncPod callback
 	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
 	kl.reasonCache.Update(pod.UID, result)
@@ -1770,6 +1773,9 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu
 	} else {
 		klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
 	}
+
+	kl.probeManager.StopLivenessAndStartup(pod)
+
 	p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 	if err := kl.killPod(pod, p, gracePeriod); err != nil {
 		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
@@ -1778,6 +1784,12 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu
 		return err
 	}
 
+	// Once the containers are stopped, we can stop probing for liveness and readiness.
+	// TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
+	// the detection of a container shutdown or (for readiness) after the first failure. Tracked as
+	// https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
+	kl.probeManager.RemovePod(pod)
+
 	// Guard against consistency issues in KillPod implementations by checking that there are no
 	// running containers. This method is invoked infrequently so this is effectively free and can
 	// catch race conditions introduced by callers updating pod status out of order.
@@ -2233,9 +2245,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 		}
 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
-		// TODO: move inside syncPod and make reentrant
-		// https://github.com/kubernetes/kubernetes/issues/105014
-		kl.probeManager.AddPod(pod)
 	}
 }
 
@@ -2269,10 +2278,6 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 		if err := kl.deletePod(pod); err != nil {
 			klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
 		}
-		// TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing
-		// once the pod kill is acknowledged and during eviction)
-		// https://github.com/kubernetes/kubernetes/issues/105014
-		kl.probeManager.RemovePod(pod)
 	}
 }
 
diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index a6d11d2ae48..a69558261e8 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -1105,8 +1105,8 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	}
 
 	// Stop probing pods that are not running
-	klog.V(3).InfoS("Clean up probes for terminating and terminated pods")
-	kl.probeManager.CleanupPods(runningPods)
+	klog.V(3).InfoS("Clean up probes for terminated pods")
+	kl.probeManager.CleanupPods(possiblyRunningPods)
 
 	// Terminate any pods that are observed in the runtime but not
 	// present in the list of known running pods from config.
@@ -1205,9 +1205,6 @@ func (kl *Kubelet) HandlePodCleanups() error {
 		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
 		klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
 		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
-		// TODO: move inside syncPod and make reentrant
-		// https://github.com/kubernetes/kubernetes/issues/105014
-		kl.probeManager.AddPod(pod)
 	}
 
 	return nil
diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
index 6974d82acf4..a9733b70210 100644
--- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
+++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
@@ -364,9 +364,6 @@ func (m *managerImpl) processShutdownEvent() error {
 			gracePeriodOverride := group.ShutdownGracePeriodSeconds
 
-			// Stop probes for the pod
-			m.probeManager.RemovePod(pod)
-
 			// If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod.
 			if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
 				gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go
index 37282748f65..bb073688f89 100644
--- a/pkg/kubelet/prober/prober_manager.go
+++ b/pkg/kubelet/prober/prober_manager.go
@@ -57,6 +57,9 @@ type Manager interface {
 	// pod created.
 	AddPod(pod *v1.Pod)
 
+	// StopLivenessAndStartup handles stopping liveness and startup probes during termination.
+	StopLivenessAndStartup(pod *v1.Pod)
+
 	// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
 	// deleting cached results.
 	RemovePod(pod *v1.Pod)
@@ -160,7 +163,7 @@ func (m *manager) AddPod(pod *v1.Pod) {
 		if c.StartupProbe != nil {
 			key.probeType = startup
 			if _, ok := m.workers[key]; ok {
-				klog.ErrorS(nil, "Startup probe already exists for container",
+				klog.V(8).ErrorS(nil, "Startup probe already exists for container",
 					"pod", klog.KObj(pod), "containerName", c.Name)
 				return
 			}
@@ -172,7 +175,7 @@ func (m *manager) AddPod(pod *v1.Pod) {
 		if c.ReadinessProbe != nil {
 			key.probeType = readiness
 			if _, ok := m.workers[key]; ok {
-				klog.ErrorS(nil, "Readiness probe already exists for container",
+				klog.V(8).ErrorS(nil, "Readiness probe already exists for container",
 					"pod", klog.KObj(pod), "containerName", c.Name)
 				return
 			}
@@ -184,7 +187,7 @@ func (m *manager) AddPod(pod *v1.Pod) {
 		if c.LivenessProbe != nil {
 			key.probeType = liveness
 			if _, ok := m.workers[key]; ok {
-				klog.ErrorS(nil, "Liveness probe already exists for container",
+				klog.V(8).ErrorS(nil, "Liveness probe already exists for container",
 					"pod", klog.KObj(pod), "containerName", c.Name)
 				return
 			}
@@ -195,6 +198,22 @@ func (m *manager) AddPod(pod *v1.Pod) {
 	}
 }
 
+func (m *manager) StopLivenessAndStartup(pod *v1.Pod) {
+	m.workerLock.RLock()
+	defer m.workerLock.RUnlock()
+
+	key := probeKey{podUID: pod.UID}
+	for _, c := range pod.Spec.Containers {
+		key.containerName = c.Name
+		for _, probeType := range [...]probeType{liveness, startup} {
+			key.probeType = probeType
+			if worker, ok := m.workers[key]; ok {
+				worker.stop()
+			}
+		}
+	}
+}
+
 func (m *manager) RemovePod(pod *v1.Pod) {
 	m.workerLock.RLock()
 	defer m.workerLock.RUnlock()
diff --git a/pkg/kubelet/prober/testing/fake_manager.go b/pkg/kubelet/prober/testing/fake_manager.go
index 41910382d38..500cfcd9962 100644
--- a/pkg/kubelet/prober/testing/fake_manager.go
+++ b/pkg/kubelet/prober/testing/fake_manager.go
@@ -33,6 +33,9 @@ func (FakeManager) AddPod(_ *v1.Pod) {}
 
 // RemovePod simulates removing a Pod.
 func (FakeManager) RemovePod(_ *v1.Pod) {}
 
+// StopLivenessAndStartup simulates stopping liveness and startup probes.
+func (FakeManager) StopLivenessAndStartup(_ *v1.Pod) {}
+
 // CleanupPods simulates cleaning up Pods.
 func (FakeManager) CleanupPods(_ map[types.UID]sets.Empty) {}
diff --git a/test/e2e/common/node/container_probe.go b/test/e2e/common/node/container_probe.go
index 45ea0b74bc4..c5d981d9003 100644
--- a/test/e2e/common/node/container_probe.go
+++ b/test/e2e/common/node/container_probe.go
@@ -18,9 +18,11 @@ package node
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"net"
 	"net/url"
+	"strings"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -28,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/apimachinery/pkg/util/wait"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -551,6 +554,171 @@ var _ = SIGDescribe("Probing container", func() {
 		pod := gRPCServerPodSpec(nil, livenessProbe, "etcd")
 		RunLivenessTest(f, pod, 1, defaultObservationTimeout)
 	})
+
+	ginkgo.It("should mark readiness on pods to false while the pod is in the process of terminating when the pod has a readiness probe", func() {
+		podName := "probe-test-" + string(uuid.NewUUID())
+		podClient := f.PodClient()
+		terminationGracePeriod := int64(30)
+		script := `
+_term() {
+	rm -f /tmp/ready
+	sleep 30
+	exit 0
+}
+trap _term SIGTERM
+
+touch /tmp/ready
+
+while true; do
+	echo \"hello\"
+	sleep 10
+done
+`
+
+		// Create Pod
+		podClient.Create(&v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: podName,
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Image:   imageutils.GetE2EImage(imageutils.Agnhost),
+						Name:    podName,
+						Command: []string{"/bin/bash"},
+						Args:    []string{"-c", script},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								Exec: &v1.ExecAction{
+									Command: []string{"cat", "/tmp/ready"},
+								},
+							},
+							FailureThreshold:    1,
+							InitialDelaySeconds: 5,
+							PeriodSeconds:       2,
+						},
+					},
+				},
+				TerminationGracePeriodSeconds: &terminationGracePeriod,
+			},
+		})
+
+		// verify pods are running and ready
+		err := e2epod.WaitForPodsRunningReady(f.ClientSet, f.Namespace.Name, 1, 0, f.Timeouts.PodStart, map[string]string{})
+		framework.ExpectNoError(err)
+
+		// Shutdown pod. Readiness should change to false
+		podClient.Delete(context.Background(), podName, metav1.DeleteOptions{})
+		err = wait.PollImmediate(framework.Poll, f.Timeouts.PodDelete, func() (bool, error) {
+			pod, err := podClient.Get(context.Background(), podName, metav1.GetOptions{})
+			if err != nil {
+				return false, err
+			}
+			// verify the pod ready status has reported not ready
+			return podutil.IsPodReady(pod) == false, nil
+		})
+		framework.ExpectNoError(err)
+	})
+
+	ginkgo.It("should mark readiness on pods to false and disable liveness probes while the pod is in the process of terminating", func() {
+		podName := "probe-test-" + string(uuid.NewUUID())
+		podClient := f.PodClient()
+		terminationGracePeriod := int64(30)
+		script := `
+_term() {
+	rm -f /tmp/ready
+	rm -f /tmp/liveness
+	sleep 20
+	exit 0
+}
+trap _term SIGTERM
+
+touch /tmp/ready
+touch /tmp/liveness
+
+while true; do
+	echo \"hello\"
+	sleep 10
+done
+`
+
+		// Create Pod
+		podClient.Create(&v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: podName,
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Image:   imageutils.GetE2EImage(imageutils.Agnhost),
+						Name:    podName,
+						Command: []string{"/bin/bash"},
+						Args:    []string{"-c", script},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								Exec: &v1.ExecAction{
+									Command: []string{"cat", "/tmp/ready"},
+								},
+							},
+							FailureThreshold: 1,
+							// delay startup to make sure the script has
+							// time to create the ready+liveness files
+							InitialDelaySeconds: 5,
+							PeriodSeconds:       2,
+						},
+						LivenessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								Exec: &v1.ExecAction{
+									Command: []string{"cat", "/tmp/liveness"},
+								},
+							},
+							FailureThreshold: 1,
+							// delay startup to make sure the script has
+							// time to create the ready+liveness files
+							InitialDelaySeconds: 5,
+							PeriodSeconds:       1,
+						},
+					},
+				},
+				TerminationGracePeriodSeconds: &terminationGracePeriod,
+			},
+		})
+
+		// verify pods are running and ready
+		err := e2epod.WaitForPodsRunningReady(f.ClientSet, f.Namespace.Name, 1, 0, f.Timeouts.PodStart, map[string]string{})
+		framework.ExpectNoError(err)
+
+		// Shutdown pod. Readiness should change to false
+		podClient.Delete(context.Background(), podName, metav1.DeleteOptions{})
+
+		// Wait for pod to go unready
+		err = wait.PollImmediate(framework.Poll, f.Timeouts.PodDelete, func() (bool, error) {
+			pod, err := podClient.Get(context.Background(), podName, metav1.GetOptions{})
+			if err != nil {
+				return false, err
+			}
+			// verify the pod ready status has reported not ready
+			return podutil.IsPodReady(pod) == false, nil
+		})
+		framework.ExpectNoError(err)
+
+		// Verify there are zero liveness failures since they are turned off
+		// during pod termination
+		gomega.Consistently(func() (bool, error) {
+			items, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(context.Background(), metav1.ListOptions{})
+			framework.ExpectNoError(err)
+			for _, event := range items.Items {
+				// Search only for the pod we are interested in
+				if event.InvolvedObject.Name != podName {
+					continue
+				}
+				if strings.Contains(event.Message, "failed liveness probe") {
+					return true, errors.New("should not see liveness probe failures")
+				}
+			}
+			return false, nil
+		}, 1*time.Minute, framework.Poll).ShouldNot(gomega.BeTrue(), "should not see liveness probe failures")
+	})
 })
 
 // GetContainerStartedTime returns the time when the given container started and error if any
diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index fffef5398a2..b9c80cdd199 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -55,6 +55,7 @@ import (
 	netutils "k8s.io/utils/net"
 	utilpointer "k8s.io/utils/pointer"
 
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
 	e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
@@ -1799,6 +1800,264 @@ var _ = common.SIGDescribe("Services", func() {
 		}
 	})
 
+	ginkgo.It("should be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is true", func() {
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-tolerate-unready"
+		ns := f.Namespace.Name
+		servicePort := 80
+
+		ginkgo.By("creating a NodePort TCP service " + serviceName + " with PublishNotReadyAddresses set to true in namespace " + ns)
+		jig := e2eservice.NewTestJig(cs, ns, serviceName)
+		svc, err := jig.CreateTCPService(func(svc *v1.Service) {
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
+			}
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.PublishNotReadyAddresses = true
+		})
+		framework.ExpectNoError(err, "failed to create Service")
+
+		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
+		gracePeriod := int64(300)
+		webserverPod0 := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "webserver-pod",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "agnhost",
+						Image: imageutils.GetE2EImage(imageutils.Agnhost),
+						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
+						Ports: []v1.ContainerPort{
+							{
+								ContainerPort: 80,
+							},
+						},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/readyz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+						LivenessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/healthz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		webserverPod0.Labels = jig.Labels
+		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
+		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err)
+		}
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
+
+		ginkgo.By("Creating 1 pause pod that will try to connect to the webserver")
+		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
+
+		pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err)
+		}
+
+		// webserver should continue to serve traffic through the Service after delete since:
+		// - it has a 300s termination grace period
+		// - it is unready but PublishNotReadyAddresses is true
+		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// Wait until the pod becomes unready
+		err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
+			return !podutil.IsPodReady(pod), nil
+		})
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err)
+		}
+		// assert 5 times that the pause pod can connect to the Service
+		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
+		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
+		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
+		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		// connect 5 times, once every 5 seconds, to the Service with the unready and terminating endpoint
+		for i := 0; i < 5; i++ {
+			execHostnameTest(*pausePod1, clusterIPAddress, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
+			time.Sleep(5 * time.Second)
+		}
+	})
+
+	ginkgo.It("should not be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is false", func() {
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-not-tolerate-unready"
"svc-not-tolerate-unready" + ns := f.Namespace.Name + servicePort := 80 + + ginkgo.By("creating a NodePort TCP service " + serviceName + " that PublishNotReadyAddresses on" + ns) + jig := e2eservice.NewTestJig(cs, ns, serviceName) + svc, err := jig.CreateTCPService(func(svc *v1.Service) { + svc.Spec.Ports = []v1.ServicePort{ + {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)}, + } + svc.Spec.Type = v1.ServiceTypeNodePort + svc.Spec.PublishNotReadyAddresses = false + }) + framework.ExpectNoError(err, "failed to create Service") + + ginkgo.By("Creating 1 webserver pod to be part of the TCP service") + gracePeriod := int64(300) + webserverPod0 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "webserver-pod", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "agnhost", + Image: imageutils.GetE2EImage(imageutils.Agnhost), + Args: []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)}, + Ports: []v1.ContainerPort{ + { + ContainerPort: 80, + }, + }, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Path: "/readyz", + Port: intstr.IntOrString{ + IntVal: int32(80), + }, + Scheme: v1.URISchemeHTTP, + }, + }, + }, + LivenessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + HTTPGet: &v1.HTTPGetAction{ + Path: "/healthz", + Port: intstr.IntOrString{ + IntVal: int32(80), + }, + Scheme: v1.URISchemeHTTP, + }, + }, + }, + }, + }, + }, + } + webserverPod0.Labels = jig.Labels + webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod) + e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name}) + + _, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{}) + framework.ExpectNoError(err, "failed to create pod") + err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout) + if err != nil { + framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err) + } + validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}}) + + ginkgo.By("Creating 1 pause pods that will try to connect to the webservers") + pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil) + e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name}) + + pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{}) + framework.ExpectNoError(err, "failed to create pod") + err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout) + if err != nil { + framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err) + } + + // webserver should stop to serve traffic through the Service after delete since: + // - it has a 600s termination grace period + // - it is unready but PublishNotReadyAddresses is false + err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{}) + framework.ExpectNoError(err) + + // Wait until the pod becomes unready + err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) { + return !podutil.IsPodReady(pod), nil + }) + if err != nil { + framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err) + } + // Wait the change has been propagated and the service start to fail + 
+		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
+		cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
+		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
+			_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			if err != nil {
+				return true, nil
+			}
+			return false, nil
+		}); pollErr != nil {
+			framework.ExpectNoError(pollErr, "service still serves traffic")
+		}
+
+		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
+		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
+		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		// connect 5 times, once every 5 seconds, to the Service and expect each connection to fail
+		for i := 0; i < 5; i++ {
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
+			_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to cluster IP")
+
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
+			_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to NodePort address")
+
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
+			_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to NodePort address")
+
+			time.Sleep(5 * time.Second)
+		}
+	})
+
 	/*
 		Release: v1.19
 		Testname: Service, ClusterIP type, session affinity to ClientIP