Add per-node-certification support

This change introduces per-node certification for multus pods.
Once multus pod is launched, then specified bootstrap kubeconfig
is used for initial access, then multus sends CSR request to
kube API to get original certs for kube API access. Once it is
accepted then the multus pod uses generated certs for kube access.
This commit is contained in:
Tomofumi Hayashi
2023-09-18 23:46:07 +09:00
parent acfdc64991
commit e5d19fff6b
20 changed files with 2853 additions and 101 deletions

363
cmd/cert-approver/main.go Normal file
View File

@@ -0,0 +1,363 @@
// Copyright (c) 2023 Network Plumbing Working Group
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This is Kubernetes controller which approves CSR submitted by multus.
// This command is required only if multus runs with per-node certificate.
package main
// Note: cert-approver should be simple, just approve multus' CSR, hence
// this go code should not have any dependencies from pkg/, if possible,
// to keep its code simplicity.
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/apimachinery/pkg/util/wait"
certificatesv1 "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/certificate/csr"
"k8s.io/client-go/util/workqueue"
)
// CertController object
type CertController struct {
clientset kubernetes.Interface
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
broadcaster record.EventBroadcaster
recorder record.EventRecorder
commonNamePrefixes string
}
const (
maxDuration = time.Hour * 24 * 365
resyncPeriod time.Duration = time.Second * 3600 // resync every one hour, default is 10 hour
maxRetries = 5
)
var (
ControllerName = "csr-approver"
NamePrefix = "system:multus"
Organization = []string{"system:multus"}
Groups = sets.New[string]("system:nodes", "system:multus", "system:authenticated")
UserPrefixes = sets.New[string]("system:node", NamePrefix)
Usages = sets.New[certificatesv1.KeyUsage](
certificatesv1.UsageDigitalSignature,
certificatesv1.UsageClientAuth)
)
func NewCertController() (*CertController, error) {
var clientset kubernetes.Interface
/* setup Kubernetes API client */
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
informer := cache.NewSharedIndexInformer(
cache.NewListWatchFromClient(
clientset.CertificatesV1().RESTClient(),
"certificatesigningrequests", corev1.NamespaceAll, fields.Everything()),
&certificatesv1.CertificateSigningRequest{},
resyncPeriod,
nil)
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientset.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "cert-approver"})
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
c := &CertController{
clientset: clientset,
informer: informer,
queue: queue,
commonNamePrefixes: NamePrefix,
broadcaster: broadcaster,
recorder: recorder,
}
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if csr, ok := obj.(*certificatesv1.CertificateSigningRequest); ok {
if c.filterCSR(csr) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
}
}
},
})
return c, nil
}
func (c *CertController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Info("Starting cert approver")
go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.HasSynced) {
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
klog.Info("cert approver synced and ready")
wait.Until(c.runWorker, time.Second, stopCh)
}
// HasSynced is required for the cache.Controller interface.
func (c *CertController) HasSynced() bool {
return c.informer.HasSynced()
}
// LastSyncResourceVersion is required for the cache.Controller interface.
func (c *CertController) LastSyncResourceVersion() string {
return c.informer.LastSyncResourceVersion()
}
func (c *CertController) runWorker() {
for c.processNextItem() {
// continue looping
}
}
func (c *CertController) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
}
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two pods with the same key are never processed in
// parallel.
defer c.queue.Done(key)
// Invoke the method containing the business logic
err := c.processItem(key.(string))
// Handle the error if something went wrong during the execution of the business logic
c.handleErr(err, key)
return true
}
// handleErr checks if an error happened and makes sure we will retry later.
func (c *CertController) handleErr(err error, key interface{}) {
if err == nil {
// Forget about the #AddRateLimited history of the key on every successful synchronization.
// This ensures that future processing of updates for this key is not delayed because of
// an outdated error history.
c.queue.Forget(key)
return
}
// This controller retries 5 times if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < maxRetries {
klog.Infof("Error syncing csr %s: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
utilruntime.HandleError(err)
klog.Infof("Dropping csr %q out of the queue: %v", key, err)
}
func (c *CertController) processItem(key string) error {
startTime := time.Now()
obj, _, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
}
req, _ := obj.(*certificatesv1.CertificateSigningRequest)
nodeName := "unknown"
defer func() {
klog.Infof("Finished syncing CSR %s for %s node in %v", req.Name, nodeName, time.Since(startTime))
}()
if len(req.Status.Certificate) > 0 {
klog.V(5).Infof("CSR %s is already signed", req.Name)
return nil
}
if isApprovedOrDenied(&req.Status) {
klog.V(5).Infof("CSR %s is already approved/denied", req.Name)
return nil
}
csrPEM, _ := pem.Decode(req.Spec.Request)
if csrPEM == nil {
return fmt.Errorf("failed to PEM-parse the CSR block in .spec.request: no CSRs were found")
}
x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
if err != nil {
return fmt.Errorf("failed to parse the CSR bytes: %v", err)
}
i := strings.LastIndex(req.Spec.Username, ":")
if i == -1 || i == len(req.Spec.Username)-1 {
return fmt.Errorf("failed to parse the username: %s", req.Spec.Username)
}
ctx := context.Background()
prefix := req.Spec.Username[:i]
nodeName = req.Spec.Username[i+1:]
if !UserPrefixes.Has(prefix) {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by an unexpected user: %q", req.Name, req.Spec.Username))
}
if errs := validation.IsDNS1123Subdomain(nodeName); len(errs) != 0 {
return c.denyCSR(ctx, req, fmt.Sprintf("extracted node name %q is not a valid DNS subdomain %v", nodeName, errs))
}
if usages := sets.New[certificatesv1.KeyUsage](req.Spec.Usages...); !usages.Equal(Usages) {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with unexpected usages: %v", req.Name, usages.UnsortedList()))
}
if !Groups.HasAll(req.Spec.Groups...) {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created by a user with unexpected groups: %v", req.Name, req.Spec.Groups))
}
expectedSubject := fmt.Sprintf("%s:%s", c.commonNamePrefixes, nodeName)
if x509CSR.Subject.CommonName != expectedSubject {
return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's commonName to be %q, but it is %q", expectedSubject, x509CSR.Subject.CommonName))
}
if !reflect.DeepEqual(x509CSR.Subject.Organization, Organization) {
return c.denyCSR(ctx, req, fmt.Sprintf("expected the CSR's organization to be %v, but it is %v", Organization, x509CSR.Subject.Organization))
}
if req.Spec.ExpirationSeconds == nil {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created without specyfying the expirationSeconds", req.Name))
}
if csr.ExpirationSecondsToDuration(*req.Spec.ExpirationSeconds) > maxDuration {
return c.denyCSR(ctx, req, fmt.Sprintf("CSR %q was created with invalid expirationSeconds value: %d", req.Name, *req.Spec.ExpirationSeconds))
}
return c.approveCSR(ctx, req)
}
// CSR specific functions
func (c *CertController) filterCSR(csr *certificatesv1.CertificateSigningRequest) bool {
nsName := types.NamespacedName{Namespace: csr.Namespace, Name: csr.Name}
csrPEM, _ := pem.Decode(csr.Spec.Request)
if csrPEM == nil {
klog.Errorf("Failed to PEM-parse the CSR block in .spec.request: no CSRs were found in %s", nsName)
return false
}
x509CSR, err := x509.ParseCertificateRequest(csrPEM.Bytes)
if err != nil {
klog.Errorf("Failed to parse the CSR .spec.request of %q: %v", nsName, err)
return false
}
return strings.HasPrefix(x509CSR.Subject.CommonName, c.commonNamePrefixes) &&
csr.Spec.SignerName == certificatesv1.KubeAPIServerClientSignerName
}
func (c *CertController) approveCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) error {
csr.Status.Conditions = append(csr.Status.Conditions,
certificatesv1.CertificateSigningRequestCondition{
Type: certificatesv1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "AutoApproved",
Message: fmt.Sprintf("Auto-approved CSR %q", csr.Name),
})
c.recorder.Eventf(csr, corev1.EventTypeNormal, "CSRApproved", "CSR %q has been approved by %s", csr.Name, ControllerName)
_, err := c.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
return err
}
func (c *CertController) denyCSR(ctx context.Context, csr *certificatesv1.CertificateSigningRequest, message string) error {
csr.Status.Conditions = append(csr.Status.Conditions,
certificatesv1.CertificateSigningRequestCondition{
Type: certificatesv1.CertificateDenied,
Status: corev1.ConditionTrue,
Reason: "CSRDenied",
Message: message,
},
)
c.recorder.Eventf(csr, corev1.EventTypeWarning, "CSRDenied", "The CSR %q has been denied by: %s", csr.Name, ControllerName, message)
_, err := c.clientset.CertificatesV1().CertificateSigningRequests().Update(ctx, csr, metav1.UpdateOptions{})
return err
}
func isApprovedOrDenied(status *certificatesv1.CertificateSigningRequestStatus) bool {
for _, c := range status.Conditions {
if c.Type == certificatesv1.CertificateApproved || c.Type == certificatesv1.CertificateDenied {
return true
}
}
return false
}
func main() {
klog.Infof("starting cert-approver")
//Start watching for pod creations
certController, err := NewCertController()
if err != nil {
klog.Fatal(err)
}
stopCh := make(chan struct{})
defer close(stopCh)
go certController.Run(stopCh)
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
<-sigterm
}

View File

