Merge pull request #31157 from pmorie/kubelet-move

Automatic merge from submit-queue

Kubelet code move: volume / util

Addresses some odds and ends that I apparently missed earlier.  Preparation for kubelet code-move ENDGAME.

cc @kubernetes/sig-node
This commit is contained in:
Kubernetes Submit Queue 2016-08-25 00:20:39 -07:00 committed by GitHub
commit bb9523bd0f
8 changed files with 243 additions and 208 deletions

View File

@ -71,14 +71,13 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/clock"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/integer"
@ -88,7 +87,6 @@ import (
"k8s.io/kubernetes/pkg/util/oom"
"k8s.io/kubernetes/pkg/util/procfs"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
utilvalidation "k8s.io/kubernetes/pkg/util/validation"
@ -1057,51 +1055,6 @@ func (kl *Kubelet) getActivePods() []*api.Pod {
return activePods
}
// relabelVolumes relabels SELinux volumes to match the pod's
// SELinuxOptions specification. This is only needed if the pod uses
// hostPID or hostIPC. Otherwise relabeling is delegated to docker.
func (kl *Kubelet) relabelVolumes(pod *api.Pod, volumes kubecontainer.VolumeMap) error {
if pod.Spec.SecurityContext.SELinuxOptions == nil {
return nil
}
rootDirContext, err := kl.getRootDirContext()
if err != nil {
return err
}
selinuxRunner := selinux.NewSelinuxContextRunner()
// Apply the pod's Level to the rootDirContext
rootDirSELinuxOptions, err := securitycontext.ParseSELinuxOptions(rootDirContext)
if err != nil {
return err
}
rootDirSELinuxOptions.Level = pod.Spec.SecurityContext.SELinuxOptions.Level
volumeContext := fmt.Sprintf("%s:%s:%s:%s", rootDirSELinuxOptions.User, rootDirSELinuxOptions.Role, rootDirSELinuxOptions.Type, rootDirSELinuxOptions.Level)
for _, vol := range volumes {
if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux {
// Relabel the volume and its content to match the 'Level' of the pod
path, err := volume.GetPath(vol.Mounter)
if err != nil {
return err
}
err = filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
return selinuxRunner.SetContext(path, volumeContext)
})
if err != nil {
return err
}
vol.SELinuxLabeled = true
}
}
return nil
}
// makeMounts determines the mount points for the given container.
func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
// Kubernetes only mounts on /etc/hosts if :
@ -1781,48 +1734,6 @@ func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
return pullSecrets, nil
}
// cleanupOrphanedPodDirs removes the volumes of pods that should not be
// running and that have no containers running.
func (kl *Kubelet) cleanupOrphanedPodDirs(
pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
allPods := sets.NewString()
for _, pod := range pods {
allPods.Insert(string(pod.UID))
}
for _, pod := range runningPods {
allPods.Insert(string(pod.ID))
}
found, err := kl.listPodsFromDisk()
if err != nil {
return err
}
errlist := []error{}
for _, uid := range found {
if allPods.Has(string(uid)) {
continue
}
// If volumes have not been unmounted/detached, do not delete directory.
// Doing so may result in corruption of data.
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err)
continue
}
// Check whether volume is still mounted on disk. If so, do not delete directory
if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 {
glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames)
continue
}
glog.V(3).Infof("Orphaned pod %q found, removing", uid)
if err := os.RemoveAll(kl.getPodDir(uid)); err != nil {
glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)
errlist = append(errlist, err)
}
}
return utilerrors.NewAggregate(errlist)
}
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// * pod whose work is ready.
// * internal modules that request sync of a pod.
@ -1930,9 +1841,6 @@ func (kl *Kubelet) deletePod(pod *api.Pod) error {
return nil
}
// empty is a placeholder type used to implement a set
type empty struct{}
// HandlePodCleanups performs a series of cleanup work, including terminating
// pod workers, killing unwanted pods, and removing orphaned volumes/pod
// directories.
@ -2046,23 +1954,6 @@ func (kl *Kubelet) podKiller() {
}
}
// podsByCreationTime makes an array of pods sortable by their creation
// timestamps.
// TODO: move into util package
type podsByCreationTime []*api.Pod
func (s podsByCreationTime) Len() int {
return len(s)
}
func (s podsByCreationTime) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s podsByCreationTime) Less(i, j int) bool {
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}
// checkHostPortConflicts detects pods with conflicted host ports.
func hasHostPortConflicts(pods []*api.Pod) bool {
ports := sets.String{}
@ -2385,7 +2276,7 @@ func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
start := kl.clock.Now()
sort.Sort(podsByCreationTime(pods))
sort.Sort(sliceutils.PodsByCreationTime(pods))
for _, pod := range pods {
if kubepod.IsMirrorPod(pod) {
kl.podManager.AddPod(pod)
@ -2599,15 +2490,6 @@ func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadv
}
}
type byImageSize []kubecontainer.Image
// Sort from max to min
func (a byImageSize) Less(i, j int) bool {
return a[i].Size > a[j].Size
}
func (a byImageSize) Len() int { return len(a) }
func (a byImageSize) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// GetPhase returns the phase of a pod given its container info.
// This func is exported to simplify integration with 3rd party kubelet
// integrations like kubernetes-mesos.

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
@ -430,7 +431,7 @@ func (kl *Kubelet) setNodeStatusImages(node *api.Node) {
glog.Errorf("Error getting image list: %v", err)
} else {
// sort the images from max to min, and only set top N images into the node status.
sort.Sort(byImageSize(containerImages))
sort.Sort(sliceutils.ByImageSize(containerImages))
if maxImagesInNodeStatus < len(containerImages) {
containerImages = containerImages[0:maxImagesInNodeStatus]
}

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/util/rand"
@ -57,7 +58,7 @@ func generateTestingImageList(count int) ([]kubecontainer.Image, []api.Container
// expectedImageList is generated by imageList according to size and maxImagesInNodeStatus
// 1. sort the imageList by size
sort.Sort(byImageSize(imageList))
sort.Sort(sliceutils.ByImageSize(imageList))
// 2. convert sorted imageList to api.ContainerImage list
var expectedImageList []api.ContainerImage
for _, kubeImage := range imageList {

View File

@ -794,90 +794,6 @@ func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
}
}
func TestPodVolumesExist(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
pods := []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol1",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "pod2",
UID: "pod2uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol2",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device2",
},
},
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "pod3",
UID: "pod3uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol3",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device3",
},
},
},
},
},
},
}
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
kubelet.podManager.SetPods(pods)
for _, pod := range pods {
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
if err != nil {
t.Errorf("Expected success: %v", err)
}
}
for _, pod := range pods {
podVolumesExist := kubelet.podVolumesExist(pod.UID)
if !podVolumesExist {
t.Errorf(
"Expected to find volumes for pod %q, but podVolumesExist returned false",
pod.UID)
}
}
}
type stubVolume struct {
path string
volume.MetricsNil

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -18,10 +18,17 @@ package kubelet
import (
"fmt"
"os"
"path/filepath"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/volume"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
)
@ -66,3 +73,90 @@ func (kl *Kubelet) newVolumeMounterFromPlugins(spec *volume.Spec, pod *api.Pod,
glog.V(10).Infof("Using volume plugin %q to mount %s", plugin.GetPluginName(), spec.Name())
return physicalMounter, nil
}
// relabelVolumes relabels SELinux volumes to match the pod's
// SELinuxOptions specification. This is only needed if the pod uses
// hostPID or hostIPC. Otherwise relabeling is delegated to docker.
func (kl *Kubelet) relabelVolumes(pod *api.Pod, volumes kubecontainer.VolumeMap) error {
if pod.Spec.SecurityContext.SELinuxOptions == nil {
return nil
}
rootDirContext, err := kl.getRootDirContext()
if err != nil {
return err
}
selinuxRunner := selinux.NewSelinuxContextRunner()
// Apply the pod's Level to the rootDirContext
rootDirSELinuxOptions, err := securitycontext.ParseSELinuxOptions(rootDirContext)
if err != nil {
return err
}
rootDirSELinuxOptions.Level = pod.Spec.SecurityContext.SELinuxOptions.Level
volumeContext := fmt.Sprintf("%s:%s:%s:%s", rootDirSELinuxOptions.User, rootDirSELinuxOptions.Role, rootDirSELinuxOptions.Type, rootDirSELinuxOptions.Level)
for _, vol := range volumes {
if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux {
// Relabel the volume and its content to match the 'Level' of the pod
path, err := volume.GetPath(vol.Mounter)
if err != nil {
return err
}
err = filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
return selinuxRunner.SetContext(path, volumeContext)
})
if err != nil {
return err
}
vol.SELinuxLabeled = true
}
}
return nil
}
// cleanupOrphanedPodDirs removes the volumes of pods that should not be
// running and that have no containers running.
func (kl *Kubelet) cleanupOrphanedPodDirs(
pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
allPods := sets.NewString()
for _, pod := range pods {
allPods.Insert(string(pod.UID))
}
for _, pod := range runningPods {
allPods.Insert(string(pod.ID))
}
found, err := kl.listPodsFromDisk()
if err != nil {
return err
}
errlist := []error{}
for _, uid := range found {
if allPods.Has(string(uid)) {
continue
}
// If volumes have not been unmounted/detached, do not delete directory.
// Doing so may result in corruption of data.
if podVolumesExist := kl.podVolumesExist(uid); podVolumesExist {
glog.V(3).Infof("Orphaned pod %q found, but volumes are not cleaned up; err: %v", uid, err)
continue
}
// Check whether volume is still mounted on disk. If so, do not delete directory
if volumeNames, err := kl.getPodVolumeNameListFromDisk(uid); err != nil || len(volumeNames) != 0 {
glog.V(3).Infof("Orphaned pod %q found, but volumes are still mounted; err: %v, volumes: %v ", uid, err, volumeNames)
continue
}
glog.V(3).Infof("Orphaned pod %q found, removing", uid)
if err := os.RemoveAll(kl.getPodDir(uid)); err != nil {
glog.Errorf("Failed to remove orphaned pod %q dir; err: %v", uid, err)
errlist = append(errlist, err)
}
}
return utilerrors.NewAggregate(errlist)
}

