Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-31 07:20:13 +00:00
Merge pull request #37407 from mwielgus/ingress_ctrl_fix
Automatic merge from submit-queue

Enable updates in federated ingress controller tests

And make channel in test helper non-blocking.

cc: @nikhiljindal @madhusudancs
This commit is contained in: commit 778bab2b81
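The diff below touches three files: the test's BUILD dependencies, the federated ingress controller test (whose previously commented-out update checks are re-enabled behind bounded retry loops), and the fake-watch test helper, whose orderExecution channel becomes buffered and whose update reactor queues work through a select with a timeout instead of an unconditional send. As a reading aid, here is a minimal, self-contained sketch of that non-blocking push pattern, assuming a single consumer goroutine; only the names orderExecution, operation, and pushTimeout are taken from the diff, everything else is illustrative and not part of the commit:

package main

import (
	"log"
	"time"
)

const pushTimeout = 5 * time.Second

func main() {
	// A buffered channel lets senders queue work without blocking
	// until the buffer (here 100 slots) fills up.
	orderExecution := make(chan func(), 100)

	// Single consumer goroutine drains the queue and runs each operation in order.
	go func() {
		for op := range orderExecution {
			op()
		}
	}()

	operation := func() {
		log.Println("operation executed")
	}

	// Guarded send: if the buffer is full and the consumer has stalled,
	// give up after pushTimeout and log instead of blocking forever.
	select {
	case orderExecution <- operation:
	case <-time.After(pushTimeout):
		log.Println("execution channel blocked; dropping operation")
	}

	time.Sleep(50 * time.Millisecond) // demo only: give the consumer time to run
}

The buffered channel absorbs bursts of reactor callbacks, and the timeout turns a stalled consumer into a logged error rather than a deadlocked test.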
@@ -57,6 +57,7 @@ go_test(
"//pkg/runtime:go_default_library",
"//pkg/types:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/stretchr/testify/assert",
],
)
@@ -37,9 +37,14 @@ import (
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
"github.com/stretchr/testify/assert"
)

const (
maxTrials = 20
)

func TestIngressController(t *testing.T) {
fakeClusterList := federationapi.ClusterList{Items: []federationapi.Cluster{}}
fakeConfigMapList1 := apiv1.ConfigMapList{Items: []apiv1.ConfigMap{}}
@@ -56,7 +61,7 @@ func TestIngressController(t *testing.T) {
fedIngressWatch := RegisterFakeWatch("ingresses", &fedClient.Fake)
clusterWatch := RegisterFakeWatch("clusters", &fedClient.Fake)
fedClusterUpdateChan := RegisterFakeCopyOnUpdate("clusters", &fedClient.Fake, clusterWatch)
//fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch)
fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch)

cluster1Client := &fakekubeclientset.Clientset{}
RegisterFakeList("ingresses", &cluster1Client.Fake, &extensionsv1beta1.IngressList{Items: []extensionsv1beta1.Ingress{}})
@@ -64,7 +69,7 @@ func TestIngressController(t *testing.T) {
cluster1IngressWatch := RegisterFakeWatch("ingresses", &cluster1Client.Fake)
cluster1ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster1Client.Fake)
cluster1IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
// cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)

cluster2Client := &fakekubeclientset.Clientset{}
RegisterFakeList("ingresses", &cluster2Client.Fake, &extensionsv1beta1.IngressList{Items: []extensionsv1beta1.Ingress{}})
@@ -90,9 +95,9 @@ func TestIngressController(t *testing.T) {
configMapInformer := ToFederatedInformerForTestOnly(ingressController.configMapFederatedInformer)
configMapInformer.SetClientFactory(clientFactoryFunc)
ingressController.clusterAvailableDelay = time.Second
ingressController.ingressReviewDelay = 10 * time.Millisecond
ingressController.configMapReviewDelay = 10 * time.Millisecond
ingressController.smallDelay = 20 * time.Millisecond
ingressController.ingressReviewDelay = 100 * time.Millisecond
ingressController.configMapReviewDelay = 100 * time.Millisecond
ingressController.smallDelay = 100 * time.Millisecond
ingressController.updateTimeout = 5 * time.Second

stop := make(chan struct{})
@@ -102,7 +107,7 @@ func TestIngressController(t *testing.T) {
// TODO: Here we are creating the ingress with first cluster annotation.
// Add another test without that annotation when
// https://github.com/kubernetes/kubernetes/issues/36540 is fixed.
ing1 := extensionsv1beta1.Ingress{
fedIngress := extensionsv1beta1.Ingress{
ObjectMeta: apiv1.ObjectMeta{
Name: "test-ingress",
Namespace: "mynamespace",
@@ -131,64 +136,109 @@ func TestIngressController(t *testing.T) {

// Test add federated ingress.
t.Log("Adding Federated Ingress")
fedIngressWatch.Add(&ing1)
/*
// TODO: Re-enable this when we have fixed these flaky tests: https://github.com/kubernetes/kubernetes/issues/36540.
t.Logf("Checking that approproate finalizers are added")
// There should be 2 updates to add both the finalizers.
updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan)
assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, apiv1.FinalizerOrphan), fmt.Sprintf("ingress does not have the orphan finalizer: %v", updatedIngress))
ing1 = *updatedIngress
*/
fedIngressWatch.Add(&fedIngress)

t.Logf("Checking that approproate finalizers are added")
// There should be 2 updates to add both the finalizers.
updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan)
assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, apiv1.FinalizerOrphan), fmt.Sprintf("ingress does not have the orphan finalizer: %v", updatedIngress))
fedIngress = *updatedIngress

t.Log("Checking that Ingress was correctly created in cluster 1")
createdIngress := GetIngressFromChan(t, cluster1IngressCreateChan)
assert.NotNil(t, createdIngress)
assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress.Spec), "Spec of created ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress.ObjectMeta), "Metadata of created object is not equivalent")
cluster1Ingress := *createdIngress
assert.True(t, reflect.DeepEqual(fedIngress.Spec, cluster1Ingress.Spec), "Spec of created ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(fedIngress.ObjectMeta, cluster1Ingress.ObjectMeta),
"Metadata of created object is not equivalent")

// Wait for finalizers to appear in federation store.
// assert.NoError(t, WaitForFinalizersInFederationStore(ingressController, ingressController.ingressInformerStore,
// types.NamespacedName{Namespace: ing1.Namespace, Name: ing1.Name}.String()), "finalizers not found in federated ingress")
assert.NoError(t, WaitForFinalizersInFederationStore(ingressController, ingressController.ingressInformerStore,
types.NamespacedName{Namespace: fedIngress.Namespace, Name: fedIngress.Name}.String()), "finalizers not found in federated ingress")

// Wait for the cluster ingress to appear in cluster store.
assert.NoError(t, WaitForIngressInClusterStore(ingressController.ingressFederatedInformer.GetTargetStore(), cluster1.Name,
types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String()),
"Created ingress not found in underlying cluster store")

/*
// TODO: Re-enable this when we have fixed these flaky tests: https://github.com/kubernetes/kubernetes/issues/36540.
// Test that IP address gets transferred from cluster ingress to federated ingress.
t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress")
createdIngress.Status.LoadBalancer.Ingress = append(createdIngress.Status.LoadBalancer.Ingress, apiv1.LoadBalancerIngress{IP: "1.2.3.4"})
cluster1IngressWatch.Modify(createdIngress)
// Wait for store to see the updated cluster ingress.
assert.NoError(t, WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(),
cluster1.Name, types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(),
createdIngress.Status.LoadBalancer, 4*wait.ForeverTestTimeout))
updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress")
if updatedIngress != nil {
assert.True(t, reflect.DeepEqual(createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress), fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v", createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress))
t.Logf("expected: %v, actual: %v", createdIngress, updatedIngress)
}
// Test that IP address gets transferred from cluster ingress to federated ingress.
t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress")
cluster1Ingress.Status.LoadBalancer.Ingress = append(cluster1Ingress.Status.LoadBalancer.Ingress,
apiv1.LoadBalancerIngress{IP: "1.2.3.4"})
glog.Infof("Setting artificial IP address for cluster1 ingress")

for trial := 0; trial < maxTrials; trial++ {
cluster1IngressWatch.Modify(&cluster1Ingress)
// Wait for store to see the updated cluster ingress.
key := types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String()
if err := WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(),
cluster1.Name, key, cluster1Ingress.Status.LoadBalancer, time.Second); err != nil {
continue
}
if err := WaitForFedStatusUpdate(t, ingressController.ingressInformerStore,
key, cluster1Ingress.Status.LoadBalancer, time.Second); err != nil {
continue
}
}

for trial := 0; trial < maxTrials; trial++ {
updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress")
if updatedIngress == nil {
return
}
if reflect.DeepEqual(cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress) {
fedIngress.Status.LoadBalancer = updatedIngress.Status.LoadBalancer
break
} else {
glog.Infof("Status check failed: expected: %v actual: %v", cluster1Ingress.Status, updatedIngress.Status)
}
}
glog.Infof("Status check: expected: %v actual: %v", cluster1Ingress.Status, updatedIngress.Status)
assert.True(t, reflect.DeepEqual(cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress),
fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v",
cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress))

assert.NoError(t, WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(),
cluster1.Name, types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(),
cluster1Ingress.Status.LoadBalancer, time.Second))
assert.NoError(t, WaitForFedStatusUpdate(t, ingressController.ingressInformerStore,
types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(),
cluster1Ingress.Status.LoadBalancer, time.Second))
t.Logf("expected: %v, actual: %v", createdIngress, updatedIngress)

// Test update federated ingress.
if fedIngress.ObjectMeta.Annotations == nil {
fedIngress.ObjectMeta.Annotations = make(map[string]string)
}
fedIngress.ObjectMeta.Annotations["A"] = "B"
t.Log("Modifying Federated Ingress")
fedIngressWatch.Modify(&fedIngress)
t.Log("Checking that Ingress was correctly updated in cluster 1")
var updatedIngress2 *extensionsv1beta1.Ingress

for trial := 0; trial < maxTrials; trial++ {
updatedIngress2 = GetIngressFromChan(t, cluster1IngressUpdateChan)
assert.NotNil(t, updatedIngress2)
if updatedIngress2 == nil {
return
}
if reflect.DeepEqual(fedIngress.Spec, updatedIngress.Spec) &&
updatedIngress2.ObjectMeta.Annotations["A"] == fedIngress.ObjectMeta.Annotations["A"] {
break
}
}

assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, fedIngress.Spec), "Spec of updated ingress is not equal")
assert.Equal(t, updatedIngress2.ObjectMeta.Annotations["A"], fedIngress.ObjectMeta.Annotations["A"], "Updated annotation not transferred from federated to cluster ingress.")

// Test update federated ingress.
if updatedIngress.ObjectMeta.Annotations == nil {
updatedIngress.ObjectMeta.Annotations = make(map[string]string)
}
updatedIngress.ObjectMeta.Annotations["A"] = "B"
t.Log("Modifying Federated Ingress")
fedIngressWatch.Modify(updatedIngress)
t.Log("Checking that Ingress was correctly updated in cluster 1")
updatedIngress2 := GetIngressFromChan(t, cluster1IngressUpdateChan)
assert.NotNil(t, updatedIngress2)
assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, updatedIngress.Spec), "Spec of updated ingress is not equal")
assert.Equal(t, updatedIngress2.ObjectMeta.Annotations["A"], updatedIngress.ObjectMeta.Annotations["A"], "Updated annotation not transferred from federated to cluster ingress.")
*/
// Test add cluster
t.Log("Adding a second cluster")
ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first.
fedIngressWatch.Modify(&ing1)

fedIngress.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first.
fedIngressWatch.Modify(&fedIngress)
clusterWatch.Add(cluster2)
// First check that the original values are not equal - see above comment
assert.NotEqual(t, cfg1.Data[uidKey], cfg2.Data[uidKey], fmt.Sprintf("ConfigMap in cluster 2 must initially not equal that in cluster 1 for this test - please fix test"))
@@ -196,8 +246,9 @@ func TestIngressController(t *testing.T) {
t.Log("Checking that the ingress got created in cluster 2")
createdIngress2 := GetIngressFromChan(t, cluster2IngressCreateChan)
assert.NotNil(t, createdIngress2)
assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress2.Spec), "Spec of created ingress is not equal")
assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent")
assert.True(t, reflect.DeepEqual(fedIngress.Spec, createdIngress2.Spec), "Spec of created ingress is not equal")
t.Logf("created meta: %v fed meta: %v", createdIngress2.ObjectMeta, fedIngress.ObjectMeta)
assert.True(t, util.ObjectMetaEquivalent(fedIngress.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent")

t.Log("Checking that the configmap in cluster 2 got updated.")
updatedConfigMap2 := GetConfigMapFromChan(cluster2ConfigMapUpdateChan)
@@ -292,3 +343,17 @@ func WaitForStatusUpdate(t *testing.T, store util.FederatedReadOnlyStore, cluste
})
return err
}

// Wait for ingress status to be updated to match the desiredStatus.
func WaitForFedStatusUpdate(t *testing.T, store cache.Store, key string, desiredStatus apiv1.LoadBalancerStatus, timeout time.Duration) error {
retryInterval := 100 * time.Millisecond
err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
obj, found, err := store.GetByKey(key)
if !found || err != nil {
return false, err
}
ingress := obj.(*extensionsv1beta1.Ingress)
return reflect.DeepEqual(ingress.Status.LoadBalancer, desiredStatus), nil
})
return err
}
@@ -36,6 +36,10 @@ import (
"github.com/golang/glog"
)

const (
pushTimeout = 5 * time.Second
)

// A structure that distributes eventes to multiple watchers.
type WatcherDispatcher struct {
sync.Mutex
@@ -58,6 +62,7 @@ func (wd *WatcherDispatcher) Stop() {
wd.Lock()
defer wd.Unlock()
close(wd.stopChan)
glog.Infof("Stopping WatcherDispatcher")
for _, watcher := range wd.watchers {
watcher.Stop()
}
@@ -141,7 +146,7 @@ func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
dispatcher := &WatcherDispatcher{
watchers: make([]*watch.RaceFreeFakeWatcher, 0),
eventsSoFar: make([]*watch.Event, 0),
orderExecution: make(chan func()),
orderExecution: make(chan func(), 100),
stopChan: make(chan struct{}),
}
go func() {
@@ -199,13 +204,22 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
updateAction := action.(core.UpdateAction)
originalObj := updateAction.GetObject()
glog.V(7).Infof("Updating %s: %v", resource, updateAction.GetObject())

// Create a copy of the object here to prevent data races while reading the object in go routine.
obj := copy(originalObj)
watcher.orderExecution <- func() {
operation := func() {
glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
watcher.Modify(obj)
objChan <- obj
}
select {
case watcher.orderExecution <- operation:
break
case <-time.After(pushTimeout):
glog.Errorf("Fake client execution channel blocked")
glog.Errorf("Tried to push %v", updateAction)
}
return true, originalObj, nil
})
return objChan