Merge pull request #94364 from wojtek-t/efficient_watch_resumption

Efficient watch resumption
Commit 0f39af90ed by Kubernetes Prow Robot, 2020-09-25 15:42:48 -07:00, committed via GitHub
29 changed files with 480 additions and 84 deletions

View File

@ -258,7 +258,7 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
if err != nil {
klog.Fatalf("Error determining service IP ranges: %v", err)
}
leaseStorage, _, err := storagefactory.Create(*config)
leaseStorage, _, err := storagefactory.Create(*config, nil)
if err != nil {
klog.Fatalf("Error creating storage factory: %v", err)
}
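Most of the mechanical churn below stems from a single signature change: the storage factory (and generic.NewRawStorage) now accept a newFunc constructor. A hedged sketch of the two calling styles, with example.Pod standing in for any resource type:

// Watch-serving paths supply newFunc so the etcd3 watcher can decode
// progress-notify events into typed Bookmark objects:
s, destroyFunc, err := storagefactory.Create(*config, func() runtime.Object { return &example.Pod{} })
if err != nil {
	klog.Fatalf("Error creating storage factory: %v", err)
}
defer destroyFunc()
_ = s

// Paths that never serve watches (allocators, lease storage, most tests)
// pass nil, and storage bookmarks are simply dropped, as in the hunk above:
leaseStorage, _, err := storagefactory.Create(*config, nil)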

View File

@ -30,7 +30,7 @@ import (
func TestPodLogValidates(t *testing.T) {
config, server := registrytest.NewEtcdStorage(t, "")
defer server.Terminate(t)
s, destroyFunc, err := generic.NewRawStorage(config)
s, destroyFunc, err := generic.NewRawStorage(config, nil)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@ -62,7 +62,7 @@ var _ rangeallocation.RangeRegistry = &Etcd{}
// NewEtcd returns an allocator that is backed by Etcd and can manage
// persisting the snapshot state of allocation after each allocation is made.
func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource, config *storagebackend.Config) (*Etcd, error) {
storage, d, err := generic.NewRawStorage(config)
storage, d, err := generic.NewRawStorage(config, nil)
if err != nil {
return nil, err
}

View File

@ -54,7 +54,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, ipallocator.Interfa
if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err)
}
s, d, err := generic.NewRawStorage(etcdStorage)
s, d, err := generic.NewRawStorage(etcdStorage, nil)
if err != nil {
t.Fatalf("Couldn't create storage: %v", err)
}

View File

@ -57,7 +57,7 @@ func newStorage(t *testing.T) (*etcd3testing.EtcdTestServer, portallocator.Inter
if err != nil {
t.Fatalf("unexpected error creating etcd: %v", err)
}
s, d, err := generic.NewRawStorage(etcdStorage)
s, d, err := generic.NewRawStorage(etcdStorage, nil)
if err != nil {
t.Fatalf("Couldn't create storage: %v", err)
}

View File

@ -138,6 +138,12 @@ const (
//
// Allows sending warning headers in API responses.
WarningHeaders featuregate.Feature = "WarningHeaders"
// owner: @wojtek-t
// alpha: v1.20
//
// Allows for updating watchcache resource version with progress notify events.
EfficientWatchResumption featuregate.Feature = "EfficientWatchResumption"
)
func init() {
@ -148,18 +154,19 @@ func init() {
// To add a new feature, define a key for it above and add it here. The features will be
// available throughout Kubernetes binaries.
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
StreamingProxyRedirects: {Default: true, PreRelease: featuregate.Deprecated},
ValidateProxyRedirects: {Default: true, PreRelease: featuregate.Beta},
AdvancedAuditing: {Default: true, PreRelease: featuregate.GA},
APIResponseCompression: {Default: true, PreRelease: featuregate.Beta},
APIListChunking: {Default: true, PreRelease: featuregate.Beta},
DryRun: {Default: true, PreRelease: featuregate.GA},
RemainingItemCount: {Default: true, PreRelease: featuregate.Beta},
ServerSideApply: {Default: true, PreRelease: featuregate.Beta},
StorageVersionHash: {Default: true, PreRelease: featuregate.Beta},
WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha},
RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta},
SelectorIndex: {Default: true, PreRelease: featuregate.Beta},
WarningHeaders: {Default: true, PreRelease: featuregate.Beta},
EfficientWatchResumption: {Default: false, PreRelease: featuregate.Alpha},
}
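The gate ships Alpha and default-off, so it must be enabled explicitly (e.g. --feature-gates=EfficientWatchResumption=true on kube-apiserver). Both patterns the PR relies on, sketched here:

// In tests, flip the gate for the duration of the test (used verbatim in the
// integration test later in this diff):
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EfficientWatchResumption, true)()

// At runtime, guarded code paths check it like this (see the cacher hunk below):
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
	// request storage-originated bookmark (progress notify) events
}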

View File

@ -38,7 +38,7 @@ import (
func NewDryRunnableTestStorage(t *testing.T) (DryRunnableStorage, func()) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, destroy, err := factory.Create(*sc)
s, destroy, err := factory.Create(*sc, nil)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}

View File

@ -45,7 +45,7 @@ func StorageWithCacher() generic.StorageDecorator {
triggerFuncs storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
s, d, err := generic.NewRawStorage(storageConfig)
s, d, err := generic.NewRawStorage(storageConfig, newFunc)
if err != nil {
return s, d, err
}

View File

@ -1601,8 +1601,11 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
strategy := &testRESTStrategy{scheme, names.SimpleNameGenerator, true, false, true}
newFunc := func() runtime.Object { return &example.Pod{} }
newListFunc := func() runtime.Object { return &example.PodList{} }
sc.Codec = apitesting.TestStorageCodec(codecs, examplev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc)
s, dFunc, err := factory.Create(*sc, newFunc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
@ -1617,8 +1620,8 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs,
NewFunc: func() runtime.Object { return &example.Pod{} },
NewListFunc: func() runtime.Object { return &example.PodList{} },
NewFunc: newFunc,
NewListFunc: newListFunc,
Codec: sc.Codec,
}
cacher, err := cacherstorage.NewCacherFromConfig(config)

View File

@ -47,12 +47,12 @@ func UndecoratedStorage(
getAttrsFunc storage.AttrFunc,
trigger storage.IndexerFuncs,
indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
return NewRawStorage(config)
return NewRawStorage(config, newFunc)
}
// NewRawStorage creates the low-level kv storage. This is a work-around for the current
// two layers of the same storage interface.
// TODO: Once cacher is enabled on all registries (event registry is special), we will remove this method.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config)
func NewRawStorage(config *storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, factory.DestroyFunc, error) {
return factory.Create(*config, newFunc)
}

View File

@ -1098,7 +1098,14 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
}
if utilfeature.DefaultFeatureGate.Enabled(features.EfficientWatchResumption) {
opts.ProgressNotify = true
}
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, opts)
}
// errWatcher implements watch.Interface to return a single error

View File

@ -381,6 +381,29 @@ func (w *watchCache) doCacheResizeLocked(capacity int) {
w.capacity = capacity
}
func (w *watchCache) UpdateResourceVersion(resourceVersion string) {
rv, err := w.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
klog.Errorf("Couldn't parse resourceVersion: %v", err)
return
}
w.Lock()
defer w.Unlock()
w.resourceVersion = rv
// Don't dispatch bookmarks coming from the storage layer.
// They can be very frequent (even at sub-second intervals)
// to allow efficient watch resumption on kube-apiserver restarts,
// and propagating them down may overload the whole system.
//
// TODO: If at some point we decide the performance and scalability
// footprint is acceptable, this is the place to hook them in.
// However, we then need to check if this was called as a result
// of a bookmark event or regular Add/Update/Delete operation by
// checking if resourceVersion here has changed.
}
// List returns a list of pointers to <storeElement> objects.
func (w *watchCache) List() []interface{} {
return w.store.List()
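UpdateResourceVersion is what lets the watchcache satisfy the ResourceVersionUpdater interface added to client-go's reflector later in this PR. A minimal, hypothetical store showing the same pattern (imports assumed: strconv, sync, k8s.io/klog/v2):

type rvTrackingStore struct {
	sync.Mutex
	resourceVersion uint64
}

// UpdateResourceVersion records the version handed over by the reflector,
// e.g. from a storage bookmark, without notifying any consumers.
func (s *rvTrackingStore) UpdateResourceVersion(resourceVersion string) {
	rv, err := strconv.ParseUint(resourceVersion, 10, 64)
	if err != nil {
		klog.Errorf("Couldn't parse resourceVersion: %v", err)
		return
	}
	s.Lock()
	defer s.Unlock()
	s.resourceVersion = rv
}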

View File

@ -23,12 +23,13 @@ import (
)
type event struct {
key string
value []byte
prevValue []byte
rev int64
isDeleted bool
isCreated bool
key string
value []byte
prevValue []byte
rev int64
isDeleted bool
isCreated bool
isProgressNotify bool
}
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
@ -61,3 +62,10 @@ func parseEvent(e *clientv3.Event) (*event, error) {
}
return ret, nil
}
func progressNotifyEvent(rev int64) *event {
return &event{
rev: rev,
isProgressNotify: true,
}
}
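For context, this is how the etcd clientv3 API surfaces these notifications on the wire; a hedged sketch (client and ctx assumed):

wch := client.Watch(ctx, "/registry/", clientv3.WithPrefix(), clientv3.WithProgressNotify())
for wres := range wch {
	if wres.IsProgressNotify() {
		// No keys, no values: only a revision. The store turns it into the
		// synthetic event above.
		e := progressNotifyEvent(wres.Header.GetRevision())
		_ = e
		continue
	}
	for _, ev := range wres.Events {
		_ = ev // regular events go through parseEvent
	}
}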

View File

@ -61,6 +61,14 @@ var (
},
[]string{"endpoint"},
)
etcdBookmarkCounts = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Name: "etcd_bookmark_counts",
Help: "Number of etcd bookmarks (progress notify events) split by kind.",
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"resource"},
)
)
var registerMetrics sync.Once
@ -72,6 +80,7 @@ func Register() {
legacyregistry.MustRegister(etcdRequestLatency)
legacyregistry.MustRegister(objectCounts)
legacyregistry.MustRegister(dbTotalSize)
legacyregistry.MustRegister(etcdBookmarkCounts)
})
}
@ -85,6 +94,11 @@ func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatency.WithLabelValues(verb, resource).Observe(sinceInSeconds(startTime))
}
// RecordEtcdBookmark updates the etcd_bookmark_counts metric.
func RecordEtcdBookmark(resource string) {
etcdBookmarkCounts.WithLabelValues(resource).Inc()
}
// Reset resets the etcd_request_duration_seconds metric.
func Reset() {
etcdRequestLatency.Reset()
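A hypothetical scrape of /metrics after a few notifications; the resource label carries the watcher's object type (derived later in this diff as reflect.TypeOf(newFunc()).String(), falling back to "<unknown>" when newFunc is nil), and the counts here are made up:

# HELP etcd_bookmark_counts [ALPHA] Number of etcd bookmarks (progress notify events) split by kind.
# TYPE etcd_bookmark_counts gauge
etcd_bookmark_counts{resource="*example.Pod"} 17
etcd_bookmark_counts{resource="<unknown>"} 3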

View File

@ -83,11 +83,11 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, pagingEnabled, codec, prefix, transformer)
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := APIObjectVersioner{}
result := &store{
client: c,
@ -99,7 +99,7 @@ func newStore(c *clientv3.Client, pagingEnabled bool, codec runtime.Codec, prefi
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
pathPrefix: path.Join("/", prefix),
watcher: newWatcher(c, codec, versioner, transformer),
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
leaseManager: newDefaultLeaseManager(c),
}
return result
@ -784,7 +784,7 @@ func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions,
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.ProgressNotify, opts.Predicate)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
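A hedged sketch of the widened constructor in use; client, codec and the example.Pod type are placeholders:

s := etcd3.New(
	client, // *clientv3.Client
	codec,  // runtime.Codec for the stored type
	func() runtime.Object { return &example.Pod{} }, // newFunc enables typed bookmarks
	"/registry",
	value.IdentityTransformer,
	true, // pagingEnabled
)
_ = s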

View File

@ -99,6 +99,10 @@ func (p *prefixTransformer) resetReads() {
p.reads = 0
}
func newPod() runtime.Object {
return &example.Pod{}
}
func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
@ -818,7 +822,7 @@ func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
preset := []struct {
@ -895,8 +899,8 @@ func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
@ -1394,7 +1398,7 @@ func TestListContinuation(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, codec, "", transformer)
ctx := context.Background()
// Setup storage with the following structure:
@ -1556,7 +1560,7 @@ func TestListContinuationWithFilter(t *testing.T) {
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, true, codec, "", transformer)
store := newStore(etcdClient, newPod, true, codec, "", transformer)
ctx := context.Background()
preset := []struct {
@ -1659,7 +1663,7 @@ func TestListInconsistentContinuation(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
@ -1804,7 +1808,7 @@ func TestListInconsistentContinuation(t *testing.T) {
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// As 30s is the default timeout for testing in the global configuration,
// we cannot wait longer than that at a time: change it to 10
@ -1850,7 +1854,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
store := newStore(cluster.RandClient(), true, codec, configuredPrefix, transformer)
store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer)
if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
}
@ -2017,7 +2021,7 @@ func TestConsistentList(t *testing.T) {
transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(cluster.RandClient(), true, codec, "", transformer)
store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer)
transformer.store = store
for i := 0; i < 5; i++ {
@ -2079,7 +2083,6 @@ func TestConsistentList(t *testing.T) {
if !reflect.DeepEqual(result3, result4) {
t.Errorf("inconsistent lists: %#v, %#v", result3, result4)
}
}
func TestCount(t *testing.T) {

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"strconv"
"strings"
"sync"
@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
"k8s.io/apiserver/pkg/storage/value"
"go.etcd.io/etcd/clientv3"
@ -68,6 +70,8 @@ func TestOnlySetFatalOnDecodeError(b bool) {
type watcher struct {
client *clientv3.Client
codec runtime.Codec
newFunc func() runtime.Object
objectType string
versioner storage.Versioner
transformer value.Transformer
}
@ -78,6 +82,7 @@ type watchChan struct {
key string
initialRev int64
recursive bool
progressNotify bool
internalPred storage.SelectionPredicate
ctx context.Context
cancel context.CancelFunc
@ -86,13 +91,20 @@ type watchChan struct {
errChan chan error
}
func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.Versioner, transformer value.Transformer) *watcher {
return &watcher{
func newWatcher(client *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, versioner storage.Versioner, transformer value.Transformer) *watcher {
res := &watcher{
client: client,
codec: codec,
newFunc: newFunc,
versioner: versioner,
transformer: transformer,
}
if newFunc == nil {
res.objectType = "<unknown>"
} else {
res.objectType = reflect.TypeOf(newFunc()).String()
}
return res
}
// Watch watches on a key and returns a watch.Interface that transfers relevant notifications.
@ -102,21 +114,22 @@ func newWatcher(client *clientv3.Client, codec runtime.Codec, versioner storage.
// If recursive is false, it watches on the given key.
// If recursive is true, it watches any children and directories under the key, excluding the root key itself.
// pred must be non-nil. Only changes matching pred are returned.
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) (watch.Interface, error) {
func (w *watcher) Watch(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) (watch.Interface, error) {
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
wc := w.createWatchChan(ctx, key, rev, recursive, pred)
wc := w.createWatchChan(ctx, key, rev, recursive, progressNotify, pred)
go wc.run()
return wc, nil
}
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive bool, pred storage.SelectionPredicate) *watchChan {
func (w *watcher) createWatchChan(ctx context.Context, key string, rev int64, recursive, progressNotify bool, pred storage.SelectionPredicate) *watchChan {
wc := &watchChan{
watcher: w,
key: key,
initialRev: rev,
recursive: recursive,
progressNotify: progressNotify,
internalPred: pred,
incomingEventChan: make(chan *event, incomingBufSize),
resultChan: make(chan watch.Event, outgoingBufSize),
@ -223,6 +236,9 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
if wc.recursive {
opts = append(opts, clientv3.WithPrefix())
}
if wc.progressNotify {
opts = append(opts, clientv3.WithProgressNotify())
}
wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
for wres := range wch {
if wres.Err() != nil {
@ -232,6 +248,12 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
wc.sendError(err)
return
}
if wres.IsProgressNotify() {
wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
metrics.RecordEtcdBookmark(wc.watcher.objectType)
continue
}
for _, e := range wres.Events {
parsedEvent, err := parseEvent(e)
if err != nil {
@ -299,6 +321,19 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
}
switch {
case e.isProgressNotify:
if wc.watcher.newFunc == nil {
return nil
}
object := wc.watcher.newFunc()
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
klog.Errorf("failed to propagate object version: %v", err)
return nil
}
res = &watch.Event{
Type: watch.Bookmark,
Object: object,
}
case e.isDeleted:
if !wc.filter(oldObj) {
return nil
@ -376,6 +411,11 @@ func (wc *watchChan) sendEvent(e *event) {
}
func (wc *watchChan) prepareObjs(e *event) (curObj runtime.Object, oldObj runtime.Object, err error) {
if e.isProgressNotify {
// progressNotify events contain neither the current nor the previous object version.
return nil, nil, nil
}
if !e.isDeleted {
data, _, err := wc.watcher.transformer.TransformFromStorage(e.value, authenticatedDataString(e.key))
if err != nil {
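On the receiving side, a bookmark produced by transform above is an almost-empty object carrying only a resource version. A hedged consumer sketch, where w is any watch.Interface returned by this store and meta is k8s.io/apimachinery/pkg/api/meta:

lastRV := ""
for ev := range w.ResultChan() {
	switch ev.Type {
	case watch.Bookmark:
		// Only metadata.resourceVersion is set; use it as a resume point.
		if acc, err := meta.Accessor(ev.Object); err == nil {
			lastRV = acc.GetResourceVersion()
		}
	default:
		// regular Added/Modified/Deleted handling
	}
}
_ = lastRV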

View File

@ -225,13 +225,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
@ -246,7 +246,7 @@ func TestWatchContextCancel(t *testing.T) {
cancel()
// When we watch with a canceled context, we should detect that the context was canceled.
// We don't treat that as an error, and we also close the watcher.
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything)
w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, false, storage.Everything)
if err != nil {
t.Fatal(err)
}
@ -265,7 +265,7 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, storage.Everything)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
// make resultChan and errChan blocking to ensure ordering.
w.resultChan = make(chan watch.Event)
w.errChan = make(chan error)
@ -314,6 +314,37 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
}
}
func TestProgressNotify(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
clusterConfig := &integration.ClusterConfig{
Size: 1,
WatchProgressNotifyInterval: time.Second,
}
cluster := integration.NewClusterV3(t, clusterConfig)
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
key := "/somekey"
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}
out := &example.Pod{}
if err := store.Create(ctx, key, input, out, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
opts := storage.ListOptions{
ResourceVersion: out.ResourceVersion,
Predicate: storage.Everything,
ProgressNotify: true,
}
w, err := store.Watch(ctx, key, opts)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
result := &example.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: out.ResourceVersion}}
testCheckResult(t, 0, watch.Bookmark, w, result)
}
type testWatchStruct struct {
obj *example.Pod
expectEvent bool

View File

@ -269,4 +269,7 @@ type ListOptions struct {
ResourceVersionMatch metav1.ResourceVersionMatch
// Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate
// ProgressNotify determines whether storage-originated bookmark (progress notify) events should
// be delivered to the users. The option is ignored for non-watch requests.
ProgressNotify bool
}
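Hedged usage of the new field, mirroring TestProgressNotify above (store, ctx and key assumed):

opts := storage.ListOptions{
	ResourceVersion: "0",
	Predicate:       storage.Everything,
	ProgressNotify:  true, // honored only by watch requests
}
w, err := store.Watch(ctx, key, opts)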

View File

@ -34,6 +34,7 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory",
importpath = "k8s.io/apiserver/pkg/storage/storagebackend/factory",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",

View File

@ -31,6 +31,7 @@ import (
"go.etcd.io/etcd/pkg/transport"
"google.golang.org/grpc"
"k8s.io/apimachinery/pkg/runtime"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/egressselector"
@ -217,7 +218,7 @@ func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration
}, nil
}
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
if err != nil {
return nil, nil, err
@ -249,7 +250,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
if transformer == nil {
transformer = value.IdentityTransformer
}
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the

View File

@ -19,6 +19,7 @@ package factory
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -27,12 +28,12 @@ import (
type DestroyFunc func()
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
func Create(c storagebackend.Config, newFunc func() runtime.Object) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
return newETCD3Storage(c, newFunc)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}

View File

@ -75,7 +75,7 @@ func TestTLSConnection(t *testing.T) {
},
Codec: codec,
}
storage, destroyFunc, err := newETCD3Storage(cfg)
storage, destroyFunc, err := newETCD3Storage(cfg, nil)
defer destroyFunc()
if err != nil {
t.Fatal(err)

View File

@ -99,9 +99,12 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
return source
}
func newPod() runtime.Object { return &example.Pod{} }
func newPodList() runtime.Object { return &example.PodList{} }
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true)
return server, storage
}
@ -118,8 +121,8 @@ func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstor
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: GetAttrs,
NewFunc: func() runtime.Object { return &example.Pod{} },
NewListFunc: func() runtime.Object { return &example.PodList{} },
NewFunc: newPod,
NewListFunc: newPodList,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: clock,
}

View File

@ -101,6 +101,15 @@ type Reflector struct {
watchErrorHandler WatchErrorHandler
}
// ResourceVersionUpdater is an interface that allows a store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
// UpdateResourceVersion is called each time the reflector's current resource version
// is updated.
UpdateResourceVersion(resourceVersion string)
}
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will back off and retry.
@ -507,6 +516,9 @@ loop:
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}

View File

@ -910,3 +910,59 @@ func TestReflectorSetExpectedType(t *testing.T) {
})
}
}
type storeWithRV struct {
Store
// resourceVersions tracks values passed by UpdateResourceVersion
resourceVersions []string
}
func (s *storeWithRV) UpdateResourceVersion(resourceVersion string) {
s.resourceVersions = append(s.resourceVersions, resourceVersion)
}
func newStoreWithRV() *storeWithRV {
return &storeWithRV{
Store: NewStore(MetaNamespaceKeyFunc),
}
}
func TestReflectorResourceVersionUpdate(t *testing.T) {
s := newStoreWithRV()
stopCh := make(chan struct{})
fw := watch.NewFake()
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}}, nil
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
makePod := func(rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{ResourceVersion: rv}}
}
go func() {
fw.Action(watch.Added, makePod("10"))
fw.Action(watch.Modified, makePod("20"))
fw.Action(watch.Bookmark, makePod("30"))
fw.Action(watch.Deleted, makePod("40"))
close(stopCh)
}()
// Initial list should use RV=0
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}
expectedRVs := []string{"10", "20", "30", "40"}
if !reflect.DeepEqual(s.resourceVersions, expectedRVs) {
t.Errorf("Expected series of resource version updates of %#v but got: %#v", expectedRVs, s.resourceVersions)
}
}

View File

@ -15,6 +15,7 @@ go_test(
"max_request_body_bytes_test.go",
"patch_test.go",
"print_test.go",
"watchcache_test.go",
],
rundir = ".",
tags = [
@ -25,6 +26,7 @@ go_test(
"//cmd/kube-apiserver/app/options:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/controlplane:go_default_library",
"//pkg/controlplane/reconcilers:go_default_library",
"//pkg/features:go_default_library",
"//pkg/printers:go_default_library",
"//pkg/printers/internalversion:go_default_library",

View File

@ -0,0 +1,170 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"context"
"fmt"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/test/integration/framework"
)
// multiEtcdSetup creates a kube-apiserver backed by two separate etcds,
// one of them containing events and the other containing all other objects.
func multiEtcdSetup(t testing.TB) (clientset.Interface, framework.CloseFunc) {
etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs)
if err != nil {
t.Fatalf("Couldn't start etcd: %v", err)
}
etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs)
if err != nil {
t.Fatalf("Couldn't start etcd: %v", err)
}
etcdOptions := framework.DefaultEtcdOptions()
// Overwrite etcd setup to our custom etcd instances.
etcdOptions.StorageConfig.Transport.ServerList = []string{etcd0URL}
etcdOptions.EtcdServersOverrides = []string{fmt.Sprintf("/events#%s", etcd1URL)}
etcdOptions.EnableWatchCache = true
opts := framework.MasterConfigOptions{EtcdOptions: etcdOptions}
masterConfig := framework.NewIntegrationTestMasterConfigWithOptions(&opts)
// Switch off endpoints reconciler to avoid unnecessary operations.
masterConfig.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
_, s, stopMaster := framework.RunAMaster(masterConfig)
closeFn := func() {
stopMaster()
stopEtcd1()
stopEtcd0()
}
clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL, QPS: -1})
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
// Wait for the apiserver to stabilize.
// Everything but default service creation is checked in RunAMaster above by
// waiting for post start hooks, so we just wait for the default service to exist.
// TODO(wojtek-t): Figure out a less fragile way.
ctx := context.Background()
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
_, err := clientSet.CoreV1().Services("default").Get(ctx, "kubernetes", metav1.GetOptions{})
return err == nil, nil
}); err != nil {
t.Fatalf("Failed to wait for kubernetes service: %v:", err)
}
return clientSet, closeFn
}
func TestWatchCacheUpdatedByEtcd(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EfficientWatchResumption, true)()
c, closeFn := multiEtcdSetup(t)
defer closeFn()
ctx := context.Background()
makeConfigMap := func(name string) *v1.ConfigMap {
return &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: name}}
}
makeSecret := func(name string) *v1.Secret {
return &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}
}
makeEvent := func(name string) *v1.Event {
return &v1.Event{ObjectMeta: metav1.ObjectMeta{Name: name}}
}
cm, err := c.CoreV1().ConfigMaps("default").Create(ctx, makeConfigMap("name"), metav1.CreateOptions{})
if err != nil {
t.Errorf("Couldn't create configmap: %v", err)
}
ev, err := c.CoreV1().Events("default").Create(ctx, makeEvent("name"), metav1.CreateOptions{})
if err != nil {
t.Errorf("Couldn't create event: %v", err)
}
listOptions := metav1.ListOptions{
ResourceVersion: "0",
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
}
// Wait until lists served from the watchcache return the resource version of the
// corresponding objects (which are the most recent updates).
t.Logf("Waiting for configmaps watchcache synced to %s", cm.ResourceVersion)
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions)
if err != nil {
return false, nil
}
return res.ResourceVersion == cm.ResourceVersion, nil
}); err != nil {
t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
}
t.Logf("Waiting for events watchcache synced to %s", ev.ResourceVersion)
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
res, err := c.CoreV1().Events("default").List(ctx, listOptions)
if err != nil {
return false, nil
}
return res.ResourceVersion == ev.ResourceVersion, nil
}); err != nil {
t.Errorf("Failed to wait for events watchcache synced: %v", err)
}
// Create a secret, which is stored in the same etcd as configmaps, but
// in a different etcd than events.
se, err := c.CoreV1().Secrets("default").Create(ctx, makeSecret("name"), metav1.CreateOptions{})
if err != nil {
t.Errorf("Couldn't create secret: %v", err)
}
t.Logf("Waiting for configmaps watchcache synced to %s", se.ResourceVersion)
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
res, err := c.CoreV1().ConfigMaps("default").List(ctx, listOptions)
if err != nil {
return false, nil
}
return res.ResourceVersion == se.ResourceVersion, nil
}); err != nil {
t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
}
t.Logf("Waiting for events watchcache NOT synced to %s", se.ResourceVersion)
if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (bool, error) {
res, err := c.CoreV1().Events("default").List(ctx, listOptions)
if err != nil {
return false, nil
}
return res.ResourceVersion == se.ResourceVersion, nil
}); err == nil || err != wait.ErrWaitTimeout {
t.Errorf("Events watchcache unexpected synced: %v", err)
}
}

View File

@ -84,41 +84,54 @@ func startEtcd() (func(), error) {
}
klog.V(1).Infof("could not connect to etcd: %v", err)
currentURL, stop, err := RunCustomEtcd("integration_test_etcd_data", nil)
if err != nil {
return nil, err
}
etcdURL = currentURL
os.Setenv("KUBE_INTEGRATION_ETCD_URL", etcdURL)
return stop, nil
}
// RunCustomEtcd starts a custom etcd instance for test purposes.
func RunCustomEtcd(dataDir string, customFlags []string) (url string, stopFn func(), err error) {
// TODO: Check for valid etcd version.
etcdPath, err := getEtcdPath()
if err != nil {
fmt.Fprintf(os.Stderr, installEtcd)
return nil, fmt.Errorf("could not find etcd in PATH: %v", err)
return "", nil, fmt.Errorf("could not find etcd in PATH: %v", err)
}
etcdPort, err := getAvailablePort()
if err != nil {
return nil, fmt.Errorf("could not get a port: %v", err)
return "", nil, fmt.Errorf("could not get a port: %v", err)
}
etcdURL = fmt.Sprintf("http://127.0.0.1:%d", etcdPort)
customURL := fmt.Sprintf("http://127.0.0.1:%d", etcdPort)
klog.Infof("starting etcd on %s", etcdURL)
klog.Infof("starting etcd on %s", customURL)
etcdDataDir, err := ioutil.TempDir(os.TempDir(), "integration_test_etcd_data")
etcdDataDir, err := ioutil.TempDir(os.TempDir(), dataDir)
if err != nil {
return nil, fmt.Errorf("unable to make temp etcd data dir: %v", err)
return "", nil, fmt.Errorf("unable to make temp etcd data dir %s: %v", dataDir, err)
}
klog.Infof("storing etcd data in: %v", etcdDataDir)
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.CommandContext(
ctx,
etcdPath,
args := []string{
"--data-dir",
etcdDataDir,
"--listen-client-urls",
GetEtcdURL(),
customURL,
"--advertise-client-urls",
GetEtcdURL(),
customURL,
"--listen-peer-urls",
"http://127.0.0.1:0",
"--log-package-levels",
"*=NOTICE", // set to INFO or DEBUG for more logs
)
}
args = append(args, customFlags...)
cmd := exec.CommandContext(ctx, etcdPath, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
stop := func() {
@ -136,14 +149,14 @@ func startEtcd() (func(), error) {
clientv3.SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, os.Stderr))
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("failed to run etcd: %v", err)
return "", nil, fmt.Errorf("failed to run etcd: %v", err)
}
var i int32 = 1
const pollCount = int32(300)
for i <= pollCount {
conn, err = net.DialTimeout("tcp", strings.TrimPrefix(etcdURL, "http://"), 1*time.Second)
conn, err := net.DialTimeout("tcp", strings.TrimPrefix(customURL, "http://"), 1*time.Second)
if err == nil {
conn.Close()
break
@ -151,16 +164,14 @@ func startEtcd() (func(), error) {
if i == pollCount {
stop()
return nil, fmt.Errorf("could not start etcd")
return "", nil, fmt.Errorf("could not start etcd")
}
time.Sleep(100 * time.Millisecond)
i = i + 1
}
os.Setenv("KUBE_INTEGRATION_ETCD_URL", etcdURL)
return stop, nil
return customURL, stop, nil
}
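A hedged sketch of driving the new helper from a test, matching how multiEtcdSetup uses it earlier in this diff:

url, stop, err := framework.RunCustomEtcd("my_etcd_data",
	[]string{"--experimental-watch-progress-notify-interval", "1s"})
if err != nil {
	t.Fatalf("Couldn't start etcd: %v", err)
}
defer stop()
etcdOptions.StorageConfig.Transport.ServerList = []string{url}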
// EtcdMain starts an etcd instance before running tests.