Merge pull request #13516 from samsabed/backoff-image-reason

backoff image pulling on failure
This commit is contained in:
Piotr Szczesniak 2015-10-08 13:05:32 +02:00
commit 95b293c615
14 changed files with 343 additions and 104 deletions

View File

@ -41,6 +41,7 @@ type FakeRuntime struct {
KilledContainers []string KilledContainers []string
VersionInfo string VersionInfo string
Err error Err error
InspectErr error
} }
// FakeRuntime should implement Runtime. // FakeRuntime should implement Runtime.
@ -94,6 +95,7 @@ func (f *FakeRuntime) ClearCalls() {
f.KilledContainers = []string{} f.KilledContainers = []string{}
f.VersionInfo = "" f.VersionInfo = ""
f.Err = nil f.Err = nil
f.InspectErr = nil
} }
func (f *FakeRuntime) assertList(expect []string, test []string) error { func (f *FakeRuntime) assertList(expect []string, test []string) error {
@ -264,10 +266,10 @@ func (f *FakeRuntime) IsImagePresent(image ImageSpec) (bool, error) {
f.CalledFunctions = append(f.CalledFunctions, "IsImagePresent") f.CalledFunctions = append(f.CalledFunctions, "IsImagePresent")
for _, i := range f.ImageList { for _, i := range f.ImageList {
if i.ID == image.Image { if i.ID == image.Image {
return true, f.Err return true, nil
} }
} }
return false, f.Err return false, f.InspectErr
} }
func (f *FakeRuntime) ListImages() ([]Image, error) { func (f *FakeRuntime) ListImages() ([]Image, error) {

View File

@ -22,6 +22,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
) )
// imagePuller pulls the image using Runtime.PullImage(). // imagePuller pulls the image using Runtime.PullImage().
@ -30,14 +31,16 @@ import (
type imagePuller struct { type imagePuller struct {
recorder record.EventRecorder recorder record.EventRecorder
runtime Runtime runtime Runtime
backOff *util.Backoff
} }
// NewImagePuller takes an event recorder and container runtime to create a // NewImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface. // image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime) ImagePuller { func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
return &imagePuller{ return &imagePuller{
recorder: recorder, recorder: recorder,
runtime: runtime, runtime: runtime,
backOff: imageBackOff,
} }
} }
@ -56,24 +59,18 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool {
return false return false
} }
// reportImagePull reports 'image pulling', 'image pulled' or 'image pulling failed' events. // records an event using ref, event msg. log to glog using prefix, msg, logFn
func (puller *imagePuller) reportImagePull(ref *api.ObjectReference, event string, image string, pullError error) { func (puller *imagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
if ref == nil { if ref != nil {
return puller.recorder.Eventf(ref, event, msg)
} } else {
logFn(fmt.Sprint(prefix, " ", msg))
switch event {
case "pulling":
puller.recorder.Eventf(ref, "Pulling", "Pulling image %q", image)
case "pulled":
puller.recorder.Eventf(ref, "Pulled", "Successfully pulled image %q", image)
case "failed":
puller.recorder.Eventf(ref, "Failed", "Failed to pull image %q: %v", image, pullError)
} }
} }
// PullImage pulls the image for the specified pod and container. // PullImage pulls the image for the specified pod and container.
func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error { func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container) ref, err := GenerateContainerRef(pod, container)
if err != nil { if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
@ -81,24 +78,36 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
spec := ImageSpec{container.Image} spec := ImageSpec{container.Image}
present, err := puller.runtime.IsImagePresent(spec) present, err := puller.runtime.IsImagePresent(spec)
if err != nil { if err != nil {
if ref != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.recorder.Eventf(ref, "Failed", "Failed to inspect image %q: %v", container.Image, err) puller.logIt(ref, "Failed", logPrefix, msg, glog.Warning)
} return ErrImageInspect, msg
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
} }
if !shouldPullImage(container, present) { if !shouldPullImage(container, present) {
if present && ref != nil { if present {
puller.recorder.Eventf(ref, "Pulled", "Container image %q already present on machine", container.Image) msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
puller.logIt(ref, "Pulled", logPrefix, msg, glog.Info)
return nil, ""
} else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, "ErrImageNeverPull", logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg
} }
return nil
} }
puller.reportImagePull(ref, "pulling", container.Image, nil) backOffKey := fmt.Sprintf("%s_%s", pod.Name, container.Image)
if err = puller.runtime.PullImage(spec, pullSecrets); err != nil { if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
puller.reportImagePull(ref, "failed", container.Image, err) msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
return err puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
} }
puller.reportImagePull(ref, "pulled", container.Image, nil) puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
return nil if err = puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
return ErrImagePull, err.Error()
}
puller.logIt(ref, "Pulled", logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
puller.backOff.GC()
return nil, ""
} }

