mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Fix ChangeCRD test
This commit is contained in:
parent
a8ef6e9f01
commit
2854d84056
@ -59,9 +59,74 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
ns := "default"
|
ns := "default"
|
||||||
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
|
noxuNamespacedResourceClient := newNamespacedCustomResourceVersionedClient(ns, dynamicClient, noxuDefinition, "v1beta1")
|
||||||
|
|
||||||
stopChan := make(chan struct{})
|
updateCRD := func() {
|
||||||
|
noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
|
||||||
|
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
|
||||||
|
v2.Name = "v2"
|
||||||
|
v2.Served = true
|
||||||
|
v2.Storage = false
|
||||||
|
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
|
||||||
|
} else {
|
||||||
|
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
|
||||||
|
}
|
||||||
|
if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up 10 watchers for custom resource.
|
||||||
|
// We can't exercise them in a loop the same way as get requests, as watchcache
|
||||||
|
// can reject them with 429 and Retry-After: 1 if it is uninitialized and even
|
||||||
|
// though 429 is automatically retried, with frequent watchcache terminations and
|
||||||
|
// reinitializations they could either end-up being rejected N times and fail or
|
||||||
|
// or not initialize until the last watchcache reinitialization and then not be
|
||||||
|
// terminated. Thus we exercise their termination explicitly at the beginning.
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(i int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("unexpected error establishing watch: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for event := range w.ResultChan() {
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Added, watch.Modified, watch.Deleted:
|
||||||
|
// all expected
|
||||||
|
default:
|
||||||
|
t.Errorf("unexpected watch event: %#v", event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Let all the established watches soak request loops soak
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
// Update CRD and ensure that all watches are gracefully terminated.
|
||||||
|
updateCRD()
|
||||||
|
|
||||||
|
drained := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
defer close(drained)
|
||||||
|
wg.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-drained:
|
||||||
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
|
t.Fatal("timed out waiting for watchers to be terminated")
|
||||||
|
}
|
||||||
|
|
||||||
|
stopChan := make(chan struct{})
|
||||||
|
|
||||||
// Set up loop to modify CRD in the background
|
// Set up loop to modify CRD in the background
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -76,28 +141,11 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
noxuDefinitionToUpdate, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.TODO(), noxuDefinition.Name, metav1.GetOptions{})
|
updateCRD()
|
||||||
if err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(noxuDefinitionToUpdate.Spec.Versions) == 1 {
|
|
||||||
v2 := noxuDefinitionToUpdate.Spec.Versions[0]
|
|
||||||
v2.Name = "v2"
|
|
||||||
v2.Served = true
|
|
||||||
v2.Storage = false
|
|
||||||
noxuDefinitionToUpdate.Spec.Versions = append(noxuDefinitionToUpdate.Spec.Versions, v2)
|
|
||||||
} else {
|
|
||||||
noxuDefinitionToUpdate.Spec.Versions = noxuDefinitionToUpdate.Spec.Versions[0:1]
|
|
||||||
}
|
|
||||||
if _, err := apiExtensionsClient.ApiextensionsV1().CustomResourceDefinitions().Update(context.TODO(), noxuDefinitionToUpdate, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) {
|
|
||||||
t.Error(err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Set up 10 loops creating and reading and watching custom resources
|
// Set up 10 loops creating and reading custom resources
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
@ -120,32 +168,6 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func(i int) {
|
|
||||||
defer wg.Done()
|
|
||||||
for {
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
select {
|
|
||||||
case <-stopChan:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
w, err := noxuNamespacedResourceClient.Watch(context.TODO(), metav1.ListOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("unexpected error establishing watch: %v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for event := range w.ResultChan() {
|
|
||||||
switch event.Type {
|
|
||||||
case watch.Added, watch.Modified, watch.Deleted:
|
|
||||||
// all expected
|
|
||||||
default:
|
|
||||||
t.Errorf("unexpected watch event: %#v", event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let all the established get request loops soak
|
// Let all the established get request loops soak
|
||||||
@ -155,7 +177,7 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
close(stopChan)
|
close(stopChan)
|
||||||
|
|
||||||
// Let loops drain
|
// Let loops drain
|
||||||
drained := make(chan struct{})
|
drained = make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(drained)
|
defer close(drained)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -164,6 +186,6 @@ func TestChangeCRD(t *testing.T) {
|
|||||||
select {
|
select {
|
||||||
case <-drained:
|
case <-drained:
|
||||||
case <-time.After(wait.ForeverTestTimeout):
|
case <-time.After(wait.ForeverTestTimeout):
|
||||||
t.Error("timed out waiting for clients to complete")
|
t.Fatal("timed out waiting for clients to complete")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user