diff --git a/cmd/kube-controller-manager/app/certificates.go b/cmd/kube-controller-manager/app/certificates.go index 42880fa4645..696bcec3c3d 100644 --- a/cmd/kube-controller-manager/app/certificates.go +++ b/cmd/kube-controller-manager/app/certificates.go @@ -23,7 +23,9 @@ import ( "context" "fmt" + certificatesv1alpha1 "k8s.io/api/certificates/v1alpha1" certificatesv1beta1 "k8s.io/api/certificates/v1beta1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/server/dynamiccertificates" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" @@ -233,6 +235,8 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherDescriptor() *ControllerD } } +type controllerConstructor func(string, dynamiccertificates.CAContentProvider, kubernetes.Interface) (ctbpublisher.PublisherRunner, error) + func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) { rootCA, err := getKubeAPIServerCAFileContents(controllerContext) if err != nil { @@ -243,36 +247,50 @@ func newKubeAPIServerSignerClusterTrustBundledPublisherController(ctx context.Co return nil, false, nil } - apiserverSignerClient := controllerContext.ClientBuilder.ClientOrDie("kube-apiserver-serving-clustertrustbundle-publisher") - ctbAvailable, err := clusterTrustBundlesAvailable(apiserverSignerClient) - if err != nil { - return nil, false, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err) - } - - if !ctbAvailable { - return nil, false, nil - } - servingSigners, err := dynamiccertificates.NewStaticCAContent("kube-apiserver-serving", rootCA) if err != nil { return nil, false, fmt.Errorf("failed to create a static CA content provider for the kube-apiserver-serving signer: %w", err) } - ctbPublisher, err := ctbpublisher.NewClusterTrustBundlePublisher( - "kubernetes.io/kube-apiserver-serving", - servingSigners, - apiserverSignerClient, - ) - if err != nil { - return nil, false, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err) + schemaControllerMapping := map[schema.GroupVersion]controllerConstructor{ + certificatesv1alpha1.SchemeGroupVersion: ctbpublisher.NewAlphaClusterTrustBundlePublisher, + certificatesv1beta1.SchemeGroupVersion: ctbpublisher.NewBetaClusterTrustBundlePublisher, } - go ctbPublisher.Run(ctx) + apiserverSignerClient := controllerContext.ClientBuilder.ClientOrDie("kube-apiserver-serving-clustertrustbundle-publisher") + var runner ctbpublisher.PublisherRunner + for _, gv := range []schema.GroupVersion{certificatesv1beta1.SchemeGroupVersion, certificatesv1alpha1.SchemeGroupVersion} { + ctbAvailable, err := clusterTrustBundlesAvailable(apiserverSignerClient, gv) + if err != nil { + return nil, false, fmt.Errorf("discovery failed for ClusterTrustBundle: %w", err) + } + + if !ctbAvailable { + continue + } + + runner, err = schemaControllerMapping[gv]( + "kubernetes.io/kube-apiserver-serving", + servingSigners, + apiserverSignerClient, + ) + if err != nil { + return nil, false, fmt.Errorf("error creating kube-apiserver-serving signer certificates publisher: %w", err) + } + break + } + + if runner == nil { + klog.Info("no known scheme version was found for clustertrustbundles, cannot start kube-apiserver-serving-clustertrustbundle-publisher-controller") + return nil, false, nil + } + + go runner.Run(ctx) return nil, true, nil } -func clusterTrustBundlesAvailable(client kubernetes.Interface) (bool, error) { - resList, err := client.Discovery().ServerResourcesForGroupVersion(certificatesv1beta1.SchemeGroupVersion.String()) +func clusterTrustBundlesAvailable(client kubernetes.Interface, schemaVersion schema.GroupVersion) (bool, error) { + resList, err := client.Discovery().ServerResourcesForGroupVersion(schemaVersion.String()) if resList != nil { // even in case of an error above there might be a partial list for APIs that diff --git a/pkg/controller/certificates/clustertrustbundlepublisher/publisher.go b/pkg/controller/certificates/clustertrustbundlepublisher/publisher.go index 2197f5bb801..f5293b0583e 100644 --- a/pkg/controller/certificates/clustertrustbundlepublisher/publisher.go +++ b/pkg/controller/certificates/clustertrustbundlepublisher/publisher.go @@ -23,6 +23,7 @@ import ( "strings" "time" + certificatesv1alpha1 "k8s.io/api/certificates/v1alpha1" certificatesv1beta1 "k8s.io/api/certificates/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,9 +32,11 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/server/dynamiccertificates" - certinformers "k8s.io/client-go/informers/certificates/v1beta1" + certalpha1informers "k8s.io/client-go/informers/certificates/v1alpha1" + certbeta1informers "k8s.io/client-go/informers/certificates/v1beta1" clientset "k8s.io/client-go/kubernetes" - certlisters "k8s.io/client-go/listers/certificates/v1beta1" + certalphav1listers "k8s.io/client-go/listers/certificates/v1alpha1" + certbetav1listers "k8s.io/client-go/listers/certificates/v1beta1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -43,41 +46,196 @@ func init() { registerMetrics() } -type ClusterTrustBundlePublisher struct { +type PublisherRunner interface { + Run(context.Context) +} + +type ClusterTrustBundlePublisher[T clusterTrustBundle] struct { signerName string ca dynamiccertificates.CAContentProvider - client clientset.Interface + client clusterTrustBundlesClient[T] ctbInformer cache.SharedIndexInformer - ctbLister certlisters.ClusterTrustBundleLister + ctbLister clusterTrustBundlesLister[T] ctbListerSynced cache.InformerSynced + handlers clusterTrustBundleHandlers[T] + queue workqueue.TypedRateLimitingInterface[string] } +// clusterTrustBundle is a type constraint grouping all APIs versions of ClusterTrustBundles +type clusterTrustBundle interface { + certificatesv1alpha1.ClusterTrustBundle | certificatesv1beta1.ClusterTrustBundle +} + +// clusterTrustBundlesClient is an API-version independent client for the ClusterTrustBundles API +type clusterTrustBundlesClient[T clusterTrustBundle] interface { + Create(context.Context, *T, metav1.CreateOptions) (*T, error) + Update(context.Context, *T, metav1.UpdateOptions) (*T, error) + Delete(context.Context, string, metav1.DeleteOptions) error +} + +// clusterTrustBundlesLister is an API-version independent lister for the ClusterTrustBundles API +type clusterTrustBundlesLister[T clusterTrustBundle] interface { + Get(string) (*T, error) + List(labels.Selector) ([]*T, error) +} + +type clusterTrustBundleHandlers[T clusterTrustBundle] interface { + createClusterTrustBundle(bundleName, signerName, trustBundle string) *T + updateWithTrustBundle(ctbObject *T, newBundle string) *T + containsTrustBundle(ctbObject *T, bundle string) bool + getName(ctbObject *T) string +} + +var _ clusterTrustBundleHandlers[certificatesv1beta1.ClusterTrustBundle] = &betaHandlers{} +var _ clusterTrustBundleHandlers[certificatesv1alpha1.ClusterTrustBundle] = &alphaHandlers{} + +// betaHandlers groups the `clusterTrustBundleHandlers` for the v1beta1 API of +// clusterTrustBundles +type betaHandlers struct{} + +func (w *betaHandlers) createClusterTrustBundle(bundleName, signerName, trustBundle string) *certificatesv1beta1.ClusterTrustBundle { + return &certificatesv1beta1.ClusterTrustBundle{ + ObjectMeta: metav1.ObjectMeta{ + Name: bundleName, + }, + Spec: certificatesv1beta1.ClusterTrustBundleSpec{ + SignerName: signerName, + TrustBundle: trustBundle, + }, + } +} + +func (w *betaHandlers) updateWithTrustBundle(ctbObject *certificatesv1beta1.ClusterTrustBundle, newBundle string) *certificatesv1beta1.ClusterTrustBundle { + newObj := ctbObject.DeepCopy() + newObj.Spec.TrustBundle = newBundle + return newObj +} + +func (w *betaHandlers) containsTrustBundle(ctbObject *certificatesv1beta1.ClusterTrustBundle, bundle string) bool { + return ctbObject.Spec.TrustBundle == bundle +} + +func (w *betaHandlers) getName(ctbObject *certificatesv1beta1.ClusterTrustBundle) string { + return ctbObject.Name +} + +// alphaHandlers groups the `clusterTrustBundleHandlers` for the v1alpha1 API of +// clusterTrustBundles +type alphaHandlers struct{} + +func (w *alphaHandlers) createClusterTrustBundle(bundleName, signerName, trustBundle string) *certificatesv1alpha1.ClusterTrustBundle { + return &certificatesv1alpha1.ClusterTrustBundle{ + ObjectMeta: metav1.ObjectMeta{ + Name: bundleName, + }, + Spec: certificatesv1alpha1.ClusterTrustBundleSpec{ + SignerName: signerName, + TrustBundle: trustBundle, + }, + } +} + +func (w *alphaHandlers) updateWithTrustBundle(ctbObject *certificatesv1alpha1.ClusterTrustBundle, newBundle string) *certificatesv1alpha1.ClusterTrustBundle { + newObj := ctbObject.DeepCopy() + newObj.Spec.TrustBundle = newBundle + return newObj +} + +func (w *alphaHandlers) containsTrustBundle(ctbObject *certificatesv1alpha1.ClusterTrustBundle, bundle string) bool { + return ctbObject.Spec.TrustBundle == bundle +} + +func (w *alphaHandlers) getName(ctbObject *certificatesv1alpha1.ClusterTrustBundle) string { + return ctbObject.Name +} + type caContentListener func() func (f caContentListener) Enqueue() { f() } -// NewClusterTrustBundlePublisher creates and maintains a cluster trust bundle object -// for a signer named `signerName`. The cluster trust bundle object contains the -// CA from the `caProvider` in its .spec.TrustBundle. -func NewClusterTrustBundlePublisher( +// NewBetaClusterTrustBundlePublisher sets up a ClusterTrustBundlePublisher for the +// v1beta1 API +func NewBetaClusterTrustBundlePublisher( signerName string, caProvider dynamiccertificates.CAContentProvider, kubeClient clientset.Interface, -) (*ClusterTrustBundlePublisher, error) { +) ( + PublisherRunner, + error, +) { + + ctbInformer := certbeta1informers.NewFilteredClusterTrustBundleInformer(kubeClient, 0, cache.Indexers{}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.signerName", signerName).String() + }) + + return newClusterTrustBundlePublisher( + signerName, + caProvider, + kubeClient.CertificatesV1beta1().ClusterTrustBundles(), + ctbInformer, + certbetav1listers.NewClusterTrustBundleLister(ctbInformer.GetIndexer()), + &betaHandlers{}, + ) +} + +// NewAlphaClusterTrustBundlePublisher sets up a ClusterTrustBundlePublisher for the +// v1alpha1 API +func NewAlphaClusterTrustBundlePublisher( + signerName string, + caProvider dynamiccertificates.CAContentProvider, + kubeClient clientset.Interface, +) ( + PublisherRunner, + error, +) { + + ctbInformer := certalpha1informers.NewFilteredClusterTrustBundleInformer(kubeClient, 0, cache.Indexers{}, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("spec.signerName", signerName).String() + }) + + return newClusterTrustBundlePublisher( + signerName, + caProvider, + kubeClient.CertificatesV1alpha1().ClusterTrustBundles(), + ctbInformer, + certalphav1listers.NewClusterTrustBundleLister(ctbInformer.GetIndexer()), + &alphaHandlers{}, + ) +} + +// NewClusterTrustBundlePublisher creates and maintains a cluster trust bundle object +// for a signer named `signerName`. The cluster trust bundle object contains the +// CA from the `caProvider` in its .spec.TrustBundle. +func newClusterTrustBundlePublisher[T clusterTrustBundle]( + signerName string, + caProvider dynamiccertificates.CAContentProvider, + bundleClient clusterTrustBundlesClient[T], + ctbInformer cache.SharedIndexInformer, + ctbLister clusterTrustBundlesLister[T], + handlers clusterTrustBundleHandlers[T], +) (PublisherRunner, error) { if len(signerName) == 0 { return nil, fmt.Errorf("signerName cannot be empty") } - p := &ClusterTrustBundlePublisher{ + p := &ClusterTrustBundlePublisher[T]{ signerName: signerName, ca: caProvider, - client: kubeClient, + client: bundleClient, + + ctbInformer: ctbInformer, + ctbLister: ctbLister, + ctbListerSynced: ctbInformer.HasSynced, + + handlers: handlers, queue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), @@ -86,9 +244,6 @@ func NewClusterTrustBundlePublisher( }, ), } - p.ctbInformer = setupSignerNameFilteredCTBInformer(p.client, p.signerName) - p.ctbLister = certlisters.NewClusterTrustBundleLister(p.ctbInformer.GetIndexer()) - p.ctbListerSynced = p.ctbInformer.HasSynced _, err := p.ctbInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -109,13 +264,13 @@ func NewClusterTrustBundlePublisher( return p, nil } -func (p *ClusterTrustBundlePublisher) caContentChangedListener() dynamiccertificates.Listener { +func (p *ClusterTrustBundlePublisher[T]) caContentChangedListener() dynamiccertificates.Listener { return caContentListener(func() { p.queue.Add("") }) } -func (p *ClusterTrustBundlePublisher) Run(ctx context.Context) { +func (p *ClusterTrustBundlePublisher[T]) Run(ctx context.Context) { defer utilruntime.HandleCrash() defer p.queue.ShutDown() @@ -136,7 +291,7 @@ func (p *ClusterTrustBundlePublisher) Run(ctx context.Context) { <-ctx.Done() } -func (p *ClusterTrustBundlePublisher) runWorker() func(context.Context) { +func (p *ClusterTrustBundlePublisher[T]) runWorker() func(context.Context) { return func(ctx context.Context) { for p.processNextWorkItem(ctx) { } @@ -145,7 +300,7 @@ func (p *ClusterTrustBundlePublisher) runWorker() func(context.Context) { // processNextWorkItem deals with one key off the queue. It returns false when // it's time to quit. -func (p *ClusterTrustBundlePublisher) processNextWorkItem(ctx context.Context) bool { +func (p *ClusterTrustBundlePublisher[T]) processNextWorkItem(ctx context.Context) bool { key, quit := p.queue.Get() if quit { return false @@ -162,7 +317,7 @@ func (p *ClusterTrustBundlePublisher) processNextWorkItem(ctx context.Context) b return true } -func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context) (err error) { +func (p *ClusterTrustBundlePublisher[T]) syncClusterTrustBundle(ctx context.Context) (err error) { startTime := time.Now() defer func() { recordMetrics(startTime, err) @@ -174,19 +329,10 @@ func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context bundle, err := p.ctbLister.Get(bundleName) if apierrors.IsNotFound(err) { - _, err = p.client.CertificatesV1beta1().ClusterTrustBundles().Create(ctx, &certificatesv1beta1.ClusterTrustBundle{ - ObjectMeta: metav1.ObjectMeta{ - Name: bundleName, - }, - Spec: certificatesv1beta1.ClusterTrustBundleSpec{ - SignerName: p.signerName, - TrustBundle: caBundle, - }, - }, metav1.CreateOptions{}) - } else if err == nil && bundle.Spec.TrustBundle != caBundle { - bundle = bundle.DeepCopy() - bundle.Spec.TrustBundle = caBundle - _, err = p.client.CertificatesV1beta1().ClusterTrustBundles().Update(ctx, bundle, metav1.UpdateOptions{}) + _, err = p.client.Create(ctx, p.handlers.createClusterTrustBundle(bundleName, p.signerName, caBundle), metav1.CreateOptions{}) + } else if err == nil && !p.handlers.containsTrustBundle(bundle, caBundle) { + updatedBundle := p.handlers.updateWithTrustBundle(bundle, caBundle) + _, err = p.client.Update(ctx, updatedBundle, metav1.UpdateOptions{}) } if err != nil { @@ -201,12 +347,13 @@ func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context // keep the deletion error to be returned in the end in order to retrigger the reconciliation loop var deletionError error for _, bundleObject := range signerTrustBundles { - if bundleObject.Name == bundleName { + if p.handlers.getName(bundleObject) == bundleName { continue } - if err := p.client.CertificatesV1beta1().ClusterTrustBundles().Delete(ctx, bundleObject.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { - klog.FromContext(ctx).Error(err, "failed to remove a cluster trust bundle", "bundleName", bundleObject.Name) + deleteName := p.handlers.getName(bundleObject) + if err := p.client.Delete(ctx, deleteName, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + klog.FromContext(ctx).Error(err, "failed to remove a cluster trust bundle", "bundleName", deleteName) deletionError = err } } @@ -214,13 +361,6 @@ func (p *ClusterTrustBundlePublisher) syncClusterTrustBundle(ctx context.Context return deletionError } -func setupSignerNameFilteredCTBInformer(client clientset.Interface, signerName string) cache.SharedIndexInformer { - return certinformers.NewFilteredClusterTrustBundleInformer(client, 0, cache.Indexers{}, - func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("spec.signerName", signerName).String() - }) -} - func constructBundleName(signerName string, bundleBytes []byte) string { namePrefix := strings.ReplaceAll(signerName, "/", ":") + ":" bundleHash := sha256.Sum256(bundleBytes) diff --git a/pkg/controller/certificates/clustertrustbundlepublisher/publisher_test.go b/pkg/controller/certificates/clustertrustbundlepublisher/publisher_test.go index cd7f83858b3..ca34b5cea5c 100644 --- a/pkg/controller/certificates/clustertrustbundlepublisher/publisher_test.go +++ b/pkg/controller/certificates/clustertrustbundlepublisher/publisher_test.go @@ -264,17 +264,22 @@ func TestCTBPublisherSync(t *testing.T) { fakeClient := fakeKubeClientSetWithCTBList(t, testSignerName, tt.existingCTBs...) - p, err := NewClusterTrustBundlePublisher(testSignerName, testCAProvider, fakeClient) + p, err := NewBetaClusterTrustBundlePublisher(testSignerName, testCAProvider, fakeClient) if err != nil { t.Fatalf("failed to set up a new cluster trust bundle publisher: %v", err) } - go p.ctbInformer.Run(testCtx.Done()) - if !cache.WaitForCacheSync(testCtx.Done(), p.ctbInformer.HasSynced) { + controller, ok := p.(*ClusterTrustBundlePublisher[certificatesv1beta1.ClusterTrustBundle]) + if !ok { + t.Fatalf("failed to assert the controller for the beta API") + } + + go controller.ctbInformer.Run(testCtx.Done()) + if !cache.WaitForCacheSync(testCtx.Done(), controller.ctbInformer.HasSynced) { t.Fatal("timed out waiting for informer to sync") } - if err := p.syncClusterTrustBundle(testCtx); (err != nil) != tt.wantErr { + if err := controller.syncClusterTrustBundle(testCtx); (err != nil) != tt.wantErr { t.Errorf("syncClusterTrustBundle() error = %v, wantErr %v", err, tt.wantErr) }