View File

@ -0,0 +1,119 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
)
func TestPuller(t *testing.T) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test_pod",
Namespace: "test-ns",
UID: "bar",
ResourceVersion: "42",
SelfLink: "/api/v1/pods/foo",
}}
cases := []struct {
containerImage string
policy api.PullPolicy
calledFunctions []string
inspectErr error
pullerErr error
expectedErr []error
}{
{ // pull missing image
containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil}},
{ // image present, dont pull
containerImage: "present_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// image present, pull it
{containerImage: "present_image",
policy: api.PullAlways,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// missing image, error PullNever
{containerImage: "missing_image",
policy: api.PullNever,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}},
// missing image, unable to inspect
{containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: errors.New("unknown inspectError"),
pullerErr: nil,
expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}},
// missing image, unable to fetch
{containerImage: "typo_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: errors.New("404"),
expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}},
}
for i, c := range cases {
container := &api.Container{
Name: "container_name",
Image: c.containerImage,
ImagePullPolicy: c.policy,
}
backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()}
backOff.Clock = fakeClock
fakeRuntime := &FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, 0}}
fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr
for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
}
}
}

View File

@ -33,6 +33,22 @@ import (
// Container Terminated and Kubelet is backing off the restart // Container Terminated and Kubelet is backing off the restart
var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff") var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff")
var (
// Container image pull failed, kubelet is backing off image pull
ErrImagePullBackOff = errors.New("ImagePullBackOff")
// Unable to inspect image
ErrImageInspect = errors.New("ImageInspectError")
// General image pull error
ErrImagePull = errors.New("ErrImagePull")
// Required Image is absent on host and PullPolicy is NeverPullImage
ErrImageNeverPull = errors.New("ErrImageNeverPull")
)
var ErrRunContainer = errors.New("RunContainerError")
type Version interface { type Version interface {
// Compare compares two versions of the runtime. On success it returns -1 // Compare compares two versions of the runtime. On success it returns -1
// if the version is less than the other, 1 if it is greater than the other, // if the version is less than the other, 1 if it is greater than the other,
@ -108,7 +124,7 @@ type ContainerCommandRunner interface {
// It will check the presence of the image, and report the 'image pulling', // It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly. // 'image pulled' events correspondingly.
type ImagePuller interface { type ImagePuller interface {
PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string)
} }
// Pod is a group of containers, with the status of the pod. // Pod is a group of containers, with the status of the pod.

View File

@ -670,7 +670,8 @@ func TestFindContainersByPod(t *testing.T) {
} }
fakeClient := &FakeDockerClient{} fakeClient := &FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) // image back-off is set to nil, this test shouldnt pull images
containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil)
for i, test := range tests { for i, test := range tests {
fakeClient.ContainerList = test.containerList fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList fakeClient.ExitedContainerList = test.exitedContainerList

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/prober" "k8s.io/kubernetes/pkg/kubelet/prober"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types" kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs" "k8s.io/kubernetes/pkg/util/procfs"
) )
@ -40,13 +41,13 @@ func NewFakeDockerManager(
osInterface kubecontainer.OSInterface, osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin, networkPlugin network.NetworkPlugin,
generator kubecontainer.RunContainerOptionsGenerator, generator kubecontainer.RunContainerOptionsGenerator,
httpClient kubeletTypes.HttpGetter) *DockerManager { httpClient kubeletTypes.HttpGetter, imageBackOff *util.Backoff) *DockerManager {
fakeOOMAdjuster := oom.NewFakeOOMAdjuster() fakeOOMAdjuster := oom.NewFakeOOMAdjuster()
fakeProcFs := procfs.NewFakeProcFs() fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps, dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false) fakeOOMAdjuster, fakeProcFs, false, imageBackOff)
dm.dockerPuller = &FakeDockerPuller{} dm.dockerPuller = &FakeDockerPuller{}
return dm return dm
} }

