Federation - common libs - cluster lifecycle handlers in FederatedInformer

This commit is contained in:
Marcin Wielgus 2016-08-05 10:24:25 +02:00
parent 42a12a4cd6
commit fc263877a5
2 changed files with 80 additions and 2 deletions

View File

@ -45,6 +45,9 @@ type FederatedReadOnlyStore interface {
// Returns all items in the store.
List() ([]interface{}, error)
// Returns all items from a cluster.
ListFromCluster(clusterName string) ([]interface{}, error)
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
GetByKey(clusterName string, key string) (interface{}, bool, error)
@ -97,8 +100,24 @@ type FederatedInformer interface {
// framework.DeletionHandlingMetaNamespaceKeyFunc as a keying function.
type TargetInformerFactory func(*federation_api.Cluster, federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface)
// A structure with cluster lifecycle handler functions. Cluster is available (and ClusterAvailable is fired)
// when it is created in federated etcd and ready. Cluster becomes unavailable (and ClusterUnavailable is fired)
// when it is either deleted or becomes not ready. When cluster spec (IP)is modified both ClusterAvailable
// and ClusterUnavailable are fired.
type ClusterLifecycleHandlerFuncs struct {
// Fired when the cluster becomes available.
ClusterAvailable func(*federation_api.Cluster)
// Fired when the cluster becomes unavailable. The second arg contains data that was present
// in the cluster before deletion.
ClusterUnavailable func(*federation_api.Cluster, []interface{})
}
// Builds a FederatedInformer for the given federation client and factory.
func NewFederatedInformer(federationClient federation_release_1_4.Interface, targetInformerFactory TargetInformerFactory) FederatedInformer {
func NewFederatedInformer(
federationClient federation_release_1_4.Interface,
targetInformerFactory TargetInformerFactory,
clusterLifecycle ClusterLifecycleHandlerFuncs) FederatedInformer {
federatedInformer := &federatedInformerImpl{
targetInformerFactory: targetInformerFactory,
clientFactory: func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) {
@ -112,6 +131,15 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar
targetInformers: make(map[string]informer),
}
getClusterData := func(name string) []interface{} {
data, err := federatedInformer.GetTargetStore().ListFromCluster(name)
if err != nil {
glog.Errorf("Failed to list %s content: %v", name, err)
return make([]interface{}, 0)
}
return data
}
federatedInformer.clusterInformer.store, federatedInformer.clusterInformer.controller = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
@ -127,13 +155,23 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar
DeleteFunc: func(old interface{}) {
oldCluster, ok := old.(*federation_api.Cluster)
if ok {
var data []interface{}
if clusterLifecycle.ClusterUnavailable != nil {
data = getClusterData(oldCluster.Name)
}
federatedInformer.deleteCluster(oldCluster)
if clusterLifecycle.ClusterUnavailable != nil {
clusterLifecycle.ClusterUnavailable(oldCluster, data)
}
}
},
AddFunc: func(cur interface{}) {
curCluster, ok := cur.(*federation_api.Cluster)
if ok && isClusterReady(curCluster) {
federatedInformer.addCluster(curCluster)
if clusterLifecycle.ClusterAvailable != nil {
clusterLifecycle.ClusterAvailable(curCluster)
}
}
},
UpdateFunc: func(old, cur interface{}) {
@ -146,9 +184,20 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar
return
}
if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
var data []interface{}
if clusterLifecycle.ClusterUnavailable != nil {
data = getClusterData(oldCluster.Name)
}
federatedInformer.deleteCluster(oldCluster)
if clusterLifecycle.ClusterUnavailable != nil {
clusterLifecycle.ClusterUnavailable(oldCluster, data)
}
if isClusterReady(curCluster) {
federatedInformer.addCluster(curCluster)
if clusterLifecycle.ClusterAvailable != nil {
clusterLifecycle.ClusterAvailable(curCluster)
}
}
}
},
@ -331,6 +380,19 @@ func (fs *federatedStoreImpl) List() ([]interface{}, error) {
return result, nil
}
// Returns all items in the given cluster.
func (fs *federatedStoreImpl) ListFromCluster(clusterName string) ([]interface{}, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
result := make([]interface{}, 0)
if targetInformer, found := fs.federatedInformer.targetInformers[clusterName]; found {
values := targetInformer.store.List()
result = append(result, values...)
}
return result, nil
}
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
func (fs *federatedStoreImpl) GetByKey(clusterName string, key string) (interface{}, bool, error) {
fs.federatedInformer.Lock()

View File

@ -92,7 +92,20 @@ func TestFederatedInformer(t *testing.T) {
framework.ResourceEventHandlerFuncs{})
}
informer := NewFederatedInformer(fakeClient, targetInformerFactory).(*federatedInformerImpl)
addedClusters := make(chan string, 1)
deletedClusters := make(chan string, 1)
lifecycle := ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
addedClusters <- cluster.Name
close(addedClusters)
},
ClusterUnavailable: func(cluster *federation_api.Cluster, _ []interface{}) {
deletedClusters <- cluster.Name
close(deletedClusters)
},
}
informer := NewFederatedInformer(fakeClient, targetInformerFactory, lifecycle).(*federatedInformerImpl)
informer.clientFactory = func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) {
return fakeClient, nil
}
@ -113,6 +126,7 @@ func TestFederatedInformer(t *testing.T) {
assert.NoError(t, err)
assert.True(t, found)
assert.EqualValues(t, &service, service1)
assert.Equal(t, "mycluster", <-addedClusters)
// All checked, lets delete the cluster.
deleteChan <- struct{}{}
@ -127,6 +141,8 @@ func TestFederatedInformer(t *testing.T) {
assert.NoError(t, err)
assert.Empty(t, serviceList)
assert.Equal(t, "mycluster", <-deletedClusters)
// Test complete.
informer.Stop()
}