mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Merge pull request #6885 from yifan-gu/refactor_volumes
kubelet: Introduce volume manager.
This commit is contained in:
commit
52711eacf7
@ -265,6 +265,7 @@ func NewMainKubelet(
|
|||||||
}
|
}
|
||||||
statusManager := newStatusManager(kubeClient)
|
statusManager := newStatusManager(kubeClient)
|
||||||
containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)
|
containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)
|
||||||
|
volumeManager := newVolumeManager()
|
||||||
|
|
||||||
klet := &Kubelet{
|
klet := &Kubelet{
|
||||||
hostname: hostname,
|
hostname: hostname,
|
||||||
@ -288,6 +289,7 @@ func NewMainKubelet(
|
|||||||
containerGC: containerGC,
|
containerGC: containerGC,
|
||||||
imageManager: imageManager,
|
imageManager: imageManager,
|
||||||
statusManager: statusManager,
|
statusManager: statusManager,
|
||||||
|
volumeManager: volumeManager,
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
nodeRef: nodeRef,
|
nodeRef: nodeRef,
|
||||||
containerManager: containerManager,
|
containerManager: containerManager,
|
||||||
@ -416,6 +418,9 @@ type Kubelet struct {
|
|||||||
// Syncs pods statuses with apiserver; also used as a cache of statuses.
|
// Syncs pods statuses with apiserver; also used as a cache of statuses.
|
||||||
statusManager *statusManager
|
statusManager *statusManager
|
||||||
|
|
||||||
|
// Manager for the volume maps for the pods.
|
||||||
|
volumeManager *volumeManager
|
||||||
|
|
||||||
//Cloud provider interface
|
//Cloud provider interface
|
||||||
cloud cloudprovider.Interface
|
cloud cloudprovider.Interface
|
||||||
|
|
||||||
@ -661,11 +666,11 @@ func (kl *Kubelet) syncNodeStatus() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeBinds(container *api.Container, podVolumes volumeMap) []string {
|
func makeBinds(container *api.Container, podVolumes volumeMap) (binds []string) {
|
||||||
binds := []string{}
|
|
||||||
for _, mount := range container.VolumeMounts {
|
for _, mount := range container.VolumeMounts {
|
||||||
vol, ok := podVolumes[mount.Name]
|
vol, ok := podVolumes[mount.Name]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
glog.Warningf("Mount cannot be satisified for container %q, because the volume is missing: %q", container.Name, mount)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
b := fmt.Sprintf("%s:%s", vol.GetPath(), mount.MountPath)
|
b := fmt.Sprintf("%s:%s", vol.GetPath(), mount.MountPath)
|
||||||
@ -674,19 +679,23 @@ func makeBinds(container *api.Container, podVolumes volumeMap) []string {
|
|||||||
}
|
}
|
||||||
binds = append(binds, b)
|
binds = append(binds, b)
|
||||||
}
|
}
|
||||||
return binds
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// generateRunContainerOptions generates the RunContainerOptions, which can be used by
|
// generateRunContainerOptions generates the RunContainerOptions, which can be used by
|
||||||
// the container runtime to set parameters for launching a container.
|
// the container runtime to set parameters for launching a container.
|
||||||
func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
|
func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Container, netMode, ipcMode string) (*kubecontainer.RunContainerOptions, error) {
|
||||||
var err error
|
var err error
|
||||||
opts := &kubecontainer.RunContainerOptions{
|
opts := &kubecontainer.RunContainerOptions{
|
||||||
NetMode: netMode,
|
NetMode: netMode,
|
||||||
IpcMode: ipcMode,
|
IpcMode: ipcMode,
|
||||||
}
|
}
|
||||||
|
|
||||||
opts.Binds = makeBinds(container, podVolumes)
|
vol, ok := kl.volumeManager.GetVolumes(pod.UID)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("impossible: cannot find the mounted volumes for pod %q", kubecontainer.GetPodFullName(pod))
|
||||||
|
}
|
||||||
|
opts.Binds = makeBinds(container, vol)
|
||||||
opts.Envs, err = kl.makeEnvironmentVariables(pod.Namespace, container)
|
opts.Envs, err = kl.makeEnvironmentVariables(pod.Namespace, container)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -710,13 +719,13 @@ func (kl *Kubelet) generateRunContainerOptions(pod *api.Pod, container *api.Cont
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run a single container from a pod. Returns the docker container ID
|
// Run a single container from a pod. Returns the docker container ID
|
||||||
func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, podVolumes volumeMap, netMode, ipcMode string) (dockertools.DockerID, error) {
|
func (kl *Kubelet) runContainer(pod *api.Pod, container *api.Container, netMode, ipcMode string) (dockertools.DockerID, error) {
|
||||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
opts, err := kl.generateRunContainerOptions(pod, container, podVolumes, netMode, ipcMode)
|
opts, err := kl.generateRunContainerOptions(pod, container, netMode, ipcMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -953,7 +962,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID,
|
|||||||
kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
|
kl.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", container.Image)
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := kl.runContainer(pod, container, nil, netNamespace, "")
|
id, err := kl.runContainer(pod, container, netNamespace, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -1084,8 +1093,7 @@ func shouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu
|
|||||||
|
|
||||||
// Attempts to start a container pulling the image before that if necessary. It returns DockerID of a started container
|
// Attempts to start a container pulling the image before that if necessary. It returns DockerID of a started container
|
||||||
// if it was successful, and a non-nil error otherwise.
|
// if it was successful, and a non-nil error otherwise.
|
||||||
func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Container, podVolumes *volumeMap,
|
func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Container, podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
|
||||||
podInfraContainerID dockertools.DockerID) (dockertools.DockerID, error) {
|
|
||||||
podFullName := kubecontainer.GetPodFullName(pod)
|
podFullName := kubecontainer.GetPodFullName(pod)
|
||||||
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
ref, err := kubecontainer.GenerateContainerRef(pod, container)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1109,7 +1117,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
|
|||||||
}
|
}
|
||||||
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
|
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
|
||||||
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
|
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
|
||||||
containerID, err := kl.runContainer(pod, container, *podVolumes, namespaceMode, namespaceMode)
|
containerID, err := kl.runContainer(pod, container, namespaceMode, namespaceMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
// TODO(bburns) : Perhaps blacklist a container after N failures?
|
||||||
glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
|
glog.Errorf("Error running pod %q container %q: %v", podFullName, container.Name, err)
|
||||||
@ -1330,15 +1338,26 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Starting phase: if we should create infra container then we do it first
|
// Starting phase:
|
||||||
var ref *api.ObjectReference
|
ref, err := api.GetReference(pod)
|
||||||
var podVolumes volumeMap
|
|
||||||
podInfraContainerID := containerChanges.infraContainerId
|
|
||||||
if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) {
|
|
||||||
ref, err = api.GetReference(pod)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
|
glog.Errorf("Couldn't make a ref to pod %q: '%v'", podFullName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mount volumes.
|
||||||
|
podVolumes, err := kl.mountExternalVolumes(pod)
|
||||||
|
if err != nil {
|
||||||
|
if ref != nil {
|
||||||
|
kl.recorder.Eventf(ref, "failedMount", "Unable to mount volumes for pod %q: %v", podFullName, err)
|
||||||
|
}
|
||||||
|
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
kl.volumeManager.SetVolumes(pod.UID, podVolumes)
|
||||||
|
|
||||||
|
// If we should create infra container then we do it first.
|
||||||
|
podInfraContainerID := containerChanges.infraContainerId
|
||||||
|
if containerChanges.startInfraContainer && (len(containerChanges.containersToStart) > 0) {
|
||||||
glog.V(4).Infof("Creating pod infra container for %q", podFullName)
|
glog.V(4).Infof("Creating pod infra container for %q", podFullName)
|
||||||
podInfraContainerID, err = kl.createPodInfraContainer(pod)
|
podInfraContainerID, err = kl.createPodInfraContainer(pod)
|
||||||
|
|
||||||
@ -1352,21 +1371,10 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mount volumes
|
|
||||||
podVolumes, err = kl.mountExternalVolumes(pod)
|
|
||||||
if err != nil {
|
|
||||||
if ref != nil {
|
|
||||||
kl.recorder.Eventf(ref, "failedMount",
|
|
||||||
"Unable to mount volumes for pod %q: %v", podFullName, err)
|
|
||||||
}
|
|
||||||
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", podFullName, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start everything
|
// Start everything
|
||||||
for container := range containerChanges.containersToStart {
|
for container := range containerChanges.containersToStart {
|
||||||
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
|
glog.V(4).Infof("Creating container %+v", pod.Spec.Containers[container])
|
||||||
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
|
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], podInfraContainerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if isStaticPod(pod) {
|
if isStaticPod(pod) {
|
||||||
@ -1452,6 +1460,8 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Con
|
|||||||
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
|
//TODO (jonesdl) We should somehow differentiate between volumes that are supposed
|
||||||
//to be deleted and volumes that are leftover after a crash.
|
//to be deleted and volumes that are leftover after a crash.
|
||||||
glog.Warningf("Orphaned volume %q found, tearing down volume", name)
|
glog.Warningf("Orphaned volume %q found, tearing down volume", name)
|
||||||
|
// TODO(yifan): Refactor this hacky string manipulation.
|
||||||
|
kl.volumeManager.DeleteVolumes(types.UID(parts[0]))
|
||||||
//TODO (jonesdl) This should not block other kubelet synchronization procedures
|
//TODO (jonesdl) This should not block other kubelet synchronization procedures
|
||||||
err := vol.TearDown()
|
err := vol.TearDown()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -116,7 +116,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
||||||
kubelet.prober = newProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
|
kubelet.prober = newProber(nil, kubelet.readinessManager, kubelet.containerRefManager, kubelet.recorder)
|
||||||
kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
kubelet.handlerRunner = newHandlerRunner(&fakeHTTP{}, &fakeContainerCommandRunner{}, kubelet.containerManager)
|
||||||
|
kubelet.volumeManager = newVolumeManager()
|
||||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4106,7 +4106,8 @@ func TestGetPodCreationFailureReason(t *testing.T) {
|
|||||||
}
|
}
|
||||||
pods := []*api.Pod{pod}
|
pods := []*api.Pod{pod}
|
||||||
kubelet.podManager.SetPods(pods)
|
kubelet.podManager.SetPods(pods)
|
||||||
_, err := kubelet.runContainer(pod, &pod.Spec.Containers[0], make(map[string]volume.Volume), "", "")
|
kubelet.volumeManager.SetVolumes(pod.UID, volumeMap{})
|
||||||
|
_, err := kubelet.runContainer(pod, &pod.Spec.Containers[0], "", "")
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error, found nil")
|
t.Errorf("expected error, found nil")
|
||||||
}
|
}
|
||||||
@ -4121,7 +4122,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
|
|||||||
if state.Waiting == nil {
|
if state.Waiting == nil {
|
||||||
t.Errorf("expected waiting state, got %#v", state)
|
t.Errorf("expected waiting state, got %#v", state)
|
||||||
} else if state.Waiting.Reason != failureReason {
|
} else if state.Waiting.Reason != failureReason {
|
||||||
t.Errorf("expected reason %q, got %q", state.Waiting.Reason)
|
t.Errorf("expected reason %q, got %q", failureReason, state.Waiting.Reason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,6 +87,7 @@ func TestRunOnce(t *testing.T) {
|
|||||||
readinessManager: kubecontainer.NewReadinessManager(),
|
readinessManager: kubecontainer.NewReadinessManager(),
|
||||||
podManager: podManager,
|
podManager: podManager,
|
||||||
os: FakeOS{},
|
os: FakeOS{},
|
||||||
|
volumeManager: newVolumeManager(),
|
||||||
}
|
}
|
||||||
|
|
||||||
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
|
61
pkg/kubelet/volume_manager.go
Normal file
61
pkg/kubelet/volume_manager.go
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 Google Inc. All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package kubelet
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// volumeManager manages the volumes for the pods running on the kubelet.
|
||||||
|
// Currently it only does book keeping, but it can be expanded to
|
||||||
|
// take care of the volumePlugins.
|
||||||
|
type volumeManager struct {
|
||||||
|
lock sync.RWMutex
|
||||||
|
volumeMaps map[types.UID]volumeMap
|
||||||
|
}
|
||||||
|
|
||||||
|
func newVolumeManager() *volumeManager {
|
||||||
|
vm := &volumeManager{}
|
||||||
|
vm.volumeMaps = make(map[types.UID]volumeMap)
|
||||||
|
return vm
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetVolumes sets the volume map for a pod.
|
||||||
|
// TODO(yifan): Currently we assume the volume is already mounted, so we only do a book keeping here.
|
||||||
|
func (vm *volumeManager) SetVolumes(podUID types.UID, podVolumes volumeMap) {
|
||||||
|
vm.lock.Lock()
|
||||||
|
defer vm.lock.Unlock()
|
||||||
|
vm.volumeMaps[podUID] = podVolumes
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetVolumes returns the volume map which are already mounted on the host machine
|
||||||
|
// for a pod.
|
||||||
|
func (vm *volumeManager) GetVolumes(podUID types.UID) (volumeMap, bool) {
|
||||||
|
vm.lock.RLock()
|
||||||
|
defer vm.lock.RUnlock()
|
||||||
|
vol, ok := vm.volumeMaps[podUID]
|
||||||
|
return vol, ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteVolumes removes the reference to a volume map for a pod.
|
||||||
|
func (vm *volumeManager) DeleteVolumes(podUID types.UID) {
|
||||||
|
vm.lock.Lock()
|
||||||
|
defer vm.lock.Unlock()
|
||||||
|
delete(vm.volumeMaps, podUID)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user