mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Big move and rename
- remove persistentvolume_ prefix from all files - split controller.go into controller.go and controller_base.go (to have them under 1500 lines for github)
This commit is contained in:
parent
c5fe1f943c
commit
7f549511e2
@ -22,17 +22,13 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/errors"
|
|
||||||
"k8s.io/kubernetes/pkg/client/cache"
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
|
||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
"k8s.io/kubernetes/pkg/controller/framework"
|
"k8s.io/kubernetes/pkg/controller/framework"
|
||||||
"k8s.io/kubernetes/pkg/conversion"
|
"k8s.io/kubernetes/pkg/conversion"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
|
||||||
vol "k8s.io/kubernetes/pkg/volume"
|
vol "k8s.io/kubernetes/pkg/volume"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
@ -144,280 +140,6 @@ type PersistentVolumeController struct {
|
|||||||
createProvisionedPVInterval time.Duration
|
createProvisionedPVInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPersistentVolumeController creates a new PersistentVolumeController
|
|
||||||
func NewPersistentVolumeController(
|
|
||||||
kubeClient clientset.Interface,
|
|
||||||
syncPeriod time.Duration,
|
|
||||||
provisioner vol.ProvisionableVolumePlugin,
|
|
||||||
recyclers []vol.VolumePlugin,
|
|
||||||
cloud cloudprovider.Interface,
|
|
||||||
clusterName string) *PersistentVolumeController {
|
|
||||||
|
|
||||||
broadcaster := record.NewBroadcaster()
|
|
||||||
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
|
||||||
recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
|
|
||||||
|
|
||||||
controller := &PersistentVolumeController{
|
|
||||||
kubeClient: kubeClient,
|
|
||||||
eventRecorder: recorder,
|
|
||||||
runningOperations: make(map[string]bool),
|
|
||||||
cloud: cloud,
|
|
||||||
provisioner: provisioner,
|
|
||||||
clusterName: clusterName,
|
|
||||||
createProvisionedPVRetryCount: createProvisionedPVRetryCount,
|
|
||||||
createProvisionedPVInterval: createProvisionedPVInterval,
|
|
||||||
}
|
|
||||||
controller.recyclePluginMgr.InitPlugins(recyclers, controller)
|
|
||||||
if controller.provisioner != nil {
|
|
||||||
if err := controller.provisioner.Init(controller); err != nil {
|
|
||||||
glog.Errorf("PersistentVolumeController: error initializing provisioner plugin: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
volumeSource := &cache.ListWatch{
|
|
||||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
|
||||||
return kubeClient.Core().PersistentVolumes().List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
|
||||||
return kubeClient.Core().PersistentVolumes().Watch(options)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
claimSource := &cache.ListWatch{
|
|
||||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
|
||||||
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
|
|
||||||
},
|
|
||||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
|
||||||
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
controller.initializeController(syncPeriod, volumeSource, claimSource)
|
|
||||||
|
|
||||||
return controller
|
|
||||||
}
|
|
||||||
|
|
||||||
// initializeController prepares watching for PersistentVolume and
|
|
||||||
// PersistentVolumeClaim events from given sources. This should be used to
|
|
||||||
// initialize the controller for real operation (with real event sources) and
|
|
||||||
// also during testing (with fake ones).
|
|
||||||
func (ctrl *PersistentVolumeController) initializeController(syncPeriod time.Duration, volumeSource, claimSource cache.ListerWatcher) {
|
|
||||||
glog.V(4).Infof("initializing PersistentVolumeController, sync every %s", syncPeriod.String())
|
|
||||||
ctrl.volumes.store, ctrl.volumeController = framework.NewIndexerInformer(
|
|
||||||
volumeSource,
|
|
||||||
&api.PersistentVolume{},
|
|
||||||
syncPeriod,
|
|
||||||
framework.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: ctrl.addVolume,
|
|
||||||
UpdateFunc: ctrl.updateVolume,
|
|
||||||
DeleteFunc: ctrl.deleteVolume,
|
|
||||||
},
|
|
||||||
cache.Indexers{"accessmodes": accessModesIndexFunc},
|
|
||||||
)
|
|
||||||
ctrl.claims, ctrl.claimController = framework.NewInformer(
|
|
||||||
claimSource,
|
|
||||||
&api.PersistentVolumeClaim{},
|
|
||||||
syncPeriod,
|
|
||||||
framework.ResourceEventHandlerFuncs{
|
|
||||||
AddFunc: ctrl.addClaim,
|
|
||||||
UpdateFunc: ctrl.updateClaim,
|
|
||||||
DeleteFunc: ctrl.deleteClaim,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// addVolume is callback from framework.Controller watching PersistentVolume
|
|
||||||
// events.
|
|
||||||
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
|
|
||||||
if !ctrl.isFullySynced() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
pv, ok := obj.(*api.PersistentVolume)
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := ctrl.syncVolume(pv); err != nil {
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
// Version conflict error happens quite often and the controller
|
|
||||||
// recovers from it easily.
|
|
||||||
glog.V(3).Infof("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
|
|
||||||
} else {
|
|
||||||
glog.Errorf("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateVolume is callback from framework.Controller watching PersistentVolume
|
|
||||||
// events.
|
|
||||||
func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
|
|
||||||
if !ctrl.isFullySynced() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
newVolume, ok := newObj.(*api.PersistentVolume)
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := ctrl.syncVolume(newVolume); err != nil {
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
// Version conflict error happens quite often and the controller
|
|
||||||
// recovers from it easily.
|
|
||||||
glog.V(3).Infof("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
|
|
||||||
} else {
|
|
||||||
glog.Errorf("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// deleteVolume is callback from framework.Controller watching PersistentVolume
|
|
||||||
// events.
|
|
||||||
func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
|
|
||||||
if !ctrl.isFullySynced() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var volume *api.PersistentVolume
|
|
||||||
var ok bool
|
|
||||||
volume, ok = obj.(*api.PersistentVolume)
|
|
||||||
if !ok {
|
|
||||||
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
|
||||||
volume, ok = unknown.Obj.(*api.PersistentVolume)
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", unknown.Obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ok || volume == nil || volume.Spec.ClaimRef == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists {
|
|
||||||
if claim, ok := claimObj.(*api.PersistentVolumeClaim); ok && claim != nil {
|
|
||||||
// sync the claim when its volume is deleted. Explicitly syncing the
|
|
||||||
// claim here in response to volume deletion prevents the claim from
|
|
||||||
// waiting until the next sync period for its Lost status.
|
|
||||||
err := ctrl.syncClaim(claim)
|
|
||||||
if err != nil {
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
// Version conflict error happens quite often and the
|
|
||||||
// controller recovers from it easily.
|
|
||||||
glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", claimToClaimKey(claim), err)
|
|
||||||
} else {
|
|
||||||
glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", claimToClaimKey(claim), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.Errorf("Cannot convert object from claim cache to claim %q!?: %+v", claimrefToClaimKey(volume.Spec.ClaimRef), claimObj)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// addClaim is callback from framework.Controller watching PersistentVolumeClaim
|
|
||||||
// events.
|
|
||||||
func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
|
|
||||||
if !ctrl.isFullySynced() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
claim, ok := obj.(*api.PersistentVolumeClaim)
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := ctrl.syncClaim(claim); err != nil {
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
// Version conflict error happens quite often and the controller
|
|
||||||
// recovers from it easily.
|
|
||||||
glog.V(3).Infof("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
|
|
||||||
} else {
|
|
||||||
glog.Errorf("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateClaim is callback from framework.Controller watching PersistentVolumeClaim
|
|
||||||
// events.
|
|
||||||
func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
|
|
||||||
if !ctrl.isFullySynced() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := ctrl.syncClaim(newClaim); err != nil {
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
// Version conflict error happens quite often and the controller
|
|
||||||
// recovers from it easily.
|
|
||||||
glog.V(3).Infof("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
|
|
||||||
} else {
|
|
||||||
glog.Errorf("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// deleteClaim is callback from framework.Controller watching PersistentVolumeClaim
|
|
||||||
// events.
|
|
||||||
func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
|
|
||||||
if !ctrl.isFullySynced() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var volume *api.PersistentVolume
|
|
||||||
var claim *api.PersistentVolumeClaim
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
claim, ok = obj.(*api.PersistentVolumeClaim)
|
|
||||||
if !ok {
|
|
||||||
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
|
||||||
claim, ok = unknown.Obj.(*api.PersistentVolumeClaim)
|
|
||||||
if !ok {
|
|
||||||
glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %+v", unknown.Obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %+v", obj)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !ok || claim == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
|
|
||||||
if volume, ok = pvObj.(*api.PersistentVolume); ok {
|
|
||||||
// sync the volume when its claim is deleted. Explicitly sync'ing the
|
|
||||||
// volume here in response to claim deletion prevents the volume from
|
|
||||||
// waiting until the next sync period for its Release.
|
|
||||||
if volume != nil {
|
|
||||||
err := ctrl.syncVolume(volume)
|
|
||||||
if err != nil {
|
|
||||||
if errors.IsConflict(err) {
|
|
||||||
// Version conflict error happens quite often and the
|
|
||||||
// controller recovers from it easily.
|
|
||||||
glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
|
|
||||||
} else {
|
|
||||||
glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
glog.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, pvObj)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncClaim is the main controller method to decide what to do with a claim.
|
// syncClaim is the main controller method to decide what to do with a claim.
|
||||||
// It's invoked by appropriate framework.Controller callbacks when a claim is
|
// It's invoked by appropriate framework.Controller callbacks when a claim is
|
||||||
// created, updated or periodically synced. We do not differentiate between
|
// created, updated or periodically synced. We do not differentiate between
|
||||||
@ -761,36 +483,6 @@ func (ctrl *PersistentVolumeController) syncVolume(volume *api.PersistentVolume)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts all of this controller's control loops
|
|
||||||
func (ctrl *PersistentVolumeController) Run() {
|
|
||||||
glog.V(4).Infof("starting PersistentVolumeController")
|
|
||||||
|
|
||||||
if ctrl.volumeControllerStopCh == nil {
|
|
||||||
ctrl.volumeControllerStopCh = make(chan struct{})
|
|
||||||
go ctrl.volumeController.Run(ctrl.volumeControllerStopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
if ctrl.claimControllerStopCh == nil {
|
|
||||||
ctrl.claimControllerStopCh = make(chan struct{})
|
|
||||||
go ctrl.claimController.Run(ctrl.claimControllerStopCh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop gracefully shuts down this controller
|
|
||||||
func (ctrl *PersistentVolumeController) Stop() {
|
|
||||||
glog.V(4).Infof("stopping PersistentVolumeController")
|
|
||||||
close(ctrl.volumeControllerStopCh)
|
|
||||||
close(ctrl.claimControllerStopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
// isFullySynced returns true, if both volume and claim caches are fully loaded
|
|
||||||
// after startup.
|
|
||||||
// We do not want to process events with not fully loaded caches - e.g. we might
|
|
||||||
// recycle/delete PVs that don't have corresponding claim in the cache yet.
|
|
||||||
func (ctrl *PersistentVolumeController) isFullySynced() bool {
|
|
||||||
return ctrl.volumeController.HasSynced() && ctrl.claimController.HasSynced()
|
|
||||||
}
|
|
||||||
|
|
||||||
// updateClaimPhase saves new claim phase to API server.
|
// updateClaimPhase saves new claim phase to API server.
|
||||||
func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVolumeClaim, phase api.PersistentVolumeClaimPhase) (*api.PersistentVolumeClaim, error) {
|
func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVolumeClaim, phase api.PersistentVolumeClaimPhase) (*api.PersistentVolumeClaim, error) {
|
||||||
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s", claimToClaimKey(claim), phase)
|
glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s", claimToClaimKey(claim), phase)
|
||||||
@ -1530,49 +1222,3 @@ func (ctrl *PersistentVolumeController) finishRunningOperation(operationName str
|
|||||||
func (ctrl *PersistentVolumeController) startRunningOperation(operationName string) {
|
func (ctrl *PersistentVolumeController) startRunningOperation(operationName string) {
|
||||||
ctrl.runningOperations[operationName] = true
|
ctrl.runningOperations[operationName] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stateless functions
|
|
||||||
|
|
||||||
func hasAnnotation(obj api.ObjectMeta, ann string) bool {
|
|
||||||
_, found := obj.Annotations[ann]
|
|
||||||
return found
|
|
||||||
}
|
|
||||||
|
|
||||||
func setAnnotation(obj *api.ObjectMeta, ann string, value string) {
|
|
||||||
if obj.Annotations == nil {
|
|
||||||
obj.Annotations = make(map[string]string)
|
|
||||||
}
|
|
||||||
obj.Annotations[ann] = value
|
|
||||||
}
|
|
||||||
|
|
||||||
func getClaimStatusForLogging(claim *api.PersistentVolumeClaim) string {
|
|
||||||
everBound := hasAnnotation(claim.ObjectMeta, annBindCompleted)
|
|
||||||
boundByController := hasAnnotation(claim.ObjectMeta, annBoundByController)
|
|
||||||
|
|
||||||
return fmt.Sprintf("phase: %s, bound to: %q, wasEverBound: %v, boundByController: %v", claim.Status.Phase, claim.Spec.VolumeName, everBound, boundByController)
|
|
||||||
}
|
|
||||||
|
|
||||||
func getVolumeStatusForLogging(volume *api.PersistentVolume) string {
|
|
||||||
boundByController := hasAnnotation(volume.ObjectMeta, annBoundByController)
|
|
||||||
claimName := ""
|
|
||||||
if volume.Spec.ClaimRef != nil {
|
|
||||||
claimName = fmt.Sprintf("%s/%s (uid: %s)", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, volume.Spec.ClaimRef.UID)
|
|
||||||
}
|
|
||||||
return fmt.Sprintf("phase: %s, bound to: %q, boundByController: %v", volume.Status.Phase, claimName, boundByController)
|
|
||||||
}
|
|
||||||
|
|
||||||
// isVolumeBoundToClaim returns true, if given volume is pre-bound or bound
|
|
||||||
// to specific claim. Both claim.Name and claim.Namespace must be equal.
|
|
||||||
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
|
|
||||||
func isVolumeBoundToClaim(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) bool {
|
|
||||||
if volume.Spec.ClaimRef == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
390
pkg/controller/persistentvolume/controller_base.go
Normal file
390
pkg/controller/persistentvolume/controller_base.go
Normal file
@ -0,0 +1,390 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package persistentvolume
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/errors"
|
||||||
|
"k8s.io/kubernetes/pkg/client/cache"
|
||||||
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
|
unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
|
||||||
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/framework"
|
||||||
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
vol "k8s.io/kubernetes/pkg/volume"
|
||||||
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This file contains the controller base functionality, i.e. framework to
|
||||||
|
// process PV/PVC added/updated/deleted events. The real binding, provisioning,
|
||||||
|
// recycling and deleting is done in controller.go
|
||||||
|
|
||||||
|
// NewPersistentVolumeController creates a new PersistentVolumeController
|
||||||
|
func NewPersistentVolumeController(
|
||||||
|
kubeClient clientset.Interface,
|
||||||
|
syncPeriod time.Duration,
|
||||||
|
provisioner vol.ProvisionableVolumePlugin,
|
||||||
|
recyclers []vol.VolumePlugin,
|
||||||
|
cloud cloudprovider.Interface,
|
||||||
|
clusterName string) *PersistentVolumeController {
|
||||||
|
|
||||||
|
broadcaster := record.NewBroadcaster()
|
||||||
|
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
||||||
|
recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
|
||||||
|
|
||||||
|
controller := &PersistentVolumeController{
|
||||||
|
kubeClient: kubeClient,
|
||||||
|
eventRecorder: recorder,
|
||||||
|
runningOperations: make(map[string]bool),
|
||||||
|
cloud: cloud,
|
||||||
|
provisioner: provisioner,
|
||||||
|
clusterName: clusterName,
|
||||||
|
createProvisionedPVRetryCount: createProvisionedPVRetryCount,
|
||||||
|
createProvisionedPVInterval: createProvisionedPVInterval,
|
||||||
|
}
|
||||||
|
controller.recyclePluginMgr.InitPlugins(recyclers, controller)
|
||||||
|
if controller.provisioner != nil {
|
||||||
|
if err := controller.provisioner.Init(controller); err != nil {
|
||||||
|
glog.Errorf("PersistentVolumeController: error initializing provisioner plugin: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
volumeSource := &cache.ListWatch{
|
||||||
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
return kubeClient.Core().PersistentVolumes().List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return kubeClient.Core().PersistentVolumes().Watch(options)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
claimSource := &cache.ListWatch{
|
||||||
|
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||||
|
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||||
|
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
controller.initializeController(syncPeriod, volumeSource, claimSource)
|
||||||
|
|
||||||
|
return controller
|
||||||
|
}
|
||||||
|
|
||||||
|
// initializeController prepares watching for PersistentVolume and
|
||||||
|
// PersistentVolumeClaim events from given sources. This should be used to
|
||||||
|
// initialize the controller for real operation (with real event sources) and
|
||||||
|
// also during testing (with fake ones).
|
||||||
|
func (ctrl *PersistentVolumeController) initializeController(syncPeriod time.Duration, volumeSource, claimSource cache.ListerWatcher) {
|
||||||
|
glog.V(4).Infof("initializing PersistentVolumeController, sync every %s", syncPeriod.String())
|
||||||
|
ctrl.volumes.store, ctrl.volumeController = framework.NewIndexerInformer(
|
||||||
|
volumeSource,
|
||||||
|
&api.PersistentVolume{},
|
||||||
|
syncPeriod,
|
||||||
|
framework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: ctrl.addVolume,
|
||||||
|
UpdateFunc: ctrl.updateVolume,
|
||||||
|
DeleteFunc: ctrl.deleteVolume,
|
||||||
|
},
|
||||||
|
cache.Indexers{"accessmodes": accessModesIndexFunc},
|
||||||
|
)
|
||||||
|
ctrl.claims, ctrl.claimController = framework.NewInformer(
|
||||||
|
claimSource,
|
||||||
|
&api.PersistentVolumeClaim{},
|
||||||
|
syncPeriod,
|
||||||
|
framework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: ctrl.addClaim,
|
||||||
|
UpdateFunc: ctrl.updateClaim,
|
||||||
|
DeleteFunc: ctrl.deleteClaim,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// addVolume is callback from framework.Controller watching PersistentVolume
|
||||||
|
// events.
|
||||||
|
func (ctrl *PersistentVolumeController) addVolume(obj interface{}) {
|
||||||
|
if !ctrl.isFullySynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
pv, ok := obj.(*api.PersistentVolume)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("expected PersistentVolume but handler received %+v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := ctrl.syncVolume(pv); err != nil {
|
||||||
|
if errors.IsConflict(err) {
|
||||||
|
// Version conflict error happens quite often and the controller
|
||||||
|
// recovers from it easily.
|
||||||
|
glog.V(3).Infof("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("PersistentVolumeController could not add volume %q: %+v", pv.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateVolume is callback from framework.Controller watching PersistentVolume
|
||||||
|
// events.
|
||||||
|
func (ctrl *PersistentVolumeController) updateVolume(oldObj, newObj interface{}) {
|
||||||
|
if !ctrl.isFullySynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
newVolume, ok := newObj.(*api.PersistentVolume)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := ctrl.syncVolume(newVolume); err != nil {
|
||||||
|
if errors.IsConflict(err) {
|
||||||
|
// Version conflict error happens quite often and the controller
|
||||||
|
// recovers from it easily.
|
||||||
|
glog.V(3).Infof("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("PersistentVolumeController could not update volume %q: %+v", newVolume.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteVolume is callback from framework.Controller watching PersistentVolume
|
||||||
|
// events.
|
||||||
|
func (ctrl *PersistentVolumeController) deleteVolume(obj interface{}) {
|
||||||
|
if !ctrl.isFullySynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var volume *api.PersistentVolume
|
||||||
|
var ok bool
|
||||||
|
volume, ok = obj.(*api.PersistentVolume)
|
||||||
|
if !ok {
|
||||||
|
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
||||||
|
volume, ok = unknown.Obj.(*api.PersistentVolume)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", unknown.Obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Errorf("Expected PersistentVolume but deleteVolume received %+v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok || volume == nil || volume.Spec.ClaimRef == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if claimObj, exists, _ := ctrl.claims.GetByKey(claimrefToClaimKey(volume.Spec.ClaimRef)); exists {
|
||||||
|
if claim, ok := claimObj.(*api.PersistentVolumeClaim); ok && claim != nil {
|
||||||
|
// sync the claim when its volume is deleted. Explicitly syncing the
|
||||||
|
// claim here in response to volume deletion prevents the claim from
|
||||||
|
// waiting until the next sync period for its Lost status.
|
||||||
|
err := ctrl.syncClaim(claim)
|
||||||
|
if err != nil {
|
||||||
|
if errors.IsConflict(err) {
|
||||||
|
// Version conflict error happens quite often and the
|
||||||
|
// controller recovers from it easily.
|
||||||
|
glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", claimToClaimKey(claim), err)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", claimToClaimKey(claim), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Errorf("Cannot convert object from claim cache to claim %q!?: %+v", claimrefToClaimKey(volume.Spec.ClaimRef), claimObj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// addClaim is callback from framework.Controller watching PersistentVolumeClaim
|
||||||
|
// events.
|
||||||
|
func (ctrl *PersistentVolumeController) addClaim(obj interface{}) {
|
||||||
|
if !ctrl.isFullySynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
claim, ok := obj.(*api.PersistentVolumeClaim)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := ctrl.syncClaim(claim); err != nil {
|
||||||
|
if errors.IsConflict(err) {
|
||||||
|
// Version conflict error happens quite often and the controller
|
||||||
|
// recovers from it easily.
|
||||||
|
glog.V(3).Infof("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("PersistentVolumeController could not add claim %q: %+v", claimToClaimKey(claim), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateClaim is callback from framework.Controller watching PersistentVolumeClaim
|
||||||
|
// events.
|
||||||
|
func (ctrl *PersistentVolumeController) updateClaim(oldObj, newObj interface{}) {
|
||||||
|
if !ctrl.isFullySynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
newClaim, ok := newObj.(*api.PersistentVolumeClaim)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Expected PersistentVolumeClaim but updateClaim received %+v", newObj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := ctrl.syncClaim(newClaim); err != nil {
|
||||||
|
if errors.IsConflict(err) {
|
||||||
|
// Version conflict error happens quite often and the controller
|
||||||
|
// recovers from it easily.
|
||||||
|
glog.V(3).Infof("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("PersistentVolumeController could not update claim %q: %+v", claimToClaimKey(newClaim), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteClaim is callback from framework.Controller watching PersistentVolumeClaim
|
||||||
|
// events.
|
||||||
|
func (ctrl *PersistentVolumeController) deleteClaim(obj interface{}) {
|
||||||
|
if !ctrl.isFullySynced() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var volume *api.PersistentVolume
|
||||||
|
var claim *api.PersistentVolumeClaim
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
claim, ok = obj.(*api.PersistentVolumeClaim)
|
||||||
|
if !ok {
|
||||||
|
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
|
||||||
|
claim, ok = unknown.Obj.(*api.PersistentVolumeClaim)
|
||||||
|
if !ok {
|
||||||
|
glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %+v", unknown.Obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Errorf("Expected PersistentVolumeClaim but deleteClaim received %+v", obj)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok || claim == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if pvObj, exists, _ := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName); exists {
|
||||||
|
if volume, ok = pvObj.(*api.PersistentVolume); ok {
|
||||||
|
// sync the volume when its claim is deleted. Explicitly sync'ing the
|
||||||
|
// volume here in response to claim deletion prevents the volume from
|
||||||
|
// waiting until the next sync period for its Release.
|
||||||
|
if volume != nil {
|
||||||
|
err := ctrl.syncVolume(volume)
|
||||||
|
if err != nil {
|
||||||
|
if errors.IsConflict(err) {
|
||||||
|
// Version conflict error happens quite often and the
|
||||||
|
// controller recovers from it easily.
|
||||||
|
glog.V(3).Infof("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
|
||||||
|
} else {
|
||||||
|
glog.Errorf("PersistentVolumeController could not update volume %q from deleteClaim handler: %+v", volume.Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.Errorf("Cannot convert object from volume cache to volume %q!?: %+v", claim.Spec.VolumeName, pvObj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts all of this controller's control loops
|
||||||
|
func (ctrl *PersistentVolumeController) Run() {
|
||||||
|
glog.V(4).Infof("starting PersistentVolumeController")
|
||||||
|
|
||||||
|
if ctrl.volumeControllerStopCh == nil {
|
||||||
|
ctrl.volumeControllerStopCh = make(chan struct{})
|
||||||
|
go ctrl.volumeController.Run(ctrl.volumeControllerStopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ctrl.claimControllerStopCh == nil {
|
||||||
|
ctrl.claimControllerStopCh = make(chan struct{})
|
||||||
|
go ctrl.claimController.Run(ctrl.claimControllerStopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop gracefully shuts down this controller
|
||||||
|
func (ctrl *PersistentVolumeController) Stop() {
|
||||||
|
glog.V(4).Infof("stopping PersistentVolumeController")
|
||||||
|
close(ctrl.volumeControllerStopCh)
|
||||||
|
close(ctrl.claimControllerStopCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
// isFullySynced returns true, if both volume and claim caches are fully loaded
|
||||||
|
// after startup.
|
||||||
|
// We do not want to process events with not fully loaded caches - e.g. we might
|
||||||
|
// recycle/delete PVs that don't have corresponding claim in the cache yet.
|
||||||
|
func (ctrl *PersistentVolumeController) isFullySynced() bool {
|
||||||
|
return ctrl.volumeController.HasSynced() && ctrl.claimController.HasSynced()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stateless functions
|
||||||
|
|
||||||
|
func hasAnnotation(obj api.ObjectMeta, ann string) bool {
|
||||||
|
_, found := obj.Annotations[ann]
|
||||||
|
return found
|
||||||
|
}
|
||||||
|
|
||||||
|
func setAnnotation(obj *api.ObjectMeta, ann string, value string) {
|
||||||
|
if obj.Annotations == nil {
|
||||||
|
obj.Annotations = make(map[string]string)
|
||||||
|
}
|
||||||
|
obj.Annotations[ann] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
func getClaimStatusForLogging(claim *api.PersistentVolumeClaim) string {
|
||||||
|
bound := hasAnnotation(claim.ObjectMeta, annBindCompleted)
|
||||||
|
boundByController := hasAnnotation(claim.ObjectMeta, annBoundByController)
|
||||||
|
|
||||||
|
return fmt.Sprintf("phase: %s, bound to: %q, bindCompleted: %v, boundByController: %v", claim.Status.Phase, claim.Spec.VolumeName, bound, boundByController)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getVolumeStatusForLogging(volume *api.PersistentVolume) string {
|
||||||
|
boundByController := hasAnnotation(volume.ObjectMeta, annBoundByController)
|
||||||
|
claimName := ""
|
||||||
|
if volume.Spec.ClaimRef != nil {
|
||||||
|
claimName = fmt.Sprintf("%s/%s (uid: %s)", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, volume.Spec.ClaimRef.UID)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("phase: %s, bound to: %q, boundByController: %v", volume.Status.Phase, claimName, boundByController)
|
||||||
|
}
|
||||||
|
|
||||||
|
// isVolumeBoundToClaim returns true, if given volume is pre-bound or bound
|
||||||
|
// to specific claim. Both claim.Name and claim.Namespace must be equal.
|
||||||
|
// If claim.UID is present in volume.Spec.ClaimRef, it must be equal too.
|
||||||
|
func isVolumeBoundToClaim(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) bool {
|
||||||
|
if volume.Spec.ClaimRef == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if claim.Name != volume.Spec.ClaimRef.Name || claim.Namespace != volume.Spec.ClaimRef.Namespace {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if volume.Spec.ClaimRef.UID != "" && claim.UID != volume.Spec.ClaimRef.UID {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user