mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #30130 from mwielgus/cluster-lifecycle
Automatic merge from submit-queue Federation - common libs - cluster lifecycle handlers in FederatedInformer Adds cluster lifecycle handlers to FederatedInformer to execute some actions when a cluster becomes available or unavailable. ref: #29347 cc: @nikhiljindal @wojtek-t @kubernetes/sig-cluster-federation
This commit is contained in:
commit
5f3cc4a941
@ -45,6 +45,9 @@ type FederatedReadOnlyStore interface {
|
||||
// Returns all items in the store.
|
||||
List() ([]interface{}, error)
|
||||
|
||||
// Returns all items from a cluster.
|
||||
ListFromCluster(clusterName string) ([]interface{}, error)
|
||||
|
||||
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
|
||||
GetByKey(clusterName string, key string) (interface{}, bool, error)
|
||||
|
||||
@ -97,8 +100,24 @@ type FederatedInformer interface {
|
||||
// framework.DeletionHandlingMetaNamespaceKeyFunc as a keying function.
|
||||
type TargetInformerFactory func(*federation_api.Cluster, federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface)
|
||||
|
||||
// A structure with cluster lifecycle handler functions. Cluster is available (and ClusterAvailable is fired)
|
||||
// when it is created in federated etcd and ready. Cluster becomes unavailable (and ClusterUnavailable is fired)
|
||||
// when it is either deleted or becomes not ready. When cluster spec (IP)is modified both ClusterAvailable
|
||||
// and ClusterUnavailable are fired.
|
||||
type ClusterLifecycleHandlerFuncs struct {
|
||||
// Fired when the cluster becomes available.
|
||||
ClusterAvailable func(*federation_api.Cluster)
|
||||
// Fired when the cluster becomes unavailable. The second arg contains data that was present
|
||||
// in the cluster before deletion.
|
||||
ClusterUnavailable func(*federation_api.Cluster, []interface{})
|
||||
}
|
||||
|
||||
// Builds a FederatedInformer for the given federation client and factory.
|
||||
func NewFederatedInformer(federationClient federation_release_1_4.Interface, targetInformerFactory TargetInformerFactory) FederatedInformer {
|
||||
func NewFederatedInformer(
|
||||
federationClient federation_release_1_4.Interface,
|
||||
targetInformerFactory TargetInformerFactory,
|
||||
clusterLifecycle ClusterLifecycleHandlerFuncs) FederatedInformer {
|
||||
|
||||
federatedInformer := &federatedInformerImpl{
|
||||
targetInformerFactory: targetInformerFactory,
|
||||
clientFactory: func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) {
|
||||
@ -112,6 +131,15 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar
|
||||
targetInformers: make(map[string]informer),
|
||||
}
|
||||
|
||||
getClusterData := func(name string) []interface{} {
|
||||
data, err := federatedInformer.GetTargetStore().ListFromCluster(name)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to list %s content: %v", name, err)
|
||||
return make([]interface{}, 0)
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
federatedInformer.clusterInformer.store, federatedInformer.clusterInformer.controller = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
|
||||
@ -127,13 +155,23 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar
|
||||
DeleteFunc: func(old interface{}) {
|
||||
oldCluster, ok := old.(*federation_api.Cluster)
|
||||
if ok {
|
||||
var data []interface{}
|
||||
if clusterLifecycle.ClusterUnavailable != nil {
|
||||
data = getClusterData(oldCluster.Name)
|
||||
}
|
||||
federatedInformer.deleteCluster(oldCluster)
|
||||
if clusterLifecycle.ClusterUnavailable != nil {
|
||||
clusterLifecycle.ClusterUnavailable(oldCluster, data)
|
||||
}
|
||||
}
|
||||
},
|
||||
AddFunc: func(cur interface{}) {
|
||||
curCluster, ok := cur.(*federation_api.Cluster)
|
||||
if ok && isClusterReady(curCluster) {
|
||||
federatedInformer.addCluster(curCluster)
|
||||
if clusterLifecycle.ClusterAvailable != nil {
|
||||
clusterLifecycle.ClusterAvailable(curCluster)
|
||||
}
|
||||
}
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
@ -146,9 +184,20 @@ func NewFederatedInformer(federationClient federation_release_1_4.Interface, tar
|
||||
return
|
||||
}
|
||||
if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
|
||||
var data []interface{}
|
||||
if clusterLifecycle.ClusterUnavailable != nil {
|
||||
data = getClusterData(oldCluster.Name)
|
||||
}
|
||||
federatedInformer.deleteCluster(oldCluster)
|
||||
if clusterLifecycle.ClusterUnavailable != nil {
|
||||
clusterLifecycle.ClusterUnavailable(oldCluster, data)
|
||||
}
|
||||
|
||||
if isClusterReady(curCluster) {
|
||||
federatedInformer.addCluster(curCluster)
|
||||
if clusterLifecycle.ClusterAvailable != nil {
|
||||
clusterLifecycle.ClusterAvailable(curCluster)
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -331,6 +380,19 @@ func (fs *federatedStoreImpl) List() ([]interface{}, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Returns all items in the given cluster.
|
||||
func (fs *federatedStoreImpl) ListFromCluster(clusterName string) ([]interface{}, error) {
|
||||
fs.federatedInformer.Lock()
|
||||
defer fs.federatedInformer.Unlock()
|
||||
|
||||
result := make([]interface{}, 0)
|
||||
if targetInformer, found := fs.federatedInformer.targetInformers[clusterName]; found {
|
||||
values := targetInformer.store.List()
|
||||
result = append(result, values...)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
|
||||
func (fs *federatedStoreImpl) GetByKey(clusterName string, key string) (interface{}, bool, error) {
|
||||
fs.federatedInformer.Lock()
|
||||
|
@ -92,7 +92,20 @@ func TestFederatedInformer(t *testing.T) {
|
||||
framework.ResourceEventHandlerFuncs{})
|
||||
}
|
||||
|
||||
informer := NewFederatedInformer(fakeClient, targetInformerFactory).(*federatedInformerImpl)
|
||||
addedClusters := make(chan string, 1)
|
||||
deletedClusters := make(chan string, 1)
|
||||
lifecycle := ClusterLifecycleHandlerFuncs{
|
||||
ClusterAvailable: func(cluster *federation_api.Cluster) {
|
||||
addedClusters <- cluster.Name
|
||||
close(addedClusters)
|
||||
},
|
||||
ClusterUnavailable: func(cluster *federation_api.Cluster, _ []interface{}) {
|
||||
deletedClusters <- cluster.Name
|
||||
close(deletedClusters)
|
||||
},
|
||||
}
|
||||
|
||||
informer := NewFederatedInformer(fakeClient, targetInformerFactory, lifecycle).(*federatedInformerImpl)
|
||||
informer.clientFactory = func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) {
|
||||
return fakeClient, nil
|
||||
}
|
||||
@ -113,6 +126,7 @@ func TestFederatedInformer(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, found)
|
||||
assert.EqualValues(t, &service, service1)
|
||||
assert.Equal(t, "mycluster", <-addedClusters)
|
||||
|
||||
// All checked, lets delete the cluster.
|
||||
deleteChan <- struct{}{}
|
||||
@ -127,6 +141,8 @@ func TestFederatedInformer(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Empty(t, serviceList)
|
||||
|
||||
assert.Equal(t, "mycluster", <-deletedClusters)
|
||||
|
||||
// Test complete.
|
||||
informer.Stop()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user