diff --git a/test/integration/apiserver/apiserver_test.go b/test/integration/apiserver/apiserver_test.go index 381fb009b14..1c50153b380 100644 --- a/test/integration/apiserver/apiserver_test.go +++ b/test/integration/apiserver/apiserver_test.go @@ -389,135 +389,139 @@ func TestListOptions(t *testing.T) { for _, watchCacheEnabled := range []bool{true, false} { t.Run(fmt.Sprintf("watchCacheEnabled=%t", watchCacheEnabled), func(t *testing.T) { - tCtx := ktesting.Init(t) - prefix := path.Join("/", guuid.New().String(), "registry") - etcdConfig := storagebackend.Config{ - Prefix: prefix, - Transport: storagebackend.TransportConfig{ServerList: []string{framework.GetEtcdURL()}}, - } - rawClient, kvClient, err := integration.GetEtcdClients(etcdConfig.Transport) - if err != nil { - t.Fatal(err) - } - // kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to - // close the client (which we can do by closing rawClient). - defer func() { - err := rawClient.Close() - if err != nil { - t.Fatal(err) - } - }() - - var compactedRv string - var oldestUncompactedRv int64 - for i := 0; i < 15; i++ { - rs := newRS("default") - rs.Name = fmt.Sprintf("test-%d", i) - serializer := protobuf.NewSerializer(nil, nil) - buf := bytes.Buffer{} - err := serializer.Encode(rs, &buf) - if err != nil { - t.Fatal(err) - } - key := prefix + "/replicasets/default/" + rs.Name - - resp, err := kvClient.Put(tCtx, key, buf.String()) - if err != nil { - t.Fatal(err) - } - if i == 0 { - compactedRv = strconv.FormatInt(resp.Header.Revision, 10) // We compact this first resource version below - } - // delete the first 5, and then compact them - if i < 5 { - if _, err := kvClient.Delete(tCtx, key); err != nil { - t.Fatal(err) - } - oldestUncompactedRv = resp.Header.Revision - } - } - _, err = kvClient.Compact(tCtx, int64(oldestUncompactedRv)) - if err != nil { - t.Fatal(err) - } - - clientSet, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{ - ModifyServerRunOptions: func(opts *options.ServerRunOptions) { - opts.Etcd.EnableWatchCache = watchCacheEnabled - opts.Etcd.StorageConfig = etcdConfig - }, - }) - defer tearDownFn() - - rsClient := clientSet.AppsV1().ReplicaSets("default") - - listObj, err := rsClient.List(tCtx, metav1.ListOptions{ - Limit: 6, - }) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - validContinueToken := listObj.Continue - - // test all combinations of these, for both watch cache enabled and disabled: - limits := []int64{0, 6} - continueTokens := []string{"", validContinueToken, invalidContinueToken} - rvs := []string{"", "0", compactedRv, invalidResourceVersion} - rvMatches := []metav1.ResourceVersionMatch{ - "", - metav1.ResourceVersionMatchNotOlderThan, - metav1.ResourceVersionMatchExact, - invalidResourceVersionMatch, - } - - for _, limit := range limits { - for _, continueToken := range continueTokens { - for _, rv := range rvs { - for _, rvMatch := range rvMatches { - rvName := "" - switch rv { - case "": - rvName = "empty" - case "0": - rvName = "0" - case compactedRv: - rvName = "compacted" - case invalidResourceVersion: - rvName = "invalid" - default: - rvName = "unknown" - } - - continueName := "" - switch continueToken { - case "": - continueName = "empty" - case validContinueToken: - continueName = "valid" - case invalidContinueToken: - continueName = "invalid" - default: - continueName = "unknown" - } - - name := fmt.Sprintf("limit=%d continue=%s rv=%s rvMatch=%s", limit, continueName, rvName, rvMatch) - t.Run(name, func(t *testing.T) { - opts := metav1.ListOptions{ - ResourceVersion: rv, - ResourceVersionMatch: rvMatch, - Continue: continueToken, - Limit: limit, - } - testListOptionsCase(t, rsClient, watchCacheEnabled, opts, compactedRv) - }) - } - } - } - } + testListOptions(t, watchCacheEnabled) }) } } +func testListOptions(t *testing.T, watchCacheEnabled bool) { + tCtx := ktesting.Init(t) + prefix := path.Join("/", guuid.New().String(), "registry") + etcdConfig := storagebackend.Config{ + Prefix: prefix, + Transport: storagebackend.TransportConfig{ServerList: []string{framework.GetEtcdURL()}}, + } + rawClient, kvClient, err := integration.GetEtcdClients(etcdConfig.Transport) + if err != nil { + t.Fatal(err) + } + // kvClient is a wrapper around rawClient and to avoid leaking goroutines we need to + // close the client (which we can do by closing rawClient). + defer func() { + err := rawClient.Close() + if err != nil { + t.Fatal(err) + } + }() + + var compactedRv string + var oldestUncompactedRv int64 + for i := 0; i < 15; i++ { + rs := newRS("default") + rs.Name = fmt.Sprintf("test-%d", i) + serializer := protobuf.NewSerializer(nil, nil) + buf := bytes.Buffer{} + err := serializer.Encode(rs, &buf) + if err != nil { + t.Fatal(err) + } + key := prefix + "/replicasets/default/" + rs.Name + + resp, err := kvClient.Put(tCtx, key, buf.String()) + if err != nil { + t.Fatal(err) + } + if i == 0 { + compactedRv = strconv.FormatInt(resp.Header.Revision, 10) // We compact this first resource version below + } + // delete the first 5, and then compact them + if i < 5 { + if _, err := kvClient.Delete(tCtx, key); err != nil { + t.Fatal(err) + } + oldestUncompactedRv = resp.Header.Revision + } + } + _, err = kvClient.Compact(tCtx, int64(oldestUncompactedRv)) + if err != nil { + t.Fatal(err) + } + + clientSet, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{ + ModifyServerRunOptions: func(opts *options.ServerRunOptions) { + opts.Etcd.EnableWatchCache = watchCacheEnabled + opts.Etcd.StorageConfig = etcdConfig + }, + }) + defer tearDownFn() + + rsClient := clientSet.AppsV1().ReplicaSets("default") + + listObj, err := rsClient.List(tCtx, metav1.ListOptions{ + Limit: 6, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + validContinueToken := listObj.Continue + + // test all combinations of these, for both watch cache enabled and disabled: + limits := []int64{0, 6} + continueTokens := []string{"", validContinueToken, invalidContinueToken} + rvs := []string{"", "0", compactedRv, invalidResourceVersion} + rvMatches := []metav1.ResourceVersionMatch{ + "", + metav1.ResourceVersionMatchNotOlderThan, + metav1.ResourceVersionMatchExact, + invalidResourceVersionMatch, + } + + for _, limit := range limits { + for _, continueToken := range continueTokens { + for _, rv := range rvs { + for _, rvMatch := range rvMatches { + rvName := "" + switch rv { + case "": + rvName = "empty" + case "0": + rvName = "0" + case compactedRv: + rvName = "compacted" + case invalidResourceVersion: + rvName = "invalid" + default: + rvName = "unknown" + } + + continueName := "" + switch continueToken { + case "": + continueName = "empty" + case validContinueToken: + continueName = "valid" + case invalidContinueToken: + continueName = "invalid" + default: + continueName = "unknown" + } + + name := fmt.Sprintf("limit=%d continue=%s rv=%s rvMatch=%s", limit, continueName, rvName, rvMatch) + t.Run(name, func(t *testing.T) { + opts := metav1.ListOptions{ + ResourceVersion: rv, + ResourceVersionMatch: rvMatch, + Continue: continueToken, + Limit: limit, + } + testListOptionsCase(t, rsClient, watchCacheEnabled, opts, compactedRv) + }) + } + } + } + } +} + func testListOptionsCase(t *testing.T, rsClient appsv1.ReplicaSetInterface, watchCacheEnabled bool, opts metav1.ListOptions, compactedRv string) { listObj, err := rsClient.List(context.Background(), opts)