Merge pull request #29939 from mwielgus/federated_informer

Automatic merge from submit-queue

Federation - common libs - FederatedInformer

Fixes #29383

Will add more tests after the first pass of the review.

ref: #29347

cc: @nikhiljindal @wojtek-t
This commit is contained in:
Kubernetes Submit Queue 2016-08-03 13:43:37 -07:00 committed by GitHub
commit ac266ae85b
2 changed files with 520 additions and 0 deletions

View File

@ -0,0 +1,388 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"reflect"
"sync"
"time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
api "k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (
clusterSyncPeriod = 10 * time.Minute
userAgentName = "federation-service-controller"
)
// FederatedReadOnlyStore is an overlay over multiple stores created in federated clusters.
type FederatedReadOnlyStore interface {
// Returns all items in the store.
List() ([]interface{}, error)
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
GetByKey(clusterName string, key string) (interface{}, bool, error)
// Returns the items stored under the given key in all clusters.
GetFromAllClusters(key string) ([]interface{}, error)
// Checks whether stores for all clusters form the lists (and only these) are there and
// are synced. This is only a basic check whether the data inside of the store is usable.
// It is not a full synchronization/locking mechanism it only tries to ensure that out-of-sync
// issues occur less often. All users of the interface should assume
// that there may be significant delays in content updates of all kinds and write their
// code that it doesn't break if something is slightly out-of-sync.
ClustersSynced(clusters []*federation_api.Cluster) bool
}
// An interface to access federation members and clients.
type FederationView interface {
// GetClientsetForCluster returns a clientset for the cluster, if present.
GetClientsetForCluster(clusterName string) (federation_release_1_4.Interface, error)
// GetReadyClusers returns all clusters for which the sub-informers are run.
GetReadyClusters() ([]*federation_api.Cluster, error)
// GetReadyCluster returns the cluster with the given name, if found.
GetReadyCluster(name string) (*federation_api.Cluster, bool, error)
// ClustersSynced returns true if the view is synced (for the first time).
ClustersSynced() bool
}
// A structure that combines an informer running agains federated api server and listening for cluster updates
// with multiple Kubernetes API informers (called target informers) running against federation members. Whenever a new
// cluster is added to the federation an informer is created for it using TargetInformerFactory. Infomrers are stoped
// when a cluster is either put offline of deleted. It is assumed that some controller keeps an eye on the cluster list
// and thus the clusters in ETCD are up to date.
type FederatedInformer interface {
FederationView
// Returns a store created over all stores from target informers.
GetTargetStore() FederatedReadOnlyStore
// Starts all the processes.
Start()
// Stops all the processes inside the informer.
Stop()
}
// A function that should be used to create an informer on the target object. Store should use
// framework.DeletionHandlingMetaNamespaceKeyFunc as a keying function.
type TargetInformerFactory func(*federation_api.Cluster, federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface)
// Builds a FederatedInformer for the given federation client and factory.
func NewFederatedInformer(federationClient federation_release_1_4.Interface, targetInformerFactory TargetInformerFactory) FederatedInformer {
federatedInformer := &federatedInformerImpl{
targetInformerFactory: targetInformerFactory,
clientFactory: func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) {
clusterConfig, err := BuildClusterConfig(cluster)
if err != nil && clusterConfig != nil {
clientset := federation_release_1_4.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, userAgentName))
return clientset, nil
}
return nil, err
},
targetInformers: make(map[string]informer),
}
federatedInformer.clusterInformer.store, federatedInformer.clusterInformer.controller = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return federationClient.Federation().Clusters().List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return federationClient.Federation().Clusters().Watch(options)
},
},
&federation_api.Cluster{},
clusterSyncPeriod,
framework.ResourceEventHandlerFuncs{
DeleteFunc: func(old interface{}) {
oldCluster, ok := old.(*federation_api.Cluster)
if ok {
federatedInformer.deleteCluster(oldCluster)
}
},
AddFunc: func(cur interface{}) {
curCluster, ok := cur.(*federation_api.Cluster)
if ok && isClusterReady(curCluster) {
federatedInformer.addCluster(curCluster)
}
},
UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*federation_api.Cluster)
if !ok {
return
}
curCluster, ok := cur.(*federation_api.Cluster)
if !ok {
return
}
if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) {
federatedInformer.deleteCluster(oldCluster)
if isClusterReady(curCluster) {
federatedInformer.addCluster(curCluster)
}
}
},
},
)
return federatedInformer
}
func isClusterReady(cluster *federation_api.Cluster) bool {
for _, condition := range cluster.Status.Conditions {
if condition.Type == federation_api.ClusterReady {
if condition.Status == api_v1.ConditionTrue {
return true
}
}
}
return false
}
type informer struct {
controller framework.ControllerInterface
store cache.Store
stopChan chan struct{}
}
type federatedInformerImpl struct {
sync.Mutex
// Informer on federated clusters.
clusterInformer informer
// Target informers factory
targetInformerFactory TargetInformerFactory
// Structures returned by targetInformerFactory
targetInformers map[string]informer
// A function to build clients.
clientFactory func(*federation_api.Cluster) (federation_release_1_4.Interface, error)
}
type federatedStoreImpl struct {
federatedInformer *federatedInformerImpl
}
func (f *federatedInformerImpl) Stop() {
f.Lock()
defer f.Unlock()
close(f.clusterInformer.stopChan)
for _, informer := range f.targetInformers {
close(informer.stopChan)
}
}
func (f *federatedInformerImpl) Start() {
f.Lock()
defer f.Unlock()
f.clusterInformer.stopChan = make(chan struct{})
go f.clusterInformer.controller.Run(f.clusterInformer.stopChan)
}
// GetClientsetForCluster returns a clientset for the cluster, if present.
func (f *federatedInformerImpl) GetClientsetForCluster(clusterName string) (federation_release_1_4.Interface, error) {
f.Lock()
defer f.Unlock()
return f.getClientsetForClusterUnlocked(clusterName)
}
func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName string) (federation_release_1_4.Interface, error) {
// No locking needed. Will happen in f.GetCluster.
if cluster, found, err := f.getReadyClusterUnlocked(clusterName); found && err == nil {
return f.clientFactory(cluster)
} else {
if err != nil {
return nil, err
}
}
return nil, fmt.Errorf("cluster %s not found", clusterName)
}
// GetReadyClusers returns all clusters for which the sub-informers are run.
func (f *federatedInformerImpl) GetReadyClusters() ([]*federation_api.Cluster, error) {
f.Lock()
defer f.Unlock()
items := f.clusterInformer.store.List()
result := make([]*federation_api.Cluster, 0, len(items))
for _, item := range items {
if cluster, ok := item.(*federation_api.Cluster); ok {
if isClusterReady(cluster) {
result = append(result, cluster)
}
} else {
return nil, fmt.Errorf("wrong data in FederatedInformerImpl cluster store: %v", item)
}
}
return result, nil
}
// GetCluster returns the cluster with the given name, if found.
func (f *federatedInformerImpl) GetReadyCluster(name string) (*federation_api.Cluster, bool, error) {
f.Lock()
defer f.Unlock()
return f.getReadyClusterUnlocked(name)
}
func (f *federatedInformerImpl) getReadyClusterUnlocked(name string) (*federation_api.Cluster, bool, error) {
if obj, exist, err := f.clusterInformer.store.GetByKey(name); exist && err == nil {
if cluster, ok := obj.(*federation_api.Cluster); ok {
if isClusterReady(cluster) {
return cluster, true, nil
}
return nil, false, nil
}
return nil, false, fmt.Errorf("wrong data in FederatedInformerImpl cluster store: %v", obj)
} else {
return nil, false, err
}
}
// Synced returns true if the view is synced (for the first time)
func (f *federatedInformerImpl) ClustersSynced() bool {
f.Lock()
defer f.Unlock()
return f.clusterInformer.controller.HasSynced()
}
// Adds the given cluster to federated informer.
func (f *federatedInformerImpl) addCluster(cluster *federation_api.Cluster) {
f.Lock()
defer f.Unlock()
name := cluster.Name
if client, err := f.getClientsetForClusterUnlocked(name); err == nil {
store, controller := f.targetInformerFactory(cluster, client)
targetInformer := informer{
controller: controller,
store: store,
stopChan: make(chan struct{}),
}
f.targetInformers[name] = targetInformer
go targetInformer.controller.Run(targetInformer.stopChan)
} else {
// TODO: create also an event for cluster.
glog.Errorf("Failed to create a client for cluster: %v", err)
}
}
// Removes the cluster from federated informer.
func (f *federatedInformerImpl) deleteCluster(cluster *federation_api.Cluster) {
f.Lock()
defer f.Unlock()
name := cluster.Name
if targetInformer, found := f.targetInformers[name]; found {
close(targetInformer.stopChan)
}
delete(f.targetInformers, name)
}
// Returns a store created over all stores from target informers.
func (f *federatedInformerImpl) GetTargetStore() FederatedReadOnlyStore {
return &federatedStoreImpl{
federatedInformer: f,
}
}
// Returns all items in the store.
func (fs *federatedStoreImpl) List() ([]interface{}, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
result := make([]interface{}, 0)
for _, targetInformer := range fs.federatedInformer.targetInformers {
values := targetInformer.store.List()
result = append(result, values...)
}
return result, nil
}
// GetByKey returns the item stored under the given key in the specified cluster (if exist).
func (fs *federatedStoreImpl) GetByKey(clusterName string, key string) (interface{}, bool, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
if targetInformer, found := fs.federatedInformer.targetInformers[clusterName]; found {
return targetInformer.store.GetByKey(key)
}
return nil, false, nil
}
// Returns the items stored under the given key in all clusters.
func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]interface{}, error) {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
result := make([]interface{}, 0)
for _, targetInformer := range fs.federatedInformer.targetInformers {
value, exist, err := targetInformer.store.GetByKey(key)
if err != nil {
return nil, err
}
if exist {
result = append(result, value)
}
}
return result, nil
}
// GetKey for returns the key under which the item would be put in the store.
func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
// TODO: support other keying functions.
key, _ := framework.DeletionHandlingMetaNamespaceKeyFunc(item)
return key
}
// Checks whether stores for all clusters form the lists (and only these) are there and
// are synced.
func (fs *federatedStoreImpl) ClustersSynced(clusters []*federation_api.Cluster) bool {
fs.federatedInformer.Lock()
defer fs.federatedInformer.Unlock()
if len(fs.federatedInformer.targetInformers) != len(clusters) {
return false
}
for _, cluster := range clusters {
if targetInformer, found := fs.federatedInformer.targetInformers[cluster.Name]; found {
if !targetInformer.controller.HasSynced() {
return false
}
} else {
return false
}
}
return true
}

