Merge pull request #7520 from vmarmol/runtime-pull

Add RuntimeHooks to abstract Kubelet logic
This commit is contained in:
Yu-Ju Hong 2015-04-30 08:33:19 -07:00
commit baaea11cd5
6 changed files with 93 additions and 34 deletions

View File

@ -81,6 +81,16 @@ type Runtime interface {
GetContainerLogs(containerID, tail string, follow bool, stdout, stderr io.Writer) (err error)
}
// Customizable hooks injected into container runtimes.
type RuntimeHooks interface {
// Determines whether the runtime should pull the specified container's image.
ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool
// Runs after an image is pulled reporting its status. Error may be nil
// for a successful pull.
ReportImagePull(pod *api.Pod, container *api.Container, err error)
}
// Pod is a group of containers, with the status of the pod.
type Pod struct {
// The ID of the pod, which can be used to retrieve a particular pod

View File

@ -235,6 +235,7 @@ func NewMainKubelet(
resourceContainer: resourceContainer,
os: osInterface,
oomWatcher: oomWatcher,
runtimeHooks: newKubeletRuntimeHooks(recorder),
}
if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
@ -402,8 +403,14 @@ type Kubelet struct {
// Name must be absolute.
resourceContainer string
os kubecontainer.OSInterface
os kubecontainer.OSInterface
// Watcher of out of memory events.
oomWatcher OOMWatcher
// TODO(vmarmol): Remove this when we only have to inject the hooks into the runtimes.
// Hooks injected into the container runtime.
runtimeHooks kubecontainer.RuntimeHooks
}
// getRootDir returns the full path to the directory under which kubelet can
@ -867,41 +874,25 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
// Pull the image for the specified pod and container.
func (kl *Kubelet) pullImage(pod *api.Pod, container *api.Container) error {
if container.ImagePullPolicy == api.PullNever {
return nil
}
start := time.Now()
defer func() {
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
}()
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
present, err := kl.containerManager.IsImagePresent(container.Image)
if err != nil {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
if ref != nil {
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
}
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
}
if container.ImagePullPolicy == api.PullAlways ||
(container.ImagePullPolicy == api.PullIfNotPresent && (!present)) {
if err := kl.containerManager.Pull(container.Image); err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", container.Image, err)
}
return err
}
if ref != nil {
kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
}
if !kl.runtimeHooks.ShouldPullImage(pod, container, present) {
return nil
}
return nil
err = kl.containerManager.Pull(container.Image)
kl.runtimeHooks.ReportImagePull(pod, container, err)
return err
}
// Kill all running containers in a pod (includes the pod infra container).

View File

@ -123,6 +123,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.containerManager.Prober = kubelet.prober
kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
kubelet.volumeManager = newVolumeManager()
kubelet.runtimeHooks = newKubeletRuntimeHooks(kubelet.recorder)
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
}

View File

@ -28,13 +28,6 @@ import (
const kubeletSubsystem = "kubelet"
var (
ImagePullLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "image_pull_latency_microseconds",
Help: "Image pull latency in microseconds.",
},
)
ContainersPerPodCount = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
@ -73,7 +66,6 @@ var registerMetrics sync.Once
func Register(containerCache kubecontainer.RuntimeCache) {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(ImagePullLatency)
prometheus.MustRegister(SyncPodLatency)
prometheus.MustRegister(DockerOperationsLatency)
prometheus.MustRegister(SyncPodsLatency)

View File

@ -90,6 +90,7 @@ func TestRunOnce(t *testing.T) {
os: kubecontainer.FakeOS{},
volumeManager: newVolumeManager(),
}
kb.runtimeHooks = newKubeletRuntimeHooks(kb.recorder)
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
if err := kb.setupDataDirs(); err != nil {

View File

@ -0,0 +1,64 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/golang/glog"
)
// Kubelet-specific runtime hooks.
type kubeletRuntimeHooks struct {
recorder record.EventRecorder
}
var _ kubecontainer.RuntimeHooks = &kubeletRuntimeHooks{}
func newKubeletRuntimeHooks(recorder record.EventRecorder) kubecontainer.RuntimeHooks {
return &kubeletRuntimeHooks{
recorder: recorder,
}
}
func (kr *kubeletRuntimeHooks) ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool {
if container.ImagePullPolicy == api.PullNever {
return false
}
if container.ImagePullPolicy == api.PullAlways ||
(container.ImagePullPolicy == api.PullIfNotPresent && (!imagePresent)) {
return true
}
return false
}
func (kr *kubeletRuntimeHooks) ReportImagePull(pod *api.Pod, container *api.Container, pullError error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q, container %q: '%v'", pod.Name, container.Name, err)
return
}
if pullError != nil {
kr.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", container.Image, pullError)
} else {
kr.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
}
}