configurable pv recyclers

This commit is contained in:
markturansky 2015-09-02 23:14:26 -04:00
parent 0902f80f8b
commit 7bc55b5aea
16 changed files with 468 additions and 178 deletions

View File

@ -118,7 +118,10 @@ func NewCMServer() *CMServer {
EnableDeploymentController: false,
VolumeConfigFlags: VolumeConfigFlags{
// default values here
PersistentVolumeRecyclerTimeoutNFS: 300,
PersistentVolumeRecyclerMinimumTimeoutNFS: 300,
PersistentVolumeRecyclerIncrementTimeoutNFS: 30,
PersistentVolumeRecyclerMinimumTimeoutHostPath: 60,
PersistentVolumeRecyclerIncrementTimeoutHostPath: 30,
},
}
return &s
@ -129,7 +132,12 @@ func NewCMServer() *CMServer {
// of volume.VolumeConfig which are then passed to the appropriate plugin. The ControllerManager binary is the only
// part of the code which knows what plugins are supported and which CLI flags correspond to each plugin.
type VolumeConfigFlags struct {
PersistentVolumeRecyclerTimeoutNFS int
PersistentVolumeRecyclerMinimumTimeoutNFS int
PersistentVolumeRecyclerPodTemplateFilePathNFS string
PersistentVolumeRecyclerIncrementTimeoutNFS int
PersistentVolumeRecyclerPodTemplateFilePathHostPath string
PersistentVolumeRecyclerMinimumTimeoutHostPath int
PersistentVolumeRecyclerIncrementTimeoutHostPath int
}
// AddFlags adds flags for a specific CMServer to the specified FlagSet
@ -147,8 +155,12 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource-quota-sync-period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace-sync-period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder-sync-period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims")
// TODO markt -- make this example a working config item with Recycler Config PR.
// fs.MyExample(&s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "pv-recycler-timeout-nfs", s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "The minimum timeout for an NFS PV recycling operation")
fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "pv-recycler-pod-template-filepath-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "The file path to a pod definition used as a template for NFS persistent volume recycling")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "pv-recycler-minimum-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "pv-recycler-increment-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod")
fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. This is for development and testing only and will not work in a multi-node cluster.")
fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.")
fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.")
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
@ -271,6 +283,7 @@ func (s *CMServer) Run(_ []string) error {
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)

View File

@ -25,9 +25,12 @@ import (
_ "k8s.io/kubernetes/pkg/cloudprovider/providers"
// Volume plugins
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/host_path"
"k8s.io/kubernetes/pkg/volume/nfs"
"github.com/golang/glog"
)
// ProbeRecyclableVolumePlugins collects all persistent volume plugins into an easy to use list.
@ -41,15 +44,41 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin
// Each plugin can make use of VolumeConfig. The single arg to this func contains *all* enumerated
// CLI flags meant to configure volume plugins. From that single config, create an instance of volume.VolumeConfig
// for a specific plugin and pass that instance to the plugin's ProbeVolumePlugins(config) func.
hostPathConfig := volume.VolumeConfig{
// transfer attributes from VolumeConfig to this instance of volume.VolumeConfig
}
nfsConfig := volume.VolumeConfig{
// TODO transfer config.PersistentVolumeRecyclerTimeoutNFS and other flags to this instance of VolumeConfig
// Configuring recyclers will be done in a follow-up PR
}
// HostPath recycling is for testing and development purposes only!
hostPathConfig := volume.VolumeConfig{
RecyclerMinimumTimeout: flags.PersistentVolumeRecyclerMinimumTimeoutHostPath,
RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutHostPath,
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
}
if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil {
glog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", err)
}
allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(hostPathConfig)...)
nfsConfig := volume.VolumeConfig{
RecyclerMinimumTimeout: flags.PersistentVolumeRecyclerMinimumTimeoutNFS,
RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutNFS,
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
}
if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil {
glog.Fatalf("Could not create NFS recycler pod from file %s: %+v", err)
}
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
return allPlugins
}
// attemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume.
// If successful, this method will set the recycler on the config.
// If unsucessful, an error is returned.
func attemptToLoadRecycler(path string, config *volume.VolumeConfig) error {
if path != "" {
recyclerPod, err := io.LoadPodFromFile(path)
if err != nil {
return err
}
config.RecyclerPodTemplate = recyclerPod
}
return nil
}

