diff --git a/federation/pkg/federation-controller/util/federated_informer.go b/federation/pkg/federation-controller/util/federated_informer.go new file mode 100644 index 00000000000..9a77b8bd933 --- /dev/null +++ b/federation/pkg/federation-controller/util/federated_informer.go @@ -0,0 +1,388 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "reflect" + "sync" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + api "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/controller/framework" + pkg_runtime "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +const ( + clusterSyncPeriod = 10 * time.Minute + userAgentName = "federation-service-controller" +) + +// FederatedReadOnlyStore is an overlay over multiple stores created in federated clusters. +type FederatedReadOnlyStore interface { + // Returns all items in the store. + List() ([]interface{}, error) + + // GetByKey returns the item stored under the given key in the specified cluster (if exist). + GetByKey(clusterName string, key string) (interface{}, bool, error) + + // Returns the items stored under the given key in all clusters. + GetFromAllClusters(key string) ([]interface{}, error) + + // Checks whether stores for all clusters form the lists (and only these) are there and + // are synced. This is only a basic check whether the data inside of the store is usable. + // It is not a full synchronization/locking mechanism it only tries to ensure that out-of-sync + // issues occur less often. All users of the interface should assume + // that there may be significant delays in content updates of all kinds and write their + // code that it doesn't break if something is slightly out-of-sync. + ClustersSynced(clusters []*federation_api.Cluster) bool +} + +// An interface to access federation members and clients. +type FederationView interface { + // GetClientsetForCluster returns a clientset for the cluster, if present. + GetClientsetForCluster(clusterName string) (federation_release_1_4.Interface, error) + + // GetReadyClusers returns all clusters for which the sub-informers are run. + GetReadyClusters() ([]*federation_api.Cluster, error) + + // GetReadyCluster returns the cluster with the given name, if found. + GetReadyCluster(name string) (*federation_api.Cluster, bool, error) + + // ClustersSynced returns true if the view is synced (for the first time). + ClustersSynced() bool +} + +// A structure that combines an informer running agains federated api server and listening for cluster updates +// with multiple Kubernetes API informers (called target informers) running against federation members. Whenever a new +// cluster is added to the federation an informer is created for it using TargetInformerFactory. Infomrers are stoped +// when a cluster is either put offline of deleted. It is assumed that some controller keeps an eye on the cluster list +// and thus the clusters in ETCD are up to date. +type FederatedInformer interface { + FederationView + + // Returns a store created over all stores from target informers. + GetTargetStore() FederatedReadOnlyStore + + // Starts all the processes. + Start() + + // Stops all the processes inside the informer. + Stop() +} + +// A function that should be used to create an informer on the target object. Store should use +// framework.DeletionHandlingMetaNamespaceKeyFunc as a keying function. +type TargetInformerFactory func(*federation_api.Cluster, federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface) + +// Builds a FederatedInformer for the given federation client and factory. +func NewFederatedInformer(federationClient federation_release_1_4.Interface, targetInformerFactory TargetInformerFactory) FederatedInformer { + federatedInformer := &federatedInformerImpl{ + targetInformerFactory: targetInformerFactory, + clientFactory: func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) { + clusterConfig, err := BuildClusterConfig(cluster) + if err != nil && clusterConfig != nil { + clientset := federation_release_1_4.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, userAgentName)) + return clientset, nil + } + return nil, err + }, + targetInformers: make(map[string]informer), + } + + federatedInformer.clusterInformer.store, federatedInformer.clusterInformer.controller = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { + return federationClient.Federation().Clusters().List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return federationClient.Federation().Clusters().Watch(options) + }, + }, + &federation_api.Cluster{}, + clusterSyncPeriod, + framework.ResourceEventHandlerFuncs{ + DeleteFunc: func(old interface{}) { + oldCluster, ok := old.(*federation_api.Cluster) + if ok { + federatedInformer.deleteCluster(oldCluster) + } + }, + AddFunc: func(cur interface{}) { + curCluster, ok := cur.(*federation_api.Cluster) + if ok && isClusterReady(curCluster) { + federatedInformer.addCluster(curCluster) + } + }, + UpdateFunc: func(old, cur interface{}) { + oldCluster, ok := old.(*federation_api.Cluster) + if !ok { + return + } + curCluster, ok := cur.(*federation_api.Cluster) + if !ok { + return + } + if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) { + federatedInformer.deleteCluster(oldCluster) + if isClusterReady(curCluster) { + federatedInformer.addCluster(curCluster) + } + } + }, + }, + ) + return federatedInformer +} + +func isClusterReady(cluster *federation_api.Cluster) bool { + for _, condition := range cluster.Status.Conditions { + if condition.Type == federation_api.ClusterReady { + if condition.Status == api_v1.ConditionTrue { + return true + } + } + } + return false +} + +type informer struct { + controller framework.ControllerInterface + store cache.Store + stopChan chan struct{} +} + +type federatedInformerImpl struct { + sync.Mutex + + // Informer on federated clusters. + clusterInformer informer + + // Target informers factory + targetInformerFactory TargetInformerFactory + + // Structures returned by targetInformerFactory + targetInformers map[string]informer + + // A function to build clients. + clientFactory func(*federation_api.Cluster) (federation_release_1_4.Interface, error) +} + +type federatedStoreImpl struct { + federatedInformer *federatedInformerImpl +} + +func (f *federatedInformerImpl) Stop() { + f.Lock() + defer f.Unlock() + + close(f.clusterInformer.stopChan) + for _, informer := range f.targetInformers { + close(informer.stopChan) + } +} + +func (f *federatedInformerImpl) Start() { + f.Lock() + defer f.Unlock() + + f.clusterInformer.stopChan = make(chan struct{}) + go f.clusterInformer.controller.Run(f.clusterInformer.stopChan) +} + +// GetClientsetForCluster returns a clientset for the cluster, if present. +func (f *federatedInformerImpl) GetClientsetForCluster(clusterName string) (federation_release_1_4.Interface, error) { + f.Lock() + defer f.Unlock() + return f.getClientsetForClusterUnlocked(clusterName) +} + +func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName string) (federation_release_1_4.Interface, error) { + // No locking needed. Will happen in f.GetCluster. + if cluster, found, err := f.getReadyClusterUnlocked(clusterName); found && err == nil { + return f.clientFactory(cluster) + } else { + if err != nil { + return nil, err + } + } + return nil, fmt.Errorf("cluster %s not found", clusterName) +} + +// GetReadyClusers returns all clusters for which the sub-informers are run. +func (f *federatedInformerImpl) GetReadyClusters() ([]*federation_api.Cluster, error) { + f.Lock() + defer f.Unlock() + + items := f.clusterInformer.store.List() + result := make([]*federation_api.Cluster, 0, len(items)) + for _, item := range items { + if cluster, ok := item.(*federation_api.Cluster); ok { + if isClusterReady(cluster) { + result = append(result, cluster) + } + } else { + return nil, fmt.Errorf("wrong data in FederatedInformerImpl cluster store: %v", item) + } + } + return result, nil +} + +// GetCluster returns the cluster with the given name, if found. +func (f *federatedInformerImpl) GetReadyCluster(name string) (*federation_api.Cluster, bool, error) { + f.Lock() + defer f.Unlock() + return f.getReadyClusterUnlocked(name) +} + +func (f *federatedInformerImpl) getReadyClusterUnlocked(name string) (*federation_api.Cluster, bool, error) { + if obj, exist, err := f.clusterInformer.store.GetByKey(name); exist && err == nil { + if cluster, ok := obj.(*federation_api.Cluster); ok { + if isClusterReady(cluster) { + return cluster, true, nil + } + return nil, false, nil + + } + return nil, false, fmt.Errorf("wrong data in FederatedInformerImpl cluster store: %v", obj) + + } else { + return nil, false, err + } +} + +// Synced returns true if the view is synced (for the first time) +func (f *federatedInformerImpl) ClustersSynced() bool { + f.Lock() + defer f.Unlock() + return f.clusterInformer.controller.HasSynced() +} + +// Adds the given cluster to federated informer. +func (f *federatedInformerImpl) addCluster(cluster *federation_api.Cluster) { + f.Lock() + defer f.Unlock() + name := cluster.Name + if client, err := f.getClientsetForClusterUnlocked(name); err == nil { + store, controller := f.targetInformerFactory(cluster, client) + targetInformer := informer{ + controller: controller, + store: store, + stopChan: make(chan struct{}), + } + f.targetInformers[name] = targetInformer + go targetInformer.controller.Run(targetInformer.stopChan) + } else { + // TODO: create also an event for cluster. + glog.Errorf("Failed to create a client for cluster: %v", err) + } +} + +// Removes the cluster from federated informer. +func (f *federatedInformerImpl) deleteCluster(cluster *federation_api.Cluster) { + f.Lock() + defer f.Unlock() + name := cluster.Name + if targetInformer, found := f.targetInformers[name]; found { + close(targetInformer.stopChan) + } + delete(f.targetInformers, name) +} + +// Returns a store created over all stores from target informers. +func (f *federatedInformerImpl) GetTargetStore() FederatedReadOnlyStore { + return &federatedStoreImpl{ + federatedInformer: f, + } +} + +// Returns all items in the store. +func (fs *federatedStoreImpl) List() ([]interface{}, error) { + fs.federatedInformer.Lock() + defer fs.federatedInformer.Unlock() + + result := make([]interface{}, 0) + for _, targetInformer := range fs.federatedInformer.targetInformers { + values := targetInformer.store.List() + result = append(result, values...) + } + return result, nil +} + +// GetByKey returns the item stored under the given key in the specified cluster (if exist). +func (fs *federatedStoreImpl) GetByKey(clusterName string, key string) (interface{}, bool, error) { + fs.federatedInformer.Lock() + defer fs.federatedInformer.Unlock() + if targetInformer, found := fs.federatedInformer.targetInformers[clusterName]; found { + return targetInformer.store.GetByKey(key) + } + return nil, false, nil +} + +// Returns the items stored under the given key in all clusters. +func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]interface{}, error) { + fs.federatedInformer.Lock() + defer fs.federatedInformer.Unlock() + + result := make([]interface{}, 0) + for _, targetInformer := range fs.federatedInformer.targetInformers { + value, exist, err := targetInformer.store.GetByKey(key) + if err != nil { + return nil, err + } + if exist { + result = append(result, value) + } + } + return result, nil +} + +// GetKey for returns the key under which the item would be put in the store. +func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string { + // TODO: support other keying functions. + key, _ := framework.DeletionHandlingMetaNamespaceKeyFunc(item) + return key +} + +// Checks whether stores for all clusters form the lists (and only these) are there and +// are synced. +func (fs *federatedStoreImpl) ClustersSynced(clusters []*federation_api.Cluster) bool { + fs.federatedInformer.Lock() + defer fs.federatedInformer.Unlock() + + if len(fs.federatedInformer.targetInformers) != len(clusters) { + return false + } + for _, cluster := range clusters { + if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found { + if !targetInformer.controller.HasSynced() { + return false + } + } else { + return false + } + } + return true +} diff --git a/federation/pkg/federation-controller/util/federated_informer_test.go b/federation/pkg/federation-controller/util/federated_informer_test.go new file mode 100644 index 00000000000..4d022184bcd --- /dev/null +++ b/federation/pkg/federation-controller/util/federated_informer_test.go @@ -0,0 +1,132 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + "time" + + federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" + federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4" + fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" + api "k8s.io/kubernetes/pkg/api" + api_v1 "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" + + "github.com/stretchr/testify/assert" +) + +// Basic test for Federated Informer. Checks whether the subinformer are added and deleted +// when the corresponding cluster entries appear and dissapear from etcd. +func TestFederatedInformer(t *testing.T) { + fakeClient := &fake_federation_release_1_4.Clientset{} + + // Add a single cluster to federation and remove it when needed. + cluster := federation_api.Cluster{ + ObjectMeta: api_v1.ObjectMeta{ + Name: "mycluster", + }, + Status: federation_api.ClusterStatus{ + Conditions: []federation_api.ClusterCondition{ + {Type: federation_api.ClusterReady, Status: api_v1.ConditionTrue}, + }, + }, + } + fakeClient.AddReactor("list", "clusters", func(action core.Action) (bool, runtime.Object, error) { + return true, &federation_api.ClusterList{Items: []federation_api.Cluster{cluster}}, nil + }) + deleteChan := make(chan struct{}) + fakeClient.AddWatchReactor("clusters", func(action core.Action) (bool, watch.Interface, error) { + fakeWatch := watch.NewFake() + go func() { + <-deleteChan + fakeWatch.Delete(&cluster) + }() + return true, fakeWatch, nil + }) + + // There is a single service ns1/s1 in cluster mycluster. + service := api_v1.Service{ + ObjectMeta: api_v1.ObjectMeta{ + Namespace: "ns1", + Name: "s1", + }, + } + fakeClient.AddReactor("list", "services", func(action core.Action) (bool, runtime.Object, error) { + return true, &api_v1.ServiceList{Items: []api_v1.Service{service}}, nil + }) + fakeClient.AddWatchReactor("services", func(action core.Action) (bool, watch.Interface, error) { + return true, watch.NewFake(), nil + }) + + targetInformerFactory := func(cluster *federation_api.Cluster, clientset federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface) { + return framework.NewInformer( + &cache.ListWatch{ + ListFunc: func(options api.ListOptions) (runtime.Object, error) { + return clientset.Core().Services(api_v1.NamespaceAll).List(options) + }, + WatchFunc: func(options api.ListOptions) (watch.Interface, error) { + return clientset.Core().Services(api_v1.NamespaceAll).Watch(options) + }, + }, + &api_v1.Service{}, + 10*time.Second, + framework.ResourceEventHandlerFuncs{}) + } + + informer := NewFederatedInformer(fakeClient, targetInformerFactory).(*federatedInformerImpl) + informer.clientFactory = func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) { + return fakeClient, nil + } + assert.NotNil(t, informer) + informer.Start() + + // Wait until mycluster is synced. + for !informer.GetTargetStore().ClustersSynced([]*federation_api.Cluster{&cluster}) { + time.Sleep(time.Millisecond * 100) + } + readyClusters, err := informer.GetReadyClusters() + assert.NoError(t, err) + assert.Contains(t, readyClusters, &cluster) + serviceList, err := informer.GetTargetStore().List() + assert.NoError(t, err) + assert.Contains(t, serviceList, &service) + service1, found, err := informer.GetTargetStore().GetByKey("mycluster", "ns1/s1") + assert.NoError(t, err) + assert.True(t, found) + assert.EqualValues(t, &service, service1) + + // All checked, lets delete the cluster. + deleteChan <- struct{}{} + for !informer.GetTargetStore().ClustersSynced([]*federation_api.Cluster{}) { + time.Sleep(time.Millisecond * 100) + } + readyClusters, err = informer.GetReadyClusters() + assert.NoError(t, err) + assert.Empty(t, readyClusters) + + serviceList, err = informer.GetTargetStore().List() + assert.NoError(t, err) + assert.Empty(t, serviceList) + + // Test complete. + informer.Stop() +}