@@ -0,0 +1,133 @@
// Copyright (c) 2023 Multus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This binary submit CSR for kube controll access for multus thin plugin
// and generate Kubeconfig
package main
import (
"encoding/base64"
"fmt"
"os"
"os/signal"
"syscall"
"text/template"
"github.com/spf13/pflag"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)
var kubeConfigTemplate = `apiVersion: v1
clusters:
- cluster:
certificate-authority-data: {{.CADATA}}
server: {{.K8S_APISERVER}}
name: default-cluster
contexts:
- context:
cluster: default-cluster
namespace: default
user: default-auth
name: default-context
current-context: default-context
kind: Config
preferences: {}
users:
- name: default-auth
user:
client-certificate: {{.CERTDIR}}/multus-client-current.pem
client-key: {{.CERTDIR}}/multus-client-current.pem
`
func main() {
certDir := pflag.StringP("certdir", "", "/tmp", "specify cert directory")
bootstrapConfig := pflag.StringP("bootstrap-config", "", "/tmp/kubeconfig", "specify bootstrap kubernetes config")
kubeconfigPath := pflag.StringP("kubeconfig", "", "/run/multus/kubeconfig", "specify output kubeconfig path")
helpFlag := pflag.BoolP("help", "h", false, "show help message and quit")
pflag.Parse()
if *helpFlag {
pflag.PrintDefaults()
os.Exit(1)
}
// check variables
if _, err := os.Stat(*bootstrapConfig); err != nil {
klog.Fatalf("failed to read bootstrap config %q", *bootstrapConfig)
}
st, err := os.Stat(*certDir)
if err != nil {
klog.Fatalf("failed to find cert directory %q", *certDir)
}
if !st.IsDir() {
klog.Fatalf("cert directory %q is not directory", *certDir)
}
nodeName := os.Getenv("K8S_NODE")
if nodeName == "" {
klog.Fatalf("cannot identify node name from K8S_NODE env variables")
}
// retrieve API server from bootstrapConfig()
config, err := clientcmd.BuildConfigFromFlags("", *bootstrapConfig)
if err != nil {
klog.Fatalf("cannot get in-cluster config: %v", err)
}
apiServer := fmt.Sprintf("%s%s", config.Host, config.APIPath)
caData := base64.StdEncoding.EncodeToString(config.CAData)
// run certManager to create certification
if _, err = k8sclient.PerNodeK8sClient(nodeName, *bootstrapConfig, *certDir); err != nil {
klog.Fatalf("failed to start cert manager: %v", err)
}
fp, err := os.OpenFile(*kubeconfigPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
klog.Fatalf("cannot create kubeconfig file %q: %v", *kubeconfigPath, err)
}
// render kubeconfig
templateKubeconfig, err := template.New("kubeconfig").Parse(kubeConfigTemplate)
if err != nil {
klog.Fatalf("template parse error: %v", err)
}
templateData := map[string]string{
"CADATA": caData,
"CERTDIR": *certDir,
"K8S_APISERVER": apiServer,
}
// genearate kubeconfig from template
if err = templateKubeconfig.Execute(fp, templateData); err != nil {
klog.Fatalf("cannot create kubeconfig: %v", err)
}
if err = fp.Close(); err != nil {
klog.Fatalf("cannot save kubeconfig: %v", err)
}
klog.Infof("kubeconfig %q is saved", *kubeconfigPath)
// wait for signal
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
<-sigterm
klog.Infof("signal received. remove kubeconfig %q and quit.", *kubeconfigPath)
err = os.Remove(*kubeconfigPath)
if err != nil {
klog.Errorf("failed to remove kubeconfig %q: %v", *kubeconfigPath, err)
}
}

2
go.mod
View File

@@ -21,7 +21,7 @@ require (
k8s.io/apimachinery v0.27.5
k8s.io/client-go v1.5.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/klog/v2 v2.90.1
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
k8s.io/kubelet v0.27.5
sigs.k8s.io/yaml v1.3.0 // indirect

View File

@@ -76,3 +76,7 @@ echo "Building install_multus"
go build -o "${DEST_DIR}"/install_multus ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/install_multus
echo "Building thin_entrypoint"
go build -o "${DEST_DIR}"/thin_entrypoint ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/thin_entrypoint
echo "Building kubeconfig_generator"
go build -o "${DEST_DIR}"/kubeconfig_generator ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/kubeconfig_generator
echo "Building cert-approver"
go build -o "${DEST_DIR}"/cert-approver ${BUILD_ARGS} -ldflags "${LDFLAGS}" ./cmd/cert-approver

View File

@@ -16,4 +16,6 @@ WORKDIR /
COPY --from=build /usr/src/multus-cni/bin/install_multus /
COPY --from=build /usr/src/multus-cni/bin/thin_entrypoint /
ENTRYPOINT ["/thin_entrypoint"]
COPY --from=build /usr/src/multus-cni/bin/kubeconfig_generator /
COPY --from=build /usr/src/multus-cni/bin/cert-approver /
CMD ["/thin_entrypoint"]

View File

@@ -16,4 +16,6 @@ WORKDIR /
COPY --from=build /usr/src/multus-cni/bin/install_multus /
COPY --from=build /usr/src/multus-cni/bin/thin_entrypoint /
ENTRYPOINT ["/thin_entrypoint"]
COPY --from=build /usr/src/multus-cni/bin/kubeconfig_generator /
COPY --from=build /usr/src/multus-cni/bin/cert-approver /
CMD ["/thin_entrypoint"]

View File

@@ -11,6 +11,7 @@ FROM debian:stable-slim
LABEL org.opencontainers.image.source https://github.com/k8snetworkplumbingwg/multus-cni
COPY --from=build /usr/src/multus-cni/bin /usr/src/multus-cni/bin
COPY --from=build /usr/src/multus-cni/LICENSE /usr/src/multus-cni/LICENSE
COPY --from=build /usr/src/multus-cni/bin/cert-approver /
WORKDIR /
ENTRYPOINT [ "/usr/src/multus-cni/bin/multus-daemon" ]

View File

@@ -23,18 +23,12 @@ import (
"os"
"regexp"
"strings"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel"
@@ -393,87 +387,6 @@ func TryLoadPodDelegates(pod *v1.Pod, conf *types.NetConf, clientInfo *ClientInf
return 0, clientInfo, err
}
// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to
// the k8s API.
func InClusterK8sClient() (*ClientInfo, error) {
clientInfo, err := GetK8sClient("", nil)
if err != nil {
return nil, err
}
if clientInfo == nil {
return nil, fmt.Errorf("failed to create in-cluster kube client")
}
return clientInfo, err
}
// GetK8sClient gets client info from kubeconfig
func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) {
logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient)
// If we get a valid kubeClient (eg from testcases) just return that
// one.
if kubeClient != nil {
return kubeClient, nil
}
var err error
var config *rest.Config
// Otherwise try to create a kubeClient from a given kubeConfig
if kubeconfig != "" {
// uses the current context in kubeconfig
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err)
}
} else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
// Try in-cluster config where multus might be running in a kubernetes pod
config, err = rest.InClusterConfig()
if err != nil {
return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err)
}
} else {
// No kubernetes config; assume we shouldn't talk to Kube at all
return nil, nil
}
// Specify that we use gRPC
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
config.ContentType = "application/vnd.kubernetes.protobuf"
// Set the config timeout to one minute.
config.Timeout = time.Minute
// Allow multus (especially in server mode) to make more concurrent requests
// to reduce client-side throttling
config.QPS = 50
config.Burst = 50
return newClientInfo(config)
}
// newClientInfo returns a `ClientInfo` from a configuration created from an
// existing kubeconfig file.
func newClientInfo(config *rest.Config) (*ClientInfo, error) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
netclient, err := netclient.NewForConfig(config)
if err != nil {
return nil, err
}
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})
return &ClientInfo{
Client: client,
NetClient: netclient,
EventBroadcaster: broadcaster,
EventRecorder: recorder,
}, nil
}
// GetPodNetwork gets net-attach-def annotation from pod
func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) {
logging.Debugf("GetPodNetwork: %v", pod)

219
pkg/k8sclient/kubeconfig.go Normal file
View File

@@ -0,0 +1,219 @@
// Copyright (c) 2023 Multus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8sclient
import (
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"fmt"
"os"
"path"
"time"
certificatesv1 "k8s.io/api/certificates/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/certificate"
"k8s.io/klog"
netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
)
const (
certNamePrefix = "multus-client"
certCommonNamePrefix = "system:multus"
certOrganization = "system:multus"
)
var (
certUsages = []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageClientAuth}
)
// getPerNodeKubeconfig creates new kubeConfig, based on bootstrap, with new certDir
func getPerNodeKubeconfig(bootstrap *rest.Config, certDir string) *rest.Config {
return &rest.Config{
Host: bootstrap.Host,
APIPath: bootstrap.APIPath,
ContentConfig: rest.ContentConfig{
AcceptContentTypes: "application/vnd.kubernetes.protobuf,application/json",
ContentType: "application/vnd.kubernetes.protobuf",
},
TLSClientConfig: rest.TLSClientConfig{
KeyFile: path.Join(certDir, certNamePrefix+"-current.pem"),
CertFile: path.Join(certDir, certNamePrefix+"-current.pem"),
CAData: bootstrap.TLSClientConfig.CAData,
},
// Allow multus (especially in server mode) to make more concurrent requests
// to reduce client-side throttling
QPS: 50,
Burst: 50,
// Set the config timeout to one minute.
Timeout: time.Minute,
}
}
// PerNodeK8sClient creates/reload new multus kubeconfig per-node.
func PerNodeK8sClient(nodeName, bootstrapKubeconfigFile, certDir string) (*ClientInfo, error) {
bootstrapKubeconfig, err := clientcmd.BuildConfigFromFlags("", bootstrapKubeconfigFile)
if err != nil {
return nil, logging.Errorf("failed to load bootstrap kubeconfig %s: %v", bootstrapKubeconfigFile, err)
}
config := getPerNodeKubeconfig(bootstrapKubeconfig, certDir)
// If we have a valid certificate, user that to fetch CSRs.
// Otherwise, use the bootstrap credentials from bootstrapKubeconfig
// https://github.com/kubernetes/kubernetes/blob/068ee321bc7bfe1c2cefb87fb4d9e5deea84fbc8/cmd/kubelet/app/server.go#L953-L963
newClientsetFn := func(current *tls.Certificate) (kubernetes.Interface, error) {
cfg := bootstrapKubeconfig
if current != nil {
cfg = config
}
return kubernetes.NewForConfig(cfg)
}
certificateStore, err := certificate.NewFileStore(certNamePrefix, certDir, certDir, "", "")
if err != nil {
return nil, logging.Errorf("failed to initialize the certificate store: %v", err)
}
certDuration := 10 * time.Minute
certManager, err := certificate.NewManager(&certificate.Config{
ClientsetFn: newClientsetFn,
Template: &x509.CertificateRequest{
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s:%s", certCommonNamePrefix, nodeName),
Organization: []string{certOrganization},
},
},
RequestedCertificateLifetime: &certDuration,
SignerName: certificatesv1.KubeAPIServerClientSignerName,
Usages: certUsages,
CertificateStore: certificateStore,
})
if err != nil {
return nil, logging.Errorf("failed to initialize the certificate manager: %v", err)
}
if certDuration < time.Hour {
// the default value for CertCallbackRefreshDuration (5min) is too long for short-lived certs,
// set it to a more sensible value
transport.CertCallbackRefreshDuration = time.Second * 10
}
certManager.Start()
logging.Verbosef("Waiting for certificate")
var storeErr error
err = wait.PollWithContext(context.TODO(), time.Second, 2*time.Minute, func(_ context.Context) (bool, error) {
var currentCert *tls.Certificate
currentCert, storeErr = certificateStore.Current()
return currentCert != nil && storeErr == nil, nil
})
if err != nil {
return nil, logging.Errorf("certificate was not signed, last cert store err: %v err: %v", storeErr, err)
}
logging.Verbosef("Certificate found!")
return newClientInfo(config)
}
// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to
// the k8s API.
func InClusterK8sClient() (*ClientInfo, error) {
clientInfo, err := GetK8sClient("", nil)
if err != nil {
return nil, err
}
if clientInfo == nil {
return nil, fmt.Errorf("failed to create in-cluster kube client")
}
return clientInfo, err
}
// GetK8sClient gets client info from kubeconfig
func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) {
logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient)
// If we get a valid kubeClient (eg from testcases) just return that
// one.
if kubeClient != nil {
return kubeClient, nil
}
var err error
var config *rest.Config
// Otherwise try to create a kubeClient from a given kubeConfig
if kubeconfig != "" {
// uses the current context in kubeconfig
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err)
}
} else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
// Try in-cluster config where multus might be running in a kubernetes pod
config, err = rest.InClusterConfig()
if err != nil {
return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err)
}
} else {
// No kubernetes config; assume we shouldn't talk to Kube at all
return nil, nil
}
// Specify that we use gRPC
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
config.ContentType = "application/vnd.kubernetes.protobuf"
// Set the config timeout to one minute.
config.Timeout = time.Minute
// Allow multus (especially in server mode) to make more concurrent requests
// to reduce client-side throttling
config.QPS = 50
config.Burst = 50
return newClientInfo(config)
}
// newClientInfo returns a `ClientInfo` from a configuration created from an
// existing kubeconfig file.
func newClientInfo(config *rest.Config) (*ClientInfo, error) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
netclient, err := netclient.NewForConfig(config)
if err != nil {
return nil, err
}
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})
return &ClientInfo{
Client: client,
NetClient: netclient,
EventBroadcaster: broadcaster,
EventRecorder: recorder,
}, nil
}