View File

@ -208,6 +208,12 @@ proxy-logv
proxy-port-range
public-address-override
pvclaimbinder-sync-period
pv-recycler-pod-template-filepath-nfs
pv-recycler-minimum-timeout-nfs
pv-recycler-increment-timeout-nfs
pv-recycler-pod-template-filepath-hostpath
pv-recycler-minimum-timeout-hostpath
pv-recycler-timeout-increment-hostpath
read-only-port
really-crash-for-testing
reconcile-cooldown

View File

@ -199,7 +199,7 @@ func TestBindingWithExamples(t *testing.T) {
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
recycler := &PersistentVolumeRecycler{
kubeClient: client,
@ -388,7 +388,7 @@ func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.Persiste
return claim, nil
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolume.Spec.HostPath.Path,
}, nil

View File

@ -35,6 +35,8 @@ import (
"k8s.io/kubernetes/pkg/watch"
)
var _ volume.VolumeHost = &PersistentVolumeRecycler{}
// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims.
// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them
// available again for a new claim.

57
pkg/util/io/io.go Normal file
View File

@ -0,0 +1,57 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io
import (
"fmt"
"io/ioutil"
"os"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/latest"
)
// LoadPodFromFile will read, decode, and return a Pod from a file.
func LoadPodFromFile(filePath string) (*api.Pod, error) {
if filePath == "" {
return nil, fmt.Errorf("file path not specified")
}
podDef, err := ioutil.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read file path %s: %+v", filePath, err)
}
if len(podDef) == 0 {
return nil, fmt.Errorf("file was empty: %s", filePath)
}
pod := &api.Pod{}
if err := latest.Codec.DecodeInto(podDef, pod); err != nil {
return nil, fmt.Errorf("failed decoding file: %v", err)
}
return pod, nil
}
// SavePodToFile will encode and save a pod to a given path & permissions
func SavePodToFile(pod *api.Pod, filePath string, perm os.FileMode) error {
if filePath == "" {
return fmt.Errorf("file path not specified")
}
data, err := latest.Codec.Encode(pod)
if err != nil {
return fmt.Errorf("failed encoding pod: %v", err)
}
return ioutil.WriteFile(filePath, data, perm)
}

50
pkg/util/io/io_test.go Normal file
View File

@ -0,0 +1,50 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io
import (
"fmt"
"github.com/pborman/uuid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/volume"
"os"
"testing"
)
func TestSavePodToFile(t *testing.T) {
pod := volume.NewPersistentVolumeRecyclerPodTemplate()
// sets all default values on a pod for equality comparison after decoding from file
encoded, err := latest.Codec.Encode(pod)
latest.Codec.DecodeInto(encoded, pod)
path := fmt.Sprintf("/tmp/kube-io-test-%s", uuid.New())
defer os.Remove(path)
if err := SavePodToFile(pod, path, 777); err != nil {
t.Fatalf("failed to save pod to file: %v", err)
}
podFromFile, err := LoadPodFromFile(path)
if err != nil {
t.Fatalf("failed to load pod from file: %v", err)
}
if !api.Semantic.DeepEqual(pod, podFromFile) {
t.Errorf("\nexpected %#v\ngot %#v\n", pod, podFromFile)
}
}

View File

