Merge pull request #121604 from pacoxu/image-pull-e2e

[node-e2e] add test cases for serialized and parallel image pulling
Kubernetes Prow Robot 2024-10-31 08:01:26 +00:00 committed by GitHub
commit 453efd7a4b
GPG Key ID: B5690EEEBB952194
8 changed files with 386 additions and 18 deletions
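For context, the new tests exercise two kubelet settings: SerializeImagePulls and MaxParallelImagePulls. Below is a minimal sketch of those fields, not part of this PR; the helper name is made up, while the types come from k8s.io/kubernetes/pkg/kubelet/apis/config and k8s.io/utils/ptr, exactly as used in the diff further down.

package e2enode

import (
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/utils/ptr"
)

// parallelPullKubeletConfig sketches the two knobs the new tests toggle:
// SerializeImagePulls=false allows concurrent pulls, and
// MaxParallelImagePulls caps how many pulls may run at once.
func parallelPullKubeletConfig() *kubeletconfig.KubeletConfiguration {
	return &kubeletconfig.KubeletConfiguration{
		SerializeImagePulls:   false,
		MaxParallelImagePulls: ptr.To[int32](5),
	}
}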


@@ -32,6 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -68,7 +69,12 @@ var _ = SIGDescribe(feature.CriProxy, framework.WithSerial(), func() {
framework.ExpectNoError(err)
pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod())
podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
podErr := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "ImagePullBackOff", 1*time.Minute, func(pod *v1.Pod) (bool, error) {
if len(pod.Status.ContainerStatuses) > 0 && pod.Status.Reason == images.ErrImagePullBackOff.Error() {
return true, nil
}
return false, nil
})
gomega.Expect(podErr).To(gomega.HaveOccurred())
eventMsg, err := getFailedToPullImageMsg(ctx, f, pod.Name)


@@ -240,7 +240,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx context.Context) []byte {
if framework.TestContext.PrepullImages {
klog.Infof("Pre-pulling images so that they are cached for the tests.")
updateImageAllowList(ctx)
err := PrePullAllImages()
err := PrePullAllImages(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}


@@ -647,7 +647,8 @@ func runEvictionTest(f *framework.Framework, pressureTimeout time.Duration, expe
if expectedNodeCondition == v1.NodeDiskPressure && framework.TestContext.PrepullImages {
// The disk eviction test may cause the prepulled images to be evicted,
// so prepull those images again to ensure this test does not affect the following tests.
PrePullAllImages()
err := PrePullAllImages(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
}
// Run prePull using a defer to make sure it is executed even when the assertions below fail


@ -50,8 +50,8 @@ var _ = SIGDescribe("ImageGarbageCollect", framework.WithSerial(), framework.Wit
_, is, err = getCRIClient()
framework.ExpectNoError(err)
})
ginkgo.AfterEach(func() {
framework.ExpectNoError(PrePullAllImages())
ginkgo.AfterEach(func(ctx context.Context) {
framework.ExpectNoError(PrePullAllImages(ctx))
})
ginkgo.Context("when ImageMaximumGCAge is set", func() {
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {


@@ -110,7 +110,9 @@ func getNodeProblemDetectorImage() string {
// puller represents a generic image puller
type puller interface {
// Pull pulls an image by name
Pull(image string) ([]byte, error)
Pull(ctx context.Context, image string) ([]byte, error)
// Remove removes an image by name
Remove(ctx context.Context, image string) error
// Name returns the name of the specific puller implementation
Name() string
}
@@ -123,15 +125,19 @@ func (rp *remotePuller) Name() string {
return "CRI"
}
func (rp *remotePuller) Pull(image string) ([]byte, error) {
resp, err := rp.imageService.ImageStatus(context.Background(), &runtimeapi.ImageSpec{Image: image}, false)
func (rp *remotePuller) Pull(ctx context.Context, image string) ([]byte, error) {
resp, err := rp.imageService.ImageStatus(ctx, &runtimeapi.ImageSpec{Image: image}, false)
if err == nil && resp.GetImage() != nil {
return nil, nil
}
_, err = rp.imageService.PullImage(context.Background(), &runtimeapi.ImageSpec{Image: image}, nil, nil)
_, err = rp.imageService.PullImage(ctx, &runtimeapi.ImageSpec{Image: image}, nil, nil)
return nil, err
}
func (rp *remotePuller) Remove(ctx context.Context, image string) error {
return rp.imageService.RemoveImage(ctx, &runtimeapi.ImageSpec{Image: image})
}
func getPuller() (puller, error) {
_, is, err := getCRIClient()
if err != nil {
@@ -143,7 +149,7 @@ func getPuller() (puller, error) {
}
// PrePullAllImages pre-fetches all images tests depend on so that we don't fail in an actual test.
func PrePullAllImages() error {
func PrePullAllImages(ctx context.Context) error {
puller, err := getPuller()
if err != nil {
return err
@@ -191,7 +197,7 @@ func PrePullAllImages() error {
if retryCount > 0 {
time.Sleep(imagePullRetryDelay)
}
if output, pullErr = puller.Pull(images[i]); pullErr == nil {
if output, pullErr = puller.Pull(ctx, images[i]); pullErr == nil {
break
}
klog.Warningf("Failed to pull %s as user %q, retrying in %s (%d of %d): %v",
@@ -211,6 +217,14 @@ func PrePullAllImages() error {
return utilerrors.NewAggregate(pullErrs)
}
func RemoveImage(ctx context.Context, image string) error {
puller, err := getPuller()
if err != nil {
return err
}
return puller.Remove(ctx, image)
}
func getContainerImageFromE2ETestDaemonset(dsYamlPath string) (string, error) {
data, err := e2etestfiles.Read(dsYamlPath)
if err != nil {


@@ -0,0 +1,345 @@
//go:build linux
// +build linux
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
"k8s.io/kubernetes/test/e2e_node/criproxy"
imageutils "k8s.io/kubernetes/test/utils/image"
admissionapi "k8s.io/pod-security-admission/api"
"k8s.io/utils/ptr"
)
// The CRI Proxy injector is used to simulate and verify image pull behavior.
// These tests need to run in serial to prevent other tests from caching the images
// and to prevent other image pulls from increasing the measured wait time.
var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() {
f := framework.NewDefaultFramework("parallel-pull-image-test")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
var testpods []*v1.Pod
ginkgo.Context("parallel image pull with MaxParallelImagePulls=5", func() {
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
initialConfig.SerializeImagePulls = false
initialConfig.MaxParallelImagePulls = ptr.To[int32](5)
})
ginkgo.BeforeEach(func(ctx context.Context) {
if err := resetCRIProxyInjector(); err != nil {
ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
}
testpods = prepareAndCleanup(ctx, f)
gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5))
})
ginkgo.AfterEach(func(ctx context.Context) {
err := resetCRIProxyInjector()
framework.ExpectNoError(err)
ginkgo.By("cleanup pods")
for _, pod := range testpods {
deletePodSyncByName(ctx, f, pod.Name)
}
})
ginkgo.It("should pull immediately if no more than 5 pods", func(ctx context.Context) {
var mu sync.Mutex
timeout := 20 * time.Second
callCh := make(chan struct{})
callStatus := make(map[int]chan struct{})
err := addCRIProxyInjector(func(apiName string) error {
if apiName == criproxy.PullImage {
mu.Lock()
callID := len(callStatus)
callStatus[callID] = callCh
mu.Unlock()
if callID == 0 {
// wait for next call
select {
case <-callCh:
return nil
case <-time.After(timeout):
return fmt.Errorf("no parallel image pull after %s", timeout)
}
} else {
// send a signal to the first call
callCh <- struct{}{}
}
}
return nil
})
framework.ExpectNoError(err)
for _, testpod := range testpods {
_ = e2epod.NewPodClient(f).Create(ctx, testpod)
}
imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods)
framework.ExpectNoError(err)
checkPodPullingOverlap(podStartTime, podEndTime, testpods)
for _, img := range imagePulled {
framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration)
// If the pull-including-waiting duration exceeds the pull duration by more than 50%, treat it as a delayed pull.
if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 {
// With parallel image pulling, the duration including waiting should be close to the pull duration itself.
framework.Failf("Detected a delayed image pull, which is not expected with parallel image pulling.")
}
}
})
})
})
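Aside (editor's sketch, not part of the diff): the injector above is a two-caller rendezvous. The first PullImage call blocks until a second call arrives, proving the pulls overlap, or the timeout fires; the serialized test below inverts the logic and treats an overlapping call as an error. A standalone illustration of the handshake, using only the standard library and made-up names:

package main

import (
	"fmt"
	"sync"
	"time"
)

// newOverlapDetector returns a hook to call from every simulated PullImage.
// The first caller waits for a second caller; an overlapping call means the
// pulls ran in parallel, a timeout means they were serialized.
func newOverlapDetector(timeout time.Duration) func() error {
	var (
		mu    sync.Mutex
		calls int
		ch    = make(chan struct{})
	)
	return func() error {
		mu.Lock()
		id := calls
		calls++
		mu.Unlock()
		if id == 0 {
			select {
			case <-ch:
				return nil // a later call overlapped with this one
			case <-time.After(timeout):
				return fmt.Errorf("no overlapping call within %s", timeout)
			}
		}
		// Wake the first caller if it is still waiting.
		select {
		case ch <- struct{}{}:
		case <-time.After(timeout):
		}
		return nil
	}
}

func main() {
	detect := newOverlapDetector(2 * time.Second)
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("pull result:", detect())
		}()
	}
	wg.Wait()
}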
var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() {
f := framework.NewDefaultFramework("serialize-pull-image-test")
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
ginkgo.Context("serialize image pull", func() {
// this is the default behavior now.
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
initialConfig.SerializeImagePulls = true
initialConfig.MaxParallelImagePulls = ptr.To[int32](1)
})
var testpods []*v1.Pod
ginkgo.BeforeEach(func(ctx context.Context) {
if err := resetCRIProxyInjector(); err != nil {
ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
}
testpods = prepareAndCleanup(ctx, f)
gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5))
})
ginkgo.AfterEach(func(ctx context.Context) {
err := resetCRIProxyInjector()
framework.ExpectNoError(err)
ginkgo.By("cleanup pods")
for _, pod := range testpods {
deletePodSyncByName(ctx, f, pod.Name)
}
})
ginkgo.It("should be waiting more", func(ctx context.Context) {
// all serialized image pulls should time out
timeout := 20 * time.Second
var mu sync.Mutex
callCh := make(chan struct{})
callStatus := make(map[int]chan struct{})
err := addCRIProxyInjector(func(apiName string) error {
if apiName == criproxy.PullImage {
mu.Lock()
callID := len(callStatus)
callStatus[callID] = callCh
mu.Unlock()
if callID == 0 {
// wait for next call
select {
case <-callCh:
return errors.New("parallel image pull detected")
case <-time.After(timeout):
return nil
}
} else {
// send a signal to the first call
select {
case callCh <- struct{}{}:
return errors.New("parallel image pull detected")
case <-time.After(timeout):
return nil
}
}
}
return nil
})
framework.ExpectNoError(err)
var pods []*v1.Pod
for _, testpod := range testpods {
pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod))
}
for _, pod := range pods {
err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 2*time.Minute, func(pod *v1.Pod) (bool, error) {
if pod.Status.Phase == v1.PodRunning {
return true, nil
}
return false, nil
})
framework.ExpectNoError(err)
}
imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods)
framework.ExpectNoError(err)
gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled)))
checkPodPullingOverlap(podStartTime, podEndTime, testpods)
// If the pull-including-waiting duration exceeds the pull duration by more than 50%, treat it as a delayed pull.
var anyDelayedPull bool
for _, img := range imagePulled {
framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration)
if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 {
anyDelayedPull = true
}
}
// With serialized image pulling, the duration including waiting should be roughly double the pull duration.
// Use 1.5 as a conservative ratio to tolerate some overlap during pod creation.
if !anyDelayedPull {
framework.Failf("All image pullings are not delayed, which is not expected for serilized image pull")
}
})
})
})
func getPodImagePullDurations(ctx context.Context, f *framework.Framework, testpods []*v1.Pod) (map[string]*pulledStruct, map[string]metav1.Time, map[string]metav1.Time, error) {
events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, nil, nil, err
}
imagePulled := map[string]*pulledStruct{}
podStartTime := map[string]metav1.Time{}
podEndTime := map[string]metav1.Time{}
for _, event := range events.Items {
if event.Reason == kubeletevents.PulledImage {
podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
for _, testpod := range testpods {
if event.InvolvedObject.Name == testpod.Name {
pulled, err := getDurationsFromPulledEventMsg(event.Message)
if err != nil {
return nil, nil, nil, err
}
imagePulled[testpod.Name] = pulled
break
}
}
} else if event.Reason == kubeletevents.PullingImage {
podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
}
}
return imagePulled, podStartTime, podEndTime, nil
}
// As the pods are created at the same time and each image pull is delayed by 15s, their pull time windows should overlap.
func checkPodPullingOverlap(podStartTime map[string]metav1.Time, podEndTime map[string]metav1.Time, testpods []*v1.Pod) {
if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) && podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
framework.Failf("%v pulling time and %v pulling time are not overlapped", testpods[0].Name, testpods[1].Name)
} else if podStartTime[testpods[0].Name].Time.After(podStartTime[testpods[1].Name].Time) && podStartTime[testpods[0].Name].Time.After(podEndTime[testpods[1].Name].Time) {
framework.Failf("%v pulling time and %v pulling time are not overlapped", testpods[0].Name, testpods[1].Name)
}
}
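For example (editorial note, not part of the diff): if testpod starts pulling at 00:00 and finishes at 00:20 while testpod2 only starts at 00:25, the first branch fires because testpod both started and finished before testpod2 started, so the pulls did not overlap and the test fails; the second branch covers the mirrored ordering.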
func prepareAndCleanup(ctx context.Context, f *framework.Framework) (testpods []*v1.Pod) {
// cuda images are > 2Gi, which would reduce the flake rate
image1 := imageutils.GetE2EImage(imageutils.Httpd)
image2 := imageutils.GetE2EImage(imageutils.HttpdNew)
node := getNodeName(ctx, f)
testpod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod",
Namespace: f.Namespace.Name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "testpod",
Image: image1,
ImagePullPolicy: v1.PullAlways,
}},
NodeName: node,
RestartPolicy: v1.RestartPolicyNever,
},
}
testpod2 := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "testpod2",
Namespace: f.Namespace.Name,
},
Spec: v1.PodSpec{
Containers: []v1.Container{{
Name: "testpod2",
Image: image2,
ImagePullPolicy: v1.PullAlways,
}},
NodeName: node,
RestartPolicy: v1.RestartPolicyNever,
},
}
testpods = []*v1.Pod{testpod, testpod2}
ginkgo.By("cleanup images")
for _, pod := range testpods {
_ = RemoveImage(ctx, pod.Spec.Containers[0].Image)
}
return testpods
}
type pulledStruct struct {
pulledDuration time.Duration
pulledIncludeWaitingDuration time.Duration
}
// getDurationsFromPulledEventMsg will parse two durations in the pulled message
// Example msg: `Successfully pulled image \"busybox:1.28\" in 39.356s (49.356s including waiting). Image size: 41901587 bytes.`
func getDurationsFromPulledEventMsg(msg string) (*pulledStruct, error) {
splits := strings.Split(msg, " ")
if len(splits) != 13 {
return nil, errors.Errorf("pull event message should be spilted to 13: %d", len(splits))
}
pulledDuration, err := time.ParseDuration(splits[5])
if err != nil {
return nil, err
}
// strip the leading '(' before parsing the duration
pulledIncludeWaitingDuration, err := time.ParseDuration(splits[6][1:])
if err != nil {
return nil, err
}
return &pulledStruct{
pulledDuration: pulledDuration,
pulledIncludeWaitingDuration: pulledIncludeWaitingDuration,
}, nil
}


@@ -19,13 +19,14 @@ package e2enode
import (
"context"
"fmt"
"k8s.io/kubernetes/pkg/features"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"os/exec"
"path/filepath"
"strings"
"time"
"k8s.io/kubernetes/pkg/features"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
@@ -217,7 +218,7 @@ func runImageFsPressureTest(f *framework.Framework, pressureTimeout time.Duratio
if expectedNodeCondition == v1.NodeDiskPressure && framework.TestContext.PrepullImages {
// The disk eviction test may cause the pre-pulled images to be evicted,
// so pre-pull those images again to ensure this test does not affect subsequent tests.
err := PrePullAllImages()
err := PrePullAllImages(ctx)
framework.ExpectNoError(err)
}
}


@ -43,14 +43,14 @@ var _ = SIGDescribe("SystemNodeCriticalPod", framework.WithSlow(), framework.Wit
// this test only manipulates pods in kube-system
f.SkipNamespaceCreation = true
ginkgo.AfterEach(func() {
ginkgo.AfterEach(func(ctx context.Context) {
if framework.TestContext.PrepullImages {
// The test may cause the prepulled images to be evicted,
// so prepull those images again to ensure this test does not affect the following tests.
PrePullAllImages()
err := PrePullAllImages(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
})
ginkgo.Context("when create a system-node-critical pod", func() {
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
diskConsumed := resource.MustParse("200Mi")
@ -110,7 +110,8 @@ var _ = SIGDescribe("SystemNodeCriticalPod", framework.WithSlow(), framework.Wit
if framework.TestContext.PrepullImages {
// The test may cause the prepulled images to be evicted,
// so prepull those images again to ensure this test does not affect the following tests.
PrePullAllImages()
err := PrePullAllImages(ctx)
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
}
}()
ginkgo.By("delete the static pod")