diff --git a/federation/pkg/federation-controller/service/BUILD b/federation/pkg/federation-controller/service/BUILD index 994f64fd6c6..0d8472bd3db 100644 --- a/federation/pkg/federation-controller/service/BUILD +++ b/federation/pkg/federation-controller/service/BUILD @@ -23,6 +23,7 @@ go_library( "//federation/pkg/dnsprovider:go_default_library", "//federation/pkg/dnsprovider/rrstype:go_default_library", "//federation/pkg/federation-controller/util:go_default_library", + "//federation/pkg/federation-controller/util/clusterselector:go_default_library", "//federation/pkg/federation-controller/util/deletionhelper:go_default_library", "//pkg/api:go_default_library", "//pkg/api/v1:go_default_library", diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 69a547aa185..10b6c2a2b85 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -42,6 +42,7 @@ import ( v1beta1 "k8s.io/kubernetes/federation/apis/federation/v1beta1" fedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset" fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -475,7 +476,7 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus { operations := make([]fedutil.FederatedOperation, 0) for _, cluster := range clusters { // Aggregate all operations to perform on all federated clusters - operation, err := s.getOperationsToPerformOnCluster(cluster, fedService) + operation, err := getOperationsToPerformOnCluster(s.federatedInformer, cluster, fedService, clusterselector.SendToCluster) if err != nil { return statusRecoverableError } @@ -529,43 +530,40 @@ func (s *ServiceController) reconcileService(key string) reconciliationStatus { return statusAllOk } +type clusterSelectorFunc func(map[string]string, map[string]string) (bool, error) + // getOperationsToPerformOnCluster returns the operations to be performed so that clustered service is in sync with federated service -func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Cluster, fedService *v1.Service) (*fedutil.FederatedOperation, error) { +func getOperationsToPerformOnCluster(informer fedutil.FederatedInformer, cluster *v1beta1.Cluster, fedService *v1.Service, selector clusterSelectorFunc) (*fedutil.FederatedOperation, error) { var operation *fedutil.FederatedOperation + var operationType fedutil.FederatedOperationType = "" key := types.NamespacedName{Namespace: fedService.Namespace, Name: fedService.Name}.String() - clusterServiceObj, serviceFound, err := s.federatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + clusterServiceObj, found, err := informer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { runtime.HandleError(fmt.Errorf("Failed to get %s service from %s: %v", key, cluster.Name, err)) return nil, err } - if !serviceFound { - desiredService := &v1.Service{ - ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta), - Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)), - } - desiredService.ResourceVersion = "" - glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService) + send, err := selector(cluster.Labels, fedService.ObjectMeta.Annotations) + if err != nil { + glog.Errorf("Error processing ClusterSelector cluster: %s for service map: %s error: %s", cluster.Name, key, err.Error()) + return nil, err + } else if !send { + glog.V(5).Infof("Skipping cluster: %s for service: %s reason: cluster selectors do not match: %-v %-v", cluster.Name, key, cluster.ObjectMeta.Labels, fedService.ObjectMeta.Annotations[v1beta1.FederationClusterSelectorAnnotation]) + } - operation = &fedutil.FederatedOperation{ - Type: fedutil.OperationTypeAdd, - Obj: desiredService, - ClusterName: cluster.Name, - Key: key, - } - } else { + desiredService := &v1.Service{ + ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(fedService.ObjectMeta), + Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)), + } + switch { + case found && send: clusterService, ok := clusterServiceObj.(*v1.Service) if !ok { runtime.HandleError(fmt.Errorf("Unexpected error for %q: %v", key, err)) return nil, err } - desiredService := &v1.Service{ - ObjectMeta: fedutil.DeepCopyRelevantObjectMeta(clusterService.ObjectMeta), - Spec: *(fedutil.DeepCopyApiTypeOrPanic(&fedService.Spec).(*v1.ServiceSpec)), - } - // ClusterIP and NodePort are allocated to Service by cluster, so retain the same if any while updating desiredService.Spec.ClusterIP = clusterService.Spec.ClusterIP for _, cPort := range clusterService.Spec.Ports { @@ -583,21 +581,32 @@ func (s *ServiceController) getOperationsToPerformOnCluster(cluster *v1beta1.Clu // Update existing service, if needed. if !Equivalent(desiredService, clusterService) { + operationType = fedutil.OperationTypeUpdate + glog.V(4).Infof("Service in underlying cluster %s does not match, Desired: %+v, Existing: %+v", cluster.Name, desiredService, clusterService) // ResourceVersion of cluster service can be different from federated service, // so do not update ResourceVersion while updating cluster service desiredService.ResourceVersion = clusterService.ResourceVersion - - operation = &fedutil.FederatedOperation{ - Type: fedutil.OperationTypeUpdate, - Obj: desiredService, - ClusterName: cluster.Name, - Key: key, - } } else { glog.V(5).Infof("Service in underlying cluster %s is up to date: %+v", cluster.Name, desiredService) } + case found && !send: + operationType = fedutil.OperationTypeDelete + case !found && send: + operationType = fedutil.OperationTypeAdd + desiredService.ResourceVersion = "" + + glog.V(4).Infof("Creating service in underlying cluster %s: %+v", cluster.Name, desiredService) + } + + if len(operationType) > 0 { + operation = &fedutil.FederatedOperation{ + Type: operationType, + Obj: desiredService, + ClusterName: cluster.Name, + Key: key, + } } return operation, nil } diff --git a/federation/pkg/federation-controller/service/servicecontroller_test.go b/federation/pkg/federation-controller/service/servicecontroller_test.go index c1ba3f4580a..a98ae0936c4 100644 --- a/federation/pkg/federation-controller/service/servicecontroller_test.go +++ b/federation/pkg/federation-controller/service/servicecontroller_test.go @@ -53,6 +53,8 @@ const ( serviceEndpoint2 = "192.168.1.1" ) +var awfulError error = errors.NewGone("Something bad happened") + func TestServiceController(t *testing.T) { glog.Infof("Creating fake infrastructure") fedClient := &fakefedclientset.Clientset{} @@ -207,6 +209,53 @@ func TestServiceController(t *testing.T) { close(stop) } +func TestGetOperationsToPerformOnCluster(t *testing.T) { + obj := NewService("test-service-1", 80) + cluster1 := NewClusterWithRegionZone("cluster1", v1.ConditionTrue, "region1", "zone1") + fedClient := &fakefedclientset.Clientset{} + sc := New(fedClient) + + testCases := map[string]struct { + expectedSendErr bool + sendToCluster bool + operationType fedutil.FederatedOperationType + }{ + "sendToCluster error returned": { + expectedSendErr: true, + }, + "Missing object and not matching ClusterSelector should result in no operations": { + sendToCluster: false, + }, + "Missing object and matching ClusterSelector should result in add operation": { + operationType: fedutil.OperationTypeAdd, + sendToCluster: true, + }, + // Update and Delete scenarios are tested in TestServiceController + } + for testName, testCase := range testCases { + t.Run(testName, func(t *testing.T) { + + operations, err := getOperationsToPerformOnCluster(sc.federatedInformer, cluster1, obj, func(map[string]string, map[string]string) (bool, error) { + if testCase.expectedSendErr { + return false, awfulError + } + return testCase.sendToCluster, nil + }) + if testCase.expectedSendErr { + require.Error(t, err, "An error was expected") + } else { + require.NoError(t, err, "An error was not expected") + } + if len(testCase.operationType) == 0 { + require.Nil(t, operations, "An operation was not expected") + } else { + require.NotNil(t, operations, "A single operation was expected") + require.Equal(t, testCase.operationType, operations.Type, "Unexpected operation returned") + } + }) + } +} + func NewService(name string, port int32) *v1.Service { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{