@ -21,26 +21,39 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
)
// This is the primary entrypoint for volume plugins.
// Tests covering recycling should not use this func but instead
// use their own array of plugins w/ a custom recyclerFunc as appropriate
func ProbeVolumePlugins(config volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{&hostPathPlugin{nil, newRecycler}}
// The volumeConfig arg provides the ability to configure volume behavior. It is implemented as a pointer to allow nils.
// The hostPathPlugin is used to store the volumeConfig and give it, when needed, to the func that creates HostPath Recyclers.
// Tests that exercise recycling should not use this func but instead use ProbeRecyclablePlugins() to override default behavior.
func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&hostPathPlugin{
host: nil,
newRecyclerFunc: newRecycler,
config: volumeConfig,
},
}
}
func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)) []volume.VolumePlugin {
return []volume.VolumePlugin{&hostPathPlugin{nil, recyclerFunc}}
func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&hostPathPlugin{
host: nil,
newRecyclerFunc: recyclerFunc,
config: volumeConfig,
},
}
}
type hostPathPlugin struct {
host volume.VolumeHost
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
config volume.VolumeConfig
}
var _ volume.VolumePlugin = &hostPathPlugin{}
@ -89,14 +102,20 @@ func (plugin *hostPathPlugin) NewCleaner(volName string, podUID types.UID, _ mou
}
func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host)
return plugin.newRecyclerFunc(spec, plugin.host, plugin.config)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil")
}
return &hostPathRecycler{spec.Name(), spec.PersistentVolume.Spec.HostPath.Path, host}, nil
return &hostPathRecycler{
name: spec.Name(),
path: spec.PersistentVolume.Spec.HostPath.Path,
host: host,
config: config,
timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume),
}, nil
}
// HostPath volumes represent a bare host file or directory mount.
@ -153,60 +172,29 @@ func (c *hostPathCleaner) TearDownAt(dir string) error {
// hostPathRecycler scrubs a hostPath volume by running "rm -rf" on the volume in a pod
// This recycler only works on a single host cluster and is for testing purposes only.
type hostPathRecycler struct {
name string
path string
host volume.VolumeHost
name string
path string
host volume.VolumeHost
config volume.VolumeConfig
timeout int64
}
func (r *hostPathRecycler) GetPath() string {
return r.path
}
// Recycler provides methods to reclaim the volume resource.
// A HostPath is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. This is meant for
// development and testing in a single node cluster only.
// Recycle recycles/scrubs clean a HostPath volume.
// Recycle blocks until the pod has completed or any error occurs.
// The scrubber pod's is expected to succeed within 30 seconds when testing localhost.
// HostPath recycling only works in single node clusters and is meant for testing purposes only.
func (r *hostPathRecycler) Recycle() error {
timeout := int64(30)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
ActiveDeadlineSeconds: &timeout,
RestartPolicy: api.RestartPolicyNever,
Volumes: []api.Volume{
{
Name: "vol",
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{Path: r.path},
},
},
},
Containers: []api.Container{
{
Name: "scrubber",
Image: "gcr.io/google_containers/busybox",
// delete the contents of the volume, but not the directory itself
Command: []string{"/bin/sh"},
// the scrubber:
// 1. validates the /scrub directory exists
// 2. creates a text file in the directory to be scrubbed
// 3. performs rm -rf on the directory
// 4. tests to see if the directory is empty
// the pod fails if the error code is returned
Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "vol",
MountPath: "/scrub",
},
},
},
},
pod := r.config.RecyclerPodTemplate
// overrides
pod.Spec.ActiveDeadlineSeconds = &r.timeout
pod.GenerateName = "pv-recycler-hostpath-"
pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: r.path,
},
}
return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient())
}

View File

