[kubeadm] Implement etcdutils with Cluster.HasTLS()

- Test HasTLS()
- Instrument throughout upgrade plan and apply
- Update plan_test and apply_test to use new fake Cluster interfaces
- Add descriptions to upgrade range test
- Support KubernetesDir and EtcdDataDir in upgrade tests
- Cover etcdUpgrade in upgrade tests
- Cover upcoming TLSUpgrade in upgrade tests
This commit is contained in:
leigh schrandt 2018-04-16 07:55:25 -06:00
parent 3b45b021ee
commit 99a1143676
13 changed files with 521 additions and 99 deletions

View File

@ -24,6 +24,7 @@ go_library(
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//cmd/kubeadm/app/util/kubeconfig:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/util/version:go_default_library",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
configutil "k8s.io/kubernetes/cmd/kubeadm/app/util/config"
dryrunutil "k8s.io/kubernetes/cmd/kubeadm/app/util/dryrun"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/util/version"
)
@ -281,7 +282,23 @@ func PerformStaticPodUpgrade(client clientset.Interface, waiter apiclient.Waiter
return err
}
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade)
// These are the same because kubeadm currently does not support reconciling a new config against an older one.
// For instance, currently, changing CertificatesDir or EtcdDataDir breaks the upgrade, because oldcfg is not fetchable.
// There would need to be additional upgrade code to handle copying the certs/data over to the new filepaths.
// It's still useful to have these parameterized as separate clusters though, because it allows us to mock these
// interfaces for tests.
oldEtcdCluster := etcdutil.StaticPodCluster{
Endpoints: []string{"localhost:2379"},
ManifestDir: constants.GetStaticPodDirectory(),
CertificatesDir: internalcfg.CertificatesDir,
}
newEtcdCluster := etcdutil.StaticPodCluster{
Endpoints: []string{"localhost:2379"},
ManifestDir: constants.GetStaticPodDirectory(),
CertificatesDir: internalcfg.CertificatesDir,
}
return upgrade.StaticPodControlPlane(waiter, pathManager, internalcfg, etcdUpgrade, oldEtcdCluster, newEtcdCluster)
}
// DryRunStaticPodUpgrade fakes an upgrade of the control plane

View File

@ -27,9 +27,11 @@ import (
"github.com/spf13/cobra"
"k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/validation"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/upgrade"
kubeadmutil "k8s.io/kubernetes/cmd/kubeadm/app/util"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
)
// NewCmdPlan returns the cobra command for `kubeadm upgrade plan`
@ -64,7 +66,11 @@ func RunPlan(parentFlags *cmdUpgradeFlags) error {
}
// Define Local Etcd cluster to be able to retrieve information
etcdCluster := kubeadmutil.LocalEtcdCluster{}
etcdCluster := etcdutil.StaticPodCluster{
Endpoints: []string{"localhost:2379"},
ManifestDir: constants.GetStaticPodDirectory(),
CertificatesDir: upgradeVars.cfg.CertificatesDir,
}
// Compute which upgrade possibilities there are
glog.V(1).Infof("[upgrade/plan] computing upgrade possibilities")

View File

