diff --git a/test/e2e/nodefeature/nodefeature.go b/test/e2e/nodefeature/nodefeature.go
index a20bc86a4e0..3b0d1bb287d 100644
--- a/test/e2e/nodefeature/nodefeature.go
+++ b/test/e2e/nodefeature/nodefeature.go
@@ -77,9 +77,6 @@ var (
 	// TODO: document the feature (owning SIG, when to use this feature for a test)
 	LSCIQuotaMonitoring = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("LSCIQuotaMonitoring"))
 
-	// TODO: document the feature (owning SIG, when to use this feature for a test)
-	MaxParallelImagePull = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("MaxParallelImagePull"))
-
 	// TODO: document the feature (owning SIG, when to use this feature for a test)
 	NodeAllocatable = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("NodeAllocatable"))
diff --git a/test/e2e_node/criproxy_test.go b/test/e2e_node/criproxy_test.go
index fd200b23086..dd0af330132 100644
--- a/test/e2e_node/criproxy_test.go
+++ b/test/e2e_node/criproxy_test.go
@@ -32,6 +32,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/pkg/kubelet/images"
 	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -68,7 +69,12 @@ var _ = SIGDescribe(feature.CriProxy, framework.WithSerial(), func() {
 		framework.ExpectNoError(err)
 
 		pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod())
-		podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
+		podErr := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "ImagePullBackOff", 1*time.Minute, func(pod *v1.Pod) (bool, error) {
+			if len(pod.Status.ContainerStatuses) > 0 && pod.Status.Reason == images.ErrImagePullBackOff.Error() {
+				return true, nil
+			}
+			return false, nil
+		})
 		gomega.Expect(podErr).To(gomega.HaveOccurred())
 
 		eventMsg, err := getFailedToPullImageMsg(ctx, f, pod.Name)
