Update to include watch tooling

This commit is contained in:
Caleb Woodbine 2020-05-28 08:56:38 +12:00
parent d7d9ebf482
commit 209c05546a

View File

@ -32,9 +32,10 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
watchtools "k8s.io/client-go/tools/watch"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod" e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@ -104,6 +105,14 @@ var _ = SIGDescribe("ReplicationController", func() {
testRcInitialReplicaCount := int32(1) testRcInitialReplicaCount := int32(1)
testRcMaxReplicaCount := int32(2) testRcMaxReplicaCount := int32(2)
rcResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "replicationcontrollers"} rcResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "replicationcontrollers"}
expectedWatchEvents := []watch.Event{
{Type: watch.Added},
{Type: watch.Modified},
{Type: watch.Modified},
{Type: watch.Modified},
{Type: watch.Modified},
{Type: watch.Deleted},
}
rcTest := v1.ReplicationController{ rcTest := v1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -128,199 +137,247 @@ var _ = SIGDescribe("ReplicationController", func() {
}, },
} }
ginkgo.By("creating a ReplicationController") framework.WatchEventSequenceVerifier(context.TODO(), dc, rcResource, testRcNamespace, testRcName, metav1.ListOptions{LabelSelector: "test-rc-static=true"}, expectedWatchEvents, func(retryWatcher *watchtools.RetryWatcher) (actualWatchEvents []watch.Event) {
// Create a ReplicationController ginkgo.By("creating a ReplicationController")
_, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Create(context.TODO(), &rcTest, metav1.CreateOptions{}) // Create a ReplicationController
framework.ExpectNoError(err, "Failed to create ReplicationController") _, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Create(context.TODO(), &rcTest, metav1.CreateOptions{})
framework.ExpectNoError(err, "Failed to create ReplicationController")
// setup a watch for the RC ginkgo.By("waiting for RC to be added")
rcWatchTimeoutSeconds := int64(180) eventFound := false
rcWatch, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: "test-rc-static=true", TimeoutSeconds: &rcWatchTimeoutSeconds}) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
framework.ExpectNoError(err, "Failed to setup watch on newly created ReplicationController") defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
rcWatchChan := rcWatch.ResultChan() if watchEvent.Type != watch.Added {
return false, nil
ginkgo.By("waiting for RC to be added") }
eventFound := false actualWatchEvents = append(actualWatchEvents, watchEvent)
for watchEvent := range rcWatchChan {
if watchEvent.Type == watch.Added {
eventFound = true eventFound = true
break return true, nil
} })
} framework.ExpectNoError(err, "Wait until condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added) framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
ginkgo.By("waiting for available Replicas") ginkgo.By("waiting for available Replicas")
eventFound = false eventFound = false
for watchEvent := range rcWatchChan { ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
rc, ok := watchEvent.Object.(*v1.ReplicationController) defer cancel()
framework.ExpectEqual(ok, true, "Unable to convert type of ReplicationController watch watchEvent") _, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if rc.Status.Replicas == testRcInitialReplicaCount && rc.Status.ReadyReplicas == testRcInitialReplicaCount { var rc *v1.ReplicationController
rcBytes, err := json.Marshal(watchEvent.Object)
if err != nil {
return false, err
}
err = json.Unmarshal(rcBytes, &rc)
if err != nil {
return false, err
}
if rc.Status.Replicas != testRcInitialReplicaCount || rc.Status.ReadyReplicas != testRcInitialReplicaCount {
return false, nil
}
eventFound = true eventFound = true
break return true, nil
} })
} framework.ExpectNoError(err, "Wait for condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "failed to find event with initial replica count") framework.ExpectEqual(eventFound, true, "RC has not reached ReadyReplicas count of %v", testRcInitialReplicaCount)
rcLabelPatchPayload, err := json.Marshal(v1.ReplicationController{ rcLabelPatchPayload, err := json.Marshal(v1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"test-rc": "patched"}, Labels: map[string]string{"test-rc": "patched"},
}, },
})
framework.ExpectNoError(err, "failed to marshal json of replicationcontroller label patch")
// Patch the ReplicationController
ginkgo.By("patching ReplicationController")
testRcPatched, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Patch(context.TODO(), testRcName, types.StrategicMergePatchType, []byte(rcLabelPatchPayload), metav1.PatchOptions{})
framework.ExpectNoError(err, "Failed to patch ReplicationController")
framework.ExpectEqual(testRcPatched.ObjectMeta.Labels["test-rc"], "patched", "failed to patch RC")
ginkgo.By("waiting for RC to be modified")
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
actualWatchEvents = append(actualWatchEvents, watchEvent)
eventFound = true
return true, nil
})
framework.ExpectNoError(err, "Wait until condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
rcStatusPatchPayload, err := json.Marshal(map[string]interface{}{
"status": map[string]interface{}{
"readyReplicas": 0,
"availableReplicas": 0,
},
})
framework.ExpectNoError(err, "Failed to marshal JSON of ReplicationController label patch")
// Patch the ReplicationController's status
ginkgo.By("patching ReplicationController status")
rcStatus, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Patch(context.TODO(), testRcName, types.StrategicMergePatchType, []byte(rcStatusPatchPayload), metav1.PatchOptions{}, "status")
framework.ExpectNoError(err, "Failed to patch ReplicationControllerStatus")
framework.ExpectEqual(rcStatus.Status.ReadyReplicas, int32(0), "ReplicationControllerStatus's readyReplicas does not equal 0")
ginkgo.By("waiting for RC to be modified")
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
actualWatchEvents = append(actualWatchEvents, watchEvent)
eventFound = true
return true, nil
})
framework.ExpectNoError(err, "Wait until condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
ginkgo.By("waiting for available Replicas")
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
var rc *v1.ReplicationController
rcBytes, err := json.Marshal(watchEvent.Object)
if err != nil {
return false, err
}
err = json.Unmarshal(rcBytes, &rc)
if err != nil {
return false, err
}
if rc.Status.Replicas != testRcInitialReplicaCount {
return false, nil
}
return true, nil
})
framework.ExpectEqual(eventFound, true, "Failed to find updated ready replica count")
ginkgo.By("fetching ReplicationController status")
rcStatusUnstructured, err := dc.Resource(rcResource).Namespace(testRcNamespace).Get(context.TODO(), testRcName, metav1.GetOptions{}, "status")
framework.ExpectNoError(err, "Failed to fetch ReplicationControllerStatus")
rcStatusUjson, err := json.Marshal(rcStatusUnstructured)
framework.ExpectNoError(err, "Failed to marshal json of replicationcontroller label patch")
json.Unmarshal(rcStatusUjson, &rcStatus)
framework.ExpectEqual(rcStatus.Status.Replicas, testRcInitialReplicaCount, "ReplicationController ReplicaSet cound does not match initial Replica count")
rcScalePatchPayload, err := json.Marshal(autoscalingv1.Scale{
Spec: autoscalingv1.ScaleSpec{
Replicas: testRcMaxReplicaCount,
},
})
framework.ExpectNoError(err, "Failed to marshal json of replicationcontroller label patch")
// Patch the ReplicationController's scale
ginkgo.By("patching ReplicationController scale")
_, err = f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Patch(context.TODO(), testRcName, types.StrategicMergePatchType, []byte(rcScalePatchPayload), metav1.PatchOptions{}, "scale")
framework.ExpectNoError(err, "Failed to patch ReplicationControllerScale")
ginkgo.By("waiting for RC to be modified")
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
actualWatchEvents = append(actualWatchEvents, watchEvent)
eventFound = true
return true, nil
})
framework.ExpectNoError(err, "Wait until condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
ginkgo.By("waiting for ReplicationController's scale to be the max amount")
eventFound = false
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
var rc *v1.ReplicationController
rcBytes, err := json.Marshal(watchEvent.Object)
if err != nil {
return false, err
}
err = json.Unmarshal(rcBytes, &rc)
if err != nil {
return false, err
}
if rc.ObjectMeta.Name != testRcName || rc.ObjectMeta.Namespace != testRcNamespace || rc.Status.Replicas != testRcMaxReplicaCount || rc.Status.ReadyReplicas != testRcMaxReplicaCount {
return false, nil
}
eventFound = true
return true, nil
})
framework.ExpectNoError(err, "Wait until condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "Failed to find updated ready replica count")
// Get the ReplicationController
ginkgo.By("fetching ReplicationController; ensuring that it's patched")
rc, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Get(context.TODO(), testRcName, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to fetch ReplicationController")
framework.ExpectEqual(rc.ObjectMeta.Labels["test-rc"], "patched", "ReplicationController is missing a label from earlier patch")
rcStatusUpdatePayload := rc
rcStatusUpdatePayload.Status.AvailableReplicas = 1
rcStatusUpdatePayload.Status.ReadyReplicas = 1
// Replace the ReplicationController's status
ginkgo.By("updating ReplicationController status")
_, err = f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).UpdateStatus(context.TODO(), rcStatusUpdatePayload, metav1.UpdateOptions{})
framework.ExpectNoError(err, "failed to update ReplicationControllerStatus")
ginkgo.By("waiting for RC to be modified")
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Modified {
return false, nil
}
actualWatchEvents = append(actualWatchEvents, watchEvent)
eventFound = true
return true, nil
})
framework.ExpectNoError(err, "Wait until condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
ginkgo.By("listing all ReplicationControllers")
rcs, err := f.ClientSet.CoreV1().ReplicationControllers("").List(context.TODO(), metav1.ListOptions{LabelSelector: "test-rc-static=true"})
framework.ExpectNoError(err, "failed to list ReplicationController")
framework.ExpectEqual(len(rcs.Items) > 0, true)
ginkgo.By("checking that ReplicationController has expected values")
foundRc := false
for _, rcItem := range rcs.Items {
if rcItem.ObjectMeta.Name == testRcName &&
rcItem.ObjectMeta.Namespace == testRcNamespace &&
rcItem.ObjectMeta.Labels["test-rc-static"] == "true" &&
rcItem.ObjectMeta.Labels["test-rc"] == "patched" {
foundRc = true
}
}
framework.ExpectEqual(foundRc, true)
// Delete ReplicationController
ginkgo.By("deleting ReplicationControllers by collection")
err = f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "test-rc-static=true"})
framework.ExpectNoError(err, "Failed to delete ReplicationControllers")
ginkgo.By("waiting for ReplicationController to have a DELETED watchEvent")
eventFound = false
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
if watchEvent.Type != watch.Deleted {
return false, nil
}
actualWatchEvents = append(actualWatchEvents, watchEvent)
eventFound = true
return true, nil
})
framework.ExpectNoError(err, "Wait until condition with watch events should not return an error")
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
return actualWatchEvents
}) })
framework.ExpectNoError(err, "failed to marshal json of replicationcontroller label patch")
// Patch the ReplicationController
ginkgo.By("patching ReplicationController")
testRcPatched, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Patch(context.TODO(), testRcName, types.StrategicMergePatchType, []byte(rcLabelPatchPayload), metav1.PatchOptions{})
framework.ExpectNoError(err, "Failed to patch ReplicationController")
framework.ExpectEqual(testRcPatched.ObjectMeta.Labels["test-rc"], "patched", "failed to patch RC")
ginkgo.By("waiting for RC to be modified")
eventFound = false
for watchEvent := range rcWatchChan {
if watchEvent.Type == watch.Modified {
eventFound = true
break
}
}
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Modified)
rcStatusPatchPayload, err := json.Marshal(map[string]interface{}{
"status": map[string]interface{}{
"readyReplicas": 0,
"availableReplicas": 0,
},
})
framework.ExpectNoError(err, "Failed to marshal JSON of ReplicationController label patch")
// Patch the ReplicationController's status
ginkgo.By("patching ReplicationController status")
rcStatus, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Patch(context.TODO(), testRcName, types.StrategicMergePatchType, []byte(rcStatusPatchPayload), metav1.PatchOptions{}, "status")
framework.ExpectNoError(err, "Failed to patch ReplicationControllerStatus")
framework.ExpectEqual(rcStatus.Status.ReadyReplicas, int32(0), "ReplicationControllerStatus's readyReplicas does not equal 0")
ginkgo.By("waiting for RC to be modified")
eventFound = false
for watchEvent := range rcWatchChan {
if watchEvent.Type == watch.Modified {
eventFound = true
break
}
}
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Modified)
eventFound = false
for watchEvent := range rcWatchChan {
rc, ok := watchEvent.Object.(*v1.ReplicationController)
framework.ExpectEqual(ok, true, "Unable to convert type of ReplicationController watch watchEvent")
if rc.Status.Replicas == testRcInitialReplicaCount {
eventFound = true
break
}
}
framework.ExpectEqual(eventFound, true, "RC does not have initial replica count")
ginkgo.By("fetching ReplicationController status")
rcStatusUnstructured, err := dc.Resource(rcResource).Namespace(testRcNamespace).Get(context.TODO(), testRcName, metav1.GetOptions{}, "status")
framework.ExpectNoError(err, "Failed to fetch ReplicationControllerStatus")
rcStatusUjson, err := json.Marshal(rcStatusUnstructured)
framework.ExpectNoError(err, "Failed to marshal json of replicationcontroller label patch")
json.Unmarshal(rcStatusUjson, &rcStatus)
framework.ExpectEqual(rcStatus.Status.Replicas, testRcInitialReplicaCount, "ReplicationController ReplicaSet cound does not match initial Replica count")
rcScalePatchPayload, err := json.Marshal(autoscalingv1.Scale{
Spec: autoscalingv1.ScaleSpec{
Replicas: testRcMaxReplicaCount,
},
})
framework.ExpectNoError(err, "Failed to marshal json of replicationcontroller label patch")
// Patch the ReplicationController's scale
ginkgo.By("patching ReplicationController scale")
_, err = f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Patch(context.TODO(), testRcName, types.StrategicMergePatchType, []byte(rcScalePatchPayload), metav1.PatchOptions{}, "scale")
framework.ExpectNoError(err, "Failed to patch ReplicationControllerScale")
ginkgo.By("waiting for RC to be modified")
eventFound = false
for watchEvent := range rcWatchChan {
if watchEvent.Type == watch.Modified {
eventFound = true
break
}
}
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Modified)
var rcFromWatch *v1.ReplicationController
ginkgo.By("waiting for ReplicationController's scale to be the max amount")
foundRcWithMaxScale := false
for watchEvent := range rcWatchChan {
rc, ok := watchEvent.Object.(*v1.ReplicationController)
framework.ExpectEqual(ok, true, "Unable to convert type of ReplicationController watch watchEvent")
if rc.ObjectMeta.Name == testRcName && rc.ObjectMeta.Namespace == testRcNamespace && rc.Status.Replicas == testRcMaxReplicaCount && rc.Status.ReadyReplicas == testRcMaxReplicaCount {
foundRcWithMaxScale = true
rcFromWatch = rc
break
}
}
framework.ExpectEqual(foundRcWithMaxScale, true, "failed to locate a ReplicationController with max scale")
framework.ExpectEqual(rcFromWatch.Status.Replicas, testRcMaxReplicaCount, "ReplicationController ReplicasSet Scale does not match the expected scale")
// Get the ReplicationController
ginkgo.By("fetching ReplicationController; ensuring that it's patched")
rc, err := f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).Get(context.TODO(), testRcName, metav1.GetOptions{})
framework.ExpectNoError(err, "failed to fetch ReplicationController")
framework.ExpectEqual(rc.ObjectMeta.Labels["test-rc"], "patched", "ReplicationController is missing a label from earlier patch")
rcStatusUpdatePayload := rc
rcStatusUpdatePayload.Status.AvailableReplicas = 1
rcStatusUpdatePayload.Status.ReadyReplicas = 1
// Replace the ReplicationController's status
ginkgo.By("updating ReplicationController status")
rcStatus, err = f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).UpdateStatus(context.TODO(), rcStatusUpdatePayload, metav1.UpdateOptions{})
framework.ExpectNoError(err, "failed to update ReplicationControllerStatus")
framework.ExpectEqual(rcStatus.Status.ReadyReplicas, int32(1), "ReplicationControllerStatus readyReplicas does not equal 1")
ginkgo.By("waiting for RC to be modified")
eventFound = false
for watchEvent := range rcWatchChan {
if watchEvent.Type == watch.Modified {
eventFound = true
break
}
}
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Modified)
ginkgo.By("listing all ReplicationControllers")
rcs, err := f.ClientSet.CoreV1().ReplicationControllers("").List(context.TODO(), metav1.ListOptions{LabelSelector: "test-rc-static=true"})
framework.ExpectNoError(err, "failed to list ReplicationController")
framework.ExpectEqual(len(rcs.Items) > 0, true)
ginkgo.By("checking that ReplicationController has expected values")
foundRc := false
for _, rcItem := range rcs.Items {
if rcItem.ObjectMeta.Name == testRcName &&
rcItem.ObjectMeta.Namespace == testRcNamespace &&
rcItem.ObjectMeta.Labels["test-rc-static"] == "true" &&
rcItem.ObjectMeta.Labels["test-rc"] == "patched" {
foundRc = true
}
}
framework.ExpectEqual(foundRc, true)
// Delete ReplicationController
ginkgo.By("deleting ReplicationControllers by collection")
err = f.ClientSet.CoreV1().ReplicationControllers(testRcNamespace).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "test-rc-static=true"})
framework.ExpectNoError(err, "Failed to delete ReplicationControllers")
ginkgo.By("waiting for ReplicationController to have a DELETED watchEvent")
eventFound = false
for watchEvent := range rcWatchChan {
if watchEvent.Type == watch.Deleted {
eventFound = true
break
}
}
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Deleted)
}) })
}) })