kubelet: Introduce volume manager.

The volume manager does the book-keeping for the volume mounts
on the kubelet, so we can avoid passing the volumeMap around.
Yifan Gu 2015-04-15 17:40:07 -07:00
parent f9156c281a
commit af1e9f737f
4 changed files with 106 additions and 33 deletions
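
For illustration, a minimal, self-contained sketch of the plumbing change this commit makes: instead of threading a per-pod volumeMap through runContainer and generateRunContainerOptions, the kubelet records the map once under the pod's UID and looks it up where needed. The UID and volumeMap types below are simplified stand-ins, not the real kubelet definitions, and the paths are made up.

package main

import "fmt"

// Stand-in types for illustration only: the real volumeMap maps a
// volume name to a mounted volume.Volume, and pod UIDs come from
// pkg/types.
type UID string
type volumeMap map[string]string // volume name -> host path

func main() {
	// The manager's book-keeping boils down to a map keyed by pod UID.
	volumeMaps := make(map[UID]volumeMap)

	// syncPod side: after mounting the pod's volumes, record them once.
	volumeMaps["pod-uid"] = volumeMap{"data": "/var/lib/kubelet/pods/pod-uid/volumes/data"}

	// runContainer side: look the volumes up by UID instead of taking
	// a volumeMap parameter through every call.
	if vols, ok := volumeMaps["pod-uid"]; ok {
		fmt.Println("bind mount from", vols["data"])
	}
}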

View File

@@ -265,6 +265,7 @@ func NewMainKubelet(
}
statusManager := newStatusManager(kubeClient)
containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)
volumeManager := newVolumeManager()
klet := &Kubelet{
hostname: hostname,
@@ -288,6 +289,7 @@ func NewMainKubelet(
containerGC: containerGC,
imageManager: imageManager,
statusManager: statusManager,
volumeManager: volumeManager,
cloud: cloud,
nodeRef: nodeRef,
containerManager: containerManager,
@@ -416,6 +418,9 @@ type Kubelet struct {
// Syncs pods statuses with apiserver; also used as a cache of statuses.
statusManager *statusManager
// Manager for the volume maps for the pods.
volumeManager *volumeManager
// Cloud provider interface
cloud cloudprovider.Interface
@@ -661,11 +666,11 @@ func (kl *Kubelet) syncNodeStatus() {
}
}
func makeBinds(container *api.Container, podVolumes volumeMap) []string {
binds := []string{}
func makeBinds(container *api.Container, podVolumes volumeMap) (binds []string) {
for _, mount := range container.VolumeMounts {
vol, ok := podVolumes[mount.Name]
if !ok {
glog.Warningf("Mount cannot be satisified for container %q, because the volume is missing: %q", container.Name, mount)
continue
}
b := fmt.Sprintf("%s:%s", vol.GetPath(), mount.MountPath)
@@ -674,19 +679,23 @@ func makeBinds(container *api.Container, podVolumes volumeMap) []string {
}
binds = append(binds, b)
}
return binds
return
}
// generateRunContainerOptions generates the RunContainerOptions, which can be used by
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
var err error
opts := &kubecontainer.RunContainerOptions{
NetMode: netMode,
IpcMode: ipcMode,
}
opts.Binds = makeBinds(container, podVolumes)
vol, ok := kl.volumeManager.GetVolumes(pod.UID)
if !ok {
return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", kubecontainer.GetPodFullName(pod))
}
opts.Binds = makeBinds(container, vol)
opts.Envs, err = kl.makeEnvironmentVariables(pod.Namespace, container)
if err != nil {
return nil, err
@@ -710,13 +719,13 @@ func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Cont
}
// Run a single container from a pod. Returns the docker container ID
func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (dockertools.DockerID, error) {
func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, netMode, ipcMode string) (dockertools.DockerID, error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
opts, err := kl.generateRunContainerOptions(pod, container, podVolumes, netMode, ipcMode)
opts, err := kl.generateRunContainerOptions(pod, container, netMode, ipcMode)
if err != nil {
return "", err
}
@@ -953,7 +962,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID,
kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
}
id, err := kl.runContainer(pod, container, nil, netNamespace, "")
id, err := kl.runContainer(pod, container, netNamespace, "")
if err != nil {
return "", err
}
@@ -1084,8 +1093,7 @@ func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
// Attempts to start a container, pulling the image beforehand if necessary. It returns the DockerID of the started container
// if it was successful, and a non-nil error otherwise.
func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Container, podVolumes *volumeMap,
podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Container, podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
podFullName := kubecontainer.GetPodFullName(pod)
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
@@ -1109,7 +1117,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
}
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
containerID, err := kl.runContainer(pod, container, *podVolumes, namespaceMode, namespaceMode)
containerID, err := kl.runContainer(pod, container, namespaceMode, namespaceMode)
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
@@ -1330,15 +1338,26 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
}
}
// Starting phase: if we should create infra container then we do it first
var ref *api.ObjectReference
var podVolumes volumeMap
// Starting phase:
ref, err := api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}
// Mount volumes.
podVolumes, err := kl.mountExternalVolumes(pod)
if err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failedMount", "Unable to mount volumes for pod %q: %v", podFullName, err)
}
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
return err
}
kl.volumeManager.SetVolumes(pod.UID, podVolumes)
// If we should create infra container then we do it first.
podInfraContainerID := containerChanges.infraContainerId
if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) {
ref, err = api.GetReference(pod)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
}
glog.V(4).Infof("Creating pod infra container for %q", podFullName)
podInfraContainerID, err = kl.createPodInfraContainer(pod)
@@ -1352,21 +1371,10 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
}
}
// Mount volumes
podVolumes, err = kl.mountExternalVolumes(pod)
if err != nil {
if ref != nil {
kl.recorder.Eventf(ref, "failedMount",
"Unable to mount volumes for pod %q: %v", podFullName, err)
}
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
return err
}
// Start everything
for container := range containerChanges.containersToStart {
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], podInfraContainerID)
}
if isStaticPod(pod) {
@@ -1452,6 +1460,8 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Con
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
//to be deleted and volumes that are leftover after a crash.
glog.Warningf("Orphaned volume %q found, tearing down volume", name)
// TODO(yifan): Refactor this hacky string manipulation.
kl.volumeManager.DeleteVolumes(types.UID(parts[0]))
//TODO (jonesdl) This should not block other kubelet synchronization procedures
err := vol.TearDown()
if err != nil {
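
A side note on makeBinds above: it renders each volume mount into Docker's bind syntax, "<host path>:<container path>". A minimal standalone sketch of that format follows; the read-only branch is elided by the hunk above, so the ":ro" handling here is an assumption, and the paths are hypothetical.

package main

import "fmt"

// makeBind mirrors the format makeBinds emits for Docker binds:
// "<host path>:<container path>", with ":ro" appended for read-only
// mounts (the read-only branch is assumed, not shown in the hunk).
func makeBind(hostPath, mountPath string, readOnly bool) string {
	b := fmt.Sprintf("%s:%s", hostPath, mountPath)
	if readOnly {
		b += ":ro"
	}
	return b
}

func main() {
	// Hypothetical paths, for illustration only.
	fmt.Println(makeBind("/var/lib/kubelet/pods/uid/volumes/data", "/data", true))
	// Output: /var/lib/kubelet/pods/uid/volumes/data:/data:ro
}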

View File

@@ -116,7 +116,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
kubelet.prober = newProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
kubelet.volumeManager = newVolumeManager()
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
}
@@ -4106,7 +4106,8 @@ func TestGetPodCreationFailureReason(t *testing.T) {
}
pods := []*api.Pod{pod}
kubelet.podManager.SetPods(pods)
_, err := kubelet.runContainer(pod, &pod.Spec.Containers[0], make(map[string]volume.Volume), "", "")
kubelet.volumeManager.SetVolumes(pod.UID, volumeMap{})
_, err := kubelet.runContainer(pod, &pod.Spec.Containers[0], "", "")
if err == nil {
t.Errorf("expected error, found nil")
}
@@ -4121,7 +4122,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
if state.Waiting == nil {
t.Errorf("expected waiting state, got %#v", state)
} else if state.Waiting.Reason != failureReason {
t.Errorf("expected reason %q, got %q", state.Waiting.Reason)
t.Errorf("expected reason %q, got %q", failureReason, state.Waiting.Reason)
}
}
}

View File

@@ -87,6 +87,7 @@ func TestRunOnce(t *testing.T) {
readinessManager: kubecontainer.NewReadinessManager(),
podManager: podManager,
os: FakeOS{},
volumeManager: newVolumeManager(),
}
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))

View File

@@ -0,0 +1,61 @@
/*
Copyright 2015 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"sync"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)

// volumeManager manages the volumes for the pods running on the kubelet.
// Currently it only does book-keeping, but it can be expanded to
// take care of the volumePlugins.
type volumeManager struct {
	lock       sync.RWMutex
	volumeMaps map[types.UID]volumeMap
}

func newVolumeManager() *volumeManager {
	vm := &volumeManager{}
	vm.volumeMaps = make(map[types.UID]volumeMap)
	return vm
}

// SetVolumes sets the volume map for a pod.
// TODO(yifan): Currently we assume the volumes are already mounted, so we only do book-keeping here.
func (vm *volumeManager) SetVolumes(podUID types.UID, podVolumes volumeMap) {
	vm.lock.Lock()
	defer vm.lock.Unlock()
	vm.volumeMaps[podUID] = podVolumes
}

// GetVolumes returns the volume map for a pod; the volumes in it are
// already mounted on the host machine.
func (vm *volumeManager) GetVolumes(podUID types.UID) (volumeMap, bool) {
	vm.lock.RLock()
	defer vm.lock.RUnlock()
	vol, ok := vm.volumeMaps[podUID]
	return vol, ok
}

// DeleteVolumes removes the reference to a volume map for a pod.
func (vm *volumeManager) DeleteVolumes(podUID types.UID) {
	vm.lock.Lock()
	defer vm.lock.Unlock()
	delete(vm.volumeMaps, podUID)
}
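
A hypothetical test, not part of this commit, sketching how the new type's lifecycle fits together; it would sit alongside volume_manager.go in the kubelet package and uses only the API shown above.

package kubelet

import (
	"testing"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)

// TestVolumeManagerBookkeeping exercises Set/Get/Delete end to end.
// The UID is arbitrary, and the volumeMap is left empty because only
// the book-keeping, not the volume contents, is under test.
func TestVolumeManagerBookkeeping(t *testing.T) {
	vm := newVolumeManager()
	uid := types.UID("pod-uid")

	vm.SetVolumes(uid, volumeMap{})
	if _, ok := vm.GetVolumes(uid); !ok {
		t.Errorf("expected volumes for pod %q", uid)
	}

	vm.DeleteVolumes(uid)
	if _, ok := vm.GetVolumes(uid); ok {
		t.Errorf("expected no volumes for pod %q after deletion", uid)
	}
}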