diff --git a/test/e2e_node/criproxy_test.go b/test/e2e_node/criproxy_test.go
index fd200b23086..dd0af330132 100644
--- a/test/e2e_node/criproxy_test.go
+++ b/test/e2e_node/criproxy_test.go
@@ -32,6 +32,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/pkg/kubelet/images"
 	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -68,7 +69,12 @@ var _ = SIGDescribe(feature.CriProxy, framework.WithSerial(), func() {
 			framework.ExpectNoError(err)
 
 			pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod())
-			podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
+			podErr := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "ImagePullBackOff", 1*time.Minute, func(pod *v1.Pod) (bool, error) {
+				if len(pod.Status.ContainerStatuses) > 0 && pod.Status.Reason == images.ErrImagePullBackOff.Error() {
+					return true, nil
+				}
+				return false, nil
+			})
 			gomega.Expect(podErr).To(gomega.HaveOccurred())
 
 			eventMsg, err := getFailedToPullImageMsg(ctx, f, pod.Name)
diff --git a/test/e2e_node/e2e_node_suite_test.go b/test/e2e_node/e2e_node_suite_test.go
index e581a4f016a..8089b0809e9 100644
--- a/test/e2e_node/e2e_node_suite_test.go
+++ b/test/e2e_node/e2e_node_suite_test.go
@@ -240,7 +240,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx context.Context) []byte {
 	if framework.TestContext.PrepullImages {
 		klog.Infof("Pre-pulling images so that they are cached for the tests.")
 		updateImageAllowList(ctx)
-		err := PrePullAllImages()
+		err := PrePullAllImages(ctx)
 		gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 	}
 
diff --git a/test/e2e_node/eviction_test.go b/test/e2e_node/eviction_test.go
index cc31d6e54fc..7d7f70cf204 100644
--- a/test/e2e_node/eviction_test.go
+++ b/test/e2e_node/eviction_test.go
@@ -647,7 +647,8 @@ func runEvictionTest(f *framework.Framework, pressureTimeout time.Duration, expe
 		if expectedNodeCondition == v1.NodeDiskPressure && framework.TestContext.PrepullImages {
 			// The disk eviction test may cause the prepulled images to be evicted,
 			// prepull those images again to ensure this test not affect following tests.
-			PrePullAllImages()
+			err := PrePullAllImages(ctx)
+			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 		}
 	}
 	// Run prePull using a defer to make sure it is executed even when the assertions below fails
diff --git a/test/e2e_node/image_gc_test.go b/test/e2e_node/image_gc_test.go
index 118c078285e..5a725e66875 100644
--- a/test/e2e_node/image_gc_test.go
+++ b/test/e2e_node/image_gc_test.go
@@ -50,8 +50,8 @@ var _ = SIGDescribe("ImageGarbageCollect", framework.WithSerial(), framework.Wit
 		_, is, err = getCRIClient()
 		framework.ExpectNoError(err)
 	})
