Merge pull request #31600 from quinton-hoole/2016-08-10-fed-ingress-ctrl-share-uid-config

Automatic merge from submit-queue

Federated Ingress: unify UID's across Cluster Ingress Controllers
This commit is contained in:
Kubernetes Submit Queue 2016-09-13 11:48:18 -07:00 committed by GitHub
commit 0d3799b8e2
7 changed files with 590 additions and 154 deletions

View File

@ -17,7 +17,9 @@ limitations under the License.
package ingress package ingress
import ( import (
"fmt"
"reflect" "reflect"
"sync"
"time" "time"
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
@ -25,12 +27,14 @@ import (
"k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4" kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
pkg_runtime "k8s.io/kubernetes/pkg/runtime" pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/flowcontrol"
@ -40,24 +44,40 @@ import (
) )
const ( const (
allClustersKey = "ALL_CLUSTERS" // Special cluster name which denotes all clusters - only used internally. It's not a valid cluster name, so effectively reserved.
staticIPAnnotationKey = "ingress.kubernetes.io/static-ip" // TODO: Get this directly from the Kubernetes Ingress Controller constant allClustersKey = ".ALL_CLUSTERS"
// TODO: Get the constants below directly from the Kubernetes Ingress Controller constants - but thats in a separate repo
staticIPNameKeyWritable = "kubernetes.io/ingress.global-static-ip-name" // The writable annotation on Ingress to tell the controller to use a specific, named, static IP
staticIPNameKeyReadonly = "static-ip" // The readonly key via which the cluster's Ingress Controller communicates which static IP it used. If staticIPNameKeyWritable above is specified, it is used.
uidAnnotationKey = "kubernetes.io/ingress.uid" // The annotation on federation clusters, where we store the ingress UID
uidConfigMapName = "ingress-uid" // Name of the config-map and key the ingress controller stores its uid in.
uidConfigMapNamespace = "kube-system"
uidKey = "uid"
) )
type IngressController struct { type IngressController struct {
// For triggering single ingress reconciliation. This is used when there is an sync.Mutex // Lock used for leader election
// For triggering single ingress reconcilation. This is used when there is an
// add/update/delete operation on an ingress in either federated API server or // add/update/delete operation on an ingress in either federated API server or
// in some member of the federation. // in some member of the federation.
ingressDeliverer *util.DelayingDeliverer ingressDeliverer *util.DelayingDeliverer
// For triggering reconciliation of all ingresses. This is used when // For triggering reconcilation of cluster ingress controller configmap and
// a new cluster becomes available. // all ingresses. This is used when a new cluster becomes available.
clusterDeliverer *util.DelayingDeliverer clusterDeliverer *util.DelayingDeliverer
// For triggering reconcilation of cluster ingress controller configmap.
// This is used when a configmap is updated in the cluster.
configMapDeliverer *util.DelayingDeliverer
// Contains ingresses present in members of federation. // Contains ingresses present in members of federation.
ingressFederatedInformer util.FederatedInformer ingressFederatedInformer util.FederatedInformer
// For updating members of federation. // Contains ingress controller configmaps present in members of federation.
federatedUpdater util.FederatedUpdater configMapFederatedInformer util.FederatedInformer
// For updating ingresses in members of federation.
federatedIngressUpdater util.FederatedUpdater
// For updating configmaps in members of federation.
federatedConfigMapUpdater util.FederatedUpdater
// Definitions of ingresses that should be federated. // Definitions of ingresses that should be federated.
ingressInformerStore cache.Store ingressInformerStore cache.Store
// Informer controller for ingresses that should be federated. // Informer controller for ingresses that should be federated.
@ -68,11 +88,14 @@ type IngressController struct {
// Backoff manager for ingresses // Backoff manager for ingresses
ingressBackoff *flowcontrol.Backoff ingressBackoff *flowcontrol.Backoff
// Backoff manager for configmaps
configMapBackoff *flowcontrol.Backoff
// For events // For events
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
ingressReviewDelay time.Duration ingressReviewDelay time.Duration
configMapReviewDelay time.Duration
clusterAvailableDelay time.Duration clusterAvailableDelay time.Duration
smallDelay time.Duration smallDelay time.Duration
updateTimeout time.Duration updateTimeout time.Duration
@ -80,23 +103,26 @@ type IngressController struct {
// NewIngressController returns a new ingress controller // NewIngressController returns a new ingress controller
func NewIngressController(client federation_release_1_4.Interface) *IngressController { func NewIngressController(client federation_release_1_4.Interface) *IngressController {
glog.V(4).Infof("->NewIngressController V(4)")
broadcaster := record.NewBroadcaster() broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client)) broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-ingress-controller"}) recorder := broadcaster.NewRecorder(api.EventSource{Component: "federated-ingress-controller"})
ic := &IngressController{ ic := &IngressController{
federatedApiClient: client, federatedApiClient: client,
ingressReviewDelay: time.Second * 10, ingressReviewDelay: time.Second * 10,
configMapReviewDelay: time.Second * 10,
clusterAvailableDelay: time.Second * 20, clusterAvailableDelay: time.Second * 20,
smallDelay: time.Second * 3, smallDelay: time.Second * 3,
updateTimeout: time.Second * 30, updateTimeout: time.Second * 30,
ingressBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute), ingressBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
eventRecorder: recorder, eventRecorder: recorder,
configMapBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
} }
// Build deliverers for triggering reconciliations. // Build deliverers for triggering reconciliations.
ic.ingressDeliverer = util.NewDelayingDeliverer() ic.ingressDeliverer = util.NewDelayingDeliverer()
ic.clusterDeliverer = util.NewDelayingDeliverer() ic.clusterDeliverer = util.NewDelayingDeliverer()
ic.configMapDeliverer = util.NewDelayingDeliverer()
// Start informer in federated API servers on ingresses that should be federated. // Start informer in federated API servers on ingresses that should be federated.
ic.ingressInformerStore, ic.ingressInformerController = framework.NewInformer( ic.ingressInformerStore, ic.ingressInformerController = framework.NewInformer(
@ -142,24 +168,72 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
&util.ClusterLifecycleHandlerFuncs{ &util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) { ClusterAvailable: func(cluster *federation_api.Cluster) {
// When new cluster becomes available process all the ingresses again. // When new cluster becomes available process all the ingresses again, and configure it's ingress controller's configmap with the correct UID
ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay)) ic.clusterDeliverer.DeliverAfter(cluster.Name, cluster, ic.clusterAvailableDelay)
}, },
}, },
) )
// Federated updater along with Create/Update/Delete operations. // Federated informer on configmaps for ingress controllers in members of the federation.
ic.federatedUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer, ic.configMapFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
glog.V(4).Infof("Returning new informer for cluster %q", cluster.Name)
return framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
if targetClient == nil {
glog.Errorf("Internal error: targetClient is nil")
}
return targetClient.Core().ConfigMaps(uidConfigMapNamespace).List(options) // we only want to list one by name - unfortunately Kubernetes don't have a selector for that.
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
if targetClient == nil {
glog.Errorf("Internal error: targetClient is nil")
}
return targetClient.Core().ConfigMaps(uidConfigMapNamespace).Watch(options) // as above
},
},
&v1.ConfigMap{},
controller.NoResyncPeriodFunc(),
// Trigger reconcilation whenever the ingress controller's configmap in a federated cluster is changed. In most cases it
// would be just confirmation that the configmap for the ingress controller is correct.
util.NewTriggerOnAllChanges(
func(obj pkg_runtime.Object) {
ic.deliverConfigMapObj(cluster.Name, obj, ic.configMapReviewDelay, false)
},
))
},
&util.ClusterLifecycleHandlerFuncs{
ClusterAvailable: func(cluster *federation_api.Cluster) {
ic.clusterDeliverer.DeliverAfter(cluster.Name, cluster, ic.clusterAvailableDelay)
},
},
)
// Federated ingress updater along with Create/Update/Delete operations.
ic.federatedIngressUpdater = util.NewFederatedUpdater(ic.ingressFederatedInformer,
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
ingress := obj.(*extensions_v1beta1.Ingress) ingress := obj.(*extensions_v1beta1.Ingress)
glog.V(4).Infof("Attempting to create Ingress: %v", ingress) glog.V(4).Infof("Attempting to create Ingress: %v", ingress)
_, err := client.Extensions().Ingresses(ingress.Namespace).Create(ingress) _, err := client.Extensions().Ingresses(ingress.Namespace).Create(ingress)
if err != nil {
glog.Errorf("Error creating ingress %q: %v", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}, err)
} else {
glog.V(4).Infof("Successfully created ingress %q", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace})
}
return err return err
}, },
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
ingress := obj.(*extensions_v1beta1.Ingress) ingress := obj.(*extensions_v1beta1.Ingress)
glog.V(4).Infof("Attempting to update Ingress: %v", ingress) glog.V(4).Infof("Attempting to update Ingress: %v", ingress)
_, err := client.Extensions().Ingresses(ingress.Namespace).Update(ingress) _, err := client.Extensions().Ingresses(ingress.Namespace).Update(ingress)
if err != nil {
glog.V(4).Infof("Failed to update Ingress: %v", err)
} else {
glog.V(4).Infof("Successfully updated Ingress: %q", types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace})
}
return err return err
}, },
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error { func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
@ -168,6 +242,35 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
err := client.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &api.DeleteOptions{}) err := client.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &api.DeleteOptions{})
return err return err
}) })
// Federated configmap updater along with Create/Update/Delete operations. Only Update should ever be called.
ic.federatedConfigMapUpdater = util.NewFederatedUpdater(ic.configMapFederatedInformer,
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.Errorf("Internal error: Incorrectly attempting to create ConfigMap: %q", configMapName)
_, err := client.Core().ConfigMaps(configMap.Namespace).Create(configMap)
return err
},
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.V(4).Infof("Attempting to update ConfigMap: %v", configMap)
_, err := client.Core().ConfigMaps(configMap.Namespace).Update(configMap)
if err == nil {
glog.V(4).Infof("Successfully updated ConfigMap %q", configMapName)
} else {
glog.V(4).Infof("Failed to update ConfigMap %q: %v", configMapName, err)
}
return err
},
func(client kube_release_1_4.Interface, obj pkg_runtime.Object) error {
configMap := obj.(*v1.ConfigMap)
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.Errorf("Internal error: Incorrectly attempting to delete ConfigMap: %q", configMapName)
err := client.Core().ConfigMaps(configMap.Namespace).Delete(configMap.Name, &api.DeleteOptions{})
return err
})
return ic return ic
} }
@ -176,25 +279,46 @@ func (ic *IngressController) Run(stopChan <-chan struct{}) {
go ic.ingressInformerController.Run(stopChan) go ic.ingressInformerController.Run(stopChan)
glog.Infof("... Starting Ingress Federated Informer") glog.Infof("... Starting Ingress Federated Informer")
ic.ingressFederatedInformer.Start() ic.ingressFederatedInformer.Start()
glog.Infof("... Starting ConfigMap Federated Informer")
ic.configMapFederatedInformer.Start()
go func() { go func() {
<-stopChan <-stopChan
glog.Infof("Stopping Ingress Controller") glog.Infof("Stopping Ingress Federated Informer")
ic.ingressFederatedInformer.Stop() ic.ingressFederatedInformer.Stop()
glog.Infof("Stopping ConfigMap Federated Informer")
ic.configMapFederatedInformer.Stop()
}() }()
ic.ingressDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) { ic.ingressDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
ingress := item.Value.(types.NamespacedName) ingress := item.Value.(types.NamespacedName)
glog.V(4).Infof("Ingress change delivered, reconciling: %v", ingress) glog.V(4).Infof("Ingress change delivered, reconciling: %v", ingress)
ic.reconcileIngress(ingress) ic.reconcileIngress(ingress)
}) })
ic.clusterDeliverer.StartWithHandler(func(_ *util.DelayingDelivererItem) { ic.clusterDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
glog.V(4).Infof("Cluster change delivered, reconciling ingresses") clusterName := item.Key
ic.reconcileIngressesOnClusterChange() if clusterName != allClustersKey {
glog.V(4).Infof("Cluster change delivered for cluster %q, reconciling configmap and ingress for that cluster", clusterName)
} else {
glog.V(4).Infof("Cluster change delivered for all clusters, reconciling configmaps and ingresses for all clusters")
}
ic.reconcileIngressesOnClusterChange(clusterName)
ic.reconcileConfigMapForCluster(clusterName)
})
ic.configMapDeliverer.StartWithHandler(func(item *util.DelayingDelivererItem) {
clusterName := item.Key
if clusterName != allClustersKey {
glog.V(4).Infof("ConfigMap change delivered for cluster %q, reconciling configmap for that cluster", clusterName)
} else {
glog.V(4).Infof("ConfigMap change delivered for all clusters, reconciling configmaps for all clusters")
}
ic.reconcileConfigMapForCluster(clusterName)
}) })
go func() { go func() {
select { select {
case <-time.After(time.Minute): case <-time.After(time.Minute):
glog.V(4).Infof("Ingress controller is garbage collecting") glog.V(4).Infof("Ingress controller is garbage collecting")
ic.ingressBackoff.GC() ic.ingressBackoff.GC()
ic.configMapBackoff.GC()
glog.V(4).Infof("Ingress controller garbage collection complete")
case <-stopChan: case <-stopChan:
return return
} }
@ -218,40 +342,240 @@ func (ic *IngressController) deliverIngress(ingress types.NamespacedName, delay
ic.ingressDeliverer.DeliverAfter(key, ingress, delay) ic.ingressDeliverer.DeliverAfter(key, ingress, delay)
} }
func (ic *IngressController) deliverConfigMapObj(clusterName string, obj interface{}, delay time.Duration, failed bool) {
configMap := obj.(*v1.ConfigMap)
ic.deliverConfigMap(clusterName, types.NamespacedName{Namespace: configMap.Namespace, Name: configMap.Name}, delay, failed)
}
func (ic *IngressController) deliverConfigMap(cluster string, configMap types.NamespacedName, delay time.Duration, failed bool) {
key := cluster
if failed {
ic.configMapBackoff.Next(key, time.Now())
delay = delay + ic.configMapBackoff.Get(key)
} else {
ic.configMapBackoff.Reset(key)
}
glog.V(4).Infof("Delivering ConfigMap for cluster %q (delay %q): %s", cluster, delay, configMap)
ic.configMapDeliverer.DeliverAfter(key, configMap, delay)
}
// Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet // Check whether all data stores are in sync. False is returned if any of the informer/stores is not yet
// synced with the corresponding api server. // synced with the corresponding api server.
func (ic *IngressController) isSynced() bool { func (ic *IngressController) isSynced() bool {
if !ic.ingressFederatedInformer.ClustersSynced() { if !ic.ingressFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced") glog.V(2).Infof("Cluster list not synced for ingress federated informer")
return false return false
} }
clusters, err := ic.ingressFederatedInformer.GetReadyClusters() clusters, err := ic.ingressFederatedInformer.GetReadyClusters()
if err != nil { if err != nil {
glog.Errorf("Failed to get ready clusters: %v", err) glog.Errorf("Failed to get ready clusters for ingress federated informer: %v", err)
return false return false
} }
if !ic.ingressFederatedInformer.GetTargetStore().ClustersSynced(clusters) { if !ic.ingressFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
glog.V(2).Infof("Target store not synced") glog.V(2).Infof("Target store not synced for ingress federated informer")
return false
}
if !ic.configMapFederatedInformer.ClustersSynced() {
glog.V(2).Infof("Cluster list not synced for config map federated informer")
return false
}
clusters, err = ic.configMapFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters for configmap federated informer: %v", err)
return false
}
if !ic.configMapFederatedInformer.GetTargetStore().ClustersSynced(clusters) {
glog.V(2).Infof("Target store not synced for configmap federated informer")
return false return false
} }
glog.V(4).Infof("Cluster list is synced") glog.V(4).Infof("Cluster list is synced")
return true return true
} }
// The function triggers reconciliation of all federated ingresses. // The function triggers reconcilation of all federated ingresses. clusterName is the name of the cluster that changed
func (ic *IngressController) reconcileIngressesOnClusterChange() { // but all ingresses in all clusters are reconciled
glog.V(4).Infof("Reconciling ingresses on cluster change") func (ic *IngressController) reconcileIngressesOnClusterChange(clusterName string) {
glog.V(4).Infof("Reconciling ingresses on cluster change for cluster %q", clusterName)
if !ic.isSynced() { if !ic.isSynced() {
ic.clusterDeliverer.DeliverAt(allClustersKey, nil, time.Now().Add(ic.clusterAvailableDelay)) glog.V(4).Infof("Not synced, will try again later to reconcile ingresses.")
ic.clusterDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
} }
for _, obj := range ic.ingressInformerStore.List() { ingressList := ic.ingressInformerStore.List()
if len(ingressList) <= 0 {
glog.V(4).Infof("No federated ingresses to reconcile.")
}
for _, obj := range ingressList {
ingress := obj.(*extensions_v1beta1.Ingress) ingress := obj.(*extensions_v1beta1.Ingress)
ic.deliverIngress(types.NamespacedName{Namespace: ingress.Namespace, Name: ingress.Name}, ic.smallDelay, false) nsName := types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace}
glog.V(4).Infof("Delivering federated ingress %q for cluster %q", nsName, clusterName)
ic.deliverIngress(nsName, ic.smallDelay, false)
}
}
/*
reconcileConfigMapForCluster ensures that the configmap for the ingress controller in the cluster has objectmeta.data.UID
consistent with all the other clusters in the federation. If clusterName == allClustersKey, then all avaliable clusters
configmaps are reconciled.
*/
func (ic *IngressController) reconcileConfigMapForCluster(clusterName string) {
glog.V(4).Infof("Reconciling ConfigMap for cluster(s) %q", clusterName)
if !ic.isSynced() {
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
return
}
if clusterName == allClustersKey {
clusters, err := ic.configMapFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get ready clusters. redelivering %q: %v", clusterName, err)
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
return
}
for _, cluster := range clusters {
glog.V(4).Infof("Delivering ConfigMap for cluster(s) %q", clusterName)
ic.configMapDeliverer.DeliverAt(cluster.Name, nil, time.Now())
}
return
} else {
cluster, found, err := ic.configMapFederatedInformer.GetReadyCluster(clusterName)
if err != nil || !found {
glog.Errorf("Internal error: Cluster %q queued for configmap reconciliation, but not found. Will try again later: error = %v", clusterName, err)
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.clusterAvailableDelay)
return
}
uidConfigMapNamespacedName := types.NamespacedName{Name: uidConfigMapName, Namespace: uidConfigMapNamespace}
configMapObj, found, err := ic.configMapFederatedInformer.GetTargetStore().GetByKey(cluster.Name, uidConfigMapNamespacedName.String())
if !found || err != nil {
glog.Errorf("Failed to get ConfigMap %q for cluster %q. Will try again later: %v", uidConfigMapNamespacedName, cluster.Name, err)
ic.configMapDeliverer.DeliverAfter(clusterName, nil, ic.configMapReviewDelay)
return
}
glog.V(4).Infof("Successfully got ConfigMap %q for cluster %q.", uidConfigMapNamespacedName, clusterName)
configMap, ok := configMapObj.(*v1.ConfigMap)
if !ok {
glog.Errorf("Internal error: The object in the ConfigMap cache for cluster %q configmap %q is not a *ConfigMap", cluster.Name, uidConfigMapNamespacedName)
return
}
ic.reconcileConfigMap(cluster, configMap)
return
}
}
/*
reconcileConfigMap ensures that the configmap in the cluster has a UID
consistent with the federation cluster's associated annotation.
1. If the UID in the configmap differs from the UID stored in the cluster's annotation, the configmap is updated.
2. If the UID annotation is missing from the cluster, the cluster's UID annotation is updated to be consistent
with the master cluster.
3. If there is no elected master cluster, this cluster attempts to elect itself as the master cluster.
In cases 2 and 3, the configmaps will be updated in the next cycle, triggered by the federation cluster update(s)
*/
func (ic *IngressController) reconcileConfigMap(cluster *federation_api.Cluster, configMap *v1.ConfigMap) {
ic.Lock() // TODO: Reduce the scope of this master election lock.
defer ic.Unlock()
configMapNsName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.V(4).Infof("Reconciling ConfigMap %q in cluster %q", configMapNsName, cluster.Name)
clusterIngressUID, clusterIngressUIDExists := cluster.ObjectMeta.Annotations[uidAnnotationKey]
configMapUID, ok := configMap.Data[uidKey]
if !ok {
glog.Errorf("Warning: ConfigMap %q in cluster %q does not contain data key %q. Therefore it cannot become the master.", configMapNsName, cluster.Name, uidKey)
}
if !clusterIngressUIDExists || clusterIngressUID == "" {
ic.updateClusterIngressUIDToMasters(cluster, configMapUID) // Second argument is the fallback, in case this is the only cluster, in which case it becomes the master
return
}
if configMapUID != clusterIngressUID { // An update is required
glog.V(4).Infof("Ingress UID update is required: configMapUID %q not equal to clusterIngressUID %q", configMapUID, clusterIngressUID)
configMap.Data[uidKey] = clusterIngressUID
operations := []util.FederatedOperation{{
Type: util.OperationTypeUpdate,
Obj: configMap,
ClusterName: cluster.Name,
}}
glog.V(4).Infof("Calling federatedConfigMapUpdater.Update() - operations: %v", operations)
err := ic.federatedConfigMapUpdater.Update(operations, ic.updateTimeout)
if err != nil {
configMapName := types.NamespacedName{Name: configMap.Name, Namespace: configMap.Namespace}
glog.Errorf("Failed to execute update of ConfigMap %q on cluster %q: %v", configMapName, cluster.Name, err)
ic.configMapDeliverer.DeliverAfter(cluster.Name, nil, ic.configMapReviewDelay)
}
} else {
glog.V(4).Infof("Ingress UID update is not required: configMapUID %q is equal to clusterIngressUID %q", configMapUID, clusterIngressUID)
}
}
/*
getMasterCluster returns the cluster which is the elected master w.r.t. ingress UID, and it's ingress UID.
If there is no elected master cluster, an error is returned.
All other clusters must use the ingress UID of the elected master.
*/
func (ic *IngressController) getMasterCluster() (master *federation_api.Cluster, ingressUID string, err error) {
clusters, err := ic.configMapFederatedInformer.GetReadyClusters()
if err != nil {
glog.Errorf("Failed to get cluster list: %v", err)
return nil, "", err
}
for _, c := range clusters {
UID, exists := c.ObjectMeta.Annotations[uidAnnotationKey]
if exists && UID != "" { // Found the master cluster
glog.V(4).Infof("Found master cluster %q with annotation %q=%q", c.Name, uidAnnotationKey, UID)
return c, UID, nil
}
}
return nil, "", fmt.Errorf("Failed to find master cluster with annotation %q", uidAnnotationKey)
}
/*
updateClusterIngressUIDToMasters takes the ingress UID annotation on the master cluster and applies it to cluster.
If there is no master cluster, then fallbackUID is used (and hence this cluster becomes the master).
*/
func (ic *IngressController) updateClusterIngressUIDToMasters(cluster *federation_api.Cluster, fallbackUID string) {
masterCluster, masterUID, err := ic.getMasterCluster()
clusterObj, clusterErr := conversion.NewCloner().DeepCopy(cluster) // Make a clone so that we don't clobber our input param
cluster, ok := clusterObj.(*federation_api.Cluster)
if clusterErr != nil || !ok {
glog.Errorf("Internal error: Failed clone cluster resource while attempting to add master ingress UID annotation (%q = %q) from master cluster %q to cluster %q, will try again later: %v", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name, err)
return
}
if err == nil {
if masterCluster.Name != cluster.Name { // We're not the master, need to get in sync
cluster.ObjectMeta.Annotations[uidAnnotationKey] = masterUID
if _, err = ic.federatedApiClient.Federation().Clusters().Update(cluster); err != nil {
glog.Errorf("Failed to add master ingress UID annotation (%q = %q) from master cluster %q to cluster %q, will try again later: %v", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name, err)
return
} else {
glog.V(4).Infof("Successfully added master ingress UID annotation (%q = %q) from master cluster %q to cluster %q.", uidAnnotationKey, masterUID, masterCluster.Name, cluster.Name)
}
} else {
glog.V(4).Infof("Cluster %q with ingress UID is already the master with annotation (%q = %q), no need to update.", cluster.Name, uidAnnotationKey, cluster.ObjectMeta.Annotations[uidAnnotationKey])
}
} else {
glog.V(2).Infof("No master cluster found to source an ingress UID from for cluster %q. Attempting to elect new master cluster %q with ingress UID %q = %q", cluster.Name, cluster.Name, uidAnnotationKey, fallbackUID)
if fallbackUID != "" {
cluster.ObjectMeta.Annotations[uidAnnotationKey] = fallbackUID
if _, err = ic.federatedApiClient.Federation().Clusters().Update(cluster); err != nil {
glog.Errorf("Failed to add ingress UID annotation (%q = %q) to cluster %q. No master elected. Will try again later: %v", uidAnnotationKey, fallbackUID, cluster.Name, err)
} else {
glog.V(4).Infof("Successfully added ingress UID annotation (%q = %q) to cluster %q.", uidAnnotationKey, fallbackUID, cluster.Name)
}
} else {
glog.Errorf("No master cluster exists, and fallbackUID for cluster %q is invalid (%q). This probably means that no clusters have an ingress controller configmap with key %q. Federated Ingress currently supports clusters running Google Loadbalancer Controller (\"GLBC\")", cluster.Name, fallbackUID, uidKey)
}
} }
} }
func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
glog.V(4).Infof("Reconciling ingress %q", ingress) glog.V(4).Infof("Reconciling ingress %q for all clusters", ingress)
if !ic.isSynced() { if !ic.isSynced() {
ic.deliverIngress(ingress, ic.clusterAvailableDelay, false) ic.deliverIngress(ingress, ic.clusterAvailableDelay, false)
return return
@ -269,22 +593,29 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
glog.V(4).Infof("Ingress %q is not federated. Ignoring.", ingress) glog.V(4).Infof("Ingress %q is not federated. Ignoring.", ingress)
return return
} }
baseIngress := baseIngressObj.(*extensions_v1beta1.Ingress) baseIngress, ok := baseIngressObj.(*extensions_v1beta1.Ingress)
if !ok {
glog.Errorf("Internal Error: Object retrieved from ingressInformerStore with key %q is not of correct type *extensions_v1beta1.Ingress: %v", key, baseIngressObj)
} else {
glog.V(4).Infof("Base (federated) ingress: %v", baseIngress)
}
clusters, err := ic.ingressFederatedInformer.GetReadyClusters() clusters, err := ic.ingressFederatedInformer.GetReadyClusters()
if err != nil { if err != nil {
glog.Errorf("Failed to get cluster list: %v", err) glog.Errorf("Failed to get cluster list: %v", err)
ic.deliverIngress(ingress, ic.clusterAvailableDelay, false) ic.deliverIngress(ingress, ic.clusterAvailableDelay, false)
return return
} else {
glog.V(4).Infof("Found %d ready clusters across which to reconcile ingress %q", len(clusters), ingress)
} }
operations := make([]util.FederatedOperation, 0) operations := make([]util.FederatedOperation, 0)
for clusterIndex, cluster := range clusters { for clusterIndex, cluster := range clusters {
_, baseIPExists := baseIngress.ObjectMeta.Annotations[staticIPAnnotationKey] baseIPName, baseIPAnnotationExists := baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable]
clusterIngressObj, found, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) clusterIngressObj, clusterIngressFound, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key)
if err != nil { if err != nil {
glog.Errorf("Failed to get %s from %s: %v", ingress, cluster.Name, err) glog.Errorf("Failed to get cached ingress %s for cluster %s, will retry: %v", ingress, cluster.Name, err)
ic.deliverIngress(ingress, 0, true) ic.deliverIngress(ingress, 0, true)
return return
} }
@ -292,8 +623,26 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
ObjectMeta: baseIngress.ObjectMeta, ObjectMeta: baseIngress.ObjectMeta,
Spec: baseIngress.Spec, Spec: baseIngress.Spec,
} }
objMeta, err := conversion.NewCloner().DeepCopy(baseIngress.ObjectMeta)
if err != nil {
glog.Errorf("Error deep copying ObjectMeta: %v", err)
}
objSpec, err := conversion.NewCloner().DeepCopy(baseIngress.Spec)
if err != nil {
glog.Errorf("Error deep copying Spec: %v", err)
}
desiredIngress.ObjectMeta, ok = objMeta.(v1.ObjectMeta)
if !ok {
glog.Errorf("Internal error: Failed to cast to v1.ObjectMeta: %v", objMeta)
}
desiredIngress.Spec = objSpec.(extensions_v1beta1.IngressSpec)
if !ok {
glog.Errorf("Internal error: Failed to cast to extensions_v1beta1.IngressSpec: %v", objSpec)
}
glog.V(4).Infof("Desired Ingress: %v", desiredIngress)
if !found { if !clusterIngressFound {
glog.V(4).Infof("No existing Ingress %s in cluster %s - checking if appropriate to queue a create operation", ingress, cluster.Name)
// We can't supply server-created fields when creating a new object. // We can't supply server-created fields when creating a new object.
desiredIngress.ObjectMeta.ResourceVersion = "" desiredIngress.ObjectMeta.ResourceVersion = ""
desiredIngress.ObjectMeta.UID = "" desiredIngress.ObjectMeta.UID = ""
@ -307,41 +656,61 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
// Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated // Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated
// index 0, but eventually all ingresses will share the single global IP recorded in the annotation // index 0, but eventually all ingresses will share the single global IP recorded in the annotation
// of the federated ingress. // of the federated ingress.
if baseIPExists || (clusterIndex == 0) { if baseIPAnnotationExists || (clusterIndex == 0) {
glog.V(4).Infof("No existing Ingress %s in cluster %s (index %d) and static IP annotation (%q) on base ingress - queuing a create operation", ingress, cluster.Name, clusterIndex, staticIPNameKeyWritable)
operations = append(operations, util.FederatedOperation{ operations = append(operations, util.FederatedOperation{
Type: util.OperationTypeAdd, Type: util.OperationTypeAdd,
Obj: desiredIngress, Obj: desiredIngress,
ClusterName: cluster.Name, ClusterName: cluster.Name,
}) })
} else {
glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex)
} }
} else { } else {
clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress) clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress)
glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required", ingress, cluster.Name) glog.V(4).Infof("Found existing Ingress %s in cluster %s - checking if update is required (in either direction)", ingress, cluster.Name)
clusterIPName, clusterIPExists := clusterIngress.ObjectMeta.Annotations[staticIPAnnotationKey] clusterIPName, clusterIPNameExists := clusterIngress.ObjectMeta.Annotations[staticIPNameKeyReadonly]
if !baseIPExists && clusterIPExists { baseLBStatusExists := len(baseIngress.Status.LoadBalancer.Ingress) > 0
// Add annotation to federated ingress via API. clusterLBStatusExists := len(clusterIngress.Status.LoadBalancer.Ingress) > 0
original, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Get(baseIngress.Name) logStr := fmt.Sprintf("Cluster ingress %q has annotation %q=%q, loadbalancer status exists? [%v], federated ingress has annotation %q=%q, loadbalancer status exists? [%v]. %%s annotation and/or loadbalancer status from cluster ingress to federated ingress.", ingress, staticIPNameKeyReadonly, clusterIPName, clusterLBStatusExists, staticIPNameKeyWritable, baseIPName, baseLBStatusExists)
if err == nil { if (!baseIPAnnotationExists && clusterIPNameExists) || (!baseLBStatusExists && clusterLBStatusExists) { // copy the IP name from the readonly annotation on the cluster ingress, to the writable annotation on the federated ingress
original.ObjectMeta.Annotations[staticIPAnnotationKey] = clusterIPName glog.V(4).Infof(logStr, "Transferring")
if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(original); err != nil { if !baseIPAnnotationExists && clusterIPNameExists {
glog.Errorf("Failed to add static IP annotation to federated ingress %q: %v", ingress, err) baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] = clusterIPName
}
if !baseLBStatusExists && clusterLBStatusExists {
lbstatusObj, lbErr := conversion.NewCloner().DeepCopy(&clusterIngress.Status.LoadBalancer)
lbstatus, ok := lbstatusObj.(*v1.LoadBalancerStatus)
if lbErr != nil || !ok {
glog.Errorf("Internal error: Failed to clone LoadBalancerStatus of %q in cluster %q while attempting to update master loadbalancer ingress status, will try again later. error: %v, Object to be cloned: %v", ingress, cluster.Name, lbErr, lbstatusObj)
ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
return
}
baseIngress.Status.LoadBalancer = *lbstatus
}
glog.V(4).Infof("Attempting to update base federated ingress: %v", baseIngress)
if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil {
glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err)
ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
return
} else {
glog.V(4).Infof("Successfully added static IP annotation to federated ingress: %q", ingress)
ic.deliverIngress(ingress, ic.smallDelay, false)
return
} }
} else { } else {
glog.Errorf("Failed to get federated ingress %q: %v", ingress, err) glog.V(4).Infof(logStr, "Not transferring")
} }
} // Update existing cluster ingress, if needed.
// Update existing ingress, if needed. if util.ObjectMetaEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) &&
if !util.ObjectMetaIsEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) || reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) &&
!reflect.DeepEqual(desiredIngress.Spec, clusterIngress.Spec) { reflect.DeepEqual(baseIngress.Status.LoadBalancer.Ingress, clusterIngress.Status.LoadBalancer.Ingress) {
// TODO: In some cases Ingress controllers in the clusters add annotations, so we ideally need to exclude those from glog.V(4).Infof("Ingress %q in cluster %q does not need an update: cluster ingress is equivalent to federated ingress", ingress, cluster.Name)
// the equivalence comparison to cut down on unnecessary updates. } else {
glog.V(4).Infof("Ingress %s in cluster %s needs an update: cluster ingress %v is not equivalent to federated ingress %v", ingress, cluster.Name, clusterIngress, desiredIngress) glog.V(4).Infof("Ingress %s in cluster %s needs an update: cluster ingress %v is not equivalent to federated ingress %v", ingress, cluster.Name, clusterIngress, desiredIngress)
// We need to use server-created fields from the cluster, not the desired object when updating. if !util.ObjectMetaEquivalent(desiredIngress.ObjectMeta, clusterIngress.ObjectMeta) {
desiredIngress.ObjectMeta.ResourceVersion = clusterIngress.ObjectMeta.ResourceVersion
desiredIngress.ObjectMeta.UID = clusterIngress.ObjectMeta.UID
// Merge any annotations on the federated ingress onto the underlying cluster ingress, // Merge any annotations on the federated ingress onto the underlying cluster ingress,
// overwriting duplicates. // overwriting duplicates.
// TODO: We should probably use a PATCH operation for this instead.
for key, val := range baseIngress.ObjectMeta.Annotations { for key, val := range baseIngress.ObjectMeta.Annotations {
desiredIngress.ObjectMeta.Annotations[key] = val desiredIngress.ObjectMeta.Annotations[key] = val
} }
@ -353,17 +722,21 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
Obj: desiredIngress, Obj: desiredIngress,
ClusterName: cluster.Name, ClusterName: cluster.Name,
}) })
// TODO: Transfer any readonly (target-proxy, url-map etc) annotations from the master cluster to the federation, if this is the master cluster.
// This is only for consistency, so that the federation ingress metadata matches the underlying clusters. It's not actually required.
}
} }
} }
} }
if len(operations) == 0 { if len(operations) == 0 {
// Everything is in order // Everything is in order
glog.V(4).Infof("Ingress %q is up-to-date in all clusters - no propagation to clusters required.", ingress)
return return
} }
glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations) glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
err = ic.federatedUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) { err = ic.federatedIngressUpdater.UpdateWithOnError(operations, ic.updateTimeout, func(op util.FederatedOperation, operror error) {
ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "FailedUpdateInCluster", ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "FailedClusterUpdate",
"Ingress update in cluster %s failed: %v", op.ClusterName, operror) "Ingress update in cluster %s failed: %v", op.ClusterName, operror)
}) })
if err != nil { if err != nil {
@ -371,7 +744,4 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
ic.deliverIngress(ingress, ic.ingressReviewDelay, true) ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
return return
} }
// Evertyhing is in order but lets be double sure - TODO: quinton: Why? This seems like a hack.
ic.deliverIngress(ingress, ic.ingressReviewDelay, false)
} }