View File

@@ -42,7 +42,7 @@ import (
)
func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer {
informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0 * time.Second)
informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0*time.Second)
podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return v1coreinformers.NewFilteredPodInformer(
@@ -55,7 +55,7 @@ func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.Sha
informerFactory.Start(ctx.Done())
waitCtx, waitCancel := context.WithTimeout(ctx, 20 * time.Second)
waitCtx, waitCancel := context.WithTimeout(ctx, 20*time.Second)
if !cache.WaitForCacheSync(waitCtx.Done(), podInformer.HasSynced) {
logging.Errorf("failed to sync pod informer cache")
}

View File

@@ -170,11 +170,38 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali
return informerFactory, podInformer
}
func isPerNodeCertEnabled(config *PerNodeCertificate) (bool, error) {
if config != nil && config.Enabled {
if config.BootstrapKubeconfig != "" && config.CertDir != "" {
return true, nil
}
return true, logging.Errorf("failed to configure PerNodeCertificate: enabled: %v, BootstrapKubeconfig: %q, CertDir: %q", config.Enabled, config.BootstrapKubeconfig, config.CertDir)
}
return false, nil
}
// NewCNIServer creates and returns a new Server object which will listen on a socket in the given path
func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreReadinessIndicator bool) (*Server, error) {
kubeClient, err := k8s.InClusterK8sClient()
if err != nil {
return nil, fmt.Errorf("error getting k8s client: %v", err)
var kubeClient *k8s.ClientInfo
enabled, err := isPerNodeCertEnabled(daemonConfig.PerNodeCertificate)
if enabled {
if err != nil {
return nil, err
}
perNodeCertConfig := daemonConfig.PerNodeCertificate
nodeName := os.Getenv("K8S_NODE")
if nodeName == "" {
return nil, logging.Errorf("error getting node name for perNodeCertificate")
}
kubeClient, err = k8s.PerNodeK8sClient(nodeName, perNodeCertConfig.BootstrapKubeconfig, perNodeCertConfig.CertDir)
if err != nil {
return nil, logging.Errorf("error getting perNodeClient: %v", err)
}
} else {
kubeClient, err = k8s.InClusterK8sClient()
if err != nil {
return nil, fmt.Errorf("error getting k8s client: %v", err)
}
}
exec := invoke.Exec(nil)

View File

@@ -56,12 +56,20 @@ type Server struct {
ignoreReadinessIndicator bool
}
// PerNodeCertificate for auto certificate generation for per node
type PerNodeCertificate struct {
Enabled bool `json:"enabled,omitempty"`
BootstrapKubeconfig string `json:"bootstrapKubeconfig,omitempty"`
CertDir string `json:"certDir,omitempty"`
}
// ControllerNetConf for the controller cni configuration
type ControllerNetConf struct {
ChrootDir string `json:"chrootDir,omitempty"`
LogFile string `json:"logFile"`
LogLevel string `json:"logLevel"`
LogToStderr bool `json:"logToStderr,omitempty"`
ChrootDir string `json:"chrootDir,omitempty"`
LogFile string `json:"logFile"`
LogLevel string `json:"logLevel"`
LogToStderr bool `json:"logToStderr,omitempty"`
PerNodeCertificate *PerNodeCertificate `json:"perNodeCertificate,omitempty"`
MetricsPort *int `json:"metricsPort,omitempty"`

150
vendor/k8s.io/client-go/tools/watch/informerwatcher.go generated vendored Normal file
View File

@@ -0,0 +1,150 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"sync"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func newEventProcessor(out chan<- watch.Event) *eventProcessor {
return &eventProcessor{
out: out,
cond: sync.NewCond(&sync.Mutex{}),
done: make(chan struct{}),
}
}
// eventProcessor buffers events and writes them to an out chan when a reader
// is waiting. Because of the requirement to buffer events, it synchronizes
// input with a condition, and synchronizes output with a channels. It needs to
// be able to yield while both waiting on an input condition and while blocked
// on writing to the output channel.
type eventProcessor struct {
out chan<- watch.Event
cond *sync.Cond
buff []watch.Event
done chan struct{}
}
func (e *eventProcessor) run() {
for {
batch := e.takeBatch()
e.writeBatch(batch)
if e.stopped() {
return
}
}
}
func (e *eventProcessor) takeBatch() []watch.Event {
e.cond.L.Lock()
defer e.cond.L.Unlock()
for len(e.buff) == 0 && !e.stopped() {
e.cond.Wait()
}
batch := e.buff
e.buff = nil
return batch
}
func (e *eventProcessor) writeBatch(events []watch.Event) {
for _, event := range events {
select {
case e.out <- event:
case <-e.done:
return
}
}
}
func (e *eventProcessor) push(event watch.Event) {
e.cond.L.Lock()
defer e.cond.L.Unlock()
defer e.cond.Signal()
e.buff = append(e.buff, event)
}
func (e *eventProcessor) stopped() bool {
select {
case <-e.done:
return true
default:
return false
}
}
func (e *eventProcessor) stop() {
close(e.done)
e.cond.Signal()
}
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
// it also returns a channel you can use to wait for the informers to fully shutdown.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
ch := make(chan watch.Event)
w := watch.NewProxyWatcher(ch)
e := newEventProcessor(ch)
indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
e.push(watch.Event{
Type: watch.Added,
Object: obj.(runtime.Object),
})
},
UpdateFunc: func(old, new interface{}) {
e.push(watch.Event{
Type: watch.Modified,
Object: new.(runtime.Object),
})
},
DeleteFunc: func(obj interface{}) {
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
if stale {
// We have no means of passing the additional information down using
// watch API based on watch.Event but the caller can filter such
// objects by checking if metadata.deletionTimestamp is set
obj = staleObj.Obj
}
e.push(watch.Event{
Type: watch.Deleted,
Object: obj.(runtime.Object),
})
},
}, cache.Indexers{})
go e.run()
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
defer e.stop()
informer.Run(w.StopChan())
}()
return indexer, informer, w, doneCh
}

296
vendor/k8s.io/client-go/tools/watch/retrywatcher.go generated vendored Normal file
View File

