mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Add resourceVersion=0 paginated list integration test for disabled and enabled watch cache
This commit is contained in:
parent
84723c2d3e
commit
e5a4f09ab3
@ -247,6 +247,8 @@ func NewCacherFromConfig(config Config) *Cacher {
|
|||||||
|
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
|
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
|
||||||
|
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
|
||||||
|
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
|
||||||
reflector.WatchListPageSize = storageWatchListPageSize
|
reflector.WatchListPageSize = storageWatchListPageSize
|
||||||
cacher := &Cacher{
|
cacher := &Cacher{
|
||||||
ready: newReady(),
|
ready: newReady(),
|
||||||
|
@ -192,6 +192,7 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
if r.WatchListPageSize != 0 {
|
if r.WatchListPageSize != 0 {
|
||||||
pager.PageSize = r.WatchListPageSize
|
pager.PageSize = r.WatchListPageSize
|
||||||
}
|
}
|
||||||
|
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
|
||||||
list, err = pager.List(context.Background(), options)
|
list, err = pager.List(context.Background(), options)
|
||||||
close(listCh)
|
close(listCh)
|
||||||
}()
|
}()
|
||||||
|
@ -65,8 +65,16 @@ func setup(t *testing.T, groupVersions ...schema.GroupVersion) (*httptest.Server
|
|||||||
return setupWithResources(t, groupVersions, nil)
|
return setupWithResources(t, groupVersions, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setupWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions ...schema.GroupVersion) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
||||||
|
return setupWithResourcesWithOptions(t, opts, groupVersions, nil)
|
||||||
|
}
|
||||||
|
|
||||||
func setupWithResources(t *testing.T, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
func setupWithResources(t *testing.T, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
||||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
return setupWithResourcesWithOptions(t, &framework.MasterConfigOptions{}, groupVersions, resources)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupWithResourcesWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
||||||
|
masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(opts)
|
||||||
if len(groupVersions) > 0 || len(resources) > 0 {
|
if len(groupVersions) > 0 || len(resources) > 0 {
|
||||||
resourceConfig := master.DefaultAPIResourceConfigSource()
|
resourceConfig := master.DefaultAPIResourceConfigSource()
|
||||||
resourceConfig.EnableVersions(groupVersions...)
|
resourceConfig.EnableVersions(groupVersions...)
|
||||||
@ -189,6 +197,62 @@ func Test202StatusCode(t *testing.T) {
|
|||||||
verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202)
|
verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListResourceVersion0(t *testing.T) {
|
||||||
|
var testcases = []struct {
|
||||||
|
name string
|
||||||
|
watchCacheEnabled bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "watchCacheOn",
|
||||||
|
watchCacheEnabled: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "watchCacheOff",
|
||||||
|
watchCacheEnabled: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testcases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)()
|
||||||
|
etcdOptions := framework.DefaultEtcdOptions()
|
||||||
|
etcdOptions.EnableWatchCache = tc.watchCacheEnabled
|
||||||
|
s, clientSet, closeFn := setupWithOptions(t, &framework.MasterConfigOptions{EtcdOptions: etcdOptions})
|
||||||
|
defer closeFn()
|
||||||
|
|
||||||
|
ns := framework.CreateTestingNamespace("list-paging", s, t)
|
||||||
|
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||||
|
|
||||||
|
rsClient := clientSet.AppsV1().ReplicaSets(ns.Name)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
rs := newRS(ns.Name)
|
||||||
|
rs.Name = fmt.Sprintf("test-%d", i)
|
||||||
|
if _, err := rsClient.Create(rs); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pagerFn := func(opts metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
return rsClient.List(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
p := pager.New(pager.SimplePageFunc(pagerFn))
|
||||||
|
p.PageSize = 3
|
||||||
|
listObj, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected list error: %v", err)
|
||||||
|
}
|
||||||
|
items, err := meta.ExtractList(listObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to extract list from %v", listObj)
|
||||||
|
}
|
||||||
|
if len(items) != 10 {
|
||||||
|
t.Errorf("Expected list size of 10 but got %d", len(items))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAPIListChunking(t *testing.T) {
|
func TestAPIListChunking(t *testing.T) {
|
||||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)()
|
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)()
|
||||||
s, clientSet, closeFn := setup(t)
|
s, clientSet, closeFn := setup(t)
|
||||||
|
@ -242,7 +242,13 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
|
|||||||
|
|
||||||
// NewIntegrationTestMasterConfig returns the master config appropriate for most integration tests.
|
// NewIntegrationTestMasterConfig returns the master config appropriate for most integration tests.
|
||||||
func NewIntegrationTestMasterConfig() *master.Config {
|
func NewIntegrationTestMasterConfig() *master.Config {
|
||||||
masterConfig := NewMasterConfig()
|
return NewIntegrationTestMasterConfigWithOptions(&MasterConfigOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIntegrationTestMasterConfigWithOptions returns the master config appropriate for most integration tests
|
||||||
|
// configured with the provided options.
|
||||||
|
func NewIntegrationTestMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
|
||||||
|
masterConfig := NewMasterConfigWithOptions(opts)
|
||||||
masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
|
masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
|
||||||
masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource()
|
masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource()
|
||||||
|
|
||||||
@ -252,13 +258,32 @@ func NewIntegrationTestMasterConfig() *master.Config {
|
|||||||
return masterConfig
|
return masterConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMasterConfig returns a basic master config.
|
// MasterConfigOptions are the configurable options for a new integration test master config.
|
||||||
func NewMasterConfig() *master.Config {
|
type MasterConfigOptions struct {
|
||||||
|
EtcdOptions *options.EtcdOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultEtcdOptions are the default EtcdOptions for use with integration tests.
|
||||||
|
func DefaultEtcdOptions() *options.EtcdOptions {
|
||||||
// This causes the integration tests to exercise the etcd
|
// This causes the integration tests to exercise the etcd
|
||||||
// prefix code, so please don't change without ensuring
|
// prefix code, so please don't change without ensuring
|
||||||
// sufficient coverage in other ways.
|
// sufficient coverage in other ways.
|
||||||
etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil))
|
etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil))
|
||||||
etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
|
etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
|
||||||
|
return etcdOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMasterConfig returns a basic master config.
|
||||||
|
func NewMasterConfig() *master.Config {
|
||||||
|
return NewMasterConfigWithOptions(&MasterConfigOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMasterConfigWithOptions returns a basic master config configured with the provided options.
|
||||||
|
func NewMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
|
||||||
|
etcdOptions := DefaultEtcdOptions()
|
||||||
|
if opts.EtcdOptions != nil {
|
||||||
|
etcdOptions = opts.EtcdOptions
|
||||||
|
}
|
||||||
|
|
||||||
info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
||||||
ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info)
|
ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info)
|
||||||
|
Loading…
Reference in New Issue
Block a user