Merge pull request #28975 from ronnielai/image-gc-1

Automatic merge from submit-queue

ImagePuller refactoring

A plain refactoring
- Moving image pullers to a new pkg/kubelet/images directory
- Hiding image pullers inside the new ImageManager 

The next step is to consolidate the logic of the serialized and the parallel image pullers inside ImageManager

xref: #25577
This commit is contained in:
k8s-merge-robot 2016-07-20 13:37:48 -07:00 committed by GitHub
commit afe4977c9b
10 changed files with 155 additions and 58 deletions

View File

@ -142,13 +142,6 @@ type ContainerCommandRunner interface {
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
}
// ImagePuller wraps Runtime.PullImage() to pull a container image.
// It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly.
type ImagePuller interface {
PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string)
}
// Pod is a group of containers.
type Pod struct {
// The ID of the pod, which can be used to retrieve a particular pod

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
@ -128,7 +129,7 @@ type DockerManager struct {
dockerPuller DockerPuller
// wrapped image puller.
imagePuller kubecontainer.ImagePuller
imagePuller images.ImageManager
// Root of the Docker runtime.
dockerRoot string
@ -261,11 +262,7 @@ func NewDockerManager(
seccompProfileRoot: seccompProfileRoot,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
if serializeImagePulls {
dm.imagePuller = kubecontainer.NewSerializedImagePuller(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff)
} else {
dm.imagePuller = kubecontainer.NewImagePuller(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff)
}
dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls)
dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir)
dm.versionCache = cache.NewObjectCache(
@ -1718,7 +1715,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubecontainer.Do
// No pod secrets for the infra container.
// The message isn't needed for the Infra container
if err, msg := dm.imagePuller.PullImage(pod, container, nil); err != nil {
if err, msg := dm.imagePuller.EnsureImageExists(pod, container, nil); err != nil {
return "", err, msg
}
@ -2130,7 +2127,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
// tryContainerStart attempts to pull and start the container, returning an error and a reason string if the start
// was not successful.
func (dm *DockerManager) tryContainerStart(container *api.Container, pod *api.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, namespaceMode, pidMode, podIP string) (err error, reason string) {
err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets)
err, msg := dm.imagePuller.EnsureImageExists(pod, container, pullSecrets)
if err != nil {
return err, msg
}

18
pkg/kubelet/images/doc.go Normal file
View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package images is responsible for managing lifecycle of container images.
package images

View File

@ -0,0 +1,52 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
type imageManager struct {
recorder record.EventRecorder
runtime kubecontainer.Runtime
backOff *flowcontrol.Backoff
imagePuller imagePuller
}
var _ ImageManager = &imageManager{}
func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager {
var imagePuller imagePuller
if serialized {
imagePuller = newSerializedImagePuller(recorder, runtime, imageBackOff)
} else {
imagePuller = newParallelImagePuller(recorder, runtime, imageBackOff)
}
return &imageManager{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
imagePuller: imagePuller,
}
}
func (im *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
return im.imagePuller.pullImage(pod, container, pullSecrets)
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2015 The Kubernetes Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package container
package images
import (
"fmt"
@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/flowcontrol"
)
@ -29,19 +30,19 @@ import (
// imagePuller pulls the image using Runtime.PullImage().
// It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly.
type imagePuller struct {
type parallelImagePuller struct {
recorder record.EventRecorder
runtime Runtime
runtime kubecontainer.Runtime
backOff *flowcontrol.Backoff
}
// enforce compatibility.
var _ ImagePuller = &imagePuller{}
var _ imagePuller = &parallelImagePuller{}
// NewImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller {
return &imagePuller{
func newParallelImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller {
return &parallelImagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
@ -64,7 +65,7 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool {
}
// records an event using ref, event msg. log to glog using prefix, msg, logFn
func (puller *imagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) {
func (puller *parallelImagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) {
if ref != nil {
puller.recorder.Event(ref, eventtype, event, msg)
} else {
@ -73,19 +74,19 @@ func (puller *imagePuller) logIt(ref *api.ObjectReference, eventtype, event, pre
}
// PullImage pulls the image for the specified pod and container.
func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container)
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
spec := ImageSpec{container.Image}
spec := kubecontainer.ImageSpec{Image: container.Image}
present, err := puller.runtime.IsImagePresent(spec)
if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
return ErrImageInspect, msg
return kubecontainer.ErrImageInspect, msg
}
if !shouldPullImage(container, present) {
@ -96,7 +97,7 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
} else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg
return kubecontainer.ErrImageNeverPull, msg
}
}
@ -104,17 +105,17 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
return kubecontainer.ErrImagePullBackOff, msg
}
puller.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
if err := puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable {
if err == kubecontainer.RegistryUnavailable {
msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image)
return err, msg
} else {
return ErrImagePull, err.Error()
return kubecontainer.ErrImagePull, err.Error()
}
}
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package container_test
package images
import (
"errors"
@ -106,7 +106,7 @@ func TestPuller(t *testing.T) {
fakeRuntime := &ctest.FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff)
puller := newParallelImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}}
fakeRuntime.Err = c.pullerErr
@ -114,7 +114,7 @@ func TestPuller(t *testing.T) {
for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil)
err, _ := puller.pullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2015 The Kubernetes Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package container
package images
import (
"fmt"
@ -23,13 +23,14 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
)
type imagePullRequest struct {
spec ImageSpec
spec kubecontainer.ImageSpec
container *api.Container
pullSecrets []api.Secret
logPrefix string
@ -42,19 +43,19 @@ type imagePullRequest struct {
// 'image pulled' events correspondingly.
type serializedImagePuller struct {
recorder record.EventRecorder
runtime Runtime
runtime kubecontainer.Runtime
backOff *flowcontrol.Backoff
pullRequests chan *imagePullRequest
}
// enforce compatibility.
var _ ImagePuller = &serializedImagePuller{}
var _ imagePuller = &serializedImagePuller{}
// NewSerializedImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface.
// Pulls one image at a time.
// Issue #10959 has the rationale behind serializing image pulls.
func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller {
func newSerializedImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller {
imagePuller := &serializedImagePuller{
recorder: recorder,
runtime: runtime,
@ -75,19 +76,19 @@ func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, eventtype,
}
// PullImage pulls the image for the specified pod and container.
func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
func (puller *serializedImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container)
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
spec := ImageSpec{container.Image}
spec := kubecontainer.ImageSpec{Image: container.Image}
present, err := puller.runtime.IsImagePresent(spec)
if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning)
return ErrImageInspect, msg
return kubecontainer.ErrImageInspect, msg
}
if !shouldPullImage(container, present) {
@ -98,7 +99,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
} else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg
return kubecontainer.ErrImageNeverPull, msg
}
}
@ -106,7 +107,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
return kubecontainer.ErrImagePullBackOff, msg
}
// enqueue image pull request and wait for response.
@ -122,11 +123,11 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont
if err = <-returnChan; err != nil {
puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable {
if err == kubecontainer.RegistryUnavailable {
msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image)
return err, msg
} else {
return ErrImagePull, err.Error()
return kubecontainer.ErrImagePull, err.Error()
}
}
puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package container_test
package images
import (
"errors"
@ -106,7 +106,7 @@ func TestSerializedPuller(t *testing.T) {
fakeRuntime := &ctest.FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
puller := newSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 0}}
fakeRuntime.Err = c.pullerErr
@ -114,7 +114,7 @@ func TestSerializedPuller(t *testing.T) {
for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil)
err, _ := puller.pullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package images
import "k8s.io/kubernetes/pkg/api"
// ImageManager provides an interface to manage the lifecycle of images.
// Implementations of this interface are expected to deal with pulling (downloading),
// managing, and deleting container images.
// Implementations are expected to abstract the underlying runtimes.
// Implementations are expected to be thread safe.
type ImageManager interface {
// EnsureImageExists ensures that image specified in `container` exists.
EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string)
// TODO(ronl): consolidating image managing and deleting operation in this interface
}
// ImagePuller wraps Runtime.PullImage() to pull a container image.
// It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly.
type imagePuller interface {
pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string)
}

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/credentialprovider"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/leaky"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/network"
@ -152,7 +153,7 @@ type Runtime struct {
runtimeHelper kubecontainer.RuntimeHelper
recorder record.EventRecorder
livenessManager proberesults.Manager
imagePuller kubecontainer.ImagePuller
imagePuller images.ImageManager
runner kubecontainer.HandlerRunner
execer utilexec.Interface
os kubecontainer.OSInterface
@ -271,11 +272,7 @@ func New(
rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt)
if serializeImagePulls {
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
} else {
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
}
rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls)
if err := rkt.getVersions(); err != nil {
return nil, fmt.Errorf("rkt: error getting version info: %v", err)
@ -753,7 +750,7 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, podIP string, c api.Container,
if requiresPrivileged && !securitycontext.HasPrivilegedRequest(&c) {
return fmt.Errorf("cannot make %q: running a custom stage1 requires a privileged security context", format.Pod(pod))
}
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
if err, _ := r.imagePuller.EnsureImageExists(pod, &c, pullSecrets); err != nil {
return nil
}
imgManifest, err := r.getImageManifest(c.Image)