From 1d518adb764a5f540ba12846888b603a8f88016d Mon Sep 17 00:00:00 2001
From: Clayton Coleman
Date: Mon, 31 Jan 2022 22:02:48 -0500
Subject: [PATCH 1/4] kubelet: Pod probes should be handled by pod worker

The pod worker is the owner of when a container is running or not, and the
start and stop of the probes for a given pod should be handled during the
pod sync loop. This ensures that probes do not continue running even after
eviction. Because the pod semantics allow lifecycle probes to shorten the
grace period, the probe is removed after the containers in a pod are
terminated successfully. As an optimization, if the pod will have a very
short grace period (0 or 1 seconds), we stop the probes immediately to
slightly reduce resource usage during eviction. After this change, the
probe manager is only called by the pod worker or by the reconcile loop.
---
 pkg/kubelet/kubelet.go                     | 22 +++++++++++++------
 pkg/kubelet/kubelet_pods.go                |  3 ---
 .../nodeshutdown_manager_linux.go          |  3 ---
 pkg/kubelet/prober/prober_manager.go       | 19 ++++++++++++++++
 pkg/kubelet/prober/testing/fake_manager.go |  3 +++
 5 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 89eff1e5cc7..e5fe6c5e078 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1713,6 +1713,9 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
 	// Fetch the pull secrets for the pod
 	pullSecrets := kl.getPullSecretsForPod(pod)
 
+	// Ensure the pod is being probed
+	kl.probeManager.AddPod(pod)
+
 	// Call the container runtime's SyncPod callback
 	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
 	kl.reasonCache.Update(pod.UID, result)
@@ -1766,10 +1769,16 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu
 	kl.statusManager.SetPodStatus(pod, apiPodStatus)
 
 	if gracePeriod != nil {
+		if *gracePeriod <= 1 {
+			// If we plan to terminate quickly, stop probes immediately; otherwise we wait until the pod is completely done
+			kl.probeManager.RemovePod(pod)
+		}
 		klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod)
 	} else {
 		klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
 	}
 
+	kl.probeManager.StopLivenessAndStartup(pod)
+
 	p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
 	if err := kl.killPod(pod, p, gracePeriod); err != nil {
 		kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
@@ -1778,6 +1787,12 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu
 		return err
 	}
 
+	// Once the containers are stopped, we can stop probing for liveness and readiness.
+	// TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
+	// the detection of a container shutdown or (for readiness) after the first failure. Tracked as
+	// https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
+	kl.probeManager.RemovePod(pod)
+
 	// Guard against consistency issues in KillPod implementations by checking that there are no
 	// running containers. This method is invoked infrequently so this is effectively free and can
 	// catch race conditions introduced by callers updating pod status out of order.
@@ -2233,9 +2248,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) - // TODO: move inside syncPod and make reentrant - // https://github.com/kubernetes/kubernetes/issues/105014 - kl.probeManager.AddPod(pod) } } @@ -2269,10 +2281,6 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) { if err := kl.deletePod(pod); err != nil { klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err) } - // TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing - // once the pod kill is acknowledged and during eviction) - // https://github.com/kubernetes/kubernetes/issues/105014 - kl.probeManager.RemovePod(pod) } } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index a6d11d2ae48..0cdf20d2ba5 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1205,9 +1205,6 @@ func (kl *Kubelet) HandlePodCleanups() error { mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID) kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) - // TODO: move inside syncPod and make reentrant - // https://github.com/kubernetes/kubernetes/issues/105014 - kl.probeManager.AddPod(pod) } return nil diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go index 6974d82acf4..a9733b70210 100644 --- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go +++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go @@ -364,9 +364,6 @@ func (m *managerImpl) processShutdownEvent() error { gracePeriodOverride := group.ShutdownGracePeriodSeconds - // Stop probes for the pod - m.probeManager.RemovePod(pod) - // If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod. if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride { gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index 37282748f65..d8295930383 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -57,6 +57,9 @@ type Manager interface { // pod created. AddPod(pod *v1.Pod) + // StopLivenessAndStartup handles stopping liveness and startup probes during termination. + StopLivenessAndStartup(pod *v1.Pod) + // RemovePod handles cleaning up the removed pod state, including terminating probe workers and // deleting cached results. 
 	RemovePod(pod *v1.Pod)
@@ -195,6 +198,22 @@ func (m *manager) AddPod(pod *v1.Pod) {
 	}
 }
 
