The capability to control duration via controller-manager flags,

and the option to shut off reconciliation.
This commit is contained in:
chrislovecnm
2017-01-06 15:24:51 -07:00
parent de59ede6b2
commit a973c38c7d
9 changed files with 58 additions and 25 deletions

View File

@@ -58,10 +58,6 @@ const (
// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
// DesiredStateOfWorldPopulator loop waits between successive executions
desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 1 * time.Minute
// reconcilerSyncDuration is the amount of time the reconciler sync states loop
// wait between successive executions
reconcilerSyncDuration time.Duration = 5 * time.Second
)
// AttachDetachController defines the operations supported by this controller.
@@ -78,7 +74,9 @@ func NewAttachDetachController(
pvcInformer kcache.SharedInformer,
pvInformer kcache.SharedInformer,
cloud cloudprovider.Interface,
plugins []volume.VolumePlugin) (AttachDetachController, error) {
plugins []volume.VolumePlugin,
disableReconciliation bool,
reconcilerSyncDuration time.Duration) (AttachDetachController, error) {
// TODO: The default resyncPeriod for shared informers is 12 hours, this is
// unacceptable for the attach/detach controller. For example, if a pod is
// skipped because the node it is scheduled to didn't set its annotation in
@@ -131,10 +129,13 @@ func NewAttachDetachController(
false)) // flag for experimental binary check for volume mount
adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater(
kubeClient, nodeInformer, adc.actualStateOfWorld)
// Default these to values in options
adc.reconciler = reconciler.NewReconciler(
reconcilerLoopPeriod,
reconcilerMaxWaitForUnmountDuration,
reconcilerSyncDuration,
disableReconciliation,
adc.desiredStateOfWorld,
adc.actualStateOfWorld,
adc.attacherDetacher,

View File

@@ -41,7 +41,9 @@ func Test_NewAttachDetachController_Positive(t *testing.T) {
pvcInformer,
pvInformer,
nil, /* cloud */
nil /* plugins */)
nil, /* plugins */
false,
time.Second*5)
// Assert
if err != nil {

View File

@@ -57,6 +57,7 @@ func NewReconciler(
loopPeriod time.Duration,
maxWaitForUnmountDuration time.Duration,
syncDuration time.Duration,
disableReconciliation bool,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld,
attacherDetacher operationexecutor.OperationExecutor,
@@ -65,6 +66,7 @@ func NewReconciler(
loopPeriod: loopPeriod,
maxWaitForUnmountDuration: maxWaitForUnmountDuration,
syncDuration: syncDuration,
disableReconciliation: disableReconciliation,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
attacherDetacher: attacherDetacher,
@@ -82,18 +84,27 @@ type reconciler struct {
attacherDetacher operationexecutor.OperationExecutor
nodeStatusUpdater statusupdater.NodeStatusUpdater
timeOfLastSync time.Time
disableReconciliation bool
}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(rc.reconciliationLoopFunc(), rc.loopPeriod, stopCh)
}
// reconciliationLoopFunc this can be disabled via cli option disableReconciliation.
// It periodically checks whether the attached volumes from actual state
// are still attached to the node and udpate the status if they are not.
func (rc *reconciler) reconciliationLoopFunc() func() {
return func() {
rc.reconcile()
// reconciler periodically checks whether the attached volumes from actual state
// are still attached to the node and udpate the status if they are not.
if time.Since(rc.timeOfLastSync) > rc.syncDuration {
if rc.disableReconciliation {
glog.V(5).Info("Skipping reconciling attached volumes still attached since it is disabled via the command line.")
} else if rc.syncDuration < time.Second {
glog.V(5).Info("Skipping reconciling attached volumes still attached since it is set to less than one second via the command line.")
} else if time.Since(rc.timeOfLastSync) > rc.syncDuration {
glog.V(5).Info("Starting reconciling attached volumes still attached")
rc.sync()
}
}

View File

@@ -55,7 +55,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, nodeInformer, asw)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
// Act
ch := make(chan struct{})
@@ -83,7 +83,7 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@@ -129,7 +129,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@@ -196,7 +196,7 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
@@ -263,7 +263,7 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, dsw, asw, ad, nsu)
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu)
podName := "pod-uid"
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)