mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-03 10:17:46 +00:00
Handle review comments
This commit is contained in:
@@ -479,9 +479,10 @@ func (s *ServiceController) init() error {
|
|||||||
type reconciliationStatus string
|
type reconciliationStatus string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
statusAllOk = reconciliationStatus("ALL_OK")
|
statusAllOk = reconciliationStatus("ALL_OK")
|
||||||
statusError = reconciliationStatus("ERROR")
|
statusRecoverableError = reconciliationStatus("RECOVERABLE_ERROR")
|
||||||
statusNotSynced = reconciliationStatus("NOSYNC")
|
statusNonRecoverableError = reconciliationStatus("NON_RECOVERABLE_ERROR")
|
||||||
|
statusNotSynced = reconciliationStatus("NOSYNC")
|
||||||
)
|
)
|
||||||
|
|
||||||
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
|
// fedServiceWorker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||||
@@ -494,19 +495,19 @@ func (s *ServiceController) fedServiceWorker() {
|
|||||||
}
|
}
|
||||||
defer s.queue.Done(key)
|
defer s.queue.Done(key)
|
||||||
service := key.(string)
|
service := key.(string)
|
||||||
status, err := s.reconcileService(service)
|
status := s.reconcileService(service)
|
||||||
switch status {
|
switch status {
|
||||||
case statusAllOk:
|
case statusAllOk:
|
||||||
break
|
break
|
||||||
case statusError:
|
|
||||||
runtime.HandleError(fmt.Errorf("Error reconciling service %q: %v, delivering again", service, err))
|
|
||||||
s.deliverService(service, 0, true)
|
|
||||||
case statusNotSynced:
|
case statusNotSynced:
|
||||||
glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
|
glog.V(5).Infof("Delivering notification for %q after clusterAvailableDelay", service)
|
||||||
s.deliverService(service, s.clusterAvailableDelay, false)
|
s.deliverService(service, s.clusterAvailableDelay, false)
|
||||||
|
case statusRecoverableError:
|
||||||
|
s.deliverService(service, 0, true)
|
||||||
|
case statusNonRecoverableError:
|
||||||
|
// error is already logged, do nothing
|
||||||
default:
|
default:
|
||||||
runtime.HandleError(fmt.Errorf("Unhandled reconciliation status for %q: %s, delivering again", service, status))
|
// unreachable
|
||||||
s.deliverService(service, s.reviewDelay, false)
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -1207,34 +1208,39 @@ func (s *ServiceController) isSynced() bool {
|
|||||||
|
|
||||||
// reconcileService triggers reconciliation of a federated service with corresponding services in federated clusters.
|
// reconcileService triggers reconciliation of a federated service with corresponding services in federated clusters.
|
||||||
// This function is called on service Addition/Deletion/Updation either in federated cluster or in federation.
|
// This function is called on service Addition/Deletion/Updation either in federated cluster or in federation.
|
||||||
func (s *ServiceController) reconcileService(key string) (reconciliationStatus, error) {
|
func (s *ServiceController) reconcileService(key string) reconciliationStatus {
|
||||||
if !s.isSynced() {
|
if !s.isSynced() {
|
||||||
glog.V(4).Infof("Data store not synced, delaying reconcilation: %v", key)
|
glog.V(4).Infof("Data store not synced, delaying reconcilation: %v", key)
|
||||||
return statusNotSynced, nil
|
return statusNotSynced
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err))
|
runtime.HandleError(fmt.Errorf("Invalid key %q recieved, unable to split key to namespace and name, err: %v", key, err))
|
||||||
return statusError, err
|
return statusNonRecoverableError
|
||||||
}
|
}
|
||||||
|
|
||||||
service, err := s.serviceStore.Services(namespace).Get(name)
|
service, err := s.serviceStore.Services(namespace).Get(name)
|
||||||
if errors.IsNotFound(err) {
|
if errors.IsNotFound(err) {
|
||||||
// Not a federated service, ignoring.
|
// Not a federated service, ignoring.
|
||||||
return statusAllOk, nil
|
return statusAllOk
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return statusError, err
|
runtime.HandleError(fmt.Errorf("Failed to retrieve federated service %q from store: %v", key, err))
|
||||||
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(3).Infof("Reconciling federated service: %s", key)
|
glog.V(3).Infof("Reconciling federated service: %s", key)
|
||||||
|
|
||||||
// Create a copy before modifying the service to prevent race condition with other readers of service from store
|
// Create a copy before modifying the service to prevent race condition with other readers of service from store
|
||||||
fedServiceObj, err := api.Scheme.DeepCopy(service)
|
fedServiceObj, err := api.Scheme.DeepCopy(service)
|
||||||
|
if err != nil {
|
||||||
|
runtime.HandleError(fmt.Errorf("Error in copying obj: %s, %v", key, err))
|
||||||
|
return statusNonRecoverableError
|
||||||
|
}
|
||||||
fedService, ok := fedServiceObj.(*v1.Service)
|
fedService, ok := fedServiceObj.(*v1.Service)
|
||||||
if err != nil || !ok {
|
if err != nil || !ok {
|
||||||
runtime.HandleError(fmt.Errorf("Error in retrieving obj from store: %s, %v", key, err))
|
runtime.HandleError(fmt.Errorf("Unknown obj recieved from store: %#v, %v", fedServiceObj, err))
|
||||||
return statusError, err
|
return statusNonRecoverableError
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle deletion of federated service
|
// Handle deletion of federated service
|
||||||
@@ -1242,19 +1248,19 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus,
|
|||||||
if err := s.delete(fedService); err != nil {
|
if err := s.delete(fedService); err != nil {
|
||||||
runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err))
|
runtime.HandleError(fmt.Errorf("Failed to delete %s: %v", key, err))
|
||||||
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err)
|
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteFailed", "Deleting service failed: %v", err)
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
glog.V(3).Infof("Deleting federated service succeeded: %s", key)
|
glog.V(3).Infof("Deleting federated service succeeded: %s", key)
|
||||||
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded")
|
s.eventRecorder.Eventf(fedService, api.EventTypeNormal, "DeleteSucceed", "Deleting service succeeded")
|
||||||
return statusAllOk, nil
|
return statusAllOk
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the required finalizers before creating a service in underlying clusters. This ensures that the
|
// Add the required finalizers before creating a service in underlying clusters. This ensures that the
|
||||||
// dependent services in underlying clusters are deleted when the federated service is deleted.
|
// dependent services in underlying clusters are deleted when the federated service is deleted.
|
||||||
updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService)
|
updatedServiceObj, err := s.deletionHelper.EnsureFinalizers(fedService)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("Failed to ensure setting finalizer for service %s: %v", key, err)
|
runtime.HandleError(fmt.Errorf("Failed to ensure setting finalizer for service %s: %v", key, err))
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
fedService = updatedServiceObj.(*v1.Service)
|
fedService = updatedServiceObj.(*v1.Service)
|
||||||
|
|
||||||
@@ -1262,7 +1268,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus,
|
|||||||
clusters, err := s.federatedInformer.GetReadyClusters()
|
clusters, err := s.federatedInformer.GetReadyClusters()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err))
|
runtime.HandleError(fmt.Errorf("Failed to get ready cluster list: %v", err))
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
|
|
||||||
newLBStatus := newLoadbalancerStatus()
|
newLBStatus := newLoadbalancerStatus()
|
||||||
@@ -1272,7 +1278,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus,
|
|||||||
// Aggregate all operations to perform on all federated clusters
|
// Aggregate all operations to perform on all federated clusters
|
||||||
operation, err := s.getOperationsToPerformOnCluster(cluster, fedService)
|
operation, err := s.getOperationsToPerformOnCluster(cluster, fedService)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
if operation != nil {
|
if operation != nil {
|
||||||
operations = append(operations, *operation)
|
operations = append(operations, *operation)
|
||||||
@@ -1281,7 +1287,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus,
|
|||||||
// Aggregate LoadBalancerStatus from all services in federated clusters to update status in federated service
|
// Aggregate LoadBalancerStatus from all services in federated clusters to update status in federated service
|
||||||
lbStatus, err := s.getServiceStatusInCluster(cluster, key)
|
lbStatus, err := s.getServiceStatusInCluster(cluster, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
if len(lbStatus.Ingress) > 0 {
|
if len(lbStatus.Ingress) > 0 {
|
||||||
newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...)
|
newLBStatus.Ingress = append(newLBStatus.Ingress, lbStatus.Ingress...)
|
||||||
@@ -1289,7 +1295,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus,
|
|||||||
// Add/Update federated service ingress only if there are reachable endpoints backing the lb service
|
// Add/Update federated service ingress only if there are reachable endpoints backing the lb service
|
||||||
endpoints, err := s.getServiceEndpointsInCluster(cluster, key)
|
endpoints, err := s.getServiceEndpointsInCluster(cluster, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
// if there are no endpoints created for the service then the loadbalancer ingress
|
// if there are no endpoints created for the service then the loadbalancer ingress
|
||||||
// is not reachable, so do not consider such loadbalancer ingresses for federated
|
// is not reachable, so do not consider such loadbalancer ingresses for federated
|
||||||
@@ -1313,7 +1319,7 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.IsAlreadyExists(err) {
|
if !errors.IsAlreadyExists(err) {
|
||||||
runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))
|
runtime.HandleError(fmt.Errorf("Failed to execute updates for %s: %v", key, err))
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1321,11 +1327,11 @@ func (s *ServiceController) reconcileService(key string) (reconciliationStatus,
|
|||||||
// Update the federated service if there are any updates in clustered service (status/endpoints)
|
// Update the federated service if there are any updates in clustered service (status/endpoints)
|
||||||
err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress)
|
err = s.updateFederatedService(fedService, newLBStatus, newServiceIngress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return statusError, err
|
return statusRecoverableError
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(5).Infof("Everything is in order in federated clusters for service %s", key)
|
glog.V(5).Infof("Everything is in order in federated clusters for service %s", key)
|
||||||
return statusAllOk, nil
|
return statusAllOk
|
||||||
}
|
}
|
||||||
|
|
||||||
// getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service
|
// getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service
|
||||||
|
Reference in New Issue
Block a user