Merge pull request #15914 from vishh/serialize-pull

make kubelet image pulls serialized by default.
This commit is contained in:
Filip Grzadkowski 2015-10-23 14:04:23 +02:00
commit edd10d8a83
11 changed files with 306 additions and 14 deletions

View File

@ -146,6 +146,9 @@ type KubeletServer struct {
KubeApiQps float32 KubeApiQps float32
KubeApiBurst int KubeApiBurst int
// Pull images one at a time.
SerializeImagePulls bool
} }
// bootstrapping interface for kubelet, targets the initialization protocol // bootstrapping interface for kubelet, targets the initialization protocol
@ -185,6 +188,8 @@ func NewKubeletServer() *KubeletServer {
HTTPCheckFrequency: 20 * time.Second, HTTPCheckFrequency: 20 * time.Second,
ImageGCHighThresholdPercent: 90, ImageGCHighThresholdPercent: 90,
ImageGCLowThresholdPercent: 80, ImageGCLowThresholdPercent: 80,
KubeApiQps: 5.0,
KubeApiBurst: 10,
KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"), KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"),
LowDiskSpaceThresholdMB: 256, LowDiskSpaceThresholdMB: 256,
MasterServiceNamespace: api.NamespaceDefault, MasterServiceNamespace: api.NamespaceDefault,
@ -199,6 +204,7 @@ func NewKubeletServer() *KubeletServer {
PodInfraContainerImage: dockertools.PodInfraContainerImage, PodInfraContainerImage: dockertools.PodInfraContainerImage,
Port: ports.KubeletPort, Port: ports.KubeletPort,
ReadOnlyPort: ports.KubeletReadOnlyPort, ReadOnlyPort: ports.KubeletReadOnlyPort,
ReconcileCIDR: true,
RegisterNode: true, // will be ignored if no apiserver is configured RegisterNode: true, // will be ignored if no apiserver is configured
RegisterSchedulable: true, RegisterSchedulable: true,
RegistryBurst: 10, RegistryBurst: 10,
@ -208,9 +214,6 @@ func NewKubeletServer() *KubeletServer {
RootDirectory: defaultRootDir, RootDirectory: defaultRootDir,
SyncFrequency: 10 * time.Second, SyncFrequency: 10 * time.Second,
SystemContainer: "", SystemContainer: "",
ReconcileCIDR: true,
KubeApiQps: 5.0,
KubeApiBurst: 10,
} }
} }
@ -292,6 +295,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.RegisterSchedulable, "register-schedulable", s.RegisterSchedulable, "Register the node as schedulable. No-op if register-node is false. [default=true]") fs.BoolVar(&s.RegisterSchedulable, "register-schedulable", s.RegisterSchedulable, "Register the node as schedulable. No-op if register-node is false. [default=true]")
fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver") fs.Float32Var(&s.KubeApiQps, "kube-api-qps", s.KubeApiQps, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver") fs.IntVar(&s.KubeApiBurst, "kube-api-burst", s.KubeApiBurst, "Burst to use while talking with kubernetes apiserver")
fs.BoolVar(&s.SerializeImagePulls, "serialize-image-pulls", s.SerializeImagePulls, "Pull images one at a time. We recommend *not* changing the default value on nodes that run docker daemon with version < 1.9 or an Aufs storage backend. Issue #10959 has more details. [default=true]")
} }
// UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup // UnsecuredKubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup
@ -413,6 +417,7 @@ func (s *KubeletServer) UnsecuredKubeletConfig() (*KubeletConfig, error) {
RktStage1Image: s.RktStage1Image, RktStage1Image: s.RktStage1Image,
RootDirectory: s.RootDirectory, RootDirectory: s.RootDirectory,
Runonce: s.RunOnce, Runonce: s.RunOnce,
SerializeImagePulls: s.SerializeImagePulls,
StandaloneMode: (len(s.APIServerList) == 0), StandaloneMode: (len(s.APIServerList) == 0),
StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout, StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout,
SyncFrequency: s.SyncFrequency, SyncFrequency: s.SyncFrequency,
@ -672,6 +677,7 @@ func SimpleKubelet(client *client.Client,
ResolverConfig: kubelet.ResolvConfDefault, ResolverConfig: kubelet.ResolvConfDefault,
ResourceContainer: "/kubelet", ResourceContainer: "/kubelet",
RootDirectory: rootDir, RootDirectory: rootDir,
SerializeImagePulls: true,
SyncFrequency: syncFrequency, SyncFrequency: syncFrequency,
SystemContainer: "", SystemContainer: "",
TLSOptions: tlsOptions, TLSOptions: tlsOptions,
@ -866,6 +872,7 @@ type KubeletConfig struct {
RktStage1Image string RktStage1Image string
RootDirectory string RootDirectory string
Runonce bool Runonce bool
SerializeImagePulls bool
StandaloneMode bool StandaloneMode bool
StreamingConnectionIdleTimeout time.Duration StreamingConnectionIdleTimeout time.Duration
SyncFrequency time.Duration SyncFrequency time.Duration
@ -948,7 +955,9 @@ func CreateAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.ResolverConfig, kc.ResolverConfig,
kc.CPUCFSQuota, kc.CPUCFSQuota,
daemonEndpoints, daemonEndpoints,
kc.OOMAdjuster) kc.OOMAdjuster,
kc.SerializeImagePulls,
)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err

View File

@ -257,6 +257,7 @@ runtime-config
scheduler-config scheduler-config
schema-cache-dir schema-cache-dir
secure-port secure-port
serialize-image-pulls
service-account-key-file service-account-key-file
service-account-lookup service-account-lookup
service-account-private-key-file service-account-private-key-file

View File

@ -34,6 +34,9 @@ type imagePuller struct {
backOff *util.Backoff backOff *util.Backoff
} }
// enforce compatibility.
var _ ImagePuller = &imagePuller{}
// NewImagePuller takes an event recorder and container runtime to create a // NewImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface. // image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller { func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
@ -75,6 +78,7 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
if err != nil { if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
} }
spec := ImageSpec{container.Image} spec := ImageSpec{container.Image}
present, err := puller.runtime.IsImagePresent(spec) present, err := puller.runtime.IsImagePresent(spec)
if err != nil { if err != nil {
@ -102,7 +106,7 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul
return ErrImagePullBackOff, msg return ErrImagePullBackOff, msg
} }
puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) puller.logIt(ref, "Pulling", logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info)
if err = puller.runtime.PullImage(spec, pullSecrets); err != nil { if err := puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable { if err == RegistryUnavailable {

View File

@ -115,5 +115,6 @@ func TestPuller(t *testing.T) {
fakeRuntime.AssertCalls(c.calledFunctions) fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
} }
} }
} }

