Merge pull request #110191 from rphillips/fixes/probe_shutdowns_readiness

Re-enable Kubelet Pod Readiness Probes on Termination; pod probes should be handled by the pod worker
commit 79cef12276
Kubernetes Prow Robot authored 2022-06-07 00:55:48 -07:00, committed by GitHub
7 changed files with 466 additions and 18 deletions
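
At a high level, the change moves probe lifecycle management into the pod worker: syncPod ensures probing starts, syncTerminatingPod stops liveness and startup probes while readiness keeps reporting, and RemovePod runs only once the containers are stopped. A minimal, self-contained Go sketch of that sequencing (illustrative only; the stub manager and string pod names are simplifications, the real prober Manager takes *v1.Pod):

package main

import "fmt"

// prober mirrors the subset of the kubelet prober Manager used below
// (simplified signatures for illustration).
type prober interface {
	AddPod(pod string)
	StopLivenessAndStartup(pod string)
	RemovePod(pod string)
}

type stubManager struct{}

func (stubManager) AddPod(pod string)                 { fmt.Println("probe workers started for", pod) }
func (stubManager) StopLivenessAndStartup(pod string) { fmt.Println("liveness/startup stopped for", pod) }
func (stubManager) RemovePod(pod string)              { fmt.Println("all probe workers removed for", pod) }

func main() {
	var m prober = stubManager{}

	// syncPod: ensure the pod is being probed (now reentrant, replacing the
	// AddPod call that used to live in HandlePodAdditions).
	m.AddPod("web-0")

	// syncTerminatingPod: stop liveness and startup probes, but keep readiness
	// running so the pod can report NotReady while it terminates.
	m.StopLivenessAndStartup("web-0")

	// ... killPod runs with the pod's grace period ...

	// After the containers are stopped, remove the remaining probe workers and
	// cached results (previously done in HandlePodRemoves).
	m.RemovePod("web-0")
}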

View File

@@ -1713,6 +1713,9 @@ func (kl *Kubelet) syncPod(ctx context.Context, updateType kubetypes.SyncPodType
// Fetch the pull secrets for the pod
pullSecrets := kl.getPullSecretsForPod(pod)
+// Ensure the pod is being probed
+kl.probeManager.AddPod(pod)
// Call the container runtime's SyncPod callback
result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
kl.reasonCache.Update(pod.UID, result)
@@ -1770,6 +1773,9 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu
} else {
klog.V(4).InfoS("Pod terminating with grace period", "pod", klog.KObj(pod), "podUID", pod.UID, "gracePeriod", nil)
}
+kl.probeManager.StopLivenessAndStartup(pod)
p := kubecontainer.ConvertPodStatusToRunningPod(kl.getRuntime().Type(), podStatus)
if err := kl.killPod(pod, p, gracePeriod); err != nil {
kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err)
@@ -1778,6 +1784,12 @@ func (kl *Kubelet) syncTerminatingPod(ctx context.Context, pod *v1.Pod, podStatu
return err
}
+// Once the containers are stopped, we can stop probing for liveness and readiness.
+// TODO: once a pod is terminal, certain probes (liveness exec) could be stopped immediately after
+// the detection of a container shutdown or (for readiness) after the first failure. Tracked as
+// https://github.com/kubernetes/kubernetes/issues/107894 although may not be worth optimizing.
+kl.probeManager.RemovePod(pod)
// Guard against consistency issues in KillPod implementations by checking that there are no
// running containers. This method is invoked infrequently so this is effectively free and can
// catch race conditions introduced by callers updating pod status out of order.
@@ -2233,9 +2245,6 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
-// TODO: move inside syncPod and make reentrant
-// https://github.com/kubernetes/kubernetes/issues/105014
-kl.probeManager.AddPod(pod)
}
}
@@ -2269,10 +2278,6 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
if err := kl.deletePod(pod); err != nil {
klog.V(2).InfoS("Failed to delete pod", "pod", klog.KObj(pod), "err", err)
}
-// TODO: move inside syncTerminatingPod|syncTerminatedPod (we should stop probing
-// once the pod kill is acknowledged and during eviction)
-// https://github.com/kubernetes/kubernetes/issues/105014
-kl.probeManager.RemovePod(pod)
}
}

View File

@@ -1105,8 +1105,8 @@ func (kl *Kubelet) HandlePodCleanups() error {
}
// Stop probing pods that are not running
-klog.V(3).InfoS("Clean up probes for terminating and terminated pods")
-kl.probeManager.CleanupPods(runningPods)
+klog.V(3).InfoS("Clean up probes for terminated pods")
+kl.probeManager.CleanupPods(possiblyRunningPods)
// Terminate any pods that are observed in the runtime but not
// present in the list of known running pods from config.
@@ -1205,9 +1205,6 @@ func (kl *Kubelet) HandlePodCleanups() error {
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
klog.V(3).InfoS("Pod is restartable after termination due to UID reuse", "pod", klog.KObj(pod), "podUID", pod.UID)
kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
-// TODO: move inside syncPod and make reentrant
-// https://github.com/kubernetes/kubernetes/issues/105014
-kl.probeManager.AddPod(pod)
}
return nil
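
The switch from runningPods to possiblyRunningPods matters because CleanupPods removes probe workers for every pod not in the set it is given; passing the set of pods the pod workers might still be running keeps probes alive through termination. A rough sketch of that set-difference behavior (hypothetical types, not the kubelet's):

package main

import "fmt"

type UID string

// cleanupPods stops and removes probe workers for any pod whose UID is not in
// desired, mirroring the set-difference semantics of CleanupPods.
func cleanupPods(workers map[UID][]string, desired map[UID]struct{}) {
	for uid, probes := range workers {
		if _, ok := desired[uid]; !ok {
			fmt.Println("removing probe workers for", uid, probes)
			delete(workers, uid)
		}
	}
}

func main() {
	workers := map[UID][]string{
		"running-pod":     {"liveness", "readiness"},
		"terminating-pod": {"readiness"}, // still draining; liveness already stopped
		"terminated-pod":  {"liveness", "readiness"},
	}
	// possiblyRunningPods includes pods a pod worker is still terminating, so
	// only workers for fully terminated pods are cleaned up here.
	possiblyRunning := map[UID]struct{}{"running-pod": {}, "terminating-pod": {}}
	cleanupPods(workers, possiblyRunning)
	fmt.Println("remaining:", workers)
}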

View File

@@ -364,9 +364,6 @@ func (m *managerImpl) processShutdownEvent() error {
gracePeriodOverride := group.ShutdownGracePeriodSeconds
-// Stop probes for the pod
-m.probeManager.RemovePod(pod)
// If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod.
if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds

View File

@@ -57,6 +57,9 @@ type Manager interface {
// pod created.
AddPod(pod *v1.Pod)
+// StopLivenessAndStartup handles stopping liveness and startup probes during termination.
+StopLivenessAndStartup(pod *v1.Pod)
// RemovePod handles cleaning up the removed pod state, including terminating probe workers and
// deleting cached results.
RemovePod(pod *v1.Pod)
@@ -160,7 +163,7 @@ func (m *manager) AddPod(pod *v1.Pod) {
if c.StartupProbe != nil {
key.probeType = startup
if _, ok := m.workers[key]; ok {
-klog.ErrorS(nil, "Startup probe already exists for container",
+klog.V(8).ErrorS(nil, "Startup probe already exists for container",
"pod", klog.KObj(pod), "containerName", c.Name)
return
}
@@ -172,7 +175,7 @@ func (m *manager) AddPod(pod *v1.Pod) {
if c.ReadinessProbe != nil {
key.probeType = readiness
if _, ok := m.workers[key]; ok {
-klog.ErrorS(nil, "Readiness probe already exists for container",
+klog.V(8).ErrorS(nil, "Readiness probe already exists for container",
"pod", klog.KObj(pod), "containerName", c.Name)
return
}
@@ -184,7 +187,7 @@ func (m *manager) AddPod(pod *v1.Pod) {
if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
-klog.ErrorS(nil, "Liveness probe already exists for container",
+klog.V(8).ErrorS(nil, "Liveness probe already exists for container",
"pod", klog.KObj(pod), "containerName", c.Name)
return
}
@@ -195,6 +198,22 @@ func (m *manager) AddPod(pod *v1.Pod) {
}
}
+func (m *manager) StopLivenessAndStartup(pod *v1.Pod) {
+m.workerLock.RLock()
+defer m.workerLock.RUnlock()
+key := probeKey{podUID: pod.UID}
+for _, c := range pod.Spec.Containers {
+key.containerName = c.Name
+for _, probeType := range [...]probeType{liveness, startup} {
+key.probeType = probeType
+if worker, ok := m.workers[key]; ok {
+worker.stop()
+}
+}
+}
+}
func (m *manager) RemovePod(pod *v1.Pod) {
m.workerLock.RLock()
defer m.workerLock.RUnlock()
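
StopLivenessAndStartup intentionally leaves the readiness worker running so a terminating pod can still flip to NotReady, whereas RemovePod (called after the containers are stopped) tears down every worker and its cached results. A simplified sketch of the two paths over the same worker map (assumed types, not the kubelet's):

package main

import "fmt"

type probeType string

const (
	liveness  probeType = "liveness"
	readiness probeType = "readiness"
	startup   probeType = "startup"
)

type probeKey struct {
	podUID    string
	container string
	probeType probeType
}

// stopLivenessAndStartup stops only liveness and startup workers; readiness
// keeps running so termination can be reflected in the pod's Ready condition.
func stopLivenessAndStartup(workers map[probeKey]bool, podUID, container string) {
	for _, pt := range [...]probeType{liveness, startup} {
		delete(workers, probeKey{podUID: podUID, container: container, probeType: pt})
	}
}

// removePod stops every remaining worker for the pod, readiness included.
func removePod(workers map[probeKey]bool, podUID, container string) {
	for _, pt := range [...]probeType{liveness, readiness, startup} {
		delete(workers, probeKey{podUID: podUID, container: container, probeType: pt})
	}
}

func main() {
	workers := map[probeKey]bool{
		{podUID: "uid-1", container: "app", probeType: liveness}:  true,
		{podUID: "uid-1", container: "app", probeType: readiness}: true,
		{podUID: "uid-1", container: "app", probeType: startup}:   true,
	}
	stopLivenessAndStartup(workers, "uid-1", "app")
	fmt.Println("during termination:", workers) // readiness worker remains
	removePod(workers, "uid-1", "app")
	fmt.Println("after RemovePod:", workers) // empty
}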

View File

@@ -33,6 +33,9 @@ func (FakeManager) AddPod(_ *v1.Pod) {}
// RemovePod simulates removing a Pod.
func (FakeManager) RemovePod(_ *v1.Pod) {}
+// StopLivenessAndStartup simulates stopping liveness and startup probes.
+func (FakeManager) StopLivenessAndStartup(_ *v1.Pod) {}
// CleanupPods simulates cleaning up Pods.
func (FakeManager) CleanupPods(_ map[types.UID]sets.Empty) {}

View File

@@ -18,9 +18,11 @@ package node
import (
"context"
+"errors"
"fmt"
"net"
"net/url"
+"strings"
"time"
v1 "k8s.io/api/core/v1"
@@ -28,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/uuid"
+"k8s.io/apimachinery/pkg/util/wait"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/test/e2e/framework"
@@ -551,6 +554,171 @@ var _ = SIGDescribe("Probing container", func() {
pod := gRPCServerPodSpec(nil, livenessProbe, "etcd")
RunLivenessTest(f, pod, 1, defaultObservationTimeout)
})
ginkgo.It("should mark readiness on pods to false while pod is in progress of terminating when a pod has a readiness probe", func() {
podName := "probe-test-" + string(uuid.NewUUID())
podClient := f.PodClient()
terminationGracePeriod := int64(30)
script := `
_term() {
rm -f /tmp/ready
sleep 30
exit 0
}
trap _term SIGTERM
touch /tmp/ready
while true; do
echo \"hello\"
sleep 10
done
`
// Create Pod
podClient.Create(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Name: podName,
Command: []string{"/bin/bash"},
Args: []string{"-c", script},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
Exec: &v1.ExecAction{
Command: []string{"cat", "/tmp/ready"},
},
},
FailureThreshold: 1,
InitialDelaySeconds: 5,
PeriodSeconds: 2,
},
},
},
TerminationGracePeriodSeconds: &terminationGracePeriod,
},
})
// verify pods are running and ready
err := e2epod.WaitForPodsRunningReady(f.ClientSet, f.Namespace.Name, 1, 0, f.Timeouts.PodStart, map[string]string{})
framework.ExpectNoError(err)
// Shutdown pod. Readiness should change to false
podClient.Delete(context.Background(), podName, metav1.DeleteOptions{})
err = wait.PollImmediate(framework.Poll, f.Timeouts.PodDelete, func() (bool, error) {
pod, err := podClient.Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return false, err
}
// verify the pod ready status has reported not ready
return podutil.IsPodReady(pod) == false, nil
})
framework.ExpectNoError(err)
})
ginkgo.It("should mark readiness on pods to false and disable liveness probes while pod is in progress of terminating", func() {
podName := "probe-test-" + string(uuid.NewUUID())
podClient := f.PodClient()
terminationGracePeriod := int64(30)
script := `
_term() {
rm -f /tmp/ready
rm -f /tmp/liveness
sleep 20
exit 0
}
trap _term SIGTERM
touch /tmp/ready
touch /tmp/liveness
while true; do
echo \"hello\"
sleep 10
done
`
// Create Pod
podClient.Create(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Name: podName,
Command: []string{"/bin/bash"},
Args: []string{"-c", script},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
Exec: &v1.ExecAction{
Command: []string{"cat", "/tmp/ready"},
},
},
FailureThreshold: 1,
// delay startup to make sure the script has
// time to create the ready+liveness files
InitialDelaySeconds: 5,
PeriodSeconds: 2,
},
LivenessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
Exec: &v1.ExecAction{
Command: []string{"cat", "/tmp/liveness"},
},
},
FailureThreshold: 1,
// delay startup to make sure the script has
// time to create the ready+liveness files
InitialDelaySeconds: 5,
PeriodSeconds: 1,
},
},
},
TerminationGracePeriodSeconds: &terminationGracePeriod,
},
})
// verify pods are running and ready
err := e2epod.WaitForPodsRunningReady(f.ClientSet, f.Namespace.Name, 1, 0, f.Timeouts.PodStart, map[string]string{})
framework.ExpectNoError(err)
// Shutdown pod. Readiness should change to false
podClient.Delete(context.Background(), podName, metav1.DeleteOptions{})
// Wait for pod to go unready
err = wait.PollImmediate(framework.Poll, f.Timeouts.PodDelete, func() (bool, error) {
pod, err := podClient.Get(context.Background(), podName, metav1.GetOptions{})
if err != nil {
return false, err
}
// verify the pod ready status has reported not ready
return podutil.IsPodReady(pod) == false, nil
})
framework.ExpectNoError(err)
// Verify there are zero liveness failures since they are turned off
// during pod termination
gomega.Consistently(func() (bool, error) {
items, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(context.Background(), metav1.ListOptions{})
framework.ExpectNoError(err)
for _, event := range items.Items {
// Search only for the pod we are interested in
if event.InvolvedObject.Name != podName {
continue
}
if strings.Contains(event.Message, "failed liveness probe") {
return true, errors.New("should not see liveness probe failures")
}
}
return false, nil
}, 1*time.Minute, framework.Poll).ShouldNot(gomega.BeTrue(), "should not see liveness probe failures")
})
})
// GetContainerStartedTime returns the time when the given container started and error if any

View File

@@ -55,6 +55,7 @@ import (
netutils "k8s.io/utils/net"
utilpointer "k8s.io/utils/pointer"
+podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/test/e2e/framework"
e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
@@ -1799,6 +1800,264 @@ var _ = common.SIGDescribe("Services", func() {
}
})
ginkgo.It("should be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is true", func() {
nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
framework.ExpectNoError(err)
nodeCounts := len(nodes.Items)
if nodeCounts < 2 {
e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
}
node0 := nodes.Items[0]
node1 := nodes.Items[1]
serviceName := "svc-tolerate-unready"
ns := f.Namespace.Name
servicePort := 80
ginkgo.By("creating a NodePort TCP service " + serviceName + " that PublishNotReadyAddresses on" + ns)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
svc, err := jig.CreateTCPService(func(svc *v1.Service) {
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
}
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.PublishNotReadyAddresses = true
})
framework.ExpectNoError(err, "failed to create Service")
ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
gracePeriod := int64(300)
webserverPod0 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "webserver-pod",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "agnhost",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
Ports: []v1.ContainerPort{
{
ContainerPort: 80,
},
},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/readyz",
Port: intstr.IntOrString{
IntVal: int32(80),
},
Scheme: v1.URISchemeHTTP,
},
},
},
LivenessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{
IntVal: int32(80),
},
Scheme: v1.URISchemeHTTP,
},
},
},
},
},
},
}
webserverPod0.Labels = jig.Labels
webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create pod")
err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
if err != nil {
framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err)
}
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
ginkgo.By("Creating 1 pause pods that will try to connect to the webservers")
pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create pod")
err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
if err != nil {
framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err)
}
// webserver should continue to serve traffic through the Service after delete since:
// - it has a 300s termination grace period
// - it is unready but PublishNotReadyAddresses is true
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err)
// Wait until the pod becomes unready
err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
return !podutil.IsPodReady(pod), nil
})
if err != nil {
framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err)
}
// assert 5 times that the pause pod can connect to the Service
nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
// connect 5 times, once every 5 seconds, to the Service with the unready and terminating endpoint
for i := 0; i < 5; i++ {
execHostnameTest(*pausePod1, clusterIPAddress, webserverPod0.Name)
execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
time.Sleep(5 * time.Second)
}
})
ginkgo.It("should not be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is false", func() {
nodes, err := e2enode.GetBoundedReadySchedulableNodes(cs, 2)
framework.ExpectNoError(err)
nodeCounts := len(nodes.Items)
if nodeCounts < 2 {
e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
}
node0 := nodes.Items[0]
node1 := nodes.Items[1]
serviceName := "svc-not-tolerate-unready"
ns := f.Namespace.Name
servicePort := 80
ginkgo.By("creating a NodePort TCP service " + serviceName + " that PublishNotReadyAddresses on" + ns)
jig := e2eservice.NewTestJig(cs, ns, serviceName)
svc, err := jig.CreateTCPService(func(svc *v1.Service) {
svc.Spec.Ports = []v1.ServicePort{
{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(80)},
}
svc.Spec.Type = v1.ServiceTypeNodePort
svc.Spec.PublishNotReadyAddresses = false
})
framework.ExpectNoError(err, "failed to create Service")
ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
gracePeriod := int64(300)
webserverPod0 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "webserver-pod",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "agnhost",
Image: imageutils.GetE2EImage(imageutils.Agnhost),
Args: []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
Ports: []v1.ContainerPort{
{
ContainerPort: 80,
},
},
ReadinessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/readyz",
Port: intstr.IntOrString{
IntVal: int32(80),
},
Scheme: v1.URISchemeHTTP,
},
},
},
LivenessProbe: &v1.Probe{
ProbeHandler: v1.ProbeHandler{
HTTPGet: &v1.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{
IntVal: int32(80),
},
Scheme: v1.URISchemeHTTP,
},
},
},
},
},
},
}
webserverPod0.Labels = jig.Labels
webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
_, err = cs.CoreV1().Pods(ns).Create(context.TODO(), webserverPod0, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create pod")
err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
if err != nil {
framework.Failf("error waiting for pod %s to be ready %v", webserverPod0.Name, err)
}
validateEndpointsPortsOrFail(cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
ginkgo.By("Creating 1 pause pods that will try to connect to the webservers")
pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
pausePod1, err = cs.CoreV1().Pods(ns).Create(context.TODO(), pausePod1, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create pod")
err = e2epod.WaitTimeoutForPodReadyInNamespace(f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
if err != nil {
framework.Failf("error waiting for pod %s to be ready %v", pausePod1.Name, err)
}
// webserver should stop serving traffic through the Service after delete since:
// - it has a 300s termination grace period
// - it is unready and PublishNotReadyAddresses is false
err = cs.CoreV1().Pods(ns).Delete(context.TODO(), webserverPod0.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err)
// Wait until the pod becomes unready
err = e2epod.WaitForPodCondition(f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
return !podutil.IsPodReady(pod), nil
})
if err != nil {
framework.Failf("error waiting for pod %s to be unready %v", webserverPod0.Name, err)
}
// Wait until the change has propagated and the service starts to fail
clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
if err != nil {
return true, nil
}
return false, nil
}); pollErr != nil {
framework.ExpectNoError(pollErr, "service still serves traffic")
}
nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
// connect 5 times, once every 5 seconds, to the Service and expect a failure
for i := 0; i < 5; i++ {
cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
_, err := framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
framework.ExpectError(err, "expected error when trying to connect to cluster IP")
cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
framework.ExpectError(err, "expected error when trying to connect to NodePort address")
cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
_, err = framework.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
framework.ExpectError(err, "expected error when trying to connect to NodePort address")
time.Sleep(5 * time.Second)
}
})
/*
Release: v1.19
Testname: Service, ClusterIP type, session affinity to ClientIP