mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 18:00:08 +00:00
kubelet: lazy enabling the ClusterTrustBundleProjection feature
Determine whether the ClusterTrustBundleProjection should be enabled based on ClusterTrustBundle API discovery. Some distributions may rely on a running kubelet in order to start their kube-apiserver. Therefore we must delay the API discovery. This patch delays it until the first time a clustertrustbundle is requested from the InformerMaanager.
This commit is contained in:
parent
0cd2976cab
commit
d3f44a5bc0
@ -23,14 +23,18 @@ import (
|
|||||||
"encoding/pem"
|
"encoding/pem"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-logr/logr"
|
||||||
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
|
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
|
||||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
lrucache "k8s.io/apimachinery/pkg/util/cache"
|
lrucache "k8s.io/apimachinery/pkg/util/cache"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/client-go/informers"
|
||||||
certinformersv1beta1 "k8s.io/client-go/informers/certificates/v1beta1"
|
certinformersv1beta1 "k8s.io/client-go/informers/certificates/v1beta1"
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
certlistersv1beta1 "k8s.io/client-go/listers/certificates/v1beta1"
|
certlistersv1beta1 "k8s.io/client-go/listers/certificates/v1beta1"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
@ -261,3 +265,113 @@ func (m *NoopManager) GetTrustAnchorsByName(name string, allowMissing bool) ([]b
|
|||||||
func (m *NoopManager) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
|
func (m *NoopManager) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
|
||||||
return nil, fmt.Errorf("ClusterTrustBundle projection is not supported in static kubelet mode")
|
return nil, fmt.Errorf("ClusterTrustBundle projection is not supported in static kubelet mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LazyInformerManager decides whether to use the noop or the actual manager on a call to
|
||||||
|
// the manager's methods.
|
||||||
|
// We cannot determine this upon startup because some may rely on the kubelet to be fully
|
||||||
|
// running in order to setup their kube-apiserver.
|
||||||
|
type LazyInformerManager struct {
|
||||||
|
manager Manager
|
||||||
|
managerLock sync.RWMutex
|
||||||
|
client clientset.Interface
|
||||||
|
cacheSize int
|
||||||
|
contextWithLogger context.Context
|
||||||
|
logger logr.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewLazyInformerManager(ctx context.Context, kubeClient clientset.Interface, cacheSize int) Manager {
|
||||||
|
return &LazyInformerManager{
|
||||||
|
client: kubeClient,
|
||||||
|
cacheSize: cacheSize,
|
||||||
|
contextWithLogger: ctx,
|
||||||
|
logger: klog.FromContext(ctx),
|
||||||
|
managerLock: sync.RWMutex{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *LazyInformerManager) GetTrustAnchorsByName(name string, allowMissing bool) ([]byte, error) {
|
||||||
|
if err := m.ensureManagerSet(); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to ensure informer manager for ClusterTrustBundles: %w", err)
|
||||||
|
}
|
||||||
|
return m.manager.GetTrustAnchorsByName(name, allowMissing)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *LazyInformerManager) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
|
||||||
|
if err := m.ensureManagerSet(); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to ensure informer manager for ClusterTrustBundles: %w", err)
|
||||||
|
}
|
||||||
|
return m.manager.GetTrustAnchorsBySigner(signerName, labelSelector, allowMissing)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *LazyInformerManager) isManagerSet() bool {
|
||||||
|
m.managerLock.RLock()
|
||||||
|
defer m.managerLock.RUnlock()
|
||||||
|
return m.manager != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *LazyInformerManager) ensureManagerSet() error {
|
||||||
|
if m.isManagerSet() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
m.managerLock.Lock()
|
||||||
|
defer m.managerLock.Unlock()
|
||||||
|
// we need to check again in case the manager was set between locking
|
||||||
|
if m.manager != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctbAPIAvailable, err := clusterTrustBundlesAvailable(m.client)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to determine which informer manager to choose: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ctbAPIAvailable {
|
||||||
|
m.manager = &NoopManager{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
kubeInformers := informers.NewSharedInformerFactoryWithOptions(m.client, 0)
|
||||||
|
clusterTrustBundleManager, err := NewInformerManager(m.contextWithLogger, kubeInformers.Certificates().V1beta1().ClusterTrustBundles(), m.cacheSize, 5*time.Minute)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error starting informer-based ClusterTrustBundle manager: %w", err)
|
||||||
|
}
|
||||||
|
m.manager = clusterTrustBundleManager
|
||||||
|
kubeInformers.Start(m.contextWithLogger.Done())
|
||||||
|
m.logger.Info("Started ClusterTrustBundle informer")
|
||||||
|
|
||||||
|
// a cache fetch will likely follow right after, wait for the freshly started
|
||||||
|
// informers to sync
|
||||||
|
synced := true
|
||||||
|
timeoutContext, cancel := context.WithTimeout(m.contextWithLogger, 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
m.logger.Info("Waiting for ClusterTrustBundle informer to sync")
|
||||||
|
for _, ok := range kubeInformers.WaitForCacheSync(timeoutContext.Done()) {
|
||||||
|
synced = synced && ok
|
||||||
|
}
|
||||||
|
if synced {
|
||||||
|
m.logger.Info("ClusterTrustBundle informer synced")
|
||||||
|
} else {
|
||||||
|
m.logger.Info("ClusterTrustBundle informer not synced, continuing to attempt in background")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func clusterTrustBundlesAvailable(client clientset.Interface) (bool, error) {
|
||||||
|
resList, err := client.Discovery().ServerResourcesForGroupVersion(certificatesv1beta1.SchemeGroupVersion.String())
|
||||||
|
if k8serrors.IsNotFound(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if resList != nil {
|
||||||
|
// even in case of an error above there might be a partial list for APIs that
|
||||||
|
// were already successfully discovered
|
||||||
|
for _, r := range resList.APIResources {
|
||||||
|
if r.Name == "clustertrustbundles" {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
@ -878,19 +878,12 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||||||
|
|
||||||
tokenManager := token.NewManager(kubeDeps.KubeClient)
|
tokenManager := token.NewManager(kubeDeps.KubeClient)
|
||||||
|
|
||||||
var clusterTrustBundleManager clustertrustbundle.Manager
|
var clusterTrustBundleManager clustertrustbundle.Manager = &clustertrustbundle.NoopManager{}
|
||||||
if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) {
|
if kubeDeps.KubeClient != nil && utilfeature.DefaultFeatureGate.Enabled(features.ClusterTrustBundleProjection) {
|
||||||
kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0)
|
clusterTrustBundleManager = clustertrustbundle.NewLazyInformerManager(ctx, kubeDeps.KubeClient, 2*int(kubeCfg.MaxPods))
|
||||||
clusterTrustBundleManager, err = clustertrustbundle.NewInformerManager(ctx, kubeInformers.Certificates().V1beta1().ClusterTrustBundles(), 2*int(kubeCfg.MaxPods), 5*time.Minute)
|
klog.InfoS("ClusterTrustBundle informer will be started eventually once a trust bundle is requested")
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("while starting informer-based ClusterTrustBundle manager: %w", err)
|
|
||||||
}
|
|
||||||
kubeInformers.Start(wait.NeverStop)
|
|
||||||
klog.InfoS("Started ClusterTrustBundle informer")
|
|
||||||
} else {
|
} else {
|
||||||
// In static kubelet mode, use a no-op manager.
|
klog.InfoS("Not starting ClusterTrustBundle informer because we are in static kubelet mode or the ClusterTrustBundleProjection featuregate is disabled")
|
||||||
clusterTrustBundleManager = &clustertrustbundle.NoopManager{}
|
|
||||||
klog.InfoS("Not starting ClusterTrustBundle informer because we are in static kubelet mode")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
|
// NewInitializedVolumePluginMgr initializes some storageErrors on the Kubelet runtimeState (in csi_plugin.go init)
|
||||||
|
Loading…
Reference in New Issue
Block a user