Merge pull request #35418 from sttts/sttts-discovery-cache

Automatic merge from submit-queue

Add caching for discovery info with invalidation on cache-miss

TODO:
- [x] write tests for `CachedDiscoveryClient`
- [x] write tests for `DeferredDiscoveryRESTMapper` on cache-miss
- [x] find better way/structure to get rid of `invalidateCh` in c06ba3175b
This commit is contained in:
Kubernetes Submit Queue 2016-11-06 20:10:57 -08:00 committed by GitHub
commit f7e0c6c19e
10 changed files with 725 additions and 43 deletions

View File

@ -32,6 +32,7 @@ go_library(
"//pkg/version:go_default_library",
"//plugin/pkg/client/auth:go_default_library",
"//vendor:github.com/emicklei/go-restful/swagger",
"//vendor:github.com/golang/glog",
],
)
@ -44,11 +45,15 @@ go_test(
library = "go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/errors:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apimachinery/registered:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/client/restclient/fake:go_default_library",
"//pkg/version:go_default_library",
"//vendor:github.com/emicklei/go-restful/swagger",
"//vendor:github.com/stretchr/testify/assert",
],
)

View File

@ -45,6 +45,15 @@ type DiscoveryInterface interface {
SwaggerSchemaInterface
}
// CachedDiscoveryInterface is a DiscoveryInterface with cache invalidation and freshness.
type CachedDiscoveryInterface interface {
DiscoveryInterface
// Fresh returns true if no cached data was used that had been retrieved before the instantiation.
Fresh() bool
// Invalidate enforces that no cached data is used in the future that is older than the current time.
Invalidate()
}
// ServerGroupsInterface has methods for obtaining supported groups on the API server
type ServerGroupsInterface interface {
// ServerGroups returns the supported groups, with information like supported versions and the

View File

@ -23,6 +23,8 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
"github.com/golang/glog"
)
// APIGroupResources is an API group with a mapping of versions to
@ -35,7 +37,7 @@ type APIGroupResources struct {
}
// NewRESTMapper returns a PriorityRESTMapper based on the discovered
// groups and resourced passed in.
// groups and resources passed in.
func NewRESTMapper(groupResources []*APIGroupResources, versionInterfaces meta.VersionInterfacesFunc) meta.RESTMapper {
unionMapper := meta.MultiRESTMapper{}
@ -47,8 +49,8 @@ func NewRESTMapper(groupResources []*APIGroupResources, versionInterfaces meta.V
groupPriority = append(groupPriority, group.Group.Name)
if len(group.Group.PreferredVersion.Version) != 0 {
preffered := group.Group.PreferredVersion.Version
if _, ok := group.VersionedResources[preffered]; ok {
preferred := group.Group.PreferredVersion.Version
if _, ok := group.VersionedResources[preferred]; ok {
resourcePriority = append(resourcePriority, unversioned.GroupVersionResource{
Group: group.Group.Name,
Version: group.Group.PreferredVersion.Version,
@ -141,14 +143,14 @@ func GetAPIGroupResources(cl DiscoveryInterface) ([]*APIGroupResources, error) {
type DeferredDiscoveryRESTMapper struct {
initMu sync.Mutex
delegate meta.RESTMapper
cl DiscoveryInterface
cl CachedDiscoveryInterface
versionInterface meta.VersionInterfacesFunc
}
// NewDeferredDiscoveryRESTMapper returns a
// DeferredDiscoveryRESTMapper that will lazily query the provided
// client for discovery information to do REST mappings.
func NewDeferredDiscoveryRESTMapper(cl DiscoveryInterface, versionInterface meta.VersionInterfacesFunc) *DeferredDiscoveryRESTMapper {
func NewDeferredDiscoveryRESTMapper(cl CachedDiscoveryInterface, versionInterface meta.VersionInterfacesFunc) *DeferredDiscoveryRESTMapper {
return &DeferredDiscoveryRESTMapper{
cl: cl,
versionInterface: versionInterface,
@ -175,79 +177,118 @@ func (d *DeferredDiscoveryRESTMapper) getDelegate() (meta.RESTMapper, error) {
// Reset resets the internally cached Discovery information and will
// cause the next mapping request to re-discover.
func (d *DeferredDiscoveryRESTMapper) Reset() {
glog.V(5).Info("Invalidating discovery information")
d.initMu.Lock()
defer d.initMu.Unlock()
d.cl.Invalidate()
d.delegate = nil
d.initMu.Unlock()
}
// KindFor takes a partial resource and returns back the single match.
// It returns an error if there are multiple matches.
func (d *DeferredDiscoveryRESTMapper) KindFor(resource unversioned.GroupVersionResource) (unversioned.GroupVersionKind, error) {
func (d *DeferredDiscoveryRESTMapper) KindFor(resource unversioned.GroupVersionResource) (gvk unversioned.GroupVersionKind, err error) {
del, err := d.getDelegate()
if err != nil {
return unversioned.GroupVersionKind{}, err
}
return del.KindFor(resource)
gvk, err = del.KindFor(resource)
if err != nil && !d.cl.Fresh() {
d.Reset()
gvk, err = d.KindFor(resource)
}
return
}
// KindsFor takes a partial resource and returns back the list of
// potential kinds in priority order.
func (d *DeferredDiscoveryRESTMapper) KindsFor(resource unversioned.GroupVersionResource) ([]unversioned.GroupVersionKind, error) {
func (d *DeferredDiscoveryRESTMapper) KindsFor(resource unversioned.GroupVersionResource) (gvks []unversioned.GroupVersionKind, err error) {
del, err := d.getDelegate()
if err != nil {
return nil, err
}
return del.KindsFor(resource)
gvks, err = del.KindsFor(resource)
if len(gvks) == 0 && !d.cl.Fresh() {
d.Reset()
gvks, err = d.KindsFor(resource)
}
return
}
// ResourceFor takes a partial resource and returns back the single
// match. It returns an error if there are multiple matches.
func (d *DeferredDiscoveryRESTMapper) ResourceFor(input unversioned.GroupVersionResource) (unversioned.GroupVersionResource, error) {
func (d *DeferredDiscoveryRESTMapper) ResourceFor(input unversioned.GroupVersionResource) (gvr unversioned.GroupVersionResource, err error) {
del, err := d.getDelegate()
if err != nil {
return unversioned.GroupVersionResource{}, err
}
return del.ResourceFor(input)
gvr, err = del.ResourceFor(input)
if err != nil && !d.cl.Fresh() {
d.Reset()
gvr, err = d.ResourceFor(input)
}
return
}
// ResourcesFor takes a partial resource and returns back the list of
// potential resource in priority order.
func (d *DeferredDiscoveryRESTMapper) ResourcesFor(input unversioned.GroupVersionResource) ([]unversioned.GroupVersionResource, error) {
func (d *DeferredDiscoveryRESTMapper) ResourcesFor(input unversioned.GroupVersionResource) (gvrs []unversioned.GroupVersionResource, err error) {
del, err := d.getDelegate()
if err != nil {
return nil, err
}
return del.ResourcesFor(input)
gvrs, err = del.ResourcesFor(input)
if len(gvrs) == 0 && !d.cl.Fresh() {
d.Reset()
gvrs, err = d.ResourcesFor(input)
}
return
}
// RESTMapping identifies a preferred resource mapping for the
// provided group kind.
func (d *DeferredDiscoveryRESTMapper) RESTMapping(gk unversioned.GroupKind, versions ...string) (*meta.RESTMapping, error) {
func (d *DeferredDiscoveryRESTMapper) RESTMapping(gk unversioned.GroupKind, versions ...string) (m *meta.RESTMapping, err error) {
del, err := d.getDelegate()
if err != nil {
return nil, err
}
return del.RESTMapping(gk, versions...)
m, err = del.RESTMapping(gk, versions...)
if err != nil && !d.cl.Fresh() {
d.Reset()
m, err = d.RESTMapping(gk, versions...)
}
return
}
// RESTMappings returns the RESTMappings for the provided group kind
// in a rough internal preferred order. If no kind is found, it will
// return a NoResourceMatchError.
func (d *DeferredDiscoveryRESTMapper) RESTMappings(gk unversioned.GroupKind) ([]*meta.RESTMapping, error) {
func (d *DeferredDiscoveryRESTMapper) RESTMappings(gk unversioned.GroupKind) (ms []*meta.RESTMapping, err error) {
del, err := d.getDelegate()
if err != nil {
return nil, err
}
return del.RESTMappings(gk)
ms, err = del.RESTMappings(gk)
if len(ms) == 0 && !d.cl.Fresh() {
d.Reset()
ms, err = d.RESTMappings(gk)
}
return
}
// AliasesForResource returns whether a resource has an alias or not.
func (d *DeferredDiscoveryRESTMapper) AliasesForResource(resource string) ([]string, bool) {
func (d *DeferredDiscoveryRESTMapper) AliasesForResource(resource string) (as []string, ok bool) {
del, err := d.getDelegate()
if err != nil {
return nil, false
}
return del.AliasesForResource(resource)
as, ok = del.AliasesForResource(resource)
if len(as) == 0 && !d.cl.Fresh() {
d.Reset()
as, ok = d.AliasesForResource(resource)
}
return
}
// ResourceSingularizer converts a resource name from plural to
@ -257,7 +298,12 @@ func (d *DeferredDiscoveryRESTMapper) ResourceSingularizer(resource string) (sin
if err != nil {
return resource, err
}
return del.ResourceSingularizer(resource)
singular, err = del.ResourceSingularizer(resource)
if err != nil && !d.cl.Fresh() {
d.Reset()
singular, err = d.ResourceSingularizer(resource)
}
return
}
func (d *DeferredDiscoveryRESTMapper) String() string {

View File

@ -20,7 +20,15 @@ import (
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/restclient/fake"
"k8s.io/kubernetes/pkg/version"
"github.com/emicklei/go-restful/swagger"
"github.com/stretchr/testify/assert"
)
func TestRESTMapper(t *testing.T) {
@ -174,3 +182,144 @@ func TestRESTMapper(t *testing.T) {
}
}
}
func TestDeferredDiscoveryRESTMapper_CacheMiss(t *testing.T) {
assert := assert.New(t)
cdc := fakeCachedDiscoveryInterface{fresh: false}
m := NewDeferredDiscoveryRESTMapper(&cdc, registered.InterfacesFor)
assert.False(cdc.fresh, "should NOT be fresh after instantiation")
assert.Zero(cdc.invalidateCalls, "should not have called Invalidate()")
gvk, err := m.KindFor(unversioned.GroupVersionResource{
Group: "a",
Version: "v1",
Resource: "foo",
})
assert.NoError(err)
assert.True(cdc.fresh, "should be fresh after a cache-miss")
assert.Equal(cdc.invalidateCalls, 1, "should have called Invalidate() once")
assert.Equal(gvk.Kind, "Foo")
gvk, err = m.KindFor(unversioned.GroupVersionResource{
Group: "a",
Version: "v1",
Resource: "foo",
})
assert.NoError(err)
assert.Equal(cdc.invalidateCalls, 1, "should NOT have called Invalidate() again")
gvk, err = m.KindFor(unversioned.GroupVersionResource{
Group: "a",
Version: "v1",
Resource: "bar",
})
assert.Error(err)
assert.Equal(cdc.invalidateCalls, 1, "should NOT have called Invalidate() again after another cache-miss, but with fresh==true")
cdc.fresh = false
gvk, err = m.KindFor(unversioned.GroupVersionResource{
Group: "a",
Version: "v1",
Resource: "bar",
})
assert.Error(err)
assert.Equal(cdc.invalidateCalls, 2, "should HAVE called Invalidate() again after another cache-miss, but with fresh==false")
}
type fakeCachedDiscoveryInterface struct {
invalidateCalls int
fresh bool
enabledA bool
}
var _ CachedDiscoveryInterface = &fakeCachedDiscoveryInterface{}
func (c *fakeCachedDiscoveryInterface) Fresh() bool {
return c.fresh
}
func (c *fakeCachedDiscoveryInterface) Invalidate() {
c.invalidateCalls = c.invalidateCalls + 1
c.fresh = true
c.enabledA = true
}
func (c *fakeCachedDiscoveryInterface) RESTClient() restclient.Interface {
return &fake.RESTClient{}
}
func (c *fakeCachedDiscoveryInterface) ServerGroups() (*unversioned.APIGroupList, error) {
if c.enabledA {
return &unversioned.APIGroupList{
Groups: []unversioned.APIGroup{
{
Name: "a",
Versions: []unversioned.GroupVersionForDiscovery{
{
GroupVersion: "a/v1",
Version: "v1",
},
},
PreferredVersion: unversioned.GroupVersionForDiscovery{
GroupVersion: "a/v1",
Version: "v1",
},
},
},
}, nil
}
return &unversioned.APIGroupList{}, nil
}
func (c *fakeCachedDiscoveryInterface) ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) {
if c.enabledA && groupVersion == "a/v1" {
return &unversioned.APIResourceList{
GroupVersion: "a/v1",
APIResources: []unversioned.APIResource{
{
Name: "foo",
Kind: "Foo",
Namespaced: false,
},
},
}, nil
}
return nil, errors.NewNotFound(unversioned.GroupResource{}, "")
}
func (c *fakeCachedDiscoveryInterface) ServerResources() (map[string]*unversioned.APIResourceList, error) {
if c.enabledA {
av1, _ := c.ServerResourcesForGroupVersion("a/v1")
return map[string]*unversioned.APIResourceList{
"a/v1": av1,
}, nil
}
return map[string]*unversioned.APIResourceList{}, nil
}
func (c *fakeCachedDiscoveryInterface) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) {
if c.enabledA {
return []unversioned.GroupVersionResource{
{
Group: "a",
Version: "v1",
Resource: "foo",
},
}, nil
}
return []unversioned.GroupVersionResource{}, nil
}
func (c *fakeCachedDiscoveryInterface) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) {
return []unversioned.GroupVersionResource{}, nil
}
func (c *fakeCachedDiscoveryInterface) ServerVersion() (*version.Info, error) {
return &version.Info{}, nil
}
func (c *fakeCachedDiscoveryInterface) SwaggerSchema(version unversioned.GroupVersion) (*swagger.ApiDeclaration, error) {
return &swagger.ApiDeclaration{}, nil
}

View File

@ -133,6 +133,17 @@ func newExternalScheme() (*runtime.Scheme, meta.RESTMapper, runtime.Codec) {
return scheme, mapper, codec
}
type fakeCachedDiscoveryClient struct {
discovery.DiscoveryInterface
}
func (d *fakeCachedDiscoveryClient) Fresh() bool {
return true
}
func (d *fakeCachedDiscoveryClient) Invalidate() {
}
type TestFactory struct {
Mapper meta.RESTMapper
Typer runtime.ObjectTyper
@ -164,6 +175,14 @@ func NewTestFactory() (cmdutil.Factory, *TestFactory, runtime.Codec, runtime.Neg
}, t, codec, negotiatedSerializer
}
func (f *FakeFactory) DiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(f.tf.ClientConfig)
if err != nil {
return nil, err
}
return &fakeCachedDiscoveryClient{DiscoveryInterface: discoveryClient}, nil
}
func (f *FakeFactory) FlagSet() *pflag.FlagSet {
return nil
}

View File

@ -13,6 +13,7 @@ load(
go_library(
name = "go_default_library",
srcs = [
"cached_discovery.go",
"clientcache.go",
"factory.go",
"helpers.go",
@ -50,8 +51,10 @@ go_library(
"//pkg/util/errors:go_default_library",
"//pkg/util/exec:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/util/homedir:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/strategicpatch:go_default_library",
"//pkg/version:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/emicklei/go-restful/swagger",
"//vendor:github.com/evanphx/json-patch",
@ -64,6 +67,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"cached_discovery_test.go",
"factory_test.go",
"helpers_test.go",
"shortcut_restmapper_test.go",
@ -85,8 +89,10 @@ go_test(
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/restclient:go_default_library",
"//pkg/client/restclient/fake:go_default_library",
"//pkg/client/testing/core:go_default_library",
"//pkg/client/typed/discovery:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library",
"//pkg/client/unversioned/clientcmd/api:go_default_library",
"//pkg/controller:go_default_library",
@ -97,6 +103,9 @@ go_test(
"//pkg/util/exec:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/util/validation/field:go_default_library",
"//pkg/version:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/emicklei/go-restful/swagger",
"//vendor:github.com/stretchr/testify/assert",
],
)

View File

@ -0,0 +1,252 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"errors"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
"github.com/emicklei/go-restful/swagger"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/version"
)
// CachedDiscoveryClient implements the functions that discovery server-supported API groups,
// versions and resources.
type CachedDiscoveryClient struct {
delegate discovery.DiscoveryInterface
// cacheDirectory is the directory where discovery docs are held. It must be unique per host:port combination to work well.
cacheDirectory string
// ttl is how long the cache should be considered valid
ttl time.Duration
// mutex protects the variables below
mutex sync.Mutex
// ourFiles are all filenames of cache files created by this process
ourFiles map[string]struct{}
// invalidated is true if all cache files should be ignored that are not ours (e.g. after Invalidate() was called)
invalidated bool
// fresh is true if all used cache files were ours
fresh bool
}
var _ discovery.CachedDiscoveryInterface = &CachedDiscoveryClient{}
// ServerResourcesForGroupVersion returns the supported resources for a group and version.
func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) {
filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
cachedBytes, err := d.getCachedFile(filename)
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
if err == nil {
cachedResources := &unversioned.APIResourceList{}
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
glog.V(6).Infof("returning cached discovery info from %v", filename)
return cachedResources, nil
}
}
liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
return liveResources, err
}
if err := d.writeCachedFile(filename, liveResources); err != nil {
glog.V(3).Infof("failed to write cache to %v due to %v", filename, err)
}
return liveResources, nil
}
// ServerResources returns the supported resources for all groups and versions.
func (d *CachedDiscoveryClient) ServerResources() (map[string]*unversioned.APIResourceList, error) {
apiGroups, err := d.ServerGroups()
if err != nil {
return nil, err
}
groupVersions := unversioned.ExtractGroupVersions(apiGroups)
result := map[string]*unversioned.APIResourceList{}
for _, groupVersion := range groupVersions {
resources, err := d.ServerResourcesForGroupVersion(groupVersion)
if err != nil {
return nil, err
}
result[groupVersion] = resources
}
return result, nil
}
func (d *CachedDiscoveryClient) ServerGroups() (*unversioned.APIGroupList, error) {
filename := filepath.Join(d.cacheDirectory, "servergroups.json")
cachedBytes, err := d.getCachedFile(filename)
// don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
if err == nil {
cachedGroups := &unversioned.APIGroupList{}
if err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
glog.V(6).Infof("returning cached discovery info from %v", filename)
return cachedGroups, nil
}
}
liveGroups, err := d.delegate.ServerGroups()
if err != nil {
return liveGroups, err
}
if err := d.writeCachedFile(filename, liveGroups); err != nil {
glog.V(3).Infof("failed to write cache to %v due to %v", filename, err)
}
return liveGroups, nil
}
func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
// after invalidation ignore cache files not created by this process
d.mutex.Lock()
_, ourFile := d.ourFiles[filename]
if d.invalidated && !ourFile {
d.mutex.Unlock()
return nil, errors.New("cache invalidated")
}
d.mutex.Unlock()
file, err := os.Open(filename)
if err != nil {
return nil, err
}
fileInfo, err := file.Stat()
if err != nil {
return nil, err
}
if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
return nil, errors.New("cache expired")
}
// the cache is present and its valid. Try to read and use it.
cachedBytes, err := ioutil.ReadAll(file)
if err != nil {
return nil, err
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.fresh = d.fresh && ourFile
return cachedBytes, nil
}
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
if err := os.MkdirAll(filepath.Dir(filename), 0755); err != nil {
return err
}
bytes, err := runtime.Encode(api.Codecs.LegacyCodec(), obj)
if err != nil {
return err
}
f, err := ioutil.TempFile(filepath.Dir(filename), filepath.Base(filename)+".")
if err != nil {
return err
}
defer os.Remove(f.Name())
_, err = f.Write(bytes)
if err != nil {
return err
}
err = f.Chmod(0755)
if err != nil {
return err
}
name := f.Name()
err = f.Close()
if err != nil {
return err
}
// atomic rename
d.mutex.Lock()
defer d.mutex.Unlock()
err = os.Rename(name, filename)
if err == nil {
d.ourFiles[filename] = struct{}{}
}
return err
}
func (d *CachedDiscoveryClient) RESTClient() restclient.Interface {
return d.delegate.RESTClient()
}
func (d *CachedDiscoveryClient) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) {
return d.delegate.ServerPreferredResources()
}
func (d *CachedDiscoveryClient) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) {
return d.delegate.ServerPreferredNamespacedResources()
}
func (d *CachedDiscoveryClient) ServerVersion() (*version.Info, error) {
return d.delegate.ServerVersion()
}
func (d *CachedDiscoveryClient) SwaggerSchema(version unversioned.GroupVersion) (*swagger.ApiDeclaration, error) {
return d.delegate.SwaggerSchema(version)
}
func (d *CachedDiscoveryClient) Fresh() bool {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.fresh
}
func (d *CachedDiscoveryClient) Invalidate() {
d.mutex.Lock()
defer d.mutex.Unlock()
d.ourFiles = map[string]struct{}{}
d.fresh = true
d.invalidated = true
}
// NewCachedDiscoveryClient creates a new DiscoveryClient. cacheDirectory is the directory where discovery docs are held. It must be unique per host:port combination to work well.
func NewCachedDiscoveryClient(delegate discovery.DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
return &CachedDiscoveryClient{
delegate: delegate,
cacheDirectory: cacheDirectory,
ttl: ttl,
ourFiles: map[string]struct{}{},
fresh: true,
}
}

View File

@ -0,0 +1,164 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/emicklei/go-restful/swagger"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/restclient/fake"
"k8s.io/kubernetes/pkg/client/typed/discovery"
"k8s.io/kubernetes/pkg/version"
)
func TestCachedDiscoveryClient_Fresh(t *testing.T) {
assert := assert.New(t)
d, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(d)
c := fakeDiscoveryClient{}
cdc := NewCachedDiscoveryClient(&c, d, 60*time.Second)
assert.True(cdc.Fresh(), "should be fresh after creation")
cdc.ServerGroups()
assert.True(cdc.Fresh(), "should be fresh after groups call without cache")
assert.Equal(c.groupCalls, 1)
cdc.ServerGroups()
assert.True(cdc.Fresh(), "should be fresh after another groups call")
assert.Equal(c.groupCalls, 1)
cdc.ServerResources()
assert.True(cdc.Fresh(), "should be fresh after resources call")
assert.Equal(c.resourceCalls, 1)
cdc.ServerResources()
assert.True(cdc.Fresh(), "should be fresh after another resources call")
assert.Equal(c.resourceCalls, 1)
cdc = NewCachedDiscoveryClient(&c, d, 60*time.Second)
cdc.ServerGroups()
assert.False(cdc.Fresh(), "should NOT be fresh after recreation with existing groups cache")
assert.Equal(c.groupCalls, 1)
cdc.ServerResources()
assert.False(cdc.Fresh(), "should NOT be fresh after recreation with existing resources cache")
assert.Equal(c.resourceCalls, 1)
cdc.Invalidate()
assert.True(cdc.Fresh(), "should be fresh after cache invalidation")
cdc.ServerResources()
assert.True(cdc.Fresh(), "should ignore existing resources cache after invalidation")
assert.Equal(c.resourceCalls, 2)
}
func TestNewCachedDiscoveryClient_TTL(t *testing.T) {
assert := assert.New(t)
d, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(d)
c := fakeDiscoveryClient{}
cdc := NewCachedDiscoveryClient(&c, d, 1*time.Nanosecond)
cdc.ServerGroups()
assert.Equal(c.groupCalls, 1)
time.Sleep(1 * time.Second)
cdc.ServerGroups()
assert.Equal(c.groupCalls, 2)
}
type fakeDiscoveryClient struct {
groupCalls int
resourceCalls int
versionCalls int
swaggerCalls int
}
var _ discovery.DiscoveryInterface = &fakeDiscoveryClient{}
func (c *fakeDiscoveryClient) RESTClient() restclient.Interface {
return &fake.RESTClient{}
}
func (c *fakeDiscoveryClient) ServerGroups() (*unversioned.APIGroupList, error) {
c.groupCalls = c.groupCalls + 1
return &unversioned.APIGroupList{
Groups: []unversioned.APIGroup{
{
Name: "a",
Versions: []unversioned.GroupVersionForDiscovery{
{
GroupVersion: "a/v1",
Version: "v1",
},
},
PreferredVersion: unversioned.GroupVersionForDiscovery{
GroupVersion: "a/v1",
Version: "v1",
},
},
},
}, nil
}
func (c *fakeDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) {
c.resourceCalls = c.resourceCalls + 1
if groupVersion == "a/v1" {
return &unversioned.APIResourceList{}, nil
}
return nil, errors.NewNotFound(unversioned.GroupResource{}, "")
}
func (c *fakeDiscoveryClient) ServerResources() (map[string]*unversioned.APIResourceList, error) {
c.resourceCalls = c.resourceCalls + 1
return map[string]*unversioned.APIResourceList{}, nil
}
func (c *fakeDiscoveryClient) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) {
c.resourceCalls = c.resourceCalls + 1
return []unversioned.GroupVersionResource{}, nil
}
func (c *fakeDiscoveryClient) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) {
c.resourceCalls = c.resourceCalls + 1
return []unversioned.GroupVersionResource{}, nil
}
func (c *fakeDiscoveryClient) ServerVersion() (*version.Info, error) {
c.versionCalls = c.versionCalls + 1
return &version.Info{}, nil
}
func (c *fakeDiscoveryClient) SwaggerSchema(version unversioned.GroupVersion) (*swagger.ApiDeclaration, error) {
c.swaggerCalls = c.swaggerCalls + 1
return &swagger.ApiDeclaration{}, nil
}

View File

@ -27,6 +27,7 @@ import (
"os/user"
"path"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
@ -61,6 +62,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/json"
utilflag "k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/homedir"
"k8s.io/kubernetes/pkg/watch"
)
@ -77,6 +79,8 @@ type Factory interface {
// Returns internal flagset
FlagSet() *pflag.FlagSet
// Returns a discovery client
DiscoveryClient() (discovery.CachedDiscoveryInterface, error)
// Returns interfaces for dealing with arbitrary runtime.Objects.
Object() (meta.RESTMapper, runtime.ObjectTyper)
// Returns interfaces for dealing with arbitrary
@ -313,27 +317,36 @@ func NewFactory(optionalClientConfig clientcmd.ClientConfig) Factory {
}
clients := NewClientCache(clientConfig)
return &factory{
f := &factory{
flags: flags,
clientConfig: clientConfig,
clients: clients,
}
return f
}
func (f *factory) FlagSet() *pflag.FlagSet {
return f.flags
}
func (f *factory) Object() (meta.RESTMapper, runtime.ObjectTyper) {
func (f *factory) DiscoveryClient() (discovery.CachedDiscoveryInterface, error) {
cfg, err := f.clientConfig.ClientConfig()
checkErrWithPrefix("failed to get client config: ", err)
cmdApiVersion := unversioned.GroupVersion{}
if cfg.GroupVersion != nil {
cmdApiVersion = *cfg.GroupVersion
if err != nil {
return nil, err
}
mapper := registered.RESTMapper()
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, err
}
cacheDir := computeDiscoverCacheDir(filepath.Join(homedir.HomeDir(), ".kube", "cache", "discovery"), cfg.Host)
return NewCachedDiscoveryClient(discoveryClient, cacheDir, time.Duration(10*time.Minute)), nil
}
func (f *factory) Object() (meta.RESTMapper, runtime.ObjectTyper) {
mapper := registered.RESTMapper()
discoveryClient, err := f.DiscoveryClient()
if err == nil {
mapper = meta.FirstHitRESTMapper{
MultiRESTMapper: meta.MultiRESTMapper{
@ -345,23 +358,28 @@ func (f *factory) Object() (meta.RESTMapper, runtime.ObjectTyper) {
// wrap with shortcuts
mapper = NewShortcutExpander(mapper, discoveryClient)
// wrap with output preferences
cfg, err := f.clients.ClientConfigForVersion(nil)
checkErrWithPrefix("failed to get client config: ", err)
cmdApiVersion := unversioned.GroupVersion{}
if cfg.GroupVersion != nil {
cmdApiVersion = *cfg.GroupVersion
}
mapper = kubectl.OutputVersionMapper{RESTMapper: mapper, OutputVersions: []unversioned.GroupVersion{cmdApiVersion}}
return mapper, api.Scheme
}
func (f *factory) UnstructuredObject() (meta.RESTMapper, runtime.ObjectTyper, error) {
cfg, err := f.clients.ClientConfigForVersion(nil)
discoveryClient, err := f.DiscoveryClient()
if err != nil {
return nil, nil, err
}
dc, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, nil, err
groupResources, err := discovery.GetAPIGroupResources(discoveryClient)
if err != nil && !discoveryClient.Fresh() {
discoveryClient.Invalidate()
groupResources, err = discovery.GetAPIGroupResources(discoveryClient)
}
groupResources, err := discovery.GetAPIGroupResources(dc)
if err != nil {
return nil, nil, err
}
@ -378,11 +396,9 @@ func (f *factory) UnstructuredObject() (meta.RESTMapper, runtime.ObjectTyper, er
}
}
mapper := discovery.NewRESTMapper(groupResources, meta.InterfacesForUnstructured)
mapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient, meta.InterfacesForUnstructured)
typer := discovery.NewUnstructuredObjectTyper(groupResources)
return NewShortcutExpander(mapper, dc), typer, nil
return NewShortcutExpander(mapper, discoveryClient), typer, nil
}
func (f *factory) RESTClient() (*restclient.RESTClient, error) {
@ -1316,3 +1332,16 @@ func (f *factory) SuggestedPodTemplateResources() []unversioned.GroupResource {
{Resource: "replicaset"},
}
}
// overlyCautiousIllegalFileCharacters matches characters that *might* not be supported. Windows is really restrictive, so this is really restrictive
var overlyCautiousIllegalFileCharacters = regexp.MustCompile(`[^(\w/\.)]`)
// computeDiscoverCacheDir takes the parentDir and the host and comes up with a "usually non-colliding" name.
func computeDiscoverCacheDir(parentDir, host string) string {
// strip the optional scheme from host if its there:
schemelessHost := strings.Replace(strings.Replace(host, "https://", "", 1), "http://", "", 1)
// now do a simple collapse of non-AZ09 characters. Collisions are possible but unlikely. Even if we do collide the problem is short lived
safeHost := overlyCautiousIllegalFileCharacters.ReplaceAllString(schemelessHost, "_")
return filepath.Join(parentDir, safeHost)
}

View File

@ -25,18 +25,18 @@ import (
"k8s.io/kubernetes/pkg/kubectl"
)
// ShortcutExpander is a RESTMapper that can be used for OpenShift resources. It expands the resource first, then invokes the wrapped
// ShortcutExpander is a RESTMapper that can be used for Kubernetes resources. It expands the resource first, then invokes the wrapped
type ShortcutExpander struct {
RESTMapper meta.RESTMapper
All []unversioned.GroupResource
discoveryClient *discovery.DiscoveryClient
discoveryClient discovery.DiscoveryInterface
}
var _ meta.RESTMapper = &ShortcutExpander{}
func NewShortcutExpander(delegate meta.RESTMapper, client *discovery.DiscoveryClient) ShortcutExpander {
func NewShortcutExpander(delegate meta.RESTMapper, client discovery.DiscoveryInterface) ShortcutExpander {
return ShortcutExpander{All: userResources, RESTMapper: delegate, discoveryClient: client}
}