move cacher in separate dir

This commit is contained in:
fisherxu 2018-05-30 20:44:31 +08:00
parent 6cb344e78e
commit dbb448bbdc
13 changed files with 178 additions and 130 deletions

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
@ -49,7 +50,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside.
// Currently it has two layers of same storage interface -- cacher and low level kv.
cacherConfig := storage.CacherConfig{
cacherConfig := cacherstorage.Config{
CacheCapacity: capacity,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
@ -61,7 +62,7 @@ func StorageWithCacher(capacity int) generic.StorageDecorator {
TriggerPublisherFunc: triggerFunc,
Codec: storageConfig.Codec,
}
cacher := storage.NewCacherFromConfig(cacherConfig)
cacher := cacherstorage.NewCacherFromConfig(cacherConfig)
destroyFunc := func() {
cacher.Stop()
d()

View File

@ -49,6 +49,7 @@ import (
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/apiserver/pkg/storage/names"
@ -1849,7 +1850,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
server.Terminate(t)
}
if hasCacheEnabled {
config := storage.CacherConfig{
config := cacherstorage.Config{
CacheCapacity: 10,
Storage: s,
Versioner: etcdstorage.APIObjectVersioner{},
@ -1860,7 +1861,7 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: sc.Codec,
}
cacher := storage.NewCacherFromConfig(config)
cacher := cacherstorage.NewCacherFromConfig(config)
d := destroyFunc
s = cacher
destroyFunc = func() {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"context"
@ -37,21 +37,22 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
)
// CacherConfig contains the configuration for a given Cache.
type CacherConfig struct {
// Config contains the configuration for a given Cache.
type Config struct {
// Maximum size of the history cached in memory.
CacheCapacity int
// An underlying storage.Interface.
Storage Interface
Storage storage.Interface
// An underlying storage.Versioner.
Versioner Versioner
Versioner storage.Versioner
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
@ -66,7 +67,7 @@ type CacherConfig struct {
// TriggerPublisherFunc is used for optimizing amount of watchers that
// needs to process an incoming event.
TriggerPublisherFunc TriggerPublisherFunc
TriggerPublisherFunc storage.TriggerPublisherFunc
// NewList is a function that creates new empty object storing a list of
// objects of type Type.
@ -141,7 +142,7 @@ type Cacher struct {
// HighWaterMarks for performance debugging.
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
incomingHWM HighWaterMark
incomingHWM storage.HighWaterMark
// Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent
@ -156,7 +157,7 @@ type Cacher struct {
ready *ready
// Underlying storage.Interface.
storage Interface
storage storage.Interface
// Expected type of objects in the underlying cache.
objectType reflect.Type
@ -166,11 +167,11 @@ type Cacher struct {
reflector *cache.Reflector
// Versioner is used to handle resource versions.
versioner Versioner
versioner storage.Versioner
// triggerFunc is used for optimizing amount of watchers that needs to process
// an incoming event.
triggerFunc TriggerPublisherFunc
triggerFunc storage.TriggerPublisherFunc
// watchers is mapping from the value of trigger function that a
// watcher is interested into the watchers
watcherIdx int
@ -187,10 +188,10 @@ type Cacher struct {
stopWg sync.WaitGroup
}
// Create a new Cacher responsible for servicing WATCH and LIST requests from
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
func NewCacherFromConfig(config CacherConfig) *Cacher {
func NewCacherFromConfig(config Config) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
@ -272,23 +273,23 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
}
}
// Implements storage.Interface.
func (c *Cacher) Versioner() Versioner {
// Versioner implements storage.Interface.
func (c *Cacher) Versioner() storage.Versioner {
return c.storage.Versioner()
}
// Implements storage.Interface.
// Create implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}
// Implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
// Delete implements storage.Interface.
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
return c.storage.Delete(ctx, key, out, preconditions)
}
// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return nil, err
@ -344,12 +345,12 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
return watcher, nil
}
// Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
// WatchList implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, pred)
}
// Implements storage.Interface.
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
@ -394,14 +395,14 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
} else {
objVal.Set(reflect.Zero(objVal.Type()))
if !ignoreNotFound {
return NewKeyNotFoundError(key, int64(readResourceVersion))
return storage.NewKeyNotFoundError(key, int64(readResourceVersion))
}
}
return nil
}
// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
// GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
// If resourceVersion is not specified, serve it from underlying
@ -464,8 +465,8 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
return nil
}
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@ -539,10 +540,10 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
return nil
}
// Implements storage.Interface.
// GuaranteedUpdate implements storage.Interface.
func (c *Cacher) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *Preconditions, tryUpdate UpdateFunc, _ ...runtime.Object) error {
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
@ -555,6 +556,7 @@ func (c *Cacher) GuaranteedUpdate(
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}
// Count implements storage.Interface.
func (c *Cacher) Count(pathPrefix string) (int64, error) {
return c.storage.Count(pathPrefix)
}
@ -651,6 +653,7 @@ func (c *Cacher) isStopped() bool {
return c.stopped
}
// Stop implements the graceful termination.
func (c *Cacher) Stop() {
// avoid stopping twice (note: cachers are shared with subresources)
if c.isStopped() {
@ -684,7 +687,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
}
}
func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFunc {
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set, uninitialized bool) bool {
if !hasPathPrefix(objKey, key) {
return false
@ -694,7 +697,7 @@ func filterWithAttrsFunction(key string, p SelectionPredicate) filterWithAttrsFu
return filterFunc
}
// Returns resource version to which the underlying cache is synced.
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
c.ready.wait()
@ -704,12 +707,12 @@ func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
type cacherListerWatcher struct {
storage Interface
storage storage.Interface
resourcePrefix string
newListFunc func() runtime.Object
}
func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
func newCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &cacherListerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
@ -720,7 +723,7 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", storage.Everything, list); err != nil {
return nil, err
}
return list, nil
@ -728,7 +731,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything)
}
// errWatcher implements watch.Interface to return a single error
@ -780,10 +783,10 @@ type cacheWatcher struct {
filter filterWithAttrsFunc
stopped bool
forget func(bool)
versioner Versioner
versioner storage.Versioner
}
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner Versioner) *cacheWatcher {
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool), versioner storage.Versioner) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"fmt"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"sync"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"testing"

