test: Add e2e to verify static pod termination

Add a node e2e to verify that if a static pod is terminated while the
container runtime or CRI returns an error, the pod is eventually
terminated successfully.

This test serves as a regression test for k8s.io/issue/113145 which
fixes an issue where force deleted pods may not be terminated if the
container runtime fails during a `syncTerminatingPod`.

To test this behavior, start a static pod, stop the container runtime,
and later start the container runtime. The static pod is expected to
eventually terminate successfully.

To start and stop the container runtime, we need to find the container
runtime systemd unit name. Introduce a util function
`findContainerRuntimeServiceName` which finds the unit name by getting
the pid of the container runtime from the existing
`ContainerRuntimeProcessName` flag passed into node e2e and using
systemd dbus `GetUnitNameByPID` function to convert the pid of the
container runtime to a unit name. Using the unit name, introduce helper
functions to start and stop the container runtime.

Signed-off-by: David Porter <david@porter.me>
This commit is contained in:
David Porter 2022-10-19 20:37:24 -07:00 committed by Clayton Coleman
parent d446bebca8
commit 1c75c2cda8
No known key found for this signature in database
GPG Key ID: CF7DB7FC943D3E0E
3 changed files with 154 additions and 38 deletions

View File

@ -131,6 +131,55 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
framework.ExpectEqual(pod.Spec.Containers[0].Image, image) framework.ExpectEqual(pod.Spec.Containers[0].Image, image)
}) })
// Regression coverage for https://issues.k8s.io/113091: a static pod deleted
// while the container runtime (CRI) is unavailable must still be terminated
// once the runtime comes back up.
ginkgo.Context("and the container runtime is temporarily down during pod termination [NodeConformance] [Serial] [Disruptive]", func() {
ginkgo.It("the mirror pod should terminate successfully", func(ctx context.Context) {
ginkgo.By("delete the static pod")
err := deleteStaticPod(podPath, staticPodName, ns)
framework.ExpectNoError(err)
// Note: a small delay here is important. To reproduce https://issues.k8s.io/113091 we need
// syncTerminatingPod() to run (and fail) while the runtime is down, so wait briefly after
// deleting the static pod before stopping the container runtime.
ginkgo.By("sleeping before stopping the container runtime")
time.Sleep(2 * time.Second)
ginkgo.By("stop the container runtime")
err = stopContainerRuntime()
framework.ExpectNoError(err, "expected no error stopping the container runtime")
// The runtime is considered down once the CRI client can no longer be created.
ginkgo.By("waiting for the container runtime to be stopped")
gomega.Eventually(ctx, func(ctx context.Context) error {
_, _, err := getCRIClient()
return err
}, 2*time.Minute, time.Second*5).ShouldNot(gomega.Succeed())
ginkgo.By("start the container runtime")
err = startContainerRuntime()
framework.ExpectNoError(err, "expected no error starting the container runtime")
// NOTE(review): the enclosing describe is MirrorPodWithGracePeriod, so presumably the pod
// is deleted with a grace period and is expected to remain Running while it terminates
// gracefully — hence Consistently(running) here. The eventual-termination check is not
// visible in this hunk; confirm it follows this block.
gomega.Consistently(ctx, func(ctx context.Context) error {
ginkgo.By(fmt.Sprintf("verifying that the mirror pod (%s/%s) is running", ns, mirrorPodName))
err := checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
if err != nil {
return fmt.Errorf("expected mirror pod (%s/%s) to be running but it was not: %v", ns, mirrorPodName, err)
}
return nil
}, time.Second*30, time.Second*5).Should(gomega.Succeed())
})
// Always bring the runtime back up so later tests (and the outer AfterEach,
// which deletes the static pod) are not left with a dead runtime.
ginkgo.AfterEach(func(ctx context.Context) {
ginkgo.By("starting the container runtime")
err := startContainerRuntime()
framework.ExpectNoError(err, "expected no error starting the container runtime")
ginkgo.By("waiting for the container runtime to start")
gomega.Eventually(ctx, func(ctx context.Context) error {
_, _, err := getCRIClient()
if err != nil {
return fmt.Errorf("error getting cri client: %v", err)
}
return nil
}, 2*time.Minute, time.Second*5).Should(gomega.Succeed())
})
})
ginkgo.AfterEach(func(ctx context.Context) { ginkgo.AfterEach(func(ctx context.Context) {
ginkgo.By("delete the static pod") ginkgo.By("delete the static pod")
err := deleteStaticPod(podPath, staticPodName, ns) err := deleteStaticPod(podPath, staticPodName, ns)

View File

@ -23,11 +23,8 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"io"
"log" "log"
"os"
"sort" "sort"
"strconv"
"strings" "strings"
"sync" "sync"
"text/tabwriter" "text/tabwriter"
@ -39,11 +36,9 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1" kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/util/procfs"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@ -465,38 +460,6 @@ func (r *ResourceCollector) GetResourceTimeSeries() map[string]*perftype.Resourc
const kubeletProcessName = "kubelet" const kubeletProcessName = "kubelet"
// getPidsForProcess returns the pids of processes with the given name.
// If pidFile is non-empty it is consulted first; on any error reading or
// parsing it, the error is logged and the lookup falls back to a
// pidof-style scan of /proc by process name.
func getPidsForProcess(name, pidFile string) ([]int, error) {
if len(pidFile) > 0 {
pid, err := getPidFromPidFile(pidFile)
if err == nil {
return []int{pid}, nil
}
// log the error and fall back to pidof
runtime.HandleError(err)
}
return procfs.PidOf(name)
}
// getPidFromPidFile reads pidFile and parses its entire contents as a
// decimal pid. Errors are wrapped with %w so callers can inspect the cause.
func getPidFromPidFile(pidFile string) (int, error) {
file, err := os.Open(pidFile)
if err != nil {
return 0, fmt.Errorf("error opening pid file %s: %w", pidFile, err)
}
defer file.Close()
data, err := io.ReadAll(file)
if err != nil {
return 0, fmt.Errorf("error reading pid file %s: %w", pidFile, err)
}
pid, err := strconv.Atoi(string(data))
if err != nil {
return 0, fmt.Errorf("error parsing %s as a number: %w", string(data), err)
}
return pid, nil
}
func getContainerNameForProcess(name, pidFile string) (string, error) { func getContainerNameForProcess(name, pidFile string) (string, error) {
pids, err := getPidsForProcess(name, pidFile) pids, err := getPidsForProcess(name, pidFile)
if err != nil { if err != nil {

View File

@ -25,17 +25,21 @@ import (
"io" "io"
"net" "net"
"net/http" "net/http"
"os"
"os/exec" "os/exec"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"k8s.io/kubernetes/pkg/util/procfs"
oteltrace "go.opentelemetry.io/otel/trace" oteltrace "go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
@ -55,6 +59,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util"
"github.com/coreos/go-systemd/v22/dbus"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet" e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics" e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
@ -84,12 +89,14 @@ const (
var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort) var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
var containerRuntimeUnitName = ""
func getNodeSummary(ctx context.Context) (*stats.Summary, error) { func getNodeSummary(ctx context.Context) (*stats.Summary, error) {
kubeletConfig, err := getCurrentKubeletConfig(ctx) kubeletConfig, err := getCurrentKubeletConfig(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get current kubelet config") return nil, fmt.Errorf("failed to get current kubelet config")
} }
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil) req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to build http request: %w", err) return nil, fmt.Errorf("failed to build http request: %w", err)
} }
@ -340,6 +347,71 @@ func findKubeletServiceName(running bool) string {
return kubeletServiceName return kubeletServiceName
} }
// findContainerRuntimeServiceName returns the name of the systemd unit that
// manages the container runtime. It resolves the runtime's pid (via the
// ContainerRuntimeProcessName / ContainerRuntimePidFile test flags) and asks
// systemd, through the dbus GetUnitNameByPID call, which unit owns that pid.
//
// Unlike the previous version, failures are returned to the caller instead of
// aborting the test from inside a helper; this makes the (string, error)
// signature meaningful and lets callers retry or poll.
func findContainerRuntimeServiceName() (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	conn, err := dbus.NewWithContext(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to set up dbus connection: %w", err)
	}
	defer conn.Close()

	runtimePids, err := getPidsForProcess(framework.TestContext.ContainerRuntimeProcessName, framework.TestContext.ContainerRuntimePidFile)
	if err != nil {
		return "", fmt.Errorf("failed to get list of container runtime pids: %w", err)
	}
	// Exactly one runtime process is expected; anything else means we cannot
	// safely pick a unit to operate on.
	if len(runtimePids) != 1 {
		return "", fmt.Errorf("unexpected number of container runtime pids: expected 1 but got %d (%v)", len(runtimePids), runtimePids)
	}

	unitName, err := conn.GetUnitNameByPID(ctx, uint32(runtimePids[0]))
	if err != nil {
		return "", fmt.Errorf("failed to get container runtime unit name for pid %d: %w", runtimePids[0], err)
	}
	return unitName, nil
}
// containerRuntimeUnitOp is the operation to perform on the container
// runtime's systemd unit.
type containerRuntimeUnitOp int
const (
// startContainerRuntimeUnitOp starts the container runtime unit.
startContainerRuntimeUnitOp containerRuntimeUnitOp = iota
// stopContainerRuntimeUnitOp stops the container runtime unit.
stopContainerRuntimeUnitOp
)
// performContainerRuntimeUnitOp starts or stops the container runtime's
// systemd unit and blocks until systemd reports the job finished.
//
// Fixes two issues in the previous version: the error returns of
// StartUnitContext/StopUnitContext were discarded — if enqueueing the systemd
// job failed, reschan was never written to and the receive below deadlocked —
// and failures were asserted inside the helper instead of being returned.
func performContainerRuntimeUnitOp(op containerRuntimeUnitOp) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	conn, err := dbus.NewWithContext(ctx)
	if err != nil {
		return fmt.Errorf("failed to set up dbus connection: %w", err)
	}
	defer conn.Close()

	// Lazily resolve (and cache) the runtime unit name on first use.
	if containerRuntimeUnitName == "" {
		containerRuntimeUnitName, err = findContainerRuntimeServiceName()
		if err != nil {
			return fmt.Errorf("failed to find container runtime service name: %w", err)
		}
	}

	// systemd reports job completion ("done", "failed", ...) on this channel.
	reschan := make(chan string)
	switch op {
	case startContainerRuntimeUnitOp:
		_, err = conn.StartUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
	case stopContainerRuntimeUnitOp:
		_, err = conn.StopUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
	default:
		return fmt.Errorf("unexpected container runtime op: %v", op)
	}
	// If the job could not even be enqueued, reschan will never receive a
	// result; bail out instead of blocking forever on the receive below.
	if err != nil {
		return fmt.Errorf("failed to perform op %v on unit %q: %w", op, containerRuntimeUnitName, err)
	}

	if job := <-reschan; job != "done" {
		return fmt.Errorf("expected systemd job on unit %q to complete with 'done', got %q", containerRuntimeUnitName, job)
	}
	return nil
}
// stopContainerRuntime stops the container runtime's systemd unit and waits
// for the stop job to complete.
func stopContainerRuntime() error {
return performContainerRuntimeUnitOp(stopContainerRuntimeUnitOp)
}
// startContainerRuntime starts the container runtime's systemd unit and waits
// for the start job to complete.
func startContainerRuntime() error {
return performContainerRuntimeUnitOp(startContainerRuntimeUnitOp)
}
// restartKubelet restarts the current kubelet service. // restartKubelet restarts the current kubelet service.
// the "current" kubelet service is the instance managed by the current e2e_node test run. // the "current" kubelet service is the instance managed by the current e2e_node test run.
// If `running` is true, restarts only if the current kubelet is actually running. In some cases, // If `running` is true, restarts only if the current kubelet is actually running. In some cases,
@ -465,3 +537,35 @@ func waitForAllContainerRemoval(ctx context.Context, podName, podNS string) {
return nil return nil
}, 2*time.Minute, 1*time.Second).Should(gomega.Succeed()) }, 2*time.Minute, 1*time.Second).Should(gomega.Succeed())
} }
func getPidsForProcess(name, pidFile string) ([]int, error) {
if len(pidFile) > 0 {
pid, err := getPidFromPidFile(pidFile)
if err == nil {
return []int{pid}, nil
}
// log the error and fall back to pidof
runtime.HandleError(err)
}
return procfs.PidOf(name)
}
func getPidFromPidFile(pidFile string) (int, error) {
file, err := os.Open(pidFile)
if err != nil {
return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
}
defer file.Close()
data, err := io.ReadAll(file)
if err != nil {
return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
}
pid, err := strconv.Atoi(string(data))
if err != nil {
return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
}
return pid, nil
}