View File

@ -24,6 +24,7 @@ import (
federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1" federation_api "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake" fake_federation_release_1_4 "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_4/fake"
"k8s.io/kubernetes/federation/pkg/federation-controller/util"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test" . "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
api_v1 "k8s.io/kubernetes/pkg/api/v1" api_v1 "k8s.io/kubernetes/pkg/api/v1"
extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" extensions_v1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
@ -35,29 +36,40 @@ import (
) )
func TestIngressController(t *testing.T) { func TestIngressController(t *testing.T) {
fakeClusterList := federation_api.ClusterList{Items: []federation_api.Cluster{}}
fakeConfigMapList1 := api_v1.ConfigMapList{Items: []api_v1.ConfigMap{}}
fakeConfigMapList2 := api_v1.ConfigMapList{Items: []api_v1.ConfigMap{}}
cluster1 := NewCluster("cluster1", api_v1.ConditionTrue) cluster1 := NewCluster("cluster1", api_v1.ConditionTrue)
cluster2 := NewCluster("cluster2", api_v1.ConditionTrue) cluster2 := NewCluster("cluster2", api_v1.ConditionTrue)
cfg1 := NewConfigMap("foo")
cfg2 := NewConfigMap("bar") // Different UID from cfg1, so that we can check that they get reconciled.
fakeClient := &fake_federation_release_1_4.Clientset{} t.Log("Creating fake infrastructure")
RegisterFakeList("clusters", &fakeClient.Fake, &federation_api.ClusterList{Items: []federation_api.Cluster{*cluster1}}) fedClient := &fake_federation_release_1_4.Clientset{}
RegisterFakeList("ingresses", &fakeClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) RegisterFakeList("clusters", &fedClient.Fake, &fakeClusterList)
ingressWatch := RegisterFakeWatch("ingresses", &fakeClient.Fake) RegisterFakeList("ingresses", &fedClient.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
clusterWatch := RegisterFakeWatch("clusters", &fakeClient.Fake) fedIngressWatch := RegisterFakeWatch("ingresses", &fedClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fedClient.Fake)
fedClusterUpdateChan := RegisterFakeCopyOnUpdate("clusters", &fedClient.Fake, clusterWatch)
fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch)
cluster1Client := &fake_kube_release_1_4.Clientset{} cluster1Client := &fake_kube_release_1_4.Clientset{}
cluster1Watch := RegisterFakeWatch("ingresses", &cluster1Client.Fake)
RegisterFakeList("ingresses", &cluster1Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) RegisterFakeList("ingresses", &cluster1Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
cluster1CreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1Watch) RegisterFakeList("configmaps", &cluster1Client.Fake, &fakeConfigMapList1)
cluster1UpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1Watch) cluster1IngressWatch := RegisterFakeWatch("ingresses", &cluster1Client.Fake)
cluster1ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster1Client.Fake)
cluster1IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
cluster2Client := &fake_kube_release_1_4.Clientset{} cluster2Client := &fake_kube_release_1_4.Clientset{}
cluster2Watch := RegisterFakeWatch("ingresses", &cluster2Client.Fake)
RegisterFakeList("ingresses", &cluster2Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}}) RegisterFakeList("ingresses", &cluster2Client.Fake, &extensions_v1beta1.IngressList{Items: []extensions_v1beta1.Ingress{}})
cluster2CreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2Watch) RegisterFakeList("configmaps", &cluster2Client.Fake, &fakeConfigMapList2)
cluster2IngressWatch := RegisterFakeWatch("ingresses", &cluster2Client.Fake)
cluster2ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster2Client.Fake)
cluster2IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster2Client.Fake, cluster2IngressWatch)
cluster2ConfigMapUpdateChan := RegisterFakeCopyOnUpdate("configmaps", &cluster2Client.Fake, cluster2ConfigMapWatch)
ingressController := NewIngressController(fakeClient) clientFactoryFunc := func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) {
informer := ToFederatedInformerForTestOnly(ingressController.ingressFederatedInformer)
informer.SetClientFactory(func(cluster *federation_api.Cluster) (kube_release_1_4.Interface, error) {
switch cluster.Name { switch cluster.Name {
case cluster1.Name: case cluster1.Name:
return cluster1Client, nil return cluster1Client, nil
@ -66,50 +78,129 @@ func TestIngressController(t *testing.T) {
default: default:
return nil, fmt.Errorf("Unknown cluster") return nil, fmt.Errorf("Unknown cluster")
} }
}) }
ingressController := NewIngressController(fedClient)
ingressInformer := ToFederatedInformerForTestOnly(ingressController.ingressFederatedInformer)
ingressInformer.SetClientFactory(clientFactoryFunc)
configMapInformer := ToFederatedInformerForTestOnly(ingressController.configMapFederatedInformer)
configMapInformer.SetClientFactory(clientFactoryFunc)
ingressController.clusterAvailableDelay = time.Second ingressController.clusterAvailableDelay = time.Second
ingressController.ingressReviewDelay = 50 * time.Millisecond ingressController.ingressReviewDelay = 50 * time.Millisecond
ingressController.configMapReviewDelay = 50 * time.Millisecond
ingressController.smallDelay = 20 * time.Millisecond ingressController.smallDelay = 20 * time.Millisecond
ingressController.updateTimeout = 5 * time.Second ingressController.updateTimeout = 5 * time.Second
stop := make(chan struct{}) stop := make(chan struct{})
t.Log("Running Ingress Controller")
ingressController.Run(stop) ingressController.Run(stop)
ing1 := extensions_v1beta1.Ingress{ ing1 := extensions_v1beta1.Ingress{
ObjectMeta: api_v1.ObjectMeta{ ObjectMeta: api_v1.ObjectMeta{
Name: "test-ingress", Name: "test-ingress",
Namespace: "mynamespace", Namespace: "mynamespace",
SelfLink: "/api/v1/namespaces/mynamespaces/ingress/test-ingress", SelfLink: "/api/v1/namespaces/mynamespace/ingress/test-ingress",
Annotations: map[string]string{},
},
Status: extensions_v1beta1.IngressStatus{
LoadBalancer: api_v1.LoadBalancerStatus{
Ingress: make([]api_v1.LoadBalancerIngress, 0, 0),
},
}, },
} }
t.Log("Adding cluster 1")
clusterWatch.Add(cluster1)
t.Log("Adding Ingress UID ConfigMap to cluster 1")
cluster1ConfigMapWatch.Add(cfg1)
t.Log("Checking that UID annotation on Cluster 1 annotation was correctly updated")
cluster := GetClusterFromChan(fedClusterUpdateChan)
assert.NotNil(t, cluster)
assert.Equal(t, cluster.ObjectMeta.Annotations[uidAnnotationKey], cfg1.Data[uidKey])
// Test add federated ingress. // Test add federated ingress.
ingressWatch.Add(&ing1) t.Log("Adding Federated Ingress")
createdIngress := GetIngressFromChan(cluster1CreateChan) fedIngressWatch.Add(&ing1)
t.Log("Checking that Ingress was correctly created in cluster 1")
createdIngress := GetIngressFromChan(t, cluster1IngressCreateChan)
assert.NotNil(t, createdIngress) assert.NotNil(t, createdIngress)
assert.True(t, reflect.DeepEqual(&ing1, createdIngress)) assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress.Spec), "Spec of created ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress.ObjectMeta), "Metadata of created object is not equivalent")
// Test that IP address gets transferred from cluster ingress to federated ingress.
t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress")
createdIngress.Status.LoadBalancer.Ingress = append(createdIngress.Status.LoadBalancer.Ingress, api_v1.LoadBalancerIngress{IP: "1.2.3.4"})
cluster1IngressWatch.Modify(createdIngress)
updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan)
assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress")
if updatedIngress != nil {
assert.True(t, reflect.DeepEqual(createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress), fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v", createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress))
}
// Test update federated ingress. // Test update federated ingress.
ing1.Annotations = map[string]string{ updatedIngress.ObjectMeta.Annotations["A"] = "B"
"A": "B", t.Log("Modifying Federated Ingress")
} fedIngressWatch.Modify(updatedIngress)
ingressWatch.Modify(&ing1) t.Log("Checking that Ingress was correctly updated in cluster 1")
updatedIngress := GetIngressFromChan(cluster1UpdateChan) updatedIngress2 := GetIngressFromChan(t, cluster1IngressUpdateChan)
assert.NotNil(t, updatedIngress) assert.NotNil(t, updatedIngress2)
assert.True(t, reflect.DeepEqual(&ing1, updatedIngress)) assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, updatedIngress.Spec), "Spec of updated ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(updatedIngress2.ObjectMeta, updatedIngress.ObjectMeta), "Metadata of updated object is not equivalent")
// Test add cluster // Test add cluster
ing1.Annotations[staticIPAnnotationKey] = "foo" // Make sure that the base object has a static IP name first. t.Log("Adding a second cluster")
ingressWatch.Modify(&ing1) ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first.
fedIngressWatch.Modify(&ing1)
clusterWatch.Add(cluster2) clusterWatch.Add(cluster2)
createdIngress2 := GetIngressFromChan(cluster2CreateChan) // First check that the original values are not equal - see above comment
assert.NotEqual(t, cfg1.Data[uidKey], cfg2.Data[uidKey], fmt.Sprintf("ConfigMap in cluster 2 must initially not equal that in cluster 1 for this test - please fix test"))
cluster2ConfigMapWatch.Add(cfg2)
t.Log("Checking that the ingress got created in cluster 2")
createdIngress2 := GetIngressFromChan(t, cluster2IngressCreateChan)
assert.NotNil(t, createdIngress2) assert.NotNil(t, createdIngress2)
assert.True(t, reflect.DeepEqual(&ing1, createdIngress2)) assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress2.Spec), "Spec of created ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent")
t.Log("Checking that the configmap in cluster 2 got updated.")
updatedConfigMap2 := GetConfigMapFromChan(cluster2ConfigMapUpdateChan)
assert.NotNil(t, updatedConfigMap2, fmt.Sprintf("ConfigMap in cluster 2 was not updated (or more likely the test is broken and the API type written is wrong)"))
if updatedConfigMap2 != nil {
assert.Equal(t, cfg1.Data[uidKey], updatedConfigMap2.Data[uidKey],
fmt.Sprintf("UID's in configmaps in cluster's 1 and 2 are not equal (%q != %q)", cfg1.Data["uid"], updatedConfigMap2.Data["uid"]))
}
close(stop) close(stop)
} }
func GetIngressFromChan(c chan runtime.Object) *extensions_v1beta1.Ingress { func GetIngressFromChan(t *testing.T, c chan runtime.Object) *extensions_v1beta1.Ingress {
ingress := GetObjectFromChan(c).(*extensions_v1beta1.Ingress) obj := GetObjectFromChan(c)
ingress, ok := obj.(*extensions_v1beta1.Ingress)
if !ok {
t.Logf("Object on channel was not of type *extensions_v1beta1.Ingress: %v", obj)
}
return ingress return ingress
} }
func GetConfigMapFromChan(c chan runtime.Object) *api_v1.ConfigMap {
configMap, _ := GetObjectFromChan(c).(*api_v1.ConfigMap)
return configMap
}
func GetClusterFromChan(c chan runtime.Object) *federation_api.Cluster {
cluster, _ := GetObjectFromChan(c).(*federation_api.Cluster)
return cluster
}
func NewConfigMap(uid string) *api_v1.ConfigMap {
return &api_v1.ConfigMap{
ObjectMeta: api_v1.ObjectMeta{
Name: uidConfigMapName,
Namespace: uidConfigMapNamespace,
SelfLink: "/api/v1/namespaces/" + uidConfigMapNamespace + "/configmap/" + uidConfigMapName,
Annotations: map[string]string{},
},
Data: map[string]string{
uidKey: uid,
},
}
}