View File

@ -0,0 +1,107 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
"k8s.io/kubernetes/pkg/api"
)
func TestPodVolumesExist(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
pods := []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol1",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "pod2",
UID: "pod2uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol2",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device2",
},
},
},
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "pod3",
UID: "pod3uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol3",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device3",
},
},
},
},
},
},
}
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
kubelet.podManager.SetPods(pods)
for _, pod := range pods {
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
if err != nil {
t.Errorf("Expected success: %v", err)
}
}
for _, pod := range pods {
podVolumesExist := kubelet.podVolumesExist(pod.UID)
if !podVolumesExist {
t.Errorf(
"Expected to find volumes for pod %q, but podVolumesExist returned false",
pod.UID)
}
}
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -128,3 +128,6 @@ func dirExists(path string) bool {
}
return s.IsDir()
}
// empty is a placeholder type used to implement a set
type empty struct{}

View File

@ -16,6 +16,11 @@ limitations under the License.
package sliceutils
import (
"k8s.io/kubernetes/pkg/api"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
func StringInSlice(s string, list []string) bool {
for _, v := range list {
if v == s {
@ -25,3 +30,29 @@ func StringInSlice(s string, list []string) bool {
return false
}
// PodsByCreationTime makes an array of pods sortable by their creation
// timestamps in ascending order.
type PodsByCreationTime []*api.Pod
func (s PodsByCreationTime) Len() int {
return len(s)
}
func (s PodsByCreationTime) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s PodsByCreationTime) Less(i, j int) bool {
return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
}
// ByImageSize makes an array of images sortable by their size in descending
// order.
type ByImageSize []kubecontainer.Image
func (a ByImageSize) Less(i, j int) bool {
return a[i].Size > a[j].Size
}
func (a ByImageSize) Len() int { return len(a) }
func (a ByImageSize) Swap(i, j int) { a[i], a[j] = a[j], a[i] }