mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Merge pull request #75389 from jpbetz/pagination-v1
Paginate watch cache->etcd List calls & reflector init/resync List calls not served by watch cache
This commit is contained in:
commit
c79fbabf23
@ -56,6 +56,12 @@ var (
|
|||||||
emptyFunc = func() {}
|
emptyFunc = func() {}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// storageWatchListPageSize is the cacher's request chunk size of
|
||||||
|
// initial and resync watch lists to storage.
|
||||||
|
storageWatchListPageSize = int64(10000)
|
||||||
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(initCounter)
|
prometheus.MustRegister(initCounter)
|
||||||
}
|
}
|
||||||
@ -231,7 +237,7 @@ type Cacher struct {
|
|||||||
// given configuration.
|
// given configuration.
|
||||||
func NewCacherFromConfig(config Config) *Cacher {
|
func NewCacherFromConfig(config Config) *Cacher {
|
||||||
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
|
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
|
||||||
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
||||||
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
||||||
|
|
||||||
obj := config.NewFunc()
|
obj := config.NewFunc()
|
||||||
@ -242,12 +248,16 @@ func NewCacherFromConfig(config Config) *Cacher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
|
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
|
||||||
|
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
|
||||||
|
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
|
||||||
|
reflector.WatchListPageSize = storageWatchListPageSize
|
||||||
cacher := &Cacher{
|
cacher := &Cacher{
|
||||||
ready: newReady(),
|
ready: newReady(),
|
||||||
storage: config.Storage,
|
storage: config.Storage,
|
||||||
objectType: reflect.TypeOf(obj),
|
objectType: reflect.TypeOf(obj),
|
||||||
watchCache: watchCache,
|
watchCache: watchCache,
|
||||||
reflector: cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0),
|
reflector: reflector,
|
||||||
versioner: config.Versioner,
|
versioner: config.Versioner,
|
||||||
triggerFunc: config.TriggerPublisherFunc,
|
triggerFunc: config.TriggerPublisherFunc,
|
||||||
watcherIdx: 0,
|
watcherIdx: 0,
|
||||||
@ -849,7 +859,8 @@ type cacherListerWatcher struct {
|
|||||||
newListFunc func() runtime.Object
|
newListFunc func() runtime.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
|
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
|
||||||
|
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
|
||||||
return &cacherListerWatcher{
|
return &cacherListerWatcher{
|
||||||
storage: storage,
|
storage: storage,
|
||||||
resourcePrefix: resourcePrefix,
|
resourcePrefix: resourcePrefix,
|
||||||
@ -860,7 +871,14 @@ func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, ne
|
|||||||
// Implements cache.ListerWatcher interface.
|
// Implements cache.ListerWatcher interface.
|
||||||
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
|
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
list := lw.newListFunc()
|
list := lw.newListFunc()
|
||||||
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", storage.Everything, list); err != nil {
|
pred := storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
Limit: options.Limit,
|
||||||
|
Continue: options.Continue,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return list, nil
|
return list, nil
|
||||||
|
@ -384,6 +384,8 @@ func (s *store) GuaranteedUpdate(
|
|||||||
|
|
||||||
// GetToList implements storage.Interface.GetToList.
|
// GetToList implements storage.Interface.GetToList.
|
||||||
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
||||||
|
trace := utiltrace.New(fmt.Sprintf("GetToList etcd3: key=%v, resourceVersion=%s, limit: %d, continue: %s", key, resourceVersion, pred.Limit, pred.Continue))
|
||||||
|
defer trace.LogIfLong(500 * time.Millisecond)
|
||||||
listPtr, err := meta.GetItemsPtr(listObj)
|
listPtr, err := meta.GetItemsPtr(listObj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -487,6 +489,8 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
|
|||||||
|
|
||||||
// List implements storage.Interface.List.
|
// List implements storage.Interface.List.
|
||||||
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
||||||
|
trace := utiltrace.New(fmt.Sprintf("List etcd3: key=%v, resourceVersion=%s, limit: %d, continue: %s", key, resourceVersion, pred.Limit, pred.Continue))
|
||||||
|
defer trace.LogIfLong(500 * time.Millisecond)
|
||||||
listPtr, err := meta.GetItemsPtr(listObj)
|
listPtr, err := meta.GetItemsPtr(listObj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -692,3 +692,85 @@ func TestRandomWatchDeliver(t *testing.T) {
|
|||||||
watched++
|
watched++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCacherListerWatcher(t *testing.T) {
|
||||||
|
prefix := "pods"
|
||||||
|
fn := func() runtime.Object { return &example.PodList{} }
|
||||||
|
server, store := newEtcdTestStorage(t, prefix)
|
||||||
|
defer server.Terminate(t)
|
||||||
|
|
||||||
|
podFoo := makeTestPod("foo")
|
||||||
|
podBar := makeTestPod("bar")
|
||||||
|
podBaz := makeTestPod("baz")
|
||||||
|
|
||||||
|
_ = updatePod(t, store, podFoo, nil)
|
||||||
|
_ = updatePod(t, store, podBar, nil)
|
||||||
|
_ = updatePod(t, store, podBaz, nil)
|
||||||
|
|
||||||
|
lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
|
||||||
|
|
||||||
|
obj, err := lw.List(metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("List failed: %v", err)
|
||||||
|
}
|
||||||
|
pl, ok := obj.(*example.PodList)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected PodList but got %t", pl)
|
||||||
|
}
|
||||||
|
if len(pl.Items) != 3 {
|
||||||
|
t.Errorf("Expected PodList of length 3 but got %d", len(pl.Items))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacherListerWatcherPagination(t *testing.T) {
|
||||||
|
prefix := "pods"
|
||||||
|
fn := func() runtime.Object { return &example.PodList{} }
|
||||||
|
server, store := newEtcdTestStorage(t, prefix)
|
||||||
|
defer server.Terminate(t)
|
||||||
|
|
||||||
|
podFoo := makeTestPod("foo")
|
||||||
|
podBar := makeTestPod("bar")
|
||||||
|
podBaz := makeTestPod("baz")
|
||||||
|
|
||||||
|
_ = updatePod(t, store, podFoo, nil)
|
||||||
|
_ = updatePod(t, store, podBar, nil)
|
||||||
|
_ = updatePod(t, store, podBaz, nil)
|
||||||
|
|
||||||
|
lw := cacherstorage.NewCacherListerWatcher(store, prefix, fn)
|
||||||
|
|
||||||
|
obj1, err := lw.List(metav1.ListOptions{Limit: 2})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("List failed: %v", err)
|
||||||
|
}
|
||||||
|
limit1, ok := obj1.(*example.PodList)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected PodList but got %t", limit1)
|
||||||
|
}
|
||||||
|
if len(limit1.Items) != 2 {
|
||||||
|
t.Errorf("Expected PodList of length 2 but got %d", len(limit1.Items))
|
||||||
|
}
|
||||||
|
if limit1.Continue == "" {
|
||||||
|
t.Errorf("Expected list to have Continue but got none")
|
||||||
|
}
|
||||||
|
obj2, err := lw.List(metav1.ListOptions{Limit: 2, Continue: limit1.Continue})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("List failed: %v", err)
|
||||||
|
}
|
||||||
|
limit2, ok := obj2.(*example.PodList)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected PodList but got %t", limit2)
|
||||||
|
}
|
||||||
|
if limit2.Continue != "" {
|
||||||
|
t.Errorf("Expected list not to have Continue, but got %s", limit1.Continue)
|
||||||
|
}
|
||||||
|
|
||||||
|
if limit1.Items[0].Name != podBar.Name {
|
||||||
|
t.Errorf("Expected list1.Items[0] to be %s but got %s", podBar.Name, limit1.Items[0].Name)
|
||||||
|
}
|
||||||
|
if limit1.Items[1].Name != podBaz.Name {
|
||||||
|
t.Errorf("Expected list1.Items[1] to be %s but got %s", podBaz.Name, limit1.Items[1].Name)
|
||||||
|
}
|
||||||
|
if limit2.Items[0].Name != podFoo.Name {
|
||||||
|
t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package cache
|
package cache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
@ -38,6 +39,7 @@ import (
|
|||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
"k8s.io/client-go/tools/pager"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/utils/trace"
|
"k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
@ -68,6 +70,9 @@ type Reflector struct {
|
|||||||
lastSyncResourceVersion string
|
lastSyncResourceVersion string
|
||||||
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
|
||||||
lastSyncResourceVersionMutex sync.RWMutex
|
lastSyncResourceVersionMutex sync.RWMutex
|
||||||
|
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
|
||||||
|
// Defaults to pager.PageSize.
|
||||||
|
WatchListPageSize int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -179,7 +184,16 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
|
|||||||
panicCh <- r
|
panicCh <- r
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
list, err = r.listerWatcher.List(options)
|
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
|
||||||
|
// list request will return the full response.
|
||||||
|
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
return r.listerWatcher.List(opts)
|
||||||
|
}))
|
||||||
|
if r.WatchListPageSize != 0 {
|
||||||
|
pager.PageSize = r.WatchListPageSize
|
||||||
|
}
|
||||||
|
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
|
||||||
|
list, err = pager.List(context.Background(), options)
|
||||||
close(listCh)
|
close(listCh)
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
@ -387,3 +387,46 @@ func TestReflectorResync(t *testing.T) {
|
|||||||
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
|
t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorWatchListPageSize(t *testing.T) {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
|
|
||||||
|
lw := &testLW{
|
||||||
|
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
// Stop once the reflector begins watching since we're only interested in the list.
|
||||||
|
close(stopCh)
|
||||||
|
fw := watch.NewFake()
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
if options.Limit != 4 {
|
||||||
|
t.Fatalf("Expected list Limit of 4 but got %d", options.Limit)
|
||||||
|
}
|
||||||
|
pods := make([]v1.Pod, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
|
||||||
|
}
|
||||||
|
switch options.Continue {
|
||||||
|
case "":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C1"}, Items: pods[0:4]}, nil
|
||||||
|
case "C1":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10", Continue: "C2"}, Items: pods[4:8]}, nil
|
||||||
|
case "C2":
|
||||||
|
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[8:10]}, nil
|
||||||
|
default:
|
||||||
|
t.Fatalf("Unrecognized continue: %s", options.Continue)
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r := NewReflector(lw, &v1.Pod{}, s, 0)
|
||||||
|
// Set the reflector to paginate the list request in 4 item chunks.
|
||||||
|
r.WatchListPageSize = 4
|
||||||
|
r.ListAndWatch(stopCh)
|
||||||
|
|
||||||
|
results := s.List()
|
||||||
|
if len(results) != 10 {
|
||||||
|
t.Errorf("Expected 10 results, got %d", len(results))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -65,8 +65,16 @@ func setup(t *testing.T, groupVersions ...schema.GroupVersion) (*httptest.Server
|
|||||||
return setupWithResources(t, groupVersions, nil)
|
return setupWithResources(t, groupVersions, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setupWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions ...schema.GroupVersion) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
||||||
|
return setupWithResourcesWithOptions(t, opts, groupVersions, nil)
|
||||||
|
}
|
||||||
|
|
||||||
func setupWithResources(t *testing.T, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
func setupWithResources(t *testing.T, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
||||||
masterConfig := framework.NewIntegrationTestMasterConfig()
|
return setupWithResourcesWithOptions(t, &framework.MasterConfigOptions{}, groupVersions, resources)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupWithResourcesWithOptions(t *testing.T, opts *framework.MasterConfigOptions, groupVersions []schema.GroupVersion, resources []schema.GroupVersionResource) (*httptest.Server, clientset.Interface, framework.CloseFunc) {
|
||||||
|
masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(opts)
|
||||||
if len(groupVersions) > 0 || len(resources) > 0 {
|
if len(groupVersions) > 0 || len(resources) > 0 {
|
||||||
resourceConfig := master.DefaultAPIResourceConfigSource()
|
resourceConfig := master.DefaultAPIResourceConfigSource()
|
||||||
resourceConfig.EnableVersions(groupVersions...)
|
resourceConfig.EnableVersions(groupVersions...)
|
||||||
@ -189,6 +197,62 @@ func Test202StatusCode(t *testing.T) {
|
|||||||
verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202)
|
verifyStatusCode(t, "DELETE", s.URL+path.Join("/apis/apps/v1/namespaces", ns.Name, "replicasets", rs.Name), cascDel, 202)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListResourceVersion0(t *testing.T) {
|
||||||
|
var testcases = []struct {
|
||||||
|
name string
|
||||||
|
watchCacheEnabled bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "watchCacheOn",
|
||||||
|
watchCacheEnabled: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "watchCacheOff",
|
||||||
|
watchCacheEnabled: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range testcases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)()
|
||||||
|
etcdOptions := framework.DefaultEtcdOptions()
|
||||||
|
etcdOptions.EnableWatchCache = tc.watchCacheEnabled
|
||||||
|
s, clientSet, closeFn := setupWithOptions(t, &framework.MasterConfigOptions{EtcdOptions: etcdOptions})
|
||||||
|
defer closeFn()
|
||||||
|
|
||||||
|
ns := framework.CreateTestingNamespace("list-paging", s, t)
|
||||||
|
defer framework.DeleteTestingNamespace(ns, s, t)
|
||||||
|
|
||||||
|
rsClient := clientSet.AppsV1().ReplicaSets(ns.Name)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
rs := newRS(ns.Name)
|
||||||
|
rs.Name = fmt.Sprintf("test-%d", i)
|
||||||
|
if _, err := rsClient.Create(rs); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pagerFn := func(opts metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
return rsClient.List(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
p := pager.New(pager.SimplePageFunc(pagerFn))
|
||||||
|
p.PageSize = 3
|
||||||
|
listObj, err := p.List(context.Background(), metav1.ListOptions{ResourceVersion: "0"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected list error: %v", err)
|
||||||
|
}
|
||||||
|
items, err := meta.ExtractList(listObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to extract list from %v", listObj)
|
||||||
|
}
|
||||||
|
if len(items) != 10 {
|
||||||
|
t.Errorf("Expected list size of 10 but got %d", len(items))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAPIListChunking(t *testing.T) {
|
func TestAPIListChunking(t *testing.T) {
|
||||||
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)()
|
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIListChunking, true)()
|
||||||
s, clientSet, closeFn := setup(t)
|
s, clientSet, closeFn := setup(t)
|
||||||
|
@ -242,7 +242,13 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
|
|||||||
|
|
||||||
// NewIntegrationTestMasterConfig returns the master config appropriate for most integration tests.
|
// NewIntegrationTestMasterConfig returns the master config appropriate for most integration tests.
|
||||||
func NewIntegrationTestMasterConfig() *master.Config {
|
func NewIntegrationTestMasterConfig() *master.Config {
|
||||||
masterConfig := NewMasterConfig()
|
return NewIntegrationTestMasterConfigWithOptions(&MasterConfigOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIntegrationTestMasterConfigWithOptions returns the master config appropriate for most integration tests
|
||||||
|
// configured with the provided options.
|
||||||
|
func NewIntegrationTestMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
|
||||||
|
masterConfig := NewMasterConfigWithOptions(opts)
|
||||||
masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
|
masterConfig.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
|
||||||
masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource()
|
masterConfig.ExtraConfig.APIResourceConfigSource = master.DefaultAPIResourceConfigSource()
|
||||||
|
|
||||||
@ -252,13 +258,32 @@ func NewIntegrationTestMasterConfig() *master.Config {
|
|||||||
return masterConfig
|
return masterConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMasterConfig returns a basic master config.
|
// MasterConfigOptions are the configurable options for a new integration test master config.
|
||||||
func NewMasterConfig() *master.Config {
|
type MasterConfigOptions struct {
|
||||||
|
EtcdOptions *options.EtcdOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultEtcdOptions are the default EtcdOptions for use with integration tests.
|
||||||
|
func DefaultEtcdOptions() *options.EtcdOptions {
|
||||||
// This causes the integration tests to exercise the etcd
|
// This causes the integration tests to exercise the etcd
|
||||||
// prefix code, so please don't change without ensuring
|
// prefix code, so please don't change without ensuring
|
||||||
// sufficient coverage in other ways.
|
// sufficient coverage in other ways.
|
||||||
etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil))
|
etcdOptions := options.NewEtcdOptions(storagebackend.NewDefaultConfig(uuid.New(), nil))
|
||||||
etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
|
etcdOptions.StorageConfig.Transport.ServerList = []string{GetEtcdURL()}
|
||||||
|
return etcdOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMasterConfig returns a basic master config.
|
||||||
|
func NewMasterConfig() *master.Config {
|
||||||
|
return NewMasterConfigWithOptions(&MasterConfigOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMasterConfigWithOptions returns a basic master config configured with the provided options.
|
||||||
|
func NewMasterConfigWithOptions(opts *MasterConfigOptions) *master.Config {
|
||||||
|
etcdOptions := DefaultEtcdOptions()
|
||||||
|
if opts.EtcdOptions != nil {
|
||||||
|
etcdOptions = opts.EtcdOptions
|
||||||
|
}
|
||||||
|
|
||||||
info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
info, _ := runtime.SerializerInfoForMediaType(legacyscheme.Codecs.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
||||||
ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info)
|
ns := NewSingleContentTypeSerializer(legacyscheme.Scheme, info)
|
||||||
|
Loading…
Reference in New Issue
Block a user