mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 16:29:21 +00:00
chore: update deprecated polling methods in apiextensions-apiserver
Signed-off-by: Omer Aplatony <omerap12@gmail.com>
This commit is contained in:
parent
feb3f92bc4
commit
fe46e47bd1
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
@ -258,14 +259,14 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer
|
||||
// to sync makes sure that the lister will be valid before we begin. There may still be races for CRDs added after startup,
|
||||
// but we won't go healthy until we can handle the ones already present.
|
||||
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
|
||||
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
||||
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(ctx genericapiserver.PostStartHookContext) error {
|
||||
return wait.PollUntilContextCancel(ctx.Context, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
|
||||
if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
|
||||
close(hasCRDInformerSyncedSignal)
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}, context.Done())
|
||||
})
|
||||
})
|
||||
|
||||
return s, nil
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
@ -297,7 +298,7 @@ func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struc
|
||||
}
|
||||
|
||||
// initially sync all group versions to make sure we serve complete discovery
|
||||
if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
|
||||
if err := wait.PollUntilContextCancel(context.Background(), time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
crds, err := c.crdLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err))
|
||||
@ -313,10 +314,11 @@ func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struc
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}, stopCh); err == wait.ErrWaitTimeout {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize"))
|
||||
return
|
||||
} else if err != nil {
|
||||
}); err != nil {
|
||||
if err == context.DeadlineExceeded {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for initial discovery sync"))
|
||||
return
|
||||
}
|
||||
panic(fmt.Errorf("unexpected error: %v", err))
|
||||
}
|
||||
close(synchedCh)
|
||||
|
@ -235,7 +235,7 @@ func (c *CRDFinalizer) deleteInstances(crd *apiextensionsv1.CustomResourceDefini
|
||||
// now we need to wait until all the resources are deleted. Start with a simple poll before we do anything fancy.
|
||||
// TODO not all servers are synchronized on caches. It is possible for a stale one to still be creating things.
|
||||
// Once we have a mechanism for servers to indicate their states, we should check that for concurrence.
|
||||
err = wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 1*time.Minute, true, func(ctx context.Context) (bool, error) {
|
||||
listObj, err := crClient.List(ctx, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
|
@ -72,8 +72,8 @@ func TestAPIApproval(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||
approvedKubeAPI, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), approvedKubeAPI.Name, metav1.GetOptions{})
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
approvedKubeAPI, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, approvedKubeAPI.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -219,8 +219,8 @@ func testWebhookConverter(t *testing.T, watchCache bool) {
|
||||
defer ctc.removeConversionWebhook(t)
|
||||
|
||||
// wait until new webhook is called the first time
|
||||
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
_, err := ctc.versionedClient(marker.GetNamespace(), "v1alpha1").Get(context.TODO(), marker.GetName(), metav1.GetOptions{})
|
||||
if err := wait.PollUntilContextTimeout(context.Background(), time.Millisecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||
_, err = ctc.versionedClient(marker.GetNamespace(), "v1alpha1").Get(ctx, marker.GetName(), metav1.GetOptions{})
|
||||
select {
|
||||
case <-upCh:
|
||||
return true, nil
|
||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package conversion
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
@ -63,8 +64,8 @@ func StartConversionWebhookServer(handler http.Handler) (func(), *apiextensionsv
|
||||
}
|
||||
|
||||
// StartTLS returns immediately, there is a small chance of a race to avoid.
|
||||
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
_, err := webhookServer.Client().Get(webhookServer.URL) // even a 404 is fine
|
||||
if err := wait.PollUntilContextTimeout(context.Background(), time.Millisecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||
_, err = webhookServer.Client().Get(webhookServer.URL) // even a 404 is fine
|
||||
return err == nil, nil
|
||||
}); err != nil {
|
||||
webhookServer.Close()
|
||||
|
@ -316,8 +316,8 @@ func testDefaulting(t *testing.T, watchCache bool) {
|
||||
addDefault("v1beta2", "c", "C")
|
||||
|
||||
t.Logf("wait until GET sees 'c' in both status and spec")
|
||||
if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
obj, err := fooClient.Get(context.TODO(), foo.GetName(), metav1.GetOptions{})
|
||||
if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||
obj, err := fooClient.Get(ctx, foo.GetName(), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -333,7 +333,7 @@ func testDefaulting(t *testing.T, watchCache bool) {
|
||||
mustExist(foo.Object, [][]string{{"spec", "a"}, {"spec", "b"}, {"spec", "c"}, {"status", "a"}, {"status", "b"}, {"status", "c"}})
|
||||
|
||||
t.Logf("wait until GET sees 'c' in both status and spec of cached get")
|
||||
if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||
obj, err := fooClient.Get(context.TODO(), foo.GetName(), metav1.GetOptions{ResourceVersion: "0"})
|
||||
if err != nil {
|
||||
return false, err
|
||||
@ -409,8 +409,8 @@ func testDefaulting(t *testing.T, watchCache bool) {
|
||||
t.Logf("Add 'c' default to the REST version, remove it from the storage version, and wait until GET no longer sees it in both status and spec")
|
||||
addDefault("v1beta1", "c", "C")
|
||||
removeDefault("v1beta2", "c")
|
||||
if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
obj, err := fooClient.Get(context.TODO(), foo.GetName(), metav1.GetOptions{})
|
||||
if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||
obj, err := fooClient.Get(ctx, foo.GetName(), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -434,8 +434,8 @@ func testDefaulting(t *testing.T, watchCache bool) {
|
||||
removeDefault("v1beta1", "a")
|
||||
removeDefault("v1beta1", "b")
|
||||
removeDefault("v1beta1", "c")
|
||||
if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
obj, err := fooClient.Get(context.TODO(), foo.GetName(), metav1.GetOptions{})
|
||||
if err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||
obj, err := fooClient.Get(ctx, foo.GetName(), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -347,7 +347,7 @@ func existsInDiscoveryV1(crd *apiextensionsv1.CustomResourceDefinition, apiExten
|
||||
func waitForCRDReadyWatchUnsafe(crd *apiextensionsv1.CustomResourceDefinition, apiExtensionsClient clientset.Interface) (*apiextensionsv1.CustomResourceDefinition, error) {
|
||||
// wait until all resources appears in discovery
|
||||
for _, version := range servedV1Versions(crd) {
|
||||
err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 500*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
return existsInDiscoveryV1(crd, apiExtensionsClient, version)
|
||||
})
|
||||
if err != nil {
|
||||
@ -375,7 +375,7 @@ func waitForCRDReady(crd *apiextensionsv1.CustomResourceDefinition, apiExtension
|
||||
// For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error.
|
||||
// This way all the tests that are checking for watches don't have to worry about RV too old problems because crazy things *could* happen
|
||||
// before like the created RV could be too old to watch.
|
||||
err = wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 500*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
return isWatchCachePrimed(v1CRD, dynamicClientSet)
|
||||
})
|
||||
if err != nil {
|
||||
@ -396,7 +396,7 @@ func CreateNewV1CustomResourceDefinitionWatchUnsafe(v1CRD *apiextensionsv1.Custo
|
||||
|
||||
// wait until all resources appears in discovery
|
||||
for _, version := range servedV1Versions(v1CRD) {
|
||||
err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 500*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
return existsInDiscoveryV1(v1CRD, apiExtensionsClient, version)
|
||||
})
|
||||
if err != nil {
|
||||
@ -424,7 +424,7 @@ func CreateNewV1CustomResourceDefinition(v1CRD *apiextensionsv1.CustomResourceDe
|
||||
// For this test, we'll actually cycle, "list/watch/create/delete" until we get an RV from list that observes the create and not an error.
|
||||
// This way all the tests that are checking for watches don't have to worry about RV too old problems because crazy things *could* happen
|
||||
// before like the created RV could be too old to watch.
|
||||
err = wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 500*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
return isWatchCachePrimed(v1CRD, dynamicClientSet)
|
||||
})
|
||||
if err != nil {
|
||||
@ -518,7 +518,7 @@ func DeleteV1CustomResourceDefinition(crd *apiextensionsv1.CustomResourceDefinit
|
||||
return err
|
||||
}
|
||||
for _, version := range servedV1Versions(crd) {
|
||||
err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 500*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
exists, err := existsInDiscoveryV1(crd, apiExtensionsClient, version)
|
||||
return !exists, err
|
||||
})
|
||||
@ -540,7 +540,7 @@ func DeleteV1CustomResourceDefinitions(deleteListOpts metav1.ListOptions, apiExt
|
||||
}
|
||||
for _, crd := range list.Items {
|
||||
for _, version := range servedV1Versions(&crd) {
|
||||
err := wait.PollImmediate(500*time.Millisecond, 30*time.Second, func() (bool, error) {
|
||||
err := wait.PollUntilContextTimeout(context.Background(), 500*time.Second, 30*time.Second, true, func(ctx context.Context) (bool, error) {
|
||||
exists, err := existsInDiscoveryV1(&crd, apiExtensionsClient, version)
|
||||
return !exists, err
|
||||
})
|
||||
|
@ -209,8 +209,8 @@ func TestListTypes(t *testing.T) {
|
||||
}
|
||||
|
||||
t.Logf("Updating again with invalid values, eventually successfully due to ratcheting logic")
|
||||
err = wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
_, err = fooClient.Update(context.TODO(), modifiedInstance, metav1.UpdateOptions{})
|
||||
err = wait.PollUntilContextTimeout(context.Background(), time.Microsecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||
_, err = fooClient.Update(ctx, modifiedInstance, metav1.UpdateOptions{})
|
||||
if err == nil {
|
||||
return true, err
|
||||
}
|
||||
|
@ -777,8 +777,8 @@ func TestCRValidationOnCRDUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
// CR is now accepted
|
||||
err = wait.Poll(500*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
_, err := noxuResourceClient.Create(context.TODO(), instanceToCreate, metav1.CreateOptions{})
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 500*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||
_, err = noxuResourceClient.Create(ctx, instanceToCreate, metav1.CreateOptions{})
|
||||
if _, isStatus := err.(*apierrors.StatusError); isStatus {
|
||||
if apierrors.IsInvalid(err) {
|
||||
return false, nil
|
||||
@ -925,8 +925,8 @@ spec:
|
||||
// wait for condition with violations
|
||||
t.Log("Waiting for NonStructuralSchema condition")
|
||||
var cond *apiextensionsv1.CustomResourceDefinitionCondition
|
||||
err = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), name, metav1.GetOptions{})
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -963,8 +963,8 @@ spec:
|
||||
|
||||
// wait for condition to go away
|
||||
t.Log("Wait for condition to disappear")
|
||||
err = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), name, metav1.GetOptions{})
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -1529,8 +1529,8 @@ properties:
|
||||
if len(tst.expectedViolations) == 0 {
|
||||
// wait for condition to not appear
|
||||
var cond *apiextensionsv1.CustomResourceDefinitionCondition
|
||||
err := wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), betaCRD.Name, metav1.GetOptions{})
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, betaCRD.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -1540,7 +1540,7 @@ properties:
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != wait.ErrWaitTimeout {
|
||||
if err != context.DeadlineExceeded {
|
||||
t.Fatalf("expected no NonStructuralSchema condition, but got one: %v", cond)
|
||||
}
|
||||
return
|
||||
@ -1548,8 +1548,8 @@ properties:
|
||||
|
||||
// wait for condition to appear with the given violations
|
||||
var cond *apiextensionsv1.CustomResourceDefinitionCondition
|
||||
err = wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), betaCRD.Name, metav1.GetOptions{})
|
||||
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||
obj, err := apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, betaCRD.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -88,11 +88,11 @@ func TestInternalVersionIsHandlerVersion(t *testing.T) {
|
||||
{
|
||||
t.Logf("patch of handler version v1beta1 (non-storage version) should succeed")
|
||||
i := 0
|
||||
err = wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
err = wait.PollUntilContextTimeout(context.Background(), time.Millisecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||
patch := []byte(fmt.Sprintf(`{"i": %d}`, i))
|
||||
i++
|
||||
|
||||
_, err := noxuNamespacedResourceClientV1beta1.Patch(context.TODO(), "foo", types.MergePatchType, patch, metav1.PatchOptions{})
|
||||
_, err = noxuNamespacedResourceClientV1beta1.Patch(ctx, "foo", types.MergePatchType, patch, metav1.PatchOptions{})
|
||||
if err != nil {
|
||||
// work around "grpc: the client connection is closing" error
|
||||
// TODO: fix the grpc error
|
||||
@ -111,11 +111,11 @@ func TestInternalVersionIsHandlerVersion(t *testing.T) {
|
||||
t.Logf("patch of handler version v1beta2 (storage version) should fail")
|
||||
i := 0
|
||||
noxuNamespacedResourceClientV1beta2 := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta2") // use the storage version v1beta2
|
||||
err = wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
err = wait.PollUntilContextTimeout(context.Background(), time.Millisecond*100, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
|
||||
patch := []byte(fmt.Sprintf(`{"i": %d}`, i))
|
||||
i++
|
||||
|
||||
_, err := noxuNamespacedResourceClientV1beta2.Patch(context.TODO(), "foo", types.MergePatchType, patch, metav1.PatchOptions{})
|
||||
_, err = noxuNamespacedResourceClientV1beta2.Patch(ctx, "foo", types.MergePatchType, patch, metav1.PatchOptions{})
|
||||
assert.Error(t, err)
|
||||
|
||||
// work around "grpc: the client connection is closing" error
|
||||
|
Loading…
Reference in New Issue
Block a user