From b91ad7606666edd425afa47f81b42d2923594ad4 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Mon, 22 Aug 2016 16:38:36 -0400 Subject: [PATCH] Kubelet code move: volume / util --- pkg/kubelet/kubelet.go | 122 +--------------------- pkg/kubelet/kubelet_node_status.go | 3 +- pkg/kubelet/kubelet_node_status_test.go | 3 +- pkg/kubelet/kubelet_test.go | 84 --------------- pkg/kubelet/kubelet_volumes.go | 96 ++++++++++++++++- pkg/kubelet/kubelet_volumes_test.go | 107 +++++++++++++++++++ pkg/kubelet/util.go | 5 +- pkg/kubelet/util/sliceutils/sliceutils.go | 31 ++++++ 8 files changed, 243 insertions(+), 208 deletions(-) create mode 100644 pkg/kubelet/kubelet_volumes_test.go diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 3d0ddf0d423..1b0d3cc11df 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -71,14 +71,13 @@ import ( "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/ioutils" "k8s.io/kubernetes/pkg/kubelet/util/queue" + "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/kubelet/volumemanager" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/clock" utildbus "k8s.io/kubernetes/pkg/util/dbus" - utilerrors "k8s.io/kubernetes/pkg/util/errors" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/integer" @@ -88,7 +87,6 @@ import ( "k8s.io/kubernetes/pkg/util/oom" "k8s.io/kubernetes/pkg/util/procfs" utilruntime "k8s.io/kubernetes/pkg/util/runtime" - "k8s.io/kubernetes/pkg/util/selinux" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/term" utilvalidation "k8s.io/kubernetes/pkg/util/validation" @@ -1056,51 +1054,6 @@ func (kl *Kubelet) getActivePods() []*api.Pod { return activePods } -// relabelVolumes relabels SELinux volumes to match the pod's -// SELinuxOptions specification. This is only needed if the pod uses -// hostPID or hostIPC. Otherwise relabeling is delegated to docker. -func (kl *Kubelet) relabelVolumes(pod *api.Pod, volumes kubecontainer.VolumeMap) error { - if pod.Spec.SecurityContext.SELinuxOptions == nil { - return nil - } - - rootDirContext, err := kl.getRootDirContext() - if err != nil { - return err - } - - selinuxRunner := selinux.NewSelinuxContextRunner() - // Apply the pod's Level to the rootDirContext - rootDirSELinuxOptions, err := securitycontext.ParseSELinuxOptions(rootDirContext) - if err != nil { - return err - } - - rootDirSELinuxOptions.Level = pod.Spec.SecurityContext.SELinuxOptions.Level - volumeContext := fmt.Sprintf("%s:%s:%s:%s", rootDirSELinuxOptions.User, rootDirSELinuxOptions.Role, rootDirSELinuxOptions.Type, rootDirSELinuxOptions.Level) - - for _, vol := range volumes { - if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux { - // Relabel the volume and its content to match the 'Level' of the pod - path, err := volume.GetPath(vol.Mounter) - if err != nil { - return err - } - err = filepath.Walk(path, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - return selinuxRunner.SetContext(path, volumeContext) - }) - if err != nil { - return err - } - vol.SELinuxLabeled = true - } - } - return nil -} - // makeMounts determines the mount points for the given container. func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) { // Kubernetes only mounts on /etc/hosts if : @@ -1780,48 +1733,6 @@ func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) { return pullSecrets, nil } -// cleanupOrphanedPodDirs removes the volumes of pods that should not be -// running and that have no containers running. -func (kl *Kubelet) cleanupOrphanedPodDirs( - pods []*api.Pod, runningPods []*kubecontainer.Pod) error { - allPods := sets.NewString() - for _, pod := range pods { - allPods.Insert(string(pod.UID)) - } - for _, pod := range runningPods { - allPods.Insert(string(pod.ID)) - } - - found, err := kl.listPodsFromDisk() - if err != nil { - return err - } - errlist := []error{} - for _, uid := range found { - if allPods.Has(string(uid)) { - continue - } - // If volumes have not been unmounted/detached, do not delete directory. - // Doing so may result in corruption of data. - if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist { - glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err) - continue - } - // Check whether volume is still mounted on disk. If so, do not delete directory - if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 { - glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames) - continue - } - - glog.V(3).Infof("Orphaned pod %q found, removing", uid) - if err := os.RemoveAll(kl.getPodDir(uid)); err != nil { - glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err) - errlist = append(errlist, err) - } - } - return utilerrors.NewAggregate(errlist) -} - // Get pods which should be resynchronized. Currently, the following pod should be resynchronized: // * pod whose work is ready. // * internal modules that request sync of a pod. @@ -1929,9 +1840,6 @@ func (kl *Kubelet) deletePod(pod *api.Pod) error { return nil } -// empty is a placeholder type used to implement a set -type empty struct{} - // HandlePodCleanups performs a series of cleanup work, including terminating // pod workers, killing unwanted pods, and removing orphaned volumes/pod // directories. @@ -2045,23 +1953,6 @@ func (kl *Kubelet) podKiller() { } } -// podsByCreationTime makes an array of pods sortable by their creation -// timestamps. -// TODO: move into util package -type podsByCreationTime []*api.Pod - -func (s podsByCreationTime) Len() int { - return len(s) -} - -func (s podsByCreationTime) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -func (s podsByCreationTime) Less(i, j int) bool { - return s[i].CreationTimestamp.Before(s[j].CreationTimestamp) -} - // checkHostPortConflicts detects pods with conflicted host ports. func hasHostPortConflicts(pods []*api.Pod) bool { ports := sets.String{} @@ -2384,7 +2275,7 @@ func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) { // a config source. func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) { start := kl.clock.Now() - sort.Sort(podsByCreationTime(pods)) + sort.Sort(sliceutils.PodsByCreationTime(pods)) for _, pod := range pods { if kubepod.IsMirrorPod(pod) { kl.podManager.AddPod(pod) @@ -2598,15 +2489,6 @@ func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadv } } -type byImageSize []kubecontainer.Image - -// Sort from max to min -func (a byImageSize) Less(i, j int) bool { - return a[i].Size > a[j].Size -} -func (a byImageSize) Len() int { return len(a) } -func (a byImageSize) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - // GetPhase returns the phase of a pod given its container info. // This func is exported to simplify integration with 3rd party kubelet // integrations like kubernetes-mesos. diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index faae4b357a0..b0df4a986ed 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume/util/volumehelper" @@ -430,7 +431,7 @@ func (kl *Kubelet) setNodeStatusImages(node *api.Node) { glog.Errorf("Error getting image list: %v", err) } else { // sort the images from max to min, and only set top N images into the node status. - sort.Sort(byImageSize(containerImages)) + sort.Sort(sliceutils.ByImageSize(containerImages)) if maxImagesInNodeStatus < len(containerImages) { containerImages = containerImages[0:maxImagesInNodeStatus] } diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 2f91aefd133..b80a332dc62 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/util/sliceutils" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/rand" @@ -57,7 +58,7 @@ func generateTestingImageList(count int) ([]kubecontainer.Image, []api.Container // expectedImageList is generated by imageList according to size and maxImagesInNodeStatus // 1. sort the imageList by size - sort.Sort(byImageSize(imageList)) + sort.Sort(sliceutils.ByImageSize(imageList)) // 2. convert sorted imageList to api.ContainerImage list var expectedImageList []api.ContainerImage for _, kubeImage := range imageList { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 12392b91dae..9529d6ceb34 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -794,90 +794,6 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) { } } -func TestPodVolumesExist(t *testing.T) { - testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) - kubelet := testKubelet.kubelet - - pods := []*api.Pod{ - { - ObjectMeta: api.ObjectMeta{ - Name: "pod1", - UID: "pod1uid", - }, - Spec: api.PodSpec{ - Volumes: []api.Volume{ - { - Name: "vol1", - VolumeSource: api.VolumeSource{ - GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ - PDName: "fake-device1", - }, - }, - }, - }, - }, - }, - { - ObjectMeta: api.ObjectMeta{ - Name: "pod2", - UID: "pod2uid", - }, - Spec: api.PodSpec{ - Volumes: []api.Volume{ - { - Name: "vol2", - VolumeSource: api.VolumeSource{ - GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ - PDName: "fake-device2", - }, - }, - }, - }, - }, - }, - { - ObjectMeta: api.ObjectMeta{ - Name: "pod3", - UID: "pod3uid", - }, - Spec: api.PodSpec{ - Volumes: []api.Volume{ - { - Name: "vol3", - VolumeSource: api.VolumeSource{ - GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ - PDName: "fake-device3", - }, - }, - }, - }, - }, - }, - } - - stopCh := runVolumeManager(kubelet) - defer func() { - close(stopCh) - }() - - kubelet.podManager.SetPods(pods) - for _, pod := range pods { - err := kubelet.volumeManager.WaitForAttachAndMount(pod) - if err != nil { - t.Errorf("Expected success: %v", err) - } - } - - for _, pod := range pods { - podVolumesExist := kubelet.podVolumesExist(pod.UID) - if !podVolumesExist { - t.Errorf( - "Expected to find volumes for pod %q, but podVolumesExist returned false", - pod.UID) - } - } -} - type stubVolume struct { path string volume.MetricsNil diff --git a/pkg/kubelet/kubelet_volumes.go b/pkg/kubelet/kubelet_volumes.go index f92d7e363ae..3838b31558f 100644 --- a/pkg/kubelet/kubelet_volumes.go +++ b/pkg/kubelet/kubelet_volumes.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -18,10 +18,17 @@ package kubelet import ( "fmt" + "os" + "path/filepath" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" + utilerrors "k8s.io/kubernetes/pkg/util/errors" + "k8s.io/kubernetes/pkg/util/selinux" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/volume" volumetypes "k8s.io/kubernetes/pkg/volume/util/types" ) @@ -66,3 +73,90 @@ func (kl *Kubelet) newVolumeMounterFromPlugins(spec *volume.Spec, pod *api.Pod, glog.V(10).Infof("Using volume plugin %q to mount %s", plugin.GetPluginName(), spec.Name()) return physicalMounter, nil } + +// relabelVolumes relabels SELinux volumes to match the pod's +// SELinuxOptions specification. This is only needed if the pod uses +// hostPID or hostIPC. Otherwise relabeling is delegated to docker. +func (kl *Kubelet) relabelVolumes(pod *api.Pod, volumes kubecontainer.VolumeMap) error { + if pod.Spec.SecurityContext.SELinuxOptions == nil { + return nil + } + + rootDirContext, err := kl.getRootDirContext() + if err != nil { + return err + } + + selinuxRunner := selinux.NewSelinuxContextRunner() + // Apply the pod's Level to the rootDirContext + rootDirSELinuxOptions, err := securitycontext.ParseSELinuxOptions(rootDirContext) + if err != nil { + return err + } + + rootDirSELinuxOptions.Level = pod.Spec.SecurityContext.SELinuxOptions.Level + volumeContext := fmt.Sprintf("%s:%s:%s:%s", rootDirSELinuxOptions.User, rootDirSELinuxOptions.Role, rootDirSELinuxOptions.Type, rootDirSELinuxOptions.Level) + + for _, vol := range volumes { + if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux { + // Relabel the volume and its content to match the 'Level' of the pod + path, err := volume.GetPath(vol.Mounter) + if err != nil { + return err + } + err = filepath.Walk(path, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + return selinuxRunner.SetContext(path, volumeContext) + }) + if err != nil { + return err + } + vol.SELinuxLabeled = true + } + } + return nil +} + +// cleanupOrphanedPodDirs removes the volumes of pods that should not be +// running and that have no containers running. +func (kl *Kubelet) cleanupOrphanedPodDirs( + pods []*api.Pod, runningPods []*kubecontainer.Pod) error { + allPods := sets.NewString() + for _, pod := range pods { + allPods.Insert(string(pod.UID)) + } + for _, pod := range runningPods { + allPods.Insert(string(pod.ID)) + } + + found, err := kl.listPodsFromDisk() + if err != nil { + return err + } + errlist := []error{} + for _, uid := range found { + if allPods.Has(string(uid)) { + continue + } + // If volumes have not been unmounted/detached, do not delete directory. + // Doing so may result in corruption of data. + if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist { + glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err) + continue + } + // Check whether volume is still mounted on disk. If so, do not delete directory + if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 { + glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames) + continue + } + + glog.V(3).Infof("Orphaned pod %q found, removing", uid) + if err := os.RemoveAll(kl.getPodDir(uid)); err != nil { + glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err) + errlist = append(errlist, err) + } + } + return utilerrors.NewAggregate(errlist) +} diff --git a/pkg/kubelet/kubelet_volumes_test.go b/pkg/kubelet/kubelet_volumes_test.go new file mode 100644 index 00000000000..3c0f6cd2e73 --- /dev/null +++ b/pkg/kubelet/kubelet_volumes_test.go @@ -0,0 +1,107 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api" +) + +func TestPodVolumesExist(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + kubelet := testKubelet.kubelet + + pods := []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "pod1", + UID: "pod1uid", + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "vol1", + VolumeSource: api.VolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: "fake-device1", + }, + }, + }, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "pod2", + UID: "pod2uid", + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "vol2", + VolumeSource: api.VolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: "fake-device2", + }, + }, + }, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "pod3", + UID: "pod3uid", + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "vol3", + VolumeSource: api.VolumeSource{ + GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{ + PDName: "fake-device3", + }, + }, + }, + }, + }, + }, + } + + stopCh := runVolumeManager(kubelet) + defer func() { + close(stopCh) + }() + + kubelet.podManager.SetPods(pods) + for _, pod := range pods { + err := kubelet.volumeManager.WaitForAttachAndMount(pod) + if err != nil { + t.Errorf("Expected success: %v", err) + } + } + + for _, pod := range pods { + podVolumesExist := kubelet.podVolumesExist(pod.UID) + if !podVolumesExist { + t.Errorf( + "Expected to find volumes for pod %q, but podVolumesExist returned false", + pod.UID) + } + } +} diff --git a/pkg/kubelet/util.go b/pkg/kubelet/util.go index 0ddb76a86c5..20afbf30099 100644 --- a/pkg/kubelet/util.go +++ b/pkg/kubelet/util.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -128,3 +128,6 @@ func dirExists(path string) bool { } return s.IsDir() } + +// empty is a placeholder type used to implement a set +type empty struct{} diff --git a/pkg/kubelet/util/sliceutils/sliceutils.go b/pkg/kubelet/util/sliceutils/sliceutils.go index 18e89848fee..55448f17ffd 100644 --- a/pkg/kubelet/util/sliceutils/sliceutils.go +++ b/pkg/kubelet/util/sliceutils/sliceutils.go @@ -16,6 +16,11 @@ limitations under the License. package sliceutils +import ( + "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" +) + func StringInSlice(s string, list []string) bool { for _, v := range list { if v == s { @@ -25,3 +30,29 @@ func StringInSlice(s string, list []string) bool { return false } + +// PodsByCreationTime makes an array of pods sortable by their creation +// timestamps in ascending order. +type PodsByCreationTime []*api.Pod + +func (s PodsByCreationTime) Len() int { + return len(s) +} + +func (s PodsByCreationTime) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s PodsByCreationTime) Less(i, j int) bool { + return s[i].CreationTimestamp.Before(s[j].CreationTimestamp) +} + +// ByImageSize makes an array of images sortable by their size in descending +// order. +type ByImageSize []kubecontainer.Image + +func (a ByImageSize) Less(i, j int) bool { + return a[i].Size > a[j].Size +} +func (a ByImageSize) Len() int { return len(a) } +func (a ByImageSize) Swap(i, j int) { a[i], a[j] = a[j], a[i] }