From a3888335f785ea93d1af22756ae3313daece1e81 Mon Sep 17 00:00:00 2001
From: d00369826
Date: Thu, 15 Sep 2016 18:30:06 +0800
Subject: [PATCH] fix endpoint controller hot loop

Change-Id: I0f667006f310fdca6abe324f9ea03537679e9163
---
 .../mesos/pkg/service/endpoints_controller.go |  88 ++++++++-------
 .../endpoint/endpoints_controller.go          | 104 +++++++++---------
 2 files changed, 100 insertions(+), 92 deletions(-)

diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go
index 95b9a7ed212..4eef19c8815 100644
--- a/contrib/mesos/pkg/service/endpoints_controller.go
+++ b/contrib/mesos/pkg/service/endpoints_controller.go
@@ -53,7 +53,7 @@ type EndpointController interface {
 func NewEndpointController(client *clientset.Clientset) *endpointController {
 	e := &endpointController{
 		client: client,
-		queue:  workqueue.New(),
+		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
 	}
 	e.serviceStore.Store, e.serviceController = cache.NewInformer(
 		&cache.ListWatch{
@@ -108,7 +108,7 @@ type endpointController struct {
 	// more often than services with few pods; it also would cause a
 	// service that's inserted multiple times to be processed more than
 	// necessary.
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 
 	// Since we join two objects, we'll watch both of them with
 	// controllers.
@@ -158,7 +158,7 @@ func (e *endpointController) addPod(obj interface{}) {
 	pod := obj.(*api.Pod)
 	services, err := e.getPodServiceMemberships(pod)
 	if err != nil {
-		glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
+		utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err))
 		return
 	}
 	for key := range services {
@@ -176,7 +176,7 @@ func (e *endpointController) updatePod(old, cur interface{}) {
 	newPod := old.(*api.Pod)
 	services, err := e.getPodServiceMemberships(newPod)
 	if err != nil {
-		glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
+		utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
 		return
 	}
 
@@ -185,7 +185,7 @@ func (e *endpointController) updatePod(old, cur interface{}) {
 	if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
 		oldServices, err := e.getPodServiceMemberships(oldPod)
 		if err != nil {
-			glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
+			utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
 			return
 		}
 		services = services.Union(oldServices)
@@ -207,9 +207,9 @@ func (e *endpointController) deletePod(obj interface{}) {
 	}
 	podKey, err := keyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 	}
-	glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod)
+	glog.V(4).Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod)
 	// TODO: keep a map of pods to services to handle this condition.
 }
 
@@ -218,7 +218,7 @@ func (e *endpointController) deletePod(obj interface{}) {
 func (e *endpointController) enqueueService(obj interface{}) {
 	key, err := keyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 	}
 
 	e.queue.Add(key)
@@ -229,24 +229,35 @@
 // workqueue guarantees that they will not end up processing the same service
 // at the same time.
 func (e *endpointController) worker() {
-	for {
-		func() {
-			key, quit := e.queue.Get()
-			if quit {
-				return
-			}
-			// Use defer: in the unlikely event that there's a
-			// panic, we'd still like this to get marked done--
-			// otherwise the controller will not be able to sync
-			// this service again until it is restarted.
-			defer e.queue.Done(key)
-			e.syncService(key.(string))
-		}()
+	for e.processNextWorkItem() {
 	}
 }
 
+func (e *endpointController) processNextWorkItem() bool {
+	eKey, quit := e.queue.Get()
+	if quit {
+		return false
+	}
+	// Use defer: in the unlikely event that there's a
+	// panic, we'd still like this to get marked done--
+	// otherwise the controller will not be able to sync
+	// this service again until it is restarted.
+	defer e.queue.Done(eKey)
+
+	err := e.syncService(eKey.(string))
+	if err == nil {
+		e.queue.Forget(eKey)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("Sync %v failed with %v", eKey, err))
+	e.queue.AddRateLimited(eKey)
+
+	return true
+}
+
 // HACK(sttts): add annotations to the endpoint about the respective container ports
-func (e *endpointController) syncService(key string) {
+func (e *endpointController) syncService(key string) error {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
@@ -260,23 +271,23 @@ func (e *endpointController) syncService(key string) {
 		// doesn't completely solve the problem. See #6877.
 		namespace, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
-			glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
+			utilruntime.HandleError(fmt.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err))
 			// Don't retry, as the key isn't going to magically become understandable.
-			return
+			return nil
 		}
 		err = e.client.Endpoints(namespace).Delete(name, nil)
 		if err != nil && !errors.IsNotFound(err) {
-			glog.Errorf("Error deleting endpoint %q: %v", key, err)
-			e.queue.Add(key) // Retry
+			utilruntime.HandleError(fmt.Errorf("Error deleting endpoint %q: %v", key, err))
+			return err
 		}
-		return
+		return nil
 	}
 
 	service := obj.(*api.Service)
 	if service.Spec.Selector == nil {
 		// services without a selector receive no endpoints from this controller;
 		// these services will receive the endpoints that are created out-of-band via the REST API.
-		return
+		return nil
 	}
 
 	glog.V(5).Infof("About to update endpoints for service %q", key)
@@ -284,9 +295,8 @@
 	if err != nil {
 		// Since we're getting stuff from a local cache, it is
 		// basically impossible to get this error.
-		glog.Errorf("Error syncing service %q: %v", key, err)
-		e.queue.Add(key) // Retry
-		return
+		utilruntime.HandleError(fmt.Errorf("Error syncing service %q: %v", key, err))
+		return err
 	}
 
 	subsets := []api.EndpointSubset{}
@@ -346,14 +356,13 @@ func (e *endpointController) syncService(key string) {
 				},
 			}
 		} else {
-			glog.Errorf("Error getting endpoints: %v", err)
-			e.queue.Add(key) // Retry
-			return
+			utilruntime.HandleError(fmt.Errorf("Error getting endpoints: %v", err))
+			return err
 		}
 	}
 	if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
 		glog.V(5).Infof("Endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
-		return
+		return nil
 	}
 	newEndpoints := currentEndpoints
 	newEndpoints.Subsets = subsets
@@ -374,9 +383,10 @@ func (e *endpointController) syncService(key string) {
 		_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
 	}
 	if err != nil {
-		glog.Errorf("Error updating endpoints: %v", err)
-		e.queue.Add(key) // Retry
+		utilruntime.HandleError(fmt.Errorf("Error updating endpoints: %v", err))
+		return err
 	}
+	return nil
 }
 
 // checkLeftoverEndpoints lists all currently existing endpoints and adds their
@@ -388,14 +398,14 @@ func (e *endpointController) syncService(key string) {
 func (e *endpointController) checkLeftoverEndpoints() {
 	list, err := e.client.Endpoints(api.NamespaceAll).List(api.ListOptions{})
 	if err != nil {
-		glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
+		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
 		return
 	}
 	for i := range list.Items {
 		ep := &list.Items[i]
 		key, err := keyFunc(ep)
 		if err != nil {
-			glog.Errorf("Unable to get key for endpoint %#v", ep)
+			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
 			continue
 		}
 		e.queue.Add(key)
diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go
index fabd017e210..ba8d4500fbb 100644
--- a/pkg/controller/endpoint/endpoints_controller.go
+++ b/pkg/controller/endpoint/endpoints_controller.go
@@ -25,6 +25,7 @@ import (
 
 	"encoding/json"
+	"fmt"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/endpoints"
@@ -75,7 +76,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client *client
 	}
 	e := &EndpointController{
 		client: client,
-		queue:  workqueue.NewNamed("endpoint"),
+		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
 	}
 
 	e.serviceStore.Store, e.serviceController = cache.NewInformer(
@@ -139,7 +140,7 @@ type EndpointController struct {
 	// more often than services with few pods; it also would cause a
 	// service that's inserted multiple times to be processed more than
 	// necessary.
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 
 	// Since we join two objects, we'll watch both of them with
 	// controllers.
@@ -154,8 +155,15 @@ type EndpointController struct {
 // endpoints will be handled in parallel.
 func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer e.queue.ShutDown()
+
 	go e.serviceController.Run(stopCh)
 	go e.podController.Run(stopCh)
+
+	if !cache.WaitForCacheSync(stopCh, e.podStoreSynced) {
+		return
+	}
+
 	for i := 0; i < workers; i++ {
 		go wait.Until(e.worker, time.Second, stopCh)
 	}
@@ -170,7 +178,6 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
 	}
 
 	<-stopCh
-	e.queue.ShutDown()
 }
 
 func (e *EndpointController) getPodServiceMemberships(pod *api.Pod) (sets.String, error) {
@@ -197,7 +204,7 @@ func (e *EndpointController) addPod(obj interface{}) {
 	pod := obj.(*api.Pod)
 	services, err := e.getPodServiceMemberships(pod)
 	if err != nil {
-		glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
+		utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err))
 		return
 	}
 	for key := range services {
@@ -218,7 +225,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
 	}
 	services, err := e.getPodServiceMemberships(newPod)
 	if err != nil {
-		glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
+		utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
 		return
 	}
 
@@ -227,7 +234,7 @@ func (e *EndpointController) updatePod(old, cur interface{}) {
 		!hostNameAndDomainAreEqual(newPod, oldPod) {
 		oldServices, err := e.getPodServiceMemberships(oldPod)
 		if err != nil {
-			glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
+			utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
 			return
 		}
 		services = services.Union(oldServices)
@@ -274,10 +281,10 @@ func (e *EndpointController) deletePod(obj interface{}) {
 	}
 	podKey, err := keyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
 		return
 	}
-	glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
+	glog.V(4).Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
 	// TODO: keep a map of pods to services to handle this condition.
 }
 
@@ -286,7 +293,7 @@ func (e *EndpointController) deletePod(obj interface{}) {
 func (e *EndpointController) enqueueService(obj interface{}) {
 	key, err := keyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 
@@ -298,36 +305,35 @@ func (e *EndpointController) enqueueService(obj interface{}) {
 // workqueue guarantees that they will not end up processing the same service
 // at the same time.
 func (e *EndpointController) worker() {
-	for {
-		func() {
-			key, quit := e.queue.Get()
-			if quit {
-				return
-			}
-			// Use defer: in the unlikely event that there's a
-			// panic, we'd still like this to get marked done--
-			// otherwise the controller will not be able to sync
-			// this service again until it is restarted.
-			defer e.queue.Done(key)
-			e.syncService(key.(string))
-		}()
+	for e.processNextWorkItem() {
 	}
 }
 
-func (e *EndpointController) syncService(key string) {
+func (e *EndpointController) processNextWorkItem() bool {
+	eKey, quit := e.queue.Get()
+	if quit {
+		return false
+	}
+	defer e.queue.Done(eKey)
+
+	err := e.syncService(eKey.(string))
+	if err == nil {
+		e.queue.Forget(eKey)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("Sync %v failed with %v", eKey, err))
+	e.queue.AddRateLimited(eKey)
+
+	return true
+}
+
+func (e *EndpointController) syncService(key string) error {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !e.podStoreSynced() {
-		// Sleep so we give the pod reflector goroutine a chance to run.
-		time.Sleep(PodStoreSyncedPollPeriod)
-		glog.V(4).Infof("Waiting for pods controller to sync, requeuing service %v", key)
-		e.queue.Add(key)
-		return
-	}
-
 	obj, exists, err := e.serviceStore.Store.GetByKey(key)
 	if err != nil || !exists {
 		// Delete the corresponding endpoint, as the service has been deleted.
@@ -337,23 +343,22 @@ func (e *EndpointController) syncService(key string) {
 		// doesn't completely solve the problem. See #6877.
 		namespace, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
-			glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
+			utilruntime.HandleError(fmt.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err))
 			// Don't retry, as the key isn't going to magically become understandable.
-			return
+			return nil
 		}
 		err = e.client.Endpoints(namespace).Delete(name, nil)
 		if err != nil && !errors.IsNotFound(err) {
-			glog.Errorf("Error deleting endpoint %q: %v", key, err)
-			e.queue.Add(key) // Retry
+			return err
 		}
-		return
+		return nil
 	}
 
 	service := obj.(*api.Service)
 	if service.Spec.Selector == nil {
 		// services without a selector receive no endpoints from this controller;
 		// these services will receive the endpoints that are created out-of-band via the REST API.
-		return
+		return nil
 	}
 
 	glog.V(5).Infof("About to update endpoints for service %q", key)
@@ -361,9 +366,7 @@ func (e *EndpointController) syncService(key string) {
 	if err != nil {
 		// Since we're getting stuff from a local cache, it is
 		// basically impossible to get this error.
-		glog.Errorf("Error syncing service %q: %v", key, err)
-		e.queue.Add(key) // Retry
-		return
+		return err
 	}
 
 	subsets := []api.EndpointSubset{}
@@ -375,7 +378,7 @@ func (e *EndpointController) syncService(key string) {
 		if err == nil {
 			tolerateUnreadyEndpoints = b
 		} else {
-			glog.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err)
+			utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
 		}
 	}
 
@@ -457,9 +460,7 @@ func (e *EndpointController) syncService(key string) {
 				},
 			}
 		} else {
-			glog.Errorf("Error getting endpoints: %v", err)
-			e.queue.Add(key) // Retry
-			return
+			return err
 		}
 	}
 
@@ -467,9 +468,7 @@ func (e *EndpointController) syncService(key string) {
 	if len(podHostNames) > 0 {
 		b, err := json.Marshal(podHostNames)
 		if err != nil {
-			glog.Errorf("Error updating endpoints. Marshalling of hostnames failed.: %v", err)
-			e.queue.Add(key) // Retry
-			return
+			return err
 		}
 		serializedPodHostNames = string(b)
 	}
@@ -479,7 +478,7 @@ func (e *EndpointController) syncService(key string) {
 
 	if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
 		glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
-		return
+		return nil
 	}
 	newEndpoints := currentEndpoints
 	newEndpoints.Subsets = subsets
@@ -509,11 +508,10 @@ func (e *EndpointController) syncService(key string) {
 			// 2. policy is misconfigured, in which case no service would function anywhere.
 			// Given the frequency of 1, we log at a lower level.
 			glog.V(5).Infof("Forbidden from creating endpoints: %v", err)
-		} else {
-			utilruntime.HandleError(err)
 		}
-		e.queue.Add(key) // Retry
+		return err
 	}
+	return nil
 }
 
 // checkLeftoverEndpoints lists all currently existing endpoints and adds their
@@ -525,14 +523,14 @@ func (e *EndpointController) syncService(key string) {
 func (e *EndpointController) checkLeftoverEndpoints() {
 	list, err := e.client.Endpoints(api.NamespaceAll).List(api.ListOptions{})
 	if err != nil {
-		glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
+		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
 		return
 	}
 	for i := range list.Items {
 		ep := &list.Items[i]
 		key, err := keyFunc(ep)
 		if err != nil {
-			glog.Errorf("Unable to get key for endpoint %#v", ep)
+			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
 			continue
 		}
 		e.queue.Add(key)
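
For readers who want the fix in isolation, what follows is a minimal, self-contained sketch of the worker/processNextWorkItem pattern that this patch switches both controllers to: a failed sync is re-queued through the rate limiter (AddRateLimited) instead of being added back immediately, and a successful sync resets the per-key backoff (Forget). The sketch is not part of the patch; the toy controller type is invented for illustration, and the import path assumes the later client-go layout, whereas the patch itself uses the vendored k8s.io/kubernetes/pkg/util/workqueue package.

// sketch.go - illustrative only; not part of the patch above.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue" // the patch uses the older vendored k8s.io/kubernetes/pkg/util/workqueue
)

// controller is a toy stand-in for the endpoint controller.
type controller struct {
	queue workqueue.RateLimitingInterface
	sync  func(key string) error
}

func newController(sync func(key string) error) *controller {
	return &controller{
		// A rate-limited queue combines per-item exponential backoff with an
		// overall rate limit, which is what prevents the hot loop.
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
		sync:  sync,
	}
}

// worker drains the queue until it is shut down.
func (c *controller) worker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem handles one key: on success the backoff for the key is
// reset with Forget; on error the key is re-added through the rate limiter so
// retries back off instead of spinning.
func (c *controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.sync(key.(string)); err != nil {
		c.queue.AddRateLimited(key)
		return true
	}
	c.queue.Forget(key)
	return true
}

func main() {
	c := newController(func(key string) error {
		fmt.Println("synced", key)
		return nil
	})
	go c.worker()
	c.queue.Add("default/my-service")

	time.Sleep(time.Second)
	c.queue.ShutDown()
}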