diff --git a/cmd/kubeadm/app/preflight/checks.go b/cmd/kubeadm/app/preflight/checks.go index 70265a84cf2..bb4aa2319b0 100644 --- a/cmd/kubeadm/app/preflight/checks.go +++ b/cmd/kubeadm/app/preflight/checks.go @@ -815,6 +815,7 @@ type ImagePullCheck struct { imageList []string sandboxImage string imagePullPolicy v1.PullPolicy + imagePullSerial bool } // Name returns the label for ImagePullCheck @@ -824,22 +825,39 @@ func (ImagePullCheck) Name() string { // Check pulls images required by kubeadm. This is a mutating check func (ipc ImagePullCheck) Check() (warnings, errorList []error) { + // Handle unsupported image pull policy and policy Never. policy := ipc.imagePullPolicy - klog.V(1).Infof("using image pull policy: %s", policy) - for _, image := range ipc.imageList { - if image == ipc.sandboxImage { - criSandboxImage, err := ipc.runtime.SandboxImage() - if err != nil { - klog.V(4).Infof("failed to detect the sandbox image for local container runtime, %v", err) - } else if criSandboxImage != ipc.sandboxImage { - klog.Warningf("detected that the sandbox image %q of the container runtime is inconsistent with that used by kubeadm. It is recommended that using %q as the CRI sandbox image.", - criSandboxImage, ipc.sandboxImage) - } + switch policy { + case v1.PullAlways, v1.PullIfNotPresent: + klog.V(1).Infof("using image pull policy: %s", policy) + case v1.PullNever: + klog.V(1).Infof("skipping the pull of all images due to policy: %s", policy) + return warnings, errorList + default: + errorList = append(errorList, errors.Errorf("unsupported pull policy %q", policy)) + return warnings, errorList + } + + // Handle CRI sandbox image warnings. + criSandboxImage, err := ipc.runtime.SandboxImage() + if err != nil { + klog.V(4).Infof("failed to detect the sandbox image for local container runtime, %v", err) + } else if criSandboxImage != ipc.sandboxImage { + klog.Warningf("detected that the sandbox image %q of the container runtime is inconsistent with that used by kubeadm."+ + "It is recommended to use %q as the CRI sandbox image.", criSandboxImage, ipc.sandboxImage) + } + + // Perform parallel pulls. + if !ipc.imagePullSerial { + if err := ipc.runtime.PullImagesInParallel(ipc.imageList, policy == v1.PullIfNotPresent); err != nil { + errorList = append(errorList, err) } + return warnings, errorList + } + + // Perform serial pulls. + for _, image := range ipc.imageList { switch policy { - case v1.PullNever: - klog.V(1).Infof("skipping pull of image: %s", image) - continue case v1.PullIfNotPresent: ret, err := ipc.runtime.ImageExists(image) if ret && err == nil { @@ -853,14 +871,11 @@ func (ipc ImagePullCheck) Check() (warnings, errorList []error) { case v1.PullAlways: klog.V(1).Infof("pulling: %s", image) if err := ipc.runtime.PullImage(image); err != nil { - errorList = append(errorList, errors.Wrapf(err, "failed to pull image %s", image)) + errorList = append(errorList, errors.WithMessagef(err, "failed to pull image %s", image)) } - default: - // If the policy is unknown return early with an error - errorList = append(errorList, errors.Errorf("unsupported pull policy %q", policy)) - return warnings, errorList } } + return warnings, errorList } @@ -1096,12 +1111,18 @@ func RunPullImagesCheck(execer utilsexec.Interface, cfg *kubeadmapi.InitConfigur return &Error{Msg: err.Error()} } + serialPull := true + if cfg.NodeRegistration.ImagePullSerial != nil { + serialPull = *cfg.NodeRegistration.ImagePullSerial + } + checks := []Checker{ ImagePullCheck{ runtime: containerRuntime, imageList: images.GetControlPlaneImages(&cfg.ClusterConfiguration), sandboxImage: images.GetPauseImage(&cfg.ClusterConfiguration), imagePullPolicy: cfg.NodeRegistration.ImagePullPolicy, + imagePullSerial: serialPull, }, } return RunChecks(checks, os.Stderr, ignorePreflightErrors) diff --git a/cmd/kubeadm/app/preflight/checks_test.go b/cmd/kubeadm/app/preflight/checks_test.go index 28a493ea9d0..8ccf303c08c 100644 --- a/cmd/kubeadm/app/preflight/checks_test.go +++ b/cmd/kubeadm/app/preflight/checks_test.go @@ -866,9 +866,11 @@ func TestImagePullCheck(t *testing.T) { }, CombinedOutputScript: []fakeexec.FakeAction{ // Test case1: pull only img3 + func() ([]byte, []byte, error) { return []byte("pause"), nil, nil }, func() ([]byte, []byte, error) { return nil, nil, nil }, // Test case 2: fail to pull image2 and image3 // If the pull fails, it will be retried 5 times (see PullImageRetry in constants/constants.go) + func() ([]byte, []byte, error) { return []byte("pause"), nil, nil }, func() ([]byte, []byte, error) { return nil, nil, nil }, func() ([]byte, []byte, error) { return []byte("error"), nil, &fakeexec.FakeExitError{Status: 1} }, func() ([]byte, []byte, error) { return []byte("error"), nil, &fakeexec.FakeExitError{Status: 1} }, @@ -903,6 +905,8 @@ func TestImagePullCheck(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return "/usr/bin/crictl", nil }, } @@ -914,8 +918,10 @@ func TestImagePullCheck(t *testing.T) { check := ImagePullCheck{ runtime: containerRuntime, + sandboxImage: "pause", imageList: []string{"img1", "img2", "img3"}, imagePullPolicy: corev1.PullIfNotPresent, + imagePullSerial: true, } warnings, errors := check.Check() if len(warnings) != 0 { @@ -936,8 +942,10 @@ func TestImagePullCheck(t *testing.T) { // Test with unknown policy check = ImagePullCheck{ runtime: containerRuntime, + sandboxImage: "pause", imageList: []string{"img1", "img2", "img3"}, imagePullPolicy: "", + imagePullSerial: true, } _, errors = check.Check() if len(errors) != 1 { diff --git a/cmd/kubeadm/app/util/runtime/runtime.go b/cmd/kubeadm/app/util/runtime/runtime.go index 6767d44ae86..1036fa4c60d 100644 --- a/cmd/kubeadm/app/util/runtime/runtime.go +++ b/cmd/kubeadm/app/util/runtime/runtime.go @@ -43,6 +43,7 @@ type ContainerRuntime interface { ListKubeContainers() ([]string, error) RemoveContainers(containers []string) error PullImage(image string) error + PullImagesInParallel(images []string, ifNotPresent bool) error ImageExists(image string) (bool, error) SandboxImage() (string, error) } @@ -139,6 +140,53 @@ func (runtime *CRIRuntime) PullImage(image string) error { return errors.Wrapf(err, "output: %s, error", out) } +// PullImagesInParallel pulls a list of images in parallel +func (runtime *CRIRuntime) PullImagesInParallel(images []string, ifNotPresent bool) error { + errs := pullImagesInParallelImpl(images, ifNotPresent, runtime.ImageExists, runtime.PullImage) + return errorsutil.NewAggregate(errs) +} + +func pullImagesInParallelImpl(images []string, ifNotPresent bool, + imageExistsFunc func(string) (bool, error), pullImageFunc func(string) error) []error { + + var errs []error + errChan := make(chan error, len(images)) + + klog.V(1).Info("pulling all images in parallel") + for _, img := range images { + image := img + go func() { + if ifNotPresent { + exists, err := imageExistsFunc(image) + if err != nil { + errChan <- errors.WithMessagef(err, "failed to check if image %s exists", image) + return + } + if exists { + klog.V(1).Infof("image exists: %s", image) + errChan <- nil + return + } + } + err := pullImageFunc(image) + if err != nil { + err = errors.WithMessagef(err, "failed to pull image %s", image) + } else { + klog.V(1).Infof("done pulling: %s", image) + } + errChan <- err + }() + } + + for i := 0; i < len(images); i++ { + if err := <-errChan; err != nil { + errs = append(errs, err) + } + } + + return errs +} + // ImageExists checks to see if the image exists on the system func (runtime *CRIRuntime) ImageExists(image string) (bool, error) { err := runtime.crictl("inspecti", image).Run() diff --git a/cmd/kubeadm/app/util/runtime/runtime_test.go b/cmd/kubeadm/app/util/runtime/runtime_test.go index fc2fcb48ce3..d1990923414 100644 --- a/cmd/kubeadm/app/util/runtime/runtime_test.go +++ b/cmd/kubeadm/app/util/runtime/runtime_test.go @@ -461,3 +461,78 @@ func TestDetectCRISocketImpl(t *testing.T) { }) } } + +func TestPullImagesInParallelImpl(t *testing.T) { + testError := errors.New("error") + + tests := []struct { + name string + images []string + ifNotPresent bool + imageExistsFunc func(string) (bool, error) + pullImageFunc func(string) error + expectedErrors int + }{ + { + name: "all images exist, no errors", + images: []string{"foo", "bar", "baz"}, + ifNotPresent: true, + imageExistsFunc: func(string) (bool, error) { + return true, nil + }, + pullImageFunc: nil, + expectedErrors: 0, + }, + { + name: "cannot check if one image exists due to error", + images: []string{"foo", "bar", "baz"}, + ifNotPresent: true, + imageExistsFunc: func(image string) (bool, error) { + if image == "baz" { + return false, testError + } + return true, nil + }, + pullImageFunc: nil, + expectedErrors: 1, + }, + { + name: "cannot pull two images", + images: []string{"foo", "bar", "baz"}, + ifNotPresent: true, + imageExistsFunc: func(string) (bool, error) { + return false, nil + }, + pullImageFunc: func(image string) error { + if image == "foo" { + return nil + } + return testError + }, + expectedErrors: 2, + }, + { + name: "pull all images", + images: []string{"foo", "bar", "baz"}, + ifNotPresent: true, + imageExistsFunc: func(string) (bool, error) { + return false, nil + }, + pullImageFunc: func(string) error { + return nil + }, + expectedErrors: 0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actual := pullImagesInParallelImpl(tc.images, tc.ifNotPresent, + tc.imageExistsFunc, tc.pullImageFunc) + if len(actual) != tc.expectedErrors { + t.Fatalf("expected non-nil errors: %v, got: %v, full list of errors: %v", + tc.expectedErrors, len(actual), actual) + } + }) + } +}