@@ -0,0 +1,296 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/davecgh/go-spew/spew"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// resourceVersionGetter is an interface used to get resource version from events.
// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method
type resourceVersionGetter interface {
GetResourceVersion() string
}
// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
// it will get restarted from the last point without the consumer even knowing about it.
// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes.
// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
// use Informers for that.
type RetryWatcher struct {
lastResourceVersion string
watcherClient cache.Watcher
resultChan chan watch.Event
stopChan chan struct{}
doneChan chan struct{}
minRestartDelay time.Duration
}
// NewRetryWatcher creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called.
func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
}
func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
switch initialResourceVersion {
case "", "0":
// TODO: revisit this if we ever get WATCH v2 where it means start "now"
// without doing the synthetic list of objects at the beginning (see #74022)
return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
default:
break
}
rw := &RetryWatcher{
lastResourceVersion: initialResourceVersion,
watcherClient: watcherClient,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
resultChan: make(chan watch.Event, 0),
minRestartDelay: minRestartDelay,
}
go rw.receive()
return rw, nil
}
func (rw *RetryWatcher) send(event watch.Event) bool {
// Writing to an unbuffered channel is blocking operation
// and we need to check if stop wasn't requested while doing so.
select {
case rw.resultChan <- event:
return true
case <-rw.stopChan:
return false
}
}
// doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again.
func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
ResourceVersion: rw.lastResourceVersion,
AllowWatchBookmarks: true,
})
// We are very unlikely to hit EOF here since we are just establishing the call,
// but it may happen that the apiserver is just shutting down (e.g. being restarted)
// This is consistent with how it is handled for informers
switch err {
case nil:
break
case io.EOF:
// watch closed normally
return false, 0
case io.ErrUnexpectedEOF:
klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
return false, 0
default:
msg := "Watch failed"
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).InfoS(msg, "err", err)
// Retry
return false, 0
}
klog.ErrorS(err, msg)
// Retry
return false, 0
}
if watcher == nil {
klog.ErrorS(nil, "Watch returned nil watcher")
// Retry
return false, 0
}
ch := watcher.ResultChan()
defer watcher.Stop()
for {
select {
case <-rw.stopChan:
klog.V(4).InfoS("Stopping RetryWatcher.")
return true, 0
case event, ok := <-ch:
if !ok {
klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
return false, 0
}
// We need to inspect the event and get ResourceVersion out of it
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}
resourceVersion := metaObject.GetResourceVersion()
if resourceVersion == "" {
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}
// All is fine; send the non-bookmark events and update resource version.
if event.Type != watch.Bookmark {
ok = rw.send(event)
if !ok {
return true, 0
}
}
rw.lastResourceVersion = resourceVersion
continue
case watch.Error:
// This round trip allows us to handle unstructured status
errObject := apierrors.FromObject(event.Object)
statusErr, ok := errObject.(*apierrors.StatusError)
if !ok {
klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
// Retry unknown errors
return false, 0
}
status := statusErr.ErrStatus
statusDelay := time.Duration(0)
if status.Details != nil {
statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
}
switch status.Code {
case http.StatusGone:
// Never retry RV too old errors
_ = rw.send(event)
return true, 0
case http.StatusGatewayTimeout, http.StatusInternalServerError:
// Retry
return false, statusDelay
default:
// We retry by default. RetryWatcher is meant to proceed unless it is certain
// that it can't. If we are not certain, we proceed with retry and leave it
// up to the user to timeout if needed.
// Log here so we have a record of hitting the unexpected error
// and we can whitelist some error codes if we missed any that are expected.
klog.V(5).Info(spew.Sprintf("Retrying after unexpected error: %#+v", event.Object))
// Retry
return false, statusDelay
}
default:
klog.Errorf("Failed to recognize Event type %q", event.Type)
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
})
// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}
}
}
}
// receive reads the result from a watcher, restarting it if necessary.
func (rw *RetryWatcher) receive() {
defer close(rw.doneChan)
defer close(rw.resultChan)
klog.V(4).Info("Starting RetryWatcher.")
defer klog.V(4).Info("Stopping RetryWatcher.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-rw.stopChan:
cancel()
return
case <-ctx.Done():
return
}
}()
// We use non sliding until so we don't introduce delays on happy path when WATCH call
// timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
done, retryAfter := rw.doReceive()
if done {
cancel()
return
}
timer := time.NewTimer(retryAfter)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
}, rw.minRestartDelay)
}
// ResultChan implements Interface.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
return rw.resultChan
}
// Stop implements Interface.
func (rw *RetryWatcher) Stop() {
close(rw.stopChan)
}
// Done allows the caller to be notified when Retry watcher stops.
func (rw *RetryWatcher) Done() <-chan struct{} {
return rw.doneChan
}

168
vendor/k8s.io/client-go/tools/watch/until.go generated vendored Normal file
View File

@@ -0,0 +1,168 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"errors"
"fmt"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition failed or detected an error state.
type PreconditionFunc func(store cache.Store) (bool, error)
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
// from false to true).
type ConditionFunc func(event watch.Event) (bool, error)
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// Waits until context deadline or until context is canceled.
//
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
// Warning: solving such issues.
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
var lastEvent *watch.Event
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
continue
}
}
ConditionSucceeded:
for {
select {
case event, ok := <-ch:
if !ok {
return lastEvent, ErrWatchClosed
}
lastEvent = &event
done, err := condition(event)
if err != nil {
return lastEvent, err
}
if done {
break ConditionSucceeded
}
case <-ctx.Done():
return lastEvent, wait.ErrWaitTimeout
}
}
}
return lastEvent, nil
}
// Until wraps the watcherClient's watch function with RetryWatcher making sure that watcher gets restarted in case of errors.
// The initialResourceVersion will be given to watch method when first called. It shall not be "" or "0"
// given the underlying WATCH call issues (#74022).
// Remaining behaviour is identical to function UntilWithoutRetry. (See above.)
// Until can deal with API timeouts and lost connections.
// It guarantees you to see all events and in the order they happened.
// Due to this guarantee there is no way it can deal with 'Resource version too old error'. It will fail in this case.
// (See `UntilWithSync` if you'd prefer to recover from all the errors including RV too old by re-listing
// those items. In normal code you should care about being level driven so you'd not care about not seeing all the edges.)
//
// The most frequent usage for Until would be a test where you want to verify exact order of events ("edges").
func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) {
w, err := NewRetryWatcher(initialResourceVersion, watcherClient)
if err != nil {
return nil, err
}
return UntilWithoutRetry(ctx, w, conditions...)
}
// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced,
// and watches the output until each provided condition succeeds, in a way that is identical
// to function UntilWithoutRetry. (See above.)
// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'.
// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will
// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple
// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will
// re-list to recover and you always get an event, if there has been a change, after recovery.
// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for
// particular object, not between more of them even it's the same resource.
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
// waiting for object reaching a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType)
// We need to wait for the internal informers to fully stop so it's easier to reason about
// and it works with non-thread safe clients.
defer func() { <-done }()
// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
// let UntilWithoutRetry to stop it
defer watcher.Stop()
if precondition != nil {
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %w", ctx.Err())
}
done, err := precondition(indexer)
if err != nil {
return nil, err
}
if done {
return nil, nil
}
}
return UntilWithoutRetry(ctx, watcher, conditions...)
}
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout < 0 {
// This should be handled in validation
klog.Errorf("Timeout for context shall not be negative!")
timeout = 0
}
if timeout == 0 {
return context.WithCancel(parent)
}
return context.WithTimeout(parent, timeout)
}

8
vendor/k8s.io/client-go/util/certificate/OWNERS generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-auth-certificates-approvers
reviewers:
- sig-auth-certificates-reviewers
labels:
- sig/auth

View File