@ -63,8 +63,8 @@ func TestGetAccessModes(t *testing.T) {
func TestRecycler(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
volumeHost := volume.NewFakeVolumeHost("/tmp/fake", nil, nil)
plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, volume.NewFakeRecycler}}, volumeHost)
pluginHost := volume.NewFakeVolumeHost("/tmp/fake", nil, nil)
plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, volume.NewFakeRecycler, volume.VolumeConfig{}}}, pluginHost)
spec := &volume.Spec{PersistentVolume: &api.PersistentVolume{Spec: api.PersistentVolumeSpec{PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/foo"}}}}}
plug, err := plugMgr.FindRecyclablePluginBySpec(spec)

View File

@ -30,16 +30,24 @@ import (
)
// This is the primary entrypoint for volume plugins.
// Tests covering recycling should not use this func but instead
// use their own array of plugins w/ a custom recyclerFunc as appropriate
func ProbeVolumePlugins(config volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{&nfsPlugin{nil, newRecycler}}
// The volumeConfig arg provides the ability to configure recycler behavior. It is implemented as a pointer to allow nils.
// The nfsPlugin is used to store the volumeConfig and give it, when needed, to the func that creates NFS Recyclers.
// Tests that exercise recycling should not use this func but instead use ProbeRecyclablePlugins() to override default behavior.
func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&nfsPlugin{
host: nil,
newRecyclerFunc: newRecycler,
config: volumeConfig,
},
}
}
type nfsPlugin struct {
host volume.VolumeHost
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
config volume.VolumeConfig
}
var _ volume.VolumePlugin = &nfsPlugin{}
@ -112,7 +120,7 @@ func (plugin *nfsPlugin) newCleanerInternal(volName string, podUID types.UID, mo
}
func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host)
return plugin.newRecyclerFunc(spec, plugin.host, plugin.config)
}
// NFS volumes represent a bare host file or directory mount of an NFS export.
@ -122,7 +130,7 @@ type nfs struct {
mounter mount.Interface
plugin *nfsPlugin
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
}
func (nfsVolume *nfs) GetPath() string {
@ -236,77 +244,46 @@ func (c *nfsCleaner) TearDownAt(dir string) error {
return nil
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) {
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.NFS == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.NFS is nil")
}
return &nfsRecycler{
name: spec.Name(),
server: spec.PersistentVolume.Spec.NFS.Server,
path: spec.PersistentVolume.Spec.NFS.Path,
host: host,
name: spec.Name(),
server: spec.PersistentVolume.Spec.NFS.Server,
path: spec.PersistentVolume.Spec.NFS.Path,
host: host,
config: volumeConfig,
timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume),
}, nil
}
// nfsRecycler scrubs an NFS volume by running "rm -rf" on the volume in a pod.
type nfsRecycler struct {
name string
server string
path string
host volume.VolumeHost
name string
server string
path string
host volume.VolumeHost
config volume.VolumeConfig
timeout int64
}
func (r *nfsRecycler) GetPath() string {
return r.path
}
// Recycler provides methods to reclaim the volume resource.
// A NFS volume is recycled by scheduling a pod to run "rm -rf" on the contents of the volume.
// Recycle recycles/scrubs clean an NFS volume.
// Recycle blocks until the pod has completed or any error occurs.
// The scrubber pod's is expected to succeed within 5 minutes else an error will be returned
func (r *nfsRecycler) Recycle() error {
timeout := int64(300) // 5 minutes
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
ActiveDeadlineSeconds: &timeout,
RestartPolicy: api.RestartPolicyNever,
Volumes: []api.Volume{
{
Name: "vol",
VolumeSource: api.VolumeSource{
NFS: &api.NFSVolumeSource{
Server: r.server,
Path: r.path,
},
},
},
},
Containers: []api.Container{
{
Name: "scrubber",
Image: "gcr.io/google_containers/busybox",
// delete the contents of the volume, but not the directory itself
Command: []string{"/bin/sh"},
// the scrubber:
// 1. validates the /scrub directory exists
// 2. creates a text file to be scrubbed
// 3. performs rm -rf on the directory
// 4. tests to see if the directory is empty
// the pod fails if the error code is returned
Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "vol",
MountPath: "/scrub",
},
},
},
},
pod := r.config.RecyclerPodTemplate
// overrides
pod.Spec.ActiveDeadlineSeconds = &r.timeout
pod.GenerateName = "pv-recycler-nfs-"
pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{
NFS: &api.NFSVolumeSource{
Server: r.server,
Path: r.path,
},
}
return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient())
}

View File

