From 63799452b22958734cd3727191602ac7215804e1 Mon Sep 17 00:00:00 2001 From: nikhiljindal Date: Tue, 22 Nov 2016 15:32:24 -0800 Subject: [PATCH] Fixing the logic to select first cluster in federated ingress controller --- .../ingress/ingress_controller.go | 74 +++++++++++++------ .../ingress/ingress_controller_test.go | 7 +- 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/federation/pkg/federation-controller/ingress/ingress_controller.go b/federation/pkg/federation-controller/ingress/ingress_controller.go index 6a8a614976c..21151ea16e4 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller.go @@ -53,6 +53,11 @@ const ( uidConfigMapName = "ingress-uid" // Name of the config-map and key the ingress controller stores its uid in. uidConfigMapNamespace = "kube-system" uidKey = "uid" + // Annotation on the ingress in federation control plane that is used to keep + // track of the first cluster in which we create ingress. + // We wait for ingress to be created in this cluster before creating it any + // other cluster. + firstClusterAnnotation = "ingress.federation.kubernetes.io/first-cluster" ) type IngressController struct { @@ -643,6 +648,31 @@ func (ic *IngressController) updateClusterIngressUIDToMasters(cluster *federatio } } +func (ic *IngressController) isClusterReady(clusterName string) bool { + cluster, isReady, err := ic.ingressFederatedInformer.GetReadyCluster(clusterName) + return isReady && err == nil && cluster != nil +} + +// updateAnnotationOnIngress updates the annotation with the given key on the given federated ingress. +// Queues the ingress for resync when done. +func (ic *IngressController) updateAnnotationOnIngress(ingress *extensions_v1beta1.Ingress, key, value string) { + if ingress.ObjectMeta.Annotations == nil { + ingress.ObjectMeta.Annotations = make(map[string]string) + } + ingress.ObjectMeta.Annotations[key] = value + ingressName := types.NamespacedName{Name: ingress.Name, Namespace: ingress.Namespace} + glog.V(4).Infof("Attempting to update annotation %s:%s on base federated ingress: %v", key, value, ingressName) + if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(ingress.Namespace).Update(ingress); err != nil { + glog.Errorf("Failed to update annotation %s:%s on federated ingress %q, will try again later: %v", key, value, ingressName, err) + ic.deliverIngress(ingressName, ic.ingressReviewDelay, true) + return + } else { + glog.V(4).Infof("Successfully updated annotation %s:%s on federated ingress %q, after update: %q", key, value, ingress, updatedFedIngress) + ic.deliverIngress(ingressName, ic.smallDelay, false) + return + } +} + func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { glog.V(4).Infof("Reconciling ingress %q for all clusters", ingress) if !ic.isSynced() { @@ -705,8 +735,9 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { operations := make([]util.FederatedOperation, 0) - for clusterIndex, cluster := range clusters { + for _, cluster := range clusters { baseIPName, baseIPAnnotationExists := baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] + firstClusterName, firstClusterExists := baseIngress.ObjectMeta.Annotations[firstClusterAnnotation] clusterIngressObj, clusterIngressFound, err := ic.ingressFederatedInformer.GetTargetStore().GetByKey(cluster.Name, key) if err != nil { glog.Errorf("Failed to get cached ingress %s for cluster %s, will retry: %v", ingress, cluster.Name, err) @@ -739,22 +770,33 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { ic.eventRecorder.Eventf(baseIngress, api.EventTypeNormal, "CreateInCluster", "Creating ingress in cluster %s", cluster.Name) - // We always first create an ingress in the first available cluster. Once that ingress + // We always first create an ingress in the first available cluster. Once that ingress // has been created and allocated a global IP (visible via an annotation), // we record that annotation on the federated ingress, and create all other cluster // ingresses with that same global IP. - // Note: If the first cluster becomes (e.g. temporarily) unavailable, the second cluster will be allocated - // index 0, but eventually all ingresses will share the single global IP recorded in the annotation - // of the federated ingress. - if baseIPAnnotationExists || (clusterIndex == 0) { - glog.V(4).Infof("No existing Ingress %s in cluster %s (index %d) and static IP annotation (%q) on base ingress - queuing a create operation", ingress, cluster.Name, clusterIndex, staticIPNameKeyWritable) + // Note: If the first cluster becomes (e.g. temporarily) unavailable, the + // second cluster will become the first cluster, but eventually all ingresses + // will share the single global IP recorded in the annotation of the + // federated ingress. + haveFirstCluster := firstClusterExists && firstClusterName != "" && ic.isClusterReady(firstClusterName) + if !haveFirstCluster { + glog.V(4).Infof("No cluster has been chosen as the first cluster. Electing cluster %s as the first cluster to create ingress in", cluster.Name) + ic.updateAnnotationOnIngress(baseIngress, firstClusterAnnotation, cluster.Name) + return + } + if baseIPAnnotationExists || firstClusterName == cluster.Name { + if baseIPAnnotationExists { + glog.V(4).Infof("No existing Ingress %s in cluster %s and static IP annotation (%q) exists on base ingress - queuing a create operation", ingress, cluster.Name, staticIPNameKeyWritable) + } else { + glog.V(4).Infof("No existing Ingress %s in cluster %s and no static IP annotation (%q) on base ingress - queuing a create operation in first cluster", ingress, cluster.Name, staticIPNameKeyWritable) + } operations = append(operations, util.FederatedOperation{ Type: util.OperationTypeAdd, Obj: desiredIngress, ClusterName: cluster.Name, }) } else { - glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex, ingress) + glog.V(4).Infof("No annotation %q exists on ingress %q in federation and waiting for ingress in cluster %s. Not queueing create operation for ingress until annotation exists", staticIPNameKeyWritable, ingress, firstClusterName) } } else { clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress) @@ -766,20 +808,8 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) { if (!baseIPAnnotationExists && clusterIPNameExists) || (!baseLBStatusExists && clusterLBStatusExists) { // copy the IP name from the readonly annotation on the cluster ingress, to the writable annotation on the federated ingress glog.V(4).Infof(logStr, "Transferring") if !baseIPAnnotationExists && clusterIPNameExists { - if baseIngress.ObjectMeta.Annotations == nil { - baseIngress.ObjectMeta.Annotations = make(map[string]string) - } - baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] = clusterIPName - glog.V(4).Infof("Attempting to update base federated ingress annotations: %v", baseIngress) - if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil { - glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err) - ic.deliverIngress(ingress, ic.ingressReviewDelay, true) - return - } else { - glog.V(4).Infof("Successfully updated federated ingress %q (added IP annotation), after update: %q", ingress, updatedFedIngress) - ic.deliverIngress(ingress, ic.smallDelay, false) - return - } + ic.updateAnnotationOnIngress(baseIngress, staticIPNameKeyWritable, clusterIPName) + return } if !baseLBStatusExists && clusterLBStatusExists { lbstatusObj, lbErr := conversion.NewCloner().DeepCopy(&clusterIngress.Status.LoadBalancer) diff --git a/federation/pkg/federation-controller/ingress/ingress_controller_test.go b/federation/pkg/federation-controller/ingress/ingress_controller_test.go index 6eb978f10cc..5eaebe69022 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller_test.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller_test.go @@ -99,11 +99,17 @@ func TestIngressController(t *testing.T) { t.Log("Running Ingress Controller") ingressController.Run(stop) + // TODO: Here we are creating the ingress with first cluster annotation. + // Add another test without that annotation when + // https://github.com/kubernetes/kubernetes/issues/36540 is fixed. ing1 := extensions_v1beta1.Ingress{ ObjectMeta: api_v1.ObjectMeta{ Name: "test-ingress", Namespace: "mynamespace", SelfLink: "/api/v1/namespaces/mynamespace/ingress/test-ingress", + Annotations: map[string]string{ + firstClusterAnnotation: cluster1.Name, + }, }, Status: extensions_v1beta1.IngressStatus{ LoadBalancer: api_v1.LoadBalancerStatus{ @@ -181,7 +187,6 @@ func TestIngressController(t *testing.T) { */ // Test add cluster t.Log("Adding a second cluster") - ing1.Annotations = make(map[string]string) ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first. fedIngressWatch.Modify(&ing1) clusterWatch.Add(cluster2)