mirror of
https://github.com/k8snetworkplumbingwg/multus-cni.git
synced 2025-08-21 01:33:28 +00:00
372 lines
12 KiB
Go
372 lines
12 KiB
Go
// Copyright (c) 2023 Network Plumbing Working Group
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
// This is a Kubernetes controller that approves the CSRs submitted by multus.
// This command is required only if multus runs with per-node certificates.
|
|
package main
|
|
|
|
// Note: cert-approver should stay simple: it only approves multus' CSRs.
// Hence this Go code should not depend on anything from pkg/, if possible,
// to keep the code simple.
|
|
import (
|
|
"context"
|
|
"crypto/x509"
|
|
"encoding/pem"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"reflect"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/validation"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
|
|
certificatesv1 "k8s.io/api/certificates/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/klog/v2"
|
|
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/util/certificate/csr"
|
|
"k8s.io/client-go/util/workqueue"
|
|
)
|
|
|
|
// CertController object
|
|
type CertController struct {
|
|
clientset kubernetes.Interface
|
|
queue workqueue.RateLimitingInterface
|
|
informer cache.SharedIndexInformer
|
|
broadcaster record.EventBroadcaster
|
|
recorder record.EventRecorder
|
|
commonNamePrefixes string
|
|
}
|
|
|
|
const (
	// maxDuration is the longest certificate lifetime (365 days) a CSR may
	// request and still be approved.
	maxDuration = 365 * 24 * time.Hour
	// resyncPeriod is the resync interval handed to the CSR informer (one hour).
	resyncPeriod time.Duration = time.Hour
	// maxRetries is how many times a failing key is re-queued before it is dropped.
	maxRetries = 5
)
|
|
|
|
var (
|
|
// ControllerName provides controller name
|
|
ControllerName = "csr-approver"
|
|
// NamePrefix specifies which name in certification request should be target to approve
|
|
NamePrefix = "system:multus"
|
|
// Organization specifies which org in certification request should be target to approve
|
|
Organization = []string{"system:multus"}
|
|
// Groups specifies which group in certification request should be target to approve
|
|
Groups = sets.New[string]("system:nodes", "system:multus", "system:authenticated")
|
|
// UserPrefixes specifies which name prefix in certification request should be target to approve
|
|
UserPrefixes = sets.New[string]("system:node", NamePrefix)
|
|
// Usages specifies which usage in certification request should be target to approve
|
|
Usages = sets.New[certificatesv1.KeyUsage](
|
|
certificatesv1.UsageDigitalSignature,
|
|
certificatesv1.UsageClientAuth)
|
|
)
|
|
|
|
// NewCertController creates certcontroller
|
|
func NewCertController() (*CertController, error) {
|
|
var clientset kubernetes.Interface
|
|
// setup Kubernetes API client
|
|
config, err := rest.InClusterConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clientset, err = kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
informer := cache.NewSharedIndexInformer(
|
|
cache.NewListWatchFromClient(
|
|
clientset.CertificatesV1().RESTClient(),
|
|
"certificatesigningrequests", corev1.NamespaceAll, fields.Everything()),
|
|
&certificatesv1.CertificateSigningRequest{},
|
|
resyncPeriod,
|
|
nil)
|
|
|
|
broadcaster := record.NewBroadcaster()
|
|
broadcaster.StartLogging(klog.Infof)
|
|
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
|
|
recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cert-approver"})
|
|
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
|
|
c := &CertController{
|
|
clientset: clientset,
|
|
informer: informer,
|
|
queue: queue,
|
|
commonNamePrefixes: NamePrefix,
|
|
broadcaster: broadcaster,
|
|
recorder: recorder,
|
|
}
|
|
|
|
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: func(obj interface{}) {
|
|
if csr, ok := obj.(*certificatesv1.CertificateSigningRequest); ok {
|
|
if c.filterCSR(csr) {
|
|
key, err := cache.MetaNamespaceKeyFunc(obj)
|
|
if err == nil {
|
|
queue.Add(key)
|
|
}
|
|
}
|
|
}
|
|
},
|
|
})
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// Run starts controller
|
|
func (c *CertController) Run(stopCh <-chan struct{}) {
|
|
defer utilruntime.HandleCrash()
|
|
defer c.queue.ShutDown()
|
|
|
|
klog.Info("Starting cert approver")
|
|
|
|
go c.informer.Run(stopCh)
|
|
if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
|
|
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
|
|
return
|
|
}
|
|
|
|
klog.Info("cert approver synced and ready")
|
|
wait.Until(c.runWorker, time.Second, stopCh)
|
|
}
|
|
|
|
// HasSynced is required for the cache.Controller interface.
|
|
func (c *CertController) HasSynced() bool {
|
|
return c.informer.HasSynced()
|
|
}
|
|
|
|
// LastSyncResourceVersion is required for the cache.Controller interface.
|
|
func (c *CertController) LastSyncResourceVersion() string {
|
|
return c.informer.LastSyncResourceVersion()
|
|
}
|
|
|
|
func (c *CertController) runWorker() {
|
|
for c.processNextItem() {
|
|
// continue looping
|
|
}
|
|
}
|
|
|
|
func (c *CertController) processNextItem() bool {
|
|
// Wait until there is a new item in the working queue
|
|
key, quit := c.queue.Get()
|
|
if quit {
|
|
return false
|
|
}
|
|
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
|
|
// This allows safe parallel processing because two pods with the same key are never processed in
|
|
// parallel.
|
|
defer c.queue.Done(key)
|
|
|
|
// Invoke the method containing the business logic
|
|
err := c.processItem(key.(string))
|
|
// Handle the error if something went wrong during the execution of the business logic
|
|
c.handleErr(err, key)
|
|
return true
|
|
|
|
}
|
|
|
|
// handleErr checks if an error happened and makes sure we will retry later.
|
|
func (c *CertController) handleErr(err error, key interface{}) {
|
|
if err == nil {
|
|
// Forget about the #AddRateLimited history of the key on every successful synchronization.
|
|
// This ensures that future processing of updates for this key is not delayed because of
|
|
// an outdated error history.
|
|
c.queue.Forget(key)
|
|
return
|
|
}
|
|
|
|
// This controller retries 5 times if something goes wrong. After that, it stops trying.
|
|
if c.queue.NumRequeues(key) < maxRetries {
|
|
klog.Infof("Error syncing csr %s: %v", key, err)
|
|
// Re-enqueue the key rate limited. Based on the rate limiter on the
|
|
// queue and the re-enqueue history, the key will be processed later again.
|
|
c.queue.AddRateLimited(key)
|
|
return
|
|
}
|
|
|
|
c.queue.Forget(key)
|
|
// Report to an external entity that, even after several retries, we could not successfully process this key
|
|
utilruntime.HandleError(err)
|
|
klog.Infof("Dropping csr %q out of the queue: %v", key, err)
|
|
}
|
|
|
|
func (c *CertController) processItem(key string) error {
|
|
startTime := time.Now()
|
|
|
|
obj, _, err := c.informer.GetIndexer().GetByKey(key)
|
|
if err != nil {
|
|
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
|
|
}
|
|
|
|
req, _ := obj.(*certificatesv1.CertificateSigningRequest)
|
|
|
|
nodeName := "unknown"
|
|
defer func() {
|
|
klog.Infof("Finished syncing CSR %s for %s node in %v", req.Name, nodeName, time.Since(startTime))
|
|
}()
|
|
|
|
if len(req.Status.Certificate) > 0 {
|
|
klog.V(5).Infof("CSR %s is already signed", req.Name)
|
|
return nil
|
|
}
|
|
|
|
if isApprovedOrDenied(&req.Status) {
|
|
klog.V(5).Infof("CSR %s is already approved/denied", req.Name)
|
|
return nil
|
|
}
|
|
|
|
csrPEM, _ := pem.Decode(req.Spec.Request)
|
|
if csrPEM == nil {
|
|
return fmt.Errorf("failed to PEM-parse the CSR block in .spec.request: no CSRs were found")
|
|
}
|
|
|
|
x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse the CSR bytes: %v", err)
|
|
}
|
|
|
|
i := strings.LastIndex(req.Spec.Username, ":")
|
|
if i == -1 || i == len(req.Spec.Username)-1 {
|
|
return fmt.Errorf("failed to parse the username: %s", req.Spec.Username)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
prefix := req.Spec.Username[:i]
|
|
nodeName = req.Spec.Username[i+1:]
|
|
if !UserPrefixes.Has(prefix) {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by an unexpected user: %q", req.Name, req.Spec.Username))
|
|
}
|
|
|
|
if errs := validation.IsDNS1123Subdomain(nodeName); len(errs) != 0 {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("extracted node name %q is not a valid DNS subdomain %v", nodeName, errs))
|
|
}
|
|
|
|
if usages := sets.New[certificatesv1.KeyUsage](req.Spec.Usages...); !usages.Equal(Usages) {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with unexpected usages: %v", req.Name, usages.UnsortedList()))
|
|
}
|
|
|
|
if !Groups.HasAll(req.Spec.Groups...) {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by a user with unexpected groups: %v", req.Name, req.Spec.Groups))
|
|
}
|
|
|
|
expectedSubject := fmt.Sprintf("%s:%s", c.commonNamePrefixes, nodeName)
|
|
if x509CSR.Subject.CommonName != expectedSubject {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's commonName to be %q, but it is %q", expectedSubject, x509CSR.Subject.CommonName))
|
|
}
|
|
|
|
if !reflect.DeepEqual(x509CSR.Subject.Organization, Organization) {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's organization to be %v, but it is %v", Organization, x509CSR.Subject.Organization))
|
|
}
|
|
|
|
if req.Spec.ExpirationSeconds == nil {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created without specyfying the expirationSeconds", req.Name))
|
|
}
|
|
|
|
if csr.ExpirationSecondsToDuration(*req.Spec.ExpirationSeconds) > maxDuration {
|
|
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with invalid expirationSeconds value: %d", req.Name, *req.Spec.ExpirationSeconds))
|
|
}
|
|
|
|
return c.approveCSR(ctx, req)
|
|
}
|
|
|
|
// CSR specific functions
|
|
|
|
func (c *CertController) filterCSR(csr *certificatesv1.CertificateSigningRequest) bool {
|
|
nsName := types.NamespacedName{Namespace: csr.Namespace, Name: csr.Name}
|
|
csrPEM, _ := pem.Decode(csr.Spec.Request)
|
|
if csrPEM == nil {
|
|
klog.Errorf("Failed to PEM-parse the CSR block in .spec.request: no CSRs were found in %s", nsName)
|
|
return false
|
|
}
|
|
|
|
x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
|
|
if err != nil {
|
|
klog.Errorf("Failed to parse the CSR .spec.request of %q: %v", nsName, err)
|
|
return false
|
|
}
|
|
|
|
return strings.HasPrefix(x509CSR.Subject.CommonName, c.commonNamePrefixes) &&
|
|
csr.Spec.SignerName == certificatesv1.KubeAPIServerClientSignerName
|
|
}
|
|
|
|
func (c *CertController) approveCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) error {
|
|
csr.Status.Conditions = append(csr.Status.Conditions,
|
|
certificatesv1.CertificateSigningRequestCondition{
|
|
Type: certificatesv1.CertificateApproved,
|
|
Status: corev1.ConditionTrue,
|
|
Reason: "AutoApproved",
|
|
Message: fmt.Sprintf("Auto-approved CSR %q", csr.Name),
|
|
})
|
|
|
|
c.recorder.Eventf(csr, corev1.EventTypeNormal, "CSRApproved", "CSR %q has been approved by %s", csr.Name, ControllerName)
|
|
_, err := c.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
|
|
func (c *CertController) denyCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest, message string) error {
|
|
csr.Status.Conditions = append(csr.Status.Conditions,
|
|
certificatesv1.CertificateSigningRequestCondition{
|
|
Type: certificatesv1.CertificateDenied,
|
|
Status: corev1.ConditionTrue,
|
|
Reason: "CSRDenied",
|
|
Message: message,
|
|
},
|
|
)
|
|
|
|
c.recorder.Eventf(csr, corev1.EventTypeWarning, "CSRDenied", "The CSR %q has been denied by: %s", csr.Name, ControllerName, message)
|
|
_, err := c.clientset.CertificatesV1().CertificateSigningRequests().Update(ctx, csr, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
|
|
func isApprovedOrDenied(status *certificatesv1.CertificateSigningRequestStatus) bool {
|
|
for _, c := range status.Conditions {
|
|
if c.Type == certificatesv1.CertificateApproved || c.Type == certificatesv1.CertificateDenied {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func main() {
|
|
klog.Infof("starting cert-approver")
|
|
|
|
// Start watching for pod creations
|
|
certController, err := NewCertController()
|
|
if err != nil {
|
|
klog.Fatal(err)
|
|
}
|
|
|
|
stopCh := make(chan struct{})
|
|
defer close(stopCh)
|
|
go certController.Run(stopCh)
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
|
|
<-sigterm
|
|
}
|