@ -64,7 +64,7 @@ func TestGetAccessModes(t *testing.T) {
func TestRecycler(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{nil, newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{nil, newMockRecycler, volume.VolumeConfig{}}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
spec := &volume.Spec{PersistentVolume: &api.PersistentVolume{Spec: api.PersistentVolumeSpec{PersistentVolumeSource: api.PersistentVolumeSource{NFS: &api.NFSVolumeSource{Path: "/foo"}}}}}
plug, err := plugMgr.FindRecyclablePluginBySpec(spec)
@ -83,7 +83,7 @@ func TestRecycler(t *testing.T) {
}
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolume.Spec.NFS.Path,
}, nil

View File

@ -169,8 +169,22 @@ func (spec *Spec) Name() string {
// The binary should still use strong typing for this value when binding CLI values before they are passed as strings
// in OtherAttributes.
type VolumeConfig struct {
// thockin: do we want to wait on this until we have an actual use case? I can change the comments above to
// reflect our intention for one-off config.
// RecyclerPodTemplate is pod template that understands how to scrub clean a persistent volume after its release.
// The template is used by plugins which override specific properties of the pod in accordance with that plugin.
// See NewPersistentVolumeRecyclerPodTemplate for the properties that are expected to be overridden.
RecyclerPodTemplate *api.Pod
// RecyclerMinimumTimeout is the minimum amount of time in seconds for the recycler pod's ActiveDeadlineSeconds attribute.
// Added to the minimum timeout is the increment per Gi of capacity.
RecyclerMinimumTimeout int
// RecyclerTimeoutIncrement is the number of seconds added to the recycler pod's ActiveDeadlineSeconds for each
// Gi of capacity in the persistent volume.
// Example: 5Gi volume x 30s increment = 150s + 30s minimum = 180s ActiveDeadlineSeconds for recycler pod
RecyclerTimeoutIncrement int
// OtherAttributes stores config as strings. These strings are opaque to the system and only understood by the binary
// hosting the plugin and the plugin itself.
OtherAttributes map[string]string
}
@ -301,3 +315,49 @@ func (pm *VolumePluginMgr) FindRecyclablePluginBySpec(spec *Spec) (RecyclableVol
}
return nil, fmt.Errorf("no recyclable volume plugin matched")
}
// NewPersistentVolumeRecyclerPodTemplate creates a template for a recycler pod. By default, a recycler pod simply runs
// "rm -rf" on a volume and tests for emptiness. Most attributes of the template will be correct for most
// plugin implementations. The following attributes can be overridden per plugin via configuration:
//
// 1. pod.Spec.Volumes[0].VolumeSource must be overridden. Recycler implementations without a valid VolumeSource will fail.
// 2. pod.GenerateName helps distinguish recycler pods by name. Recommended. Default is "pv-recycler-".
// 3. pod.Spec.ActiveDeadlineSeconds gives the recycler pod a maximum timeout before failing. Recommended. Default is 60 seconds.
//
// See HostPath and NFS for working recycler examples
func NewPersistentVolumeRecyclerPodTemplate() *api.Pod {
timeout := int64(60)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-recycler-",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
ActiveDeadlineSeconds: &timeout,
RestartPolicy: api.RestartPolicyNever,
Volumes: []api.Volume{
{
Name: "vol",
// IMPORTANT! All plugins using this template MUST override pod.Spec.Volumes[0].VolumeSource
// Recycler implementations without a valid VolumeSource will fail.
VolumeSource: api.VolumeSource{},
},
},
Containers: []api.Container{
{
Name: "pv-recycler",
Image: "gcr.io/google_containers/busybox",
Command: []string{"/bin/sh"},
Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* /scrub/.* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "vol",
MountPath: "/scrub",
},
},
},
},
},
}
return pod
}

View File

