Add support for custom WrapTransport for Kubernetes Client

Author: moelsayed, 2018-02-20 13:51:57 +02:00
parent b1c2a5d153
commit 03673b8f22
12 changed files with 54 additions and 40 deletions
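In short, this change threads an optional k8s.WrapTransport hook (defined later in this diff as func(rt http.RoundTripper) http.RoundTripper) from the CLI and library entry points down to every k8s.NewClient call, so an embedding caller can decorate the Kubernetes client's HTTP transport, for example to inject credentials or route requests through a tunnel. A minimal sketch of such a wrapper follows; the package, type, header name, and value are purely illustrative and not part of the commit:

package example

import "net/http"

// headerRoundTripper decorates an http.RoundTripper by adding one request
// header before delegating to the wrapped transport.
type headerRoundTripper struct {
    next http.RoundTripper
}

func (h headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
    req = req.Clone(req.Context())                 // avoid mutating the caller's request
    req.Header.Set("X-Example-Auth", "demo-token") // illustrative header only
    return h.next.RoundTrip(req)
}

// wrapTransport has the same shape as the WrapTransport type this commit introduces.
func wrapTransport(rt http.RoundTripper) http.RoundTripper {
    return headerRoundTripper{next: rt}
}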


@@ -8,9 +8,9 @@ import (
     "github.com/rancher/rke/templates"
 )
-func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string) error {
+func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
     log.Infof(ctx, "[authz] Creating rke-job-deployer ServiceAccount")
-    k8sClient, err := k8s.NewClient(kubeConfigPath)
+    k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
     if err != nil {
         return err
     }
@@ -24,9 +24,9 @@ func ApplyJobDeployerServiceAccount(ctx context.Context, kubeConfigPath string)
     return nil
 }
-func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string) error {
+func ApplySystemNodeClusterRoleBinding(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
     log.Infof(ctx, "[authz] Creating system:node ClusterRoleBinding")
-    k8sClient, err := k8s.NewClient(kubeConfigPath)
+    k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
     if err != nil {
         return err
     }


@@ -8,9 +8,9 @@ import (
     "github.com/rancher/rke/templates"
 )
-func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) error {
+func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
     log.Infof(ctx, "[authz] Applying default PodSecurityPolicy")
-    k8sClient, err := k8s.NewClient(kubeConfigPath)
+    k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
     if err != nil {
         return err
     }
@@ -21,9 +21,9 @@ func ApplyDefaultPodSecurityPolicy(ctx context.Context, kubeConfigPath string) e
     return nil
 }
-func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string) error {
+func ApplyDefaultPodSecurityPolicyRole(ctx context.Context, kubeConfigPath string, k8sWrapTransport k8s.WrapTransport) error {
     log.Infof(ctx, "[authz] Applying default PodSecurityPolicy Role and RoleBinding")
-    k8sClient, err := k8s.NewClient(kubeConfigPath)
+    k8sClient, err := k8s.NewClient(kubeConfigPath, k8sWrapTransport)
     if err != nil {
         return err
     }


@@ -88,7 +88,7 @@ func (c *Cluster) doAddonDeploy(ctx context.Context, addonYaml, resourceName str
 func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, addonName string) error {
     log.Infof(ctx, "[addons] Saving addon ConfigMap to Kubernetes")
-    kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath)
+    kubeClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
     if err != nil {
         return err
     }
@@ -116,7 +116,7 @@ func (c *Cluster) StoreAddonConfigMap(ctx context.Context, addonYaml string, add
 func (c *Cluster) ApplySystemAddonExcuteJob(addonJob string) error {
-    if err := k8s.ApplyK8sSystemJob(addonJob, c.LocalKubeConfigPath); err != nil {
+    if err := k8s.ApplyK8sSystemJob(addonJob, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
         fmt.Println(err)
         return err
     }


@@ -38,6 +38,7 @@ type Cluster struct {
     DockerDialerFactory hosts.DialerFactory
     LocalConnDialerFactory hosts.DialerFactory
     PrivateRegistriesMap map[string]v3.PrivateRegistry
+    K8sWrapTransport k8s.WrapTransport
 }
 const (
@@ -122,7 +123,8 @@ func ParseCluster(
     rkeConfig *v3.RancherKubernetesEngineConfig,
     clusterFilePath, configDir string,
     dockerDialerFactory,
-    localConnDialerFactory hosts.DialerFactory) (*Cluster, error) {
+    localConnDialerFactory hosts.DialerFactory,
+    k8sWrapTransport k8s.WrapTransport) (*Cluster, error) {
     var err error
     c := &Cluster{
         RancherKubernetesEngineConfig: *rkeConfig,
@@ -130,6 +132,7 @@ func ParseCluster(
         DockerDialerFactory: dockerDialerFactory,
         LocalConnDialerFactory: localConnDialerFactory,
         PrivateRegistriesMap: make(map[string]v3.PrivateRegistry),
+        K8sWrapTransport: k8sWrapTransport,
     }
     // Setting cluster Defaults
     c.setClusterDefaults(ctx)
@@ -187,7 +190,7 @@ func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
         return fmt.Errorf("Failed to redeploy local admin config with new host")
     }
     workingConfig = newConfig
-    if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath); err == nil {
+    if _, err := GetK8sVersion(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport); err == nil {
         log.Infof(ctx, "[reconcile] host [%s] is active master on the cluster", cpHost.Address)
         break
     }
@@ -197,8 +200,8 @@ func rebuildLocalAdminConfig(ctx context.Context, kubeCluster *Cluster) error {
     return nil
 }
-func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string) bool {
-    if _, err := GetK8sVersion(localKubeConfigPath); err != nil {
+func isLocalConfigWorking(ctx context.Context, localKubeConfigPath string, k8sWrapTransport k8s.WrapTransport) bool {
+    if _, err := GetK8sVersion(localKubeConfigPath, k8sWrapTransport); err != nil {
         log.Infof(ctx, "[reconcile] Local config is not vaild, rebuilding admin config")
         return false
     }
@@ -230,22 +233,22 @@ func getLocalAdminConfigWithNewAddress(localConfigPath, cpAddress string) string
 }
 func (c *Cluster) ApplyAuthzResources(ctx context.Context) error {
-    if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath); err != nil {
+    if err := authz.ApplyJobDeployerServiceAccount(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
         return fmt.Errorf("Failed to apply the ServiceAccount needed for job execution: %v", err)
     }
     if c.Authorization.Mode == NoneAuthorizationMode {
         return nil
     }
     if c.Authorization.Mode == services.RBACAuthorizationMode {
-        if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath); err != nil {
+        if err := authz.ApplySystemNodeClusterRoleBinding(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
             return fmt.Errorf("Failed to apply the ClusterRoleBinding needed for node authorization: %v", err)
         }
     }
     if c.Authorization.Mode == services.RBACAuthorizationMode && c.Services.KubeAPI.PodSecurityPolicy {
-        if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath); err != nil {
+        if err := authz.ApplyDefaultPodSecurityPolicy(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
             return fmt.Errorf("Failed to apply default PodSecurityPolicy: %v", err)
         }
-        if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath); err != nil {
+        if err := authz.ApplyDefaultPodSecurityPolicyRole(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport); err != nil {
             return fmt.Errorf("Failed to apply default PodSecurityPolicy ClusterRole and ClusterRoleBinding: %v", err)
         }
     }
@@ -262,7 +265,7 @@ func (c *Cluster) deployAddons(ctx context.Context) error {
 func (c *Cluster) SyncLabelsAndTaints(ctx context.Context) error {
     if len(c.ControlPlaneHosts) > 0 {
         log.Infof(ctx, "[sync] Syncing nodes Labels and Taints")
-        k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath)
+        k8sClient, err := k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
         if err != nil {
             return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
         }
@@ -297,9 +300,9 @@ func (c *Cluster) PrePullK8sImages(ctx context.Context) error {
     return nil
 }
-func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, crtBundle map[string]pki.CertificatePKI, clusterFilePath, configDir string) error {
+func ConfigureCluster(ctx context.Context, rkeConfig v3.RancherKubernetesEngineConfig, crtBundle map[string]pki.CertificatePKI, clusterFilePath, configDir string, k8sWrapTransport k8s.WrapTransport) error {
     // dialer factories are not needed here since we are not uses docker only k8s jobs
-    kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil)
+    kubeCluster, err := ParseCluster(ctx, &rkeConfig, clusterFilePath, configDir, nil, nil, k8sWrapTransport)
     if err != nil {
         return err
     }
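Taken together, the cluster-side changes above only thread one extra argument from ParseCluster into the Cluster struct, where it is reused by every later k8s.NewClient call. A minimal caller-side sketch of the new call shape is below; the helper name is hypothetical, and passing nil for the wrapper keeps the previous behavior, as the CLI paths later in this diff do:

package example

import (
    "context"

    "github.com/rancher/rke/cluster"
    "github.com/rancher/rke/hosts"
    "github.com/rancher/rke/k8s"
    "github.com/rancher/types/apis/management.cattle.io/v3"
)

// parseWithTransport mirrors the new ParseCluster signature introduced above;
// wrap may be nil, in which case the Kubernetes client transport is left untouched.
func parseWithTransport(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig,
    clusterFilePath, configDir string, docker, local hosts.DialerFactory,
    wrap k8s.WrapTransport) (*cluster.Cluster, error) {
    return cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, docker, local, wrap)
}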


@@ -18,7 +18,7 @@ const (
 func GeneratePlan(ctx context.Context, rkeConfig *v3.RancherKubernetesEngineConfig) (v3.RKEPlan, error) {
     clusterPlan := v3.RKEPlan{}
-    myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil)
+    myCluster, _ := ParseCluster(ctx, rkeConfig, "", "", nil, nil, nil)
     // rkeConfig.Nodes are already unique. But they don't have role flags. So I will use the parsed cluster.Hosts to make use of the role flags.
     uniqHosts := hosts.GetUniqueHostList(myCluster.EtcdHosts, myCluster.ControlPlaneHosts, myCluster.WorkerHosts)
     for _, host := range uniqHosts {


@@ -27,7 +27,7 @@ func ReconcileCluster(ctx context.Context, kubeCluster, currentCluster *Cluster)
         return nil
     }
-    kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath)
+    kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
     if err != nil {
         return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
     }
@@ -90,7 +90,7 @@ func reconcileControl(ctx context.Context, currentCluster, kubeCluster *Cluster,
     }
     for _, toDeleteHost := range cpToDelete {
-        kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath)
+        kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath, kubeCluster.K8sWrapTransport)
         if err != nil {
             return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
         }


@@ -19,7 +19,7 @@ func (c *Cluster) SaveClusterState(ctx context.Context, rkeConfig *v3.RancherKub
     if len(c.ControlPlaneHosts) > 0 {
         // Reinitialize kubernetes Client
         var err error
-        c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath)
+        c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
         if err != nil {
             return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err)
         }
@@ -44,14 +44,14 @@ func (c *Cluster) GetClusterState(ctx context.Context) (*Cluster, error) {
     log.Infof(ctx, "[state] Found local kube config file, trying to get state from cluster")
     // to handle if current local admin is down and we need to use new cp from the list
-    if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath) {
+    if !isLocalConfigWorking(ctx, c.LocalKubeConfigPath, c.K8sWrapTransport) {
         if err := rebuildLocalAdminConfig(ctx, c); err != nil {
             return nil, err
         }
     }
     // initiate kubernetes client
-    c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath)
+    c.KubeClient, err = k8s.NewClient(c.LocalKubeConfigPath, c.K8sWrapTransport)
     if err != nil {
         log.Warnf(ctx, "Failed to initiate new Kubernetes Client: %v", err)
         return nil, nil
@@ -140,9 +140,9 @@ func getStateFromKubernetes(ctx context.Context, kubeClient *kubernetes.Clientse
     }
 }
-func GetK8sVersion(localConfigPath string) (string, error) {
+func GetK8sVersion(localConfigPath string, k8sWrapTransport k8s.WrapTransport) (string, error) {
     logrus.Debugf("[version] Using %s to connect to Kubernetes cluster..", localConfigPath)
-    k8sClient, err := k8s.NewClient(localConfigPath)
+    k8sClient, err := k8s.NewClient(localConfigPath, k8sWrapTransport)
     if err != nil {
         return "", fmt.Errorf("Failed to create Kubernetes Client: %v", err)
     }


@@ -9,6 +9,7 @@ import (
     "github.com/rancher/rke/cluster"
     "github.com/rancher/rke/hosts"
+    "github.com/rancher/rke/k8s"
     "github.com/rancher/rke/log"
     "github.com/rancher/rke/pki"
     "github.com/rancher/types/apis/management.cattle.io/v3"
@@ -45,10 +46,11 @@ func ClusterRemove(
     ctx context.Context,
     rkeConfig *v3.RancherKubernetesEngineConfig,
     dialerFactory hosts.DialerFactory,
+    k8sWrapTransport k8s.WrapTransport,
     local bool, configDir string) error {
     log.Infof(ctx, "Tearing down Kubernetes cluster")
-    kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil)
+    kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dialerFactory, nil, k8sWrapTransport)
     if err != nil {
         return err
     }
@@ -94,7 +96,7 @@ func clusterRemoveFromCli(ctx *cli.Context) error {
     if err != nil {
         return fmt.Errorf("Failed to parse cluster file: %v", err)
     }
-    return ClusterRemove(context.Background(), rkeConfig, nil, false, "")
+    return ClusterRemove(context.Background(), rkeConfig, nil, nil, false, "")
 }
 func clusterRemoveLocal(ctx *cli.Context) error {
@@ -111,5 +113,5 @@ func clusterRemoveLocal(ctx *cli.Context) error {
         }
         rkeConfig.Nodes = []v3.RKEConfigNode{*cluster.GetLocalRKENodeConfig()}
     }
-    return ClusterRemove(context.Background(), rkeConfig, nil, true, "")
+    return ClusterRemove(context.Background(), rkeConfig, nil, nil, true, "")
 }


@@ -6,6 +6,7 @@ import (
     "github.com/rancher/rke/cluster"
     "github.com/rancher/rke/hosts"
+    "github.com/rancher/rke/k8s"
     "github.com/rancher/rke/log"
     "github.com/rancher/rke/pki"
     "github.com/rancher/types/apis/management.cattle.io/v3"
@@ -40,11 +41,12 @@ func ClusterUp(
     ctx context.Context,
     rkeConfig *v3.RancherKubernetesEngineConfig,
     dockerDialerFactory, localConnDialerFactory hosts.DialerFactory,
+    k8sWrapTransport k8s.WrapTransport,
     local bool, configDir string) (string, string, string, string, error) {
     log.Infof(ctx, "Building Kubernetes cluster")
     var APIURL, caCrt, clientCert, clientKey string
-    kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory)
+    kubeCluster, err := cluster.ParseCluster(ctx, rkeConfig, clusterFilePath, configDir, dockerDialerFactory, localConnDialerFactory, k8sWrapTransport)
     if err != nil {
         return APIURL, caCrt, clientCert, clientKey, err
     }
@@ -102,7 +104,7 @@ func ClusterUp(
         return APIURL, caCrt, clientCert, clientKey, err
     }
-    err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, clusterFilePath, configDir)
+    err = cluster.ConfigureCluster(ctx, kubeCluster.RancherKubernetesEngineConfig, kubeCluster.Certificates, clusterFilePath, configDir, k8sWrapTransport)
     if err != nil {
         return APIURL, caCrt, clientCert, clientKey, err
     }
@@ -131,7 +133,7 @@ func clusterUpFromCli(ctx *cli.Context) error {
     if err != nil {
         return fmt.Errorf("Failed to parse cluster file: %v", err)
     }
-    _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, false, "")
+    _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, nil, nil, false, "")
     return err
 }
@@ -149,6 +151,6 @@ func clusterUpLocal(ctx *cli.Context) error {
         }
         rkeConfig.Nodes = []v3.RKEConfigNode{*cluster.GetLocalRKENodeConfig()}
     }
-    _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, hosts.LocalHealthcheckFactory, true, "")
+    _, _, _, _, err = ClusterUp(context.Background(), rkeConfig, nil, hosts.LocalHealthcheckFactory, nil, true, "")
     return err
 }


@@ -27,7 +27,8 @@ func VersionCommand() cli.Command {
 func getClusterVersion(ctx *cli.Context) error {
     localKubeConfig := pki.GetLocalKubeConfig(ctx.String("config"), "")
-    serverVersion, err := cluster.GetK8sVersion(localKubeConfig)
+    // not going to use a k8s dialer here.. this is a CLI command
+    serverVersion, err := cluster.GetK8sVersion(localKubeConfig, nil)
     if err != nil {
         return err
     }


@@ -12,7 +12,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
-func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error {
+func ApplyK8sSystemJob(jobYaml, kubeConfigPath string, k8sWrapTransport WrapTransport) error {
     job := v1.Job{}
     if err := decodeYamlResource(&job, jobYaml); err != nil {
         return err
@@ -20,7 +20,7 @@ func ApplyK8sSystemJob(jobYaml, kubeConfigPath string) error {
     if job.Namespace == metav1.NamespaceNone {
         job.Namespace = metav1.NamespaceSystem
     }
-    k8sClient, err := NewClient(kubeConfigPath)
+    k8sClient, err := NewClient(kubeConfigPath, k8sWrapTransport)
     if err != nil {
         return err
     }


@@ -2,6 +2,7 @@ package k8s
 import (
     "bytes"
+    "net/http"
     "time"
     yamlutil "k8s.io/apimachinery/pkg/util/yaml"
@@ -16,12 +17,17 @@ const (
 type k8sCall func(*kubernetes.Clientset, interface{}) error
-func NewClient(kubeConfigPath string) (*kubernetes.Clientset, error) {
+type WrapTransport func(rt http.RoundTripper) http.RoundTripper
+func NewClient(kubeConfigPath string, k8sWrapTransport WrapTransport) (*kubernetes.Clientset, error) {
     // use the current admin kubeconfig
     config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
    if err != nil {
        return nil, err
    }
+    if k8sWrapTransport != nil {
+        config.WrapTransport = k8sWrapTransport
+    }
     K8sClientSet, err := kubernetes.NewForConfig(config)
     if err != nil {
         return nil, err
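
To close the loop, here is a hedged sketch of calling the new NewClient from outside the package; the package name, helper name, and kubeconfig path are illustrative. Passing nil instead of a wrapper preserves the old behavior, which is exactly what the CLI entry points earlier in this diff do:

package example

import (
    "net/http"

    "github.com/rancher/rke/k8s"
)

// clientWithWrappedTransport builds a client whose HTTP transport is decorated
// by a custom WrapTransport; the wrapper below is a no-op placeholder.
func clientWithWrappedTransport(kubeConfigPath string) error {
    wrap := k8s.WrapTransport(func(rt http.RoundTripper) http.RoundTripper {
        // A real wrapper would return a decorated RoundTripper here.
        return rt
    })
    if _, err := k8s.NewClient(kubeConfigPath, wrap); err != nil {
        return err
    }
    // nil keeps the default transport, matching the pre-change behavior.
    _, err := k8s.NewClient(kubeConfigPath, nil)
    return err
}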