@@ -0,0 +1,775 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certificate
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
cryptorand "crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"reflect"
"sync"
"time"
"k8s.io/klog/v2"
certificates "k8s.io/api/certificates/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/cert"
"k8s.io/client-go/util/certificate/csr"
"k8s.io/client-go/util/keyutil"
)
var (
// certificateWaitTimeout controls the amount of time we wait for certificate
// approval in one iteration.
certificateWaitTimeout = 15 * time.Minute
kubeletServingUsagesWithEncipherment = []certificates.KeyUsage{
// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
//
// Digital signature allows the certificate to be used to verify
// digital signatures used during TLS negotiation.
certificates.UsageDigitalSignature,
// KeyEncipherment allows the cert/key pair to be used to encrypt
// keys, including the symmetric keys negotiated during TLS setup
// and used for data transfer.
certificates.UsageKeyEncipherment,
// ServerAuth allows the cert to be used by a TLS server to
// authenticate itself to a TLS client.
certificates.UsageServerAuth,
}
kubeletServingUsagesNoEncipherment = []certificates.KeyUsage{
// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
//
// Digital signature allows the certificate to be used to verify
// digital signatures used during TLS negotiation.
certificates.UsageDigitalSignature,
// ServerAuth allows the cert to be used by a TLS server to
// authenticate itself to a TLS client.
certificates.UsageServerAuth,
}
DefaultKubeletServingGetUsages = func(privateKey interface{}) []certificates.KeyUsage {
switch privateKey.(type) {
case *rsa.PrivateKey:
return kubeletServingUsagesWithEncipherment
default:
return kubeletServingUsagesNoEncipherment
}
}
kubeletClientUsagesWithEncipherment = []certificates.KeyUsage{
// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
//
// Digital signature allows the certificate to be used to verify
// digital signatures used during TLS negotiation.
certificates.UsageDigitalSignature,
// KeyEncipherment allows the cert/key pair to be used to encrypt
// keys, including the symmetric keys negotiated during TLS setup
// and used for data transfer.
certificates.UsageKeyEncipherment,
// ClientAuth allows the cert to be used by a TLS client to
// authenticate itself to the TLS server.
certificates.UsageClientAuth,
}
kubeletClientUsagesNoEncipherment = []certificates.KeyUsage{
// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
//
// Digital signature allows the certificate to be used to verify
// digital signatures used during TLS negotiation.
certificates.UsageDigitalSignature,
// ClientAuth allows the cert to be used by a TLS client to
// authenticate itself to the TLS server.
certificates.UsageClientAuth,
}
DefaultKubeletClientGetUsages = func(privateKey interface{}) []certificates.KeyUsage {
switch privateKey.(type) {
case *rsa.PrivateKey:
return kubeletClientUsagesWithEncipherment
default:
return kubeletClientUsagesNoEncipherment
}
}
)
// Manager maintains and updates the certificates in use by this certificate
// manager. In the background it communicates with the API server to get new
// certificates for certificates about to expire.
type Manager interface {
// Start the API server status sync loop.
Start()
// Stop the cert manager loop.
Stop()
// Current returns the currently selected certificate from the
// certificate manager, as well as the associated certificate and key data
// in PEM format.
Current() *tls.Certificate
// ServerHealthy returns true if the manager is able to communicate with
// the server. This allows a caller to determine whether the cert manager
// thinks it can potentially talk to the API server. The cert manager may
// be very conservative and only return true if recent communication has
// occurred with the server.
ServerHealthy() bool
}
// Config is the set of configuration parameters available for a new Manager.
type Config struct {
// ClientsetFn will be used to create a clientset for
// creating/fetching new certificate requests generated when a key rotation occurs.
// The function will never be invoked in parallel.
// It is passed the current client certificate if one exists.
ClientsetFn ClientsetFunc
// Template is the CertificateRequest that will be used as a template for
// generating certificate signing requests for all new keys generated as
// part of rotation. It follows the same rules as the template parameter of
// crypto.x509.CreateCertificateRequest in the Go standard libraries.
Template *x509.CertificateRequest
// GetTemplate returns the CertificateRequest that will be used as a template for
// generating certificate signing requests for all new keys generated as
// part of rotation. It follows the same rules as the template parameter of
// crypto.x509.CreateCertificateRequest in the Go standard libraries.
// If no template is available, nil may be returned, and no certificate will be requested.
// If specified, takes precedence over Template.
GetTemplate func() *x509.CertificateRequest
// SignerName is the name of the certificate signer that should sign certificates
// generated by the manager.
SignerName string
// RequestedCertificateLifetime is the requested lifetime length for certificates generated by the manager.
// Optional.
// This will set the spec.expirationSeconds field on the CSR. Controlling the lifetime of
// the issued certificate is not guaranteed as the signer may choose to ignore the request.
RequestedCertificateLifetime *time.Duration
// Usages is the types of usages that certificates generated by the manager
// can be used for. It is mutually exclusive with GetUsages.
Usages []certificates.KeyUsage
// GetUsages is dynamic way to get the types of usages that certificates generated by the manager
// can be used for. If Usages is not nil, GetUsages has to be nil, vice versa.
// It is mutually exclusive with Usages.
GetUsages func(privateKey interface{}) []certificates.KeyUsage
// CertificateStore is a persistent store where the current cert/key is
// kept and future cert/key pairs will be persisted after they are
// generated.
CertificateStore Store
// BootstrapCertificatePEM is the certificate data that will be returned
// from the Manager if the CertificateStore doesn't have any cert/key pairs
// currently available and has not yet had a chance to get a new cert/key
// pair from the API. If the CertificateStore does have a cert/key pair,
// this will be ignored. If there is no cert/key pair available in the
// CertificateStore, as soon as Start is called, it will request a new
// cert/key pair from the CertificateSigningRequestClient. This is intended
// to allow the first boot of a component to be initialized using a
// generic, multi-use cert/key pair which will be quickly replaced with a
// unique cert/key pair.
BootstrapCertificatePEM []byte
// BootstrapKeyPEM is the key data that will be returned from the Manager
// if the CertificateStore doesn't have any cert/key pairs currently
// available. If the CertificateStore does have a cert/key pair, this will
// be ignored. If the bootstrap cert/key pair are used, they will be
// rotated at the first opportunity, possibly well in advance of expiring.
// This is intended to allow the first boot of a component to be
// initialized using a generic, multi-use cert/key pair which will be
// quickly replaced with a unique cert/key pair.
BootstrapKeyPEM []byte `datapolicy:"security-key"`
// CertificateRotation will record a metric showing the time in seconds
// that certificates lived before being rotated. This metric is a histogram
// because there is value in keeping a history of rotation cadences. It
// allows one to setup monitoring and alerting of unexpected rotation
// behavior and track trends in rotation frequency.
CertificateRotation Histogram
// CertifcateRenewFailure will record a metric that keeps track of
// certificate renewal failures.
CertificateRenewFailure Counter
// Name is an optional string that will be used when writing log output
// or returning errors from manager methods. If not set, SignerName will
// be used, if SignerName is not set, if Usages includes client auth the
// name will be "client auth", otherwise the value will be "server".
Name string
// Logf is an optional function that log output will be sent to from the
// certificate manager. If not set it will use klog.V(2)
Logf func(format string, args ...interface{})
}
// Store is responsible for getting and updating the current certificate.
// Depending on the concrete implementation, the backing store for this
// behavior may vary.
type Store interface {
// Current returns the currently selected certificate, as well as the
// associated certificate and key data in PEM format. If the Store doesn't
// have a cert/key pair currently, it should return a NoCertKeyError so
// that the Manager can recover by using bootstrap certificates to request
// a new cert/key pair.
Current() (*tls.Certificate, error)
// Update accepts the PEM data for the cert/key pair and makes the new
// cert/key pair the 'current' pair, that will be returned by future calls
// to Current().
Update(cert, key []byte) (*tls.Certificate, error)
}
// Gauge will record the remaining lifetime of the certificate each time it is
// updated.
type Gauge interface {
Set(float64)
}
// Histogram will record the time a rotated certificate was used before being
// rotated.
type Histogram interface {
Observe(float64)
}
// Counter will wrap a counter with labels
type Counter interface {
Inc()
}
// NoCertKeyError indicates there is no cert/key currently available.
type NoCertKeyError string
// ClientsetFunc returns a new clientset for discovering CSR API availability and requesting CSRs.
// It is passed the current certificate if one is available and valid.
type ClientsetFunc func(current *tls.Certificate) (clientset.Interface, error)
func (e *NoCertKeyError) Error() string { return string(*e) }
type manager struct {
getTemplate func() *x509.CertificateRequest
// lastRequestLock guards lastRequestCancel and lastRequest
lastRequestLock sync.Mutex
lastRequestCancel context.CancelFunc
lastRequest *x509.CertificateRequest
dynamicTemplate bool
signerName string
requestedCertificateLifetime *time.Duration
getUsages func(privateKey interface{}) []certificates.KeyUsage
forceRotation bool
certStore Store
certificateRotation Histogram
certificateRenewFailure Counter
// the following variables must only be accessed under certAccessLock
certAccessLock sync.RWMutex
cert *tls.Certificate
serverHealth bool
// the clientFn must only be accessed under the clientAccessLock
clientAccessLock sync.Mutex
clientsetFn ClientsetFunc
stopCh chan struct{}
stopped bool
// Set to time.Now but can be stubbed out for testing
now func() time.Time
name string
logf func(format string, args ...interface{})
}
// NewManager returns a new certificate manager. A certificate manager is
// responsible for being the authoritative source of certificates in the
// Kubelet and handling updates due to rotation.
func NewManager(config *Config) (Manager, error) {
cert, forceRotation, err := getCurrentCertificateOrBootstrap(
config.CertificateStore,
config.BootstrapCertificatePEM,
config.BootstrapKeyPEM)
if err != nil {
return nil, err
}
getTemplate := config.GetTemplate
if getTemplate == nil {
getTemplate = func() *x509.CertificateRequest { return config.Template }
}
if config.GetUsages != nil && config.Usages != nil {
return nil, errors.New("cannot specify both GetUsages and Usages")
}
if config.GetUsages == nil && config.Usages == nil {
return nil, errors.New("either GetUsages or Usages should be specified")
}
var getUsages func(interface{}) []certificates.KeyUsage
if config.GetUsages != nil {
getUsages = config.GetUsages
} else {
getUsages = func(interface{}) []certificates.KeyUsage { return config.Usages }
}
m := manager{
stopCh: make(chan struct{}),
clientsetFn: config.ClientsetFn,
getTemplate: getTemplate,
dynamicTemplate: config.GetTemplate != nil,
signerName: config.SignerName,
requestedCertificateLifetime: config.RequestedCertificateLifetime,
getUsages: getUsages,
certStore: config.CertificateStore,
cert: cert,
forceRotation: forceRotation,
certificateRotation: config.CertificateRotation,
certificateRenewFailure: config.CertificateRenewFailure,
now: time.Now,
}
name := config.Name
if len(name) == 0 {
name = m.signerName
}
if len(name) == 0 {
usages := getUsages(nil)
switch {
case hasKeyUsage(usages, certificates.UsageClientAuth):
name = string(certificates.UsageClientAuth)
default:
name = "certificate"
}
}
m.name = name
m.logf = config.Logf
if m.logf == nil {
m.logf = func(format string, args ...interface{}) { klog.V(2).Infof(format, args...) }
}
return &m, nil
}
// Current returns the currently selected certificate from the certificate
// manager. This can be nil if the manager was initialized without a
// certificate and has not yet received one from the
// CertificateSigningRequestClient, or if the current cert has expired.
func (m *manager) Current() *tls.Certificate {
m.certAccessLock.RLock()
defer m.certAccessLock.RUnlock()
if m.cert != nil && m.cert.Leaf != nil && m.now().After(m.cert.Leaf.NotAfter) {
m.logf("%s: Current certificate is expired", m.name)
return nil
}
return m.cert
}
// ServerHealthy returns true if the cert manager believes the server
// is currently alive.
func (m *manager) ServerHealthy() bool {
m.certAccessLock.RLock()
defer m.certAccessLock.RUnlock()
return m.serverHealth
}
// Stop terminates the manager.
func (m *manager) Stop() {
m.clientAccessLock.Lock()
defer m.clientAccessLock.Unlock()
if m.stopped {
return
}
close(m.stopCh)
m.stopped = true
}
// Start will start the background work of rotating the certificates.
func (m *manager) Start() {
// Certificate rotation depends on access to the API server certificate
// signing API, so don't start the certificate manager if we don't have a
// client.
if m.clientsetFn == nil {
m.logf("%s: Certificate rotation is not enabled, no connection to the apiserver", m.name)
return
}
m.logf("%s: Certificate rotation is enabled", m.name)
templateChanged := make(chan struct{})
go wait.Until(func() {
deadline := m.nextRotationDeadline()
if sleepInterval := deadline.Sub(m.now()); sleepInterval > 0 {
m.logf("%s: Waiting %v for next certificate rotation", m.name, sleepInterval)
timer := time.NewTimer(sleepInterval)
defer timer.Stop()
select {
case <-timer.C:
// unblock when deadline expires
case <-templateChanged:
_, lastRequestTemplate := m.getLastRequest()
if reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
// if the template now matches what we last requested, restart the rotation deadline loop
return
}
m.logf("%s: Certificate template changed, rotating", m.name)
}
}
// Don't enter rotateCerts and trigger backoff if we don't even have a template to request yet
if m.getTemplate() == nil {
return
}
backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 2,
Jitter: 0.1,
Steps: 5,
}
if err := wait.ExponentialBackoff(backoff, m.rotateCerts); err != nil {
utilruntime.HandleError(fmt.Errorf("%s: Reached backoff limit, still unable to rotate certs: %v", m.name, err))
wait.PollInfinite(32*time.Second, m.rotateCerts)
}
}, time.Second, m.stopCh)
if m.dynamicTemplate {
go wait.Until(func() {
// check if the current template matches what we last requested
lastRequestCancel, lastRequestTemplate := m.getLastRequest()
if !m.certSatisfiesTemplate() && !reflect.DeepEqual(lastRequestTemplate, m.getTemplate()) {
// if the template is different, queue up an interrupt of the rotation deadline loop.
// if we've requested a CSR that matches the new template by the time the interrupt is handled, the interrupt is disregarded.
if lastRequestCancel != nil {
// if we're currently waiting on a submitted request that no longer matches what we want, stop waiting
lastRequestCancel()
}
select {
case templateChanged <- struct{}{}:
case <-m.stopCh:
}
}
}, time.Second, m.stopCh)
}
}
func getCurrentCertificateOrBootstrap(
store Store,
bootstrapCertificatePEM []byte,
bootstrapKeyPEM []byte) (cert *tls.Certificate, shouldRotate bool, errResult error) {
currentCert, err := store.Current()
if err == nil {
// if the current cert is expired, fall back to the bootstrap cert
if currentCert.Leaf != nil && time.Now().Before(currentCert.Leaf.NotAfter) {
return currentCert, false, nil
}
} else {
if _, ok := err.(*NoCertKeyError); !ok {
return nil, false, err
}
}
if bootstrapCertificatePEM == nil || bootstrapKeyPEM == nil {
return nil, true, nil
}
bootstrapCert, err := tls.X509KeyPair(bootstrapCertificatePEM, bootstrapKeyPEM)
if err != nil {
return nil, false, err
}
if len(bootstrapCert.Certificate) < 1 {
return nil, false, fmt.Errorf("no cert/key data found")
}
certs, err := x509.ParseCertificates(bootstrapCert.Certificate[0])
if err != nil {
return nil, false, fmt.Errorf("unable to parse certificate data: %v", err)
}
if len(certs) < 1 {
return nil, false, fmt.Errorf("no cert data found")
}
bootstrapCert.Leaf = certs[0]
if _, err := store.Update(bootstrapCertificatePEM, bootstrapKeyPEM); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to set the cert/key pair to the bootstrap certificate: %v", err))
}
return &bootstrapCert, true, nil
}
func (m *manager) getClientset() (clientset.Interface, error) {
current := m.Current()
m.clientAccessLock.Lock()
defer m.clientAccessLock.Unlock()
return m.clientsetFn(current)
}
// RotateCerts is exposed for testing only and is not a part of the public interface.
// Returns true if it changed the cert, false otherwise. Error is only returned in
// exceptional cases.
func (m *manager) RotateCerts() (bool, error) {
return m.rotateCerts()
}
// rotateCerts attempts to request a client cert from the server, wait a reasonable
// period of time for it to be signed, and then update the cert on disk. If it cannot
// retrieve a cert, it will return false. It will only return error in exceptional cases.
// This method also keeps track of "server health" by interpreting the responses it gets
// from the server on the various calls it makes.
// TODO: return errors, have callers handle and log them correctly
func (m *manager) rotateCerts() (bool, error) {
m.logf("%s: Rotating certificates", m.name)
template, csrPEM, keyPEM, privateKey, err := m.generateCSR()
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: Unable to generate a certificate signing request: %v", m.name, err))
if m.certificateRenewFailure != nil {
m.certificateRenewFailure.Inc()
}
return false, nil
}
// request the client each time
clientSet, err := m.getClientset()
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: Unable to load a client to request certificates: %v", m.name, err))
if m.certificateRenewFailure != nil {
m.certificateRenewFailure.Inc()
}
return false, nil
}
getUsages := m.getUsages
if m.getUsages == nil {
getUsages = DefaultKubeletClientGetUsages
}
usages := getUsages(privateKey)
// Call the Certificate Signing Request API to get a certificate for the
// new private key
reqName, reqUID, err := csr.RequestCertificate(clientSet, csrPEM, "", m.signerName, m.requestedCertificateLifetime, usages, privateKey)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: Failed while requesting a signed certificate from the control plane: %v", m.name, err))
if m.certificateRenewFailure != nil {
m.certificateRenewFailure.Inc()
}
return false, m.updateServerError(err)
}
ctx, cancel := context.WithTimeout(context.Background(), certificateWaitTimeout)
defer cancel()
// Once we've successfully submitted a CSR for this template, record that we did so
m.setLastRequest(cancel, template)
// Wait for the certificate to be signed. This interface and internal timout
// is a remainder after the old design using raw watch wrapped with backoff.
crtPEM, err := csr.WaitForCertificate(ctx, clientSet, reqName, reqUID)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: certificate request was not signed: %v", m.name, err))
if m.certificateRenewFailure != nil {
m.certificateRenewFailure.Inc()
}
return false, nil
}
cert, err := m.certStore.Update(crtPEM, keyPEM)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: Unable to store the new cert/key pair: %v", m.name, err))
if m.certificateRenewFailure != nil {
m.certificateRenewFailure.Inc()
}
return false, nil
}
if old := m.updateCached(cert); old != nil && m.certificateRotation != nil {
m.certificateRotation.Observe(m.now().Sub(old.Leaf.NotBefore).Seconds())
}
return true, nil
}
// Check that the current certificate on disk satisfies the requests from the
// current template.
//
// Note that extra items in the certificate's SAN or orgs that don't exist in
// the template will not trigger a renewal.
//
// Requires certAccessLock to be locked.
func (m *manager) certSatisfiesTemplateLocked() bool {
if m.cert == nil {
return false
}
if template := m.getTemplate(); template != nil {
if template.Subject.CommonName != m.cert.Leaf.Subject.CommonName {
m.logf("%s: Current certificate CN (%s) does not match requested CN (%s)", m.name, m.cert.Leaf.Subject.CommonName, template.Subject.CommonName)
return false
}
currentDNSNames := sets.NewString(m.cert.Leaf.DNSNames...)
desiredDNSNames := sets.NewString(template.DNSNames...)
missingDNSNames := desiredDNSNames.Difference(currentDNSNames)
if len(missingDNSNames) > 0 {
m.logf("%s: Current certificate is missing requested DNS names %v", m.name, missingDNSNames.List())
return false
}
currentIPs := sets.NewString()
for _, ip := range m.cert.Leaf.IPAddresses {
currentIPs.Insert(ip.String())
}
desiredIPs := sets.NewString()
for _, ip := range template.IPAddresses {
desiredIPs.Insert(ip.String())
}
missingIPs := desiredIPs.Difference(currentIPs)
if len(missingIPs) > 0 {
m.logf("%s: Current certificate is missing requested IP addresses %v", m.name, missingIPs.List())
return false
}
currentOrgs := sets.NewString(m.cert.Leaf.Subject.Organization...)
desiredOrgs := sets.NewString(template.Subject.Organization...)
missingOrgs := desiredOrgs.Difference(currentOrgs)
if len(missingOrgs) > 0 {
m.logf("%s: Current certificate is missing requested orgs %v", m.name, missingOrgs.List())
return false
}
}
return true
}
func (m *manager) certSatisfiesTemplate() bool {
m.certAccessLock.RLock()
defer m.certAccessLock.RUnlock()
return m.certSatisfiesTemplateLocked()
}
// nextRotationDeadline returns a value for the threshold at which the
// current certificate should be rotated, 80%+/-10% of the expiration of the
// certificate.
func (m *manager) nextRotationDeadline() time.Time {
// forceRotation is not protected by locks
if m.forceRotation {
m.forceRotation = false
return m.now()
}
m.certAccessLock.RLock()
defer m.certAccessLock.RUnlock()
if !m.certSatisfiesTemplateLocked() {
return m.now()
}
notAfter := m.cert.Leaf.NotAfter
totalDuration := float64(notAfter.Sub(m.cert.Leaf.NotBefore))
deadline := m.cert.Leaf.NotBefore.Add(jitteryDuration(totalDuration))
m.logf("%s: Certificate expiration is %v, rotation deadline is %v", m.name, notAfter, deadline)
return deadline
}
// jitteryDuration uses some jitter to set the rotation threshold so each node
// will rotate at approximately 70-90% of the total lifetime of the
// certificate. With jitter, if a number of nodes are added to a cluster at
// approximately the same time (such as cluster creation time), they won't all
// try to rotate certificates at the same time for the rest of the life of the
// cluster.
//
// This function is represented as a variable to allow replacement during testing.
var jitteryDuration = func(totalDuration float64) time.Duration {
return wait.Jitter(time.Duration(totalDuration), 0.2) - time.Duration(totalDuration*0.3)
}
// updateCached sets the most recent retrieved cert and returns the old cert.
// It also sets the server as assumed healthy.
func (m *manager) updateCached(cert *tls.Certificate) *tls.Certificate {
m.certAccessLock.Lock()
defer m.certAccessLock.Unlock()
m.serverHealth = true
old := m.cert
m.cert = cert
return old
}
// updateServerError takes an error returned by the server and infers
// the health of the server based on the error. It will return nil if
// the error does not require immediate termination of any wait loops,
// and otherwise it will return the error.
func (m *manager) updateServerError(err error) error {
m.certAccessLock.Lock()
defer m.certAccessLock.Unlock()
switch {
case apierrors.IsUnauthorized(err):
// SSL terminating proxies may report this error instead of the master
m.serverHealth = true
case apierrors.IsUnexpectedServerError(err):
// generally indicates a proxy or other load balancer problem, rather than a problem coming
// from the master
m.serverHealth = false
default:
// Identify known errors that could be expected for a cert request that
// indicate everything is working normally
m.serverHealth = apierrors.IsNotFound(err) || apierrors.IsForbidden(err)
}
return nil
}
func (m *manager) generateCSR() (template *x509.CertificateRequest, csrPEM []byte, keyPEM []byte, key interface{}, err error) {
// Generate a new private key.
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("%s: unable to generate a new private key: %v", m.name, err)
}
der, err := x509.MarshalECPrivateKey(privateKey)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("%s: unable to marshal the new key to DER: %v", m.name, err)
}
keyPEM = pem.EncodeToMemory(&pem.Block{Type: keyutil.ECPrivateKeyBlockType, Bytes: der})
template = m.getTemplate()
if template == nil {
return nil, nil, nil, nil, fmt.Errorf("%s: unable to create a csr, no template available", m.name)
}
csrPEM, err = cert.MakeCSRFromTemplate(privateKey, template)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("%s: unable to create a csr from the private key: %v", m.name, err)
}
return template, csrPEM, keyPEM, privateKey, nil
}
func (m *manager) getLastRequest() (context.CancelFunc, *x509.CertificateRequest) {
m.lastRequestLock.Lock()
defer m.lastRequestLock.Unlock()
return m.lastRequestCancel, m.lastRequest
}
func (m *manager) setLastRequest(cancel context.CancelFunc, r *x509.CertificateRequest) {
m.lastRequestLock.Lock()
defer m.lastRequestLock.Unlock()
m.lastRequestCancel = cancel
m.lastRequest = r
}
func hasKeyUsage(usages []certificates.KeyUsage, usage certificates.KeyUsage) bool {
for _, u := range usages {
if u == usage {
return true
}
}
return false
}