@ -176,7 +176,7 @@ func (fr *fakeRecycler) GetPath() string {
return fr.path
}
func NewFakeRecycler(spec *Spec, host VolumeHost) (Recycler, error) {
func NewFakeRecycler(spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) {
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
return nil, fmt.Errorf("fakeRecycler only supports spec.PersistentVolume.Spec.HostPath")
}

View File

@ -29,30 +29,31 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/resource"
)
// ScrubPodVolumeAndWatchUntilCompletion is intended for use with volume Recyclers. This function will
// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume Recyclers. This function will
// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first.
// An attempt to delete a scrubber pod is always attempted before returning.
// pod - the pod designed by a volume plugin to scrub the volume's contents
// An attempt to delete a recycler pod is always attempted before returning.
// pod - the pod designed by a volume plugin to recycle the volume
// client - kube client for API operations.
func ScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, kubeClient client.Interface) error {
return internalScrubPodVolumeAndWatchUntilCompletion(pod, newScrubberClient(kubeClient))
func RecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, kubeClient client.Interface) error {
return internalRecycleVolumeByWatchingPodUntilCompletion(pod, newRecyclerClient(kubeClient))
}
// same as above func comments, except 'scrubberClient' is a narrower pod API interface to ease testing
func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient scrubberClient) error {
glog.V(5).Infof("Creating scrubber pod for volume %s\n", pod.Name)
pod, err := scrubberClient.CreatePod(pod)
// same as above func comments, except 'recyclerClient' is a narrower pod API interface to ease testing
func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerClient recyclerClient) error {
glog.V(5).Infof("Creating recycler pod for volume %s\n", pod.Name)
pod, err := recyclerClient.CreatePod(pod)
if err != nil {
return fmt.Errorf("Unexpected error creating a pod to scrub volume %s: %+v\n", pod.Name, err)
return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err)
}
defer scrubberClient.DeletePod(pod.Name, pod.Namespace)
defer recyclerClient.DeletePod(pod.Name, pod.Namespace)
stopChannel := make(chan struct{})
defer close(stopChannel)
nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel)
nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel)
for {
watchedPod := nextPod()
@ -71,39 +72,39 @@ func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient
}
}
// scrubberClient abstracts access to a Pod by providing a narrower interface.
// recyclerClient abstracts access to a Pod by providing a narrower interface.
// this makes it easier to mock a client for testing
type scrubberClient interface {
type recyclerClient interface {
CreatePod(pod *api.Pod) (*api.Pod, error)
GetPod(name, namespace string) (*api.Pod, error)
DeletePod(name, namespace string) error
WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod
}
func newScrubberClient(client client.Interface) scrubberClient {
return &realScrubberClient{client}
func newRecyclerClient(client client.Interface) recyclerClient {
return &realRecyclerClient{client}
}
type realScrubberClient struct {
type realRecyclerClient struct {
client client.Interface
}
func (c *realScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
func (c *realRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
return c.client.Pods(pod.Namespace).Create(pod)
}
func (c *realScrubberClient) GetPod(name, namespace string) (*api.Pod, error) {
func (c *realRecyclerClient) GetPod(name, namespace string) (*api.Pod, error) {
return c.client.Pods(namespace).Get(name)
}
func (c *realScrubberClient) DeletePod(name, namespace string) error {
func (c *realRecyclerClient) DeletePod(name, namespace string) error {
return c.client.Pods(namespace).Delete(name, nil)
}
// WatchPod returns a ListWatch for watching a pod. The stopChannel is used
// to close the reflector backing the watch. The caller is responsible for derring a close on the channel to
// stop the reflector.
func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
func (c *realRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
podLW := &cache.ListWatch{
@ -122,3 +123,18 @@ func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string, s
return obj.(*api.Pod)
}
}
// CalculateTimeoutForVolume calculates time for a Recycler pod to complete a recycle operation.
// The calculation and return value is either the minimumTimeout or the timeoutIncrement per Gi of storage size, whichever is greater.
func CalculateTimeoutForVolume(minimumTimeout, timeoutIncrement int, pv *api.PersistentVolume) int64 {
giQty := resource.MustParse("1Gi")
pvQty := pv.Spec.Capacity[api.ResourceStorage]
giSize := giQty.Value()
pvSize := pvQty.Value()
timeout := (pvSize / giSize) * int64(timeoutIncrement)
if timeout < int64(minimumTimeout) {
return int64(minimumTimeout)
} else {
return timeout
}
}

View File

@ -21,14 +21,15 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"strings"
)
func TestScrubberSuccess(t *testing.T) {
client := &mockScrubberClient{}
scrubber := &api.Pod{
func TestRecyclerSuccess(t *testing.T) {
client := &mockRecyclerClient{}
recycler := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "scrubber-test",
Name: "recycler-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
@ -36,20 +37,20 @@ func TestScrubberSuccess(t *testing.T) {
},
}
err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client)
err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client)
if err != nil {
t.Errorf("Unexpected error watching scrubber pod: %+v", err)
t.Errorf("Unexpected error watching recycler pod: %+v", err)
}
if !client.deletedCalled {
t.Errorf("Expected deferred client.Delete to be called on scrubber pod")
t.Errorf("Expected deferred client.Delete to be called on recycler pod")
}
}
func TestScrubberFailure(t *testing.T) {
client := &mockScrubberClient{}
scrubber := &api.Pod{
func TestRecyclerFailure(t *testing.T) {
client := &mockRecyclerClient{}
recycler := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "scrubber-test",
Name: "recycler-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
@ -58,31 +59,31 @@ func TestScrubberFailure(t *testing.T) {
},
}
err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client)
err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client)
if err == nil {
t.Fatalf("Expected pod failure but got nil error returned")
}
if err != nil {
if !strings.Contains(err.Error(), "foo") {
t.Errorf("Expected pod.Status.Message %s but got %s", scrubber.Status.Message, err)
t.Errorf("Expected pod.Status.Message %s but got %s", recycler.Status.Message, err)
}
}
if !client.deletedCalled {
t.Errorf("Expected deferred client.Delete to be called on scrubber pod")
t.Errorf("Expected deferred client.Delete to be called on recycler pod")
}
}
type mockScrubberClient struct {
type mockRecyclerClient struct {
pod *api.Pod
deletedCalled bool
}
func (c *mockScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
func (c *mockRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
c.pod = pod
return c.pod, nil
}
func (c *mockScrubberClient) GetPod(name, namespace string) (*api.Pod, error) {
func (c *mockRecyclerClient) GetPod(name, namespace string) (*api.Pod, error) {
if c.pod != nil {
return c.pod, nil
} else {
@ -90,13 +91,40 @@ func (c *mockScrubberClient) GetPod(name, namespace string) (*api.Pod, error) {
}
}
func (c *mockScrubberClient) DeletePod(name, namespace string) error {
func (c *mockRecyclerClient) DeletePod(name, namespace string) error {
c.deletedCalled = true
return nil
}
func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
func (c *mockRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
return func() *api.Pod {
return c.pod
}
}
func TestCalculateTimeoutForVolume(t *testing.T) {
pv := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("500M"),
},
},
}
timeout := CalculateTimeoutForVolume(50, 30, pv)
if timeout != 50 {
t.Errorf("Expected 50 for timeout but got %v", timeout)
}
pv.Spec.Capacity[api.ResourceStorage] = resource.MustParse("2Gi")
timeout = CalculateTimeoutForVolume(50, 30, pv)
if timeout != 60 {
t.Errorf("Expected 60 for timeout but got %v", timeout)
}
pv.Spec.Capacity[api.ResourceStorage] = resource.MustParse("150Gi")
timeout = CalculateTimeoutForVolume(50, 30, pv)
if timeout != 4500 {
t.Errorf("Expected 4500 for timeout but got %v", timeout)
}
}

View File

@ -29,6 +29,8 @@ import (
"k8s.io/kubernetes/pkg/controller/persistentvolume"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
)
func init() {
@ -205,3 +207,65 @@ func createTestVolumes() []*api.PersistentVolume {
},
}
}
func TestPersistentVolumeRecycler(t *testing.T) {
_, s := runAMaster(t)
defer s.Close()
deleteAllEtcdKeys()
client := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Default.Version()})
binder := volumeclaimbinder.NewPersistentVolumeClaimBinder(client, 1*time.Second)
binder.Run()
defer binder.Stop()
recycler, _ := volumeclaimbinder.NewPersistentVolumeRecycler(client, 1*time.Second, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}})
recycler.Run()
defer recycler.Stop()
// This PV will be claimed, released, and recycled.
pv := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{Name: "fake-pv"},
Spec: api.PersistentVolumeSpec{
PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "foo"}},
Capacity: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("10G")},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
},
}
pvc := &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{Name: "fake-pvc"},
Spec: api.PersistentVolumeClaimSpec{
Resources: api.ResourceRequirements{Requests: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse("5G")}},
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
},
}
watch, _ := client.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), "0")
defer watch.Stop()
_, _ = client.PersistentVolumes().Create(pv)
_, _ = client.PersistentVolumeClaims(api.NamespaceDefault).Create(pvc)
// wait until the binder pairs the volume and claim
waitForPersistentVolumePhase(watch, api.VolumeBound)
// deleting a claim releases the volume, after which it can be recycled
if err := client.PersistentVolumeClaims(api.NamespaceDefault).Delete(pvc.Name); err != nil {
t.Errorf("error deleting claim %s", pvc.Name)
}
waitForPersistentVolumePhase(watch, api.VolumeReleased)
waitForPersistentVolumePhase(watch, api.VolumeAvailable)
}
func waitForPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) {
for {
event := <-w.ResultChan()
volume := event.Object.(*api.PersistentVolume)
if volume.Status.Phase == phase {
break
}
}
}