Add watchtools.Until for fetching watch events; Remove plain watch

This commit is contained in:
Caleb Woodbine 2020-07-22 09:29:09 +12:00
parent 9c2330db4c
commit fc8e24c686

View File

@ -157,6 +157,14 @@ var _ = SIGDescribe("Deployment", func() {
testDeploymentLabelSelectors := metav1.LabelSelector{ testDeploymentLabelSelectors := metav1.LabelSelector{
MatchLabels: testDeploymentLabels, MatchLabels: testDeploymentLabels,
} }
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = testDeploymentLabelsFlat
return f.ClientSet.AppsV1().Deployments(testNamespaceName).Watch(context.TODO(), options)
},
}
deploymentsList, err := f.ClientSet.AppsV1().Deployments("").List(context.TODO(), metav1.ListOptions{LabelSelector: testDeploymentLabelsFlat})
framework.ExpectNoError(err, "failed to list Endpoints")
ginkgo.By("creating a Deployment") ginkgo.By("creating a Deployment")
testDeployment := appsv1.Deployment{ testDeployment := appsv1.Deployment{
@ -180,36 +188,41 @@ var _ = SIGDescribe("Deployment", func() {
}, },
}, },
} }
_, err := f.ClientSet.AppsV1().Deployments(testNamespaceName).Create(context.TODO(), &testDeployment, metav1.CreateOptions{}) _, err = f.ClientSet.AppsV1().Deployments(testNamespaceName).Create(context.TODO(), &testDeployment, metav1.CreateOptions{})
framework.ExpectNoError(err, "failed to create Deployment %v in namespace %v", testDeploymentName, testNamespaceName) framework.ExpectNoError(err, "failed to create Deployment %v in namespace %v", testDeploymentName, testNamespaceName)
ginkgo.By("watching for the Deployment to be added") ginkgo.By("waiting for Deployment to be created")
dplmtWatchTimeoutSeconds := int64(180) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
dplmtWatch, err := f.ClientSet.AppsV1().Deployments(testNamespaceName).Watch(context.TODO(), metav1.ListOptions{LabelSelector: testDeploymentLabelsFlat, TimeoutSeconds: &dplmtWatchTimeoutSeconds}) defer cancel()
framework.ExpectNoError(err, "Failed to setup watch on newly created Deployment") _, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
switch event.Type {
foundEvent := false case watch.Added:
dplmtWatchChan := dplmtWatch.ResultChan() if deployment, ok := event.Object.(*appsv1.Deployment); ok {
for event := range dplmtWatchChan { found := deployment.ObjectMeta.Name == testDeployment.Name &&
if event.Type == watch.Added { deployment.Labels["test-deployment-static"] == "true"
foundEvent = true return found, nil
break
} }
default:
framework.Logf("observed event type %v", event.Type)
} }
framework.ExpectEqual(foundEvent, true, "failed to find watch event %v", watch.Added) return false, nil
})
framework.ExpectNoError(err, "failed to see %v event", watch.Added)
ginkgo.By("waiting for all Replicas to be Ready") ginkgo.By("waiting for all Replicas to be Ready")
foundEvent = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
for event := range dplmtWatchChan { defer cancel()
deployment, ok := event.Object.(*appsv1.Deployment) _, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
framework.ExpectEqual(ok, true, "unable to convert event.Object type") if deployment, ok := event.Object.(*appsv1.Deployment); ok {
if deployment.Status.AvailableReplicas == testDeploymentDefaultReplicas && found := deployment.ObjectMeta.Name == testDeployment.Name &&
deployment.Status.ReadyReplicas == testDeploymentDefaultReplicas { deployment.Labels["test-deployment-static"] == "true" &&
foundEvent = true deployment.Status.AvailableReplicas == testDeploymentDefaultReplicas &&
break deployment.Status.ReadyReplicas == testDeploymentDefaultReplicas
return found, nil
} }
} return false, nil
framework.ExpectEqual(foundEvent, true, "failed to see scale of replicas") })
framework.ExpectNoError(err, "failed to see replicas of %v in namespace %v scale to requested amount of %v", testDeployment.Name, testNamespaceName, testDeploymentDefaultReplicas)
ginkgo.By("patching the Deployment") ginkgo.By("patching the Deployment")
deploymentPatch, err := json.Marshal(map[string]interface{}{ deploymentPatch, err := json.Marshal(map[string]interface{}{
@ -233,30 +246,23 @@ var _ = SIGDescribe("Deployment", func() {
_, err = f.ClientSet.AppsV1().Deployments(testNamespaceName).Patch(context.TODO(), testDeploymentName, types.StrategicMergePatchType, []byte(deploymentPatch), metav1.PatchOptions{}) _, err = f.ClientSet.AppsV1().Deployments(testNamespaceName).Patch(context.TODO(), testDeploymentName, types.StrategicMergePatchType, []byte(deploymentPatch), metav1.PatchOptions{})
framework.ExpectNoError(err, "failed to patch Deployment") framework.ExpectNoError(err, "failed to patch Deployment")
foundEvent = false
for event := range dplmtWatchChan {
if event.Type == watch.Modified {
foundEvent = true
break
}
}
framework.ExpectEqual(foundEvent, true, "failed to see scale of replicas")
ginkgo.By("waiting for Replicas to scale") ginkgo.By("waiting for Replicas to scale")
foundEvent = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
for event := range dplmtWatchChan { defer cancel()
deployment, ok := event.Object.(*appsv1.Deployment) _, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
framework.ExpectEqual(ok, true, "unable to convert event.Object type") if deployment, ok := event.Object.(*appsv1.Deployment); ok {
if deployment.Status.AvailableReplicas == testDeploymentMinimumReplicas && found := deployment.ObjectMeta.Name == testDeployment.Name &&
deployment.Status.ReadyReplicas == testDeploymentMinimumReplicas { deployment.Labels["test-deployment-static"] == "true" &&
foundEvent = true deployment.Status.AvailableReplicas == testDeploymentMinimumReplicas &&
break deployment.Status.ReadyReplicas == testDeploymentMinimumReplicas
return found, nil
} }
} return false, nil
framework.ExpectEqual(foundEvent, true, "failed to see scale of replicas") })
framework.ExpectNoError(err, "failed to see replicas of %v in namespace %v scale to requested amount of %v", testDeployment.Name, testNamespaceName, testDeploymentMinimumReplicas)
ginkgo.By("listing Deployments") ginkgo.By("listing Deployments")
deploymentsList, err := f.ClientSet.AppsV1().Deployments("").List(context.TODO(), metav1.ListOptions{LabelSelector: testDeploymentLabelsFlat}) deploymentsList, err = f.ClientSet.AppsV1().Deployments("").List(context.TODO(), metav1.ListOptions{LabelSelector: testDeploymentLabelsFlat})
framework.ExpectNoError(err, "failed to list Deployments") framework.ExpectNoError(err, "failed to list Deployments")
foundDeployment := false foundDeployment := false
for _, deploymentItem := range deploymentsList.Items { for _, deploymentItem := range deploymentsList.Items {
@ -285,14 +291,22 @@ var _ = SIGDescribe("Deployment", func() {
// currently this hasn't been able to hit the endpoint replaceAppsV1NamespacedDeploymentStatus // currently this hasn't been able to hit the endpoint replaceAppsV1NamespacedDeploymentStatus
_, err = dc.Resource(deploymentResource).Namespace(testNamespaceName).Update(context.TODO(), &testDeploymentUpdateUnstructured, metav1.UpdateOptions{}) //, "status") _, err = dc.Resource(deploymentResource).Namespace(testNamespaceName).Update(context.TODO(), &testDeploymentUpdateUnstructured, metav1.UpdateOptions{}) //, "status")
framework.ExpectNoError(err, "failed to update the DeploymentStatus") framework.ExpectNoError(err, "failed to update the DeploymentStatus")
foundEvent = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
for event := range dplmtWatchChan { defer cancel()
if event.Type == watch.Modified { _, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
foundEvent = true switch event.Type {
break case watch.Modified:
if deployment, ok := event.Object.(*appsv1.Deployment); ok {
found := deployment.ObjectMeta.Name == testDeployment.Name &&
deployment.Labels["test-deployment-static"] == "true"
return found, nil
} }
default:
framework.Logf("observed event type %v", event.Type)
} }
framework.ExpectEqual(foundEvent, true, "failed to find watch event %v", watch.Modified) return false, nil
})
framework.ExpectNoError(err, "failed to see %v event", watch.Modified)
ginkgo.By("fetching the DeploymentStatus") ginkgo.By("fetching the DeploymentStatus")
deploymentGetUnstructured, err := dc.Resource(deploymentResource).Namespace(testNamespaceName).Get(context.TODO(), testDeploymentName, metav1.GetOptions{}, "status") deploymentGetUnstructured, err := dc.Resource(deploymentResource).Namespace(testNamespaceName).Get(context.TODO(), testDeploymentName, metav1.GetOptions{}, "status")
@ -302,25 +316,20 @@ var _ = SIGDescribe("Deployment", func() {
framework.ExpectNoError(err, "failed to convert the unstructured response to a Deployment") framework.ExpectNoError(err, "failed to convert the unstructured response to a Deployment")
framework.ExpectEqual(deploymentGet.Spec.Template.Spec.Containers[0].Image, testDeploymentUpdateImage, "failed to update image") framework.ExpectEqual(deploymentGet.Spec.Template.Spec.Containers[0].Image, testDeploymentUpdateImage, "failed to update image")
framework.ExpectEqual(deploymentGet.ObjectMeta.Labels["test-deployment"], "updated", "failed to update labels") framework.ExpectEqual(deploymentGet.ObjectMeta.Labels["test-deployment"], "updated", "failed to update labels")
foundEvent = false
for event := range dplmtWatchChan {
if event.Type == watch.Modified {
foundEvent = true
break
}
}
framework.ExpectEqual(foundEvent, true, "failed to find watch event %v", watch.Modified)
foundEvent = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
for event := range dplmtWatchChan { defer cancel()
deployment, ok := event.Object.(*appsv1.Deployment) _, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
framework.ExpectEqual(ok, true, "unable to convert event.Object type") if deployment, ok := event.Object.(*appsv1.Deployment); ok {
if deployment.Status.ReadyReplicas == testDeploymentDefaultReplicas { found := deployment.ObjectMeta.Name == testDeployment.Name &&
foundEvent = true deployment.Labels["test-deployment-static"] == "true" &&
break deployment.Status.AvailableReplicas == testDeploymentDefaultReplicas &&
deployment.Status.ReadyReplicas == testDeploymentDefaultReplicas
return found, nil
} }
} return false, nil
framework.ExpectEqual(foundEvent, true, "failed to see scale of replicas") })
framework.ExpectNoError(err, "failed to see replicas of %v in namespace %v scale to requested amount of %v", testDeployment.Name, testNamespaceName, testDeploymentDefaultReplicas)
ginkgo.By("patching the DeploymentStatus") ginkgo.By("patching the DeploymentStatus")
deploymentStatusPatch, err := json.Marshal(map[string]interface{}{ deploymentStatusPatch, err := json.Marshal(map[string]interface{}{
@ -333,6 +342,22 @@ var _ = SIGDescribe("Deployment", func() {
}) })
framework.ExpectNoError(err, "failed to Marshal Deployment JSON patch") framework.ExpectNoError(err, "failed to Marshal Deployment JSON patch")
dc.Resource(deploymentResource).Namespace(testNamespaceName).Patch(context.TODO(), testDeploymentName, types.StrategicMergePatchType, []byte(deploymentStatusPatch), metav1.PatchOptions{}, "status") dc.Resource(deploymentResource).Namespace(testNamespaceName).Patch(context.TODO(), testDeploymentName, types.StrategicMergePatchType, []byte(deploymentStatusPatch), metav1.PatchOptions{}, "status")
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Modified:
if deployment, ok := event.Object.(*appsv1.Deployment); ok {
found := deployment.ObjectMeta.Name == testDeployment.Name &&
deployment.Labels["test-deployment-static"] == "true"
return found, nil
}
default:
framework.Logf("observed event type %v", event.Type)
}
return false, nil
})
framework.ExpectNoError(err, "failed to see %v event", watch.Modified)
ginkgo.By("fetching the DeploymentStatus") ginkgo.By("fetching the DeploymentStatus")
deploymentGetUnstructured, err = dc.Resource(deploymentResource).Namespace(testNamespaceName).Get(context.TODO(), testDeploymentName, metav1.GetOptions{}, "status") deploymentGetUnstructured, err = dc.Resource(deploymentResource).Namespace(testNamespaceName).Get(context.TODO(), testDeploymentName, metav1.GetOptions{}, "status")
@ -342,36 +367,41 @@ var _ = SIGDescribe("Deployment", func() {
framework.ExpectNoError(err, "failed to convert the unstructured response to a Deployment") framework.ExpectNoError(err, "failed to convert the unstructured response to a Deployment")
framework.ExpectEqual(deploymentGet.Spec.Template.Spec.Containers[0].Image, testDeploymentUpdateImage, "failed to update image") framework.ExpectEqual(deploymentGet.Spec.Template.Spec.Containers[0].Image, testDeploymentUpdateImage, "failed to update image")
framework.ExpectEqual(deploymentGet.ObjectMeta.Labels["test-deployment"], "updated", "failed to update labels") framework.ExpectEqual(deploymentGet.ObjectMeta.Labels["test-deployment"], "updated", "failed to update labels")
foundEvent = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
for event := range dplmtWatchChan { defer cancel()
if event.Type == watch.Modified { _, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
foundEvent = true if deployment, ok := event.Object.(*appsv1.Deployment); ok {
break found := deployment.ObjectMeta.Name == testDeployment.Name &&
} deployment.Labels["test-deployment-static"] == "true" &&
} deployment.Status.AvailableReplicas == testDeploymentDefaultReplicas &&
framework.ExpectEqual(foundEvent, true, "failed to find watch event %v", watch.Modified) deployment.Status.ReadyReplicas == testDeploymentDefaultReplicas &&
for event := range dplmtWatchChan { deployment.Spec.Template.Spec.Containers[0].Image == testDeploymentUpdateImage
deployment, ok := event.Object.(*appsv1.Deployment) return found, nil
framework.ExpectEqual(ok, true, "unable to convert event.Object type")
if deployment.Status.ReadyReplicas == testDeploymentDefaultReplicas {
break
}
} }
return false, nil
})
framework.ExpectNoError(err, "failed to see replicas of %v in namespace %v scale to requested amount of %v", testDeployment.Name, testNamespaceName, testDeploymentDefaultReplicas)
ginkgo.By("deleting the Deployment") ginkgo.By("deleting the Deployment")
err = f.ClientSet.AppsV1().Deployments(testNamespaceName).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: testDeploymentLabelsFlat}) err = f.ClientSet.AppsV1().Deployments(testNamespaceName).DeleteCollection(context.TODO(), metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: testDeploymentLabelsFlat})
framework.ExpectNoError(err, "failed to delete Deployment via collection") framework.ExpectNoError(err, "failed to delete Deployment via collection")
foundEvent = false ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
for event := range dplmtWatchChan { defer cancel()
deployment, ok := event.Object.(*appsv1.Deployment) _, err = watchtools.Until(ctx, deploymentsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
framework.ExpectEqual(ok, true, "unable to convert event.Object type") switch event.Type {
if event.Type == watch.Deleted && deployment.ObjectMeta.Name == testDeploymentName { case watch.Deleted:
foundEvent = true if deployment, ok := event.Object.(*appsv1.Deployment); ok {
break found := deployment.ObjectMeta.Name == testDeployment.Name &&
deployment.Labels["test-deployment-static"] == "true"
return found, nil
} }
default:
framework.Logf("observed event type %v", event.Type)
} }
framework.ExpectEqual(foundEvent, true, "failed to find watch event %v", watch.Deleted) return false, nil
})
framework.ExpectNoError(err, "failed to see %v event", watch.Deleted)
}) })
}) })