Merge pull request #64513 from fisherxu/useSameRv

Automatic merge from submit-queue (batch tested with PRs 65319, 64513, 65474, 65601, 65634). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Use same rvParse in watchcache and move cacheStorage in separate dir

**What this PR does / why we need it**:
Use same rvParse in watchcache and move cacheStorage in separate dir.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-06-29 12:15:09 -07:00 committed by GitHub
commit 7b46e884ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 281 additions and 175 deletions

View File

@ -1406,6 +1406,10 @@
"ImportPath": "k8s.io/apiserver/pkg/storage",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/cacher",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/errors",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -36,6 +36,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
@ -76,6 +77,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/registry/generic:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/metrics:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
@ -49,7 +50,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
// Currently it has two layers of same storage interface -- cacher and low level kv.
cacherConfig := storage.CacherConfig{
cacherConfig := cacherstorage.Config{
CacheCapacity: capacity,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
@ -61,7 +62,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
TriggerPublisherFunc: triggerFunc,
Codec: storageConfig.Codec,
}
cacher := storage.NewCacherFromConfig(cacherConfig)
cacher := cacherstorage.NewCacherFromConfig(cacherConfig)
destroyFunc := func() {
cacher.Stop()
d()

View File

@ -49,6 +49,7 @@ import (
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/apiserver/pkg/storage/names"
@ -1849,7 +1850,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
server.Terminate(t)
}
if hasCacheEnabled {
config := storage.CacherConfig{
config := cacherstorage.Config{
CacheCapacity: 10,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
@ -1860,7 +1861,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: sc.Codec,
}
cacher := storage.NewCacherFromConfig(config)
cacher := cacherstorage.NewCacherFromConfig(config)
d := destroyFunc
s = cacher
destroyFunc = func() {

View File

@ -9,65 +9,38 @@ load(
go_test(
name = "go_default_test",
srcs = [
"cacher_whitebox_test.go",
"selection_predicate_test.go",
"time_budget_test.go",
"util_test.go",
"watch_cache_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"cacher.go",
"doc.go",
"errors.go",
"interfaces.go",
"selection_predicate.go",
"time_budget.go",
"util.go",
"watch_cache.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage",
importpath = "k8s.io/apiserver/pkg/storage",
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/validation/path:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/trace:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
@ -82,6 +55,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/errors:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:all-srcs",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:all-srcs",

View File

@ -0,0 +1,75 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cacher.go",
"time_budget.go",
"util.go",
"watch_cache.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/cacher",
importpath = "k8s.io/apiserver/pkg/storage/cacher",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/trace:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"cacher_whitebox_test.go",
"time_budget_test.go",
"util_test.go",
"watch_cache_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"context"
@ -37,21 +37,22 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
)
// CacherConfig contains the configuration for a given Cache.
type CacherConfig struct {
// Config contains the configuration for a given Cache.
type Config struct {
// Maximum size of the history cached in memory.
CacheCapacity int
// An underlying storage.Interface.
Storage Interface
Storage storage.Interface
// An underlying storage.Versioner.
Versioner Versioner
Versioner storage.Versioner
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
@ -66,7 +67,7 @@ type CacherConfig struct {
// TriggerPublisherFunc is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc TriggerPublisherFunc
TriggerPublisherFunc storage.TriggerPublisherFunc
// NewList is a function that creates new empty object storing a list of
// objects of type Type.
@ -141,7 +142,7 @@ type Cacher struct {
// HighWaterMarks for performance debugging.
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
incomingHWM HighWaterMark
incomingHWM storage.HighWaterMark
// Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent
@ -156,7 +157,7 @@ type Cacher struct {
ready *ready
// Underlying storage.Interface.
storage Interface
storage storage.Interface
// Expected type of objects in the underlying cache.
objectType reflect.Type
@ -166,11 +167,11 @@ type Cacher struct {
reflector *cache.Reflector
// Versioner is used to handle resource versions.
versioner Versioner
versioner storage.Versioner
// triggerFunc is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc TriggerPublisherFunc
triggerFunc storage.TriggerPublisherFunc
// watchers is mapping from the value of trigger function that a
// watcher is interested into the watchers
watcherIdx int
@ -187,11 +188,11 @@ type Cacher struct {
stopWg sync.WaitGroup
}
// Create a new Cacher responsible for servicing WATCH and LIST requests from
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
func NewCacherFromConfig(config Config) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@ -272,23 +273,23 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
}
}
// Implements storage.Interface.
func (c *Cacher) Versioner() Versioner {
// Versioner implements storage.Interface.
func (c *Cacher) Versioner() storage.Versioner {
return c.storage.Versioner()
}
// Implements storage.Interface.
// Create implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}
// Implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
// Delete implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
return c.storage.Delete(ctx, key, out, preconditions)
}
// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return nil, err
@ -344,12 +345,12 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
return watcher, nil
}
// Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
// WatchList implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, pred)
}
// Implements storage.Interface.
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
@ -394,14 +395,14 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
} else {
objVal.Set(reflect.Zero(objVal.Type()))
if !ignoreNotFound {
return NewKeyNotFoundError(key, int64(readResourceVersion))
return storage.NewKeyNotFoundError(key, int64(readResourceVersion))
}
}
return nil
}
// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
// GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
// If resourceVersion is not specified, serve it from underlying
@ -464,8 +465,8 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
return nil
}
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@ -539,10 +540,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
return nil
}
// Implements storage.Interface.
// GuaranteedUpdate implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
@ -555,6 +556,7 @@ func (c *Cacher) GuaranteedUpdate(
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}
// Count implements storage.Interface.
func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
@ -651,6 +653,7 @@ func (c *Cacher) isStopped() bool {
return c.stopped
}
// Stop implements the graceful termination.
func (c *Cacher) Stop() {
// avoid stopping twice (note: cachers are shared with subresources)
if c.isStopped() {
@ -684,7 +687,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
}
}
func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc {
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
if !hasPathPrefix(objKey, key) {
return false
@ -694,7 +697,7 @@ func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFu
return filterFunc
}
// Returns resource version to which the underlying cache is synced.
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
@ -704,12 +707,12 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage Interface
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
}
func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &cacherListerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
@ -720,7 +723,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", storage.Everything, list); err != nil {
return nil, err
}
return list, nil
@ -728,7 +731,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything)
}
// errWatcher implements watch.Interface to return a single error
@ -780,10 +783,10 @@ type cacheWatcher struct {
filter filterWithAttrsFunc
stopped bool
forget func(bool)
versioner Versioner
versioner storage.Versioner
}
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"fmt"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"sync"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"testing"

View File

@ -0,0 +1,46 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"strings"
)
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"testing"
)
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},
// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}
invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},
// Dir requirement
{"a", "a/"},
// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},
// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}