@ -36,6 +36,7 @@ go_library(
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/config:go_default_library",
"//cmd/kubeadm/app/util/dryrun:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/util/version:go_default_library",
"//pkg/version:go_default_library",
@ -85,6 +86,7 @@ go_test(
"//cmd/kubeadm/app/phases/controlplane:go_default_library",
"//cmd/kubeadm/app/phases/etcd:go_default_library",
"//cmd/kubeadm/app/util/apiclient:go_default_library",
"//cmd/kubeadm/app/util/etcd:go_default_library",
"//cmd/kubeadm/test:go_default_library",
"//pkg/api/legacyscheme:go_default_library",
"//pkg/util/version:go_default_library",

View File

@ -23,7 +23,7 @@ import (
kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/features"
"k8s.io/kubernetes/cmd/kubeadm/app/phases/addons/dns"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/util/version"
)
@ -74,7 +74,7 @@ type ClusterState struct {
// GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which
// kinds of upgrades can be performed
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, cluster util.EtcdCluster, featureGates map[string]bool) ([]Upgrade, error) {
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdCluster etcdutil.Cluster, featureGates map[string]bool) ([]Upgrade, error) {
fmt.Println("[upgrade] Fetching available versions to upgrade to")
// Collect the upgrades kubeadm can do in this list
@ -107,7 +107,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
}
// Get current etcd version
etcdStatus, err := cluster.GetEtcdClusterStatus()
etcdStatus, err := etcdCluster.GetStatus()
if err != nil {
return nil, err
}

View File

@ -61,9 +61,11 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) {
}, nil
}
type fakeEtcdCluster struct{}
type fakeEtcdCluster struct{ TLS bool }
func (f fakeEtcdCluster) GetEtcdClusterStatus() (*clientv3.StatusResponse, error) {
func (f fakeEtcdCluster) HasTLS() (bool, error) { return f.TLS, nil }
func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil

View File

@ -28,6 +28,7 @@ import (
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
"k8s.io/kubernetes/cmd/kubeadm/app/util"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/util/version"
)
@ -127,7 +128,7 @@ func (spm *KubeStaticPodPathManager) BackupEtcdDir() string {
return spm.backupEtcdDir
}
func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, beforePodHash string, recoverManifests map[string]string) error {
func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, beforePodHash string, recoverManifests map[string]string, isTLSUpgrade bool) error {
// Special treatment is required for etcd case, when rollbackOldManifests should roll back etcd
// manifests only for the case when component is Etcd
recoverEtcd := false
@ -200,14 +201,13 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP
}
// performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error.
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string) (bool, error) {
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdCluster, newEtcdCluster etcdutil.Cluster) (bool, error) {
// Add etcd static pod spec only if external etcd is not configured
if len(cfg.Etcd.Endpoints) != 0 {
return false, fmt.Errorf("external etcd detected, won't try to change any etcd state")
}
// Checking health state of etcd before proceeding with the upgrade
etcdCluster := util.LocalEtcdCluster{}
etcdStatus, err := etcdCluster.GetEtcdClusterStatus()
etcdStatus, err := oldEtcdCluster.GetStatus()
if err != nil {
return true, fmt.Errorf("etcd cluster is not healthy: %v", err)
}
@ -250,10 +250,10 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
}
// Perform etcd upgrade using common to all control plane components function
if err := upgradeComponent(constants.Etcd, waiter, pathMgr, cfg, beforeEtcdPodHash, recoverManifests); err != nil {
if err := upgradeComponent(constants.Etcd, waiter, pathMgr, cfg, beforeEtcdPodHash, recoverManifests, isTLSUpgrade); err != nil {
// Since etcd upgrade component failed, the old manifest has been restored
// now we need to check the health of etcd cluster if it came back up with old manifest
if _, err := etcdCluster.GetEtcdClusterStatus(); err != nil {
if _, err := oldEtcdCluster.GetStatus(); err != nil {
// At this point we know that etcd cluster is dead and it is safe to copy backup datastore and to rollback old etcd manifest
if err := rollbackEtcdData(cfg, fmt.Errorf("etcd cluster is not healthy after upgrade: %v rolling back", err), pathMgr); err != nil {
// Even copying back datastore failed, no options for recovery left, bailing out
@ -265,7 +265,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
// Since rollback of the old etcd manifest was successful, checking again the status of etcd cluster
if _, err := etcdCluster.GetEtcdClusterStatus(); err != nil {
if _, err := oldEtcdCluster.GetStatus(); err != nil {
// Nothing else left to try to recover etcd cluster
return true, fmt.Errorf("fatal error upgrading local etcd cluster: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
}
@ -302,13 +302,14 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
}
// StaticPodControlPlane upgrades a static pod-hosted control plane
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool) error {
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdCluster, newEtcdCluster etcdutil.Cluster) error {
recoverManifests := map[string]string{}
var isTLSUpgrade bool
// etcd upgrade is done prior to other control plane components
if etcdUpgrade {
// Perform etcd upgrade using common to all control plane components function
fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests)
fatal, err := performEtcdStaticPodUpgrade(waiter, pathMgr, cfg, recoverManifests, isTLSUpgrade, oldEtcdCluster, newEtcdCluster)
if err != nil {
if fatal {
return err
@ -330,7 +331,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
}
for _, component := range constants.MasterComponents {
if err = upgradeComponent(component, waiter, pathMgr, cfg, beforePodHashMap[component], recoverManifests); err != nil {
if err = upgradeComponent(component, waiter, pathMgr, cfg, beforePodHashMap[component], recoverManifests, isTLSUpgrade); err != nil {
return err
}
}

View File

@ -21,10 +21,12 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"k8s.io/apimachinery/pkg/runtime"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
@ -33,6 +35,7 @@ import (
controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
etcdphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/etcd"
"k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
@ -56,7 +59,7 @@ controllerManagerExtraArgs: null
etcd:
caFile: ""
certFile: ""
dataDir: /var/lib/etcd
dataDir: %s
endpoints: null
extraArgs: null
image: ""
@ -128,6 +131,7 @@ func (w *fakeWaiter) WaitForHealthyKubelet(_ time.Duration, _ string) error {
}
type fakeStaticPodPathManager struct {
kubernetesDir string
realManifestDir string
tempManifestDir string
backupManifestDir string
@ -136,29 +140,36 @@ type fakeStaticPodPathManager struct {
}
func NewFakeStaticPodPathManager(moveFileFunc func(string, string) error) (StaticPodPathManager, error) {
realManifestsDir, err := ioutil.TempDir("", "kubeadm-upgraded-manifests")
kubernetesDir, err := ioutil.TempDir("", "kubeadm-pathmanager-")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
}
upgradedManifestsDir, err := ioutil.TempDir("", "kubeadm-upgraded-manifests")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
realManifestDir := filepath.Join(kubernetesDir, constants.ManifestsSubDirName)
if err := os.Mkdir(realManifestDir, 0700); err != nil {
return nil, fmt.Errorf("couldn't create a realManifestDir for the upgrade: %v", err)
}
backupManifestsDir, err := ioutil.TempDir("", "kubeadm-backup-manifests")
if err != nil {
return nil, fmt.Errorf("couldn't create a temporary directory for the upgrade: %v", err)
upgradedManifestDir := filepath.Join(kubernetesDir, "upgraded-manifests")
if err := os.Mkdir(upgradedManifestDir, 0700); err != nil {
return nil, fmt.Errorf("couldn't create a upgradedManifestDir for the upgrade: %v", err)
}
backupEtcdDir, err := ioutil.TempDir("", "kubeadm-backup-etcd")
if err != nil {
backupManifestDir := filepath.Join(kubernetesDir, "backup-manifests")
if err := os.Mkdir(backupManifestDir, 0700); err != nil {
return nil, fmt.Errorf("couldn't create a backupManifestDir for the upgrade: %v", err)
}
backupEtcdDir := filepath.Join(kubernetesDir, "kubeadm-backup-etcd")
if err := os.Mkdir(backupEtcdDir, 0700); err != nil {
return nil, err
}
return &fakeStaticPodPathManager{
realManifestDir: realManifestsDir,
tempManifestDir: upgradedManifestsDir,
backupManifestDir: backupManifestsDir,
kubernetesDir: kubernetesDir,
realManifestDir: realManifestDir,
tempManifestDir: upgradedManifestDir,
backupManifestDir: backupManifestDir,
backupEtcdDir: backupEtcdDir,
MoveFileFunc: moveFileFunc,
}, nil
@ -168,6 +179,10 @@ func (spm *fakeStaticPodPathManager) MoveFile(oldPath, newPath string) error {
return spm.MoveFileFunc(oldPath, newPath)
}
func (spm *fakeStaticPodPathManager) KubernetesDir() string {
return spm.kubernetesDir
}
func (spm *fakeStaticPodPathManager) RealManifestPath(component string) string {
return constants.GetStaticPodFilepath(component, spm.realManifestDir)
}
@ -193,14 +208,43 @@ func (spm *fakeStaticPodPathManager) BackupEtcdDir() string {
return spm.backupEtcdDir
}
type fakeTLSEtcdCluster struct{ TLS bool }
func (cluster fakeTLSEtcdCluster) HasTLS() (bool, error) {
return cluster.TLS, nil
}
func (cluster fakeTLSEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
}
type fakePodManifestEtcdCluster struct{ ManifestDir, CertificatesDir string }
func (cluster fakePodManifestEtcdCluster) HasTLS() (bool, error) {
return etcdutil.PodManifestsHaveTLS(cluster.ManifestDir)
}
func (cluster fakePodManifestEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
// Make sure the certificates generated from the upgrade are readable from disk
etcdutil.NewTLSConfig(cluster.CertificatesDir)
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
}
func TestStaticPodControlPlane(t *testing.T) {
tests := []struct {
description string
waitErrsToReturn map[string]error
moveFileFunc func(string, string) error
expectedErr bool
manifestShouldChange bool
}{
{ // error-free case should succeed
{
description: "error-free case should succeed",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -212,7 +256,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: false,
manifestShouldChange: true,
},
{ // any wait error should result in a rollback and an abort
{
description: "any wait error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: fmt.Errorf("boo! failed"),
waitForHashChange: nil,
@ -224,7 +269,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any wait error should result in a rollback and an abort
{
description: "any wait error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: fmt.Errorf("boo! failed"),
@ -236,7 +282,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any wait error should result in a rollback and an abort
{
description: "any wait error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -248,7 +295,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort
{
description: "any path-moving error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -264,7 +312,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort
{
description: "any path-moving error should result in a rollback and an abort",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -280,7 +329,8 @@ func TestStaticPodControlPlane(t *testing.T) {
expectedErr: true,
manifestShouldChange: false,
},
{ // any path-moving error should result in a rollback and an abort; even though this is the last component (kube-apiserver and kube-controller-manager healthy)
{
description: "any path-moving error should result in a rollback and an abort; even though this is the last component (kube-apiserver and kube-controller-manager healthy)",
waitErrsToReturn: map[string]error{
waitForHashes: nil,
waitForHashChange: nil,
@ -304,15 +354,19 @@ func TestStaticPodControlPlane(t *testing.T) {
if err != nil {
t.Fatalf("couldn't run NewFakeStaticPodPathManager: %v", err)
}
defer os.RemoveAll(pathMgr.RealManifestDir())
defer os.RemoveAll(pathMgr.TempManifestDir())
defer os.RemoveAll(pathMgr.BackupManifestDir())
defer os.RemoveAll(pathMgr.(*fakeStaticPodPathManager).KubernetesDir())
constants.KubernetesDir = pathMgr.(*fakeStaticPodPathManager).KubernetesDir()
tempCertsDir, err := ioutil.TempDir("", "kubeadm-certs")
if err != nil {
t.Fatalf("couldn't create temporary certificates directory: %v", err)
}
defer os.RemoveAll(tempCertsDir)
tmpEtcdDataDir, err := ioutil.TempDir("", "kubeadm-etcd-data")
if err != nil {
t.Fatalf("couldn't create temporary etcd data directory: %v", err)
}
defer os.RemoveAll(tmpEtcdDataDir)
oldcfg, err := getConfig("v1.7.0", tempCertsDir)
if err != nil {
@ -361,10 +415,23 @@ func TestStaticPodControlPlane(t *testing.T) {
t.Fatalf("couldn't create config: %v", err)
}
actualErr := StaticPodControlPlane(waiter, pathMgr, newcfg, false)
actualErr := StaticPodControlPlane(
waiter,
pathMgr,
newcfg,
true,
fakeTLSEtcdCluster{
TLS: false,
},
fakePodManifestEtcdCluster{
ManifestDir: pathMgr.RealManifestDir(),
CertificatesDir: newcfg.CertificatesDir,
},
)
if (actualErr != nil) != rt.expectedErr {
t.Errorf(
"failed UpgradeStaticPodControlPlane\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",
"failed UpgradeStaticPodControlPlane\n%s\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",
rt.description,
rt.expectedErr,
(actualErr != nil),
actualErr,
@ -378,12 +445,13 @@ func TestStaticPodControlPlane(t *testing.T) {
if (oldHash != newHash) != rt.manifestShouldChange {
t.Errorf(
"failed StaticPodControlPlane\n\texpected manifest change: %t\n\tgot: %t",
"failed StaticPodControlPlane\n%s\n\texpected manifest change: %t\n\tgot: %t",
rt.description,
rt.manifestShouldChange,
(oldHash != newHash),
)
}
return
}
}
@ -398,10 +466,10 @@ func getAPIServerHash(dir string) (string, error) {
return fmt.Sprintf("%x", sha256.Sum256(fileBytes)), nil
}
func getConfig(version string, certsDir string) (*kubeadmapi.MasterConfiguration, error) {
func getConfig(version, certsDir, etcdDataDir string) (*kubeadmapi.MasterConfiguration, error) {
externalcfg := &kubeadmapiext.MasterConfiguration{}
internalcfg := &kubeadmapi.MasterConfiguration{}
if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, version)), externalcfg); err != nil {
if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, etcdDataDir, version)), externalcfg); err != nil {
return nil, fmt.Errorf("unable to decode config: %v", err)
}
legacyscheme.Scheme.Convert(externalcfg, internalcfg, nil)

View File

@ -13,7 +13,6 @@ go_library(
"copy.go",
"endpoint.go",
"error.go",
"etcd.go",
"marshal.go",
"template.go",
"version.go",
@ -22,7 +21,6 @@ go_library(
deps = [
"//cmd/kubeadm/app/apis/kubeadm:go_default_library",
"//cmd/kubeadm/app/preflight:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
@ -64,6 +62,7 @@ filegroup(
"//cmd/kubeadm/app/util/audit:all-srcs",
"//cmd/kubeadm/app/util/config:all-srcs",
"//cmd/kubeadm/app/util/dryrun:all-srcs",
"//cmd/kubeadm/app/util/etcd:all-srcs",
"//cmd/kubeadm/app/util/kubeconfig:all-srcs",
"//cmd/kubeadm/app/util/pubkeypin:all-srcs",
"//cmd/kubeadm/app/util/staticpod:all-srcs",

View File

@ -1,51 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"github.com/coreos/etcd/clientv3"
"time"
)
// EtcdCluster is an interface to get etcd cluster related information
type EtcdCluster interface {
GetEtcdClusterStatus() (*clientv3.StatusResponse, error)
}
// LocalEtcdCluster represents an instance of a local etcd cluster
type LocalEtcdCluster struct{}
// GetEtcdClusterStatus returns nil for status Up or error for status Down
func (cluster LocalEtcdCluster) GetEtcdClusterStatus() (*clientv3.StatusResponse, error) {
ep := []string{"localhost:2379"}
cli, err := clientv3.New(clientv3.Config{
Endpoints: ep,
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
defer cli.Close()
resp, err := cli.Status(context.Background(), ep[0])
if err != nil {
return nil, err
}
return resp, nil
}

View File

@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["etcd.go"],
importpath = "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd",
visibility = ["//visibility:public"],
deps = [
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/app/util/staticpod:go_default_library",
"//vendor/github.com/coreos/etcd/clientv3:go_default_library",
"//vendor/github.com/coreos/etcd/pkg/transport:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["etcd_test.go"],
embed = [":go_default_library"],
deps = [
"//cmd/kubeadm/app/constants:go_default_library",
"//cmd/kubeadm/test:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,140 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"context"
"crypto/tls"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
"k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
)
// Cluster is an interface to get etcd cluster related information
type Cluster interface {
HasTLS() (bool, error)
GetStatus() (*clientv3.StatusResponse, error)
}
// StaticPodCluster represents an instance of a static pod etcd cluster.
// CertificatesDir should contain the etcd CA and healthcheck client TLS identity.
// ManifestDir should contain the etcd static pod manifest.
type StaticPodCluster struct {
Endpoints []string
CertificatesDir string
ManifestDir string
}
// HasTLS returns a boolean representing whether the static pod etcd cluster implements TLS.
// It may return an error for file I/O issues.
func (cluster StaticPodCluster) HasTLS() (bool, error) {
return PodManifestsHaveTLS(cluster.ManifestDir)
}
// PodManifestsHaveTLS reads the etcd staticpod manifest from disk and returns false if the TLS flags
// are missing from the command list. If all the flags are present it returns true.
func PodManifestsHaveTLS(ManifestDir string) (bool, error) {
etcdPodPath := constants.GetStaticPodFilepath(constants.Etcd, ManifestDir)
etcdPod, err := staticpod.ReadStaticPodFromDisk(etcdPodPath)
if err != nil {
return false, fmt.Errorf("failed to check if etcd pod implements TLS: %v", err)
}
tlsFlags := []string{
"--cert-file=",
"--key-file=",
"--trusted-ca-file=",
"--client-cert-auth=",
"--peer-cert-file=",
"--peer-key-file=",
"--peer-trusted-ca-file=",
"--peer-client-cert-auth=",
}
FlagLoop:
for _, flag := range tlsFlags {
for _, container := range etcdPod.Spec.Containers {
for _, arg := range container.Command {
if strings.Contains(arg, flag) {
continue FlagLoop
}
}
}
// flag not found in any container
return false, nil
}
// all flags were found in container args; pod fully implements TLS
return true, nil
}
// GetStatus invokes the proper protocol check based off of whether the cluster HasTLS() to get the cluster's status
func (cluster StaticPodCluster) GetStatus() (*clientv3.StatusResponse, error) {
hasTLS, err := cluster.HasTLS()
if err != nil {
return nil, fmt.Errorf("failed to determine if current etcd static pod is using TLS: %v", err)
}
var tlsConfig *tls.Config
if hasTLS {
tlsConfig, err = NewTLSConfig(cluster.CertificatesDir)
if err != nil {
return nil, fmt.Errorf("failed to create a TLS Config using the cluster.CertificatesDir: %v", err)
}
}
return GetClusterStatus(cluster.Endpoints, tlsConfig)
}
// NewTLSConfig generates a tlsConfig using credentials from the default sub-paths of the certificates directory
func NewTLSConfig(certificatesDir string) (*tls.Config, error) {
tlsInfo := transport.TLSInfo{
CertFile: filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
KeyFile: filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
TrustedCAFile: filepath.Join(certificatesDir, constants.EtcdCACertName),
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
return tlsConfig, nil
}
// GetClusterStatus returns nil for status Up or error for status Down
func GetClusterStatus(endpoints []string, tlsConfig *tls.Config) (*clientv3.StatusResponse, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
TLS: tlsConfig,
})
if err != nil {
return nil, err
}
defer cli.Close()
resp, err := cli.Status(context.Background(), endpoints[0])
if err != nil {
return nil, err
}
return resp, nil
}

View File

@ -0,0 +1,199 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
testutil "k8s.io/kubernetes/cmd/kubeadm/test"
)
const (
secureEtcdPod = `# generated by kubeadm v1.10.0
apiVersion: v1
kind: Pod
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
creationTimestamp: null
labels:
component: etcd
tier: control-plane
name: etcd
namespace: kube-system
spec:
containers:
- command:
- etcd
- --advertise-client-urls=https://127.0.0.1:2379
- --data-dir=/var/lib/etcd
- --peer-key-file=/etc/kubernetes/pki/etcd/peer.key
- --peer-trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt
- --listen-client-urls=https://127.0.0.1:2379
- --peer-client-cert-auth=true
- --cert-file=/etc/kubernetes/pki/etcd/server.crt
- --key-file=/etc/kubernetes/pki/etcd/server.key
- --trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt
- --peer-cert-file=/etc/kubernetes/pki/etcd/peer.crt
- --client-cert-auth=true
image: k8s.gcr.io/etcd-amd64:3.1.12
livenessProbe:
exec:
command:
- /bin/sh
- -ec
- ETCDCTL_API=3 etcdctl --endpoints=127.0.0.1:2379 --cacert=/etc/kubernetes/pki/etcd/ca.crt
--cert=/etc/kubernetes/pki/etcd/healthcheck-client.crt --key=/etc/kubernetes/pki/etcd/healthcheck-client.key
get foo
failureThreshold: 8
initialDelaySeconds: 15
timeoutSeconds: 15
name: etcd
resources: {}
volumeMounts:
- mountPath: /var/lib/etcd
name: etcd-data
- mountPath: /etc/kubernetes/pki/etcd
name: etcd-certs
hostNetwork: true
volumes:
- hostPath:
path: /var/lib/etcd
type: DirectoryOrCreate
name: etcd-data
- hostPath:
path: /etc/kubernetes/pki/etcd
type: DirectoryOrCreate
name: etcd-certs
status: {}
`
insecureEtcdPod = `# generated by kubeadm v1.9.6
apiVersion: v1
kind: Pod
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
creationTimestamp: null
labels:
component: etcd
tier: control-plane
name: etcd
namespace: kube-system
spec:
containers:
- command:
- etcd
- --listen-client-urls=http://127.0.0.1:2379
- --advertise-client-urls=http://127.0.0.1:2379
- --data-dir=/var/lib/etcd
image: gcr.io/google_containers/etcd-amd64:3.1.11
livenessProbe:
failureThreshold: 8
httpGet:
host: 127.0.0.1
path: /health
port: 2379
scheme: HTTP
initialDelaySeconds: 15
timeoutSeconds: 15
name: etcd
resources: {}
volumeMounts:
- mountPath: /var/lib/etcd
name: etcd
hostNetwork: true
volumes:
- hostPath:
path: /var/lib/etcd
type: DirectoryOrCreate
name: etcd
status: {}
`
invalidPod = `---{ broken yaml @@@`
)
func TestPodManifestHasTLS(t *testing.T) {
tests := []struct {
description string
podYaml string
hasTLS bool
expectErr bool
writeManifest bool
}{
{
description: "secure etcd returns true",
podYaml: secureEtcdPod,
hasTLS: true,
writeManifest: true,
expectErr: false,
},
{
description: "insecure etcd returns false",
podYaml: insecureEtcdPod,
hasTLS: false,
writeManifest: true,
expectErr: false,
},
{
description: "invalid pod fails to unmarshal",
podYaml: invalidPod,
hasTLS: false,
writeManifest: true,
expectErr: true,
},
{
description: "non-existent file returns error",
podYaml: ``,
hasTLS: false,
writeManifest: false,
expectErr: true,
},
}
for _, rt := range tests {
tmpdir := testutil.SetupTempDir(t)
defer os.RemoveAll(tmpdir)
manifestPath := filepath.Join(tmpdir, "etcd.yaml")
if rt.writeManifest {
err := ioutil.WriteFile(manifestPath, []byte(rt.podYaml), 0644)
if err != nil {
t.Fatalf("Failed to write pod manifest\n%s\n\tfatal error: %v", rt.description, err)
}
}
tmpEtcdCluster := StaticPodCluster{ManifestDir: tmpdir}
hasTLS, actualErr := tmpEtcdCluster.HasTLS()
if (actualErr != nil) != rt.expectErr {
t.Errorf(
"PodManifestHasTLS failed\n%s\n\texpected error: %t\n\tgot: %t\n\tactual error: %v",
rt.description,
rt.expectErr,
(actualErr != nil),
actualErr,
)
}
if hasTLS != rt.hasTLS {
t.Errorf("PodManifestHasTLS failed\n%s\n\texpected hasTLS: %t\n\tgot: %t", rt.description, rt.hasTLS, hasTLS)
}
}
}