Ingress test fixes

commit ac8184dfcc (parent c7785fb114)
@@ -37,9 +37,14 @@ import (
     "k8s.io/kubernetes/pkg/types"
     "k8s.io/kubernetes/pkg/util/wait"
 
+    "github.com/golang/glog"
     "github.com/stretchr/testify/assert"
 )
 
+const (
+    maxTrials = 20
+)
+
 func TestIngressController(t *testing.T) {
     fakeClusterList := federationapi.ClusterList{Items: []federationapi.Cluster{}}
     fakeConfigMapList1 := apiv1.ConfigMapList{Items: []apiv1.ConfigMap{}}
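The new maxTrials constant caps the bounded retry loops this commit introduces later in the test (see the @@ -131,64 hunk below). A minimal, self-contained sketch of that retry shape, with a hypothetical condition closure standing in for the test's store checks:

    package main

    import (
        "fmt"
        "time"
    )

    const maxTrials = 20

    func main() {
        start := time.Now()
        // conditionMet stands in for one flaky check, e.g. "did the fake
        // watcher deliver the update yet?" (hypothetical, for illustration).
        conditionMet := func() bool { return time.Since(start) > 300*time.Millisecond }

        ok := false
        for trial := 0; trial < maxTrials; trial++ {
            if conditionMet() {
                ok = true
                break
            }
            time.Sleep(100 * time.Millisecond) // matches the new review delays below
        }
        fmt.Println("condition met:", ok)
    }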
@@ -56,7 +61,7 @@ func TestIngressController(t *testing.T) {
     fedIngressWatch := RegisterFakeWatch("ingresses", &fedClient.Fake)
     clusterWatch := RegisterFakeWatch("clusters", &fedClient.Fake)
     fedClusterUpdateChan := RegisterFakeCopyOnUpdate("clusters", &fedClient.Fake, clusterWatch)
-    //fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch)
+    fedIngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &fedClient.Fake, fedIngressWatch)
 
     cluster1Client := &fakekubeclientset.Clientset{}
     RegisterFakeList("ingresses", &cluster1Client.Fake, &extensionsv1beta1.IngressList{Items: []extensionsv1beta1.Ingress{}})
@@ -64,7 +69,7 @@ func TestIngressController(t *testing.T) {
     cluster1IngressWatch := RegisterFakeWatch("ingresses", &cluster1Client.Fake)
     cluster1ConfigMapWatch := RegisterFakeWatch("configmaps", &cluster1Client.Fake)
     cluster1IngressCreateChan := RegisterFakeCopyOnCreate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
-    // cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
+    cluster1IngressUpdateChan := RegisterFakeCopyOnUpdate("ingresses", &cluster1Client.Fake, cluster1IngressWatch)
 
     cluster2Client := &fakekubeclientset.Clientset{}
     RegisterFakeList("ingresses", &cluster2Client.Fake, &extensionsv1beta1.IngressList{Items: []extensionsv1beta1.Ingress{}})
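Both update channels registered above come from RegisterFakeCopyOnUpdate, which (as the last hunk in this commit shows) forwards a copy of every updated object into a channel. A test drains such a channel with a timeout so a missed update fails fast instead of hanging. A sketch of that consumption pattern with illustrative names only, not the repo's actual GetIngressFromChan helper:

    package main

    import (
        "fmt"
        "time"
    )

    // nextOrNil returns the next object from ch, or nil if none arrives
    // within timeout. GetIngressFromChan in this test plays a similar role;
    // its real signature lives in the repo and is not shown in this diff.
    func nextOrNil(ch chan interface{}, timeout time.Duration) interface{} {
        select {
        case obj := <-ch:
            return obj
        case <-time.After(timeout):
            return nil
        }
    }

    func main() {
        updates := make(chan interface{}, 1)
        updates <- "ingress update"
        fmt.Println(nextOrNil(updates, time.Second))         // "ingress update"
        fmt.Println(nextOrNil(updates, 10*time.Millisecond)) // <nil>: channel empty
    }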
@@ -90,9 +95,9 @@ func TestIngressController(t *testing.T) {
     configMapInformer := ToFederatedInformerForTestOnly(ingressController.configMapFederatedInformer)
     configMapInformer.SetClientFactory(clientFactoryFunc)
     ingressController.clusterAvailableDelay = time.Second
-    ingressController.ingressReviewDelay = 10 * time.Millisecond
-    ingressController.configMapReviewDelay = 10 * time.Millisecond
-    ingressController.smallDelay = 20 * time.Millisecond
+    ingressController.ingressReviewDelay = 100 * time.Millisecond
+    ingressController.configMapReviewDelay = 100 * time.Millisecond
+    ingressController.smallDelay = 100 * time.Millisecond
     ingressController.updateTimeout = 5 * time.Second
 
     stop := make(chan struct{})
@@ -102,7 +107,7 @@ func TestIngressController(t *testing.T) {
     // TODO: Here we are creating the ingress with first cluster annotation.
     // Add another test without that annotation when
     // https://github.com/kubernetes/kubernetes/issues/36540 is fixed.
-    ing1 := extensionsv1beta1.Ingress{
+    fedIngress := extensionsv1beta1.Ingress{
         ObjectMeta: apiv1.ObjectMeta{
             Name:      "test-ingress",
             Namespace: "mynamespace",
@@ -131,64 +136,109 @@ func TestIngressController(t *testing.T) {
 
     // Test add federated ingress.
     t.Log("Adding Federated Ingress")
-    fedIngressWatch.Add(&ing1)
-    /*
-        // TODO: Re-enable this when we have fixed these flaky tests: https://github.com/kubernetes/kubernetes/issues/36540.
-        t.Logf("Checking that approproate finalizers are added")
-        // There should be 2 updates to add both the finalizers.
-        updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan)
-        assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
-        updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
-        assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, apiv1.FinalizerOrphan), fmt.Sprintf("ingress does not have the orphan finalizer: %v", updatedIngress))
-        ing1 = *updatedIngress
-    */
+    fedIngressWatch.Add(&fedIngress)
+
+    t.Logf("Checking that approproate finalizers are added")
+    // There should be 2 updates to add both the finalizers.
+    updatedIngress := GetIngressFromChan(t, fedIngressUpdateChan)
+    assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, deletionhelper.FinalizerDeleteFromUnderlyingClusters))
+    updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
+    assert.True(t, ingressController.hasFinalizerFunc(updatedIngress, apiv1.FinalizerOrphan), fmt.Sprintf("ingress does not have the orphan finalizer: %v", updatedIngress))
+    fedIngress = *updatedIngress
 
     t.Log("Checking that Ingress was correctly created in cluster 1")
     createdIngress := GetIngressFromChan(t, cluster1IngressCreateChan)
     assert.NotNil(t, createdIngress)
-    assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress.Spec), "Spec of created ingress is not equal")
-    assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress.ObjectMeta), "Metadata of created object is not equivalent")
+    cluster1Ingress := *createdIngress
+    assert.True(t, reflect.DeepEqual(fedIngress.Spec, cluster1Ingress.Spec), "Spec of created ingress is not equal")
+    assert.True(t, util.ObjectMetaEquivalent(fedIngress.ObjectMeta, cluster1Ingress.ObjectMeta),
+        "Metadata of created object is not equivalent")
 
     // Wait for finalizers to appear in federation store.
-    // assert.NoError(t, WaitForFinalizersInFederationStore(ingressController, ingressController.ingressInformerStore,
-    // 	types.NamespacedName{Namespace: ing1.Namespace, Name: ing1.Name}.String()), "finalizers not found in federated ingress")
+    assert.NoError(t, WaitForFinalizersInFederationStore(ingressController, ingressController.ingressInformerStore,
+        types.NamespacedName{Namespace: fedIngress.Namespace, Name: fedIngress.Name}.String()), "finalizers not found in federated ingress")
 
     // Wait for the cluster ingress to appear in cluster store.
     assert.NoError(t, WaitForIngressInClusterStore(ingressController.ingressFederatedInformer.GetTargetStore(), cluster1.Name,
         types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String()),
         "Created ingress not found in underlying cluster store")
 
-    /*
-        // TODO: Re-enable this when we have fixed these flaky tests: https://github.com/kubernetes/kubernetes/issues/36540.
-        // Test that IP address gets transferred from cluster ingress to federated ingress.
-        t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress")
-        createdIngress.Status.LoadBalancer.Ingress = append(createdIngress.Status.LoadBalancer.Ingress, apiv1.LoadBalancerIngress{IP: "1.2.3.4"})
-        cluster1IngressWatch.Modify(createdIngress)
-        // Wait for store to see the updated cluster ingress.
-        assert.NoError(t, WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(),
-            cluster1.Name, types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(),
-            createdIngress.Status.LoadBalancer, 4*wait.ForeverTestTimeout))
-        updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
-        assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress")
-        if updatedIngress != nil {
-            assert.True(t, reflect.DeepEqual(createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress), fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v", createdIngress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress))
-            t.Logf("expected: %v, actual: %v", createdIngress, updatedIngress)
-        }
-
-        // Test update federated ingress.
-        if updatedIngress.ObjectMeta.Annotations == nil {
-            updatedIngress.ObjectMeta.Annotations = make(map[string]string)
-        }
-        updatedIngress.ObjectMeta.Annotations["A"] = "B"
-        t.Log("Modifying Federated Ingress")
-        fedIngressWatch.Modify(updatedIngress)
-        t.Log("Checking that Ingress was correctly updated in cluster 1")
-        updatedIngress2 := GetIngressFromChan(t, cluster1IngressUpdateChan)
-        assert.NotNil(t, updatedIngress2)
-        assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, updatedIngress.Spec), "Spec of updated ingress is not equal")
-        assert.Equal(t, updatedIngress2.ObjectMeta.Annotations["A"], updatedIngress.ObjectMeta.Annotations["A"], "Updated annotation not transferred from federated to cluster ingress.")
-    */
+    // Test that IP address gets transferred from cluster ingress to federated ingress.
+    t.Log("Checking that IP address gets transferred from cluster ingress to federated ingress")
+    cluster1Ingress.Status.LoadBalancer.Ingress = append(cluster1Ingress.Status.LoadBalancer.Ingress,
+        apiv1.LoadBalancerIngress{IP: "1.2.3.4"})
+    glog.Infof("Setting artificial IP address for cluster1 ingress")
+
+    for trial := 0; trial < maxTrials; trial++ {
+        cluster1IngressWatch.Modify(&cluster1Ingress)
+        // Wait for store to see the updated cluster ingress.
+        key := types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String()
+        if err := WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(),
+            cluster1.Name, key, cluster1Ingress.Status.LoadBalancer, time.Second); err != nil {
+            continue
+        }
+        if err := WaitForFedStatusUpdate(t, ingressController.ingressInformerStore,
+            key, cluster1Ingress.Status.LoadBalancer, time.Second); err != nil {
+            continue
+        }
+    }
+
+    for trial := 0; trial < maxTrials; trial++ {
+        updatedIngress = GetIngressFromChan(t, fedIngressUpdateChan)
+        assert.NotNil(t, updatedIngress, "Cluster's ingress load balancer status was not correctly transferred to the federated ingress")
+        if updatedIngress == nil {
+            return
+        }
+        if reflect.DeepEqual(cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress) {
+            fedIngress.Status.LoadBalancer = updatedIngress.Status.LoadBalancer
+            break
+        } else {
+            glog.Infof("Status check failed: expected: %v actual: %v", cluster1Ingress.Status, updatedIngress.Status)
+        }
+    }
+    glog.Infof("Status check: expected: %v actual: %v", cluster1Ingress.Status, updatedIngress.Status)
+    assert.True(t, reflect.DeepEqual(cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress),
+        fmt.Sprintf("Ingress IP was not transferred from cluster ingress to federated ingress. %v is not equal to %v",
+            cluster1Ingress.Status.LoadBalancer.Ingress, updatedIngress.Status.LoadBalancer.Ingress))
+
+    assert.NoError(t, WaitForStatusUpdate(t, ingressController.ingressFederatedInformer.GetTargetStore(),
+        cluster1.Name, types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(),
+        cluster1Ingress.Status.LoadBalancer, time.Second))
+    assert.NoError(t, WaitForFedStatusUpdate(t, ingressController.ingressInformerStore,
+        types.NamespacedName{Namespace: createdIngress.Namespace, Name: createdIngress.Name}.String(),
+        cluster1Ingress.Status.LoadBalancer, time.Second))
+    t.Logf("expected: %v, actual: %v", createdIngress, updatedIngress)
+
+    // Test update federated ingress.
+    if fedIngress.ObjectMeta.Annotations == nil {
+        fedIngress.ObjectMeta.Annotations = make(map[string]string)
+    }
+    fedIngress.ObjectMeta.Annotations["A"] = "B"
+    t.Log("Modifying Federated Ingress")
+    fedIngressWatch.Modify(&fedIngress)
+    t.Log("Checking that Ingress was correctly updated in cluster 1")
+    var updatedIngress2 *extensionsv1beta1.Ingress
+
+    for trial := 0; trial < maxTrials; trial++ {
+        updatedIngress2 = GetIngressFromChan(t, cluster1IngressUpdateChan)
+        assert.NotNil(t, updatedIngress2)
+        if updatedIngress2 == nil {
+            return
+        }
+        if reflect.DeepEqual(fedIngress.Spec, updatedIngress.Spec) &&
+            updatedIngress2.ObjectMeta.Annotations["A"] == fedIngress.ObjectMeta.Annotations["A"] {
+            break
+        }
+    }
+
+    assert.True(t, reflect.DeepEqual(updatedIngress2.Spec, fedIngress.Spec), "Spec of updated ingress is not equal")
+    assert.Equal(t, updatedIngress2.ObjectMeta.Annotations["A"], fedIngress.ObjectMeta.Annotations["A"], "Updated annotation not transferred from federated to cluster ingress.")
 
     // Test add cluster
     t.Log("Adding a second cluster")
-    ing1.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first.
-    fedIngressWatch.Modify(&ing1)
+    fedIngress.Annotations[staticIPNameKeyWritable] = "foo" // Make sure that the base object has a static IP name first.
+    fedIngressWatch.Modify(&fedIngress)
     clusterWatch.Add(cluster2)
     // First check that the original values are not equal - see above comment
     assert.NotEqual(t, cfg1.Data[uidKey], cfg2.Data[uidKey], fmt.Sprintf("ConfigMap in cluster 2 must initially not equal that in cluster 1 for this test - please fix test"))
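The rewritten block above swaps one-shot assertions for two bounded retry loops: the first re-delivers the watch event until both the cluster store and the federation store report the new load-balancer status, and the second drains the federation update channel until the transferred status matches. Stripped of the test fixtures, the first loop reduces to the shape below; action and check are stand-ins for cluster1IngressWatch.Modify and the WaitFor*StatusUpdate helpers, and the timings are illustrative:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    const maxTrials = 20

    // retry re-runs action, then check, until check succeeds or the
    // trial budget runs out, returning the last error on failure.
    func retry(action func(), check func() error) error {
        var err error
        for trial := 0; trial < maxTrials; trial++ {
            action()
            if err = check(); err == nil {
                return nil
            }
        }
        return err
    }

    func main() {
        deadline := time.Now().Add(250 * time.Millisecond)
        err := retry(
            func() { time.Sleep(50 * time.Millisecond) }, // "re-deliver the event"
            func() error { // "has the store caught up yet?"
                if time.Now().After(deadline) {
                    return nil
                }
                return errors.New("store not updated yet")
            },
        )
        fmt.Println("converged:", err == nil)
    }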
@@ -196,8 +246,9 @@ func TestIngressController(t *testing.T) {
     t.Log("Checking that the ingress got created in cluster 2")
     createdIngress2 := GetIngressFromChan(t, cluster2IngressCreateChan)
     assert.NotNil(t, createdIngress2)
-    assert.True(t, reflect.DeepEqual(ing1.Spec, createdIngress2.Spec), "Spec of created ingress is not equal")
-    assert.True(t, util.ObjectMetaEquivalent(ing1.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent")
+    assert.True(t, reflect.DeepEqual(fedIngress.Spec, createdIngress2.Spec), "Spec of created ingress is not equal")
+    t.Logf("created meta: %v fed meta: %v", createdIngress2.ObjectMeta, fedIngress.ObjectMeta)
+    assert.True(t, util.ObjectMetaEquivalent(fedIngress.ObjectMeta, createdIngress2.ObjectMeta), "Metadata of created object is not equivalent")
 
     t.Log("Checking that the configmap in cluster 2 got updated.")
     updatedConfigMap2 := GetConfigMapFromChan(cluster2ConfigMapUpdateChan)
@@ -292,3 +343,17 @@ func WaitForStatusUpdate(t *testing.T, store util.FederatedReadOnlyStore, cluste
     })
     return err
 }
+
+// Wait for ingress status to be updated to match the desiredStatus.
+func WaitForFedStatusUpdate(t *testing.T, store cache.Store, key string, desiredStatus apiv1.LoadBalancerStatus, timeout time.Duration) error {
+    retryInterval := 100 * time.Millisecond
+    err := wait.PollImmediate(retryInterval, timeout, func() (bool, error) {
+        obj, found, err := store.GetByKey(key)
+        if !found || err != nil {
+            return false, err
+        }
+        ingress := obj.(*extensionsv1beta1.Ingress)
+        return reflect.DeepEqual(ingress.Status.LoadBalancer, desiredStatus), nil
+    })
+    return err
+}
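The new WaitForFedStatusUpdate is built on wait.PollImmediate, which runs the condition once immediately and then once per retry interval until it returns true, returns an error, or the timeout elapses. A dependency-free sketch of those semantics; the real implementation lives in k8s.io/kubernetes/pkg/util/wait and differs in detail:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // pollImmediate mimics wait.PollImmediate: try condition right away,
    // then once per interval, giving up after timeout.
    func pollImmediate(interval, timeout time.Duration, condition func() (bool, error)) error {
        deadline := time.Now().Add(timeout)
        for {
            done, err := condition()
            if err != nil {
                return err
            }
            if done {
                return nil
            }
            if time.Now().After(deadline) {
                return errors.New("timed out waiting for the condition")
            }
            time.Sleep(interval)
        }
    }

    func main() {
        start := time.Now()
        err := pollImmediate(100*time.Millisecond, time.Second, func() (bool, error) {
            return time.Since(start) > 300*time.Millisecond, nil
        })
        fmt.Println("err:", err)
    }

The hunks from here on touch the fake-client watch helpers (WatcherDispatcher and the RegisterFake* reactors) rather than the ingress test itself.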
@@ -36,6 +36,10 @@ import (
     "github.com/golang/glog"
 )
 
+const (
+    pushTimeout = 5 * time.Second
+)
+
 // A structure that distributes eventes to multiple watchers.
 type WatcherDispatcher struct {
     sync.Mutex
@@ -58,6 +62,7 @@ func (wd *WatcherDispatcher) Stop() {
     wd.Lock()
     defer wd.Unlock()
     close(wd.stopChan)
+    glog.Infof("Stopping WatcherDispatcher")
     for _, watcher := range wd.watchers {
         watcher.Stop()
     }
@@ -141,7 +146,7 @@ func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
     dispatcher := &WatcherDispatcher{
         watchers:       make([]*watch.RaceFreeFakeWatcher, 0),
         eventsSoFar:    make([]*watch.Event, 0),
-        orderExecution: make(chan func()),
+        orderExecution: make(chan func(), 100),
         stopChan:       make(chan struct{}),
     }
     go func() {
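orderExecution gains a buffer of 100 here. With an unbuffered channel, every send blocks until the dispatcher goroutine receives, so a reactor firing while the dispatcher is busy stalls the fake client; the buffer decouples the producers from the single consumer. A minimal illustration, where the capacity is the only number taken from the diff:

    package main

    import "fmt"

    func main() {
        // Unbuffered: a send with no ready receiver blocks forever.
        // Buffered: up to cap(ch) sends complete without a receiver.
        ch := make(chan func(), 100)

        for i := 0; i < 3; i++ {
            i := i // capture per-iteration value (needed before Go 1.22)
            ch <- func() { fmt.Println("executing queued op", i) } // does not block
        }
        close(ch)

        // A single consumer drains in FIFO order, as the dispatcher goroutine does.
        for op := range ch {
            op()
        }
    }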
@@ -199,13 +204,22 @@ func RegisterFakeCopyOnUpdate(resource string, client *core.Fake, watcher *Watch
     client.AddReactor("update", resource, func(action core.Action) (bool, runtime.Object, error) {
         updateAction := action.(core.UpdateAction)
         originalObj := updateAction.GetObject()
+        glog.V(7).Infof("Updating %s: %v", resource, updateAction.GetObject())
+
         // Create a copy of the object here to prevent data races while reading the object in go routine.
         obj := copy(originalObj)
-        watcher.orderExecution <- func() {
+        operation := func() {
             glog.V(4).Infof("Object updated. Writing to channel: %v", obj)
             watcher.Modify(obj)
             objChan <- obj
         }
+        select {
+        case watcher.orderExecution <- operation:
+            break
+        case <-time.After(pushTimeout):
+            glog.Errorf("Fake client execution channel blocked")
+            glog.Errorf("Tried to push %v", updateAction)
+        }
         return true, originalObj, nil
     })
     return objChan
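Even with the buffer, a send can still block if the dispatcher has stopped draining, which is why the update reactor now pushes through a select with a pushTimeout escape hatch. The same pattern in isolation, runnable, with the consumer deliberately absent so the timeout branch fires; the timeout is shortened for the demo, where the helper above uses 5 * time.Second:

    package main

    import (
        "fmt"
        "time"
    )

    const pushTimeout = 100 * time.Millisecond // the real helper uses 5 * time.Second

    func main() {
        orderExecution := make(chan func()) // unbuffered and never drained here

        operation := func() { fmt.Println("would run inside the dispatcher goroutine") }

        select {
        case orderExecution <- operation:
            fmt.Println("operation queued")
        case <-time.After(pushTimeout):
            fmt.Println("execution channel blocked; log and move on, as the reactor above does")
        }
    }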