View File

@ -70,6 +70,7 @@ type ImageSpec struct {
// Runtime interface defines the interfaces that should be implemented // Runtime interface defines the interfaces that should be implemented
// by a container runtime. // by a container runtime.
// Thread safety is required from implementations of this interface.
type Runtime interface { type Runtime interface {
// Version returns the version information of the container runtime. // Version returns the version information of the container runtime.
Version() (Version, error) Version() (Version, error)

View File

@ -0,0 +1,140 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
)
type imagePullRequest struct {
spec ImageSpec
container *api.Container
pullSecrets []api.Secret
logPrefix string
ref *api.ObjectReference
returnChan chan<- error
}
// serializedImagePuller pulls the image using Runtime.PullImage().
// It will check the presence of the image, and report the 'image pulling',
// 'image pulled' events correspondingly.
type serializedImagePuller struct {
recorder record.EventRecorder
runtime Runtime
backOff *util.Backoff
pullRequests chan *imagePullRequest
}
// enforce compatibility.
var _ ImagePuller = &serializedImagePuller{}
// NewSerializedImagePuller takes an event recorder and container runtime to create a
// image puller that wraps the container runtime's PullImage interface.
// Pulls one image at a time.
// Issue #10959 has the rationale behind serializing image pulls.
func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *util.Backoff) ImagePuller {
imagePuller := &serializedImagePuller{
recorder: recorder,
runtime: runtime,
backOff: imageBackOff,
pullRequests: make(chan *imagePullRequest, 10),
}
go util.Until(imagePuller.pullImages, time.Second, util.NeverStop)
return imagePuller
}
// records an event using ref, event msg. log to glog using prefix, msg, logFn
func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, event, prefix, msg string, logFn func(args ...interface{})) {
if ref != nil {
puller.recorder.Eventf(ref, event, msg)
} else {
logFn(fmt.Sprint(prefix, " ", msg))
}
}
// PullImage pulls the image for the specified pod and container.
func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) {
logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image)
ref, err := GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
spec := ImageSpec{container.Image}
present, err := puller.runtime.IsImagePresent(spec)
if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
puller.logIt(ref, "Failed", logPrefix, msg, glog.Warning)
return ErrImageInspect, msg
}
if !shouldPullImage(container, present) {
if present {
msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
puller.logIt(ref, "Pulled", logPrefix, msg, glog.Info)
return nil, ""
} else {
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
puller.logIt(ref, "ErrImageNeverPull", logPrefix, msg, glog.Warning)
return ErrImageNeverPull, msg
}
}
backOffKey := fmt.Sprintf("%s_%s", pod.Name, container.Image)
if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
puller.logIt(ref, "Back-off", logPrefix, msg, glog.Info)
return ErrImagePullBackOff, msg
}
// enqueue image pull request and wait for response.
returnChan := make(chan error)
puller.pullRequests <- &imagePullRequest{
spec: spec,
container: container,
pullSecrets: pullSecrets,
logPrefix: logPrefix,
ref: ref,
returnChan: returnChan,
}
if err = <-returnChan; err != nil {
puller.logIt(ref, "Failed", logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning)
puller.backOff.Next(backOffKey, puller.backOff.Clock.Now())
if err == RegistryUnavailable {
msg := fmt.Sprintf("image pull failed for %s because the registry is temporarily unavailable.", container.Image)
return err, msg
} else {
return ErrImagePull, err.Error()
}
}
puller.logIt(ref, "Pulled", logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info)
puller.backOff.GC()
return nil, ""
}
func (puller *serializedImagePuller) pullImages() {
for pullRequest := range puller.pullRequests {
puller.logIt(pullRequest.ref, "Pulling", pullRequest.logPrefix, fmt.Sprintf("pulling image %q", pullRequest.container.Image), glog.Info)
pullRequest.returnChan <- puller.runtime.PullImage(pullRequest.spec, pullRequest.pullSecrets)
}
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util"
)
func TestSerializedPuller(t *testing.T) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test_pod",
Namespace: "test-ns",
UID: "bar",
ResourceVersion: "42",
SelfLink: "/api/v1/pods/foo",
}}
cases := []struct {
containerImage string
policy api.PullPolicy
calledFunctions []string
inspectErr error
pullerErr error
expectedErr []error
}{
{ // pull missing image
containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil}},
{ // image present, dont pull
containerImage: "present_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// image present, pull it
{containerImage: "present_image",
policy: api.PullAlways,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{nil, nil, nil}},
// missing image, error PullNever
{containerImage: "missing_image",
policy: api.PullNever,
calledFunctions: []string{"IsImagePresent"},
inspectErr: nil,
pullerErr: nil,
expectedErr: []error{ErrImageNeverPull, ErrImageNeverPull, ErrImageNeverPull}},
// missing image, unable to inspect
{containerImage: "missing_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent"},
inspectErr: errors.New("unknown inspectError"),
pullerErr: nil,
expectedErr: []error{ErrImageInspect, ErrImageInspect, ErrImageInspect}},
// missing image, unable to fetch
{containerImage: "typo_image",
policy: api.PullIfNotPresent,
calledFunctions: []string{"IsImagePresent", "PullImage"},
inspectErr: nil,
pullerErr: errors.New("404"),
expectedErr: []error{ErrImagePull, ErrImagePull, ErrImagePullBackOff, ErrImagePull, ErrImagePullBackOff, ErrImagePullBackOff}},
}
for i, c := range cases {
container := &api.Container{
Name: "container_name",
Image: c.containerImage,
ImagePullPolicy: c.policy,
}
backOff := util.NewBackOff(time.Second, time.Minute)
fakeClock := &util.FakeClock{Time: time.Now()}
backOff.Clock = fakeClock
fakeRuntime := &FakeRuntime{}
fakeRecorder := &record.FakeRecorder{}
puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff)
fakeRuntime.ImageList = []Image{{"present_image", nil, 0}}
fakeRuntime.Err = c.pullerErr
fakeRuntime.InspectErr = c.inspectErr
for tick, expected := range c.expectedErr {
fakeClock.Step(time.Second)
err, _ := puller.PullImage(pod, container, nil)
fakeRuntime.AssertCalls(c.calledFunctions)
assert.Equal(t, expected, err, "in test %d tick=%d", i, tick)
}
}
}

