mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
kubeadm: enable parallel pulls of images
- Update the logic in checks.go to separate serial and parallel image pulls. - Add a new CRI function PullImagesInParallel() with a private implementation. - Unit test the private implementation. - Update other unit tests in checks_test.go.
This commit is contained in:
parent
510df7eab1
commit
0ba903fd57
@ -815,6 +815,7 @@ type ImagePullCheck struct {
|
||||
imageList []string
|
||||
sandboxImage string
|
||||
imagePullPolicy v1.PullPolicy
|
||||
imagePullSerial bool
|
||||
}
|
||||
|
||||
// Name returns the label for ImagePullCheck
|
||||
@ -824,22 +825,39 @@ func (ImagePullCheck) Name() string {
|
||||
|
||||
// Check pulls images required by kubeadm. This is a mutating check
|
||||
func (ipc ImagePullCheck) Check() (warnings, errorList []error) {
|
||||
// Handle unsupported image pull policy and policy Never.
|
||||
policy := ipc.imagePullPolicy
|
||||
klog.V(1).Infof("using image pull policy: %s", policy)
|
||||
for _, image := range ipc.imageList {
|
||||
if image == ipc.sandboxImage {
|
||||
criSandboxImage, err := ipc.runtime.SandboxImage()
|
||||
if err != nil {
|
||||
klog.V(4).Infof("failed to detect the sandbox image for local container runtime, %v", err)
|
||||
} else if criSandboxImage != ipc.sandboxImage {
|
||||
klog.Warningf("detected that the sandbox image %q of the container runtime is inconsistent with that used by kubeadm. It is recommended that using %q as the CRI sandbox image.",
|
||||
criSandboxImage, ipc.sandboxImage)
|
||||
}
|
||||
switch policy {
|
||||
case v1.PullAlways, v1.PullIfNotPresent:
|
||||
klog.V(1).Infof("using image pull policy: %s", policy)
|
||||
case v1.PullNever:
|
||||
klog.V(1).Infof("skipping the pull of all images due to policy: %s", policy)
|
||||
return warnings, errorList
|
||||
default:
|
||||
errorList = append(errorList, errors.Errorf("unsupported pull policy %q", policy))
|
||||
return warnings, errorList
|
||||
}
|
||||
|
||||
// Handle CRI sandbox image warnings.
|
||||
criSandboxImage, err := ipc.runtime.SandboxImage()
|
||||
if err != nil {
|
||||
klog.V(4).Infof("failed to detect the sandbox image for local container runtime, %v", err)
|
||||
} else if criSandboxImage != ipc.sandboxImage {
|
||||
klog.Warningf("detected that the sandbox image %q of the container runtime is inconsistent with that used by kubeadm."+
|
||||
"It is recommended to use %q as the CRI sandbox image.", criSandboxImage, ipc.sandboxImage)
|
||||
}
|
||||
|
||||
// Perform parallel pulls.
|
||||
if !ipc.imagePullSerial {
|
||||
if err := ipc.runtime.PullImagesInParallel(ipc.imageList, policy == v1.PullIfNotPresent); err != nil {
|
||||
errorList = append(errorList, err)
|
||||
}
|
||||
return warnings, errorList
|
||||
}
|
||||
|
||||
// Perform serial pulls.
|
||||
for _, image := range ipc.imageList {
|
||||
switch policy {
|
||||
case v1.PullNever:
|
||||
klog.V(1).Infof("skipping pull of image: %s", image)
|
||||
continue
|
||||
case v1.PullIfNotPresent:
|
||||
ret, err := ipc.runtime.ImageExists(image)
|
||||
if ret && err == nil {
|
||||
@ -853,14 +871,11 @@ func (ipc ImagePullCheck) Check() (warnings, errorList []error) {
|
||||
case v1.PullAlways:
|
||||
klog.V(1).Infof("pulling: %s", image)
|
||||
if err := ipc.runtime.PullImage(image); err != nil {
|
||||
errorList = append(errorList, errors.Wrapf(err, "failed to pull image %s", image))
|
||||
errorList = append(errorList, errors.WithMessagef(err, "failed to pull image %s", image))
|
||||
}
|
||||
default:
|
||||
// If the policy is unknown return early with an error
|
||||
errorList = append(errorList, errors.Errorf("unsupported pull policy %q", policy))
|
||||
return warnings, errorList
|
||||
}
|
||||
}
|
||||
|
||||
return warnings, errorList
|
||||
}
|
||||
|
||||
@ -1096,12 +1111,18 @@ func RunPullImagesCheck(execer utilsexec.Interface, cfg *kubeadmapi.InitConfigur
|
||||
return &Error{Msg: err.Error()}
|
||||
}
|
||||
|
||||
serialPull := true
|
||||
if cfg.NodeRegistration.ImagePullSerial != nil {
|
||||
serialPull = *cfg.NodeRegistration.ImagePullSerial
|
||||
}
|
||||
|
||||
checks := []Checker{
|
||||
ImagePullCheck{
|
||||
runtime: containerRuntime,
|
||||
imageList: images.GetControlPlaneImages(&cfg.ClusterConfiguration),
|
||||
sandboxImage: images.GetPauseImage(&cfg.ClusterConfiguration),
|
||||
imagePullPolicy: cfg.NodeRegistration.ImagePullPolicy,
|
||||
imagePullSerial: serialPull,
|
||||
},
|
||||
}
|
||||
return RunChecks(checks, os.Stderr, ignorePreflightErrors)
|
||||
|
@ -866,9 +866,11 @@ func TestImagePullCheck(t *testing.T) {
|
||||
},
|
||||
CombinedOutputScript: []fakeexec.FakeAction{
|
||||
// Test case1: pull only img3
|
||||
func() ([]byte, []byte, error) { return []byte("pause"), nil, nil },
|
||||
func() ([]byte, []byte, error) { return nil, nil, nil },
|
||||
// Test case 2: fail to pull image2 and image3
|
||||
// If the pull fails, it will be retried 5 times (see PullImageRetry in constants/constants.go)
|
||||
func() ([]byte, []byte, error) { return []byte("pause"), nil, nil },
|
||||
func() ([]byte, []byte, error) { return nil, nil, nil },
|
||||
func() ([]byte, []byte, error) { return []byte("error"), nil, &fakeexec.FakeExitError{Status: 1} },
|
||||
func() ([]byte, []byte, error) { return []byte("error"), nil, &fakeexec.FakeExitError{Status: 1} },
|
||||
@ -903,6 +905,8 @@ func TestImagePullCheck(t *testing.T) {
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
|
||||
},
|
||||
LookPathFunc: func(cmd string) (string, error) { return "/usr/bin/crictl", nil },
|
||||
}
|
||||
@ -914,8 +918,10 @@ func TestImagePullCheck(t *testing.T) {
|
||||
|
||||
check := ImagePullCheck{
|
||||
runtime: containerRuntime,
|
||||
sandboxImage: "pause",
|
||||
imageList: []string{"img1", "img2", "img3"},
|
||||
imagePullPolicy: corev1.PullIfNotPresent,
|
||||
imagePullSerial: true,
|
||||
}
|
||||
warnings, errors := check.Check()
|
||||
if len(warnings) != 0 {
|
||||
@ -936,8 +942,10 @@ func TestImagePullCheck(t *testing.T) {
|
||||
// Test with unknown policy
|
||||
check = ImagePullCheck{
|
||||
runtime: containerRuntime,
|
||||
sandboxImage: "pause",
|
||||
imageList: []string{"img1", "img2", "img3"},
|
||||
imagePullPolicy: "",
|
||||
imagePullSerial: true,
|
||||
}
|
||||
_, errors = check.Check()
|
||||
if len(errors) != 1 {
|
||||
|
@ -43,6 +43,7 @@ type ContainerRuntime interface {
|
||||
ListKubeContainers() ([]string, error)
|
||||
RemoveContainers(containers []string) error
|
||||
PullImage(image string) error
|
||||
PullImagesInParallel(images []string, ifNotPresent bool) error
|
||||
ImageExists(image string) (bool, error)
|
||||
SandboxImage() (string, error)
|
||||
}
|
||||
@ -139,6 +140,53 @@ func (runtime *CRIRuntime) PullImage(image string) error {
|
||||
return errors.Wrapf(err, "output: %s, error", out)
|
||||
}
|
||||
|
||||
// PullImagesInParallel pulls a list of images in parallel
|
||||
func (runtime *CRIRuntime) PullImagesInParallel(images []string, ifNotPresent bool) error {
|
||||
errs := pullImagesInParallelImpl(images, ifNotPresent, runtime.ImageExists, runtime.PullImage)
|
||||
return errorsutil.NewAggregate(errs)
|
||||
}
|
||||
|
||||
func pullImagesInParallelImpl(images []string, ifNotPresent bool,
|
||||
imageExistsFunc func(string) (bool, error), pullImageFunc func(string) error) []error {
|
||||
|
||||
var errs []error
|
||||
errChan := make(chan error, len(images))
|
||||
|
||||
klog.V(1).Info("pulling all images in parallel")
|
||||
for _, img := range images {
|
||||
image := img
|
||||
go func() {
|
||||
if ifNotPresent {
|
||||
exists, err := imageExistsFunc(image)
|
||||
if err != nil {
|
||||
errChan <- errors.WithMessagef(err, "failed to check if image %s exists", image)
|
||||
return
|
||||
}
|
||||
if exists {
|
||||
klog.V(1).Infof("image exists: %s", image)
|
||||
errChan <- nil
|
||||
return
|
||||
}
|
||||
}
|
||||
err := pullImageFunc(image)
|
||||
if err != nil {
|
||||
err = errors.WithMessagef(err, "failed to pull image %s", image)
|
||||
} else {
|
||||
klog.V(1).Infof("done pulling: %s", image)
|
||||
}
|
||||
errChan <- err
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < len(images); i++ {
|
||||
if err := <-errChan; err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
// ImageExists checks to see if the image exists on the system
|
||||
func (runtime *CRIRuntime) ImageExists(image string) (bool, error) {
|
||||
err := runtime.crictl("inspecti", image).Run()
|
||||
|
@ -461,3 +461,78 @@ func TestDetectCRISocketImpl(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPullImagesInParallelImpl(t *testing.T) {
|
||||
testError := errors.New("error")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
images []string
|
||||
ifNotPresent bool
|
||||
imageExistsFunc func(string) (bool, error)
|
||||
pullImageFunc func(string) error
|
||||
expectedErrors int
|
||||
}{
|
||||
{
|
||||
name: "all images exist, no errors",
|
||||
images: []string{"foo", "bar", "baz"},
|
||||
ifNotPresent: true,
|
||||
imageExistsFunc: func(string) (bool, error) {
|
||||
return true, nil
|
||||
},
|
||||
pullImageFunc: nil,
|
||||
expectedErrors: 0,
|
||||
},
|
||||
{
|
||||
name: "cannot check if one image exists due to error",
|
||||
images: []string{"foo", "bar", "baz"},
|
||||
ifNotPresent: true,
|
||||
imageExistsFunc: func(image string) (bool, error) {
|
||||
if image == "baz" {
|
||||
return false, testError
|
||||
}
|
||||
return true, nil
|
||||
},
|
||||
pullImageFunc: nil,
|
||||
expectedErrors: 1,
|
||||
},
|
||||
{
|
||||
name: "cannot pull two images",
|
||||
images: []string{"foo", "bar", "baz"},
|
||||
ifNotPresent: true,
|
||||
imageExistsFunc: func(string) (bool, error) {
|
||||
return false, nil
|
||||
},
|
||||
pullImageFunc: func(image string) error {
|
||||
if image == "foo" {
|
||||
return nil
|
||||
}
|
||||
return testError
|
||||
},
|
||||
expectedErrors: 2,
|
||||
},
|
||||
{
|
||||
name: "pull all images",
|
||||
images: []string{"foo", "bar", "baz"},
|
||||
ifNotPresent: true,
|
||||
imageExistsFunc: func(string) (bool, error) {
|
||||
return false, nil
|
||||
},
|
||||
pullImageFunc: func(string) error {
|
||||
return nil
|
||||
},
|
||||
expectedErrors: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actual := pullImagesInParallelImpl(tc.images, tc.ifNotPresent,
|
||||
tc.imageExistsFunc, tc.pullImageFunc)
|
||||
if len(actual) != tc.expectedErrors {
|
||||
t.Fatalf("expected non-nil errors: %v, got: %v, full list of errors: %v",
|
||||
tc.expectedErrors, len(actual), actual)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user