Merge pull request #25881 from jsafrane/devel/pv-add-cache
Automatic merge from submit-queue

volume controller: Add cache with the latest version of PVs and PVCs

When the controller binds a PV to a PVC, it saves both objects to etcd. However, there is still an old version of these objects in the controller's Informer cache. So, when a new PVC comes in, the PV is still seen as available and may get bound to the new PVC. This will be blocked by etcd, but it still creates unnecessary traffic that slows everything down.

To make everything worse, when a periodic sync with the old PVC is performed, this PVC is seen by the controller as Pending (while it's already Bound in etcd) and will be bound to a different PV. Writing to this PV won't be blocked by etcd; only the subsequent write of the PVC fails. So, the controller will need to roll back the PV in another transaction (or several). The controller can keep itself pretty busy this way.

Also, we save bound PVs (and PVCs) in two transactions: we save, say, PV.Spec first and then .Status. The controller gets a "PV.Spec updated" event from etcd and tries to fix the Status, as it seems outdated to the controller. This write again fails, because there already is a correct version in etcd.

As we can't influence the Informer cache (it is read-only to the controller), this patch introduces a second cache in the controller, which holds the latest and greatest version of PVs and PVCs, to prevent these useless writes to etcd. It gets updated with events from etcd *and* after etcd confirms a successful save of a PV/PVC modified by the controller.

The cache stores only *pointers* to PVs/PVCs, so in the ideal case it shares the actual object data with the informer cache. They will diverge only for a short time, when the controller has modified something and the informer cache has not received the update events yet.

@kubernetes/sig-storage
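What makes this dual-source cache safe is the guard visible as storeObjectUpdate in the hunks below: whichever path updates the cache, an older version must never overwrite a newer one. The helper's body is not part of this diff, so the following is only a minimal, self-contained sketch of that idea; the cachedObject and latestCache types are illustrative stand-ins for the real *api.PersistentVolume / *api.PersistentVolumeClaim pointers kept in a cache.Store.

package main

import (
	"fmt"
	"strconv"
)

// cachedObject is an illustrative stand-in for a PV or PVC; the only field the
// guard needs is the resource version that etcd assigns monotonically.
type cachedObject struct {
	name            string
	resourceVersion string
}

// latestCache stands in for the controller's internal cache; a map is enough
// to show the guard.
type latestCache map[string]*cachedObject

// storeObjectUpdate overwrites the cached pointer only when the incoming
// object is at least as new as the cached one, so a stale informer event can
// never roll the cache back after the controller has saved a newer version to
// etcd. It returns true when the cache was updated.
func (c latestCache) storeObjectUpdate(obj *cachedObject) (bool, error) {
	old, found := c[obj.name]
	if !found {
		c[obj.name] = obj
		return true, nil
	}
	newRV, err := strconv.ParseInt(obj.resourceVersion, 10, 64)
	if err != nil {
		return false, fmt.Errorf("cannot parse resource version %q: %v", obj.resourceVersion, err)
	}
	oldRV, err := strconv.ParseInt(old.resourceVersion, 10, 64)
	if err != nil {
		return false, fmt.Errorf("cannot parse resource version %q: %v", old.resourceVersion, err)
	}
	if oldRV > newRV {
		// Stale "updated" event; the cache already holds a newer version.
		return false, nil
	}
	c[obj.name] = obj
	return true, nil
}

func main() {
	c := latestCache{}
	c.storeObjectUpdate(&cachedObject{name: "pv-1", resourceVersion: "10"}) // initial add
	c.storeObjectUpdate(&cachedObject{name: "pv-1", resourceVersion: "12"}) // controller saved a newer version
	updated, _ := c.storeObjectUpdate(&cachedObject{name: "pv-1", resourceVersion: "11"})
	fmt.Println(updated, c["pv-1"].resourceVersion) // false 12 – the stale event was ignored
}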
@@ -106,10 +106,8 @@ const createProvisionedPVInterval = 10 * time.Second
 // framework.Controllers that watch PersistentVolume and PersistentVolumeClaim
 // changes.
 type PersistentVolumeController struct {
-	volumes                persistentVolumeOrderedIndex
 	volumeController       *framework.Controller
 	volumeControllerStopCh chan struct{}
-	claims                 cache.Store
 	claimController        *framework.Controller
 	claimControllerStopCh  chan struct{}
 	kubeClient             clientset.Interface
@@ -119,6 +117,14 @@ type PersistentVolumeController struct {
 	provisioner vol.ProvisionableVolumePlugin
 	clusterName string
 
+	// Cache of the last known version of volumes and claims. This cache is
+	// thread safe as long as the volumes/claims there are not modified, they
+	// must be cloned before any modification. These caches get updated both by
+	// "xxx added/updated/deleted" events from etcd and by the controller when
+	// it saves newer version to etcd.
+	volumes persistentVolumeOrderedIndex
+	claims  cache.Store
+
 	// PersistentVolumeController keeps track of long running operations and
 	// makes sure it won't start the same operation twice in parallel.
 	// Each operation is identified by unique operationName.
@@ -507,6 +513,11 @@ func (ctrl *PersistentVolumeController) updateClaimPhase(claim *api.PersistentVo
 		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: set phase %s failed: %v", claimToClaimKey(claim), phase, err)
 		return newClaim, err
 	}
+	_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
+		return newClaim, err
+	}
 	glog.V(2).Infof("claim %q entered phase %q", claimToClaimKey(claim), phase)
 	return newClaim, nil
 }
@@ -559,6 +570,11 @@ func (ctrl *PersistentVolumeController) updateVolumePhase(volume *api.Persistent
 		glog.V(4).Infof("updating PersistentVolume[%s]: set phase %s failed: %v", volume.Name, phase, err)
 		return newVol, err
 	}
+	_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
+		return newVol, err
+	}
 	glog.V(2).Infof("volume %q entered phase %q", volume.Name, phase)
 	return newVol, err
 }
@@ -639,6 +655,11 @@ func (ctrl *PersistentVolumeController) bindVolumeToClaim(volume *api.Persistent
 		glog.V(4).Infof("updating PersistentVolume[%s]: binding to %q failed: %v", volume.Name, claimToClaimKey(claim), err)
 		return newVol, err
 	}
+	_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
+		return newVol, err
+	}
 	glog.V(4).Infof("updating PersistentVolume[%s]: bound to %q", newVol.Name, claimToClaimKey(claim))
 	return newVol, nil
 }
@@ -696,6 +717,11 @@ func (ctrl *PersistentVolumeController) bindClaimToVolume(claim *api.PersistentV
 		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volume.Name, err)
 		return newClaim, err
 	}
+	_, err = storeObjectUpdate(ctrl.claims, newClaim, "claim")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
+		return newClaim, err
+	}
 	glog.V(4).Infof("updating PersistentVolumeClaim[%s]: bound to %q", claimToClaimKey(claim), volume.Name)
 	return newClaim, nil
 }
@@ -785,6 +811,11 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum
 		glog.V(4).Infof("updating PersistentVolume[%s]: rollback failed: %v", volume.Name, err)
 		return err
 	}
+	_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
+	if err != nil {
+		glog.V(4).Infof("updating PersistentVolume[%s]: cannot update internal cache: %v", volume.Name, err)
+		return err
+	}
 	glog.V(4).Infof("updating PersistentVolume[%s]: rolled back", newVol.Name)
 
 	// Update the status
@@ -1127,9 +1158,16 @@ func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interfa
 	// Try to create the PV object several times
 	for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
 		glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
-		if _, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
+		var newVol *api.PersistentVolume
+		if newVol, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
 			// Save succeeded.
 			glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
+
+			_, err = storeObjectUpdate(ctrl.volumes.store, newVol, "volume")
+			if err != nil {
+				// We will get an "volume added" event soon, this is not a big error
+				glog.V(4).Infof("provisionClaimOperation [%s]: cannot update internal cache: %v", volume.Name, err)
+			}
 			break
 		}
 		// Save failed, try again after a while.
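Every hunk above repeats the same write discipline: modify a clone (never the cached pointer, which may be shared with the informer cache), save it to the API server, and only then fold the confirmed object into the internal cache; a failure at that last step is tolerable because the informer will deliver the same update shortly. A condensed, self-contained sketch of that pattern, with an illustrative object type and a stub saveToAPIServer standing in for the real client calls:

package main

import (
	"fmt"
	"strconv"
)

// object is an illustrative stand-in for a PV or PVC.
type object struct {
	name            string
	resourceVersion string
	phase           string
}

var nextRV int64 = 100

// saveToAPIServer is a stub for the real client call (e.g.
// kubeClient.Core().PersistentVolumes().Update()): etcd accepts the write and
// returns the object with a fresh resource version.
func saveToAPIServer(obj *object) (*object, error) {
	nextRV++
	saved := *obj
	saved.resourceVersion = strconv.FormatInt(nextRV, 10)
	return &saved, nil
}

var internalCache = map[string]*object{}

// updatePhase condenses the write pattern the hunks above repeat.
func updatePhase(obj *object, phase string) (*object, error) {
	// The internal cache shares pointers with the informer cache, so a cached
	// object is never modified in place; the change is made on a clone.
	clone := *obj
	clone.phase = phase

	saved, err := saveToAPIServer(&clone)
	if err != nil {
		return nil, err
	}
	// Only the version confirmed by etcd enters the cache. In the real
	// controller this goes through the guarded storeObjectUpdate, and a
	// failure here is not fatal: an "updated" event will arrive soon anyway.
	internalCache[saved.name] = saved
	return saved, nil
}

func main() {
	pv := &object{name: "pv-1", resourceVersion: "100", phase: "Available"}
	internalCache[pv.name] = pv

	saved, _ := updatePhase(pv, "Bound")
	fmt.Println(saved.phase, saved.resourceVersion) // Bound 101
	fmt.Println(pv.phase)                           // Available – the shared pointer is untouched
}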