Merge pull request #7870 from yifan-gu/rkt_volume

Add volumeGetter to rkt.
This commit is contained in:
Victor Marmol 2015-05-07 08:58:00 -07:00
commit 35c644a45f
7 changed files with 36 additions and 21 deletions

View File

@ -23,6 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
) )
type Version interface { type Version interface {
@ -205,6 +206,8 @@ type RunContainerOptions struct {
CgroupParent string CgroupParent string
} }
type VolumeMap map[string]volume.Volume
type Pods []*Pod type Pods []*Pod
// FindPodByID finds and returns a pod in the pod list by UID. It will return an empty pod // FindPodByID finds and returns a pod in the pod list by UID. It will return an empty pod

View File

@ -94,8 +94,6 @@ type SyncHandler interface {
type SourcesReadyFn func() bool type SourcesReadyFn func() bool
type volumeMap map[string]volume.Volume
// Wait for the container runtime to be up with a timeout. // Wait for the container runtime to be up with a timeout.
func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error { func waitUntilRuntimeIsUp(cr kubecontainer.Runtime, timeout time.Duration) error {
var err error = nil var err error = nil
@ -657,7 +655,7 @@ func (kl *Kubelet) syncNodeStatus() {
} }
} }
func makeBinds(container *api.Container, podVolumes volumeMap) (binds []string) { func makeBinds(container *api.Container, podVolumes kubecontainer.VolumeMap) (binds []string) {
for _, mount := range container.VolumeMounts { for _, mount := range container.VolumeMounts {
vol, ok := podVolumes[mount.Name] vol, ok := podVolumes[mount.Name]
if !ok { if !ok {

View File

@ -1309,7 +1309,7 @@ func TestMakeVolumesAndBinds(t *testing.T) {
}, },
} }
podVolumes := volumeMap{ podVolumes := kubecontainer.VolumeMap{
"disk": &stubVolume{"/mnt/disk"}, "disk": &stubVolume{"/mnt/disk"},
"disk4": &stubVolume{"/mnt/host"}, "disk4": &stubVolume{"/mnt/host"},
"disk5": &stubVolume{"/var/lib/kubelet/podID/volumes/empty/disk5"}, "disk5": &stubVolume{"/var/lib/kubelet/podID/volumes/empty/disk5"},
@ -4078,7 +4078,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
} }
pods := []*api.Pod{pod} pods := []*api.Pod{pod}
kubelet.podManager.SetPods(pods) kubelet.podManager.SetPods(pods)
kubelet.volumeManager.SetVolumes(pod.UID, volumeMap{}) kubelet.volumeManager.SetVolumes(pod.UID, kubecontainer.VolumeMap{})
// TODO: Move this test to dockertools so that we don't have to do the hacky // TODO: Move this test to dockertools so that we don't have to do the hacky
// type assertion here. // type assertion here.
dm := kubelet.containerRuntime.(*dockertools.DockerManager) dm := kubelet.containerRuntime.(*dockertools.DockerManager)

View File

@ -42,7 +42,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/securitycontext" "github.com/GoogleCloudPlatform/kubernetes/pkg/securitycontext"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
appcschema "github.com/appc/spec/schema" appcschema "github.com/appc/spec/schema"
appctypes "github.com/appc/spec/schema/types" appctypes "github.com/appc/spec/schema/types"
"github.com/coreos/go-systemd/dbus" "github.com/coreos/go-systemd/dbus"
@ -101,10 +100,16 @@ type runtime struct {
recorder record.EventRecorder recorder record.EventRecorder
prober prober.Prober prober prober.Prober
readinessManager *kubecontainer.ReadinessManager readinessManager *kubecontainer.ReadinessManager
volumeGetter volumeGetter
} }
var _ kubecontainer.Runtime = &runtime{} var _ kubecontainer.Runtime = &runtime{}
// TODO(yifan): Remove this when volumeManager is moved to separate package.
type volumeGetter interface {
GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool)
}
// New creates the rkt container runtime which implements the container runtime interface. // New creates the rkt container runtime which implements the container runtime interface.
// It will test if the rkt binary is in the $PATH, and whether we can get the // It will test if the rkt binary is in the $PATH, and whether we can get the
// version of it. If so, creates the rkt container runtime, otherwise returns an error. // version of it. If so, creates the rkt container runtime, otherwise returns an error.
@ -112,7 +117,8 @@ func New(config *Config,
generator kubecontainer.RunContainerOptionsGenerator, generator kubecontainer.RunContainerOptionsGenerator,
recorder record.EventRecorder, recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
readinessManager *kubecontainer.ReadinessManager) (kubecontainer.Runtime, error) { readinessManager *kubecontainer.ReadinessManager,
volumeGetter volumeGetter) (kubecontainer.Runtime, error) {
systemdVersion, err := getSystemdVersion() systemdVersion, err := getSystemdVersion()
if err != nil { if err != nil {
@ -352,7 +358,7 @@ func setApp(app *appctypes.App, c *api.Container) error {
// makePodManifest transforms a kubelet pod spec to the rkt pod manifest. // makePodManifest transforms a kubelet pod spec to the rkt pod manifest.
// TODO(yifan): Use the RunContainerOptions generated by GenerateRunContainerOptions(). // TODO(yifan): Use the RunContainerOptions generated by GenerateRunContainerOptions().
func (r *runtime) makePodManifest(pod *api.Pod, volumeMap map[string]volume.Volume) (*appcschema.PodManifest, error) { func (r *runtime) makePodManifest(pod *api.Pod) (*appcschema.PodManifest, error) {
manifest := appcschema.BlankPodManifest() manifest := appcschema.BlankPodManifest()
for _, c := range pod.Spec.Containers { for _, c := range pod.Spec.Containers {
@ -384,6 +390,11 @@ func (r *runtime) makePodManifest(pod *api.Pod, volumeMap map[string]volume.Volu
}) })
} }
volumeMap, ok := r.volumeGetter.GetVolumes(pod.UID)
if !ok {
return nil, fmt.Errorf("cannot get the volumes for pod %q", kubecontainer.GetPodFullName(pod))
}
// Set global volumes. // Set global volumes.
for name, volume := range volumeMap { for name, volume := range volumeMap {
volName, err := appctypes.NewACName(name) volName, err := appctypes.NewACName(name)
@ -458,11 +469,11 @@ func (r *runtime) apiPodToruntimePod(uuid string, pod *api.Pod) *kubecontainer.P
// On success, it will return a string that represents name of the unit file // On success, it will return a string that represents name of the unit file
// and a boolean that indicates if the unit file needs to be reloaded (whether // and a boolean that indicates if the unit file needs to be reloaded (whether
// the file is already existed). // the file is already existed).
func (r *runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (string, bool, error) { func (r *runtime) preparePod(pod *api.Pod) (string, bool, error) {
cmds := []string{"prepare", "--quiet", "--pod-manifest"} cmds := []string{"prepare", "--quiet", "--pod-manifest"}
// Generate the pod manifest from the pod spec. // Generate the pod manifest from the pod spec.
manifest, err := r.makePodManifest(pod, volumeMap) manifest, err := r.makePodManifest(pod)
if err != nil { if err != nil {
return "", false, err return "", false, err
} }
@ -533,10 +544,10 @@ func (r *runtime) preparePod(pod *api.Pod, volumeMap map[string]volume.Volume) (
// RunPod first creates the unit file for a pod, and then calls // RunPod first creates the unit file for a pod, and then calls
// StartUnit over d-bus. // StartUnit over d-bus.
func (r *runtime) RunPod(pod *api.Pod, volumeMap map[string]volume.Volume) error { func (r *runtime) RunPod(pod *api.Pod) error {
glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name) glog.V(4).Infof("Rkt starts to run pod: name %q.", pod.Name)
name, needReload, err := r.preparePod(pod, volumeMap) name, needReload, err := r.preparePod(pod)
if err != nil { if err != nil {
return err return err
} }
@ -784,7 +795,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
if len(runningPod.Containers) == 0 { if len(runningPod.Containers) == 0 {
glog.V(4).Infof("Pod %q is not running, will start it", podFullName) glog.V(4).Infof("Pod %q is not running, will start it", podFullName)
// TODO(yifan): Use RunContainerOptionsGeneratior to get volumeMaps, etc. // TODO(yifan): Use RunContainerOptionsGeneratior to get volumeMaps, etc.
return r.RunPod(pod, nil) return r.RunPod(pod)
} }
// Add references to all containers. // Add references to all containers.
@ -841,7 +852,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
if err := r.KillPod(runningPod); err != nil { if err := r.KillPod(runningPod); err != nil {
return err return err
} }
if err := r.RunPod(pod, nil); err != nil { if err := r.RunPod(pod); err != nil {
return err return err
} }
} }

View File

@ -38,7 +38,8 @@ func New(config *Config,
generator kubecontainer.RunContainerOptionsGenerator, generator kubecontainer.RunContainerOptionsGenerator,
recorder record.EventRecorder, recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager, containerRefManager *kubecontainer.RefManager,
readinessManager *kubecontainer.ReadinessManager) (kubecontainer.Runtime, error) { readinessManager *kubecontainer.ReadinessManager,
volumeGetter volumeGetter) (kubecontainer.Runtime, error) {
return nil, unsupportedError return nil, unsupportedError
} }

View File

@ -19,6 +19,7 @@ package kubelet
import ( import (
"sync" "sync"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
@ -27,18 +28,18 @@ import (
// take care of the volumePlugins. // take care of the volumePlugins.
type volumeManager struct { type volumeManager struct {
lock sync.RWMutex lock sync.RWMutex
volumeMaps map[types.UID]volumeMap volumeMaps map[types.UID]kubecontainer.VolumeMap
} }
func newVolumeManager() *volumeManager { func newVolumeManager() *volumeManager {
vm := &volumeManager{} vm := &volumeManager{}
vm.volumeMaps = make(map[types.UID]volumeMap) vm.volumeMaps = make(map[types.UID]kubecontainer.VolumeMap)
return vm return vm
} }
// SetVolumes sets the volume map for a pod. // SetVolumes sets the volume map for a pod.
// TODO(yifan): Currently we assume the volume is already mounted, so we only do a book keeping here. // TODO(yifan): Currently we assume the volume is already mounted, so we only do a book keeping here.
func (vm *volumeManager) SetVolumes(podUID types.UID, podVolumes volumeMap) { func (vm *volumeManager) SetVolumes(podUID types.UID, podVolumes kubecontainer.VolumeMap) {
vm.lock.Lock() vm.lock.Lock()
defer vm.lock.Unlock() defer vm.lock.Unlock()
vm.volumeMaps[podUID] = podVolumes vm.volumeMaps[podUID] = podVolumes
@ -46,7 +47,7 @@ func (vm *volumeManager) SetVolumes(podUID types.UID, podVolumes volumeMap) {
// GetVolumes returns the volume map which are already mounted on the host machine // GetVolumes returns the volume map which are already mounted on the host machine
// for a pod. // for a pod.
func (vm *volumeManager) GetVolumes(podUID types.UID) (volumeMap, bool) { func (vm *volumeManager) GetVolumes(podUID types.UID) (kubecontainer.VolumeMap, bool) {
vm.lock.RLock() vm.lock.RLock()
defer vm.lock.RUnlock() defer vm.lock.RUnlock()
vol, ok := vm.volumeMaps[podUID] vol, ok := vm.volumeMaps[podUID]

View File

@ -23,6 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
@ -96,8 +97,8 @@ func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, podRef *api.Ob
return builder, nil return builder, nil
} }
func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) { func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (kubecontainer.VolumeMap, error) {
podVolumes := make(volumeMap) podVolumes := make(kubecontainer.VolumeMap)
for i := range pod.Spec.Volumes { for i := range pod.Spec.Volumes {
volSpec := &pod.Spec.Volumes[i] volSpec := &pod.Spec.Volumes[i]