Merge pull request #32776 from m1093782566/m109-fix-endpoint-controller-hotloop

Automatic merge from submit-queue

[Controller Manager] Fix endpoint controller hot loop and use utilruntime.HandleError to replace glog.Errorf


**Why**:

Fix the endpoint controller hot loop, and replace `glog.Errorf` with `utilruntime.HandleError`.

**What**:

1. Fix the endpoint controller hot loop in `pkg/controller/endpoint` (see the worker sketch after this list).

2. Fix the endpoint controller hot loop in `contrib/mesos/pkg/service`.

3. Sweep remaining uses of `glog.Errorf` and replace them with `utilruntime.HandleError`.
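
For reviewers unfamiliar with the pattern: the hot-loop fix replaces the bare `workqueue.Type` with a `RateLimitingInterface` and moves the drain loop into a `processNextWorkItem` helper, so failed keys are requeued with backoff via `AddRateLimited` instead of being re-added immediately. A minimal, self-contained sketch of that pattern (not the PR's literal code; it assumes the in-tree `k8s.io/kubernetes/pkg/util/workqueue` package this PR builds on, and `syncService` here is just a stand-in):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util/workqueue"
)

type controller struct {
	queue workqueue.RateLimitingInterface
}

func newController() *controller {
	return &controller{
		// Named rate-limited queue, as in the diff below.
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
	}
}

// worker blocks in queue.Get instead of spinning, so an empty queue or a
// persistently failing key no longer turns into a hot loop.
func (c *controller) worker() {
	for c.processNextWorkItem() {
	}
}

func (c *controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done must run even if syncService panics; otherwise the key could
	// never be processed again.
	defer c.queue.Done(key)

	if err := c.syncService(key.(string)); err != nil {
		// Requeue with backoff rather than re-adding the key immediately.
		c.queue.AddRateLimited(key)
		return true
	}
	// Success: clear the rate limiter's backoff history for this key.
	c.queue.Forget(key)
	return true
}

// syncService is a stand-in for the controller's real sync logic.
func (c *controller) syncService(key string) error {
	fmt.Println("synced", key)
	return nil
}

func main() {
	c := newController()
	c.queue.Add("default/my-service") // hypothetical service key
	go func() {
		// Give the item time to drain, then shut down so worker returns.
		time.Sleep(100 * time.Millisecond)
		c.queue.ShutDown()
	}()
	c.worker()
}
```

In the diff below, the `e.queue.Add(key) // Retry` calls inside `syncService` disappear in favor of returning the error and letting `processNextWorkItem` decide whether to requeue.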

**Which issue this PR fixes**:

Fixes #32843
Related issue is #30629 

**Special notes for your reviewer**:

@deads2k @derekwaynecarr 

The changes to `pkg/controller/endpoints_controller.go` and `contrib/mesos/pkg/service/endpoints_controller.go` are almost identical, except that the mesos version does not pass a `podInformer` to `NewEndpointController()`.

So I didn't wait for `podStoreSynced` before `syncService()` in the mesos copy (I left it as it was). Will that lead to a problem?
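
For comparison, this is roughly the cache-sync gating the non-mesos `Run()` gains in this PR, pulled out into a free function so the sketch stands alone. The `run`, `podsSynced`, and `worker` names are illustrative, and the import paths assume the in-tree packages of this era rather than client-go:

```go
package main

import (
	"time"

	"k8s.io/kubernetes/pkg/client/cache"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/wait"
)

// run starts the queue workers only after the watched caches have synced,
// so syncService never has to requeue a key just because the cache is cold.
func run(workers int, stopCh <-chan struct{}, podsSynced func() bool, worker func()) {
	defer utilruntime.HandleCrash()

	// Block until the pod cache reports synced; give up if told to stop first.
	if !cache.WaitForCacheSync(stopCh, podsSynced) {
		return
	}

	// Fan out the workers; each blocks in queue.Get, so nothing spins.
	for i := 0; i < workers; i++ {
		go wait.Until(worker, time.Second, stopCh)
	}
	<-stopCh
}

func main() {
	stop := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stop)
	}()
	run(1, stop, func() bool { return true }, func() {})
}
```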
Kubernetes Submit Queue 2016-09-17 20:01:41 -07:00 committed by GitHub
commit 41fc0a4506
2 changed files with 100 additions and 92 deletions

contrib/mesos/pkg/service/endpoints_controller.go

@@ -53,7 +53,7 @@ type EndpointController interface {
func NewEndpointController(client *clientset.Clientset) *endpointController {
e := &endpointController{
client: client,
queue: workqueue.New(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
}
e.serviceStore.Store, e.serviceController = cache.NewInformer(
&cache.ListWatch{
@@ -108,7 +108,7 @@ type endpointController struct {
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue *workqueue.Type
queue workqueue.RateLimitingInterface
// Since we join two objects, we'll watch both of them with
// controllers.
@@ -158,7 +158,7 @@ func (e *endpointController) addPod(obj interface{}) {
pod := obj.(*api.Pod)
services, err := e.getPodServiceMemberships(pod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err))
return
}
for key := range services {
@@ -176,7 +176,7 @@ func (e *endpointController) updatePod(old, cur interface{}) {
newPod := old.(*api.Pod)
services, err := e.getPodServiceMemberships(newPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
return
}
@@ -185,7 +185,7 @@ func (e *endpointController) updatePod(old, cur interface{}) {
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
return
}
services = services.Union(oldServices)
@@ -207,9 +207,9 @@ func (e *endpointController) deletePod(obj interface{}) {
}
podKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
}
glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod)
glog.V(4).Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod)
// TODO: keep a map of pods to services to handle this condition.
}
@@ -218,7 +218,7 @@ func (e *endpointController) deletePod(obj interface{}) {
func (e *endpointController) enqueueService(obj interface{}) {
key, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
}
e.queue.Add(key)
@@ -229,24 +229,35 @@ func (e *endpointController) enqueueService(obj interface{}) {
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *endpointController) worker() {
for {
func() {
key, quit := e.queue.Get()
if quit {
return
}
// Use defer: in the unlikely event that there's a
// panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e.queue.Done(key)
e.syncService(key.(string))
}()
for e.processNextWorkItem() {
}
}
func (e *endpointController) processNextWorkItem() bool {
eKey, quit := e.queue.Get()
if quit {
return false
}
// Use defer: in the unlikely event that there's a
// panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e.queue.Done(eKey)
err := e.syncService(eKey.(string))
if err == nil {
e.queue.Forget(eKey)
return true
}
utilruntime.HandleError(fmt.Errorf("Sync %v failed with %v", eKey, err))
e.queue.AddRateLimited(eKey)
return true
}
// HACK(sttts): add annotations to the endpoint about the respective container ports
func (e *endpointController) syncService(key string) {
func (e *endpointController) syncService(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
@@ -260,23 +271,23 @@ func (e *endpointController) syncService(key string) {
// doesn't completely solve the problem. See #6877.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
utilruntime.HandleError(fmt.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err))
// Don't retry, as the key isn't going to magically become understandable.
return
return nil
}
err = e.client.Endpoints(namespace).Delete(name, nil)
if err != nil && !errors.IsNotFound(err) {
glog.Errorf("Error deleting endpoint %q: %v", key, err)
e.queue.Add(key) // Retry
utilruntime.HandleError(fmt.Errorf("Error deleting endpoint %q: %v", key, err))
return err
}
return
return nil
}
service := obj.(*api.Service)
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return
return nil
}
glog.V(5).Infof("About to update endpoints for service %q", key)
@@ -284,9 +295,8 @@ func (e *endpointController) syncService(key string) {
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
glog.Errorf("Error syncing service %q: %v", key, err)
e.queue.Add(key) // Retry
return
utilruntime.HandleError(fmt.Errorf("Error syncing service %q: %v", key, err))
return err
}
subsets := []api.EndpointSubset{}
@@ -346,14 +356,13 @@ func (e *endpointController) syncService(key string) {
},
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
e.queue.Add(key) // Retry
return
utilruntime.HandleError(fmt.Errorf("Error getting endpoints: %v", err))
return err
}
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("Endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
return nil
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
@@ -374,9 +383,10 @@ func (e *endpointController) syncService(key string) {
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
}
if err != nil {
glog.Errorf("Error updating endpoints: %v", err)
e.queue.Add(key) // Retry
utilruntime.HandleError(fmt.Errorf("Error updating endpoints: %v", err))
return err
}
return nil
}
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
@@ -388,14 +398,14 @@ func (e *endpointController) syncService(key string) {
func (e *endpointController) checkLeftoverEndpoints() {
list, err := e.client.Endpoints(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
return
}
for i := range list.Items {
ep := &list.Items[i]
key, err := keyFunc(ep)
if err != nil {
glog.Errorf("Unable to get key for endpoint %#v", ep)
utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
continue
}
e.queue.Add(key)

pkg/controller/endpoint/endpoints_controller.go

@@ -25,6 +25,7 @@ import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
@@ -75,7 +76,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
}
e := &EndpointController{
client: client,
queue: workqueue.NewNamed("endpoint"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
}
e.serviceStore.Store, e.serviceController = cache.NewInformer(
@@ -139,7 +140,7 @@ type EndpointController struct {
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue *workqueue.Type
queue workqueue.RateLimitingInterface
// Since we join two objects, we'll watch both of them with
// controllers.
@@ -154,8 +155,15 @@ type EndpointController struct {
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer e.queue.ShutDown()
go e.serviceController.Run(stopCh)
go e.podController.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, e.podStoreSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(e.worker, time.Second, stopCh)
}
@@ -170,7 +178,6 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
}
<-stopCh
e.queue.ShutDown()
}
func (e *EndpointController) getPodServiceMemberships(pod *api.Pod) (sets.String, error) {
@@ -197,7 +204,7 @@ func (e *EndpointController) addPod(obj interface{}) {
pod := obj.(*api.Pod)
services, err := e.getPodServiceMemberships(pod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err))
return
}
for key := range services {
@@ -218,7 +225,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
}
services, err := e.getPodServiceMemberships(newPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
return
}
@@ -227,7 +234,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
!hostNameAndDomainAreEqual(newPod, oldPod) {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
return
}
services = services.Union(oldServices)
@@ -274,10 +281,10 @@ func (e *EndpointController) deletePod(obj interface{}) {
}
podKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
return
}
glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
glog.V(4).Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
// TODO: keep a map of pods to services to handle this condition.
}
@@ -286,7 +293,7 @@ func (e *EndpointController) deletePod(obj interface{}) {
func (e *EndpointController) enqueueService(obj interface{}) {
key, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
@@ -298,36 +305,35 @@ func (e *EndpointController) enqueueService(obj interface{}) {
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *EndpointController) worker() {
for {
func() {
key, quit := e.queue.Get()
if quit {
return
}
// Use defer: in the unlikely event that there's a
// panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e.queue.Done(key)
e.syncService(key.(string))
}()
for e.processNextWorkItem() {
}
}
func (e *EndpointController) syncService(key string) {
func (e *EndpointController) processNextWorkItem() bool {
eKey, quit := e.queue.Get()
if quit {
return false
}
defer e.queue.Done(eKey)
err := e.syncService(eKey.(string))
if err == nil {
e.queue.Forget(eKey)
return true
}
utilruntime.HandleError(fmt.Errorf("Sync %v failed with %v", eKey, err))
e.queue.AddRateLimited(eKey)
return true
}
func (e *EndpointController) syncService(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()
if !e.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
glog.V(4).Infof("Waiting for pods controller to sync, requeuing service %v", key)
e.queue.Add(key)
return
}
obj, exists, err := e.serviceStore.Store.GetByKey(key)
if err != nil || !exists {
// Delete the corresponding endpoint, as the service has been deleted.
@@ -337,23 +343,22 @@ func (e *EndpointController) syncService(key string) {
// doesn't completely solve the problem. See #6877.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
utilruntime.HandleError(fmt.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err))
// Don't retry, as the key isn't going to magically become understandable.
return
return nil
}
err = e.client.Endpoints(namespace).Delete(name, nil)
if err != nil && !errors.IsNotFound(err) {
glog.Errorf("Error deleting endpoint %q: %v", key, err)
e.queue.Add(key) // Retry
return err
}
return
return nil
}
service := obj.(*api.Service)
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return
return nil
}
glog.V(5).Infof("About to update endpoints for service %q", key)
@@ -361,9 +366,7 @@ func (e *EndpointController) syncService(key string) {
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
glog.Errorf("Error syncing service %q: %v", key, err)
e.queue.Add(key) // Retry
return
return err
}
subsets := []api.EndpointSubset{}
@@ -375,7 +378,7 @@ func (e *EndpointController) syncService(key string) {
if err == nil {
tolerateUnreadyEndpoints = b
} else {
glog.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err)
utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
}
}
@@ -457,9 +460,7 @@ func (e *EndpointController) syncService(key string) {
},
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
e.queue.Add(key) // Retry
return
return err
}
}
@@ -467,9 +468,7 @@ func (e *EndpointController) syncService(key string) {
if len(podHostNames) > 0 {
b, err := json.Marshal(podHostNames)
if err != nil {
glog.Errorf("Error updating endpoints. Marshalling of hostnames failed.: %v", err)
e.queue.Add(key) // Retry
return
return err
}
serializedPodHostNames = string(b)
}
@@ -479,7 +478,7 @@ func (e *EndpointController) syncService(key string) {
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) &&
reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
return nil
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
@@ -509,11 +508,10 @@ func (e *EndpointController) syncService(key string) {
// 2. policy is misconfigured, in which case no service would function anywhere.
// Given the frequency of 1, we log at a lower level.
glog.V(5).Infof("Forbidden from creating endpoints: %v", err)
} else {
utilruntime.HandleError(err)
}
e.queue.Add(key) // Retry
return err
}
return nil
}
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
@@ -525,14 +523,14 @@ func (e *EndpointController) syncService(key string) {
func (e *EndpointController) checkLeftoverEndpoints() {
list, err := e.client.Endpoints(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
return
}
for i := range list.Items {
ep := &list.Items[i]
key, err := keyFunc(ep)
if err != nil {
glog.Errorf("Unable to get key for endpoint %#v", ep)
utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
continue
}
e.queue.Add(key)