mirror of
https://github.com/k8snetworkplumbingwg/multus-cni.git
synced 2025-06-21 21:37:05 +00:00
248 lines
8.6 KiB
Go
248 lines
8.6 KiB
Go
// Copyright (c) 2023 Multus Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package k8sclient
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"crypto/x509/pkix"
|
|
"fmt"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
certificatesv1 "k8s.io/api/certificates/v1"
|
|
v1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/transport"
|
|
"k8s.io/client-go/util/certificate"
|
|
"k8s.io/klog"
|
|
|
|
netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
|
|
"gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging"
|
|
)
|
|
|
|
const (
|
|
certNamePrefix = "multus-client"
|
|
certCommonNamePrefix = "system:multus"
|
|
certOrganization = "system:multus"
|
|
)
|
|
|
|
var (
|
|
certUsages = []certificatesv1.KeyUsage{certificatesv1.UsageDigitalSignature, certificatesv1.UsageClientAuth}
|
|
)
|
|
|
|
// getPerNodeKubeconfig creates new kubeConfig, based on bootstrap, with new certDir
|
|
func getPerNodeKubeconfig(bootstrap *rest.Config, certDir string) *rest.Config {
|
|
return &rest.Config{
|
|
Host: bootstrap.Host,
|
|
APIPath: bootstrap.APIPath,
|
|
ContentConfig: rest.ContentConfig{
|
|
AcceptContentTypes: "application/vnd.kubernetes.protobuf,application/json",
|
|
ContentType: "application/vnd.kubernetes.protobuf",
|
|
},
|
|
TLSClientConfig: rest.TLSClientConfig{
|
|
KeyFile: path.Join(certDir, certNamePrefix+"-current.pem"),
|
|
CertFile: path.Join(certDir, certNamePrefix+"-current.pem"),
|
|
CAData: bootstrap.TLSClientConfig.CAData,
|
|
},
|
|
// Allow multus (especially in server mode) to make more concurrent requests
|
|
// to reduce client-side throttling
|
|
QPS: 50,
|
|
Burst: 50,
|
|
// Set the config timeout to one minute.
|
|
Timeout: time.Minute,
|
|
}
|
|
}
|
|
|
|
// PerNodeK8sClient creates/reload new multus kubeconfig per-node.
|
|
func PerNodeK8sClient(nodeName, bootstrapKubeconfigFile string, certDuration time.Duration, certDir string) (*ClientInfo, error) {
|
|
bootstrapKubeconfig, err := clientcmd.BuildConfigFromFlags("", bootstrapKubeconfigFile)
|
|
if err != nil {
|
|
return nil, logging.Errorf("failed to load bootstrap kubeconfig %s: %v", bootstrapKubeconfigFile, err)
|
|
}
|
|
config := getPerNodeKubeconfig(bootstrapKubeconfig, certDir)
|
|
|
|
// If we have a valid certificate, user that to fetch CSRs.
|
|
// Otherwise, use the bootstrap credentials from bootstrapKubeconfig
|
|
// https://github.com/kubernetes/kubernetes/blob/068ee321bc7bfe1c2cefb87fb4d9e5deea84fbc8/cmd/kubelet/app/server.go#L953-L963
|
|
newClientsetFn := func(current *tls.Certificate) (kubernetes.Interface, error) {
|
|
cfg := bootstrapKubeconfig
|
|
|
|
// validate the kubeconfig
|
|
tempClient, err := kubernetes.NewForConfig(cfg)
|
|
if err != nil {
|
|
logging.Errorf("failed to read kubeconfig from cert manager: %v", err)
|
|
} else {
|
|
_, err := tempClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
|
|
// tls unknown authority error is unrecoverable error with retry
|
|
if err != nil {
|
|
if strings.Contains(err.Error(), "x509: certificate signed by unknown authority") {
|
|
logging.Verbosef("cert mgr gets invalid config. rebuild from bootstrap kubeconfig")
|
|
// reload and use bootstrapKubeconfig again
|
|
newBootstrapKubeconfig, _ := clientcmd.BuildConfigFromFlags("", bootstrapKubeconfigFile)
|
|
cfg = newBootstrapKubeconfig
|
|
} else {
|
|
logging.Errorf("failed to list pods with new certs: %v", err)
|
|
}
|
|
}
|
|
|
|
if current != nil {
|
|
cfg = config
|
|
}
|
|
}
|
|
return kubernetes.NewForConfig(cfg)
|
|
}
|
|
|
|
certificateStore, err := certificate.NewFileStore(certNamePrefix, certDir, certDir, "", "")
|
|
if err != nil {
|
|
return nil, logging.Errorf("failed to initialize the certificate store: %v", err)
|
|
}
|
|
|
|
certManager, err := certificate.NewManager(&certificate.Config{
|
|
ClientsetFn: newClientsetFn,
|
|
Template: &x509.CertificateRequest{
|
|
Subject: pkix.Name{
|
|
CommonName: fmt.Sprintf("%s:%s", certCommonNamePrefix, nodeName),
|
|
Organization: []string{certOrganization},
|
|
},
|
|
},
|
|
RequestedCertificateLifetime: &certDuration,
|
|
SignerName: certificatesv1.KubeAPIServerClientSignerName,
|
|
Usages: certUsages,
|
|
CertificateStore: certificateStore,
|
|
})
|
|
if err != nil {
|
|
return nil, logging.Errorf("failed to initialize the certificate manager: %v", err)
|
|
}
|
|
if certDuration < time.Hour {
|
|
// the default value for CertCallbackRefreshDuration (5min) is too long for short-lived certs,
|
|
// set it to a more sensible value
|
|
transport.CertCallbackRefreshDuration = time.Second * 10
|
|
}
|
|
certManager.Start()
|
|
|
|
logging.Verbosef("Waiting for certificate")
|
|
var storeErr error
|
|
err = wait.PollWithContext(context.TODO(), time.Second, 2*time.Minute, func(_ context.Context) (bool, error) {
|
|
var currentCert *tls.Certificate
|
|
currentCert, storeErr = certificateStore.Current()
|
|
return currentCert != nil && storeErr == nil, nil
|
|
})
|
|
if err != nil {
|
|
return nil, logging.Errorf("certificate was not signed, last cert store err: %v err: %v", storeErr, err)
|
|
}
|
|
logging.Verbosef("Certificate found!")
|
|
|
|
return newClientInfo(config)
|
|
}
|
|
|
|
// InClusterK8sClient returns the `k8s.ClientInfo` struct to use to connect to
|
|
// the k8s API.
|
|
func InClusterK8sClient() (*ClientInfo, error) {
|
|
clientInfo, err := GetK8sClient("", nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if clientInfo == nil {
|
|
return nil, fmt.Errorf("failed to create in-cluster kube client")
|
|
}
|
|
return clientInfo, err
|
|
}
|
|
|
|
// SetK8sClientInformers adds informer structure to ClientInfo to utilize in thick daemon
|
|
func (c *ClientInfo) SetK8sClientInformers(podInformer, netDefInformer cache.SharedIndexInformer) {
|
|
c.PodInformer = podInformer
|
|
c.NetDefInformer = netDefInformer
|
|
}
|
|
|
|
// GetK8sClient gets client info from kubeconfig
|
|
func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) {
|
|
logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient)
|
|
// If we get a valid kubeClient (eg from testcases) just return that
|
|
// one.
|
|
if kubeClient != nil {
|
|
return kubeClient, nil
|
|
}
|
|
|
|
var err error
|
|
var config *rest.Config
|
|
|
|
// Otherwise try to create a kubeClient from a given kubeConfig
|
|
if kubeconfig != "" {
|
|
// uses the current context in kubeconfig
|
|
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
|
|
if err != nil {
|
|
return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v: %v", kubeconfig, err)
|
|
}
|
|
} else if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
|
|
// Try in-cluster config where multus might be running in a kubernetes pod
|
|
config, err = rest.InClusterConfig()
|
|
if err != nil {
|
|
return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config: %v", err)
|
|
}
|
|
} else {
|
|
// No kubernetes config; assume we shouldn't talk to Kube at all
|
|
return nil, nil
|
|
}
|
|
|
|
// Specify that we use gRPC
|
|
config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
|
|
config.ContentType = "application/vnd.kubernetes.protobuf"
|
|
// Set the config timeout to one minute.
|
|
config.Timeout = time.Minute
|
|
// Allow multus (especially in server mode) to make more concurrent requests
|
|
// to reduce client-side throttling
|
|
config.QPS = 50
|
|
config.Burst = 50
|
|
|
|
return newClientInfo(config)
|
|
}
|
|
|
|
// newClientInfo returns a `ClientInfo` from a configuration created from an
|
|
// existing kubeconfig file.
|
|
func newClientInfo(config *rest.Config) (*ClientInfo, error) {
|
|
client, err := kubernetes.NewForConfig(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
netclient, err := netclient.NewForConfig(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
broadcaster := record.NewBroadcaster()
|
|
broadcaster.StartLogging(klog.Infof)
|
|
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
|
|
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})
|
|
return &ClientInfo{
|
|
Client: client,
|
|
NetClient: netclient,
|
|
EventBroadcaster: broadcaster,
|
|
EventRecorder: recorder,
|
|
}, nil
|
|
}
|