mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
Move GetCurrentResourceVersion to storage.Interface
This commit is contained in:
parent
3d9fcb7c01
commit
fea89f25d1
@ -1248,7 +1248,7 @@ func (c *Cacher) getWatchCacheResourceVersion(ctx context.Context, parsedWatchRe
|
|||||||
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
|
if !utilfeature.DefaultFeatureGate.Enabled(features.WatchFromStorageWithoutResourceVersion) && opts.SendInitialEvents == nil && opts.ResourceVersion == "" {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
}
|
}
|
||||||
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
|
rv, err := c.storage.GetCurrentResourceVersion(ctx)
|
||||||
return rv, err
|
return rv, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,6 +113,7 @@ type dummyStorage struct {
|
|||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
err error
|
err error
|
||||||
getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
|
getListFn func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error
|
||||||
|
getRVFn func(_ context.Context) (uint64, error)
|
||||||
watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
|
watchFn func(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error)
|
||||||
|
|
||||||
// use getRequestWatchProgressCounter when reading
|
// use getRequestWatchProgressCounter when reading
|
||||||
@ -199,6 +200,13 @@ func (d *dummyStorage) injectError(err error) {
|
|||||||
d.err = err
|
d.err = err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *dummyStorage) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
|
||||||
|
if d.getRVFn != nil {
|
||||||
|
return d.getRVFn(ctx)
|
||||||
|
}
|
||||||
|
return 100, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetListCacheBypass(t *testing.T) {
|
func TestGetListCacheBypass(t *testing.T) {
|
||||||
type opts struct {
|
type opts struct {
|
||||||
ResourceVersion string
|
ResourceVersion string
|
||||||
@ -463,6 +471,14 @@ apiserver_watch_cache_consistent_read_total{fallback="true", resource="pods", su
|
|||||||
podList.ResourceVersion = tc.storageRV
|
podList.ResourceVersion = tc.storageRV
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
backingStorage.getRVFn = func(_ context.Context) (uint64, error) {
|
||||||
|
requestToStorageCount += 1
|
||||||
|
rv, err := strconv.Atoi(tc.storageRV)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to parse RV: %s", err)
|
||||||
|
}
|
||||||
|
return uint64(rv), nil
|
||||||
|
}
|
||||||
result := &example.PodList{}
|
result := &example.PodList{}
|
||||||
|
|
||||||
ctx, clockStepCancelFn := context.WithTimeout(context.TODO(), time.Minute)
|
ctx, clockStepCancelFn := context.WithTimeout(context.TODO(), time.Minute)
|
||||||
@ -2111,6 +2127,15 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
|
|||||||
listAccessor.SetResourceVersion("105")
|
listAccessor.SetResourceVersion("105")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
s.getRVFn = func(_ context.Context) (uint64, error) {
|
||||||
|
// the first call to this function
|
||||||
|
// primes the cacher
|
||||||
|
if !hasBeenPrimed {
|
||||||
|
hasBeenPrimed = true
|
||||||
|
return 100, nil
|
||||||
|
}
|
||||||
|
return 105, nil
|
||||||
|
}
|
||||||
return s
|
return s
|
||||||
}(),
|
}(),
|
||||||
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
|
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
|
||||||
@ -2146,6 +2171,15 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
|
|||||||
listAccessor.SetResourceVersion("105")
|
listAccessor.SetResourceVersion("105")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
s.getRVFn = func(_ context.Context) (uint64, error) {
|
||||||
|
// the first call to this function
|
||||||
|
// primes the cacher
|
||||||
|
if !hasBeenPrimed {
|
||||||
|
hasBeenPrimed = true
|
||||||
|
return 100, nil
|
||||||
|
}
|
||||||
|
return 105, nil
|
||||||
|
}
|
||||||
return s
|
return s
|
||||||
}(),
|
}(),
|
||||||
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
|
verifyBackingStore: func(t *testing.T, s *dummyStorage) {
|
||||||
|
@ -57,6 +57,10 @@ func (c *CacheDelegator) Create(ctx context.Context, key string, obj, out runtim
|
|||||||
return c.storage.Create(ctx, key, obj, out, ttl)
|
return c.storage.Create(ctx, key, obj, out, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CacheDelegator) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
|
||||||
|
return c.storage.GetCurrentResourceVersion(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
|
func (c *CacheDelegator) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object, opts storage.DeleteOptions) error {
|
||||||
// Ignore the suggestion and try to pass down the current version of the object
|
// Ignore the suggestion and try to pass down the current version of the object
|
||||||
// read from cache.
|
// read from cache.
|
||||||
@ -160,7 +164,7 @@ func (c *CacheDelegator) GetList(ctx context.Context, key string, opts storage.L
|
|||||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||||
consistentRead := opts.ResourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
|
consistentRead := opts.ResourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported
|
||||||
if consistentRead {
|
if consistentRead {
|
||||||
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.cacher.newListFunc, c.cacher.resourcePrefix, c.cacher.objectType.String())
|
listRV, err = c.storage.GetCurrentResourceVersion(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -35,6 +36,8 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/conversion"
|
"k8s.io/apimachinery/pkg/conversion"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
@ -84,6 +87,9 @@ type store struct {
|
|||||||
leaseManager *leaseManager
|
leaseManager *leaseManager
|
||||||
decoder Decoder
|
decoder Decoder
|
||||||
listErrAggrFactory func() ListErrorAggregator
|
listErrAggrFactory func() ListErrorAggregator
|
||||||
|
|
||||||
|
resourcePrefix string
|
||||||
|
newListFunc func() runtime.Object
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) RequestWatchProgress(ctx context.Context) error {
|
func (s *store) RequestWatchProgress(ctx context.Context) error {
|
||||||
@ -185,10 +191,13 @@ func newStore(c *kubernetes.Client, codec runtime.Codec, newFunc, newListFunc fu
|
|||||||
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
|
leaseManager: newDefaultLeaseManager(c.Client, leaseManagerConfig),
|
||||||
decoder: decoder,
|
decoder: decoder,
|
||||||
listErrAggrFactory: listErrAggrFactory,
|
listErrAggrFactory: listErrAggrFactory,
|
||||||
|
|
||||||
|
resourcePrefix: resourcePrefix,
|
||||||
|
newListFunc: newListFunc,
|
||||||
}
|
}
|
||||||
|
|
||||||
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
|
w.getCurrentStorageRV = func(ctx context.Context) (uint64, error) {
|
||||||
return storage.GetCurrentResourceVersionFromStorage(ctx, s, newListFunc, resourcePrefix, w.objectType)
|
return s.GetCurrentResourceVersion(ctx)
|
||||||
}
|
}
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) || utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
|
||||||
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
|
etcdfeature.DefaultFeatureSupportChecker.CheckClient(c.Ctx(), c, storage.RequestWatchProgress)
|
||||||
@ -677,6 +686,37 @@ func (s *store) resolveGetListRev(continueKey string, continueRV int64, opts sto
|
|||||||
return withRev, nil
|
return withRev, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *store) GetCurrentResourceVersion(ctx context.Context) (uint64, error) {
|
||||||
|
emptyList := s.newListFunc()
|
||||||
|
pred := storage.SelectionPredicate{
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
Limit: 1, // just in case we actually hit something
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.GetList(ctx, s.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
emptyListAccessor, err := meta.ListAccessor(emptyList)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
if emptyListAccessor == nil {
|
||||||
|
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
|
||||||
|
}
|
||||||
|
|
||||||
|
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if currentResourceVersion == 0 {
|
||||||
|
return 0, fmt.Errorf("the current resource version must be greater than 0")
|
||||||
|
}
|
||||||
|
return uint64(currentResourceVersion), nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetList implements storage.Interface.
|
// GetList implements storage.Interface.
|
||||||
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
func (s *store) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
|
||||||
keyPrefix, err := s.prepareKey(key)
|
keyPrefix, err := s.prepareKey(key)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-logr/logr"
|
"github.com/go-logr/logr"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.etcd.io/etcd/client/v3/kubernetes"
|
"go.etcd.io/etcd/client/v3/kubernetes"
|
||||||
"go.etcd.io/etcd/server/v3/embed"
|
"go.etcd.io/etcd/server/v3/embed"
|
||||||
@ -994,3 +995,49 @@ func BenchmarkStoreList(b *testing.B) {
|
|||||||
func computePodKey(obj *example.Pod) string {
|
func computePodKey(obj *example.Pod) string {
|
||||||
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
|
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetCurrentResourceVersion(t *testing.T) {
|
||||||
|
ctx, store, _ := testSetup(t)
|
||||||
|
|
||||||
|
makePod := func(name string) *example.Pod {
|
||||||
|
return &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
createPod := func(obj *example.Pod) *example.Pod {
|
||||||
|
key := "pods/" + obj.Namespace + "/" + obj.Name
|
||||||
|
out := &example.Pod{}
|
||||||
|
err := store.Create(context.TODO(), key, obj, out, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
getPod := func(name, ns string) *example.Pod {
|
||||||
|
key := "pods/" + ns + "/" + name
|
||||||
|
out := &example.Pod{}
|
||||||
|
err := store.Get(context.TODO(), key, storage.GetOptions{}, out)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a pod and make sure its RV is equal to the one maintained by etcd
|
||||||
|
pod := createPod(makePod("pod-1"))
|
||||||
|
currentStorageRV, err := store.GetCurrentResourceVersion(context.TODO())
|
||||||
|
require.NoError(t, err)
|
||||||
|
podRV, err := store.versioner.ParseResourceVersion(pod.ResourceVersion)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
|
||||||
|
|
||||||
|
// now make unrelated write and make sure the target function returns global etcd RV
|
||||||
|
resp, err := store.client.KV.Put(ctx, "compact_rev_key", pod.ResourceVersion)
|
||||||
|
require.NoError(t, err)
|
||||||
|
currentStorageRV, err = store.GetCurrentResourceVersion(context.TODO())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, currentStorageRV, uint64(resp.Header.Revision), "expected the global etcd RV to be equal to replicaset's RV")
|
||||||
|
|
||||||
|
// ensure that the pod's RV hasn't been changed
|
||||||
|
currentPod := getPod(pod.Name, pod.Namespace)
|
||||||
|
currentPodRV, err := store.versioner.ParseResourceVersion(currentPod.ResourceVersion)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
|
||||||
|
}
|
||||||
|
@ -262,6 +262,10 @@ type Interface interface {
|
|||||||
// TODO: Remove when storage.Interface will be separate from etc3.store.
|
// TODO: Remove when storage.Interface will be separate from etc3.store.
|
||||||
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
|
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
|
||||||
RequestWatchProgress(ctx context.Context) error
|
RequestWatchProgress(ctx context.Context) error
|
||||||
|
|
||||||
|
// GetCurrentResourceVersion gets the current resource version from etcd.
|
||||||
|
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
|
||||||
|
GetCurrentResourceVersion(ctx context.Context) (uint64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetOptions provides the options that may be provided for storage get operations.
|
// GetOptions provides the options that may be provided for storage get operations.
|
||||||
|
@ -1532,7 +1532,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
|
|||||||
}
|
}
|
||||||
|
|
||||||
if scenario.useCurrentRV {
|
if scenario.useCurrentRV {
|
||||||
currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(ctx, store, func() runtime.Object { return &example.PodList{} }, "/pods", "")
|
currentStorageRV, err := store.GetCurrentResourceVersion(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
scenario.resourceVersion = fmt.Sprintf("%d", currentStorageRV)
|
scenario.resourceVersion = fmt.Sprintf("%d", currentStorageRV)
|
||||||
}
|
}
|
||||||
|
@ -17,16 +17,12 @@ limitations under the License.
|
|||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
"k8s.io/apimachinery/pkg/api/validation/path"
|
"k8s.io/apimachinery/pkg/api/validation/path"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -81,45 +77,6 @@ func (hwm *HighWaterMark) Update(current int64) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
|
|
||||||
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
|
|
||||||
func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) {
|
|
||||||
if storage == nil {
|
|
||||||
return 0, fmt.Errorf("storage wasn't provided for %s", objectType)
|
|
||||||
}
|
|
||||||
if newListFunc == nil {
|
|
||||||
return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType)
|
|
||||||
}
|
|
||||||
emptyList := newListFunc()
|
|
||||||
pred := SelectionPredicate{
|
|
||||||
Label: labels.Everything(),
|
|
||||||
Field: fields.Everything(),
|
|
||||||
Limit: 1, // just in case we actually hit something
|
|
||||||
}
|
|
||||||
|
|
||||||
err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
emptyListAccessor, err := meta.ListAccessor(emptyList)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
if emptyListAccessor == nil {
|
|
||||||
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
|
|
||||||
}
|
|
||||||
|
|
||||||
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if currentResourceVersion == 0 {
|
|
||||||
return 0, fmt.Errorf("the current resource version must be greater than 0")
|
|
||||||
}
|
|
||||||
return uint64(currentResourceVersion), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AnnotateInitialEventsEndBookmark adds a special annotation to the given object
|
// AnnotateInitialEventsEndBookmark adds a special annotation to the given object
|
||||||
// which indicates that the initial events have been sent.
|
// which indicates that the initial events have been sent.
|
||||||
//
|
//
|
||||||
|
@ -17,30 +17,22 @@ limitations under the License.
|
|||||||
package storage_test
|
package storage_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
example2v1 "k8s.io/apiserver/pkg/apis/example2/v1"
|
example2v1 "k8s.io/apiserver/pkg/apis/example2/v1"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
|
||||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
|
||||||
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
scheme = runtime.NewScheme()
|
scheme = runtime.NewScheme()
|
||||||
codecs = serializer.NewCodecFactory(scheme)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -81,74 +73,6 @@ func TestHighWaterMark(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
|
|
||||||
// test data
|
|
||||||
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
|
||||||
versioner := storage.APIObjectVersioner{}
|
|
||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion)
|
|
||||||
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
|
||||||
storage := etcd3.New(server.V3Client, codec, func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig(), etcd3.NewDefaultDecoder(codec, versioner), versioner)
|
|
||||||
return server, storage
|
|
||||||
}
|
|
||||||
server, etcdStorage := newEtcdTestStorage(t, "")
|
|
||||||
defer server.Terminate(t)
|
|
||||||
versioner := storage.APIObjectVersioner{}
|
|
||||||
|
|
||||||
makePod := func(name string) *example.Pod {
|
|
||||||
return &example.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
createPod := func(obj *example.Pod) *example.Pod {
|
|
||||||
key := "pods/" + obj.Namespace + "/" + obj.Name
|
|
||||||
out := &example.Pod{}
|
|
||||||
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
|
|
||||||
require.NoError(t, err)
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
getPod := func(name, ns string) *example.Pod {
|
|
||||||
key := "pods/" + ns + "/" + name
|
|
||||||
out := &example.Pod{}
|
|
||||||
err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out)
|
|
||||||
require.NoError(t, err)
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
makeReplicaSet := func(name string) *example2v1.ReplicaSet {
|
|
||||||
return &example2v1.ReplicaSet{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
createReplicaSet := func(obj *example2v1.ReplicaSet) *example2v1.ReplicaSet {
|
|
||||||
key := "replicasets/" + obj.Namespace + "/" + obj.Name
|
|
||||||
out := &example2v1.ReplicaSet{}
|
|
||||||
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
|
|
||||||
require.NoError(t, err)
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
// create a pod and make sure its RV is equal to the one maintained by etcd
|
|
||||||
pod := createPod(makePod("pod-1"))
|
|
||||||
currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
|
|
||||||
require.NoError(t, err)
|
|
||||||
podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
|
|
||||||
|
|
||||||
// now create a replicaset (new resource) and make sure the target function returns global etcd RV
|
|
||||||
rs := createReplicaSet(makeReplicaSet("replicaset-1"))
|
|
||||||
currentStorageRV, err = storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
|
|
||||||
require.NoError(t, err)
|
|
||||||
rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV")
|
|
||||||
|
|
||||||
// ensure that the pod's RV hasn't been changed
|
|
||||||
currentPod := getPod(pod.Name, pod.Namespace)
|
|
||||||
currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHasInitialEventsEndBookmarkAnnotation(t *testing.T) {
|
func TestHasInitialEventsEndBookmarkAnnotation(t *testing.T) {
|
||||||
createPod := func(name string) *example.Pod {
|
createPod := func(name string) *example.Pod {
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}}
|
||||||
|
Loading…
Reference in New Issue
Block a user