View File

@ -0,0 +1,132 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
"time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4"
fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
api "k8s.io/kubernetes/pkg/api"
api_v1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/stretchr/testify/assert"
)
// Basic test for Federated Informer. Checks whether the subinformer are added and deleted
// when the corresponding cluster entries appear and dissapear from etcd.
func TestFederatedInformer(t *testing.T) {
fakeClient := &fake_federation_release_1_4.Clientset{}
// Add a single cluster to federation and remove it when needed.
cluster := federation_api.Cluster{
ObjectMeta: api_v1.ObjectMeta{
Name: "mycluster",
},
Status: federation_api.ClusterStatus{
Conditions: []federation_api.ClusterCondition{
{Type: federation_api.ClusterReady, Status: api_v1.ConditionTrue},
},
},
}
fakeClient.AddReactor("list", "clusters", func(action core.Action) (bool, runtime.Object, error) {
return true, &federation_api.ClusterList{Items: []federation_api.Cluster{cluster}}, nil
})
deleteChan := make(chan struct{})
fakeClient.AddWatchReactor("clusters", func(action core.Action) (bool, watch.Interface, error) {
fakeWatch := watch.NewFake()
go func() {
<-deleteChan
fakeWatch.Delete(&cluster)
}()
return true, fakeWatch, nil
})
// There is a single service ns1/s1 in cluster mycluster.
service := api_v1.Service{
ObjectMeta: api_v1.ObjectMeta{
Namespace: "ns1",
Name: "s1",
},
}
fakeClient.AddReactor("list", "services", func(action core.Action) (bool, runtime.Object, error) {
return true, &api_v1.ServiceList{Items: []api_v1.Service{service}}, nil
})
fakeClient.AddWatchReactor("services", func(action core.Action) (bool, watch.Interface, error) {
return true, watch.NewFake(), nil
})
targetInformerFactory := func(cluster *federation_api.Cluster, clientset federation_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
return framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return clientset.Core().Services(api_v1.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Services(api_v1.NamespaceAll).Watch(options)
},
},
&api_v1.Service{},
10*time.Second,
framework.ResourceEventHandlerFuncs{})
}
informer := NewFederatedInformer(fakeClient, targetInformerFactory).(*federatedInformerImpl)
informer.clientFactory = func(cluster *federation_api.Cluster) (federation_release_1_4.Interface, error) {
return fakeClient, nil
}
assert.NotNil(t, informer)
informer.Start()
// Wait until mycluster is synced.
for !informer.GetTargetStore().ClustersSynced([]*federation_api.Cluster{&cluster}) {
time.Sleep(time.Millisecond * 100)
}
readyClusters, err := informer.GetReadyClusters()
assert.NoError(t, err)
assert.Contains(t, readyClusters, &cluster)
serviceList, err := informer.GetTargetStore().List()
assert.NoError(t, err)
assert.Contains(t, serviceList, &service)
service1, found, err := informer.GetTargetStore().GetByKey("mycluster", "ns1/s1")
assert.NoError(t, err)
assert.True(t, found)
assert.EqualValues(t, &service, service1)
// All checked, lets delete the cluster.
deleteChan <- struct{}{}
for !informer.GetTargetStore().ClustersSynced([]*federation_api.Cluster{}) {
time.Sleep(time.Millisecond * 100)
}
readyClusters, err = informer.GetReadyClusters()
assert.NoError(t, err)
assert.Empty(t, readyClusters)
serviceList, err = informer.GetTargetStore().List()
assert.NoError(t, err)
assert.Empty(t, serviceList)
// Test complete.
informer.Stop()
}