diff --git a/test/e2e_node/image_pull_test.go b/test/e2e_node/image_pull_test.go
index 480f869069b..6b6e6a47200 100644
--- a/test/e2e_node/image_pull_test.go
+++ b/test/e2e_node/image_pull_test.go
@@ -1,3 +1,6 @@
+//go:build linux
+// +build linux
+
 /*
 Copyright 2024 The Kubernetes Authors.
 
@@ -18,28 +21,32 @@ package e2enode
 
 import (
 	"context"
+	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	"github.com/pkg/errors"
+
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
-	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
-	"k8s.io/kubernetes/test/e2e/nodefeature"
+	"k8s.io/kubernetes/test/e2e_node/criproxy"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 	admissionapi "k8s.io/pod-security-admission/api"
 	"k8s.io/utils/ptr"
 )
 
-// This test needs to run in serial to prevent caching of the images by other tests
-// and to prevent the wait time of image pulls to be increased by other images
-var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
+// CriProxy injector is used to simulate and verify the image pull behavior.
+// These tests need to run in serial to prevent caching of the images by other tests
+// and to prevent the wait time of image pulls to be increased by other images.
+var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() { f := framework.NewDefaultFramework("parallel-pull-image-test") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged @@ -52,10 +59,18 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParalle }) ginkgo.BeforeEach(func(ctx context.Context) { + if err := resetCRIProxyInjector(); err != nil { + ginkgo.Skip("Skip the test since the CRI Proxy is undefined.") + } + testpods = prepareAndCleanup(ctx, f) + gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5)) }) ginkgo.AfterEach(func(ctx context.Context) { + err := resetCRIProxyInjector() + framework.ExpectNoError(err) + ginkgo.By("cleanup pods") for _, pod := range testpods { deletePodSyncByName(ctx, f, pod.Name) @@ -63,60 +78,48 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParalle }) ginkgo.It("should pull immediately if no more than 5 pods", func(ctx context.Context) { - var pods []*v1.Pod - for _, testpod := range testpods { - pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod)) - } - for _, pod := range pods { - err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 10*time.Minute, func(pod *v1.Pod) (bool, error) { - if pod.Status.Phase == v1.PodRunning { - return true, nil - } - return false, nil - }) - framework.ExpectNoError(err) - } - - events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{}) - framework.ExpectNoError(err) - imagePulled := map[string]*pulledStruct{} - // start from pulling event creationTimestamp - // end from pulled event creationTimestamp - podStartTime, podEndTime := map[string]metav1.Time{}, map[string]metav1.Time{} - for _, event := range events.Items { - if event.Reason == kubeletevents.PulledImage { - podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp - for _, testpod := range testpods { - if event.InvolvedObject.Name == testpod.Name { - pulled, err := getDurationsFromPulledEventMsg(event.Message) - imagePulled[testpod.Name] = pulled - framework.ExpectNoError(err) - break + var mu sync.Mutex + timeout := 20 * time.Second + callCh := make(chan struct{}) + callStatus := make(map[int]chan struct{}) + err := addCRIProxyInjector(func(apiName string) error { + if apiName == criproxy.PullImage { + mu.Lock() + callID := len(callStatus) + callStatus[callID] = callCh + mu.Unlock() + if callID == 0 { + // wait for next call + select { + case <-callCh: + return nil + case <-time.After(timeout): + return fmt.Errorf("no parallel image pull after %s", timeout) } + } else { + // send a signal to the first call + callCh <- struct{}{} } - } else if event.Reason == kubeletevents.PullingImage { - podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp } - } - gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled))) + return nil + }) + framework.ExpectNoError(err) - // skip if pod1 pulling time and pod2 pulling time are not overlapped - if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) { - if podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) { - e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped") - } - } else { - if podEndTime[testpods[1].Name].Time.Before(podStartTime[testpods[0].Name].Time) { - e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped") - } + for _, testpod := range testpods { + _ = e2epod.NewPodClient(f).Create(ctx, testpod) } - // as 
this is parallel image pulling, the waiting duration should be similar with the pulled duration. - // use 1.2 as a common ratio - for _, pulled := range imagePulled { - if float32(pulled.pulledIncludeWaitingDuration/time.Millisecond)/float32(pulled.pulledDuration/time.Millisecond) > 1.2 { - framework.Failf("the pull duration including waiting %v should be similar with the pulled duration %v", - pulled.pulledIncludeWaitingDuration, pulled.pulledDuration) + imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods) + framework.ExpectNoError(err) + + checkPodPullingOverlap(podStartTime, podEndTime, testpods) + + for _, img := range imagePulled { + framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration) + // if a pod image pull hanged for more than 50%, it is a delayed pull. + if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 { + // as this is parallel image pulling, the waiting duration should be similar with the pulled duration. + framework.Failf("There is a delayed image pulling, which is not expected for parallel image pulling.") } } }) @@ -124,7 +127,7 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParalle }) }) -var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() { +var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() { f := framework.NewDefaultFramework("serialize-pull-image-test") f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged @@ -139,10 +142,18 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParalle var testpods []*v1.Pod ginkgo.BeforeEach(func(ctx context.Context) { + if err := resetCRIProxyInjector(); err != nil { + ginkgo.Skip("Skip the test since the CRI Proxy is undefined.") + } + testpods = prepareAndCleanup(ctx, f) + gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5)) }) ginkgo.AfterEach(func(ctx context.Context) { + err := resetCRIProxyInjector() + framework.ExpectNoError(err) + ginkgo.By("cleanup pods") for _, pod := range testpods { deletePodSyncByName(ctx, f, pod.Name) @@ -150,12 +161,45 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParalle }) ginkgo.It("should be waiting more", func(ctx context.Context) { + // all serialize image pulls should timeout + timeout := 20 * time.Second + var mu sync.Mutex + callCh := make(chan struct{}) + callStatus := make(map[int]chan struct{}) + err := addCRIProxyInjector(func(apiName string) error { + if apiName == criproxy.PullImage { + mu.Lock() + callID := len(callStatus) + callStatus[callID] = callCh + mu.Unlock() + if callID == 0 { + // wait for next call + select { + case <-callCh: + return errors.New("parallel image pull detected") + case <-time.After(timeout): + return nil + } + } else { + // send a signal to the first call + select { + case callCh <- struct{}{}: + return errors.New("parallel image pull detected") + case <-time.After(timeout): + return nil + } + } + } + return nil + }) + framework.ExpectNoError(err) + var pods []*v1.Pod for _, testpod := range testpods { pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod)) } for _, pod := range pods { - err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 10*time.Minute, func(pod *v1.Pod) (bool, error) { + err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, 
pod.Name, "Running", 2*time.Minute, func(pod *v1.Pod) (bool, error) { if pod.Status.Phase == v1.PodRunning { return true, nil } @@ -164,56 +208,74 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParalle framework.ExpectNoError(err) } - events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods) framework.ExpectNoError(err) - imagePulled := map[string]*pulledStruct{} - // start from pulling event creationTimestamp - // end from pulled event creationTimestamp - podStartTime, podEndTime := map[string]metav1.Time{}, map[string]metav1.Time{} - for _, event := range events.Items { - if event.Reason == kubeletevents.PulledImage { - podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp - for _, testpod := range testpods { - if event.InvolvedObject.Name == testpod.Name { - pulled, err := getDurationsFromPulledEventMsg(event.Message) - imagePulled[testpod.Name] = pulled - framework.ExpectNoError(err) - break - } - } - } else if event.Reason == kubeletevents.PullingImage { - podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp - } - } gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled))) - // skip if pod1 pulling time and pod2 pulling time are not overlapped - if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) { - if podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) { - e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped") - } - } else { - if podEndTime[testpods[1].Name].Time.Before(podStartTime[testpods[0].Name].Time) { - e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped") + checkPodPullingOverlap(podStartTime, podEndTime, testpods) + + // if a pod image pull hanged for more than 50%, it is a delayed pull. + var anyDelayedPull bool + for _, img := range imagePulled { + framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration) + if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 { + anyDelayedPull = true } } - // as this is serialize image pulling, the waiting duration should be almost double the duration with the pulled duration. 
-			// use 1.5 as a common ratio to avoid some overlap during pod creation
-			if float32(imagePulled[testpods[1].Name].pulledIncludeWaitingDuration/time.Millisecond)/float32(imagePulled[testpods[1].Name].pulledDuration/time.Millisecond) < 1.5 &&
-				float32(imagePulled[testpods[0].Name].pulledIncludeWaitingDuration/time.Millisecond)/float32(imagePulled[testpods[0].Name].pulledDuration/time.Millisecond) < 1.5 {
-				framework.Failf("At least, one of the pull duration including waiting %v/%v should be similar with the pulled duration %v/%v",
-					imagePulled[testpods[1].Name].pulledIncludeWaitingDuration, imagePulled[testpods[0].Name].pulledIncludeWaitingDuration, imagePulled[testpods[1].Name].pulledDuration, imagePulled[testpods[0].Name].pulledDuration)
+			if !anyDelayedPull {
+				framework.Failf("No image pull was delayed, which is not expected for serialized image pulls")
+			}
 		})
 	})
 })
 
+func getPodImagePullDurations(ctx context.Context, f *framework.Framework, testpods []*v1.Pod) (map[string]*pulledStruct, map[string]metav1.Time, map[string]metav1.Time, error) {
+	events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	imagePulled := map[string]*pulledStruct{}
+	podStartTime := map[string]metav1.Time{}
+	podEndTime := map[string]metav1.Time{}
+
+	for _, event := range events.Items {
+		if event.Reason == kubeletevents.PulledImage {
+			podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
+			for _, testpod := range testpods {
+				if event.InvolvedObject.Name == testpod.Name {
+					pulled, err := getDurationsFromPulledEventMsg(event.Message)
+					if err != nil {
+						return nil, nil, nil, err
+					}
+					imagePulled[testpod.Name] = pulled
+					break
+				}
+			}
+		} else if event.Reason == kubeletevents.PullingImage {
+			podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
+		}
+	}
+
+	return imagePulled, podStartTime, podEndTime, nil
+}
+
+// as the pods are created at the same time and the image pull is delayed by 15s, the image pull times should overlap
+func checkPodPullingOverlap(podStartTime map[string]metav1.Time, podEndTime map[string]metav1.Time, testpods []*v1.Pod) {
+	if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) && podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
+		framework.Failf("%v pulling time and %v pulling time are not overlapped", testpods[0].Name, testpods[1].Name)
+	} else if podStartTime[testpods[0].Name].Time.After(podStartTime[testpods[1].Name].Time) && podStartTime[testpods[0].Name].Time.After(podEndTime[testpods[1].Name].Time) {
+		framework.Failf("%v pulling time and %v pulling time are not overlapped", testpods[0].Name, testpods[1].Name)
+	}
+}
+
 func prepareAndCleanup(ctx context.Context, f *framework.Framework) (testpods []*v1.Pod) {
 	// cuda images are > 2Gi and it will reduce the flaky rate
-	image1 := imageutils.GetE2EImage(imageutils.CudaVectorAdd)
-	image2 := imageutils.GetE2EImage(imageutils.CudaVectorAdd2)
+	image1 := imageutils.GetE2EImage(imageutils.Httpd)
+	image2 := imageutils.GetE2EImage(imageutils.HttpdNew)
 	node := getNodeName(ctx, f)
 
 	testpod := &v1.Pod{
diff --git a/test/e2e_node/split_disk_test.go b/test/e2e_node/split_disk_test.go
index 3ba77dd3f45..5da03534fbd 100644
--- a/test/e2e_node/split_disk_test.go
+++ b/test/e2e_node/split_disk_test.go
@@ -19,13 +19,14 @@ package e2enode
 import (
 	"context"
 	"fmt"
-	"k8s.io/kubernetes/pkg/features"
-	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	"os/exec"
"path/filepath" "strings" "time" + "k8s.io/kubernetes/pkg/features" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" @@ -217,7 +218,7 @@ func runImageFsPressureTest(f *framework.Framework, pressureTimeout time.Duratio if expectedNodeCondition == v1.NodeDiskPressure && framework.TestContext.PrepullImages { // The disk eviction test may cause the pre-pulled images to be evicted, // so pre-pull those images again to ensure this test does not affect subsequent tests. - err := PrePullAllImages() + err := PrePullAllImages(ctx) framework.ExpectNoError(err) } } diff --git a/test/e2e_node/system_node_critical_test.go b/test/e2e_node/system_node_critical_test.go index 8a5cb539c78..8b5c130bec4 100644 --- a/test/e2e_node/system_node_critical_test.go +++ b/test/e2e_node/system_node_critical_test.go @@ -43,16 +43,15 @@ var _ = SIGDescribe("SystemNodeCriticalPod", framework.WithSlow(), framework.Wit // this test only manipulates pods in kube-system f.SkipNamespaceCreation = true + ginkgo.AfterEach(func(ctx context.Context) { + if framework.TestContext.PrepullImages { + // The test may cause the prepulled images to be evicted, + // prepull those images again to ensure this test not affect following tests. + err := PrePullAllImages(ctx) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + } + }) ginkgo.Context("when create a system-node-critical pod", func() { - ginkgo.AfterEach(func(ctx context.Context) { - if framework.TestContext.PrepullImages { - // The test may cause the prepulled images to be evicted, - // prepull those images again to ensure this test not affect following tests. - err := PrePullAllImages(ctx) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - } - }) - tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) { diskConsumed := resource.MustParse("200Mi") summary := eventuallyGetSummary(ctx)