View File

@@ -0,0 +1,318 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certificate
import (
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"os"
"path/filepath"
"time"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2"
)
const (
keyExtension = ".key"
certExtension = ".crt"
pemExtension = ".pem"
currentPair = "current"
updatedPair = "updated"
)
type fileStore struct {
pairNamePrefix string
certDirectory string
keyDirectory string
certFile string
keyFile string
}
// FileStore is a store that provides certificate retrieval as well as
// the path on disk of the current PEM.
type FileStore interface {
Store
// CurrentPath returns the path on disk of the current certificate/key
// pair encoded as PEM files.
CurrentPath() string
}
// NewFileStore returns a concrete implementation of a Store that is based on
// storing the cert/key pairs in a single file per pair on disk in the
// designated directory. When starting up it will look for the currently
// selected cert/key pair in:
//
// 1. ${certDirectory}/${pairNamePrefix}-current.pem - both cert and key are in the same file.
// 2. ${certFile}, ${keyFile}
// 3. ${certDirectory}/${pairNamePrefix}.crt, ${keyDirectory}/${pairNamePrefix}.key
//
// The first one found will be used. If rotation is enabled, future cert/key
// updates will be written to the ${certDirectory} directory and
// ${certDirectory}/${pairNamePrefix}-current.pem will be created as a soft
// link to the currently selected cert/key pair.
func NewFileStore(
pairNamePrefix string,
certDirectory string,
keyDirectory string,
certFile string,
keyFile string) (FileStore, error) {
s := fileStore{
pairNamePrefix: pairNamePrefix,
certDirectory: certDirectory,
keyDirectory: keyDirectory,
certFile: certFile,
keyFile: keyFile,
}
if err := s.recover(); err != nil {
return nil, err
}
return &s, nil
}
// CurrentPath returns the path to the current version of these certificates.
func (s *fileStore) CurrentPath() string {
return filepath.Join(s.certDirectory, s.filename(currentPair))
}
// recover checks if there is a certificate rotation that was interrupted while
// progress, and if so, attempts to recover to a good state.
func (s *fileStore) recover() error {
// If the 'current' file doesn't exist, continue on with the recovery process.
currentPath := filepath.Join(s.certDirectory, s.filename(currentPair))
if exists, err := fileExists(currentPath); err != nil {
return err
} else if exists {
return nil
}
// If the 'updated' file exists, and it is a symbolic link, continue on
// with the recovery process.
updatedPath := filepath.Join(s.certDirectory, s.filename(updatedPair))
if fi, err := os.Lstat(updatedPath); err != nil {
if os.IsNotExist(err) {
return nil
}
return err
} else if fi.Mode()&os.ModeSymlink != os.ModeSymlink {
return fmt.Errorf("expected %q to be a symlink but it is a file", updatedPath)
}
// Move the 'updated' symlink to 'current'.
if err := os.Rename(updatedPath, currentPath); err != nil {
return fmt.Errorf("unable to rename %q to %q: %v", updatedPath, currentPath, err)
}
return nil
}
func (s *fileStore) Current() (*tls.Certificate, error) {
pairFile := filepath.Join(s.certDirectory, s.filename(currentPair))
if pairFileExists, err := fileExists(pairFile); err != nil {
return nil, err
} else if pairFileExists {
klog.Infof("Loading cert/key pair from %q.", pairFile)
return loadFile(pairFile)
}
certFileExists, err := fileExists(s.certFile)
if err != nil {
return nil, err
}
keyFileExists, err := fileExists(s.keyFile)
if err != nil {
return nil, err
}
if certFileExists && keyFileExists {
klog.Infof("Loading cert/key pair from (%q, %q).", s.certFile, s.keyFile)
return loadX509KeyPair(s.certFile, s.keyFile)
}
c := filepath.Join(s.certDirectory, s.pairNamePrefix+certExtension)
k := filepath.Join(s.keyDirectory, s.pairNamePrefix+keyExtension)
certFileExists, err = fileExists(c)
if err != nil {
return nil, err
}
keyFileExists, err = fileExists(k)
if err != nil {
return nil, err
}
if certFileExists && keyFileExists {
klog.Infof("Loading cert/key pair from (%q, %q).", c, k)
return loadX509KeyPair(c, k)
}
noKeyErr := NoCertKeyError(
fmt.Sprintf("no cert/key files read at %q, (%q, %q) or (%q, %q)",
pairFile,
s.certFile,
s.keyFile,
s.certDirectory,
s.keyDirectory))
return nil, &noKeyErr
}
func loadFile(pairFile string) (*tls.Certificate, error) {
// LoadX509KeyPair knows how to parse combined cert and private key from
// the same file.
cert, err := tls.LoadX509KeyPair(pairFile, pairFile)
if err != nil {
return nil, fmt.Errorf("could not convert data from %q into cert/key pair: %v", pairFile, err)
}
certs, err := x509.ParseCertificates(cert.Certificate[0])
if err != nil {
return nil, fmt.Errorf("unable to parse certificate data: %v", err)
}
cert.Leaf = certs[0]
return &cert, nil
}
func (s *fileStore) Update(certData, keyData []byte) (*tls.Certificate, error) {
ts := time.Now().Format("2006-01-02-15-04-05")
pemFilename := s.filename(ts)
if err := os.MkdirAll(s.certDirectory, 0755); err != nil {
return nil, fmt.Errorf("could not create directory %q to store certificates: %v", s.certDirectory, err)
}
certPath := filepath.Join(s.certDirectory, pemFilename)
f, err := os.OpenFile(certPath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0600)
if err != nil {
return nil, fmt.Errorf("could not open %q: %v", certPath, err)
}
defer f.Close()
// First cert is leaf, remainder are intermediates
certs, err := certutil.ParseCertsPEM(certData)
if err != nil {
return nil, fmt.Errorf("invalid certificate data: %v", err)
}
for _, c := range certs {
pem.Encode(f, &pem.Block{Type: "CERTIFICATE", Bytes: c.Raw})
}
keyBlock, _ := pem.Decode(keyData)
if keyBlock == nil {
return nil, fmt.Errorf("invalid key data")
}
pem.Encode(f, keyBlock)
cert, err := loadFile(certPath)
if err != nil {
return nil, err
}
if err := s.updateSymlink(certPath); err != nil {
return nil, err
}
return cert, nil
}
// updateSymLink updates the current symlink to point to the file that is
// passed it. It will fail if there is a non-symlink file exists where the
// symlink is expected to be.
func (s *fileStore) updateSymlink(filename string) error {
// If the 'current' file either doesn't exist, or is already a symlink,
// proceed. Otherwise, this is an unrecoverable error.
currentPath := filepath.Join(s.certDirectory, s.filename(currentPair))
currentPathExists := false
if fi, err := os.Lstat(currentPath); err != nil {
if !os.IsNotExist(err) {
return err
}
} else if fi.Mode()&os.ModeSymlink != os.ModeSymlink {
return fmt.Errorf("expected %q to be a symlink but it is a file", currentPath)
} else {
currentPathExists = true
}
// If the 'updated' file doesn't exist, proceed. If it exists but it is a
// symlink, delete it. Otherwise, this is an unrecoverable error.
updatedPath := filepath.Join(s.certDirectory, s.filename(updatedPair))
if fi, err := os.Lstat(updatedPath); err != nil {
if !os.IsNotExist(err) {
return err
}
} else if fi.Mode()&os.ModeSymlink != os.ModeSymlink {
return fmt.Errorf("expected %q to be a symlink but it is a file", updatedPath)
} else {
if err := os.Remove(updatedPath); err != nil {
return fmt.Errorf("unable to remove %q: %v", updatedPath, err)
}
}
// Check that the new cert/key pair file exists to avoid rotating to an
// invalid cert/key.
if filenameExists, err := fileExists(filename); err != nil {
return err
} else if !filenameExists {
return fmt.Errorf("file %q does not exist so it can not be used as the currently selected cert/key", filename)
}
// Ensure the source path is absolute to ensure the symlink target is
// correct when certDirectory is a relative path.
filename, err := filepath.Abs(filename)
if err != nil {
return err
}
// Create the 'updated' symlink pointing to the requested file name.
if err := os.Symlink(filename, updatedPath); err != nil {
return fmt.Errorf("unable to create a symlink from %q to %q: %v", updatedPath, filename, err)
}
// Replace the 'current' symlink.
if currentPathExists {
if err := os.Remove(currentPath); err != nil {
return fmt.Errorf("unable to remove %q: %v", currentPath, err)
}
}
if err := os.Rename(updatedPath, currentPath); err != nil {
return fmt.Errorf("unable to rename %q to %q: %v", updatedPath, currentPath, err)
}
return nil
}
func (s *fileStore) filename(qualifier string) string {
return s.pairNamePrefix + "-" + qualifier + pemExtension
}
func loadX509KeyPair(certFile, keyFile string) (*tls.Certificate, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
certs, err := x509.ParseCertificates(cert.Certificate[0])
if err != nil {
return nil, fmt.Errorf("unable to parse certificate data: %v", err)
}
cert.Leaf = certs[0]
return &cert, nil
}
// FileExists checks if specified file exists.
func fileExists(filename string) (bool, error) {
if _, err := os.Stat(filename); os.IsNotExist(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}

364
vendor/k8s.io/client-go/util/certificate/csr/csr.go generated vendored Normal file
View File

@@ -0,0 +1,364 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csr
import (
"context"
"crypto"
"crypto/x509"
"encoding/pem"
"fmt"
"reflect"
"time"
certificatesv1 "k8s.io/api/certificates/v1"
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
)
// RequestCertificate will either use an existing (if this process has run
// before but not to completion) or create a certificate signing request using the
// PEM encoded CSR and send it to API server. An optional requestedDuration may be passed
// to set the spec.expirationSeconds field on the CSR to control the lifetime of the issued
// certificate. This is not guaranteed as the signer may choose to ignore the request.
func RequestCertificate(client clientset.Interface, csrData []byte, name, signerName string, requestedDuration *time.Duration, usages []certificatesv1.KeyUsage, privateKey interface{}) (reqName string, reqUID types.UID, err error) {
csr := &certificatesv1.CertificateSigningRequest{
// Username, UID, Groups will be injected by API server.
TypeMeta: metav1.TypeMeta{Kind: "CertificateSigningRequest"},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: certificatesv1.CertificateSigningRequestSpec{
Request: csrData,
Usages: usages,
SignerName: signerName,
},
}
if len(csr.Name) == 0 {
csr.GenerateName = "csr-"
}
if requestedDuration != nil {
csr.Spec.ExpirationSeconds = DurationToExpirationSeconds(*requestedDuration)
}
reqName, reqUID, err = create(client, csr)
switch {
case err == nil:
return reqName, reqUID, err
case apierrors.IsAlreadyExists(err) && len(name) > 0:
klog.Infof("csr for this node already exists, reusing")
req, err := get(client, name)
if err != nil {
return "", "", formatError("cannot retrieve certificate signing request: %v", err)
}
if err := ensureCompatible(req, csr, privateKey); err != nil {
return "", "", fmt.Errorf("retrieved csr is not compatible: %v", err)
}
klog.Infof("csr for this node is still valid")
return req.Name, req.UID, nil
default:
return "", "", formatError("cannot create certificate signing request: %v", err)
}
}
func DurationToExpirationSeconds(duration time.Duration) *int32 {
return pointer.Int32(int32(duration / time.Second))
}
func ExpirationSecondsToDuration(expirationSeconds int32) time.Duration {
return time.Duration(expirationSeconds) * time.Second
}
func get(client clientset.Interface, name string) (*certificatesv1.CertificateSigningRequest, error) {
v1req, v1err := client.CertificatesV1().CertificateSigningRequests().Get(context.TODO(), name, metav1.GetOptions{})
if v1err == nil || !apierrors.IsNotFound(v1err) {
return v1req, v1err
}
v1beta1req, v1beta1err := client.CertificatesV1beta1().CertificateSigningRequests().Get(context.TODO(), name, metav1.GetOptions{})
if v1beta1err != nil {
return nil, v1beta1err
}
v1req = &certificatesv1.CertificateSigningRequest{
ObjectMeta: v1beta1req.ObjectMeta,
Spec: certificatesv1.CertificateSigningRequestSpec{
Request: v1beta1req.Spec.Request,
},
}
if v1beta1req.Spec.SignerName != nil {
v1req.Spec.SignerName = *v1beta1req.Spec.SignerName
}
for _, usage := range v1beta1req.Spec.Usages {
v1req.Spec.Usages = append(v1req.Spec.Usages, certificatesv1.KeyUsage(usage))
}
return v1req, nil
}
func create(client clientset.Interface, csr *certificatesv1.CertificateSigningRequest) (reqName string, reqUID types.UID, err error) {
// only attempt a create via v1 if we specified signerName and usages and are not using the legacy unknown signerName
if len(csr.Spec.Usages) > 0 && len(csr.Spec.SignerName) > 0 && csr.Spec.SignerName != "kubernetes.io/legacy-unknown" {
v1req, v1err := client.CertificatesV1().CertificateSigningRequests().Create(context.TODO(), csr, metav1.CreateOptions{})
switch {
case v1err != nil && apierrors.IsNotFound(v1err):
// v1 CSR API was not found, continue to try v1beta1
case v1err != nil:
// other creation error
return "", "", v1err
default:
// success
return v1req.Name, v1req.UID, v1err
}
}
// convert relevant bits to v1beta1
v1beta1csr := &certificatesv1beta1.CertificateSigningRequest{
ObjectMeta: csr.ObjectMeta,
Spec: certificatesv1beta1.CertificateSigningRequestSpec{
SignerName: &csr.Spec.SignerName,
Request: csr.Spec.Request,
},
}
for _, usage := range csr.Spec.Usages {
v1beta1csr.Spec.Usages = append(v1beta1csr.Spec.Usages, certificatesv1beta1.KeyUsage(usage))
}
// create v1beta1
v1beta1req, v1beta1err := client.CertificatesV1beta1().CertificateSigningRequests().Create(context.TODO(), v1beta1csr, metav1.CreateOptions{})
if v1beta1err != nil {
return "", "", v1beta1err
}
return v1beta1req.Name, v1beta1req.UID, nil
}
// WaitForCertificate waits for a certificate to be issued until timeout, or returns an error.
func WaitForCertificate(ctx context.Context, client clientset.Interface, reqName string, reqUID types.UID) (certData []byte, err error) {
fieldSelector := fields.OneTermEqualSelector("metadata.name", reqName).String()
var lw *cache.ListWatch
var obj runtime.Object
for {
// see if the v1 API is available
if _, err := client.CertificatesV1().CertificateSigningRequests().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}); err == nil {
// watch v1 objects
obj = &certificatesv1.CertificateSigningRequest{}
lw = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return client.CertificatesV1().CertificateSigningRequests().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return client.CertificatesV1().CertificateSigningRequests().Watch(ctx, options)
},
}
break
} else {
klog.V(2).Infof("error fetching v1 certificate signing request: %v", err)
}
// return if we've timed out
if err := ctx.Err(); err != nil {
return nil, wait.ErrWaitTimeout
}
// see if the v1beta1 API is available
if _, err := client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, metav1.ListOptions{FieldSelector: fieldSelector}); err == nil {
// watch v1beta1 objects
obj = &certificatesv1beta1.CertificateSigningRequest{}
lw = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return client.CertificatesV1beta1().CertificateSigningRequests().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return client.CertificatesV1beta1().CertificateSigningRequests().Watch(ctx, options)
},
}
break
} else {
klog.V(2).Infof("error fetching v1beta1 certificate signing request: %v", err)
}
// return if we've timed out
if err := ctx.Err(); err != nil {
return nil, wait.ErrWaitTimeout
}
// wait and try again
time.Sleep(time.Second)
}
var issuedCertificate []byte
_, err = watchtools.UntilWithSync(
ctx,
lw,
obj,
nil,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Modified, watch.Added:
case watch.Deleted:
return false, fmt.Errorf("csr %q was deleted", reqName)
default:
return false, nil
}
switch csr := event.Object.(type) {
case *certificatesv1.CertificateSigningRequest:
if csr.UID != reqUID {
return false, fmt.Errorf("csr %q changed UIDs", csr.Name)
}
approved := false
for _, c := range csr.Status.Conditions {
if c.Type == certificatesv1.CertificateDenied {
return false, fmt.Errorf("certificate signing request is denied, reason: %v, message: %v", c.Reason, c.Message)
}
if c.Type == certificatesv1.CertificateFailed {
return false, fmt.Errorf("certificate signing request failed, reason: %v, message: %v", c.Reason, c.Message)
}
if c.Type == certificatesv1.CertificateApproved {
approved = true
}
}
if approved {
if len(csr.Status.Certificate) > 0 {
klog.V(2).Infof("certificate signing request %s is issued", csr.Name)
issuedCertificate = csr.Status.Certificate
return true, nil
}
klog.V(2).Infof("certificate signing request %s is approved, waiting to be issued", csr.Name)
}
case *certificatesv1beta1.CertificateSigningRequest:
if csr.UID != reqUID {
return false, fmt.Errorf("csr %q changed UIDs", csr.Name)
}
approved := false
for _, c := range csr.Status.Conditions {
if c.Type == certificatesv1beta1.CertificateDenied {
return false, fmt.Errorf("certificate signing request is denied, reason: %v, message: %v", c.Reason, c.Message)
}
if c.Type == certificatesv1beta1.CertificateFailed {
return false, fmt.Errorf("certificate signing request failed, reason: %v, message: %v", c.Reason, c.Message)
}
if c.Type == certificatesv1beta1.CertificateApproved {
approved = true
}
}
if approved {
if len(csr.Status.Certificate) > 0 {
klog.V(2).Infof("certificate signing request %s is issued", csr.Name)
issuedCertificate = csr.Status.Certificate
return true, nil
}
klog.V(2).Infof("certificate signing request %s is approved, waiting to be issued", csr.Name)
}
default:
return false, fmt.Errorf("unexpected type received: %T", event.Object)
}
return false, nil
},
)
if err == wait.ErrWaitTimeout {
return nil, wait.ErrWaitTimeout
}
if err != nil {
return nil, formatError("cannot watch on the certificate signing request: %v", err)
}
return issuedCertificate, nil
}
// ensureCompatible ensures that a CSR object is compatible with an original CSR
func ensureCompatible(new, orig *certificatesv1.CertificateSigningRequest, privateKey interface{}) error {
newCSR, err := parseCSR(new.Spec.Request)
if err != nil {
return fmt.Errorf("unable to parse new csr: %v", err)
}
origCSR, err := parseCSR(orig.Spec.Request)
if err != nil {
return fmt.Errorf("unable to parse original csr: %v", err)
}
if !reflect.DeepEqual(newCSR.Subject, origCSR.Subject) {
return fmt.Errorf("csr subjects differ: new: %#v, orig: %#v", newCSR.Subject, origCSR.Subject)
}
if len(new.Spec.SignerName) > 0 && len(orig.Spec.SignerName) > 0 && new.Spec.SignerName != orig.Spec.SignerName {
return fmt.Errorf("csr signerNames differ: new %q, orig: %q", new.Spec.SignerName, orig.Spec.SignerName)
}
signer, ok := privateKey.(crypto.Signer)
if !ok {
return fmt.Errorf("privateKey is not a signer")
}
newCSR.PublicKey = signer.Public()
if err := newCSR.CheckSignature(); err != nil {
return fmt.Errorf("error validating signature new CSR against old key: %v", err)
}
if len(new.Status.Certificate) > 0 {
certs, err := certutil.ParseCertsPEM(new.Status.Certificate)
if err != nil {
return fmt.Errorf("error parsing signed certificate for CSR: %v", err)
}
now := time.Now()
for _, cert := range certs {
if now.After(cert.NotAfter) {
return fmt.Errorf("one of the certificates for the CSR has expired: %s", cert.NotAfter)
}
}
}
return nil
}
// formatError preserves the type of an API message but alters the message. Expects
// a single argument format string, and returns the wrapped error.
func formatError(format string, err error) error {
if s, ok := err.(apierrors.APIStatus); ok {
se := &apierrors.StatusError{ErrStatus: s.Status()}
se.ErrStatus.Message = fmt.Sprintf(format, se.ErrStatus.Message)
return se
}
return fmt.Errorf(format, err)
}
// parseCSR extracts the CSR from the API object and decodes it.
func parseCSR(pemData []byte) (*x509.CertificateRequest, error) {
// extract PEM from request object
block, _ := pem.Decode(pemData)
if block == nil || block.Type != "CERTIFICATE REQUEST" {
return nil, fmt.Errorf("PEM block type must be CERTIFICATE REQUEST")
}
return x509.ParseCertificateRequest(block.Bytes)
}

5
vendor/modules.txt vendored
View File

@@ -94,8 +94,6 @@ github.com/google/pprof/profile
# github.com/google/uuid v1.3.0
## explicit
github.com/google/uuid
# github.com/googleapis/gnostic v0.5.5
## explicit; go 1.12
# github.com/imdario/mergo v0.3.11
## explicit; go 1.13
github.com/imdario/mergo
@@ -755,8 +753,11 @@ k8s.io/client-go/tools/pager
k8s.io/client-go/tools/record
k8s.io/client-go/tools/record/util
k8s.io/client-go/tools/reference
k8s.io/client-go/tools/watch
k8s.io/client-go/transport
k8s.io/client-go/util/cert
k8s.io/client-go/util/certificate
k8s.io/client-go/util/certificate/csr
k8s.io/client-go/util/connrotation
k8s.io/client-go/util/flowcontrol
k8s.io/client-go/util/homedir