diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 8617b5f3192..ee07c237474 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -479,9 +479,10 @@ func (s *ServiceController) init() error { type reconciliationStatus string const ( - statusAllOk = reconciliationStatus("ALL_OK") - statusError = reconciliationStatus("ERROR") - statusNotSynced = reconciliationStatus("NOSYNC") + statusAllOk = reconciliationStatus("ALL_OK") + statusRecoverableError = reconciliationStatus("RECOVERABLE_ERROR") + statusNonRecoverableError = reconciliationStatus("NON_RECOVERABLE_ERROR") + statusNotSynced = reconciliationStatus("NOSYNC") ) // fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done. @@ -494,19 +495,19 @@ func (s *ServiceController) fedServiceWorker() { } defer s.queue.Done(key) service := key.(string) - status, err := s.reconcileService(service) + status := s.reconcileService(service) switch status { case statusAllOk: break - case statusError: - runtime.HandleError(fmt.Errorf("Error reconciling service %q: %v, delivering again", service, err)) - s.deliverService(service, 0, true) case statusNotSynced: glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service) s.deliverService(service, s.clusterAvailableDelay, false) + case statusRecoverableError: + s.deliverService(service, 0, true) + case statusNonRecoverableError: + // error is already logged, do nothing default: - runtime.HandleError(fmt.Errorf("Unhandled reconciliation status for %q: %s, delivering again", service, status)) - s.deliverService(service, s.reviewDelay, false) + // unreachable } }() } @@ -1207,34 +1208,39 @@ func (s *ServiceController) isSynced() bool { // reconcileService triggers reconciliation of a federated service with corresponding services in federated clusters. // This function is called on service Addition/Deletion/Updation either in federated cluster or in federation. -func (s *ServiceController) reconcileService(key string) (reconciliationStatus, error) { +func (s *ServiceController) reconcileService(key string) reconciliationStatus { if !s.isSynced() { glog.V(4).Infof("Data store not synced, delaying reconcilation: %v", key) - return statusNotSynced, nil + return statusNotSynced } namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err)) - return statusError, err + runtime.HandleError(fmt.Errorf("Invalid key %q recieved, unable to split key to namespace and name, err: %v", key, err)) + return statusNonRecoverableError } service, err := s.serviceStore.Services(namespace).Get(name) if errors.IsNotFound(err) { // Not a federated service, ignoring. - return statusAllOk, nil + return statusAllOk } else if err != nil { - return statusError, err + runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err)) + return statusRecoverableError } glog.V(3).Infof("Reconciling federated service: %s", key) // Create a copy before modifying the service to prevent race condition with other readers of service from store fedServiceObj, err := api.Scheme.DeepCopy(service) + if err != nil { + runtime.HandleError(fmt.Errorf("Error in copying obj: %s, %v", key, err)) + return statusNonRecoverableError + } fedService, ok := fedServiceObj.(*v1.Service) if err != nil || !ok { - runtime.HandleError(fmt.Errorf("Error in retrieving obj from store: %s, %v", key, err)) - return statusError, err + runtime.HandleError(fmt.Errorf("Unknown obj recieved from store: %#v, %v", fedServiceObj, err)) + return statusNonRecoverableError } // Handle deletion of federated service @@ -1242,19 +1248,19 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, if err := s.delete(fedService); err != nil { runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err)) s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err) - return statusError, err + return statusRecoverableError } glog.V(3).Infof("Deleting federated service succeeded: %s", key) s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded") - return statusAllOk, nil + return statusAllOk } // Add the required finalizers before creating a service in underlying clusters. This ensures that the // dependent services in underlying clusters are deleted when the federated service is deleted. updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService) if err != nil { - glog.Warningf("Failed to ensure setting finalizer for service %s: %v", key, err) - return statusError, err + runtime.HandleError(fmt.Errorf("Failed to ensure setting finalizer for service %s: %v", key, err)) + return statusRecoverableError } fedService = updatedServiceObj.(*v1.Service) @@ -1262,7 +1268,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, clusters, err := s.federatedInformer.GetReadyClusters() if err != nil { runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err)) - return statusError, err + return statusRecoverableError } newLBStatus := newLoadbalancerStatus() @@ -1272,7 +1278,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Aggregate all operations to perform on all federated clusters operation, err := s.getOperationsToPerformOnCluster(cluster, fedService) if err != nil { - return statusError, err + return statusRecoverableError } if operation != nil { operations = append(operations, *operation) @@ -1281,7 +1287,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Aggregate LoadBalancerStatus from all services in federated clusters to update status in federated service lbStatus, err := s.getServiceStatusInCluster(cluster, key) if err != nil { - return statusError, err + return statusRecoverableError } if len(lbStatus.Ingress) > 0 { newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...) @@ -1289,7 +1295,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Add/Update federated service ingress only if there are reachable endpoints backing the lb service endpoints, err := s.getServiceEndpointsInCluster(cluster, key) if err != nil { - return statusError, err + return statusRecoverableError } // if there are no endpoints created for the service then the loadbalancer ingress // is not reachable, so do not consider such loadbalancer ingresses for federated @@ -1313,7 +1319,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, if err != nil { if !errors.IsAlreadyExists(err) { runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err)) - return statusError, err + return statusRecoverableError } } } @@ -1321,11 +1327,11 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus, // Update the federated service if there are any updates in clustered service (status/endpoints) err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress) if err != nil { - return statusError, err + return statusRecoverableError } glog.V(5).Infof("Everything is in order in federated clusters for service %s", key) - return statusAllOk, nil + return statusAllOk } // getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service