-	ginkgo.AfterEach(func() {
-		framework.ExpectNoError(PrePullAllImages())
+	ginkgo.AfterEach(func(ctx context.Context) {
+		framework.ExpectNoError(PrePullAllImages(ctx))
 	})
 	ginkgo.Context("when ImageMaximumGCAge is set", func() {
 		tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
diff --git a/test/e2e_node/image_list.go b/test/e2e_node/image_list.go
index 3363be4993f..d96ba8ebf35 100644
--- a/test/e2e_node/image_list.go
+++ b/test/e2e_node/image_list.go
@@ -110,7 +110,9 @@ func getNodeProblemDetectorImage() string {
 // puller represents a generic image puller
 type puller interface {
 	// Pull pulls an image by name
-	Pull(image string) ([]byte, error)
+	Pull(ctx context.Context, image string) ([]byte, error)
+	// Remove removes an image by name
+	Remove(ctx context.Context, image string) error
 	// Name returns the name of the specific puller implementation
 	Name() string
 }
@@ -123,15 +125,19 @@ func (rp *remotePuller) Name() string {
 	return "CRI"
 }
 
-func (rp *remotePuller) Pull(image string) ([]byte, error) {
-	resp, err := rp.imageService.ImageStatus(context.Background(), &runtimeapi.ImageSpec{Image: image}, false)
+func (rp *remotePuller) Pull(ctx context.Context, image string) ([]byte, error) {
+	resp, err := rp.imageService.ImageStatus(ctx, &runtimeapi.ImageSpec{Image: image}, false)
 	if err == nil && resp.GetImage() != nil {
 		return nil, nil
 	}
-	_, err = rp.imageService.PullImage(context.Background(), &runtimeapi.ImageSpec{Image: image}, nil, nil)
+	_, err = rp.imageService.PullImage(ctx, &runtimeapi.ImageSpec{Image: image}, nil, nil)
 	return nil, err
 }
 
+func (rp *remotePuller) Remove(ctx context.Context, image string) error {
+	return rp.imageService.RemoveImage(ctx, &runtimeapi.ImageSpec{Image: image})
+}
+
 func getPuller() (puller, error) {
 	_, is, err := getCRIClient()
 	if err != nil {
@@ -143,7 +149,7 @@
 }
 
 // PrePullAllImages pre-fetches all images tests depend on so that we don't fail in an actual test.
-func PrePullAllImages() error {
+func PrePullAllImages(ctx context.Context) error {
 	puller, err := getPuller()
 	if err != nil {
 		return err
 	}
@@ -191,7 +197,7 @@
 			if retryCount > 0 {
 				time.Sleep(imagePullRetryDelay)
 			}
-			if output, pullErr = puller.Pull(images[i]); pullErr == nil {
+			if output, pullErr = puller.Pull(ctx, images[i]); pullErr == nil {
 				break
 			}
 			klog.Warningf("Failed to pull %s as user %q, retrying in %s (%d of %d): %v",
@@ -211,6 +217,14 @@
 	return utilerrors.NewAggregate(pullErrs)
 }
 
+func RemoveImage(ctx context.Context, image string) error {
+	puller, err := getPuller()
+	if err != nil {
+		return err
+	}
+	return puller.Remove(ctx, image)
+}
+
 func getContainerImageFromE2ETestDaemonset(dsYamlPath string) (string, error) {
 	data, err := e2etestfiles.Read(dsYamlPath)
 	if err != nil {
diff --git a/test/e2e_node/image_pull_test.go b/test/e2e_node/image_pull_test.go
new file mode 100644
index 00000000000..6b6e6a47200
--- /dev/null
+++ b/test/e2e_node/image_pull_test.go
@@ -0,0 +1,345 @@
+//go:build linux
+// +build linux
+
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2enode
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/onsi/ginkgo/v2"
+	"github.com/onsi/gomega"
+	"github.com/pkg/errors"
+
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
+	kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
+	"k8s.io/kubernetes/test/e2e/feature"
+	"k8s.io/kubernetes/test/e2e/framework"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
+	"k8s.io/kubernetes/test/e2e_node/criproxy"
+	imageutils "k8s.io/kubernetes/test/utils/image"
+	admissionapi "k8s.io/pod-security-admission/api"
+	"k8s.io/utils/ptr"
+)
+
+// The CRI Proxy injector is used to simulate and verify image pull behavior.
+// These tests need to run in serial to prevent caching of the images by other tests
+// and to prevent the image pull wait times from being increased by other image pulls.
+var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() {
+
+	f := framework.NewDefaultFramework("parallel-pull-image-test")
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
+	var testpods []*v1.Pod
+
+	ginkgo.Context("parallel image pull with MaxParallelImagePulls=5", func() {
+		tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
+			initialConfig.SerializeImagePulls = false
+			initialConfig.MaxParallelImagePulls = ptr.To[int32](5)
+		})
+
+		ginkgo.BeforeEach(func(ctx context.Context) {
+			if err := resetCRIProxyInjector(); err != nil {
+				ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
+			}
+
+			testpods = prepareAndCleanup(ctx, f)
+			gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5))
+		})
+
+		ginkgo.AfterEach(func(ctx context.Context) {
+			err := resetCRIProxyInjector()
+			framework.ExpectNoError(err)
+
+			ginkgo.By("cleanup pods")
+			for _, pod := range testpods {
+				deletePodSyncByName(ctx, f, pod.Name)
+			}
+		})
+
+		ginkgo.It("should pull immediately if no more than 5 pods", func(ctx context.Context) {
+			var mu sync.Mutex
+			timeout := 20 * time.Second
+			callCh := make(chan struct{})
+			callStatus := make(map[int]chan struct{})
+			err := addCRIProxyInjector(func(apiName string) error {
+				if apiName == criproxy.PullImage {
+					mu.Lock()
+					callID := len(callStatus)
+					callStatus[callID] = callCh
+					mu.Unlock()
+					if callID == 0 {
+						// wait for the next call
+						select {
+						case <-callCh:
+							return nil
+						case <-time.After(timeout):
+							return fmt.Errorf("no parallel image pull after %s", timeout)
+						}
+					} else {
+						// send a signal to the first call
+						callCh <- struct{}{}
+					}
+				}
+				return nil
+			})
+			framework.ExpectNoError(err)
+
+			for _, testpod := range testpods {
+				_ = e2epod.NewPodClient(f).Create(ctx, testpod)
+			}
+
+			imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods)
+			framework.ExpectNoError(err)
+
+			checkPodPullingOverlap(podStartTime, podEndTime, testpods)
+
+			for _, img := range imagePulled {
+				framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration)
+				// If a pod's image pull waited more than 50% longer than the pull itself, it is a delayed pull.
+				if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 {
+					// With parallel image pulls, the waiting duration should be similar to the pull duration.
+					framework.Failf("There is a delayed image pull, which is not expected for parallel image pulls.")
+				}
+			}
+		})
+
+	})
+})
+
+var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() {
+
+	f := framework.NewDefaultFramework("serialize-pull-image-test")
+	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
+
+	ginkgo.Context("serialize image pull", func() {
+		// This is the default behavior now.
+		tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
+			initialConfig.SerializeImagePulls = true
+			initialConfig.MaxParallelImagePulls = ptr.To[int32](1)
+		})
+
+		var testpods []*v1.Pod
+
+		ginkgo.BeforeEach(func(ctx context.Context) {
+			if err := resetCRIProxyInjector(); err != nil {
+				ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
+			}
+
+			testpods = prepareAndCleanup(ctx, f)
+			gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5))
+		})
+
+		ginkgo.AfterEach(func(ctx context.Context) {
+			err := resetCRIProxyInjector()
+			framework.ExpectNoError(err)
+
+			ginkgo.By("cleanup pods")
+			for _, pod := range testpods {
+				deletePodSyncByName(ctx, f, pod.Name)
+			}
+		})
+
+		ginkgo.It("should be waiting more", func(ctx context.Context) {
+			// all serialized image pulls should time out
+			timeout := 20 * time.Second
+			var mu sync.Mutex
+			callCh := make(chan struct{})
+			callStatus := make(map[int]chan struct{})
+			err := addCRIProxyInjector(func(apiName string) error {
+				if apiName == criproxy.PullImage {
+					mu.Lock()
+					callID := len(callStatus)
+					callStatus[callID] = callCh
+					mu.Unlock()
+					if callID == 0 {
+						// wait for the next call
+						select {
+						case <-callCh:
+							return errors.New("parallel image pull detected")
+						case <-time.After(timeout):
+							return nil
+						}
+					} else {
+						// send a signal to the first call
+						select {
+						case callCh <- struct{}{}:
+							return errors.New("parallel image pull detected")
+						case <-time.After(timeout):
+							return nil
+						}
+					}
+				}
+				return nil
+			})
+			framework.ExpectNoError(err)
+
+			var pods []*v1.Pod
+			for _, testpod := range testpods {
+				pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod))
+			}
+			for _, pod := range pods {
+				err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 2*time.Minute, func(pod *v1.Pod) (bool, error) {
+					if pod.Status.Phase == v1.PodRunning {
+						return true, nil
+					}
+					return false, nil
+				})
+				framework.ExpectNoError(err)
+			}
+
+			imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods)
+			framework.ExpectNoError(err)
+			gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled)))
+
+			checkPodPullingOverlap(podStartTime, podEndTime, testpods)
+
+			// If a pod's image pull waited more than 50% longer than the pull itself, it is a delayed pull.
+			var anyDelayedPull bool
+			for _, img := range imagePulled {
+				framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration)
+				if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 {
+					anyDelayedPull = true
+				}
+			}
+			// With serialized image pulls, the waiting duration should be almost double the pull duration;
+			// use 1.5 as a conservative ratio to allow for some overlap during pod creation.
+			if !anyDelayedPull {
+				framework.Failf("No image pull was delayed, which is not expected for serialized image pulls")
+			}
+		})
+
+	})
+})
+
+func getPodImagePullDurations(ctx context.Context, f *framework.Framework, testpods []*v1.Pod) (map[string]*pulledStruct, map[string]metav1.Time, map[string]metav1.Time, error) {
+	events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	imagePulled := map[string]*pulledStruct{}
+	podStartTime := map[string]metav1.Time{}
+	podEndTime := map[string]metav1.Time{}
+
+	for _, event := range events.Items {
+		if event.Reason == kubeletevents.PulledImage {
+			podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
+			for _, testpod := range testpods {
+				if event.InvolvedObject.Name == testpod.Name {
+					pulled, err := getDurationsFromPulledEventMsg(event.Message)
+					if err != nil {
+						return nil, nil, nil, err
+					}
+					imagePulled[testpod.Name] = pulled
+					break
+				}
+			}
+		} else if event.Reason == kubeletevents.PullingImage {
+			podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
+		}
+	}
+
+	return imagePulled, podStartTime, podEndTime, nil
+}
+
+// As the pods are created at the same time and the injector delays every image pull, the image pull time ranges should overlap.
+func checkPodPullingOverlap(podStartTime map[string]metav1.Time, podEndTime map[string]metav1.Time, testpods []*v1.Pod) {
+	if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) && podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
+		framework.Failf("%v pulling time and %v pulling time do not overlap", testpods[0].Name, testpods[1].Name)
+	} else if podStartTime[testpods[0].Name].Time.After(podStartTime[testpods[1].Name].Time) && podStartTime[testpods[0].Name].Time.After(podEndTime[testpods[1].Name].Time) {
+		framework.Failf("%v pulling time and %v pulling time do not overlap", testpods[0].Name, testpods[1].Name)
+	}
+}
+
+func prepareAndCleanup(ctx context.Context, f *framework.Framework) (testpods []*v1.Pod) {
+	// use two different images so that each pod triggers its own image pull
+	image1 := imageutils.GetE2EImage(imageutils.Httpd)
+	image2 := imageutils.GetE2EImage(imageutils.HttpdNew)
+	node := getNodeName(ctx, f)
+
+	testpod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "testpod",
+			Namespace: f.Namespace.Name,
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{{
+				Name:            "testpod",
+				Image:           image1,
+				ImagePullPolicy: v1.PullAlways,
+			}},
+			NodeName:      node,
+			RestartPolicy: v1.RestartPolicyNever,
+		},
+	}
+	testpod2 := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "testpod2",
+			Namespace: f.Namespace.Name,
+		},
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{{
+				Name:            "testpod2",
+				Image:           image2,
+				ImagePullPolicy: v1.PullAlways,
+			}},
+			NodeName:      node,
+			RestartPolicy: v1.RestartPolicyNever,
+		},
+	}
+	testpods = []*v1.Pod{testpod, testpod2}
+
+	ginkgo.By("cleanup images")
+	for _, pod := range testpods {
+		_ = RemoveImage(ctx, pod.Spec.Containers[0].Image)
+	}
+	return testpods
+}
+
+type pulledStruct struct {
+	pulledDuration               time.Duration
+	pulledIncludeWaitingDuration time.Duration
+}
+
+// getDurationsFromPulledEventMsg parses the two durations from the Pulled event message.
+// Example msg: `Successfully pulled image "busybox:1.28" in 39.356s (49.356s including waiting). Image size: 41901587 bytes.`
+func getDurationsFromPulledEventMsg(msg string) (*pulledStruct, error) {
+	splits := strings.Split(msg, " ")
+	if len(splits) != 13 {
+		return nil, errors.Errorf("pull event message should be split into 13 parts: %d", len(splits))
+	}
+	pulledDuration, err := time.ParseDuration(splits[5])
+	if err != nil {
+		return nil, err
+	}
+	// skip the leading '('
+	pulledIncludeWaitingDuration, err := time.ParseDuration(splits[6][1:])
+	if err != nil {
+		return nil, err
+	}
+	return &pulledStruct{
+		pulledDuration:               pulledDuration,
+		pulledIncludeWaitingDuration: pulledIncludeWaitingDuration,
+	}, nil
+}
diff --git a/test/e2e_node/split_disk_test.go b/test/e2e_node/split_disk_test.go
index 3ba77dd3f45..5da03534fbd 100644
--- a/test/e2e_node/split_disk_test.go
+++ b/test/e2e_node/split_disk_test.go
@@ -19,13 +19,14 @@ package e2enode
 import (
 	"context"
 	"fmt"
-	"k8s.io/kubernetes/pkg/features"
-	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	"os/exec"
 	"path/filepath"
 	"strings"
 	"time"
 
+	"k8s.io/kubernetes/pkg/features"
+	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
@@ -217,7 +218,7 @@ func runImageFsPressureTest(f *framework.Framework, pressureTimeout time.Duratio
 		if expectedNodeCondition == v1.NodeDiskPressure && framework.TestContext.PrepullImages {
 			// The disk eviction test may cause the pre-pulled images to be evicted,
 			// so pre-pull those images again to ensure this test does not affect subsequent tests.
-			err := PrePullAllImages()
+			err := PrePullAllImages(ctx)
 			framework.ExpectNoError(err)
 		}
 	}
diff --git a/test/e2e_node/system_node_critical_test.go b/test/e2e_node/system_node_critical_test.go
index d4f842e44ea..8b5c130bec4 100644
--- a/test/e2e_node/system_node_critical_test.go
+++ b/test/e2e_node/system_node_critical_test.go
@@ -43,14 +43,14 @@ var _ = SIGDescribe("SystemNodeCriticalPod", framework.WithSlow(), framework.Wit
 	// this test only manipulates pods in kube-system
 	f.SkipNamespaceCreation = true
 
-	ginkgo.AfterEach(func() {
+	ginkgo.AfterEach(func(ctx context.Context) {
 		if framework.TestContext.PrepullImages {
 			// The test may cause the prepulled images to be evicted,
 			// prepull those images again to ensure this test not affect following tests.
-			PrePullAllImages()
+			err := PrePullAllImages(ctx)
+			gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 		}
 	})
-
 	ginkgo.Context("when create a system-node-critical pod", func() {
 		tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
 			diskConsumed := resource.MustParse("200Mi")
@@ -110,7 +110,8 @@ var _ = SIGDescribe("SystemNodeCriticalPod", framework.WithSlow(), framework.Wit
 			if framework.TestContext.PrepullImages {
 				// The test may cause the prepulled images to be evicted,
 				// prepull those images again to ensure this test not affect following tests.
-				PrePullAllImages()
+				err := PrePullAllImages(ctx)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
 			}
 		}()
 		ginkgo.By("delete the static pod")