mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 01:06:27 +00:00
212 lines
6.6 KiB
Go
212 lines
6.6 KiB
Go
/*
|
|
Copyright 2022 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package legacytokentracking
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"golang.org/x/time/rate"
|
|
corev1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
corev1informers "k8s.io/client-go/informers/core/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"k8s.io/klog/v2"
|
|
kubefeatures "k8s.io/kubernetes/pkg/features"
|
|
"k8s.io/utils/clock"
|
|
)
|
|
|
|
const (
|
|
ConfigMapName = "kube-apiserver-legacy-service-account-token-tracking"
|
|
ConfigMapDataKey = "since"
|
|
dateFormat = "2006-01-02"
|
|
)
|
|
|
|
var (
|
|
queueKey = metav1.NamespaceSystem + "/" + ConfigMapName
|
|
)
|
|
|
|
// Controller maintains a timestamp value configmap `kube-apiserver-legacy-service-account-token-tracking`
|
|
// in `kube-system` to indicates if the tracking for legacy tokens is enabled in
|
|
// the cluster. For HA clusters, the configmap will be eventually created after
|
|
// all controller instances have enabled the feature. When disabling this
|
|
// feature, existing configmap will be deleted.
|
|
type Controller struct {
|
|
configMapClient corev1client.ConfigMapsGetter
|
|
configMapInformer cache.SharedIndexInformer
|
|
configMapCache cache.Indexer
|
|
configMapSynced cache.InformerSynced
|
|
queue workqueue.RateLimitingInterface
|
|
|
|
// enabled controls the behavior of the controller: if enabled is true, the
|
|
//configmap will be created; otherwise, the configmap will be deleted.
|
|
enabled bool
|
|
// rate limiter controls the rate limit of the creation of the configmap.
|
|
// this is useful in multi-apiserver cluster to prevent config existing in a
|
|
// cluster with mixed enabled/disabled controllers. otherwise, those
|
|
// apiservers will fight to create/delete until all apiservers are enabled
|
|
// or disabled.
|
|
creationRatelimiter *rate.Limiter
|
|
clock clock.Clock
|
|
}
|
|
|
|
// NewController returns a Controller struct.
|
|
func NewController(cs kubernetes.Interface) *Controller {
|
|
return newController(cs, clock.RealClock{}, rate.NewLimiter(rate.Every(30*time.Minute), 1))
|
|
}
|
|
|
|
func newController(cs kubernetes.Interface, cl clock.Clock, limiter *rate.Limiter) *Controller {
|
|
informer := corev1informers.NewFilteredConfigMapInformer(cs, metav1.NamespaceSystem, 12*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, func(options *metav1.ListOptions) {
|
|
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", ConfigMapName).String()
|
|
})
|
|
|
|
c := &Controller{
|
|
configMapClient: cs.CoreV1(),
|
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "legacy_token_tracking_controller"),
|
|
configMapInformer: informer,
|
|
configMapCache: informer.GetIndexer(),
|
|
configMapSynced: informer.HasSynced,
|
|
enabled: utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LegacyServiceAccountTokenTracking),
|
|
creationRatelimiter: limiter,
|
|
clock: cl,
|
|
}
|
|
|
|
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
c.enqueue()
|
|
},
|
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
|
c.enqueue()
|
|
},
|
|
DeleteFunc: func(obj interface{}) {
|
|
c.enqueue()
|
|
},
|
|
})
|
|
|
|
return c
|
|
}
|
|
|
|
func (c *Controller) enqueue() {
|
|
c.queue.Add(queueKey)
|
|
}
|
|
|
|
// Run starts the controller sync loop.
|
|
func (c *Controller) Run(stopCh <-chan struct{}) {
|
|
defer utilruntime.HandleCrash()
|
|
defer c.queue.ShutDown()
|
|
|
|
klog.Info("Starting legacy_token_tracking_controller")
|
|
defer klog.Infof("Shutting down legacy_token_tracking_controller")
|
|
|
|
go c.configMapInformer.Run(stopCh)
|
|
if !cache.WaitForNamedCacheSync("configmaps", stopCh, c.configMapSynced) {
|
|
return
|
|
}
|
|
|
|
go wait.Until(c.runWorker, time.Second, stopCh)
|
|
|
|
c.queue.Add(queueKey)
|
|
|
|
<-stopCh
|
|
klog.Info("Ending legacy_token_tracking_controller")
|
|
}
|
|
|
|
func (c *Controller) runWorker() {
|
|
for c.processNext() {
|
|
}
|
|
}
|
|
|
|
func (c *Controller) processNext() bool {
|
|
key, quit := c.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
defer c.queue.Done(key)
|
|
|
|
if err := c.syncConfigMap(); err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("while syncing ConfigMap %q, err: %w", key, err))
|
|
c.queue.AddRateLimited(key)
|
|
return true
|
|
}
|
|
c.queue.Forget(key)
|
|
return true
|
|
}
|
|
|
|
func (c *Controller) syncConfigMap() error {
|
|
obj, exists, err := c.configMapCache.GetByKey(queueKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
now := c.clock.Now()
|
|
switch {
|
|
case c.enabled:
|
|
if !exists {
|
|
r := c.creationRatelimiter.ReserveN(now, 1)
|
|
if delay := r.DelayFrom(now); delay > 0 {
|
|
c.queue.AddAfter(queueKey, delay)
|
|
r.CancelAt(now)
|
|
return nil
|
|
}
|
|
|
|
if _, err = c.configMapClient.ConfigMaps(metav1.NamespaceSystem).Create(context.TODO(), &corev1.ConfigMap{
|
|
ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceSystem, Name: ConfigMapName},
|
|
Data: map[string]string{ConfigMapDataKey: now.UTC().Format(dateFormat)},
|
|
}, metav1.CreateOptions{}); err != nil {
|
|
if apierrors.IsAlreadyExists(err) {
|
|
return nil
|
|
}
|
|
// don't consume the creationRatelimiter for an unsuccessful attempt
|
|
r.CancelAt(now)
|
|
return err
|
|
}
|
|
} else {
|
|
configMap := obj.(*corev1.ConfigMap)
|
|
if _, err = time.Parse(dateFormat, configMap.Data[ConfigMapDataKey]); err != nil {
|
|
configMap := configMap.DeepCopy()
|
|
configMap.Data[ConfigMapDataKey] = now.UTC().Format(dateFormat)
|
|
if _, err = c.configMapClient.ConfigMaps(metav1.NamespaceSystem).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
|
|
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
case !c.enabled:
|
|
if exists && obj.(*corev1.ConfigMap).DeletionTimestamp == nil {
|
|
if err := c.configMapClient.ConfigMaps(metav1.NamespaceSystem).Delete(context.TODO(), ConfigMapName, metav1.DeleteOptions{}); err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|