KCM: CTBPublisher: use generics to handle both alpha/beta APIs

This commit is contained in:
Stanislav Láznička 2024-11-05 23:18:15 +01:00
parent d3f44a5bc0
commit 5b3b68a3a1
No known key found for this signature in database
GPG Key ID: F8D8054395A1D157
3 changed files with 230 additions and 67 deletions

View File

@ -23,7 +23,9 @@ import (
"context"
"fmt"
certificatesv1alpha1 "k8s.io/api/certificates/v1alpha1"
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
@ -233,6 +235,8 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor() *ControllerD
}
}
type controllerConstructor func(string, dynamiccertificates.CAContentProvider, kubernetes.Interface) (ctbpublisher.PublisherRunner, error)
func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
rootCA, err := getKubeAPIServerCAFileContents(controllerContext)
if err != nil {
@ -243,36 +247,50 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Co
return nil, false, nil
}
apiserverSignerClient := controllerContext.ClientBuilder.ClientOrDie("kube-apiserver-serving-clustertrustbundle-publisher")
ctbAvailable, err := clusterTrustBundlesAvailable(apiserverSignerClient)
if err != nil {
return nil, false, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err)
}
if !ctbAvailable {
return nil, false, nil
}
servingSigners, err := dynamiccertificates.NewStaticCAContent("kube-apiserver-serving", rootCA)
if err != nil {
return nil, false, fmt.Errorf("failed to create a static CA content provider for the kube-apiserver-serving signer: %w", err)
}
ctbPublisher, err := ctbpublisher.NewClusterTrustBundlePublisher(
"kubernetes.io/kube-apiserver-serving",
servingSigners,
apiserverSignerClient,
)
if err != nil {
return nil, false, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err)
schemaControllerMapping := map[schema.GroupVersion]controllerConstructor{
certificatesv1alpha1.SchemeGroupVersion: ctbpublisher.NewAlphaClusterTrustBundlePublisher,
certificatesv1beta1.SchemeGroupVersion: ctbpublisher.NewBetaClusterTrustBundlePublisher,
}
go ctbPublisher.Run(ctx)
apiserverSignerClient := controllerContext.ClientBuilder.ClientOrDie("kube-apiserver-serving-clustertrustbundle-publisher")
var runner ctbpublisher.PublisherRunner
for _, gv := range []schema.GroupVersion{certificatesv1beta1.SchemeGroupVersion, certificatesv1alpha1.SchemeGroupVersion} {
ctbAvailable, err := clusterTrustBundlesAvailable(apiserverSignerClient, gv)
if err != nil {
return nil, false, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err)
}
if !ctbAvailable {
continue
}
runner, err = schemaControllerMapping[gv](
"kubernetes.io/kube-apiserver-serving",
servingSigners,
apiserverSignerClient,
)
if err != nil {
return nil, false, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err)
}
break
}
if runner == nil {
klog.Info("no known scheme version was found for clustertrustbundles, cannot start kube-apiserver-serving-clustertrustbundle-publisher-controller")
return nil, false, nil
}
go runner.Run(ctx)
return nil, true, nil
}
func clusterTrustBundlesAvailable(client kubernetes.Interface) (bool, error) {
resList, err := client.Discovery().ServerResourcesForGroupVersion(certificatesv1beta1.SchemeGroupVersion.String())
func clusterTrustBundlesAvailable(client kubernetes.Interface, schemaVersion schema.GroupVersion) (bool, error) {
resList, err := client.Discovery().ServerResourcesForGroupVersion(schemaVersion.String())
if resList != nil {
// even in case of an error above there might be a partial list for APIs that

View File

@ -23,6 +23,7 @@ import (
"strings"
"time"
certificatesv1alpha1 "k8s.io/api/certificates/v1alpha1"
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -31,9 +32,11 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
certinformers "k8s.io/client-go/informers/certificates/v1beta1"
certalpha1informers "k8s.io/client-go/informers/certificates/v1alpha1"
certbeta1informers "k8s.io/client-go/informers/certificates/v1beta1"
clientset "k8s.io/client-go/kubernetes"
certlisters "k8s.io/client-go/listers/certificates/v1beta1"
certalphav1listers "k8s.io/client-go/listers/certificates/v1alpha1"
certbetav1listers "k8s.io/client-go/listers/certificates/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
@ -43,41 +46,196 @@ func init() {
registerMetrics()
}
type ClusterTrustBundlePublisher struct {
type PublisherRunner interface {
Run(context.Context)
}
type ClusterTrustBundlePublisher[T clusterTrustBundle] struct {
signerName string
ca dynamiccertificates.CAContentProvider
client clientset.Interface
client clusterTrustBundlesClient[T]
ctbInformer cache.SharedIndexInformer
ctbLister certlisters.ClusterTrustBundleLister
ctbLister clusterTrustBundlesLister[T]
ctbListerSynced cache.InformerSynced
handlers clusterTrustBundleHandlers[T]
queue workqueue.TypedRateLimitingInterface[string]
}
// clusterTrustBundle is a type constraint grouping all APIs versions of ClusterTrustBundles
type clusterTrustBundle interface {
certificatesv1alpha1.ClusterTrustBundle | certificatesv1beta1.ClusterTrustBundle
}
// clusterTrustBundlesClient is an API-version independent client for the ClusterTrustBundles API
type clusterTrustBundlesClient[T clusterTrustBundle] interface {
Create(context.Context, *T, metav1.CreateOptions) (*T, error)
Update(context.Context, *T, metav1.UpdateOptions) (*T, error)
Delete(context.Context, string, metav1.DeleteOptions) error
}
// clusterTrustBundlesLister is an API-version independent lister for the ClusterTrustBundles API
type clusterTrustBundlesLister[T clusterTrustBundle] interface {
Get(string) (*T, error)
List(labels.Selector) ([]*T, error)
}
type clusterTrustBundleHandlers[T clusterTrustBundle] interface {
createClusterTrustBundle(bundleName, signerName, trustBundle string) *T
updateWithTrustBundle(ctbObject *T, newBundle string) *T
containsTrustBundle(ctbObject *T, bundle string) bool
getName(ctbObject *T) string
}
var _ clusterTrustBundleHandlers[certificatesv1beta1.ClusterTrustBundle] = &betaHandlers{}
var _ clusterTrustBundleHandlers[certificatesv1alpha1.ClusterTrustBundle] = &alphaHandlers{}
// betaHandlers groups the `clusterTrustBundleHandlers` for the v1beta1 API of
// clusterTrustBundles
type betaHandlers struct{}
func (w *betaHandlers) createClusterTrustBundle(bundleName, signerName, trustBundle string) *certificatesv1beta1.ClusterTrustBundle {
return &certificatesv1beta1.ClusterTrustBundle{
ObjectMeta: metav1.ObjectMeta{
Name: bundleName,
},
Spec: certificatesv1beta1.ClusterTrustBundleSpec{
SignerName: signerName,
TrustBundle: trustBundle,
},
}
}
func (w *betaHandlers) updateWithTrustBundle(ctbObject *certificatesv1beta1.ClusterTrustBundle, newBundle string) *certificatesv1beta1.ClusterTrustBundle {
newObj := ctbObject.DeepCopy()
newObj.Spec.TrustBundle = newBundle
return newObj
}
func (w *betaHandlers) containsTrustBundle(ctbObject *certificatesv1beta1.ClusterTrustBundle, bundle string) bool {
return ctbObject.Spec.TrustBundle == bundle
}
func (w *betaHandlers) getName(ctbObject *certificatesv1beta1.ClusterTrustBundle) string {
return ctbObject.Name
}
// alphaHandlers groups the `clusterTrustBundleHandlers` for the v1alpha1 API of
// clusterTrustBundles
type alphaHandlers struct{}
func (w *alphaHandlers) createClusterTrustBundle(bundleName, signerName, trustBundle string) *certificatesv1alpha1.ClusterTrustBundle {
return &certificatesv1alpha1.ClusterTrustBundle{
ObjectMeta: metav1.ObjectMeta{
Name: bundleName,
},
Spec: certificatesv1alpha1.ClusterTrustBundleSpec{
SignerName: signerName,
TrustBundle: trustBundle,
},
}
}
func (w *alphaHandlers) updateWithTrustBundle(ctbObject *certificatesv1alpha1.ClusterTrustBundle, newBundle string) *certificatesv1alpha1.ClusterTrustBundle {
newObj := ctbObject.DeepCopy()
newObj.Spec.TrustBundle = newBundle
return newObj
}
func (w *alphaHandlers) containsTrustBundle(ctbObject *certificatesv1alpha1.ClusterTrustBundle, bundle string) bool {
return ctbObject.Spec.TrustBundle == bundle
}
func (w *alphaHandlers) getName(ctbObject *certificatesv1alpha1.ClusterTrustBundle) string {
return ctbObject.Name
}
type caContentListener func()
func (f caContentListener) Enqueue() {
f()
}
// NewClusterTrustBundlePublisher creates and maintains a cluster trust bundle object
// for a signer named `signerName`. The cluster trust bundle object contains the
// CA from the `caProvider` in its .spec.TrustBundle.
func NewClusterTrustBundlePublisher(
// NewBetaClusterTrustBundlePublisher sets up a ClusterTrustBundlePublisher for the
// v1beta1 API
func NewBetaClusterTrustBundlePublisher(
signerName string,
caProvider dynamiccertificates.CAContentProvider,
kubeClient clientset.Interface,
) (*ClusterTrustBundlePublisher, error) {
) (
PublisherRunner,
error,
) {
ctbInformer := certbeta1informers.NewFilteredClusterTrustBundleInformer(kubeClient, 0, cache.Indexers{},
func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.signerName", signerName).String()
})
return newClusterTrustBundlePublisher(
signerName,
caProvider,
kubeClient.CertificatesV1beta1().ClusterTrustBundles(),
ctbInformer,
certbetav1listers.NewClusterTrustBundleLister(ctbInformer.GetIndexer()),
&betaHandlers{},
)
}
// NewAlphaClusterTrustBundlePublisher sets up a ClusterTrustBundlePublisher for the
// v1alpha1 API
func NewAlphaClusterTrustBundlePublisher(
signerName string,
caProvider dynamiccertificates.CAContentProvider,
kubeClient clientset.Interface,
) (
PublisherRunner,
error,
) {
ctbInformer := certalpha1informers.NewFilteredClusterTrustBundleInformer(kubeClient, 0, cache.Indexers{},
func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.signerName", signerName).String()
})
return newClusterTrustBundlePublisher(
signerName,
caProvider,
kubeClient.CertificatesV1alpha1().ClusterTrustBundles(),
ctbInformer,
certalphav1listers.NewClusterTrustBundleLister(ctbInformer.GetIndexer()),
&alphaHandlers{},
)
}
// NewClusterTrustBundlePublisher creates and maintains a cluster trust bundle object
// for a signer named `signerName`. The cluster trust bundle object contains the
// CA from the `caProvider` in its .spec.TrustBundle.
func newClusterTrustBundlePublisher[T clusterTrustBundle](
signerName string,
caProvider dynamiccertificates.CAContentProvider,
bundleClient clusterTrustBundlesClient[T],
ctbInformer cache.SharedIndexInformer,
ctbLister clusterTrustBundlesLister[T],
handlers clusterTrustBundleHandlers[T],
) (PublisherRunner, error) {
if len(signerName) == 0 {
return nil, fmt.Errorf("signerName cannot be empty")
}
p := &ClusterTrustBundlePublisher{
p := &ClusterTrustBundlePublisher[T]{
signerName: signerName,
ca: caProvider,
client: kubeClient,
client: bundleClient,
ctbInformer: ctbInformer,
ctbLister: ctbLister,
ctbListerSynced: ctbInformer.HasSynced,
handlers: handlers,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
@ -86,9 +244,6 @@ func NewClusterTrustBundlePublisher(
},
),
}
p.ctbInformer = setupSignerNameFilteredCTBInformer(p.client, p.signerName)
p.ctbLister = certlisters.NewClusterTrustBundleLister(p.ctbInformer.GetIndexer())
p.ctbListerSynced = p.ctbInformer.HasSynced
_, err := p.ctbInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -109,13 +264,13 @@ func NewClusterTrustBundlePublisher(
return p, nil
}
func (p *ClusterTrustBundlePublisher) caContentChangedListener() dynamiccertificates.Listener {
func (p *ClusterTrustBundlePublisher[T]) caContentChangedListener() dynamiccertificates.Listener {
return caContentListener(func() {
p.queue.Add("")
})
}
func (p *ClusterTrustBundlePublisher) Run(ctx context.Context) {
func (p *ClusterTrustBundlePublisher[T]) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer p.queue.ShutDown()
@ -136,7 +291,7 @@ func (p *ClusterTrustBundlePublisher) Run(ctx context.Context) {
<-ctx.Done()
}
func (p *ClusterTrustBundlePublisher) runWorker() func(context.Context) {
func (p *ClusterTrustBundlePublisher[T]) runWorker() func(context.Context) {
return func(ctx context.Context) {
for p.processNextWorkItem(ctx) {
}
@ -145,7 +300,7 @@ func (p *ClusterTrustBundlePublisher) runWorker() func(context.Context) {
// processNextWorkItem deals with one key off the queue. It returns false when
// it's time to quit.
func (p *ClusterTrustBundlePublisher) processNextWorkItem(ctx context.Context) bool {
func (p *ClusterTrustBundlePublisher[T]) processNextWorkItem(ctx context.Context) bool {
key, quit := p.queue.Get()
if quit {
return false
@ -162,7 +317,7 @@ func (p *ClusterTrustBundlePublisher) processNextWorkItem(ctx context.Context) b
return true
}
func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context) (err error) {
func (p *ClusterTrustBundlePublisher[T]) syncClusterTrustBundle(ctx context.Context) (err error) {
startTime := time.Now()
defer func() {
recordMetrics(startTime, err)
@ -174,19 +329,10 @@ func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context
bundle, err := p.ctbLister.Get(bundleName)
if apierrors.IsNotFound(err) {
_, err = p.client.CertificatesV1beta1().ClusterTrustBundles().Create(ctx, &certificatesv1beta1.ClusterTrustBundle{
ObjectMeta: metav1.ObjectMeta{
Name: bundleName,
},
Spec: certificatesv1beta1.ClusterTrustBundleSpec{
SignerName: p.signerName,
TrustBundle: caBundle,
},
}, metav1.CreateOptions{})
} else if err == nil && bundle.Spec.TrustBundle != caBundle {
bundle = bundle.DeepCopy()
bundle.Spec.TrustBundle = caBundle
_, err = p.client.CertificatesV1beta1().ClusterTrustBundles().Update(ctx, bundle, metav1.UpdateOptions{})
_, err = p.client.Create(ctx, p.handlers.createClusterTrustBundle(bundleName, p.signerName, caBundle), metav1.CreateOptions{})
} else if err == nil && !p.handlers.containsTrustBundle(bundle, caBundle) {
updatedBundle := p.handlers.updateWithTrustBundle(bundle, caBundle)
_, err = p.client.Update(ctx, updatedBundle, metav1.UpdateOptions{})
}
if err != nil {
@ -201,12 +347,13 @@ func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context
// keep the deletion error to be returned in the end in order to retrigger the reconciliation loop
var deletionError error
for _, bundleObject := range signerTrustBundles {
if bundleObject.Name == bundleName {
if p.handlers.getName(bundleObject) == bundleName {
continue
}
if err := p.client.CertificatesV1beta1().ClusterTrustBundles().Delete(ctx, bundleObject.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
klog.FromContext(ctx).Error(err, "failed to remove a cluster trust bundle", "bundleName", bundleObject.Name)
deleteName := p.handlers.getName(bundleObject)
if err := p.client.Delete(ctx, deleteName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
klog.FromContext(ctx).Error(err, "failed to remove a cluster trust bundle", "bundleName", deleteName)
deletionError = err
}
}
@ -214,13 +361,6 @@ func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context
return deletionError
}
func setupSignerNameFilteredCTBInformer(client clientset.Interface, signerName string) cache.SharedIndexInformer {
return certinformers.NewFilteredClusterTrustBundleInformer(client, 0, cache.Indexers{},
func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.signerName", signerName).String()
})
}
func constructBundleName(signerName string, bundleBytes []byte) string {
namePrefix := strings.ReplaceAll(signerName, "/", ":") + ":"
bundleHash := sha256.Sum256(bundleBytes)

View File

@ -264,17 +264,22 @@ func TestCTBPublisherSync(t *testing.T) {
fakeClient := fakeKubeClientSetWithCTBList(t, testSignerName, tt.existingCTBs...)
p, err := NewClusterTrustBundlePublisher(testSignerName, testCAProvider, fakeClient)
p, err := NewBetaClusterTrustBundlePublisher(testSignerName, testCAProvider, fakeClient)
if err != nil {
t.Fatalf("failed to set up a new cluster trust bundle publisher: %v", err)
}
go p.ctbInformer.Run(testCtx.Done())
if !cache.WaitForCacheSync(testCtx.Done(), p.ctbInformer.HasSynced) {
controller, ok := p.(*ClusterTrustBundlePublisher[certificatesv1beta1.ClusterTrustBundle])
if !ok {
t.Fatalf("failed to assert the controller for the beta API")
}
go controller.ctbInformer.Run(testCtx.Done())
if !cache.WaitForCacheSync(testCtx.Done(), controller.ctbInformer.HasSynced) {
t.Fatal("timed out waiting for informer to sync")
}
if err := p.syncClusterTrustBundle(testCtx); (err != nil) != tt.wantErr {
if err := controller.syncClusterTrustBundle(testCtx); (err != nil) != tt.wantErr {
t.Errorf("syncClusterTrustBundle() error = %v, wantErr %v", err, tt.wantErr)
}