View File

@ -14,22 +14,21 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"fmt"
"sort"
"strconv"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
)
@ -137,12 +136,16 @@ type watchCache struct {
// for testing timeouts.
clock clock.Clock
// An underlying storage.Versioner.
versioner storage.Versioner
}
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error)) *watchCache {
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
versioner storage.Versioner) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
@ -153,6 +156,7 @@ func newWatchCache(
store: cache.NewStore(storeElementKey),
resourceVersion: 0,
clock: clock.RealClock{},
versioner: versioner,
}
wc.cond = sync.NewCond(wc.RLocker())
return wc
@ -160,7 +164,7 @@ func newWatchCache(
// Add takes runtime.Object as an argument.
func (w *watchCache) Add(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
@ -172,7 +176,7 @@ func (w *watchCache) Add(obj interface{}) error {
// Update takes runtime.Object as an argument.
func (w *watchCache) Update(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
@ -184,7 +188,7 @@ func (w *watchCache) Update(obj interface{}) error {
// Delete takes runtime.Object as an argument.
func (w *watchCache) Delete(obj interface{}) error {
object, resourceVersion, err := objectToVersionedRuntimeObject(obj)
object, resourceVersion, err := w.objectToVersionedRuntimeObject(obj)
if err != nil {
return err
}
@ -194,30 +198,18 @@ func (w *watchCache) Delete(obj interface{}) error {
return w.processEvent(event, resourceVersion, f)
}
func objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
func (w *watchCache) objectToVersionedRuntimeObject(obj interface{}) (runtime.Object, uint64, error) {
object, ok := obj.(runtime.Object)
if !ok {
return nil, 0, fmt.Errorf("obj does not implement runtime.Object interface: %v", obj)
}
meta, err := meta.Accessor(object)
if err != nil {
return nil, 0, err
}
resourceVersion, err := parseResourceVersion(meta.GetResourceVersion())
resourceVersion, err := w.versioner.ObjectResourceVersion(object)
if err != nil {
return nil, 0, err
}
return object, resourceVersion, nil
}
func parseResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
// Use bitsize being the size of int on the machine.
return strconv.ParseUint(resourceVersion, 10, 0)
}
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
key, err := w.keyFunc(event.Object)
if err != nil {
@ -362,7 +354,7 @@ func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
// Replace takes slice of runtime.Object as a parameter.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
version, err := parseResourceVersion(resourceVersion)
version, err := w.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"fmt"
@ -32,6 +32,8 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/client-go/tools/cache"
)
@ -64,16 +66,17 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {
// newTestWatchCache just adds a fake clock.
func newTestWatchCache(capacity int) *watchCache {
keyFunc := func(obj runtime.Object) (string, error) {
return NamespaceKeyFunc("prefix", obj)
return storage.NamespaceKeyFunc("prefix", obj)
}
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, nil, false, fmt.Errorf("not a pod!")
return nil, nil, false, fmt.Errorf("not a pod")
}
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, false, nil
}
wc := newWatchCache(capacity, keyFunc, getAttrsFunc)
versioner := etcd.APIObjectVersioner{}
wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner)
wc.clock = clock.NewFakeClock(time.Now())
return wc
}

View File

@ -26,6 +26,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/etcdtest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
@ -95,10 +96,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServe
return server, storage
}
func newTestCacher(s storage.Interface, cap int) (*storage.Cacher, storage.Versioner) {
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner) {
prefix := "pods"
v := etcdstorage.APIObjectVersioner{}
config := storage.CacherConfig{
config := cacherstorage.Config{
CacheCapacity: cap,
Storage: s,
Versioner: v,
@ -109,7 +110,7 @@ func newTestCacher(s storage.Interface, cap int) (*storage.Cacher, storage.Versi
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return storage.NewCacherFromConfig(config), v
return cacherstorage.NewCacherFromConfig(config), v
}
func makeTestPod(name string) *example.Pod {

View File

@ -18,7 +18,6 @@ package storage
import (
"fmt"
"strings"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
@ -72,31 +71,6 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
return prefix + "/" + name, nil
}
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64

View File

@ -22,54 +22,6 @@ import (
"testing"
)
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},
// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}
invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},
// Dir requirement
{"a", "a/"},
// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},
// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark

View File

@ -1086,6 +1086,10 @@
"ImportPath": "k8s.io/apiserver/pkg/storage",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/cacher",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/errors",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -1058,6 +1058,10 @@
"ImportPath": "k8s.io/apiserver/pkg/storage",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/cacher",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apiserver/pkg/storage/errors",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"