mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 01:40:07 +00:00
simplify the client cache
This commit is contained in:
parent
b1357da473
commit
df3439c2d9
@ -4,7 +4,6 @@ go_library(
|
|||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"cached_discovery.go",
|
"cached_discovery.go",
|
||||||
"clientcache.go",
|
|
||||||
"factory.go",
|
"factory.go",
|
||||||
"factory_builder.go",
|
"factory_builder.go",
|
||||||
"factory_client_access.go",
|
"factory_client_access.go",
|
||||||
|
@ -1,226 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2014 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package util
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
||||||
"k8s.io/client-go/discovery"
|
|
||||||
"k8s.io/client-go/kubernetes"
|
|
||||||
restclient "k8s.io/client-go/rest"
|
|
||||||
"k8s.io/client-go/tools/clientcmd"
|
|
||||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
||||||
"k8s.io/kubernetes/pkg/version"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewClientCache(loader clientcmd.ClientConfig, discoveryClientFactory DiscoveryClientFactory) *ClientCache {
|
|
||||||
return &ClientCache{
|
|
||||||
clientsets: make(map[schema.GroupVersion]internalclientset.Interface),
|
|
||||||
configs: make(map[schema.GroupVersion]*restclient.Config),
|
|
||||||
loader: loader,
|
|
||||||
discoveryClientFactory: discoveryClientFactory,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientCache caches previously loaded clients for reuse, and ensures MatchServerVersion
|
|
||||||
// is invoked only once
|
|
||||||
type ClientCache struct {
|
|
||||||
loader clientcmd.ClientConfig
|
|
||||||
clientsets map[schema.GroupVersion]internalclientset.Interface
|
|
||||||
configs map[schema.GroupVersion]*restclient.Config
|
|
||||||
|
|
||||||
// noVersionConfig provides a cached config for the case of no required version specified
|
|
||||||
noVersionConfig *restclient.Config
|
|
||||||
|
|
||||||
matchVersion bool
|
|
||||||
|
|
||||||
lock sync.Mutex
|
|
||||||
defaultConfig *restclient.Config
|
|
||||||
// discoveryClientFactory comes as a factory method so that we can defer resolution until after
|
|
||||||
// argument evaluation
|
|
||||||
discoveryClientFactory DiscoveryClientFactory
|
|
||||||
discoveryClient discovery.DiscoveryInterface
|
|
||||||
|
|
||||||
kubernetesClientCache kubernetesClientCache
|
|
||||||
}
|
|
||||||
|
|
||||||
// kubernetesClientCache creates a new kubernetes.Clientset one time
|
|
||||||
// and then returns the result for all future requests
|
|
||||||
type kubernetesClientCache struct {
|
|
||||||
// once makes sure the client is only initialized once
|
|
||||||
once sync.Once
|
|
||||||
// client is the cached client value
|
|
||||||
client *kubernetes.Clientset
|
|
||||||
// err is the cached error value
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
// KubernetesClientSetForVersion returns a new kubernetes.Clientset. It will cache the value
|
|
||||||
// the first time it is called and return the cached value on subsequent calls.
|
|
||||||
// If an error is encountered the first time KubernetesClientSetForVersion is called,
|
|
||||||
// the error will be cached.
|
|
||||||
func (c *ClientCache) KubernetesClientSetForVersion(requiredVersion *schema.GroupVersion) (*kubernetes.Clientset, error) {
|
|
||||||
c.kubernetesClientCache.once.Do(func() {
|
|
||||||
config, err := c.ClientConfigForVersion(requiredVersion)
|
|
||||||
if err != nil {
|
|
||||||
c.kubernetesClientCache.err = err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.kubernetesClientCache.client, c.kubernetesClientCache.err = kubernetes.NewForConfig(config)
|
|
||||||
})
|
|
||||||
return c.kubernetesClientCache.client, c.kubernetesClientCache.err
|
|
||||||
}
|
|
||||||
|
|
||||||
// also looks up the discovery client. We can't do this during init because the flags won't have been set
|
|
||||||
// because this is constructed pre-command execution before the command tree is
|
|
||||||
// even set up. Requires the lock to already be acquired
|
|
||||||
func (c *ClientCache) getDefaultConfig() (restclient.Config, discovery.DiscoveryInterface, error) {
|
|
||||||
if c.defaultConfig != nil && c.discoveryClient != nil {
|
|
||||||
return *c.defaultConfig, c.discoveryClient, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
config, err := c.loader.ClientConfig()
|
|
||||||
if err != nil {
|
|
||||||
return restclient.Config{}, nil, err
|
|
||||||
}
|
|
||||||
discoveryClient, err := c.discoveryClientFactory.DiscoveryClient()
|
|
||||||
if err != nil {
|
|
||||||
return restclient.Config{}, nil, err
|
|
||||||
}
|
|
||||||
if c.matchVersion {
|
|
||||||
if err := discovery.MatchesServerVersion(version.Get(), discoveryClient); err != nil {
|
|
||||||
return restclient.Config{}, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
c.defaultConfig = config
|
|
||||||
c.discoveryClient = discoveryClient
|
|
||||||
return *c.defaultConfig, c.discoveryClient, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientConfigForVersion returns the correct config for a server
|
|
||||||
func (c *ClientCache) ClientConfigForVersion(requiredVersion *schema.GroupVersion) (*restclient.Config, error) {
|
|
||||||
c.lock.Lock()
|
|
||||||
defer c.lock.Unlock()
|
|
||||||
|
|
||||||
return c.clientConfigForVersion(requiredVersion)
|
|
||||||
}
|
|
||||||
|
|
||||||
// clientConfigForVersion returns the correct config for a server
|
|
||||||
func (c *ClientCache) clientConfigForVersion(requiredVersion *schema.GroupVersion) (*restclient.Config, error) {
|
|
||||||
// only lookup in the cache if the requiredVersion is set
|
|
||||||
if requiredVersion != nil {
|
|
||||||
if config, ok := c.configs[*requiredVersion]; ok {
|
|
||||||
return copyConfig(config), nil
|
|
||||||
}
|
|
||||||
} else if c.noVersionConfig != nil {
|
|
||||||
return copyConfig(c.noVersionConfig), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// this returns a shallow copy to work with
|
|
||||||
config, discoveryClient, err := c.getDefaultConfig()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if requiredVersion != nil {
|
|
||||||
if err := discovery.ServerSupportsVersion(discoveryClient, *requiredVersion); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
config.GroupVersion = requiredVersion
|
|
||||||
} else {
|
|
||||||
// TODO remove this hack. This is allowing the GetOptions to be serialized.
|
|
||||||
config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO this isn't what we want. Each clientset should be setting defaults as it sees fit.
|
|
||||||
setKubernetesDefaults(&config)
|
|
||||||
|
|
||||||
if requiredVersion != nil {
|
|
||||||
c.configs[*requiredVersion] = copyConfig(&config)
|
|
||||||
} else {
|
|
||||||
c.noVersionConfig = copyConfig(&config)
|
|
||||||
}
|
|
||||||
|
|
||||||
// `version` does not necessarily equal `config.Version`. However, we know that we call this method again with
|
|
||||||
// `config.Version`, we should get the config we've just built.
|
|
||||||
c.configs[*config.GroupVersion] = copyConfig(&config)
|
|
||||||
|
|
||||||
return copyConfig(&config), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// setKubernetesDefaults sets default values on the provided client config for accessing the
|
|
||||||
// Kubernetes API or returns an error if any of the defaults are impossible or invalid.
|
|
||||||
func setKubernetesDefaults(config *restclient.Config) error {
|
|
||||||
if config.APIPath == "" {
|
|
||||||
config.APIPath = "/api"
|
|
||||||
}
|
|
||||||
// TODO chase down uses and tolerate nil
|
|
||||||
if config.GroupVersion == nil {
|
|
||||||
config.GroupVersion = &schema.GroupVersion{}
|
|
||||||
}
|
|
||||||
if config.NegotiatedSerializer == nil {
|
|
||||||
config.NegotiatedSerializer = legacyscheme.Codecs
|
|
||||||
}
|
|
||||||
return restclient.SetKubernetesDefaults(config)
|
|
||||||
}
|
|
||||||
|
|
||||||
func copyConfig(in *restclient.Config) *restclient.Config {
|
|
||||||
configCopy := *in
|
|
||||||
copyGroupVersion := *configCopy.GroupVersion
|
|
||||||
configCopy.GroupVersion = ©GroupVersion
|
|
||||||
return &configCopy
|
|
||||||
}
|
|
||||||
|
|
||||||
// ClientSetForVersion initializes or reuses a clientset for the specified version, or returns an
|
|
||||||
// error if that is not possible
|
|
||||||
func (c *ClientCache) ClientSetForVersion(requiredVersion *schema.GroupVersion) (internalclientset.Interface, error) {
|
|
||||||
c.lock.Lock()
|
|
||||||
defer c.lock.Unlock()
|
|
||||||
|
|
||||||
if requiredVersion != nil {
|
|
||||||
if clientset, ok := c.clientsets[*requiredVersion]; ok {
|
|
||||||
return clientset, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
config, err := c.clientConfigForVersion(requiredVersion)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
clientset, err := internalclientset.NewForConfig(config)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.clientsets[*config.GroupVersion] = clientset
|
|
||||||
|
|
||||||
// `version` does not necessarily equal `config.Version`. However, we know that if we call this method again with
|
|
||||||
// `version`, we should get a client based on the same config we just found. There's no guarantee that a client
|
|
||||||
// is copiable, so create a new client and save it in the cache.
|
|
||||||
if requiredVersion != nil {
|
|
||||||
configCopy := *config
|
|
||||||
clientset, err := internalclientset.NewForConfig(&configCopy)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
c.clientsets[*requiredVersion] = clientset
|
|
||||||
}
|
|
||||||
|
|
||||||
return clientset, nil
|
|
||||||
}
|
|
@ -35,6 +35,8 @@ import (
|
|||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
|
|
||||||
|
"sync"
|
||||||
|
|
||||||
appsv1 "k8s.io/api/apps/v1"
|
appsv1 "k8s.io/api/apps/v1"
|
||||||
appsv1beta1 "k8s.io/api/apps/v1beta1"
|
appsv1beta1 "k8s.io/api/apps/v1beta1"
|
||||||
appsv1beta2 "k8s.io/api/apps/v1beta2"
|
appsv1beta2 "k8s.io/api/apps/v1beta2"
|
||||||
@ -62,13 +64,17 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/kubectl"
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/util/transport"
|
"k8s.io/kubernetes/pkg/kubectl/util/transport"
|
||||||
|
"k8s.io/kubernetes/pkg/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ring0Factory struct {
|
type ring0Factory struct {
|
||||||
flags *pflag.FlagSet
|
flags *pflag.FlagSet
|
||||||
clientConfig clientcmd.ClientConfig
|
clientConfig clientcmd.ClientConfig
|
||||||
discoveryFactory DiscoveryClientFactory
|
discoveryFactory DiscoveryClientFactory
|
||||||
clientCache *ClientCache
|
|
||||||
|
requireMatchedServerVersion bool
|
||||||
|
checkServerVersion sync.Once
|
||||||
|
matchesServerVersionErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClientAccessFactory(optionalClientConfig clientcmd.ClientConfig) ClientAccessFactory {
|
func NewClientAccessFactory(optionalClientConfig clientcmd.ClientConfig) ClientAccessFactory {
|
||||||
@ -87,13 +93,10 @@ func NewClientAccessFactory(optionalClientConfig clientcmd.ClientConfig) ClientA
|
|||||||
func NewClientAccessFactoryFromDiscovery(flags *pflag.FlagSet, clientConfig clientcmd.ClientConfig, discoveryFactory DiscoveryClientFactory) ClientAccessFactory {
|
func NewClientAccessFactoryFromDiscovery(flags *pflag.FlagSet, clientConfig clientcmd.ClientConfig, discoveryFactory DiscoveryClientFactory) ClientAccessFactory {
|
||||||
flags.SetNormalizeFunc(utilflag.WarnWordSepNormalizeFunc) // Warn for "_" flags
|
flags.SetNormalizeFunc(utilflag.WarnWordSepNormalizeFunc) // Warn for "_" flags
|
||||||
|
|
||||||
clientCache := NewClientCache(clientConfig, discoveryFactory)
|
|
||||||
|
|
||||||
f := &ring0Factory{
|
f := &ring0Factory{
|
||||||
flags: flags,
|
flags: flags,
|
||||||
clientConfig: clientConfig,
|
clientConfig: clientConfig,
|
||||||
discoveryFactory: discoveryFactory,
|
discoveryFactory: discoveryFactory,
|
||||||
clientCache: clientCache,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return f
|
return f
|
||||||
@ -202,22 +205,54 @@ func (f *ring0Factory) DiscoveryClient() (discovery.CachedDiscoveryInterface, er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *ring0Factory) KubernetesClientSet() (*kubernetes.Clientset, error) {
|
func (f *ring0Factory) KubernetesClientSet() (*kubernetes.Clientset, error) {
|
||||||
return f.clientCache.KubernetesClientSetForVersion(nil)
|
clientConfig, err := f.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return kubernetes.NewForConfig(clientConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *ring0Factory) ClientSet() (internalclientset.Interface, error) {
|
func (f *ring0Factory) ClientSet() (internalclientset.Interface, error) {
|
||||||
return f.clientCache.ClientSetForVersion(nil)
|
clientConfig, err := f.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return internalclientset.NewForConfig(clientConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *ring0Factory) checkMatchingServerVersion() error {
|
||||||
|
f.checkServerVersion.Do(func() {
|
||||||
|
if !f.requireMatchedServerVersion {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
discoveryClient, err := f.DiscoveryClient()
|
||||||
|
if err != nil {
|
||||||
|
f.matchesServerVersionErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.matchesServerVersionErr = discovery.MatchesServerVersion(version.Get(), discoveryClient)
|
||||||
|
})
|
||||||
|
|
||||||
|
return f.matchesServerVersionErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *ring0Factory) ClientConfig() (*restclient.Config, error) {
|
func (f *ring0Factory) ClientConfig() (*restclient.Config, error) {
|
||||||
return f.clientCache.ClientConfigForVersion(nil)
|
if err := f.checkMatchingServerVersion(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
clientConfig, err := f.clientConfig.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
setKubernetesDefaults(clientConfig)
|
||||||
|
return clientConfig, nil
|
||||||
}
|
}
|
||||||
func (f *ring0Factory) BareClientConfig() (*restclient.Config, error) {
|
func (f *ring0Factory) BareClientConfig() (*restclient.Config, error) {
|
||||||
return f.clientConfig.ClientConfig()
|
return f.clientConfig.ClientConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *ring0Factory) RESTClient() (*restclient.RESTClient, error) {
|
func (f *ring0Factory) RESTClient() (*restclient.RESTClient, error) {
|
||||||
clientConfig, err := f.clientCache.ClientConfigForVersion(nil)
|
clientConfig, err := f.ClientConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -395,7 +430,7 @@ func (f *ring0Factory) BindFlags(flags *pflag.FlagSet) {
|
|||||||
// TODO Change flag names to consts to allow safer lookup from subcommands.
|
// TODO Change flag names to consts to allow safer lookup from subcommands.
|
||||||
// TODO Add a verbose flag that turns on glog logging. Probably need a way
|
// TODO Add a verbose flag that turns on glog logging. Probably need a way
|
||||||
// to do that automatically for every subcommand.
|
// to do that automatically for every subcommand.
|
||||||
flags.BoolVar(&f.clientCache.matchVersion, FlagMatchBinaryVersion, false, "Require server version to match client version")
|
flags.BoolVar(&f.requireMatchedServerVersion, FlagMatchBinaryVersion, false, "Require server version to match client version")
|
||||||
|
|
||||||
f.discoveryFactory.BindFlags(flags)
|
f.discoveryFactory.BindFlags(flags)
|
||||||
|
|
||||||
@ -711,3 +746,19 @@ func InternalVersionJSONEncoder() runtime.Encoder {
|
|||||||
encoder := legacyscheme.Codecs.LegacyCodec(legacyscheme.Registry.EnabledVersions()...)
|
encoder := legacyscheme.Codecs.LegacyCodec(legacyscheme.Registry.EnabledVersions()...)
|
||||||
return unstructured.JSONFallbackEncoder{Encoder: encoder}
|
return unstructured.JSONFallbackEncoder{Encoder: encoder}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setKubernetesDefaults sets default values on the provided client config for accessing the
|
||||||
|
// Kubernetes API or returns an error if any of the defaults are impossible or invalid.
|
||||||
|
// TODO this isn't what we want. Each clientset should be setting defaults as it sees fit.
|
||||||
|
func setKubernetesDefaults(config *restclient.Config) error {
|
||||||
|
// TODO remove this hack. This is allowing the GetOptions to be serialized.
|
||||||
|
config.GroupVersion = &schema.GroupVersion{Group: "", Version: "v1"}
|
||||||
|
|
||||||
|
if config.APIPath == "" {
|
||||||
|
config.APIPath = "/api"
|
||||||
|
}
|
||||||
|
if config.NegotiatedSerializer == nil {
|
||||||
|
config.NegotiatedSerializer = legacyscheme.Codecs
|
||||||
|
}
|
||||||
|
return restclient.SetKubernetesDefaults(config)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user