View File

@ -47,7 +47,7 @@ func NewFakeDockerManager(
fakeProcFs := procfs.NewFakeProcFs() fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOOMAdjuster, fakeProcFs, false, imageBackOff) fakeOOMAdjuster, fakeProcFs, false, imageBackOff, true)
dm.dockerPuller = &FakeDockerPuller{} dm.dockerPuller = &FakeDockerPuller{}
return dm return dm
} }

View File

@ -161,7 +161,8 @@ func NewDockerManager(
oomAdjuster *oom.OOMAdjuster, oomAdjuster *oom.OOMAdjuster,
procFs procfs.ProcFsInterface, procFs procfs.ProcFsInterface,
cpuCFSQuota bool, cpuCFSQuota bool,
imageBackOff *util.Backoff) *DockerManager { imageBackOff *util.Backoff,
serializeImagePulls bool) *DockerManager {
// Work out the location of the Docker runtime, defaulting to /var/lib/docker // Work out the location of the Docker runtime, defaulting to /var/lib/docker
// if there are any problems. // if there are any problems.
@ -215,7 +216,11 @@ func NewDockerManager(
cpuCFSQuota: cpuCFSQuota, cpuCFSQuota: cpuCFSQuota,
} }
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff) if serializeImagePulls {
dm.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, dm, imageBackOff)
} else {
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm, imageBackOff)
}
dm.containerGC = NewContainerGC(client, containerLogsDir) dm.containerGC = NewContainerGC(client, containerLogsDir)
return dm return dm

