Refactor volume controller parameters into a structure

persistentvolumecontroller.NewPersistentVolumeController has 11 arguments now,
put them into a structure.

Also, rename NewPersistentVolumeController to NewController, persistentvolume
is already name of the package.

Fixes #30219
This commit is contained in:
Jan Safranek 2016-09-26 14:15:25 +02:00
parent 13357bd653
commit a54c9e2887
5 changed files with 75 additions and 81 deletions

View File

@ -425,19 +425,16 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, stop <
if err != nil { if err != nil {
glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
} }
volumeController := persistentvolumecontroller.NewPersistentVolumeController( params := persistentvolumecontroller.ControllerParameters{
client("persistent-volume-binder"), KubeClient: client("persistent-volume-binder"),
s.PVClaimBinderSyncPeriod.Duration, SyncPeriod: s.PVClaimBinderSyncPeriod.Duration,
alphaProvisioner, AlphaProvisioner: alphaProvisioner,
ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
cloud, Cloud: cloud,
s.ClusterName, ClusterName: s.ClusterName,
nil, // volumeSource EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
nil, // claimSource }
nil, // classSource volumeController := persistentvolumecontroller.NewController(params)
nil, // eventRecorder
s.VolumeConfiguration.EnableDynamicProvisioning,
)
volumeController.Run(wait.NeverStop) volumeController.Run(wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@ -294,19 +294,16 @@ func (s *CMServer) Run(_ []string) error {
if err != nil { if err != nil {
glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err) glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
} }
volumeController := persistentvolumecontroller.NewPersistentVolumeController( params := persistentvolumecontroller.ControllerParameters{
clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")), KubeClient: clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
s.PVClaimBinderSyncPeriod.Duration, SyncPeriod: s.PVClaimBinderSyncPeriod.Duration,
alphaProvisioner, AlphaProvisioner: alphaProvisioner,
kubecontrollermanager.ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), VolumePlugins: kubecontrollermanager.ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
cloud, Cloud: cloud,
s.ClusterName, ClusterName: s.ClusterName,
nil, // volumeSource EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
nil, // claimSource }
nil, // classSource volumeController := persistentvolumecontroller.NewController(params)
nil, // eventRecorder
s.VolumeConfiguration.EnableDynamicProvisioning,
)
volumeController.Run(wait.NeverStop) volumeController.Run(wait.NeverStop)
var rootCA []byte var rootCA []byte

View File

@ -594,19 +594,18 @@ func newTestController(kubeClient clientset.Interface, volumeSource, claimSource
if classSource == nil { if classSource == nil {
classSource = fcache.NewFakeControllerSource() classSource = fcache.NewFakeControllerSource()
} }
ctrl := NewPersistentVolumeController(
kubeClient, params := ControllerParameters{
5*time.Second, // sync period KubeClient: kubeClient,
nil, // alpha provisioner SyncPeriod: 5 * time.Second,
[]vol.VolumePlugin{}, // recyclers VolumePlugins: []vol.VolumePlugin{},
nil, // cloud VolumeSource: volumeSource,
"", ClaimSource: claimSource,
volumeSource, ClassSource: classSource,
claimSource, EventRecorder: record.NewFakeRecorder(1000),
classSource, EnableDynamicProvisioning: enableDynamicProvisioning,
record.NewFakeRecorder(1000), // event recorder }
enableDynamicProvisioning, ctrl := NewController(params)
)
// Speed up the test // Speed up the test
ctrl.createProvisionedPVInterval = 5 * time.Millisecond ctrl.createProvisionedPVInterval = 5 * time.Millisecond

View File

@ -43,77 +43,84 @@ import (
// process PV/PVC added/updated/deleted events. The real binding, provisioning, // process PV/PVC added/updated/deleted events. The real binding, provisioning,
// recycling and deleting is done in pv_controller.go // recycling and deleting is done in pv_controller.go
// NewPersistentVolumeController creates a new PersistentVolumeController // ControllerParameters contains arguments for creation of a new
func NewPersistentVolumeController( // PersistentVolume controller.
kubeClient clientset.Interface, type ControllerParameters struct {
syncPeriod time.Duration, KubeClient clientset.Interface
alphaProvisioner vol.ProvisionableVolumePlugin, SyncPeriod time.Duration
volumePlugins []vol.VolumePlugin, AlphaProvisioner vol.ProvisionableVolumePlugin
cloud cloudprovider.Interface, VolumePlugins []vol.VolumePlugin
clusterName string, Cloud cloudprovider.Interface
volumeSource, claimSource, classSource cache.ListerWatcher, ClusterName string
eventRecorder record.EventRecorder, VolumeSource, ClaimSource, ClassSource cache.ListerWatcher
enableDynamicProvisioning bool, EventRecorder record.EventRecorder
) *PersistentVolumeController { EnableDynamicProvisioning bool
}
// NewController creates a new PersistentVolume controller
func NewController(p ControllerParameters) *PersistentVolumeController {
eventRecorder := p.EventRecorder
if eventRecorder == nil { if eventRecorder == nil {
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: p.KubeClient.Core().Events("")})
eventRecorder = broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"}) eventRecorder = broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
} }
controller := &PersistentVolumeController{ controller := &PersistentVolumeController{
volumes: newPersistentVolumeOrderedIndex(), volumes: newPersistentVolumeOrderedIndex(),
claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc), claims: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
kubeClient: kubeClient, kubeClient: p.KubeClient,
eventRecorder: eventRecorder, eventRecorder: eventRecorder,
runningOperations: goroutinemap.NewGoRoutineMap(false /* exponentialBackOffOnError */), runningOperations: goroutinemap.NewGoRoutineMap(false /* exponentialBackOffOnError */),
cloud: cloud, cloud: p.Cloud,
enableDynamicProvisioning: enableDynamicProvisioning, enableDynamicProvisioning: p.EnableDynamicProvisioning,
clusterName: clusterName, clusterName: p.ClusterName,
createProvisionedPVRetryCount: createProvisionedPVRetryCount, createProvisionedPVRetryCount: createProvisionedPVRetryCount,
createProvisionedPVInterval: createProvisionedPVInterval, createProvisionedPVInterval: createProvisionedPVInterval,
alphaProvisioner: alphaProvisioner, alphaProvisioner: p.AlphaProvisioner,
} }
controller.volumePluginMgr.InitPlugins(volumePlugins, controller) controller.volumePluginMgr.InitPlugins(p.VolumePlugins, controller)
if controller.alphaProvisioner != nil { if controller.alphaProvisioner != nil {
if err := controller.alphaProvisioner.Init(controller); err != nil { if err := controller.alphaProvisioner.Init(controller); err != nil {
glog.Errorf("PersistentVolumeController: error initializing alpha provisioner plugin: %v", err) glog.Errorf("PersistentVolumeController: error initializing alpha provisioner plugin: %v", err)
} }
} }
volumeSource := p.VolumeSource
if volumeSource == nil { if volumeSource == nil {
volumeSource = &cache.ListWatch{ volumeSource = &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumes().List(options) return p.KubeClient.Core().PersistentVolumes().List(options)
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumes().Watch(options) return p.KubeClient.Core().PersistentVolumes().Watch(options)
}, },
} }
} }
controller.volumeSource = volumeSource controller.volumeSource = volumeSource
claimSource := p.ClaimSource
if claimSource == nil { if claimSource == nil {
claimSource = &cache.ListWatch{ claimSource = &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options) return p.KubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options) return p.KubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
}, },
} }
} }
controller.claimSource = claimSource controller.claimSource = claimSource
classSource := p.ClassSource
if classSource == nil { if classSource == nil {
classSource = &cache.ListWatch{ classSource = &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return kubeClient.Storage().StorageClasses().List(options) return p.KubeClient.Storage().StorageClasses().List(options)
}, },
WatchFunc: func(options api.ListOptions) (watch.Interface, error) { WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return kubeClient.Storage().StorageClasses().Watch(options) return p.KubeClient.Storage().StorageClasses().Watch(options)
}, },
} }
} }
@ -122,7 +129,7 @@ func NewPersistentVolumeController(
_, controller.volumeController = cache.NewIndexerInformer( _, controller.volumeController = cache.NewIndexerInformer(
volumeSource, volumeSource,
&api.PersistentVolume{}, &api.PersistentVolume{},
syncPeriod, p.SyncPeriod,
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: controller.addVolume, AddFunc: controller.addVolume,
UpdateFunc: controller.updateVolume, UpdateFunc: controller.updateVolume,
@ -133,7 +140,7 @@ func NewPersistentVolumeController(
_, controller.claimController = cache.NewInformer( _, controller.claimController = cache.NewInformer(
claimSource, claimSource,
&api.PersistentVolumeClaim{}, &api.PersistentVolumeClaim{},
syncPeriod, p.SyncPeriod,
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: controller.addClaim, AddFunc: controller.addClaim,
UpdateFunc: controller.updateClaim, UpdateFunc: controller.updateClaim,
@ -148,7 +155,7 @@ func NewPersistentVolumeController(
classSource, classSource,
&storage.StorageClass{}, &storage.StorageClass{},
controller.classes, controller.classes,
syncPeriod, p.SyncPeriod,
) )
return controller return controller
} }

View File

@ -1124,20 +1124,14 @@ func createClients(ns *api.Namespace, t *testing.T, s *httptest.Server, syncPeri
} }
plugins := []volume.VolumePlugin{plugin} plugins := []volume.VolumePlugin{plugin}
cloud := &fake_cloud.FakeCloud{} cloud := &fake_cloud.FakeCloud{}
ctrl := persistentvolumecontroller.NewController(
syncPeriod = getSyncPeriod(syncPeriod) persistentvolumecontroller.ControllerParameters{
ctrl := persistentvolumecontroller.NewPersistentVolumeController( KubeClient: binderClient,
binderClient, SyncPeriod: getSyncPeriod(syncPeriod),
syncPeriod, VolumePlugins: plugins,
nil, // alpha provisioner Cloud: cloud,
plugins, EnableDynamicProvisioning: true,
cloud, })
"", // cluster name
nil, // volumeSource
nil, // claimSource
nil, // classSource
nil, // eventRecorder
true) // enableDynamicProvisioning
watchPV, err := testClient.PersistentVolumes().Watch(api.ListOptions{}) watchPV, err := testClient.PersistentVolumes().Watch(api.ListOptions{})
if err != nil { if err != nil {