diff --git a/federation/pkg/federation-controller/util/federated_informer.go b/federation/pkg/federation-controller/util/federated_informer.go index 9a77b8bd933..a1e7db62206 100644 --- a/federation/pkg/federation-controller/util/federated_informer.go +++ b/federation/pkg/federation-controller/util/federated_informer.go @@ -45,6 +45,9 @@ type FederatedReadOnlyStore interface { // Returns all items in the store. List() ([]interface{}, error) + // Returns all items from a cluster. + ListFromCluster(clusterName string) ([]interface{}, error) + // GetByKey returns the item stored under the given key in the specified cluster (if exist). GetByKey(clusterName string, key string) (interface{}, bool, error) @@ -97,8 +100,24 @@ type FederatedInformer interface { // framework.DeletionHandlingMetaNamespaceKeyFunc as a keying function. type TargetInformerFactory func(*federation_api.Cluster, federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface) +// A structure with cluster lifecycle handler functions. Cluster is available (and ClusterAvailable is fired) +// when it is created in federated etcd and ready. Cluster becomes unavailable (and ClusterUnavailable is fired) +// when it is either deleted or becomes not ready. When cluster spec (IP)is modified both ClusterAvailable +// and ClusterUnavailable are fired. +type ClusterLifecycleHandlerFuncs struct { + // Fired when the cluster becomes available. + ClusterAvailable func(*federation_api.Cluster) + // Fired when the cluster becomes unavailable. The second arg contains data that was present + // in the cluster before deletion. + ClusterUnavailable func(*federation_api.Cluster, []interface{}) +} + // Builds a FederatedInformer for the given federation client and factory. -func NewFederatedInformer(federationClient federation_release_1_4.Interface, targetInformerFactory TargetInformerFactory) FederatedInformer { +func NewFederatedInformer( + federationClient federation_release_1_4.Interface, + targetInformerFactory TargetInformerFactory, + clusterLifecycle ClusterLifecycleHandlerFuncs) FederatedInformer { + federatedInformer := &federatedInformerImpl{ targetInformerFactory: targetInformerFactory, clientFactory: func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) { @@ -112,6 +131,15 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar targetInformers: make(map[string]informer), } + getClusterData := func(name string) []interface{} { + data, err := federatedInformer.GetTargetStore().ListFromCluster(name) + if err != nil { + glog.Errorf("Failed to list %s content: %v", name, err) + return make([]interface{}, 0) + } + return data + } + federatedInformer.clusterInformer.store, federatedInformer.clusterInformer.controller = framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) { @@ -127,13 +155,23 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar DeleteFunc: func(old interface{}) { oldCluster, ok := old.(*federation_api.Cluster) if ok { + var data []interface{} + if clusterLifecycle.ClusterUnavailable != nil { + data = getClusterData(oldCluster.Name) + } federatedInformer.deleteCluster(oldCluster) + if clusterLifecycle.ClusterUnavailable != nil { + clusterLifecycle.ClusterUnavailable(oldCluster, data) + } } }, AddFunc: func(cur interface{}) { curCluster, ok := cur.(*federation_api.Cluster) if ok && isClusterReady(curCluster) { federatedInformer.addCluster(curCluster) + if clusterLifecycle.ClusterAvailable != nil { + clusterLifecycle.ClusterAvailable(curCluster) + } } }, UpdateFunc: func(old, cur interface{}) { @@ -146,9 +184,20 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar return } if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) { + var data []interface{} + if clusterLifecycle.ClusterUnavailable != nil { + data = getClusterData(oldCluster.Name) + } federatedInformer.deleteCluster(oldCluster) + if clusterLifecycle.ClusterUnavailable != nil { + clusterLifecycle.ClusterUnavailable(oldCluster, data) + } + if isClusterReady(curCluster) { federatedInformer.addCluster(curCluster) + if clusterLifecycle.ClusterAvailable != nil { + clusterLifecycle.ClusterAvailable(curCluster) + } } } }, @@ -331,6 +380,19 @@ func (fs *federatedStoreImpl) List() ([]interface{}, error) { return result, nil } +// Returns all items in the given cluster. +func (fs *federatedStoreImpl) ListFromCluster(clusterName string) ([]interface{}, error) { + fs.federatedInformer.Lock() + defer fs.federatedInformer.Unlock() + + result := make([]interface{}, 0) + if targetInformer, found := fs.federatedInformer.targetInformers[clusterName]; found { + values := targetInformer.store.List() + result = append(result, values...) + } + return result, nil +} + // GetByKey returns the item stored under the given key in the specified cluster (if exist). func (fs *federatedStoreImpl) GetByKey(clusterName string, key string) (interface{}, bool, error) { fs.federatedInformer.Lock() diff --git a/federation/pkg/federation-controller/util/federated_informer_test.go b/federation/pkg/federation-controller/util/federated_informer_test.go index 4d022184bcd..2fb1cc4295f 100644 --- a/federation/pkg/federation-controller/util/federated_informer_test.go +++ b/federation/pkg/federation-controller/util/federated_informer_test.go @@ -92,7 +92,20 @@ func TestFederatedInformer(t *testing.T) { framework.ResourceEventHandlerFuncs{}) } - informer := NewFederatedInformer(fakeClient, targetInformerFactory).(*federatedInformerImpl) + addedClusters := make(chan string, 1) + deletedClusters := make(chan string, 1) + lifecycle := ClusterLifecycleHandlerFuncs{ + ClusterAvailable: func(cluster *federation_api.Cluster) { + addedClusters <- cluster.Name + close(addedClusters) + }, + ClusterUnavailable: func(cluster *federation_api.Cluster, _ []interface{}) { + deletedClusters <- cluster.Name + close(deletedClusters) + }, + } + + informer := NewFederatedInformer(fakeClient, targetInformerFactory, lifecycle).(*federatedInformerImpl) informer.clientFactory = func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) { return fakeClient, nil } @@ -113,6 +126,7 @@ func TestFederatedInformer(t *testing.T) { assert.NoError(t, err) assert.True(t, found) assert.EqualValues(t, &service, service1) + assert.Equal(t, "mycluster", <-addedClusters) // All checked, lets delete the cluster. deleteChan <- struct{}{} @@ -127,6 +141,8 @@ func TestFederatedInformer(t *testing.T) { assert.NoError(t, err) assert.Empty(t, serviceList) + assert.Equal(t, "mycluster", <-deletedClusters) + // Test complete. informer.Stop() }