mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 04:27:54 +00:00
Merge pull request #119975 from p0lyn0mial/upstream-storage-get-current-rv
storage/util: move GetCurrentResourceVersionFromStorage
This commit is contained in:
commit
ba10ee7671
@ -21,7 +21,6 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -773,7 +772,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
||||
return c.storage.GetList(ctx, key, opts, listObj)
|
||||
}
|
||||
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
|
||||
listRV, err = c.getCurrentResourceVersionFromStorage(ctx)
|
||||
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -1249,42 +1248,6 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
||||
return c.versioner.ParseResourceVersion(resourceVersion)
|
||||
}
|
||||
|
||||
// getCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
|
||||
// this method issues an empty list request and reads only the ResourceVersion from the object metadata
|
||||
func (c *Cacher) getCurrentResourceVersionFromStorage(ctx context.Context) (uint64, error) {
|
||||
if c.newListFunc == nil {
|
||||
return 0, fmt.Errorf("newListFunction wasn't provided for %v", c.objectType)
|
||||
}
|
||||
emptyList := c.newListFunc()
|
||||
pred := storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
Limit: 1, // just in case we actually hit something
|
||||
}
|
||||
|
||||
err := c.storage.GetList(ctx, c.resourcePrefix, storage.ListOptions{Predicate: pred}, emptyList)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
emptyListAccessor, err := meta.ListAccessor(emptyList)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if emptyListAccessor == nil {
|
||||
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
|
||||
}
|
||||
|
||||
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if currentResourceVersion == 0 {
|
||||
return 0, fmt.Errorf("the current resource version must be greater than 0")
|
||||
}
|
||||
return uint64(currentResourceVersion), nil
|
||||
}
|
||||
|
||||
// getBookmarkAfterResourceVersionLockedFunc returns a function that
|
||||
// spits a ResourceVersion after which the bookmark event will be delivered.
|
||||
//
|
||||
@ -1318,7 +1281,7 @@ func (c *Cacher) getStartResourceVersionForWatchLockedFunc(ctx context.Context,
|
||||
func (c *Cacher) getCommonResourceVersionLockedFunc(ctx context.Context, parsedWatchResourceVersion uint64, opts storage.ListOptions) (func() uint64, error) {
|
||||
switch {
|
||||
case len(opts.ResourceVersion) == 0:
|
||||
rv, err := c.getCurrentResourceVersionFromStorage(ctx)
|
||||
rv, err := storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -30,7 +30,6 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -42,12 +41,9 @@ import (
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
example2v1 "k8s.io/apiserver/pkg/apis/example2/v1"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/utils/clock"
|
||||
@ -1814,76 +1810,6 @@ func TestCacherWatchSemantics(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
|
||||
// test data
|
||||
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
||||
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
|
||||
return server, storage
|
||||
}
|
||||
server, etcdStorage := newEtcdTestStorage(t, "")
|
||||
defer server.Terminate(t)
|
||||
podCacher, versioner, err := newTestCacher(etcdStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create podCacher: %v", err)
|
||||
}
|
||||
defer podCacher.Stop()
|
||||
|
||||
makePod := func(name string) *example.Pod {
|
||||
return &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
||||
}
|
||||
}
|
||||
createPod := func(obj *example.Pod) *example.Pod {
|
||||
key := "pods/" + obj.Namespace + "/" + obj.Name
|
||||
out := &example.Pod{}
|
||||
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
|
||||
require.NoError(t, err)
|
||||
return out
|
||||
}
|
||||
getPod := func(name, ns string) *example.Pod {
|
||||
key := "pods/" + ns + "/" + name
|
||||
out := &example.Pod{}
|
||||
err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out)
|
||||
require.NoError(t, err)
|
||||
return out
|
||||
}
|
||||
makeReplicaSet := func(name string) *example2v1.ReplicaSet {
|
||||
return &example2v1.ReplicaSet{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
||||
}
|
||||
}
|
||||
createReplicaSet := func(obj *example2v1.ReplicaSet) *example2v1.ReplicaSet {
|
||||
key := "replicasets/" + obj.Namespace + "/" + obj.Name
|
||||
out := &example2v1.ReplicaSet{}
|
||||
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
|
||||
require.NoError(t, err)
|
||||
return out
|
||||
}
|
||||
|
||||
// create a pod and make sure its RV is equal to the one maintained by etcd
|
||||
pod := createPod(makePod("pod-1"))
|
||||
currentStorageRV, err := podCacher.getCurrentResourceVersionFromStorage(context.TODO())
|
||||
require.NoError(t, err)
|
||||
podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
|
||||
|
||||
// now create a replicaset (new resource) and make sure the target function returns global etcd RV
|
||||
rs := createReplicaSet(makeReplicaSet("replicaset-1"))
|
||||
currentStorageRV, err = podCacher.getCurrentResourceVersionFromStorage(context.TODO())
|
||||
require.NoError(t, err)
|
||||
rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV")
|
||||
|
||||
// ensure that the pod's RV hasn't been changed
|
||||
currentPod := getPod(pod.Name, pod.Namespace)
|
||||
currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
|
||||
}
|
||||
|
||||
func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
|
||||
backingStorage := &dummyStorage{}
|
||||
|
@ -17,11 +17,15 @@ limitations under the License.
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/api/validation/path"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
)
|
||||
|
||||
@ -79,3 +83,42 @@ func (hwm *HighWaterMark) Update(current int64) bool {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetCurrentResourceVersionFromStorage gets the current resource version from the underlying storage engine.
|
||||
// This method issues an empty list request and reads only the ResourceVersion from the object metadata
|
||||
func GetCurrentResourceVersionFromStorage(ctx context.Context, storage Interface, newListFunc func() runtime.Object, resourcePrefix, objectType string) (uint64, error) {
|
||||
if storage == nil {
|
||||
return 0, fmt.Errorf("storage wasn't provided for %s", objectType)
|
||||
}
|
||||
if newListFunc == nil {
|
||||
return 0, fmt.Errorf("newListFunction wasn't provided for %s", objectType)
|
||||
}
|
||||
emptyList := newListFunc()
|
||||
pred := SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
Limit: 1, // just in case we actually hit something
|
||||
}
|
||||
|
||||
err := storage.GetList(ctx, resourcePrefix, ListOptions{Predicate: pred}, emptyList)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
emptyListAccessor, err := meta.ListAccessor(emptyList)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if emptyListAccessor == nil {
|
||||
return 0, fmt.Errorf("unable to extract a list accessor from %T", emptyList)
|
||||
}
|
||||
|
||||
currentResourceVersion, err := strconv.Atoi(emptyListAccessor.GetResourceVersion())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if currentResourceVersion == 0 {
|
||||
return 0, fmt.Errorf("the current resource version must be greater than 0")
|
||||
}
|
||||
return uint64(currentResourceVersion), nil
|
||||
}
|
||||
|
@ -14,16 +14,44 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
package storage_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"k8s.io/apimachinery/pkg/api/apitesting"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
example2v1 "k8s.io/apiserver/pkg/apis/example2/v1"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
|
||||
)
|
||||
|
||||
var (
|
||||
scheme = runtime.NewScheme()
|
||||
codecs = serializer.NewCodecFactory(scheme)
|
||||
)
|
||||
|
||||
func init() {
|
||||
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
|
||||
utilruntime.Must(example.AddToScheme(scheme))
|
||||
utilruntime.Must(examplev1.AddToScheme(scheme))
|
||||
utilruntime.Must(example2v1.AddToScheme(scheme))
|
||||
}
|
||||
|
||||
func TestHighWaterMark(t *testing.T) {
|
||||
var h HighWaterMark
|
||||
var h storage.HighWaterMark
|
||||
|
||||
for i := int64(10); i < 20; i++ {
|
||||
if !h.Update(i) {
|
||||
@ -52,3 +80,69 @@ func TestHighWaterMark(t *testing.T) {
|
||||
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetCurrentResourceVersionFromStorage(t *testing.T) {
|
||||
// test data
|
||||
newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
||||
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, prefix, schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
|
||||
return server, storage
|
||||
}
|
||||
server, etcdStorage := newEtcdTestStorage(t, "")
|
||||
defer server.Terminate(t)
|
||||
versioner := storage.APIObjectVersioner{}
|
||||
|
||||
makePod := func(name string) *example.Pod {
|
||||
return &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
||||
}
|
||||
}
|
||||
createPod := func(obj *example.Pod) *example.Pod {
|
||||
key := "pods/" + obj.Namespace + "/" + obj.Name
|
||||
out := &example.Pod{}
|
||||
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
|
||||
require.NoError(t, err)
|
||||
return out
|
||||
}
|
||||
getPod := func(name, ns string) *example.Pod {
|
||||
key := "pods/" + ns + "/" + name
|
||||
out := &example.Pod{}
|
||||
err := etcdStorage.Get(context.TODO(), key, storage.GetOptions{}, out)
|
||||
require.NoError(t, err)
|
||||
return out
|
||||
}
|
||||
makeReplicaSet := func(name string) *example2v1.ReplicaSet {
|
||||
return &example2v1.ReplicaSet{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: name},
|
||||
}
|
||||
}
|
||||
createReplicaSet := func(obj *example2v1.ReplicaSet) *example2v1.ReplicaSet {
|
||||
key := "replicasets/" + obj.Namespace + "/" + obj.Name
|
||||
out := &example2v1.ReplicaSet{}
|
||||
err := etcdStorage.Create(context.TODO(), key, obj, out, 0)
|
||||
require.NoError(t, err)
|
||||
return out
|
||||
}
|
||||
|
||||
// create a pod and make sure its RV is equal to the one maintained by etcd
|
||||
pod := createPod(makePod("pod-1"))
|
||||
currentStorageRV, err := storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
|
||||
require.NoError(t, err)
|
||||
podRV, err := versioner.ParseResourceVersion(pod.ResourceVersion)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, currentStorageRV, podRV, "expected the global etcd RV to be equal to pod's RV")
|
||||
|
||||
// now create a replicaset (new resource) and make sure the target function returns global etcd RV
|
||||
rs := createReplicaSet(makeReplicaSet("replicaset-1"))
|
||||
currentStorageRV, err = storage.GetCurrentResourceVersionFromStorage(context.TODO(), etcdStorage, func() runtime.Object { return &example.PodList{} }, "/pods", "Pod")
|
||||
require.NoError(t, err)
|
||||
rsRV, err := versioner.ParseResourceVersion(rs.ResourceVersion)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, currentStorageRV, rsRV, "expected the global etcd RV to be equal to replicaset's RV")
|
||||
|
||||
// ensure that the pod's RV hasn't been changed
|
||||
currentPod := getPod(pod.Name, pod.Namespace)
|
||||
currentPodRV, err := versioner.ParseResourceVersion(currentPod.ResourceVersion)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, currentPodRV, podRV, "didn't expect to see the pod's RV changed")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user