diff --git a/test/e2e_node/mirror_pod_grace_period_test.go b/test/e2e_node/mirror_pod_grace_period_test.go
index 6bc3dc2933d..3a8a29125bf 100644
--- a/test/e2e_node/mirror_pod_grace_period_test.go
+++ b/test/e2e_node/mirror_pod_grace_period_test.go
@@ -131,6 +131,55 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
 			framework.ExpectEqual(pod.Spec.Containers[0].Image, image)
 		})
 
+		ginkgo.Context("and the container runtime is temporarily down during pod termination [NodeConformance] [Serial] [Disruptive]", func() {
+			ginkgo.It("the mirror pod should terminate successfully", func(ctx context.Context) {
+				ginkgo.By("delete the static pod")
+				err := deleteStaticPod(podPath, staticPodName, ns)
+				framework.ExpectNoError(err)
+
+				// Note it is important we have a small delay here as we would like to reproduce https://issues.k8s.io/113091 which requires a failure in syncTerminatingPod()
+				// This requires waiting a small period between the static pod being deleted so that syncTerminatingPod() will attempt to run
+				ginkgo.By("sleeping before stopping the container runtime")
+				time.Sleep(2 * time.Second)
+
+				ginkgo.By("stop the container runtime")
+				err = stopContainerRuntime()
+				framework.ExpectNoError(err, "expected no error stopping the container runtime")
+
+				ginkgo.By("waiting for the container runtime to be stopped")
+				gomega.Eventually(ctx, func(ctx context.Context) error {
+					_, _, err := getCRIClient()
+					return err
+				}, 2*time.Minute, time.Second*5).ShouldNot(gomega.Succeed())
+
+				ginkgo.By("start the container runtime")
+				err = startContainerRuntime()
+				framework.ExpectNoError(err, "expected no error starting the container runtime")
+				gomega.Consistently(ctx, func(ctx context.Context) error {
+					ginkgo.By(fmt.Sprintf("verifying that the mirror pod (%s/%s) is running", ns, mirrorPodName))
+					err := checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
+					if err != nil {
+						return fmt.Errorf("expected mirror pod (%s/%s) to be running but it was not: %v", ns, mirrorPodName, err)
+					}
+					return nil
+				}, time.Second*30, time.Second*5).Should(gomega.Succeed())
+			})
+
+			ginkgo.AfterEach(func(ctx context.Context) {
+				ginkgo.By("starting the container runtime")
+				err := startContainerRuntime()
+				framework.ExpectNoError(err, "expected no error starting the container runtime")
+				ginkgo.By("waiting for the container runtime to start")
+				gomega.Eventually(ctx, func(ctx context.Context) error {
+					_, _, err := getCRIClient()
+					if err != nil {
+						return fmt.Errorf("error getting cri client: %v", err)
+					}
+					return nil
+				}, 2*time.Minute, time.Second*5).Should(gomega.Succeed())
+			})
+		})
+
 		ginkgo.AfterEach(func(ctx context.Context) {
 			ginkgo.By("delete the static pod")
 			err := deleteStaticPod(podPath, staticPodName, ns)
diff --git a/test/e2e_node/resource_collector.go b/test/e2e_node/resource_collector.go
index 60178b9d764..1d4602af6db 100644
--- a/test/e2e_node/resource_collector.go
+++ b/test/e2e_node/resource_collector.go
@@ -23,11 +23,8 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"io"
 	"log"
-	"os"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"text/tabwriter"
@@ -39,11 +36,9 @@ import (
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
 	kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
-	"k8s.io/kubernetes/pkg/util/procfs"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -465,38 +460,6 @@ func (r *ResourceCollector) GetResourceTimeSeries() map[string]*perftype.Resourc
 
 const kubeletProcessName = "kubelet"
 
-func getPidsForProcess(name, pidFile string) ([]int, error) {
-	if len(pidFile) > 0 {
-		pid, err := getPidFromPidFile(pidFile)
-		if err == nil {
-			return []int{pid}, nil
-		}
-		// log the error and fall back to pidof
-		runtime.HandleError(err)
-	}
-	return procfs.PidOf(name)
-}
-
-func getPidFromPidFile(pidFile string) (int, error) {
-	file, err := os.Open(pidFile)
-	if err != nil {
-		return 0, fmt.Errorf("error opening pid file %s: %w", pidFile, err)
-	}
-	defer file.Close()
-
-	data, err := io.ReadAll(file)
-	if err != nil {
-		return 0, fmt.Errorf("error reading pid file %s: %w", pidFile, err)
-	}
-
-	pid, err := strconv.Atoi(string(data))
-	if err != nil {
-		return 0, fmt.Errorf("error parsing %s as a number: %w", string(data), err)
-	}
-
-	return pid, nil
-}
-
 func getContainerNameForProcess(name, pidFile string) (string, error) {
 	pids, err := getPidsForProcess(name, pidFile)
 	if err != nil {
diff --git a/test/e2e_node/util.go b/test/e2e_node/util.go
index 9ca1d505acd..82068aead32 100644
--- a/test/e2e_node/util.go
+++ b/test/e2e_node/util.go
@@ -25,17 +25,21 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"os"
 	"os/exec"
 	"regexp"
 	"strconv"
 	"strings"
 	"time"
 
+	"k8s.io/kubernetes/pkg/util/procfs"
+
 	oteltrace "go.opentelemetry.io/otel/trace"
 
 	v1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	clientset "k8s.io/client-go/kubernetes"
@@ -55,6 +59,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/types"
 	"k8s.io/kubernetes/pkg/kubelet/util"
 
+	"github.com/coreos/go-systemd/v22/dbus"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
 	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
@@ -84,12 +89,14 @@ const (
 
 var kubeletHealthCheckURL = fmt.Sprintf("http://127.0.0.1:%d/healthz", ports.KubeletHealthzPort)
 
+var containerRuntimeUnitName = ""
+
 func getNodeSummary(ctx context.Context) (*stats.Summary, error) {
 	kubeletConfig, err := getCurrentKubeletConfig(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get current kubelet config")
 	}
-	req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil)
+	req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/stats/summary", net.JoinHostPort(kubeletConfig.Address, strconv.Itoa(int(kubeletConfig.ReadOnlyPort)))), nil)
 	if err != nil {
 		return nil, fmt.Errorf("failed to build http request: %w", err)
 	}
@@ -340,6 +347,71 @@ func findKubeletServiceName(running bool) string {
 	return kubeletServiceName
 }
 
+func findContainerRuntimeServiceName() (string, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	conn, err := dbus.NewWithContext(ctx)
+	framework.ExpectNoError(err, "Failed to setup dbus connection")
+	defer conn.Close()
+
+	runtimePids, err := getPidsForProcess(framework.TestContext.ContainerRuntimeProcessName, framework.TestContext.ContainerRuntimePidFile)
+	framework.ExpectNoError(err, "failed to get list of container runtime pids")
+	framework.ExpectEqual(len(runtimePids), 1, "Unexpected number of container runtime pids. Expected 1 but got %v", len(runtimePids))
+
+	containerRuntimePid := runtimePids[0]
+
+	unitName, err := conn.GetUnitNameByPID(ctx, uint32(containerRuntimePid))
+	framework.ExpectNoError(err, "Failed to get container runtime unit name")
+
+	return unitName, nil
+}
+
+type containerRuntimeUnitOp int
+
+const (
+	startContainerRuntimeUnitOp containerRuntimeUnitOp = iota
+	stopContainerRuntimeUnitOp
+)
+
+func performContainerRuntimeUnitOp(op containerRuntimeUnitOp) error {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	conn, err := dbus.NewWithContext(ctx)
+	framework.ExpectNoError(err, "Failed to setup dbus connection")
+	defer conn.Close()
+
+	if containerRuntimeUnitName == "" {
+		containerRuntimeUnitName, err = findContainerRuntimeServiceName()
+		framework.ExpectNoError(err, "Failed to find container runtime name")
+	}
+
+	reschan := make(chan string)
+
+	switch op {
+	case startContainerRuntimeUnitOp:
+		conn.StartUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
+	case stopContainerRuntimeUnitOp:
+		conn.StopUnitContext(ctx, containerRuntimeUnitName, "replace", reschan)
+	default:
+		framework.Failf("Unexpected container runtime op: %v", op)
+	}
+
+	job := <-reschan
+	framework.ExpectEqual(job, "done", "Expected job to complete with done")
+
+	return nil
+}
+
+func stopContainerRuntime() error {
+	return performContainerRuntimeUnitOp(stopContainerRuntimeUnitOp)
+}
+
+func startContainerRuntime() error {
+	return performContainerRuntimeUnitOp(startContainerRuntimeUnitOp)
+}
+
 // restartKubelet restarts the current kubelet service.
 // the "current" kubelet service is the instance managed by the current e2e_node test run.
 // If `running` is true, restarts only if the current kubelet is actually running. In some cases,
@@ -465,3 +537,35 @@ func waitForAllContainerRemoval(ctx context.Context, podName, podNS string) {
 		return nil
 	}, 2*time.Minute, 1*time.Second).Should(gomega.Succeed())
 }
+
+func getPidsForProcess(name, pidFile string) ([]int, error) {
+	if len(pidFile) > 0 {
+		pid, err := getPidFromPidFile(pidFile)
+		if err == nil {
+			return []int{pid}, nil
+		}
+		// log the error and fall back to pidof
+		runtime.HandleError(err)
+	}
+	return procfs.PidOf(name)
+}
+
+func getPidFromPidFile(pidFile string) (int, error) {
+	file, err := os.Open(pidFile)
+	if err != nil {
+		return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
+	}
+	defer file.Close()
+
+	data, err := io.ReadAll(file)
+	if err != nil {
+		return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
+	}
+
+	pid, err := strconv.Atoi(string(data))
+	if err != nil {
+		return 0, fmt.Errorf("error parsing %s as a number: %v", string(data), err)
+	}
+
+	return pid, nil
+}