+func (m *manager) StopLivenessAndStartup(pod *v1.Pod) {
+	m.workerLock.RLock()
+	defer m.workerLock.RUnlock()
+
+	key := probeKey{podUID: pod.UID}
+	for _, c := range pod.Spec.Containers {
+		key.containerName = c.Name
+		for _, probeType := range [...]probeType{liveness, startup} {
+			key.probeType = probeType
+			if worker, ok := m.workers[key]; ok {
+				worker.stop()
+			}
+		}
+	}
+}
+
 func (m *manager) RemovePod(pod *v1.Pod) {
 	m.workerLock.RLock()
 	defer m.workerLock.RUnlock()
diff --git a/pkg/kubelet/prober/testing/fake_manager.go b/pkg/kubelet/prober/testing/fake_manager.go
index 41910382d38..500cfcd9962 100644
--- a/pkg/kubelet/prober/testing/fake_manager.go
+++ b/pkg/kubelet/prober/testing/fake_manager.go
@@ -33,6 +33,9 @@ func (FakeManager) AddPod(_ *v1.Pod) {}
 // RemovePod simulates removing a Pod.
 func (FakeManager) RemovePod(_ *v1.Pod) {}
 
+// StopLivenessAndStartup simulates stopping liveness and startup probes.
+func (FakeManager) StopLivenessAndStartup(_ *v1.Pod) {}
+
 // CleanupPods simulates cleaning up Pods.
 func (FakeManager) CleanupPods(_ map[types.UID]sets.Empty) {}
 

From f25ca15e1c47ebd8036c1fb5c06fe8fe6714807a Mon Sep 17 00:00:00 2001
From: Ryan Phillips
Date: Mon, 23 May 2022 14:06:32 -0500
Subject: [PATCH 2/4] kubelet: only shut down probes for pods that are terminated

This fixes a bug where terminating pods would not run their readiness probes.
Terminating pods are found within the possiblyRunningPods map.
---
 pkg/kubelet/kubelet_pods.go          | 4 ++--
 pkg/kubelet/prober/prober_manager.go | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index 0cdf20d2ba5..a69558261e8 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -1105,8 +1105,8 @@ func (kl *Kubelet) HandlePodCleanups() error {
 	}
 
 	// Stop probing pods that are not running
-	klog.V(3).InfoS("Clean up probes for terminating and terminated pods")
-	kl.probeManager.CleanupPods(runningPods)
+	klog.V(3).InfoS("Clean up probes for terminated pods")
+	kl.probeManager.CleanupPods(possiblyRunningPods)
 
 	// Terminate any pods that are observed in the runtime but not
 	// present in the list of known running pods from config.
diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index d8295930383..5c4699bb84d 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -163,7 +163,7 @@ func (m *manager) AddPod(pod *v1.Pod) { if c.StartupProbe != nil { key.probeType = startup if _, ok := m.workers[key]; ok { - klog.ErrorS(nil, "Startup probe already exists for container", + klog.V(4).ErrorS(nil, "Startup probe already exists for container", "pod", klog.KObj(pod), "containerName", c.Name) return } @@ -175,7 +175,7 @@ func (m *manager) AddPod(pod *v1.Pod) { if c.ReadinessProbe != nil { key.probeType = readiness if _, ok := m.workers[key]; ok { - klog.ErrorS(nil, "Readiness probe already exists for container", + klog.V(4).ErrorS(nil, "Readiness probe already exists for container", "pod", klog.KObj(pod), "containerName", c.Name) return } @@ -187,7 +187,7 @@ func (m *manager) AddPod(pod *v1.Pod) { if c.LivenessProbe != nil { key.probeType = liveness if _, ok := m.workers[key]; ok { - klog.ErrorS(nil, "Liveness probe already exists for container", + klog.V(4).ErrorS(nil, "Liveness probe already exists for container", "pod", klog.KObj(pod), "containerName", c.Name) return } From 230124f3d40c0106cd74c7e605b57da2a1396ac2 Mon Sep 17 00:00:00 2001 From: Ryan Phillips Date: Mon, 23 May 2022 17:12:26 -0500 Subject: [PATCH 3/4] kubelet: add e2e test to verify probe readiness --- pkg/kubelet/kubelet.go | 5 +- pkg/kubelet/prober/prober_manager.go | 6 +- test/e2e/common/node/container_probe.go | 168 ++++++++++++++++++++++++ 3 files changed, 172 insertions(+), 7 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e5fe6c5e078..c8640789ba0 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1769,14 +1769,11 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu kl.statusManager.SetPodStatus(pod, apiPodStatus) if gracePeriod != nil { - if *gracePeriod <= 1 { - // If we plan to terminate quickly, stop probes immediately, otherwise we will wait until the pod is completely done - kl.probeManager.RemovePod(pod) - } klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", *gracePeriod) } else { klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil) } + kl.probeManager.StopLivenessAndStartup(pod) p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus) diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index 5c4699bb84d..bb073688f89 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -163,7 +163,7 @@ func (m *manager) AddPod(pod *v1.Pod) { if c.StartupProbe != nil { key.probeType = startup if _, ok := m.workers[key]; ok { - klog.V(4).ErrorS(nil, "Startup probe already exists for container", + klog.V(8).ErrorS(nil, "Startup probe already exists for container", "pod", klog.KObj(pod), "containerName", c.Name) return } @@ -175,7 +175,7 @@ func (m *manager) AddPod(pod *v1.Pod) { if c.ReadinessProbe != nil { key.probeType = readiness if _, ok := m.workers[key]; ok { - klog.V(4).ErrorS(nil, "Readiness probe already exists for container", + klog.V(8).ErrorS(nil, "Readiness probe already exists for container", "pod", klog.KObj(pod), "containerName", c.Name) return } @@ -187,7 +187,7 @@ func (m *manager) AddPod(pod *v1.Pod) { if c.LivenessProbe != nil { key.probeType = liveness if _, ok := 
m.workers[key]; ok { - klog.V(4).ErrorS(nil, "Liveness probe already exists for container", + klog.V(8).ErrorS(nil, "Liveness probe already exists for container", "pod", klog.KObj(pod), "containerName", c.Name) return } diff --git a/test/e2e/common/node/container_probe.go b/test/e2e/common/node/container_probe.go index 45ea0b74bc4..c5d981d9003 100644 --- a/test/e2e/common/node/container_probe.go +++ b/test/e2e/common/node/container_probe.go @@ -18,9 +18,11 @@ package node import ( "context" + "errors" "fmt" "net" "net/url" + "strings" "time" v1 "k8s.io/api/core/v1" @@ -28,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apimachinery/pkg/util/wait" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/test/e2e/framework" @@ -551,6 +554,171 @@ var _ = SIGDescribe("Probing container", func() { pod := gRPCServerPodSpec(nil, livenessProbe, "etcd") RunLivenessTest(f, pod, 1, defaultObservationTimeout) }) + + ginkgo.It("should mark readiness on pods to false while pod is in progress of terminating when a pod has a readiness probe", func() { + podName := "probe-test-" + string(uuid.NewUUID()) + podClient := f.PodClient() + terminationGracePeriod := int64(30) + script := ` +_term() { + rm -f /tmp/ready + sleep 30 + exit 0 +} +trap _term SIGTERM + +touch /tmp/ready + +while true; do + echo \"hello\" + sleep 10 +done + ` + + // Create Pod + podClient.Create(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Image: imageutils.GetE2EImage(imageutils.Agnhost), + Name: podName, + Command: []string{"/bin/bash"}, + Args: []string{"-c", script}, + ReadinessProbe: &v1.Probe{ + ProbeHandler: v1.ProbeHandler{ + Exec: &v1.ExecAction{ + Command: []string{"cat", "/tmp/ready"}, + }, + }, + FailureThreshold: 1, + InitialDelaySeconds: 5, + PeriodSeconds: 2, + }, + }, + }, + TerminationGracePeriodSeconds: &terminationGracePeriod, + }, + }) + + // verify pods are running and ready + err := e2epod.WaitForPodsRunningReady(f.ClientSet, f.Namespace.Name, 1, 0, f.Timeouts.PodStart, map[string]string{}) + framework.ExpectNoError(err) + + // Shutdown pod. 
Readiness should change to false
+		podClient.Delete(context.Background(), podName, metav1.DeleteOptions{})
+		err = wait.PollImmediate(framework.Poll, f.Timeouts.PodDelete, func() (bool, error) {
+			pod, err := podClient.Get(context.Background(), podName, metav1.GetOptions{})
+			if err != nil {
+				return false, err
+			}
+			// verify the pod ready status has reported not ready
+			return podutil.IsPodReady(pod) == false, nil
+		})
+		framework.ExpectNoError(err)
+	})
+
+	ginkgo.It("should mark readiness on pods to false and disable liveness probes while pod is in progress of terminating", func() {
+		podName := "probe-test-" + string(uuid.NewUUID())
+		podClient := f.PodClient()
+		terminationGracePeriod := int64(30)
+		script := `
+_term() {
+	rm -f /tmp/ready
+	rm -f /tmp/liveness
+	sleep 20
+	exit 0
+}
+trap _term SIGTERM
+
+touch /tmp/ready
+touch /tmp/liveness
+
+while true; do
+	echo \"hello\"
+	sleep 10
+done
+`
+
+		// Create Pod
+		podClient.Create(&v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: podName,
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Image:   imageutils.GetE2EImage(imageutils.Agnhost),
+						Name:    podName,
+						Command: []string{"/bin/bash"},
+						Args:    []string{"-c", script},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								Exec: &v1.ExecAction{
+									Command: []string{"cat", "/tmp/ready"},
+								},
+							},
+							FailureThreshold: 1,
+							// delay startup to make sure the script has
+							// time to create the ready+liveness files
+							InitialDelaySeconds: 5,
+							PeriodSeconds:       2,
+						},
+						LivenessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								Exec: &v1.ExecAction{
+									Command: []string{"cat", "/tmp/liveness"},
+								},
+							},
+							FailureThreshold: 1,
+							// delay startup to make sure the script has
+							// time to create the ready+liveness files
+							InitialDelaySeconds: 5,
+							PeriodSeconds:       1,
+						},
+					},
+				},
+				TerminationGracePeriodSeconds: &terminationGracePeriod,
+			},
+		})
+
+		// verify pods are running and ready
+		err := e2epod.WaitForPodsRunningReady(f.ClientSet, f.Namespace.Name, 1, 0, f.Timeouts.PodStart, map[string]string{})
+		framework.ExpectNoError(err)
+
+		// Shutdown pod. Readiness should change to false
+		podClient.Delete(context.Background(), podName, metav1.DeleteOptions{})
+
+		// Wait for pod to go unready
+		err = wait.PollImmediate(framework.Poll, f.Timeouts.PodDelete, func() (bool, error) {
+			pod, err := podClient.Get(context.Background(), podName, metav1.GetOptions{})
+			if err != nil {
+				return false, err
+			}
+			// verify the pod ready status has reported not ready
+			return podutil.IsPodReady(pod) == false, nil
+		})
+		framework.ExpectNoError(err)
+
+		// Verify there are zero liveness failures since they are turned off
+		// during pod termination
+		gomega.Consistently(func() (bool, error) {
+			items, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(context.Background(), metav1.ListOptions{})
+			framework.ExpectNoError(err)
+			for _, event := range items.Items {
+				// Search only for the pod we are interested in
+				if event.InvolvedObject.Name != podName {
+					continue
+				}
+				if strings.Contains(event.Message, "failed liveness probe") {
+					return true, errors.New("should not see liveness probe failures")
+				}
+			}
+			return false, nil
+		}, 1*time.Minute, framework.Poll).ShouldNot(gomega.BeTrue(), "should not see liveness probe failures")
+	})
 })
 
 // GetContainerStartedTime returns the time when the given container started and error if any

From 97db4ac96317022436e516075212c32abbbe90ab Mon Sep 17 00:00:00 2001
From: Ryan Phillips
Date: Tue, 31 May 2022 14:06:47 -0500
Subject: [PATCH 4/4] add service e2e tests

---
 test/e2e/network/service.go | 259 ++++++++++++++++++++++++++++++++++++
 1 file changed, 259 insertions(+)

diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go
index fffef5398a2..b9c80cdd199 100644
--- a/test/e2e/network/service.go
+++ b/test/e2e/network/service.go
@@ -55,6 +55,7 @@ import (
 	netutils "k8s.io/utils/net"
 	utilpointer "k8s.io/utils/pointer"
 
+	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
 	e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
@@ -1799,6 +1800,264 @@ var _ = common.SIGDescribe("Services", func() {
 		}
 	})
 
+	ginkgo.It("should be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is true", func() {
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-tolerate-unready"
+		ns := f.Namespace.Name
+		servicePort := 80
+
+		ginkgo.By("creating a NodePort TCP service " + serviceName + " with PublishNotReadyAddresses=true in namespace " + ns)
+		jig := e2eservice.NewTestJig(cs, ns, serviceName)
+		svc, err := jig.CreateTCPService(func(svc *v1.Service) {
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
+			}
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.PublishNotReadyAddresses = true
+		})
+		framework.ExpectNoError(err, "failed to create Service")
+
+		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
+		gracePeriod := int64(300)
+		webserverPod0 := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "webserver-pod",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "agnhost",
+						Image: imageutils.GetE2EImage(imageutils.Agnhost),
+						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
+						Ports: []v1.ContainerPort{
+							{
+								ContainerPort: 80,
+							},
+						},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/readyz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+						LivenessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/healthz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		webserverPod0.Labels = jig.Labels
+		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
+		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err)
+		}
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
+
+		ginkgo.By("Creating 1 pause pod that will try to connect to the webserver")
+		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
+
+		pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err)
+		}
+
+		// webserver should continue to serve traffic through the Service after delete since:
+		//  - it has a 300s termination grace period
+		//  - it is unready but PublishNotReadyAddresses is true
+		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// Wait until the pod becomes unready
+		err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
+			return !podutil.IsPodReady(pod), nil
+		})
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err)
+		}
+		// assert 5 times that the pause pod can connect to the Service
+		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
+		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
+		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
+		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		// connect 5 times, once every 5 seconds, to the Service with the unready and terminating endpoint
+		for i := 0; i < 5; i++ {
+			execHostnameTest(*pausePod1, clusterIPAddress, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
+			execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
+			time.Sleep(5 * time.Second)
+		}
+	})
+
+	ginkgo.It("should not be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is false", func() {
+		nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
+		framework.ExpectNoError(err)
+		nodeCounts := len(nodes.Items)
+		if nodeCounts < 2 {
+			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
+		}
+		node0 := nodes.Items[0]
+		node1 := nodes.Items[1]
+
+		serviceName := "svc-not-tolerate-unready"
+		ns := f.Namespace.Name
+		servicePort := 80
+
+		ginkgo.By("creating a NodePort TCP service " + serviceName + " with PublishNotReadyAddresses=false in namespace " + ns)
+		jig := e2eservice.NewTestJig(cs, ns, serviceName)
+		svc, err := jig.CreateTCPService(func(svc *v1.Service) {
+			svc.Spec.Ports = []v1.ServicePort{
+				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
+			}
+			svc.Spec.Type = v1.ServiceTypeNodePort
+			svc.Spec.PublishNotReadyAddresses = false
+		})
+		framework.ExpectNoError(err, "failed to create Service")
+
+		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
+		gracePeriod := int64(300)
+		webserverPod0 := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "webserver-pod",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "agnhost",
+						Image: imageutils.GetE2EImage(imageutils.Agnhost),
+						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
+						Ports: []v1.ContainerPort{
+							{
+								ContainerPort: 80,
+							},
+						},
+						ReadinessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/readyz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+						LivenessProbe: &v1.Probe{
+							ProbeHandler: v1.ProbeHandler{
+								HTTPGet: &v1.HTTPGetAction{
+									Path: "/healthz",
+									Port: intstr.IntOrString{
+										IntVal: int32(80),
+									},
+									Scheme: v1.URISchemeHTTP,
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		webserverPod0.Labels = jig.Labels
+		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
+		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
+
+		_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err)
+		}
+		validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
+
+		ginkgo.By("Creating 1 pause pod that will try to connect to the webserver")
+		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
+		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
+
+		pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
+		framework.ExpectNoError(err, "failed to create pod")
+		err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err)
+		}
+
+		// webserver should stop serving traffic through the Service after delete since:
+		//  - it has a 300s termination grace period
+		//  - it is unready and PublishNotReadyAddresses is false
+		err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
+		framework.ExpectNoError(err)
+
+		// Wait until the pod becomes unready
+		err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
+			return !podutil.IsPodReady(pod), nil
+		})
+		if err != nil {
+			framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err)
+		}
+		// Wait until the change has been propagated and the service starts to fail
+		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
+		cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
+		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
+			_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			if err != nil {
+				return true, nil
+			}
+			return false, nil
+		}); pollErr != nil {
+			framework.ExpectNoError(pollErr, "service still serves traffic")
+		}
+
+		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
+		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
+		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
+		// connect 5 times, once every 5 seconds, to the Service and expect a failure
+		for i := 0; i < 5; i++ {
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
+			_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to cluster IP")
+
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
+			_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to NodePort address")
+
+			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
+			_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
+			framework.ExpectError(err, "expected error when trying to connect to NodePort address")
+
+			time.Sleep(5 * time.Second)
+		}
+	})
+
 	/*
 	   Release: v1.19
 	   Testname: Service, ClusterIP type, session affinity to ClientIP