multus-cni/pkg/k8sclient/kubeconfig.go
Doug Smith ba18cf5ab3
Merge pull request #1214 from s1061123/add-netdef-informer
Add net-attach-def informer for thick plugin
2024-02-15 09:40:57 -05:00

248 lines
8.6 KiB
Go

// Copyright (c) 2023 Multus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8sclient
import (
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"fmt"
"os"
"path"
"strings"
"time"
certificatesv1 "k8s.io/api/certificates/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/certificate"
"k8s.io/klog"
netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
)
const (
certNamePrefix = "multus-client"
certCommonNamePrefix = "system:multus"
certOrganization = "system:multus"
)
var (
certUsages = []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageClientAuth}
)
// getPerNodeKubeconfig creates new kubeConfig, based on bootstrap, with new certDir
func getPerNodeKubeconfig(bootstrap *rest.Config, certDir string) *rest.Config {
return &rest.Config{
Host: bootstrap.Host,
APIPath: bootstrap.APIPath,
ContentConfig: rest.ContentConfig{
AcceptContentTypes: "application/vnd.kubernetes.protobuf,application/json",
ContentType: "application/vnd.kubernetes.protobuf",
},
TLSClientConfig: rest.TLSClientConfig{
KeyFile: path.Join(certDir, certNamePrefix+"-current.pem"),
CertFile: path.Join(certDir, certNamePrefix+"-current.pem"),
CAData: bootstrap.TLSClientConfig.CAData,
},
// Allow multus (especially in server mode) to make more concurrent requests
// to reduce client-side throttling
QPS: 50,
Burst: 50,
// Set the config timeout to one minute.
Timeout: time.Minute,
}
}
// PerNodeK8sClient creates/reload new multus kubeconfig per-node.
func PerNodeK8sClient(nodeName, bootstrapKubeconfigFile string, certDuration time.Duration, certDir string) (*ClientInfo, error) {
bootstrapKubeconfig, err := clientcmd.BuildConfigFromFlags("", bootstrapKubeconfigFile)
if err != nil {
return nil, logging.Errorf("failed to load bootstrap kubeconfig %s: %v", bootstrapKubeconfigFile, err)
}
config := getPerNodeKubeconfig(bootstrapKubeconfig, certDir)
// If we have a valid certificate, user that to fetch CSRs.
// Otherwise, use the bootstrap credentials from bootstrapKubeconfig
// https://github.com/kubernetes/kubernetes/blob/068ee321bc7bfe1c2cefb87fb4d9e5deea84fbc8/cmd/kubelet/app/server.go#L953-L963
newClientsetFn := func(current *tls.Certificate) (kubernetes.Interface, error) {
cfg := bootstrapKubeconfig
// validate the kubeconfig
tempClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
logging.Errorf("failed to read kubeconfig from cert manager: %v", err)
} else {
_, err := tempClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
// tls unknown authority error is unrecoverable error with retry
if err != nil {
if strings.Contains(err.Error(), "x509: certificate signed by unknown authority") {
logging.Verbosef("cert mgr gets invalid config. rebuild from bootstrap kubeconfig")
// reload and use bootstrapKubeconfig again
newBootstrapKubeconfig, _ := clientcmd.BuildConfigFromFlags("", bootstrapKubeconfigFile)
cfg = newBootstrapKubeconfig
} else {
logging.Errorf("failed to list pods with new certs: %v", err)
}
}
if current != nil {
cfg = config
}
}
return kubernetes.NewForConfig(cfg)
}
certificateStore, err := certificate.NewFileStore(certNamePrefix, certDir, certDir, "", "")
if err != nil {
return nil, logging.Errorf("failed to initialize the certificate store: %v", err)
}
certManager, err := certificate.NewManager(&certificate.Config{
ClientsetFn: newClientsetFn,
Template: &x509.CertificateRequest{
Subject: pkix.Name{
CommonName: fmt.Sprintf("%s:%s", certCommonNamePrefix, nodeName),
Organization: []string{certOrganization},
},
},
RequestedCertificateLifetime: &certDuration,
SignerName: certificatesv1.KubeAPIServerClientSignerName,
Usages: certUsages,
CertificateStore: certificateStore,
})
if err != nil {
return nil, logging.Errorf("failed to initialize the certificate manager: %v", err)
}
if certDuration < time.Hour {
// the default value for CertCallbackRefreshDuration (5min) is too long for short-lived certs,
// set it to a more sensible value
transport.CertCallbackRefreshDuration = time.Second * 10
}
certManager.Start()
logging.Verbosef("Waiting for certificate")
var storeErr error
err = wait.PollWithContext(context.TODO(), time.Second, 2*time.Minute, func(_ context.Context) (bool, error) {
var currentCert *tls.Certificate
currentCert, storeErr = certificateStore.Current()
return currentCert != nil && storeErr == nil, nil
})
if err != nil {
return nil, logging.Errorf("certificate was not signed, last cert store err: %v err: %v", storeErr, err)
}
logging.Verbosef("Certificate found!")
return newClientInfo(config)
}
// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to
// the k8s API.
func InClusterK8sClient() (*ClientInfo, error) {
clientInfo, err := GetK8sClient("", nil)
if err != nil {
return nil, err
}
if clientInfo == nil {
return nil, fmt.Errorf("failed to create in-cluster kube client")
}
return clientInfo, err
}
// SetK8sClientInformers adds informer structure to ClientInfo to utilize in thick daemon
func (c *ClientInfo) SetK8sClientInformers(podInformer, netDefInformer cache.SharedIndexInformer) {
c.PodInformer = podInformer
c.NetDefInformer = netDefInformer
}
// GetK8sClient gets client info from kubeconfig
func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) {
logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient)
// If we get a valid kubeClient (eg from testcases) just return that
// one.
if kubeClient != nil {
return kubeClient, nil
}
var err error
var config *rest.Config
// Otherwise try to create a kubeClient from a given kubeConfig
if kubeconfig != "" {
// uses the current context in kubeconfig
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err)
}
} else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
// Try in-cluster config where multus might be running in a kubernetes pod
config, err = rest.InClusterConfig()
if err != nil {
return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err)
}
} else {
// No kubernetes config; assume we shouldn't talk to Kube at all
return nil, nil
}
// Specify that we use gRPC
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
config.ContentType = "application/vnd.kubernetes.protobuf"
// Set the config timeout to one minute.
config.Timeout = time.Minute
// Allow multus (especially in server mode) to make more concurrent requests
// to reduce client-side throttling
config.QPS = 50
config.Burst = 50
return newClientInfo(config)
}
// newClientInfo returns a `ClientInfo` from a configuration created from an
// existing kubeconfig file.
func newClientInfo(config *rest.Config) (*ClientInfo, error) {
client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
netclient, err := netclient.NewForConfig(config)
if err != nil {
return nil, err
}
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})
return &ClientInfo{
Client: client,
NetClient: netclient,
EventBroadcaster: broadcaster,
EventRecorder: recorder,
}, nil
}