mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #80160 from wojtek-t/propagate_error_from_cacher_creation
Propagate error from creating cacher and storage decorators up
This commit is contained in:
commit
13af8f286d
@ -30,7 +30,10 @@ import (
|
|||||||
func TestPodLogValidates(t *testing.T) {
|
func TestPodLogValidates(t *testing.T) {
|
||||||
config, server := registrytest.NewEtcdStorage(t, "")
|
config, server := registrytest.NewEtcdStorage(t, "")
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
s, destroyFunc := generic.NewRawStorage(config)
|
s, destroyFunc, err := generic.NewRawStorage(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
defer destroyFunc()
|
defer destroyFunc()
|
||||||
store := &genericregistry.Store{
|
store := &genericregistry.Store{
|
||||||
Storage: genericregistry.DryRunnableStorage{Storage: s},
|
Storage: genericregistry.DryRunnableStorage{Storage: s},
|
||||||
@ -46,7 +49,7 @@ func TestPodLogValidates(t *testing.T) {
|
|||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
_, err := logRest.Get(genericapirequest.NewDefaultContext(), "test", tc)
|
_, err := logRest.Get(genericapirequest.NewDefaultContext(), "test", tc)
|
||||||
if !errors.IsInvalid(err) {
|
if !errors.IsInvalid(err) {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -61,7 +61,10 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
|
|||||||
// NewEtcd returns an allocator that is backed by Etcd and can manage
|
// NewEtcd returns an allocator that is backed by Etcd and can manage
|
||||||
// persisting the snapshot state of allocation after each allocation is made.
|
// persisting the snapshot state of allocation after each allocation is made.
|
||||||
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) *Etcd {
|
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) *Etcd {
|
||||||
storage, d := generic.NewRawStorage(config)
|
storage, d, err := generic.NewRawStorage(config)
|
||||||
|
if err != nil {
|
||||||
|
panic(err) // TODO: Propagate error up
|
||||||
|
}
|
||||||
|
|
||||||
// TODO : Remove RegisterStorageCleanup below when PR
|
// TODO : Remove RegisterStorageCleanup below when PR
|
||||||
// https://github.com/kubernetes/kubernetes/pull/50690
|
// https://github.com/kubernetes/kubernetes/pull/50690
|
||||||
|
@ -47,7 +47,10 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa
|
|||||||
etcd := allocatorstore.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
|
etcd := allocatorstore.NewEtcd(mem, "/ranges/serviceips", api.Resource("serviceipallocations"), etcdStorage)
|
||||||
return etcd
|
return etcd
|
||||||
})
|
})
|
||||||
s, d := generic.NewRawStorage(etcdStorage)
|
s, d, err := generic.NewRawStorage(etcdStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create storage: %v", err)
|
||||||
|
}
|
||||||
destroyFunc := func() {
|
destroyFunc := func() {
|
||||||
d()
|
d()
|
||||||
server.Terminate(t)
|
server.Terminate(t)
|
||||||
|
@ -34,7 +34,10 @@ import (
|
|||||||
func newStorage(t *testing.T) (*ScaleREST, *etcd3testing.EtcdTestServer, storage.Interface, factory.DestroyFunc) {
|
func newStorage(t *testing.T) (*ScaleREST, *etcd3testing.EtcdTestServer, storage.Interface, factory.DestroyFunc) {
|
||||||
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
|
etcdStorage, server := registrytest.NewEtcdStorage(t, "")
|
||||||
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "controllers"}
|
restOptions := generic.RESTOptions{StorageConfig: etcdStorage, Decorator: generic.UndecoratedStorage, DeleteCollectionWorkers: 1, ResourcePrefix: "controllers"}
|
||||||
s, d := generic.NewRawStorage(etcdStorage)
|
s, d, err := generic.NewRawStorage(etcdStorage)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create storage: %v", err)
|
||||||
|
}
|
||||||
destroyFunc := func() {
|
destroyFunc := func() {
|
||||||
d()
|
d()
|
||||||
server.Terminate(t)
|
server.Terminate(t)
|
||||||
|
@ -23,7 +23,6 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library",
|
||||||
"//vendor/k8s.io/klog:go_default_library",
|
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -39,12 +39,15 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
|
|||||||
newFunc func() runtime.Object,
|
newFunc func() runtime.Object,
|
||||||
newListFunc func() runtime.Object,
|
newListFunc func() runtime.Object,
|
||||||
getAttrsFunc storage.AttrFunc,
|
getAttrsFunc storage.AttrFunc,
|
||||||
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) {
|
||||||
|
|
||||||
s, d := generic.NewRawStorage(storageConfig)
|
s, d, err := generic.NewRawStorage(storageConfig)
|
||||||
|
if err != nil {
|
||||||
|
return s, d, err
|
||||||
|
}
|
||||||
if capacity <= 0 {
|
if capacity <= 0 {
|
||||||
klog.V(5).Infof("Storage caching is disabled for %T", newFunc())
|
klog.V(5).Infof("Storage caching is disabled for %T", newFunc())
|
||||||
return s, d
|
return s, d, nil
|
||||||
}
|
}
|
||||||
if klog.V(5) {
|
if klog.V(5) {
|
||||||
klog.Infof("Storage caching is enabled for %T with capacity %v", newFunc(), capacity)
|
klog.Infof("Storage caching is enabled for %T with capacity %v", newFunc(), capacity)
|
||||||
@ -64,7 +67,10 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
|
|||||||
TriggerPublisherFunc: triggerFunc,
|
TriggerPublisherFunc: triggerFunc,
|
||||||
Codec: storageConfig.Codec,
|
Codec: storageConfig.Codec,
|
||||||
}
|
}
|
||||||
cacher := cacherstorage.NewCacherFromConfig(cacherConfig)
|
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, func() {}, err
|
||||||
|
}
|
||||||
destroyFunc := func() {
|
destroyFunc := func() {
|
||||||
cacher.Stop()
|
cacher.Stop()
|
||||||
d()
|
d()
|
||||||
@ -75,7 +81,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
|
|||||||
// merges as that shuts down storage properly
|
// merges as that shuts down storage properly
|
||||||
RegisterStorageCleanup(destroyFunc)
|
RegisterStorageCleanup(destroyFunc)
|
||||||
|
|
||||||
return cacher, destroyFunc
|
return cacher, destroyFunc, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1310,7 +1310,8 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
|||||||
|
|
||||||
if e.Storage.Storage == nil {
|
if e.Storage.Storage == nil {
|
||||||
e.Storage.Codec = opts.StorageConfig.Codec
|
e.Storage.Codec = opts.StorageConfig.Codec
|
||||||
e.Storage.Storage, e.DestroyFunc = opts.Decorator(
|
var err error
|
||||||
|
e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
|
||||||
opts.StorageConfig,
|
opts.StorageConfig,
|
||||||
prefix,
|
prefix,
|
||||||
keyFunc,
|
keyFunc,
|
||||||
@ -1319,6 +1320,9 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
|
|||||||
attrFunc,
|
attrFunc,
|
||||||
triggerFunc,
|
triggerFunc,
|
||||||
)
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
|
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
|
||||||
|
|
||||||
if opts.CountMetricPollPeriod > 0 {
|
if opts.CountMetricPollPeriod > 0 {
|
||||||
|
@ -1561,7 +1561,10 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
|
|||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: sc.Codec,
|
Codec: sc.Codec,
|
||||||
}
|
}
|
||||||
cacher := cacherstorage.NewCacherFromConfig(config)
|
cacher, err := cacherstorage.NewCacherFromConfig(config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
d := destroyFunc
|
d := destroyFunc
|
||||||
s = cacher
|
s = cacher
|
||||||
destroyFunc = func() {
|
destroyFunc = func() {
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||||
"k8s.io/klog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// StorageDecorator is a function signature for producing a storage.Interface
|
// StorageDecorator is a function signature for producing a storage.Interface
|
||||||
@ -33,7 +32,7 @@ type StorageDecorator func(
|
|||||||
newFunc func() runtime.Object,
|
newFunc func() runtime.Object,
|
||||||
newListFunc func() runtime.Object,
|
newListFunc func() runtime.Object,
|
||||||
getAttrsFunc storage.AttrFunc,
|
getAttrsFunc storage.AttrFunc,
|
||||||
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc)
|
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error)
|
||||||
|
|
||||||
// UndecoratedStorage returns the given a new storage from the given config
|
// UndecoratedStorage returns the given a new storage from the given config
|
||||||
// without any decoration.
|
// without any decoration.
|
||||||
@ -44,17 +43,13 @@ func UndecoratedStorage(
|
|||||||
newFunc func() runtime.Object,
|
newFunc func() runtime.Object,
|
||||||
newListFunc func() runtime.Object,
|
newListFunc func() runtime.Object,
|
||||||
getAttrsFunc storage.AttrFunc,
|
getAttrsFunc storage.AttrFunc,
|
||||||
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
|
trigger storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc, error) {
|
||||||
return NewRawStorage(config)
|
return NewRawStorage(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRawStorage creates the low level kv storage. This is a work-around for current
|
// NewRawStorage creates the low level kv storage. This is a work-around for current
|
||||||
// two layer of same storage interface.
|
// two layer of same storage interface.
|
||||||
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
|
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
|
||||||
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
|
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
|
||||||
s, d, err := factory.Create(*config)
|
return factory.Create(*config)
|
||||||
if err != nil {
|
|
||||||
klog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
|
|
||||||
}
|
|
||||||
return s, d
|
|
||||||
}
|
}
|
||||||
|
@ -291,13 +291,13 @@ type Cacher struct {
|
|||||||
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
|
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
|
||||||
// its internal cache and updating its cache in the background based on the
|
// its internal cache and updating its cache in the background based on the
|
||||||
// given configuration.
|
// given configuration.
|
||||||
func NewCacherFromConfig(config Config) *Cacher {
|
func NewCacherFromConfig(config Config) (*Cacher, error) {
|
||||||
stopCh := make(chan struct{})
|
stopCh := make(chan struct{})
|
||||||
obj := config.NewFunc()
|
obj := config.NewFunc()
|
||||||
// Give this error when it is constructed rather than when you get the
|
// Give this error when it is constructed rather than when you get the
|
||||||
// first watch item, because it's much easier to track down that way.
|
// first watch item, because it's much easier to track down that way.
|
||||||
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
|
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
|
||||||
panic("storage codec doesn't seem to match given type: " + err.Error())
|
return nil, fmt.Errorf("storage codec doesn't seem to match given type: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
clock := clock.RealClock{}
|
clock := clock.RealClock{}
|
||||||
@ -363,7 +363,7 @@ func NewCacherFromConfig(config Config) *Cacher {
|
|||||||
)
|
)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return cacher
|
return cacher, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
||||||
|
@ -257,7 +257,7 @@ func init() {
|
|||||||
utilruntime.Must(examplev1.AddToScheme(scheme))
|
utilruntime.Must(examplev1.AddToScheme(scheme))
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
|
func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner, error) {
|
||||||
prefix := "pods"
|
prefix := "pods"
|
||||||
config := Config{
|
config := Config{
|
||||||
CacheCapacity: cap,
|
CacheCapacity: cap,
|
||||||
@ -270,7 +270,8 @@ func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
|
|||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
}
|
}
|
||||||
return NewCacherFromConfig(config), testVersioner{}
|
cacher, err := NewCacherFromConfig(config)
|
||||||
|
return cacher, testVersioner{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type dummyStorage struct {
|
type dummyStorage struct {
|
||||||
@ -328,7 +329,10 @@ func (d *dummyStorage) Count(_ string) (int64, error) {
|
|||||||
|
|
||||||
func TestListWithLimitAndRV0(t *testing.T) {
|
func TestListWithLimitAndRV0(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _ := newTestCacher(backingStorage, 0)
|
cacher, _, err := newTestCacher(backingStorage, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
pred := storage.SelectionPredicate{
|
pred := storage.SelectionPredicate{
|
||||||
@ -341,7 +345,7 @@ func TestListWithLimitAndRV0(t *testing.T) {
|
|||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.err = errDummy
|
||||||
err := cacher.List(context.TODO(), "pods/ns", "0", pred, result)
|
err = cacher.List(context.TODO(), "pods/ns", "0", pred, result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
|
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
|
||||||
}
|
}
|
||||||
@ -354,7 +358,10 @@ func TestListWithLimitAndRV0(t *testing.T) {
|
|||||||
|
|
||||||
func TestGetToListWithLimitAndRV0(t *testing.T) {
|
func TestGetToListWithLimitAndRV0(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _ := newTestCacher(backingStorage, 0)
|
cacher, _, err := newTestCacher(backingStorage, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
pred := storage.SelectionPredicate{
|
pred := storage.SelectionPredicate{
|
||||||
@ -367,7 +374,7 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
|
|||||||
|
|
||||||
// Inject error to underlying layer and check if cacher is not bypassed.
|
// Inject error to underlying layer and check if cacher is not bypassed.
|
||||||
backingStorage.err = errDummy
|
backingStorage.err = errDummy
|
||||||
err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
|
err = cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
|
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
|
||||||
}
|
}
|
||||||
@ -380,7 +387,10 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
|
|||||||
|
|
||||||
func TestWatcherNotGoingBackInTime(t *testing.T) {
|
func TestWatcherNotGoingBackInTime(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _ := newTestCacher(backingStorage, 1000)
|
cacher, _, err := newTestCacher(backingStorage, 1000)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
@ -498,7 +508,10 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||||||
|
|
||||||
func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
|
func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _ := newTestCacher(backingStorage, 1000)
|
cacher, _, err := newTestCacher(backingStorage, 1000)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
@ -577,7 +590,10 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
|
|||||||
func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) {
|
func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)()
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _ := newTestCacher(backingStorage, 1000)
|
cacher, _, err := newTestCacher(backingStorage, 1000)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
@ -676,7 +692,10 @@ func TestCacherSendBookmarkEvents(t *testing.T) {
|
|||||||
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _ := newTestCacher(backingStorage, 1000)
|
cacher, _, err := newTestCacher(backingStorage, 1000)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
@ -686,7 +705,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
|||||||
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
|
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
|
||||||
|
|
||||||
resourceVersion := uint64(1000)
|
resourceVersion := uint64(1000)
|
||||||
err := cacher.watchCache.Add(&examplev1.Pod{
|
err = cacher.watchCache.Add(&examplev1.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: fmt.Sprintf("pod-0"),
|
Name: fmt.Sprintf("pod-0"),
|
||||||
Namespace: "ns",
|
Namespace: "ns",
|
||||||
@ -746,7 +765,10 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
|
|||||||
|
|
||||||
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
|
||||||
backingStorage := &dummyStorage{}
|
backingStorage := &dummyStorage{}
|
||||||
cacher, _ := newTestCacher(backingStorage, 1000)
|
cacher, _, err := newTestCacher(backingStorage, 1000)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Wait until cacher is initialized.
|
// Wait until cacher is initialized.
|
||||||
|
@ -99,7 +99,7 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServ
|
|||||||
return server, storage
|
return server, storage
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner) {
|
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner, error) {
|
||||||
prefix := "pods"
|
prefix := "pods"
|
||||||
v := etcd3.APIObjectVersioner{}
|
v := etcd3.APIObjectVersioner{}
|
||||||
config := cacherstorage.Config{
|
config := cacherstorage.Config{
|
||||||
@ -113,7 +113,8 @@ func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage
|
|||||||
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
NewListFunc: func() runtime.Object { return &example.PodList{} },
|
||||||
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
|
||||||
}
|
}
|
||||||
return cacherstorage.NewCacherFromConfig(config), v
|
cacher, err := cacherstorage.NewCacherFromConfig(config)
|
||||||
|
return cacher, v, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeTestPod(name string) *example.Pod {
|
func makeTestPod(name string) *example.Pod {
|
||||||
@ -148,7 +149,10 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
|
|||||||
func TestGet(t *testing.T) {
|
func TestGet(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
podFoo := makeTestPod("foo")
|
podFoo := makeTestPod("foo")
|
||||||
@ -179,7 +183,10 @@ func TestGet(t *testing.T) {
|
|||||||
func TestGetToList(t *testing.T) {
|
func TestGetToList(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
storedObj := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
||||||
@ -235,7 +242,10 @@ func TestGetToList(t *testing.T) {
|
|||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
podFoo := makeTestPod("foo")
|
podFoo := makeTestPod("foo")
|
||||||
@ -316,7 +326,10 @@ func TestList(t *testing.T) {
|
|||||||
func TestInfiniteList(t *testing.T) {
|
func TestInfiniteList(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
podFoo := makeTestPod("foo")
|
podFoo := makeTestPod("foo")
|
||||||
@ -372,7 +385,10 @@ func TestWatch(t *testing.T) {
|
|||||||
// Inject one list error to make sure we test the relist case.
|
// Inject one list error to make sure we test the relist case.
|
||||||
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
|
cacher, _, err := newTestCacher(etcdStorage, 3) // small capacity to trigger "too old version" error
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
podFoo := makeTestPod("foo")
|
podFoo := makeTestPod("foo")
|
||||||
@ -447,7 +463,10 @@ func TestWatch(t *testing.T) {
|
|||||||
func TestWatcherTimeout(t *testing.T) {
|
func TestWatcherTimeout(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// initialVersion is used to initate the watcher at the beginning of the world,
|
// initialVersion is used to initate the watcher at the beginning of the world,
|
||||||
@ -489,7 +508,10 @@ func TestWatcherTimeout(t *testing.T) {
|
|||||||
func TestFiltering(t *testing.T) {
|
func TestFiltering(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// Ensure that the cacher is initialized, before creating any pods,
|
// Ensure that the cacher is initialized, before creating any pods,
|
||||||
@ -551,7 +573,10 @@ func TestFiltering(t *testing.T) {
|
|||||||
func TestStartingResourceVersion(t *testing.T) {
|
func TestStartingResourceVersion(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// add 1 object
|
// add 1 object
|
||||||
@ -609,7 +634,10 @@ func TestEmptyWatchEventCache(t *testing.T) {
|
|||||||
|
|
||||||
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
||||||
|
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
// get rv of last pod created
|
// get rv of last pod created
|
||||||
@ -663,7 +691,10 @@ func TestEmptyWatchEventCache(t *testing.T) {
|
|||||||
func TestRandomWatchDeliver(t *testing.T) {
|
func TestRandomWatchDeliver(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
||||||
@ -789,7 +820,10 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
|||||||
|
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
||||||
@ -851,7 +885,10 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
|
|||||||
|
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v, err := newTestCacher(etcdStorage, 10)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create cacher: %v", err)
|
||||||
|
}
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
|
||||||
pred := storage.Everything
|
pred := storage.Everything
|
||||||
|
Loading…
Reference in New Issue
Block a user