View File

@ -184,7 +184,9 @@ func NewMainKubelet(
resolverConfig string, resolverConfig string,
cpuCFSQuota bool, cpuCFSQuota bool,
daemonEndpoints *api.NodeDaemonEndpoints, daemonEndpoints *api.NodeDaemonEndpoints,
oomAdjuster *oom.OOMAdjuster) (*Kubelet, error) { oomAdjuster *oom.OOMAdjuster,
serializeImagePulls bool,
) (*Kubelet, error) {
if rootDirectory == "" { if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory) return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
} }
@ -335,7 +337,9 @@ func NewMainKubelet(
oomAdjuster, oomAdjuster,
procFs, procFs,
klet.cpuCFSQuota, klet.cpuCFSQuota,
imageBackOff) imageBackOff,
serializeImagePulls,
)
case "rkt": case "rkt":
conf := &rkt.Config{ conf := &rkt.Config{
@ -350,7 +354,9 @@ func NewMainKubelet(
containerRefManager, containerRefManager,
klet.livenessManager, klet.livenessManager,
klet.volumeManager, klet.volumeManager,
imageBackOff) imageBackOff,
serializeImagePulls,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -109,8 +109,9 @@ func New(config *Config,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
livenessManager proberesults.Manager, livenessManager proberesults.Manager,
volumeGetter volumeGetter, volumeGetter volumeGetter,
imageBackOff *util.Backoff) (*Runtime, error) { imageBackOff *util.Backoff,
serializeImagePulls bool,
) (*Runtime, error) {
systemdVersion, err := getSystemdVersion() systemdVersion, err := getSystemdVersion()
if err != nil { if err != nil {
return nil, err return nil, err
@ -149,7 +150,11 @@ func New(config *Config,
livenessManager: livenessManager, livenessManager: livenessManager,
volumeGetter: volumeGetter, volumeGetter: volumeGetter,
} }
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) if serializeImagePulls {
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
} else {
rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff)
}
// Test the rkt version. // Test the rkt version.
version, err := rkt.Version() version, err := rkt.Version()