diff --git a/federation/pkg/federation-controller/ingress/BUILD b/federation/pkg/federation-controller/ingress/BUILD index 7d54a3f2e0d..3b63433a4bd 100644 --- a/federation/pkg/federation-controller/ingress/BUILD +++ b/federation/pkg/federation-controller/ingress/BUILD @@ -57,6 +57,7 @@ go_test( "//pkg/runtime:go_default_library", "//pkg/types:go_default_library", "//pkg/util/wait:go_default_library", + "//vendor:github.com/golang/glog", "//vendor:github.com/stretchr/testify/assert", ], ) diff --git a/federation/pkg/federation-controller/ingress/ingress_controller_test.go b/federation/pkg/federation-controller/ingress/ingress_controller_test.go index 3612e0cea34..5a3d987334d 100644 --- a/federation/pkg/federation-controller/ingress/ingress_controller_test.go +++ b/federation/pkg/federation-controller/ingress/ingress_controller_test.go @@ -37,9 +37,14 @@ import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/wait" + "github.com/golang/glog" "github.com/stretchr/testify/assert" ) +const ( + maxTrials = 20 +) + func TestIngressController(t *testing.T) { fakeClusterList := federationapi.ClusterList{Items: []federationapi.Cluster{}} fakeConfigMapList1 := apiv1.ConfigMapList{Items: []apiv1.ConfigMap{}} @@ -56,7 +61,7 @@ func TestIngressController(t *testing.T) { fedIngressWatch := RegisterFakeWatch("ingresses", &fedClient.Fake) clusterWatch := RegisterFakeWatch("clusters", &fedClient.Fake) fedClusterUpdateChan := RegisterFakeCopyOnUpdate("clusters", &fedClient.Fake, clusterWatch) - //fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch) + fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch) cluster1Client := &fakekubeclientset.Clientset{} RegisterFakeList("ingresses", &cluster1Client.Fake, &extensionsv1beta1.IngressList{Items: []extensionsv1beta1.Ingress{}}) @@ -64,7 +69,7 @@ func TestIngressController(t *testing.T) { cluster1IngressWatch := RegisterFakeWatch("ingresses", &cluster1Client.Fake) cluster1ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster1Client.Fake) cluster1IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1IngressWatch) - // cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch) + cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch) cluster2Client := &fakekubeclientset.Clientset{} RegisterFakeList("ingresses", &cluster2Client.Fake, &extensionsv1beta1.IngressList{Items: []extensionsv1beta1.Ingress{}}) @@ -90,9 +95,9 @@ func TestIngressController(t *testing.T) { configMapInformer := ToFederatedInformerForTestOnly(ingressController.configMapFederatedInformer) configMapInformer.SetClientFactory(clientFactoryFunc) ingressController.clusterAvailableDelay = time.Second - ingressController.ingressReviewDelay = 10 * time.Millisecond - ingressController.configMapReviewDelay = 10 * time.Millisecond - ingressController.smallDelay = 20 * time.Millisecond + ingressController.ingressReviewDelay = 100 * time.Millisecond + ingressController.configMapReviewDelay = 100 * time.Millisecond + ingressController.smallDelay = 100 * time.Millisecond ingressController.updateTimeout = 5 * time.Second stop := make(chan struct{}) @@ -102,7 +107,7 @@ func TestIngressController(t *testing.T) { // TODO: Here we are creating the ingress with first cluster annotation. // Add another test without that annotation when // https://github.com/kubernetes/kubernetes/issues/36540 is fixed. - ing1 := extensionsv1beta1.Ingress{ + fedIngress := extensionsv1beta1.Ingress{ ObjectMeta: apiv1.ObjectMeta{ Name: "test-ingress", Namespace: "mynamespace", @@ -131,64 +136,109 @@ func TestIngressController(t *testing.T) { // Test add federated ingress. t.Log("Adding Federated Ingress") - fedIngressWatch.Add(&ing1) - /* - // TODO: Re-enable this when we have fixed these flaky tests: https://github.com/kubernetes/kubernetes/issues/36540. - t.Logf("Checking that approproate finalizers are added") - // There should be 2 updates to add both the finalizers. - updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan) - assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, deletionhelper.FinalizerDeleteFromUnderlyingClusters)) - updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan) - assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, apiv1.FinalizerOrphan), fmt.Sprintf("ingress does not have the orphan finalizer: %v", updatedIngress)) - ing1 = *updatedIngress - */ + fedIngressWatch.Add(&fedIngress) + + t.Logf("Checking that approproate finalizers are added") + // There should be 2 updates to add both the finalizers. + updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan) + assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, deletionhelper.FinalizerDeleteFromUnderlyingClusters)) + updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan) + assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, apiv1.FinalizerOrphan), fmt.Sprintf("ingress does not have the orphan finalizer: %v", updatedIngress)) + fedIngress = *updatedIngress + t.Log("Checking that Ingress was correctly created in cluster 1") createdIngress := GetIngressFromChan(t, cluster1IngressCreateChan) assert.NotNil(t, createdIngress) - assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress.Spec), "Spec of created ingress is not equal") - assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress.ObjectMeta), "Metadata of created object is not equivalent") + cluster1Ingress := *createdIngress + assert.True(t, reflect.DeepEqual(fedIngress.Spec, cluster1Ingress.Spec), "Spec of created ingress is not equal") + assert.True(t, util.ObjectMetaEquivalent(fedIngress.ObjectMeta, cluster1Ingress.ObjectMeta), + "Metadata of created object is not equivalent") + // Wait for finalizers to appear in federation store. - // assert.NoError(t, WaitForFinalizersInFederationStore(ingressController, ingressController.ingressInformerStore, - // types.NamespacedName{Namespace: ing1.Namespace, Name: ing1.Name}.String()), "finalizers not found in federated ingress") + assert.NoError(t, WaitForFinalizersInFederationStore(ingressController, ingressController.ingressInformerStore, + types.NamespacedName{Namespace: fedIngress.Namespace, Name: fedIngress.Name}.String()), "finalizers not found in federated ingress") + // Wait for the cluster ingress to appear in cluster store. assert.NoError(t, WaitForIngressInClusterStore(ingressController.ingressFederatedInformer.GetTargetStore(), cluster1.Name, types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String()), "Created ingress not found in underlying cluster store") - /* - // TODO: Re-enable this when we have fixed these flaky tests: https://github.com/kubernetes/kubernetes/issues/36540. - // Test that IP address gets transferred from cluster ingress to federated ingress. - t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress") - createdIngress.Status.LoadBalancer.Ingress = append(createdIngress.Status.LoadBalancer.Ingress, apiv1.LoadBalancerIngress{IP: "1.2.3.4"}) - cluster1IngressWatch.Modify(createdIngress) - // Wait for store to see the updated cluster ingress. - assert.NoError(t, WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(), - cluster1.Name, types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(), - createdIngress.Status.LoadBalancer, 4*wait.ForeverTestTimeout)) - updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan) - assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress") - if updatedIngress != nil { - assert.True(t, reflect.DeepEqual(createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress), fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v", createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress)) - t.Logf("expected: %v, actual: %v", createdIngress, updatedIngress) - } + // Test that IP address gets transferred from cluster ingress to federated ingress. + t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress") + cluster1Ingress.Status.LoadBalancer.Ingress = append(cluster1Ingress.Status.LoadBalancer.Ingress, + apiv1.LoadBalancerIngress{IP: "1.2.3.4"}) + glog.Infof("Setting artificial IP address for cluster1 ingress") + + for trial := 0; trial < maxTrials; trial++ { + cluster1IngressWatch.Modify(&cluster1Ingress) + // Wait for store to see the updated cluster ingress. + key := types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String() + if err := WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(), + cluster1.Name, key, cluster1Ingress.Status.LoadBalancer, time.Second); err != nil { + continue + } + if err := WaitForFedStatusUpdate(t, ingressController.ingressInformerStore, + key, cluster1Ingress.Status.LoadBalancer, time.Second); err != nil { + continue + } + } + + for trial := 0; trial < maxTrials; trial++ { + updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan) + assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress") + if updatedIngress == nil { + return + } + if reflect.DeepEqual(cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress) { + fedIngress.Status.LoadBalancer = updatedIngress.Status.LoadBalancer + break + } else { + glog.Infof("Status check failed: expected: %v actual: %v", cluster1Ingress.Status, updatedIngress.Status) + } + } + glog.Infof("Status check: expected: %v actual: %v", cluster1Ingress.Status, updatedIngress.Status) + assert.True(t, reflect.DeepEqual(cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress), + fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v", + cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress)) + + assert.NoError(t, WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(), + cluster1.Name, types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(), + cluster1Ingress.Status.LoadBalancer, time.Second)) + assert.NoError(t, WaitForFedStatusUpdate(t, ingressController.ingressInformerStore, + types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(), + cluster1Ingress.Status.LoadBalancer, time.Second)) + t.Logf("expected: %v, actual: %v", createdIngress, updatedIngress) + + // Test update federated ingress. + if fedIngress.ObjectMeta.Annotations == nil { + fedIngress.ObjectMeta.Annotations = make(map[string]string) + } + fedIngress.ObjectMeta.Annotations["A"] = "B" + t.Log("Modifying Federated Ingress") + fedIngressWatch.Modify(&fedIngress) + t.Log("Checking that Ingress was correctly updated in cluster 1") + var updatedIngress2 *extensionsv1beta1.Ingress + + for trial := 0; trial < maxTrials; trial++ { + updatedIngress2 = GetIngressFromChan(t, cluster1IngressUpdateChan) + assert.NotNil(t, updatedIngress2) + if updatedIngress2 == nil { + return + } + if reflect.DeepEqual(fedIngress.Spec, updatedIngress.Spec) && + updatedIngress2.ObjectMeta.Annotations["A"] == fedIngress.ObjectMeta.Annotations["A"] { + break + } + } + + assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, fedIngress.Spec), "Spec of updated ingress is not equal") + assert.Equal(t, updatedIngress2.ObjectMeta.Annotations["A"], fedIngress.ObjectMeta.Annotations["A"], "Updated annotation not transferred from federated to cluster ingress.") - // Test update federated ingress. - if updatedIngress.ObjectMeta.Annotations == nil { - updatedIngress.ObjectMeta.Annotations = make(map[string]string) - } - updatedIngress.ObjectMeta.Annotations["A"] = "B" - t.Log("Modifying Federated Ingress") - fedIngressWatch.Modify(updatedIngress) - t.Log("Checking that Ingress was correctly updated in cluster 1") - updatedIngress2 := GetIngressFromChan(t, cluster1IngressUpdateChan) - assert.NotNil(t, updatedIngress2) - assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, updatedIngress.Spec), "Spec of updated ingress is not equal") - assert.Equal(t, updatedIngress2.ObjectMeta.Annotations["A"], updatedIngress.ObjectMeta.Annotations["A"], "Updated annotation not transferred from federated to cluster ingress.") - */ // Test add cluster t.Log("Adding a second cluster") - ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first. - fedIngressWatch.Modify(&ing1) + + fedIngress.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first. + fedIngressWatch.Modify(&fedIngress) clusterWatch.Add(cluster2) // First check that the original values are not equal - see above comment assert.NotEqual(t, cfg1.Data[uidKey], cfg2.Data[uidKey], fmt.Sprintf("ConfigMap in cluster 2 must initially not equal that in cluster 1 for this test - please fix test")) @@ -196,8 +246,9 @@ func TestIngressController(t *testing.T) { t.Log("Checking that the ingress got created in cluster 2") createdIngress2 := GetIngressFromChan(t, cluster2IngressCreateChan) assert.NotNil(t, createdIngress2) - assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress2.Spec), "Spec of created ingress is not equal") - assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent") + assert.True(t, reflect.DeepEqual(fedIngress.Spec, createdIngress2.Spec), "Spec of created ingress is not equal") + t.Logf("created meta: %v fed meta: %v", createdIngress2.ObjectMeta, fedIngress.ObjectMeta) + assert.True(t, util.ObjectMetaEquivalent(fedIngress.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent") t.Log("Checking that the configmap in cluster 2 got updated.") updatedConfigMap2 := GetConfigMapFromChan(cluster2ConfigMapUpdateChan) @@ -292,3 +343,17 @@ func WaitForStatusUpdate(t *testing.T, store util.FederatedReadOnlyStore, cluste }) return err } + +// Wait for ingress status to be updated to match the desiredStatus. +func WaitForFedStatusUpdate(t *testing.T, store cache.Store, key string, desiredStatus apiv1.LoadBalancerStatus, timeout time.Duration) error { + retryInterval := 100 * time.Millisecond + err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) { + obj, found, err := store.GetByKey(key) + if !found || err != nil { + return false, err + } + ingress := obj.(*extensionsv1beta1.Ingress) + return reflect.DeepEqual(ingress.Status.LoadBalancer, desiredStatus), nil + }) + return err +} diff --git a/federation/pkg/federation-controller/util/test/test_helper.go b/federation/pkg/federation-controller/util/test/test_helper.go index d2f06283711..71da59430a4 100644 --- a/federation/pkg/federation-controller/util/test/test_helper.go +++ b/federation/pkg/federation-controller/util/test/test_helper.go @@ -36,6 +36,10 @@ import ( "github.com/golang/glog" ) +const ( + pushTimeout = 5 * time.Second +) + // A structure that distributes eventes to multiple watchers. type WatcherDispatcher struct { sync.Mutex @@ -58,6 +62,7 @@ func (wd *WatcherDispatcher) Stop() { wd.Lock() defer wd.Unlock() close(wd.stopChan) + glog.Infof("Stopping WatcherDispatcher") for _, watcher := range wd.watchers { watcher.Stop() } @@ -141,7 +146,7 @@ func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher { dispatcher := &WatcherDispatcher{ watchers: make([]*watch.RaceFreeFakeWatcher, 0), eventsSoFar: make([]*watch.Event, 0), - orderExecution: make(chan func()), + orderExecution: make(chan func(), 100), stopChan: make(chan struct{}), } go func() { @@ -199,13 +204,22 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) { updateAction := action.(core.UpdateAction) originalObj := updateAction.GetObject() + glog.V(7).Infof("Updating %s: %v", resource, updateAction.GetObject()) + // Create a copy of the object here to prevent data races while reading the object in go routine. obj := copy(originalObj) - watcher.orderExecution <- func() { + operation := func() { glog.V(4).Infof("Object updated. Writing to channel: %v", obj) watcher.Modify(obj) objChan <- obj } + select { + case watcher.orderExecution <- operation: + break + case <-time.After(pushTimeout): + glog.Errorf("Fake client execution channel blocked") + glog.Errorf("Tried to push %v", updateAction) + } return true, originalObj, nil }) return objChan