View File

@ -0,0 +1,46 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"strings"
)
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cacher
import (
"testing"
)
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},
// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}
invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},
// Dir requirement
{"a", "a/"},
// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},
// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}

View File

@ -14,12 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"fmt"
"sort"
"strconv"
"sync"
"time"
@ -29,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
)
@ -138,14 +138,14 @@ type watchCache struct {
clock clock.Clock
// An underlying storage.Versioner.
versioner Versioner
versioner storage.Versioner
}
func newWatchCache(
capacity int,
keyFunc func(runtime.Object) (string, error),
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
versioner Versioner) *watchCache {
versioner storage.Versioner) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
package cacher
import (
"fmt"
@ -32,7 +32,8 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/client-go/tools/cache"
)
@ -65,16 +66,16 @@ func makeTestStoreElement(pod *v1.Pod) *storeElement {
// newTestWatchCache just adds a fake clock.
func newTestWatchCache(capacity int) *watchCache {
keyFunc := func(obj runtime.Object) (string, error) {
return NamespaceKeyFunc("prefix", obj)
return storage.NamespaceKeyFunc("prefix", obj)
}
getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, nil, false, fmt.Errorf("not a pod!")
return nil, nil, false, fmt.Errorf("not a pod")
}
return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, false, nil
}
versioner := etcdstorage.APIObjectVersioner{}
versioner := etcd.APIObjectVersioner{}
wc := newWatchCache(capacity, keyFunc, getAttrsFunc, versioner)
wc.clock = clock.NewFakeClock(time.Now())
return wc

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
@ -95,10 +96,10 @@ func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServe
return server, storage
}
func newTestCacher(s storage.Interface, cap int) (*storage.Cacher, storage.Versioner) {
func newTestCacher(s storage.Interface, cap int) (*cacherstorage.Cacher, storage.Versioner) {
prefix := "pods"
v := etcdstorage.APIObjectVersioner{}
config := storage.CacherConfig{
config := cacherstorage.Config{
CacheCapacity: cap,
Storage: s,
Versioner: v,
@ -109,7 +110,7 @@ func newTestCacher(s storage.Interface, cap int) (*storage.Cacher, storage.Versi
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return storage.NewCacherFromConfig(config), v
return cacherstorage.NewCacherFromConfig(config), v
}
func makeTestPod(name string) *example.Pod {

View File

@ -18,7 +18,6 @@ package storage
import (
"fmt"
"strings"
"sync/atomic"
"k8s.io/apimachinery/pkg/api/meta"
@ -72,31 +71,6 @@ func NoNamespaceKeyFunc(prefix string, obj runtime.Object) (string, error) {
return prefix + "/" + name, nil
}
// hasPathPrefix returns true if the string matches pathPrefix exactly, or if is prefixed with pathPrefix at a path segment boundary
func hasPathPrefix(s, pathPrefix string) bool {
// Short circuit if s doesn't contain the prefix at all
if !strings.HasPrefix(s, pathPrefix) {
return false
}
pathPrefixLength := len(pathPrefix)
if len(s) == pathPrefixLength {
// Exact match
return true
}
if strings.HasSuffix(pathPrefix, "/") {
// pathPrefix already ensured a path segment boundary
return true
}
if s[pathPrefixLength:pathPrefixLength+1] == "/" {
// The next character in s is a path segment boundary
// Check this instead of normalizing pathPrefix to avoid allocating on every call
return true
}
return false
}
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64

View File

@ -22,54 +22,6 @@ import (
"testing"
)
func TestHasPathPrefix(t *testing.T) {
validTestcases := []struct {
s string
prefix string
}{
// Exact matches
{"", ""},
{"a", "a"},
{"a/", "a/"},
{"a/../", "a/../"},
// Path prefix matches
{"a/b", "a"},
{"a/b", "a/"},
{"中文/", "中文"},
}
for i, tc := range validTestcases {
if !hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be true`, i, tc.s, tc.prefix)
}
}
invalidTestcases := []struct {
s string
prefix string
}{
// Mismatch
{"a", "b"},
// Dir requirement
{"a", "a/"},
// Prefix mismatch
{"ns2", "ns"},
{"ns2", "ns/"},
{"中文文", "中文"},
// Ensure no normalization is applied
{"a/c/../b/", "a/b/"},
{"a/", "a/b/.."},
}
for i, tc := range invalidTestcases {
if hasPathPrefix(tc.s, tc.prefix) {
t.Errorf(`%d: Expected hasPathPrefix("%s","%s") to be false`, i, tc.s, tc.prefix)
}
}
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark