add names for workqueues to gather controller latency/depth metrics

This commit is contained in:
deads2k 2016-08-22 14:15:45 -04:00
parent 0f8869d308
commit 4317173d3f
13 changed files with 15 additions and 15 deletions

View File

@ -79,7 +79,7 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
cc := &CertificateController{
kubeClient: kubeClient,
queue: workqueue.New(),
queue: workqueue.NewNamed("certificate"),
signer: ca,
approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
}

View File

@ -125,7 +125,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
},
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.New(),
queue: workqueue.NewNamed("daemonset"),
}
// Manage addition/update of daemon sets.
dsc.dsStore.Store, dsc.dsController = framework.NewInformer(

View File

@ -107,7 +107,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
dc := &DeploymentController{
client: client,
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
}
dc.dStore.Indexer, dc.dController = framework.NewIndexerInformer(

View File

@ -88,7 +88,7 @@ func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClie
dc := &DisruptionController{
kubeClient: kubeClient,
podController: podInformer.GetController(),
queue: workqueue.New(),
queue: workqueue.NewNamed("disruption"),
broadcaster: record.NewBroadcaster(),
}
dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})

View File

@ -76,7 +76,7 @@ func NewEndpointController(podInformer framework.SharedIndexInformer, client *cl
}
e := &EndpointController{
client: client,
queue: workqueue.New(),
queue: workqueue.NewNamed("endpoint"),
}
e.serviceStore.Store, e.serviceController = framework.NewInformer(

View File

@ -94,7 +94,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
},
expectations: controller.NewControllerExpectations(),
queue: workqueue.New(),
queue: workqueue.NewNamed("job"),
recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
}

View File

@ -67,7 +67,7 @@ func NewNamespaceController(
namespaceController := &NamespaceController{
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
groupVersionResources: groupVersionResources,
opCache: operationNotSupportedCache{},
finalizerToken: finalizerToken,

View File

@ -94,7 +94,7 @@ func NewPetSetController(podInformer framework.SharedIndexInformer, kubeClient *
newSyncer: func(blockingPet *pcb) *petSyncer {
return &petSyncer{pc, blockingPet}
},
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "petset"),
}
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{

View File

@ -139,7 +139,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.New(),
queue: workqueue.NewNamed("replicaset"),
garbageCollectorEnabled: garbageCollectorEnabled,
}

View File

@ -143,7 +143,7 @@ func newReplicationManager(eventRecorder record.EventRecorder, podInformer frame
},
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
garbageCollectorEnabled: garbageCollectorEnabled,
}

View File

@ -79,8 +79,8 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *Resour
// build the resource quota controller
rq := &ResourceQuotaController{
kubeClient: options.KubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_primary"),
missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "controller_resourcequota_priority"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
resyncPeriod: options.ResyncPeriod,
registry: options.Registry,
replenishmentControllers: []framework.ControllerInterface{},

View File

@ -81,8 +81,8 @@ func NewTokensController(cl clientset.Interface, options TokensControllerOptions
token: options.TokenGenerator,
rootCA: options.RootCA,
syncServiceAccountQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncSecretQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncServiceAccountQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount_tokens_service"),
syncSecretQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount_tokens_secret"),
maxRetries: maxRetries,
}

View File

@ -105,7 +105,7 @@ func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, loc
registry: registry,
queue: workqueue.New(),
queue: workqueue.NewNamed("admission_quota_controller"),
work: map[string][]*admissionWaiter{},
dirtyWork: map[string][]*admissionWaiter{},
inProgress: sets.String{},