diff --git a/federation/cmd/federation-controller-manager/app/controllermanager.go b/federation/cmd/federation-controller-manager/app/controllermanager.go index 6612330f8a2..5e08b8caaf8 100644 --- a/federation/cmd/federation-controller-manager/app/controllermanager.go +++ b/federation/cmd/federation-controller-manager/app/controllermanager.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options" "k8s.io/kubernetes/federation/pkg/dnsprovider" clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster" + ingresscontroller "k8s.io/kubernetes/federation/pkg/federation-controller/ingress" namespacecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/namespace" replicasetcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/replicaset" secretcontroller "k8s.io/kubernetes/federation/pkg/federation-controller/secret" @@ -136,20 +137,19 @@ func Run(s *options.CMServer) error { } func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error { + glog.Infof("Loading client config for cluster controller %q", "cluster-controller") ccClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller")) + glog.Infof("Running cluster controller") go clustercontroller.NewclusterController(ccClientset, s.ClusterMonitorPeriod.Duration).Run() dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile) if err != nil { glog.Fatalf("Cloud provider could not be initialized: %v", err) } - scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName)) - servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ZoneName) - if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil { - glog.Errorf("Failed to start service controller: %v", err) - } + glog.Infof("Loading client config for namespace controller %q", "namespace-controller") nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller")) namespaceController := namespacecontroller.NewNamespaceController(nsClientset) + glog.Infof("Running namespace controller") namespaceController.Run(wait.NeverStop) secretcontrollerClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "secret-controller")) @@ -160,5 +160,19 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err replicaSetController := replicasetcontroller.NewReplicaSetController(replicaSetClientset) go replicaSetController.Run(s.ConcurrentReplicaSetSyncs, wait.NeverStop) + glog.Infof("Loading client config for ingress controller %q", "ingress-controller") + ingClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "ingress-controller")) + ingressController := ingresscontroller.NewIngressController(ingClientset) + glog.Infof("Running ingress controller") + ingressController.Run(wait.NeverStop) + + glog.Infof("Loading client config for service controller %q", servicecontroller.UserAgentName) + scClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName)) + servicecontroller := servicecontroller.New(scClientset, dns, s.FederationName, s.ZoneName) + glog.Infof("Running service controller") + if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil { + glog.Errorf("Failed to start service controller: %v", err) + } + select {} } diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go new file mode 100644 index 00000000000..685e837301b --- /dev/null +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -0,0 +1,358 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ingress + +import ( + "reflect" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + "k8s.io/kubernetes/pkg/api" + extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + "k8s.io/kubernetes/pkg/client/cache" + kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/flowcontrol" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +const ( + allClustersKey = "ALL_CLUSTERS" + staticIPAnnotationKey = "ingress.kubernetes.io/static-ip" // TODO: Get this directly from the Kubernetes Ingress Controller constant +) + +type IngressController struct { + // For triggering single ingress reconcilation. This is used when there is an + // add/update/delete operation on an ingress in either federated API server or + // in some member of the federation. + ingressDeliverer *util.DelayingDeliverer + + // For triggering reconcilation of all ingresses. This is used when + // a new cluster becomes available. + clusterDeliverer *util.DelayingDeliverer + + // Contains ingresses present in members of federation. + ingressFederatedInformer util.FederatedInformer + // For updating members of federation. + federatedUpdater util.FederatedUpdater + // Definitions of ingresses that should be federated. + ingressInformerStore cache.Store + // Informer controller for ingresses that should be federated. + ingressInformerController framework.ControllerInterface + + // Client to federated api server. + federatedApiClient federation_release_1_4.Interface + + // Backoff manager for ingresses + ingressBackoff *flowcontrol.Backoff + + ingressReviewDelay time.Duration + clusterAvailableDelay time.Duration + smallDelay time.Duration + updateTimeout time.Duration +} + +// NewIngressController returns a new ingress controller +func NewIngressController(client federation_release_1_4.Interface) *IngressController { + ic := &IngressController{ + federatedApiClient: client, + ingressReviewDelay: time.Second * 10, + clusterAvailableDelay: time.Second * 20, + smallDelay: time.Second * 3, + updateTimeout: time.Second * 30, + ingressBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), + } + + // Build deliverers for triggering reconcilations. + ic.ingressDeliverer = util.NewDelayingDeliverer() + ic.clusterDeliverer = util.NewDelayingDeliverer() + + // Start informer in federated API servers on ingresses that should be federated. + ic.ingressInformerStore, ic.ingressInformerController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return client.Extensions().Ingresses(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return client.Extensions().Ingresses(api.NamespaceAll).Watch(options) + }, + }, + &extensions_v1beta1.Ingress{}, + controller.NoResyncPeriodFunc(), + util.NewTriggerOnAllChanges( + func(obj pkg_runtime.Object) { + ic.deliverIngressObj(obj, 0, false) + }, + )) + + // Federated informer on ingresses in members of federation. + ic.ingressFederatedInformer = util.NewFederatedInformer( + client, + func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return targetClient.Extensions().Ingresses(api.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return targetClient.Extensions().Ingresses(api.NamespaceAll).Watch(options) + }, + }, + &extensions_v1beta1.Ingress{}, + controller.NoResyncPeriodFunc(), + // Trigger reconcilation whenever something in federated cluster is changed. In most cases it + // would be just confirmation that some ingress operation suceeded. + util.NewTriggerOnAllChanges( + func(obj pkg_runtime.Object) { + ic.deliverIngressObj(obj, ic.ingressReviewDelay, false) + }, + )) + }, + + &util.ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *federation_api.Cluster) { + // When new cluster becomes available process all the ingresses again. + ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay)) + }, + }, + ) + + // Federated updater along with Create/Update/Delete operations. + ic.federatedUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, + func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { + ingress := obj.(*extensions_v1beta1.Ingress) + glog.V(4).Infof("Attempting to create Ingress: %v", ingress) + _, err := client.Extensions().Ingresses(ingress.Namespace).Create(ingress) + return err + }, + func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { + ingress := obj.(*extensions_v1beta1.Ingress) + glog.V(4).Infof("Attempting to update Ingress: %v", ingress) + _, err := client.Extensions().Ingresses(ingress.Namespace).Update(ingress) + return err + }, + func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { + ingress := obj.(*extensions_v1beta1.Ingress) + glog.V(4).Infof("Attempting to delete Ingress: %v", ingress) + err := client.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &api.DeleteOptions{}) + return err + }) + return ic +} + +func (ic *IngressController) Run(stopChan <-chan struct{}) { + glog.Infof("Starting Ingress Controller") + go ic.ingressInformerController.Run(stopChan) + glog.Infof("... Starting Ingress Federated Informer") + ic.ingressFederatedInformer.Start() + go func() { + <-stopChan + glog.Infof("Stopping Ingress Controller") + ic.ingressFederatedInformer.Stop() + }() + ic.ingressDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { + ingress := item.Value.(types.NamespacedName) + glog.V(4).Infof("Ingress change delivered, reconciling: %v", ingress) + ic.reconcileIngress(ingress) + }) + ic.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { + glog.V(4).Infof("Cluster change delivered, reconciling ingresses") + ic.reconcileIngressesOnClusterChange() + }) + go func() { + select { + case <-time.After(time.Minute): + glog.V(4).Infof("Ingress controller is garbage collecting") + ic.ingressBackoff.GC() + case <-stopChan: + return + } + }() +} + +func (ic *IngressController) deliverIngressObj(obj interface{}, delay time.Duration, failed bool) { + ingress := obj.(*extensions_v1beta1.Ingress) + ic.deliverIngress(types.NamespacedName{Namespace: ingress.Namespace, Name: ingress.Name}, delay, failed) +} + +func (ic *IngressController) deliverIngress(ingress types.NamespacedName, delay time.Duration, failed bool) { + glog.V(4).Infof("Delivering ingress: %s", ingress) + key := ingress.String() + if failed { + ic.ingressBackoff.Next(key, time.Now()) + delay = delay + ic.ingressBackoff.Get(key) + } else { + ic.ingressBackoff.Reset(key) + } + ic.ingressDeliverer.DeliverAfter(key, ingress, delay) +} + +// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet +// synced with the coresponding api server. +func (ic *IngressController) isSynced() bool { + if !ic.ingressFederatedInformer.ClustersSynced() { + glog.V(2).Infof("Cluster list not synced") + return false + } + clusters, err := ic.ingressFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get ready clusters: %v", err) + return false + } + if !ic.ingressFederatedInformer.GetTargetStore().ClustersSynced(clusters) { + glog.V(2).Infof("Target store not synced") + return false + } + glog.V(4).Infof("Cluster list is synced") + return true +} + +// The function triggers reconcilation of all federated ingresses. +func (ic *IngressController) reconcileIngressesOnClusterChange() { + glog.V(4).Infof("Reconciling ingresses on cluster change") + if !ic.isSynced() { + ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay)) + } + for _, obj := range ic.ingressInformerStore.List() { + ingress := obj.(*extensions_v1beta1.Ingress) + ic.deliverIngress(types.NamespacedName{Namespace: ingress.Namespace, Name: ingress.Name}, ic.smallDelay, false) + } +} + +func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { + glog.V(4).Infof("Reconciling ingress %q", ingress) + if !ic.isSynced() { + ic.deliverIngress(ingress, ic.clusterAvailableDelay, false) + return + } + + key := ingress.String() + baseIngressObj, exist, err := ic.ingressInformerStore.GetByKey(key) + if err != nil { + glog.Errorf("Failed to query main ingress store for %v: %v", ingress, err) + ic.deliverIngress(ingress, 0, true) + return + } + if !exist { + // Not federated ingress, ignoring. + glog.V(4).Infof("Ingress %q is not federated. Ignoring.", ingress) + return + } + baseIngress := baseIngressObj.(*extensions_v1beta1.Ingress) + + clusters, err := ic.ingressFederatedInformer.GetReadyClusters() + if err != nil { + glog.Errorf("Failed to get cluster list: %v", err) + ic.deliverIngress(ingress, ic.clusterAvailableDelay, false) + return + } + + operations := make([]util.FederatedOperation, 0) + + for clusterIndex, cluster := range clusters { + _, baseIPExists := baseIngress.ObjectMeta.Annotations[staticIPAnnotationKey] + clusterIngressObj, found, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) + if err != nil { + glog.Errorf("Failed to get %s from %s: %v", ingress, cluster.Name, err) + ic.deliverIngress(ingress, 0, true) + return + } + desiredIngress := &extensions_v1beta1.Ingress{ + ObjectMeta: baseIngress.ObjectMeta, + Spec: baseIngress.Spec, + } + + if !found { + // We can't supply server-created fields when creating a new object. + desiredIngress.ObjectMeta.ResourceVersion = "" + desiredIngress.ObjectMeta.UID = "" + // We always first create an ingress in the first available cluster. Once that ingress + // has been created and allocated a global IP (visible via an annotation), + // we record that annotation on the federated ingress, and create all other cluster + // ingresses with that same global IP. + // Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated + // index 0, but eventually all ingresses will share the single global IP recorded in the annotation + // of the federated ingress. + if baseIPExists || (clusterIndex == 0) { + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeAdd, + Obj: desiredIngress, + ClusterName: cluster.Name, + }) + } + } else { + clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress) + glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required", ingress, cluster.Name) + clusterIPName, clusterIPExists := clusterIngress.ObjectMeta.Annotations[staticIPAnnotationKey] + if !baseIPExists && clusterIPExists { + // Add annotation to federated ingress via API. + original, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Get(baseIngress.Name) + if err == nil { + original.ObjectMeta.Annotations[staticIPAnnotationKey] = clusterIPName + if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(original); err != nil { + glog.Errorf("Failed to add static IP annotation to federated ingress %q: %v", ingress, err) + } + } else { + glog.Errorf("Failed to get federated ingress %q: %v", ingress, err) + } + } + // Update existing ingress, if needed. + if !util.ObjectMetaIsEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) || + !reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) { + // TODO: In some cases Ingress controllers in the clusters add annotations, so we ideally need to exclude those from + // the equivalence comparison to cut down on unnecessary updates. + glog.V(4).Infof("Ingress %s in cluster %s needs an update: cluster ingress %v is not equivalent to federated ingress %v", ingress, cluster.Name, clusterIngress, desiredIngress) + // We need to use server-created fields from the cluster, not the desired object when updating. + desiredIngress.ObjectMeta.ResourceVersion = clusterIngress.ObjectMeta.ResourceVersion + desiredIngress.ObjectMeta.UID = clusterIngress.ObjectMeta.UID + // Merge any annotations on the federated ingress onto the underlying cluster ingress, + // overwriting duplicates. + // TODO: We should probably use a PATCH operation for this instead. + for key, val := range baseIngress.ObjectMeta.Annotations { + desiredIngress.ObjectMeta.Annotations[key] = val + } + operations = append(operations, util.FederatedOperation{ + Type: util.OperationTypeUpdate, + Obj: desiredIngress, + ClusterName: cluster.Name, + }) + } + } + } + + if len(operations) == 0 { + // Everything is in order + return + } + glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations) + err = ic.federatedUpdater.Update(operations, ic.updateTimeout) + if err != nil { + glog.Errorf("Failed to execute updates for %s: %v", ingress, err) + ic.deliverIngress(ingress, ic.ingressReviewDelay, true) + return + } + + // Evertyhing is in order but lets be double sure - TODO: quinton: Why? This seems like a hack. + ic.deliverIngress(ingress, ic.ingressReviewDelay, false) +} diff --git a/federation/pkg/federation-controller/ingress/ingress_controller_test.go b/federation/pkg/federation-controller/ingress/ingress_controller_test.go new file mode 100644 index 00000000000..48d95bbbea6 --- /dev/null +++ b/federation/pkg/federation-controller/ingress/ingress_controller_test.go @@ -0,0 +1,180 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ingress + +import ( + "fmt" + "reflect" + "testing" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + // federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + "k8s.io/kubernetes/federation/pkg/federation-controller/util" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" + kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" + fake_kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4/fake" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "github.com/stretchr/testify/assert" +) + +func TestIngressController(t *testing.T) { + cluster1 := mkCluster("cluster1", api_v1.ConditionTrue) + cluster2 := mkCluster("cluster2", api_v1.ConditionTrue) + + fakeClient := &fake_federation_release_1_4.Clientset{} + RegisterList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}}) + RegisterList("ingresses", &fakeClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) + ingressWatch := RegisterWatch("ingresses", &fakeClient.Fake) + clusterWatch := RegisterWatch("clusters", &fakeClient.Fake) + + cluster1Client := &fake_kube_release_1_4.Clientset{} + cluster1Watch := RegisterWatch("ingresses", &cluster1Client.Fake) + RegisterList("ingresses", &cluster1Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) + cluster1CreateChan := RegisterCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1Watch) + cluster1UpdateChan := RegisterCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1Watch) + + cluster2Client := &fake_kube_release_1_4.Clientset{} + cluster2Watch := RegisterWatch("ingresses", &cluster2Client.Fake) + RegisterList("ingresses", &cluster2Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) + cluster2CreateChan := RegisterCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2Watch) + + ingressController := NewIngressController(fakeClient) + informer := toFederatedInformerForTestOnly(ingressController.ingressFederatedInformer) + informer.SetClientFactory(func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) { + switch cluster.Name { + case cluster1.Name: + return cluster1Client, nil + case cluster2.Name: + return cluster2Client, nil + default: + return nil, fmt.Errorf("Unknown cluster") + } + }) + ingressController.clusterAvailableDelay = time.Second + ingressController.ingressReviewDelay = 50 * time.Millisecond + ingressController.smallDelay = 20 * time.Millisecond + ingressController.updateTimeout = 5 * time.Second + + stop := make(chan struct{}) + ingressController.Run(stop) + + ing1 := extensions_v1beta1.Ingress{ + ObjectMeta: api_v1.ObjectMeta{ + Name: "test-ingress", + Namespace: "mynamespace", + }, + } + + // Test add federated ingress. + ingressWatch.Add(&ing1) + createdIngress := GetIngressFromChan(cluster1CreateChan) + assert.NotNil(t, createdIngress) + assert.True(t, reflect.DeepEqual(&ing1, createdIngress)) + + // Test update federated ingress. + ing1.Annotations = map[string]string{ + "A": "B", + } + ingressWatch.Modify(&ing1) + updatedIngress := GetIngressFromChan(cluster1UpdateChan) + assert.NotNil(t, updatedIngress) + assert.True(t, reflect.DeepEqual(&ing1, updatedIngress)) + + // Test add cluster + ing1.Annotations[staticIPAnnotationKey] = "foo" // Make sure that the base object has a static IP name first. + ingressWatch.Modify(&ing1) + clusterWatch.Add(cluster2) + createdIngress2 := GetIngressFromChan(cluster2CreateChan) + assert.NotNil(t, createdIngress2) + assert.True(t, reflect.DeepEqual(&ing1, createdIngress2)) + + close(stop) +} + +func toFederatedInformerForTestOnly(informer util.FederatedInformer) util.FederatedInformerForTestOnly { + inter := informer.(interface{}) + return inter.(util.FederatedInformerForTestOnly) +} + +func mkCluster(name string, readyStatus api_v1.ConditionStatus) *federation_api.Cluster { + return &federation_api.Cluster{ + ObjectMeta: api_v1.ObjectMeta{ + Name: name, + }, + Status: federation_api.ClusterStatus{ + Conditions: []federation_api.ClusterCondition{ + {Type: federation_api.ClusterReady, Status: readyStatus}, + }, + }, + } +} + +func RegisterWatch(resource string, client *core.Fake) *watch.FakeWatcher { + watcher := watch.NewFake() + client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) { return true, watcher, nil }) + return watcher +} + +func RegisterList(resource string, client *core.Fake, obj runtime.Object) { + client.AddReactor("list", resource, func(action core.Action) (bool, runtime.Object, error) { + return true, obj, nil + }) +} + +func RegisterCopyOnCreate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("create", resource, func(action core.Action) (bool, runtime.Object, error) { + createAction := action.(core.CreateAction) + obj := createAction.GetObject() + go func() { + watcher.Add(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func RegisterCopyOnUpdate(resource string, client *core.Fake, watcher *watch.FakeWatcher) chan runtime.Object { + objChan := make(chan runtime.Object, 100) + client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { + updateAction := action.(core.UpdateAction) + obj := updateAction.GetObject() + go func() { + watcher.Modify(obj) + objChan <- obj + }() + return true, obj, nil + }) + return objChan +} + +func GetIngressFromChan(c chan runtime.Object) *extensions_v1beta1.Ingress { + select { + case obj := <-c: + ingress := obj.(*extensions_v1beta1.Ingress) + return ingress + case <-time.After(time.Minute): + return nil + } +} diff --git a/federation/pkg/federation-controller/util/federated_informer.go b/federation/pkg/federation-controller/util/federated_informer.go index bc60e3ebce4..201724ea1fe 100644 --- a/federation/pkg/federation-controller/util/federated_informer.go +++ b/federation/pkg/federation-controller/util/federated_informer.go @@ -38,7 +38,7 @@ import ( const ( clusterSyncPeriod = 10 * time.Minute - userAgentName = "federation-service-controller" + userAgentName = "federation-controller" ) // An object with an origin information. diff --git a/federation/pkg/federation-controller/util/util.go b/federation/pkg/federation-controller/util/util.go new file mode 100644 index 00000000000..e0bebee309c --- /dev/null +++ b/federation/pkg/federation-controller/util/util.go @@ -0,0 +1,43 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "reflect" + + "k8s.io/kubernetes/pkg/api/v1" +) + +/* +ObjectMetaIsEquivalent determines whether two ObjectMeta's (typically one from a federated API object, +and the other from a cluster object) are equivalent. +*/ +func ObjectMetaIsEquivalent(m1, m2 v1.ObjectMeta) bool { + // First make all of the read-only fields equal, then perform a deep equality comparison + m1.SelfLink = m2.SelfLink // Might be different in different cluster contexts. + m1.UID = m2.UID // Definitely different in different cluster contexts + m1.ResourceVersion = m2.ResourceVersion // Definitely different in different cluster contexts + m1.Generation = m2.Generation // Might be different in different cluster contexts. + m1.CreationTimestamp = m2.CreationTimestamp // Definitely different in different cluster contexts. + m1.DeletionTimestamp = m2.DeletionTimestamp // Might be different in different cluster contexts. + m1.OwnerReferences = nil // Might be different in different cluster contexts. + m2.OwnerReferences = nil + m1.Finalizers = nil // Might be different in different cluster contexts. + m2.Finalizers = nil + + return reflect.DeepEqual(m1, m2) +} diff --git a/pkg/types/namespacedname.go b/pkg/types/namespacedname.go index 70a9ac3e24e..1e2130da08b 100644 --- a/pkg/types/namespacedname.go +++ b/pkg/types/namespacedname.go @@ -16,6 +16,11 @@ limitations under the License. package types +import ( + "fmt" + "strings" +) + // NamespacedName comprises a resource name, with a mandatory namespace, // rendered as "/". Being a type captures intent and // helps make sure that UIDs, namespaced names and non-namespaced names @@ -29,7 +34,27 @@ type NamespacedName struct { Name string } +const ( + Separator = '/' +) + // String returns the general purpose string representation func (n NamespacedName) String() string { - return n.Namespace + "/" + n.Name + return fmt.Sprintf("%s%c%s", n.Namespace, Separator, n.Name) +} + +// NewNamespacedNameFromString parses the provided string and returns a NamespacedName. +// The expected format is as per String() above. +// If the input string is invalid, the returned NamespacedName has all empty string field values. +// This allows a single-value return from this function, while still allowing error checks in the caller. +// Note that an input string which does not include exactly one Separator is not a valid input (as it could never +// have neem returned by String() ) +func NewNamespacedNameFromString(s string) NamespacedName { + nn := NamespacedName{} + result := strings.Split(s, string(Separator)) + if len(result) == 2 { + nn.Namespace = result[0] + nn.Name = result[1] + } + return nn }