View File

@ -158,7 +158,9 @@ func NewDockerManager(
execHandler ExecHandler, execHandler ExecHandler,
oomAdjuster *oom.OOMAdjuster, oomAdjuster *oom.OOMAdjuster,
procFs procfs.ProcFsInterface, procFs procfs.ProcFsInterface,
cpuCFSQuota bool) *DockerManager { cpuCFSQuota bool,
imageBackOff *util.Backoff) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker // Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems. // if there are any problems.
dockerRoot := "/var/lib/docker" dockerRoot := "/var/lib/docker"
@ -211,7 +213,7 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota, cpuCFSQuota: cpuCFSQuota,
} }
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm) dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff)
return dm return dm
} }
@ -509,9 +511,12 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
} }
} }
containerStatus.LastTerminationState = containerStatus.State containerStatus.LastTerminationState = containerStatus.State
containerStatus.State.Waiting = &api.ContainerStateWaiting{Reason: reasonInfo.reason, containerStatus.State = api.ContainerState{
Message: reasonInfo.message} Waiting: &api.ContainerStateWaiting{
containerStatus.State.Running = nil Reason: reasonInfo.reason,
Message: reasonInfo.message,
},
}
} }
continue continue
} }
@ -524,20 +529,27 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
containerStatus.RestartCount = oldStatus.RestartCount containerStatus.RestartCount = oldStatus.RestartCount
containerStatus.LastTerminationState = oldStatus.LastTerminationState containerStatus.LastTerminationState = oldStatus.LastTerminationState
} }
//Check image is ready on the node or not.
image := container.Image
// TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists // TODO(dchen1107): docker/docker/issues/8365 to figure out if the image exists
_, err := dm.client.InspectImage(image) reasonInfo, ok := dm.reasonCache.Get(uid, container.Name)
if err == nil { if !ok {
containerStatus.State.Waiting = &api.ContainerStateWaiting{ // default position for a container
Message: fmt.Sprintf("Image: %s is ready, container is creating", image), // At this point there are no active or dead containers, the reasonCache is empty (no entry or the entry has expired)
Reason: "ContainerCreating", // its reasonable to say the container is being created till a more accurate reason is logged
} containerStatus.State = api.ContainerState{
} else if err == docker.ErrNoSuchImage { Waiting: &api.ContainerStateWaiting{
containerStatus.State.Waiting = &api.ContainerStateWaiting{ Reason: fmt.Sprintf("ContainerCreating"),
Message: fmt.Sprintf("Image: %s is not ready on the node", image), Message: fmt.Sprintf("Image: %s is ready, container is creating", container.Image),
Reason: "ImageNotReady", },
} }
} else if reasonInfo.reason == kubecontainer.ErrImagePullBackOff.Error() ||
reasonInfo.reason == kubecontainer.ErrImageInspect.Error() ||
reasonInfo.reason == kubecontainer.ErrImagePull.Error() ||
reasonInfo.reason == kubecontainer.ErrImageNeverPull.Error() {
// mark it as waiting, reason will be filled bellow
containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
} else if reasonInfo.reason == kubecontainer.ErrRunContainer.Error() {
// mark it as waiting, reason will be filled bellow
containerStatus.State = api.ContainerState{Waiting: &api.ContainerStateWaiting{}}
} }
statuses[container.Name] = &containerStatus statuses[container.Name] = &containerStatus
} }
@ -545,6 +557,7 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
podStatus.ContainerStatuses = make([]api.ContainerStatus, 0) podStatus.ContainerStatuses = make([]api.ContainerStatus, 0)
for containerName, status := range statuses { for containerName, status := range statuses {
if status.State.Waiting != nil { if status.State.Waiting != nil {
status.State.Running = nil
// For containers in the waiting state, fill in a specific reason if it is recorded. // For containers in the waiting state, fill in a specific reason if it is recorded.
if reasonInfo, ok := dm.reasonCache.Get(uid, containerName); ok { if reasonInfo, ok := dm.reasonCache.Get(uid, containerName); ok {
status.State.Waiting.Reason = reasonInfo.reason status.State.Waiting.Reason = reasonInfo.reason
@ -1599,7 +1612,8 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.Doc
} }
// No pod secrets for the infra container. // No pod secrets for the infra container.
if err := dm.imagePuller.PullImage(pod, container, nil); err != nil { // The message isnt needed for the Infra container
if err, _ := dm.imagePuller.PullImage(pod, container, nil); err != nil {
return "", err return "", err
} }
@ -1841,10 +1855,9 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
continue continue
} }
glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName) glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName)
err := dm.imagePuller.PullImage(pod, container, pullSecrets) err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets)
dm.updateReasonCache(pod, container, "PullImageError", err)
if err != nil { if err != nil {
glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", container.Image, kubecontainer.GetPodFullName(pod), container.Name, err) dm.updateReasonCache(pod, container, err.Error(), errors.New(msg))
continue continue
} }
@ -1864,7 +1877,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
// See createPodInfraContainer for infra container setup. // See createPodInfraContainer for infra container setup.
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID) namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod)) _, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod))
dm.updateReasonCache(pod, container, "RunContainerError", err) dm.updateReasonCache(pod, container, kubecontainer.ErrRunContainer.Error(), err)
if err != nil { if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures? // TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err) glog.Errorf("Error running pod %q container %q: %v", kubecontainer.GetPodFullName(pod), container.Name, err)

View File

@ -31,8 +31,10 @@ import (
docker "github.com/fsouza/go-dockerclient" docker "github.com/fsouza/go-dockerclient"
cadvisorApi "github.com/google/cadvisor/info/v1" cadvisorApi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
@ -89,7 +91,8 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage
kubecontainer.FakeOS{}, kubecontainer.FakeOS{},
networkPlugin, networkPlugin,
optionGenerator, optionGenerator,
fakeHTTPClient) fakeHTTPClient,
util.NewBackOff(time.Second, 300*time.Second))
return dockerManager, fakeDocker return dockerManager, fakeDocker
} }
@ -976,60 +979,48 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
Spec: api.PodSpec{ Spec: api.PodSpec{
Containers: []api.Container{ Containers: []api.Container{
{Name: "bar", Image: "pull_always_image", ImagePullPolicy: api.PullAlways}, {Name: "bar", Image: "pull_always_image", ImagePullPolicy: api.PullAlways},
{Name: "bar1", Image: "pull_never_image", ImagePullPolicy: api.PullNever},
{Name: "bar2", Image: "pull_if_not_present_image", ImagePullPolicy: api.PullIfNotPresent}, {Name: "bar2", Image: "pull_if_not_present_image", ImagePullPolicy: api.PullIfNotPresent},
{Name: "bar3", Image: "existing_one", ImagePullPolicy: api.PullIfNotPresent}, {Name: "bar3", Image: "existing_one", ImagePullPolicy: api.PullIfNotPresent},
{Name: "bar4", Image: "want:latest", ImagePullPolicy: api.PullIfNotPresent}, {Name: "bar4", Image: "want:latest", ImagePullPolicy: api.PullIfNotPresent},
{Name: "bar5", Image: "pull_never_image", ImagePullPolicy: api.PullNever},
}, },
}, },
} }
runSyncPod(t, dm, fakeDocker, pod, nil) expectedStatusMap := map[string]api.ContainerState{
"bar": {Running: &api.ContainerStateRunning{unversioned.Now()}},
fakeDocker.Lock() "bar2": {Running: &api.ContainerStateRunning{unversioned.Now()}},
"bar3": {Running: &api.ContainerStateRunning{unversioned.Now()}},
eventSet := []string{ "bar4": {Running: &api.ContainerStateRunning{unversioned.Now()}},
`Pulling Pulling image "pod_infra_image"`, "bar5": {Waiting: &api.ContainerStateWaiting{Reason: kubecontainer.ErrImageNeverPull.Error(),
`Pulled Successfully pulled image "pod_infra_image"`, Message: "Container image \"pull_never_image\" is not present with pull policy of Never"}},
`Pulling Pulling image "pull_always_image"`,
`Pulled Successfully pulled image "pull_always_image"`,
`Pulling Pulling image "pull_if_not_present_image"`,
`Pulled Successfully pulled image "pull_if_not_present_image"`,
`Pulled Container image "existing_one" already present on machine`,
`Pulled Container image "want:latest" already present on machine`,
} }
recorder := dm.recorder.(*record.FakeRecorder) runSyncPod(t, dm, fakeDocker, pod, nil)
statuses, err := dm.GetPodStatus(pod)
var actualEvents []string if err != nil {
for _, ev := range recorder.Events { t.Errorf("unable to get pod status")
if strings.HasPrefix(ev, "Pull") { }
actualEvents = append(actualEvents, ev) for _, c := range pod.Spec.Containers {
if containerStatus, ok := api.GetContainerStatus(statuses.ContainerStatuses, c.Name); ok {
// copy the StartedAt time, to make the structs match
if containerStatus.State.Running != nil && expectedStatusMap[c.Name].Running != nil {
expectedStatusMap[c.Name].Running.StartedAt = containerStatus.State.Running.StartedAt
}
assert.Equal(t, containerStatus.State, expectedStatusMap[c.Name], "for container %s", c.Name)
} }
} }
sort.StringSlice(actualEvents).Sort()
sort.StringSlice(eventSet).Sort()
if !reflect.DeepEqual(actualEvents, eventSet) {
t.Errorf("Expected: %#v, Actual: %#v", eventSet, actualEvents)
}
pulledImageSet := make(map[string]empty) fakeDocker.Lock()
for v := range puller.ImagesPulled { defer fakeDocker.Unlock()
pulledImageSet[puller.ImagesPulled[v]] = empty{}
}
if !reflect.DeepEqual(pulledImageSet, map[string]empty{ pulledImageSorted := puller.ImagesPulled[:]
"pod_infra_image": {}, sort.Strings(pulledImageSorted)
"pull_always_image": {}, assert.Equal(t, []string{"pod_infra_image", "pull_always_image", "pull_if_not_present_image"}, pulledImageSorted)
"pull_if_not_present_image": {},
}) {
t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
}
if len(fakeDocker.Created) != 6 { if len(fakeDocker.Created) != 5 {
t.Errorf("Unexpected containers created %v", fakeDocker.Created) t.Errorf("Unexpected containers created %v", fakeDocker.Created)
} }
fakeDocker.Unlock()
} }
func TestSyncPodWithRestartPolicy(t *testing.T) { func TestSyncPodWithRestartPolicy(t *testing.T) {
@ -1474,7 +1465,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
puller := dm.dockerPuller.(*FakeDockerPuller) puller := dm.dockerPuller.(*FakeDockerPuller)
puller.HasImages = []string{} puller.HasImages = []string{}
// Inject the pull image failure error. // Inject the pull image failure error.
failureReason := "PullImageError" failureReason := kubecontainer.ErrImagePull.Error()
puller.ErrorsToInject = []error{fmt.Errorf("%s", failureReason)} puller.ErrorsToInject = []error{fmt.Errorf("%s", failureReason)}
pod := &api.Pod{ pod := &api.Pod{

View File

@ -309,7 +309,7 @@ func NewMainKubelet(
} }
procFs := procfs.NewProcFs() procFs := procfs.NewProcFs()
imageBackOff := util.NewBackOff(resyncInterval, maxContainerBackOff)
// Initialize the runtime. // Initialize the runtime.
switch containerRuntime { switch containerRuntime {
case "docker": case "docker":
@ -331,7 +331,9 @@ func NewMainKubelet(
dockerExecHandler, dockerExecHandler,
oomAdjuster, oomAdjuster,
procFs, procFs,
klet.cpuCFSQuota) klet.cpuCFSQuota,
imageBackOff)
case "rkt": case "rkt":
conf := &rkt.Config{ conf := &rkt.Config{
Path: rktPath, Path: rktPath,
@ -344,7 +346,8 @@ func NewMainKubelet(
recorder, recorder,
containerRefManager, containerRefManager,
klet, // prober klet, // prober
klet.volumeManager) klet.volumeManager,
imageBackOff)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -157,6 +157,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker
kubecontainer.FakeOS{}, kubecontainer.FakeOS{},
networkPlugin, networkPlugin,
nil, nil,
nil,
nil) nil)
return dockerManager, fakeDocker return dockerManager, fakeDocker

View File

@ -45,7 +45,7 @@ func newPod(uid, name string) *api.Pod {
func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache { func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache {
fakeDocker := &dockertools.FakeDockerClient{} fakeDocker := &dockertools.FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil)
return kubecontainer.NewFakeRuntimeCache(dockerManager) return kubecontainer.NewFakeRuntimeCache(dockerManager)
} }
@ -193,7 +193,7 @@ func TestFakePodWorkers(t *testing.T) {
fakeDocker := &dockertools.FakeDockerClient{} fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil)
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
kubeletForRealWorkers := &simpleFakeKubelet{} kubeletForRealWorkers := &simpleFakeKubelet{}

View File

@ -113,7 +113,7 @@ func New(config *Config,
recorder record.EventRecorder, recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
prober prober.Prober, prober prober.Prober,
volumeGetter volumeGetter) (*Runtime, error) { volumeGetter volumeGetter, imageBackOff *util.Backoff) (*Runtime, error) {
systemdVersion, err := getSystemdVersion() systemdVersion, err := getSystemdVersion()
if err != nil { if err != nil {
@ -153,7 +153,7 @@ func New(config *Config,
prober: prober, prober: prober,
volumeGetter: volumeGetter, volumeGetter: volumeGetter,
} }
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt) rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
// Test the rkt version. // Test the rkt version.
version, err := rkt.Version() version, err := rkt.Version()
@ -418,7 +418,7 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
manifest := appcschema.BlankPodManifest() manifest := appcschema.BlankPodManifest()
for _, c := range pod.Spec.Containers { for _, c := range pod.Spec.Containers {
if err := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil { if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, err return nil, err
} }
imgManifest, err := r.getImageManifest(c.Image) imgManifest, err := r.getImageManifest(c.Image)

View File

@ -84,6 +84,20 @@ func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
return p.Clock.Now().Sub(eventTime) < entry.backoff return p.Clock.Now().Sub(eventTime) < entry.backoff
} }
// Returns True if time since lastupdate is less than the current backoff window.
func (p *Backoff) IsInBackOffSinceUpdate(id string, eventTime time.Time) bool {
p.Lock()
defer p.Unlock()
entry, ok := p.perItemBackoff[id]
if !ok {
return false
}
if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
return false
}
return eventTime.Sub(entry.lastUpdate) < entry.backoff
}
// Garbage collect records that have aged past maxDuration. Backoff users are expected // Garbage collect records that have aged past maxDuration. Backoff users are expected
// to invoke this periodically. // to invoke this periodically.
func (p *Backoff) GC() { func (p *Backoff) GC() {

View File

@ -123,3 +123,72 @@ func TestBackoffGC(t *testing.T) {
t.Errorf("expected GC of entry after %s got entry %v", tc.Now().Sub(lastUpdate), r) t.Errorf("expected GC of entry after %s got entry %v", tc.Now().Sub(lastUpdate), r)
} }
} }
func TestIsInBackOffSinceUpdate(t *testing.T) {
id := "_idIsInBackOffSinceUpdate"
tc := &FakeClock{Time: time.Now()}
step := time.Second
maxDuration := 10 * step
b := NewFakeBackOff(step, maxDuration, tc)
startTime := tc.Now()
cases := []struct {
tick time.Duration
inBackOff bool
value int
}{
{tick: 0, inBackOff: false, value: 0},
{tick: 1, inBackOff: false, value: 1},
{tick: 2, inBackOff: true, value: 2},
{tick: 3, inBackOff: false, value: 2},
{tick: 4, inBackOff: true, value: 4},
{tick: 5, inBackOff: true, value: 4},
{tick: 6, inBackOff: true, value: 4},
{tick: 7, inBackOff: false, value: 4},
{tick: 8, inBackOff: true, value: 8},
{tick: 9, inBackOff: true, value: 8},
{tick: 10, inBackOff: true, value: 8},
{tick: 11, inBackOff: true, value: 8},
{tick: 12, inBackOff: true, value: 8},
{tick: 13, inBackOff: true, value: 8},
{tick: 14, inBackOff: true, value: 8},
{tick: 15, inBackOff: false, value: 8},
{tick: 16, inBackOff: true, value: 10},
{tick: 17, inBackOff: true, value: 10},
{tick: 18, inBackOff: true, value: 10},
{tick: 19, inBackOff: true, value: 10},
{tick: 20, inBackOff: true, value: 10},
{tick: 21, inBackOff: true, value: 10},
{tick: 22, inBackOff: true, value: 10},
{tick: 23, inBackOff: true, value: 10},
{tick: 24, inBackOff: true, value: 10},
{tick: 25, inBackOff: false, value: 10},
{tick: 26, inBackOff: true, value: 10},
{tick: 27, inBackOff: true, value: 10},
{tick: 28, inBackOff: true, value: 10},
{tick: 29, inBackOff: true, value: 10},
{tick: 30, inBackOff: true, value: 10},
{tick: 31, inBackOff: true, value: 10},
{tick: 32, inBackOff: true, value: 10},
{tick: 33, inBackOff: true, value: 10},
{tick: 34, inBackOff: true, value: 10},
{tick: 35, inBackOff: false, value: 10},
{tick: 56, inBackOff: false, value: 0},
{tick: 57, inBackOff: false, value: 1},
}
for _, c := range cases {
tc.Time = startTime.Add(c.tick * step)
if c.inBackOff != b.IsInBackOffSinceUpdate(id, tc.Now()) {
t.Errorf("expected IsInBackOffSinceUpdate %v got %v at tick %s", c.inBackOff, b.IsInBackOffSinceUpdate(id, tc.Now()), c.tick*step)
}
if c.inBackOff && (time.Duration(c.value)*step != b.Get(id)) {
t.Errorf("expected backoff value=%s got %s at tick %s", time.Duration(c.value)*step, b.Get(id), c.tick*step)
}
if !c.inBackOff {
b.Next(id, tc.Now())
}
}
}