diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 7cc7bac8869..3c4348b7e8e 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -265,6 +265,7 @@ func NewMainKubelet(
 	}
 	statusManager := newStatusManager(kubeClient)
 	containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)
+	volumeManager := newVolumeManager()
 
 	klet := &Kubelet{
 		hostname: hostname,
@@ -288,6 +289,7 @@ func NewMainKubelet(
 		containerGC:      containerGC,
 		imageManager:     imageManager,
 		statusManager:    statusManager,
+		volumeManager:    volumeManager,
 		cloud:            cloud,
 		nodeRef:          nodeRef,
 		containerManager: containerManager,
@@ -416,6 +418,9 @@ type Kubelet struct {
 	// Syncs pods statuses with apiserver; also used as a cache of statuses.
 	statusManager *statusManager
 
+	// Manager for the volume maps for the pods.
+	volumeManager *volumeManager
+
 	//Cloud provider interface
 	cloud cloudprovider.Interface
 
@@ -661,11 +666,11 @@ func (kl *Kubelet) syncNodeStatus() {
 	}
 }
 
-func makeBinds(container *api.Container, podVolumes volumeMap) []string {
-	binds := []string{}
+func makeBinds(container *api.Container, podVolumes volumeMap) (binds []string) {
 	for _, mount := range container.VolumeMounts {
 		vol, ok := podVolumes[mount.Name]
 		if !ok {
+			glog.Warningf("Mount cannot be satisfied for container %q, because the volume is missing: %+v", container.Name, mount)
 			continue
 		}
 		b := fmt.Sprintf("%s:%s", vol.GetPath(), mount.MountPath)
@@ -674,19 +679,23 @@ func makeBinds(container *api.Container, podVolumes volumeMap) []string {
 		}
 		binds = append(binds, b)
 	}
-	return binds
+	return
 }
 
 // generateRunContainerOptions generates the RunContainerOptions, which can be used by
 // the container runtime to set parameters for launching a container.
-func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
+func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
 	var err error
 	opts := &kubecontainer.RunContainerOptions{
 		NetMode: netMode,
 		IpcMode: ipcMode,
 	}
 
-	opts.Binds = makeBinds(container, podVolumes)
+	vol, ok := kl.volumeManager.GetVolumes(pod.UID)
+	if !ok {
+		return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", kubecontainer.GetPodFullName(pod))
+	}
+	opts.Binds = makeBinds(container, vol)
 	opts.Envs, err = kl.makeEnvironmentVariables(pod.Namespace, container)
 	if err != nil {
 		return nil, err
@@ -710,13 +719,13 @@ func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Cont
 }
 
 // Run a single container from a pod. Returns the docker container ID
-func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (dockertools.DockerID, error) {
+func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, netMode, ipcMode string) (dockertools.DockerID, error) {
 	ref, err := kubecontainer.GenerateContainerRef(pod, container)
 	if err != nil {
 		glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
 	}
 
-	opts, err := kl.generateRunContainerOptions(pod, container, podVolumes, netMode, ipcMode)
+	opts, err := kl.generateRunContainerOptions(pod, container, netMode, ipcMode)
 	if err != nil {
 		return "", err
 	}
@@ -953,7 +962,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID,
 		kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
 	}
 
-	id, err := kl.runContainer(pod, container, nil, netNamespace, "")
+	id, err := kl.runContainer(pod, container, netNamespace, "")
 	if err != nil {
 		return "", err
 	}
@@ -1084,8 +1093,7 @@ func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
 
 // Attempts to start a container pulling the image before that if necessary. It returns DockerID of a started container
 // if it was successful, and a non-nil error otherwise.
-func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Container, podVolumes *volumeMap,
-	podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
+func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Container, podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
 	podFullName := kubecontainer.GetPodFullName(pod)
 	ref, err := kubecontainer.GenerateContainerRef(pod, container)
 	if err != nil {
@@ -1109,7 +1117,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
 	}
 	// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
 	namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
-	containerID, err := kl.runContainer(pod, container, *podVolumes, namespaceMode, namespaceMode)
+	containerID, err := kl.runContainer(pod, container, namespaceMode, namespaceMode)
 	if err != nil {
 		// TODO(bburns) : Perhaps blacklist a container after N failures?
 		glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
@@ -1330,15 +1338,26 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 		}
 	}
 
-	// Starting phase: if we should create infra container then we do it first
-	var ref *api.ObjectReference
-	var podVolumes volumeMap
+	// Starting phase:
+	ref, err := api.GetReference(pod)
+	if err != nil {
+		glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
+	}
+
+	// Mount volumes.
+	podVolumes, err := kl.mountExternalVolumes(pod)
+	if err != nil {
+		if ref != nil {
+			kl.recorder.Eventf(ref, "failedMount", "Unable to mount volumes for pod %q: %v", podFullName, err)
+		}
+		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
+		return err
+	}
+	kl.volumeManager.SetVolumes(pod.UID, podVolumes)
+
+	// If we should create infra container then we do it first.
 	podInfraContainerID := containerChanges.infraContainerId
 	if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) {
-		ref, err = api.GetReference(pod)
-		if err != nil {
-			glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
-		}
 		glog.V(4).Infof("Creating pod infra container for %q", podFullName)
 		podInfraContainerID, err = kl.createPodInfraContainer(pod)
@@ -1352,21 +1371,10 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 		}
 	}
 
-	// Mount volumes
-	podVolumes, err = kl.mountExternalVolumes(pod)
-	if err != nil {
-		if ref != nil {
-			kl.recorder.Eventf(ref, "failedMount",
-				"Unable to mount volumes for pod %q: %v", podFullName, err)
-		}
-		glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
-		return err
-	}
-
 	// Start everything
 	for container := range containerChanges.containersToStart {
 		glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
-		kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
+		kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], podInfraContainerID)
 	}
 
 	if isStaticPod(pod) {
@@ -1452,6 +1460,8 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Con
 			//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
 			//to be deleted and volumes that are leftover after a crash.
 			glog.Warningf("Orphaned volume %q found, tearing down volume", name)
+			// TODO(yifan): Refactor this hacky string manipulation.
+			kl.volumeManager.DeleteVolumes(types.UID(parts[0]))
 			//TODO (jonesdl) This should not block other kubelet synchronization procedures
 			err := vol.TearDown()
 			if err != nil {
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 23d6ee5c77e..facb9442490 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -116,7 +116,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
 	kubelet.prober = newProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
 	kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
-
+	kubelet.volumeManager = newVolumeManager()
 	return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
 }
 
@@ -4106,7 +4106,8 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 	}
 	pods := []*api.Pod{pod}
 	kubelet.podManager.SetPods(pods)
-	_, err := kubelet.runContainer(pod, &pod.Spec.Containers[0], make(map[string]volume.Volume), "", "")
+	kubelet.volumeManager.SetVolumes(pod.UID, volumeMap{})
+	_, err := kubelet.runContainer(pod, &pod.Spec.Containers[0], "", "")
 	if err == nil {
 		t.Errorf("expected error, found nil")
 	}
@@ -4121,7 +4122,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 	if state.Waiting == nil {
 		t.Errorf("expected waiting state, got %#v", state)
 	} else if state.Waiting.Reason != failureReason {
-		t.Errorf("expected reason %q, got %q", state.Waiting.Reason)
+		t.Errorf("expected reason %q, got %q", failureReason, state.Waiting.Reason)
 	}
 }
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index dd7460b2415..ee7db574f5b 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -87,6 +87,7 @@ func TestRunOnce(t *testing.T) {
 		readinessManager: kubecontainer.NewReadinessManager(),
 		podManager:       podManager,
 		os:               FakeOS{},
+		volumeManager:    newVolumeManager(),
 	}
 
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
diff --git a/pkg/kubelet/volume_manager.go b/pkg/kubelet/volume_manager.go
new file mode 100644
index 00000000000..5ca19e6ba08
--- /dev/null
+++ b/pkg/kubelet/volume_manager.go
@@ -0,0 +1,61 @@
+/*
+Copyright 2015 Google Inc. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kubelet
+
+import (
+	"sync"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
+)
+
+// volumeManager manages the volumes for the pods running on the kubelet.
+// Currently it only does bookkeeping, but it can be expanded to
+// take care of the volumePlugins.
+type volumeManager struct {
+	lock       sync.RWMutex
+	volumeMaps map[types.UID]volumeMap
+}
+
+func newVolumeManager() *volumeManager {
+	vm := &volumeManager{}
+	vm.volumeMaps = make(map[types.UID]volumeMap)
+	return vm
+}
+
+// SetVolumes sets the volume map for a pod.
+// TODO(yifan): Currently we assume the volumes are already mounted, so we only do bookkeeping here.
+func (vm *volumeManager) SetVolumes(podUID types.UID, podVolumes volumeMap) {
+	vm.lock.Lock()
+	defer vm.lock.Unlock()
+	vm.volumeMaps[podUID] = podVolumes
+}
+
+// GetVolumes returns the map of volumes that are already mounted on the host
+// machine for a pod.
+func (vm *volumeManager) GetVolumes(podUID types.UID) (volumeMap, bool) {
+	vm.lock.RLock()
+	defer vm.lock.RUnlock()
+	vol, ok := vm.volumeMaps[podUID]
+	return vol, ok
+}
+
+// DeleteVolumes removes the reference to a volume map for a pod.
+func (vm *volumeManager) DeleteVolumes(podUID types.UID) {
+	vm.lock.Lock()
+	defer vm.lock.Unlock()
+	delete(vm.volumeMaps, podUID)
+}
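
For reviewers, the intended call pattern in this change is: syncPod mounts the volumes and records them with SetVolumes; generateRunContainerOptions later looks them up with GetVolumes when building the docker bind strings; cleanupOrphanedVolumes drops the entry with DeleteVolumes once the pod's volumes are torn down. Below is a minimal, self-contained sketch of that lifecycle. It is not code from this patch: the UID and fakeVolume types, the pod UID value, and the paths are illustrative stand-ins for types.UID, volume.Volume, and real kubelet state.

package main

import (
	"fmt"
	"sync"
)

// UID and fakeVolume are illustrative stand-ins for the kubelet's
// types.UID and volume.Volume; they are not part of this patch.
type UID string

type fakeVolume struct{ path string }

func (v *fakeVolume) GetPath() string { return v.path }

// volumeMap mirrors the kubelet's mapping of volume name -> mounted volume.
type volumeMap map[string]*fakeVolume

// volumeManager has the same shape as the one introduced above.
type volumeManager struct {
	lock       sync.RWMutex
	volumeMaps map[UID]volumeMap
}

func newVolumeManager() *volumeManager {
	return &volumeManager{volumeMaps: make(map[UID]volumeMap)}
}

// SetVolumes records the mounted volumes for a pod.
func (vm *volumeManager) SetVolumes(podUID UID, podVolumes volumeMap) {
	vm.lock.Lock()
	defer vm.lock.Unlock()
	vm.volumeMaps[podUID] = podVolumes
}

// GetVolumes looks up the recorded volumes for a pod.
func (vm *volumeManager) GetVolumes(podUID UID) (volumeMap, bool) {
	vm.lock.RLock()
	defer vm.lock.RUnlock()
	vol, ok := vm.volumeMaps[podUID]
	return vol, ok
}

// DeleteVolumes drops the bookkeeping entry for a pod.
func (vm *volumeManager) DeleteVolumes(podUID UID) {
	vm.lock.Lock()
	defer vm.lock.Unlock()
	delete(vm.volumeMaps, podUID)
}

func main() {
	vm := newVolumeManager()
	podUID := UID("pod-1234") // illustrative UID

	// syncPod: after mountExternalVolumes succeeds, record the volumes.
	vm.SetVolumes(podUID, volumeMap{
		"data": &fakeVolume{path: "/var/lib/kubelet/pods/pod-1234/volumes/data"},
	})

	// generateRunContainerOptions: look the volumes up to build binds;
	// makeBinds produces "hostPath:containerPath[:ro]" strings like this one.
	if vols, ok := vm.GetVolumes(podUID); ok {
		fmt.Printf("bind: %s:%s\n", vols["data"].GetPath(), "/mnt/data")
	}

	// cleanupOrphanedVolumes: drop the entry once the volumes are torn down.
	vm.DeleteVolumes(podUID)
	if _, ok := vm.GetVolumes(podUID); !ok {
		fmt.Println("volume bookkeeping for pod-1234 released")
	}
}

The RWMutex is a natural fit here: every container start hits the read path (GetVolumes), which can proceed concurrently across pod workers, while the rarer SetVolumes and DeleteVolumes writes are serialized.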