mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #120212 from p0lyn0mial/upstream-etcd-watcher-validation
storage/etcd3: error when progressNotify option set and newFunc was not provided for a registry
This commit is contained in:
commit
6eca142082
@ -18,6 +18,7 @@ package etcd3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
@ -97,6 +98,9 @@ func (w *watcher) Watch(ctx context.Context, key string, rev int64, opts storage
|
||||
if opts.Recursive && !strings.HasSuffix(key, "/") {
|
||||
key += "/"
|
||||
}
|
||||
if opts.ProgressNotify && w.newFunc == nil {
|
||||
return nil, apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
|
||||
}
|
||||
wc := w.createWatchChan(ctx, key, rev, opts.Recursive, opts.ProgressNotify, opts.Predicate)
|
||||
go wc.run()
|
||||
|
||||
@ -334,9 +338,6 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
|
||||
|
||||
switch {
|
||||
case e.isProgressNotify:
|
||||
if wc.watcher.newFunc == nil {
|
||||
return nil
|
||||
}
|
||||
object := wc.watcher.newFunc()
|
||||
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
|
||||
klog.Errorf("failed to propagate object version: %v", err)
|
||||
|
@ -18,11 +18,14 @@ package etcd3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||
@ -133,3 +136,25 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestWatchErrorWhenNoNewFunc checks if an error
|
||||
// will be returned when establishing a watch
|
||||
// with progressNotify options set
|
||||
// when newFunc wasn't provided
|
||||
func TestWatchErrorWhenNoNewFunc(t *testing.T) {
|
||||
origCtx, store, _ := testSetup(t, func(opts *setupOptions) { opts.newFunc = nil })
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
defer cancel()
|
||||
|
||||
w, err := store.watcher.Watch(ctx, "/abc", 0, storage.ListOptions{ProgressNotify: true})
|
||||
if err == nil {
|
||||
t.Fatalf("expected an error but got none")
|
||||
}
|
||||
if w != nil {
|
||||
t.Fatalf("didn't expect a watcher because progress notifications cannot be delivered for a watcher without newFunc")
|
||||
}
|
||||
expectedError := apierrors.NewInternalError(errors.New("progressNotify for watch is unsupported by the etcd storage because no newFunc was provided"))
|
||||
if err.Error() != expectedError.Error() {
|
||||
t.Fatalf("unexpected err = %v, expected = %v", err, expectedError)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user