View File

@ -162,7 +162,7 @@ func (d *DelayingDeliverer) DeliverAfter(key string, value interface{}, delay ti
d.DeliverAt(key, value, time.Now().Add(delay)) d.DeliverAt(key, value, time.Now().Add(delay))
} }
// Gets target chanel of the deliverer. // Gets target channel of the deliverer.
func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem { func (d *DelayingDeliverer) GetTargetChannel() chan *DelayingDelivererItem {
return d.targetChannel return d.targetChannel
} }

View File

@ -85,9 +85,9 @@ type FederationView interface {
ClustersSynced() bool ClustersSynced() bool
} }
// A structure that combines an informer running agains federated api server and listening for cluster updates // A structure that combines an informer running against federated api server and listening for cluster updates
// with multiple Kubernetes API informers (called target informers) running against federation members. Whenever a new // with multiple Kubernetes API informers (called target informers) running against federation members. Whenever a new
// cluster is added to the federation an informer is created for it using TargetInformerFactory. Infomrers are stoped // cluster is added to the federation an informer is created for it using TargetInformerFactory. Informers are stopped
// when a cluster is either put offline of deleted. It is assumed that some controller keeps an eye on the cluster list // when a cluster is either put offline of deleted. It is assumed that some controller keeps an eye on the cluster list
// and thus the clusters in ETCD are up to date. // and thus the clusters in ETCD are up to date.
type FederatedInformer interface { type FederatedInformer interface {
@ -186,18 +186,22 @@ func NewFederatedInformer(
if clusterLifecycle.ClusterAvailable != nil { if clusterLifecycle.ClusterAvailable != nil {
clusterLifecycle.ClusterAvailable(curCluster) clusterLifecycle.ClusterAvailable(curCluster)
} }
} else {
glog.Errorf("Cluster %v not added. Not of correct type, or cluster not ready.", cur)
} }
}, },
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
oldCluster, ok := old.(*federation_api.Cluster) oldCluster, ok := old.(*federation_api.Cluster)
if !ok { if !ok {
glog.Errorf("Internal error: Cluster %v not updated. Old cluster not of correct type.", old)
return return
} }
curCluster, ok := cur.(*federation_api.Cluster) curCluster, ok := cur.(*federation_api.Cluster)
if !ok { if !ok {
glog.Errorf("Internal error: Cluster %v not updated. New cluster not of correct type.", cur)
return return
} }
if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) { if isClusterReady(oldCluster) != isClusterReady(curCluster) || !reflect.DeepEqual(oldCluster.Spec, curCluster.Spec) || !reflect.DeepEqual(oldCluster.ObjectMeta.Annotations, curCluster.ObjectMeta.Annotations) {
var data []interface{} var data []interface{}
if clusterLifecycle.ClusterUnavailable != nil { if clusterLifecycle.ClusterUnavailable != nil {
data = getClusterData(oldCluster.Name) data = getClusterData(oldCluster.Name)
@ -213,6 +217,8 @@ func NewFederatedInformer(
clusterLifecycle.ClusterAvailable(curCluster) clusterLifecycle.ClusterAvailable(curCluster)
} }
} }
} else {
glog.V(4).Infof("Cluster %v not updated to %v as ready status and specs are identical", oldCluster, curCluster)
} }
}, },
}, },
@ -258,11 +264,14 @@ type federatedStoreImpl struct {
} }
func (f *federatedInformerImpl) Stop() { func (f *federatedInformerImpl) Stop() {
glog.V(4).Infof("Stopping federated informer.")
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
glog.V(4).Infof("... Closing cluster informer channel.")
close(f.clusterInformer.stopChan) close(f.clusterInformer.stopChan)
for _, informer := range f.targetInformers { for key, informer := range f.targetInformers {
glog.V(4).Infof("... Closing informer channel for %q.", key)
close(informer.stopChan) close(informer.stopChan)
} }
} }
@ -291,14 +300,16 @@ func (f *federatedInformerImpl) GetClientsetForCluster(clusterName string) (kube
func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName string) (kube_release_1_4.Interface, error) { func (f *federatedInformerImpl) getClientsetForClusterUnlocked(clusterName string) (kube_release_1_4.Interface, error) {
// No locking needed. Will happen in f.GetCluster. // No locking needed. Will happen in f.GetCluster.
glog.V(4).Infof("Getting clientset for cluster %q", clusterName)
if cluster, found, err := f.getReadyClusterUnlocked(clusterName); found && err == nil { if cluster, found, err := f.getReadyClusterUnlocked(clusterName); found && err == nil {
glog.V(4).Infof("Got clientset for cluster %q", clusterName)
return f.clientFactory(cluster) return f.clientFactory(cluster)
} else { } else {
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
return nil, fmt.Errorf("cluster %s not found", clusterName) return nil, fmt.Errorf("cluster %q not found", clusterName)
} }
// GetReadyClusers returns all clusters for which the sub-informers are run. // GetReadyClusers returns all clusters for which the sub-informers are run.
@ -441,7 +452,7 @@ func (fs *federatedStoreImpl) GetFromAllClusters(key string) ([]FederatedObject,
return result, nil return result, nil
} }
// GetKey for returns the key under which the item would be put in the store. // GetKeyFor returns the key under which the item would be put in the store.
func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string { func (fs *federatedStoreImpl) GetKeyFor(item interface{}) string {
// TODO: support other keying functions. // TODO: support other keying functions.
key, _ := framework.DeletionHandlingMetaNamespaceKeyFunc(item) key, _ := framework.DeletionHandlingMetaNamespaceKeyFunc(item)

View File

@ -64,10 +64,14 @@ func (wd *WatcherDispatcher) Add(obj runtime.Object) {
func (wd *WatcherDispatcher) Modify(obj runtime.Object) { func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
wd.Lock() wd.Lock()
defer wd.Unlock() defer wd.Unlock()
glog.V(4).Infof("->WatcherDispatcher.Modify(%v)", obj)
wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: obj}) wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: obj})
for _, watcher := range wd.watchers { for i, watcher := range wd.watchers {
if !watcher.IsStopped() { if !watcher.IsStopped() {
glog.V(4).Infof("->Watcher(%d).Modify(%v)", i, obj)
watcher.Modify(obj) watcher.Modify(obj)
} else {
glog.V(4).Infof("->Watcher(%d) is stopped. Not calling Modify(%v)", i, obj)
} }
} }
} }
@ -173,7 +177,7 @@ func GetObjectFromChan(c chan runtime.Object) runtime.Object {
select { select {
case obj := <-c: case obj := <-c:
return obj return obj
case <-time.After(10 * time.Second): case <-time.After(20 * time.Second):
pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
return nil return nil
} }

View File

@ -1,43 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"reflect"
"k8s.io/kubernetes/pkg/api/v1"
)
/*
ObjectMetaIsEquivalent determines whether two ObjectMeta's (typically one from a federated API object,
and the other from a cluster object) are equivalent.
*/
func ObjectMetaIsEquivalent(m1, m2 v1.ObjectMeta) bool {
// First make all of the read-only fields equal, then perform a deep equality comparison
m1.SelfLink = m2.SelfLink // Might be different in different cluster contexts.
m1.UID = m2.UID // Definitely different in different cluster contexts
m1.ResourceVersion = m2.ResourceVersion // Definitely different in different cluster contexts
m1.Generation = m2.Generation // Might be different in different cluster contexts.
m1.CreationTimestamp = m2.CreationTimestamp // Definitely different in different cluster contexts.
m1.DeletionTimestamp = m2.DeletionTimestamp // Might be different in different cluster contexts.
m1.OwnerReferences = nil // Might be different in different cluster contexts.
m2.OwnerReferences = nil
m1.Finalizers = nil // Might be different in different cluster contexts.
m2.Finalizers = nil
return reflect.DeepEqual(m1, m2)
}

View File

@ -20,6 +20,8 @@ import (
"sync" "sync"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"github.com/golang/glog"
) )
// Interface can be implemented by anything that knows how to watch and report changes. // Interface can be implemented by anything that knows how to watch and report changes.
@ -100,6 +102,7 @@ func (f *FakeWatcher) Stop() {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
if !f.Stopped { if !f.Stopped {
glog.V(4).Infof("Stopping fake watcher.")
close(f.result) close(f.result)
f.Stopped = true f.Stopped = true
} }