From ac1e940c7543294ba548500060bcefda92684da5 Mon Sep 17 00:00:00 2001 From: Craig Tracey Date: Wed, 4 Apr 2018 15:35:15 -0400 Subject: [PATCH 1/2] Support kubeadm upgrade with remote etcd cluster Currently kubeadm only performs an upgrade if the etcd cluster is colocated with the control plane node. As this is only one possible configuration, kubeadm should support upgrades with etcd clusters that are not local to the node. Signed-off-by: Craig Tracey --- cmd/kubeadm/app/cmd/upgrade/plan.go | 33 +++-- cmd/kubeadm/app/phases/upgrade/compute.go | 2 +- .../app/phases/upgrade/compute_test.go | 115 ++++++++++++++++-- cmd/kubeadm/app/phases/upgrade/staticpods.go | 10 +- .../app/phases/upgrade/staticpods_test.go | 23 +++- cmd/kubeadm/app/util/etcd/etcd.go | 69 +++++++---- 6 files changed, 201 insertions(+), 51 deletions(-) diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index b2e20ec927b..fa139966283 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -89,14 +89,31 @@ func RunPlan(flags *planFlags) error { return err } - // Define Local Etcd cluster to be able to retrieve information - etcdClient, err := etcdutil.NewStaticPodClient( - []string{"localhost:2379"}, - constants.GetStaticPodDirectory(), - upgradeVars.cfg.CertificatesDir, - ) - if err != nil { - return err + var etcdClient etcdutil.ClusterInterrogator + + // Currently this is the only method we have for distinguishing + // external etcd vs static pod etcd + isExternalEtcd := len(upgradeVars.cfg.Etcd.Endpoints) > 0 + if isExternalEtcd { + client, err := etcdutil.New( + upgradeVars.cfg.Etcd.Endpoints, + upgradeVars.cfg.Etcd.CAFile, + upgradeVars.cfg.Etcd.CertFile, + upgradeVars.cfg.Etcd.KeyFile) + if err != nil { + return err + } + etcdClient = client + } else { + client, err := etcdutil.NewFromStaticPod( + []string{"localhost:2379"}, + constants.GetStaticPodDirectory(), + upgradeVars.cfg.CertificatesDir, + ) + if err != nil { + return err + } + etcdClient = client } // Compute which upgrade possibilities there are diff --git a/cmd/kubeadm/app/phases/upgrade/compute.go b/cmd/kubeadm/app/phases/upgrade/compute.go index e36bb0acc17..8e4be781e5f 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute.go +++ b/cmd/kubeadm/app/phases/upgrade/compute.go @@ -74,7 +74,7 @@ type ClusterState struct { // GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which // kinds of upgrades can be performed -func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.Client, featureGates map[string]bool) ([]Upgrade, error) { +func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.ClusterInterrogator, featureGates map[string]bool) ([]Upgrade, error) { fmt.Println("[upgrade] Fetching available versions to upgrade to") // Collect the upgrades kubeadm can do in this list diff --git a/cmd/kubeadm/app/phases/upgrade/compute_test.go b/cmd/kubeadm/app/phases/upgrade/compute_test.go index 071284a56cd..f99a7417af3 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute_test.go +++ b/cmd/kubeadm/app/phases/upgrade/compute_test.go @@ -17,11 +17,13 @@ limitations under the License. package upgrade import ( + "fmt" "reflect" "testing" "time" "github.com/coreos/etcd/clientv3" + etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd" versionutil "k8s.io/kubernetes/pkg/util/version" ) @@ -62,28 +64,81 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) { }, nil } -type fakeEtcdCluster struct{ TLS bool } +type fakeEtcdClient struct{ TLS bool } -func (f fakeEtcdCluster) HasTLS() bool { return f.TLS } +func (f fakeEtcdClient) HasTLS() bool { return f.TLS } -func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) { - client := &clientv3.StatusResponse{} - client.Version = "3.1.12" - return client, nil +func (f fakeEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + // clusterStatus, err := f.GetClusterStatus() + // return clusterStatus[0], err + return &clientv3.StatusResponse{ + Version: "3.1.12", + }, nil } -func (f fakeEtcdCluster) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { +func (f fakeEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + var responses []*clientv3.StatusResponse + responses = append(responses, &clientv3.StatusResponse{ + Version: "3.1.12", + }) + return responses, nil +} + +func (f fakeEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + return f.GetStatus() +} + +type mismatchEtcdClient struct{} + +func (f mismatchEtcdClient) HasTLS() bool { return true } + +func (f mismatchEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + clusterStatus, err := f.GetClusterStatus() + return clusterStatus[0], err +} + +func (f mismatchEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + return []*clientv3.StatusResponse{ + &clientv3.StatusResponse{ + Version: "3.1.12", + }, + &clientv3.StatusResponse{ + Version: "3.2.0", + }, + }, nil +} + +func (f mismatchEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + return f.GetStatus() +} + +type degradedEtcdClient struct{} + +func (f degradedEtcdClient) HasTLS() bool { return true } + +func (f degradedEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + return nil, fmt.Errorf("Degraded etcd cluster") +} + +func (f degradedEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + var res []*clientv3.StatusResponse + return res, fmt.Errorf("Degraded etcd cluster") +} + +func (f degradedEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { return f.GetStatus() } func TestGetAvailableUpgrades(t *testing.T) { featureGates := make(map[string]bool) + etcdClient := fakeEtcdClient{} tests := []struct { name string vg VersionGetter expectedUpgrades []Upgrade allowExperimental, allowRCs bool errExpected bool + etcdClient etcdutil.ClusterInterrogator }{ { name: "no action needed, already up-to-date", @@ -98,6 +153,7 @@ func TestGetAvailableUpgrades(t *testing.T) { expectedUpgrades: []Upgrade{}, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "simple patch version upgrade", @@ -131,6 +187,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "no version provided to offline version getter does not change behavior", @@ -164,6 +221,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "minor version upgrade only", @@ -197,6 +255,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "both minor version upgrade and patch version upgrade available", @@ -248,6 +307,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "allow experimental upgrades, but no upgrade available", @@ -263,6 +323,7 @@ func TestGetAvailableUpgrades(t *testing.T) { expectedUpgrades: []Upgrade{}, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade to an unstable version should be supported", @@ -297,6 +358,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade from an unstable version to an unstable version should be supported", @@ -331,6 +393,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "v1.X.0-alpha.0 should be ignored", @@ -366,6 +429,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade to an RC version should be supported", @@ -401,6 +465,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowRCs: true, errExpected: false, + etcdClient: etcdClient, }, { name: "it is possible (but very uncommon) that the latest version from the previous branch is an rc and the current latest version is alpha.0. In that case, show the RC", @@ -436,6 +501,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade to an RC version should be supported. There may also be an even newer unstable version.", @@ -490,6 +556,37 @@ func TestGetAvailableUpgrades(t *testing.T) { allowRCs: true, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, + }, + { + name: "Upgrades with external etcd with mismatched versions should not be allowed.", + vg: &fakeVersionGetter{ + clusterVersion: "v1.9.3", + kubeletVersion: "v1.9.3", + kubeadmVersion: "v1.9.3", + stablePatchVersion: "v1.9.3", + stableVersion: "v1.9.3", + }, + allowRCs: false, + allowExperimental: false, + etcdClient: mismatchEtcdClient{}, + expectedUpgrades: []Upgrade{}, + errExpected: true, + }, + { + name: "Upgrades with external etcd with a degraded status should not be allowed.", + vg: &fakeVersionGetter{ + clusterVersion: "v1.9.3", + kubeletVersion: "v1.9.3", + kubeadmVersion: "v1.9.3", + stablePatchVersion: "v1.9.3", + stableVersion: "v1.9.3", + }, + allowRCs: false, + allowExperimental: false, + etcdClient: degradedEtcdClient{}, + expectedUpgrades: []Upgrade{}, + errExpected: true, }, { name: "offline version getter", @@ -498,6 +595,7 @@ func TestGetAvailableUpgrades(t *testing.T) { kubeletVersion: "v1.10.0", kubeadmVersion: "v1.10.1", }, "v1.11.1"), + etcdClient: etcdClient, expectedUpgrades: []Upgrade{ { Description: "version in the v1.1 series", @@ -523,10 +621,9 @@ func TestGetAvailableUpgrades(t *testing.T) { // Instantiating a fake etcd cluster for being able to get etcd version for a corresponding // kubernetes release. - testCluster := fakeEtcdCluster{} for _, rt := range tests { t.Run(rt.name, func(t *testing.T) { - actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, testCluster, featureGates) + actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, rt.etcdClient, featureGates) if !reflect.DeepEqual(actualUpgrades, rt.expectedUpgrades) { t.Errorf("failed TestGetAvailableUpgrades\n\texpected upgrades: %v\n\tgot: %v", rt.expectedUpgrades, actualUpgrades) } diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods.go b/cmd/kubeadm/app/phases/upgrade/staticpods.go index 255d1b4461e..fd16a2d544c 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods.go @@ -224,7 +224,7 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP } // performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error. -func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) (bool, error) { +func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) (bool, error) { // Add etcd static pod spec only if external etcd is not configured if len(cfg.Etcd.Endpoints) != 0 { return false, fmt.Errorf("external etcd detected, won't try to change any etcd state") @@ -321,7 +321,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM // Initialize the new etcd client if it wasn't pre-initialized if newEtcdClient == nil { - client, err := etcdutil.NewStaticPodClient( + client, err := etcdutil.NewFromStaticPod( []string{"localhost:2379"}, constants.GetStaticPodDirectory(), cfg.CertificatesDir, @@ -367,7 +367,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM } // StaticPodControlPlane upgrades a static pod-hosted control plane -func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) error { +func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) error { recoverManifests := map[string]string{} var isTLSUpgrade bool var isExternalEtcd bool @@ -381,7 +381,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager if len(cfg.Etcd.Endpoints) > 0 { // External etcd isExternalEtcd = true - client, err := etcdutil.NewClient( + client, err := etcdutil.New( cfg.Etcd.Endpoints, cfg.Etcd.CAFile, cfg.Etcd.CertFile, @@ -397,7 +397,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager } } else { // etcd Static Pod - client, err := etcdutil.NewStaticPodClient( + client, err := etcdutil.NewFromStaticPod( []string{"localhost:2379"}, constants.GetStaticPodDirectory(), cfg.CertificatesDir, diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index be6de8b6285..a5c26270e79 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -30,7 +30,7 @@ import ( "github.com/coreos/etcd/pkg/transport" "k8s.io/apimachinery/pkg/runtime" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" - kubeadmapiv1alpha1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" + kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" "k8s.io/kubernetes/cmd/kubeadm/app/constants" certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs" controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" @@ -216,9 +216,15 @@ func (c fakeTLSEtcdClient) HasTLS() bool { } func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { - client := &clientv3.StatusResponse{} - client.Version = "3.1.12" - return client, nil + clusterStatus, err := c.GetClusterStatus() + return clusterStatus[0], err +} + +func (c fakeTLSEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + client := &clientv3.StatusResponse{ + Version: "3.1.12", + } + return []*clientv3.StatusResponse{client}, nil } func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { @@ -233,6 +239,11 @@ func (c fakePodManifestEtcdClient) HasTLS() bool { } func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + clusterStatus, err := c.GetClusterStatus() + return clusterStatus[0], err +} + +func (c fakePodManifestEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { // Make sure the certificates generated from the upgrade are readable from disk tlsInfo := transport.TLSInfo{ CertFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName), @@ -246,7 +257,7 @@ func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) client := &clientv3.StatusResponse{} client.Version = "3.1.12" - return client, nil + return []*clientv3.StatusResponse{client}, nil } func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { @@ -485,7 +496,7 @@ func getAPIServerHash(dir string) (string, error) { } func getConfig(version, certsDir, etcdDataDir string) (*kubeadmapi.MasterConfiguration, error) { - externalcfg := &kubeadmapiv1alpha1.MasterConfiguration{} + externalcfg := &kubeadmapiext.MasterConfiguration{} internalcfg := &kubeadmapi.MasterConfiguration{} if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, etcdDataDir, version)), externalcfg); err != nil { return nil, fmt.Errorf("unable to decode config: %v", err) diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index 9eec25c4595..78068ebf424 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -30,22 +30,23 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod" ) -// Client is an interface to get etcd cluster related information -type Client interface { +// ClusterInterrogator is an interface to get etcd cluster related information +type ClusterInterrogator interface { GetStatus() (*clientv3.StatusResponse, error) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) HasTLS() bool + GetClusterStatus() ([]*clientv3.StatusResponse, error) } -// GenericClient is a common etcd client for supported etcd servers -type GenericClient struct { +// Client provides connection parameters for an etcd cluster +type Client struct { Endpoints []string - TLSConfig *tls.Config + TLS *tls.Config } // HasTLS returns true if etcd is configured for TLS -func (c GenericClient) HasTLS() bool { - return c.TLSConfig != nil +func (c Client) HasTLS() bool { + return c.TLS != nil } // PodManifestsHaveTLS reads the etcd staticpod manifest from disk and returns false if the TLS flags @@ -84,12 +85,12 @@ FlagLoop: } // GetStatus gets server status -func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) { +func (c Client) GetStatus() (*clientv3.StatusResponse, error) { const dialTimeout = 5 * time.Second cli, err := clientv3.New(clientv3.Config{ Endpoints: c.Endpoints, DialTimeout: dialTimeout, - TLS: c.TLSConfig, + TLS: c.TLS, }) if err != nil { return nil, err @@ -107,7 +108,7 @@ func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) { } // WaitForStatus returns a StatusResponse after an initial delay and retry attempts -func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { +func (c Client) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay) time.Sleep(delay) for i := 0; i < retries; i++ { @@ -131,39 +132,63 @@ func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInte return nil, fmt.Errorf("timeout waiting for etcd cluster status") } -// NewClient creates a new EtcdCluster client -func NewClient(endpoints []string, caFile string, certFile string, keyFile string) (*GenericClient, error) { - client := GenericClient{Endpoints: endpoints} +// New creates a new EtcdCluster client +func New(endpoints []string, ca, cert, key string) (*Client, error) { + client := Client{Endpoints: endpoints} - if caFile != "" || certFile != "" || keyFile != "" { + if ca != "" || cert != "" || key != "" { tlsInfo := transport.TLSInfo{ - CertFile: certFile, - KeyFile: keyFile, - TrustedCAFile: caFile, + CertFile: cert, + KeyFile: key, + TrustedCAFile: ca, } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { return nil, err } - client.TLSConfig = tlsConfig + client.TLS = tlsConfig } return &client, nil } -// NewStaticPodClient creates a GenericClient from the given endpoints, manifestDir, and certificatesDir -func NewStaticPodClient(endpoints []string, manifestDir string, certificatesDir string) (*GenericClient, error) { +// NewFromStaticPod creates a GenericClient from the given endpoints, manifestDir, and certificatesDir +func NewFromStaticPod(endpoints []string, manifestDir string, certificatesDir string) (*Client, error) { hasTLS, err := PodManifestsHaveTLS(manifestDir) if err != nil { return nil, fmt.Errorf("could not read manifests from: %s, error: %v", manifestDir, err) } if hasTLS { - return NewClient( + return New( endpoints, filepath.Join(certificatesDir, constants.EtcdCACertName), filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName), filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName), ) } - return NewClient(endpoints, "", "", "") + return New(endpoints, "", "", "") +} + +// GetClusterStatus returns nil for status Up or error for status Down +func (c Client) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + + var resp []*clientv3.StatusResponse + for _, ep := range c.Endpoints { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + DialTimeout: 5 * time.Second, + TLS: c.TLS, + }) + if err != nil { + return nil, err + } + defer cli.Close() + + r, err := cli.Status(context.Background(), ep) + if err != nil { + return nil, err + } + resp = append(resp, r) + } + return resp, nil } From f40b7f389e9af6b008be08de7ae3dbe9877b3b89 Mon Sep 17 00:00:00 2001 From: Jason DeTiberus Date: Fri, 11 May 2018 16:20:03 -0400 Subject: [PATCH 2/2] kubeadm - fix external etcd upgrades - Update upgrade plan output when configured for external etcd - Move etcd to a separate section and show available upgrades --- cmd/kubeadm/app/cmd/upgrade/plan.go | 18 +- cmd/kubeadm/app/cmd/upgrade/plan_test.go | 88 +++++++-- cmd/kubeadm/app/phases/upgrade/compute.go | 21 ++- .../app/phases/upgrade/compute_test.go | 118 +++++------- cmd/kubeadm/app/phases/upgrade/staticpods.go | 18 +- .../app/phases/upgrade/staticpods_test.go | 56 +++--- cmd/kubeadm/app/util/etcd/etcd.go | 170 ++++++++++-------- 7 files changed, 291 insertions(+), 198 deletions(-) diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index fa139966283..e1a161400a9 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -124,13 +124,13 @@ func RunPlan(flags *planFlags) error { } // Tell the user which upgrades are available - printAvailableUpgrades(availUpgrades, os.Stdout, upgradeVars.cfg.FeatureGates) + printAvailableUpgrades(availUpgrades, os.Stdout, upgradeVars.cfg.FeatureGates, isExternalEtcd) return nil } // printAvailableUpgrades prints a UX-friendly overview of what versions are available to upgrade to // TODO look into columnize or some other formatter when time permits instead of using the tabwriter -func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGates map[string]bool) { +func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGates map[string]bool, isExternalEtcd bool) { // Return quickly if no upgrades can be made if len(upgrades) == 0 { @@ -143,6 +143,16 @@ func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGate // Loop through the upgrade possibilities and output text to the command line for _, upgrade := range upgrades { + if isExternalEtcd && upgrade.CanUpgradeEtcd() { + fmt.Fprintln(w, "External components that should be upgraded manually before you upgrade the control plane with 'kubeadm upgrade apply':") + fmt.Fprintln(tabw, "COMPONENT\tCURRENT\tAVAILABLE") + fmt.Fprintf(tabw, "Etcd\t%s\t%s\n", upgrade.Before.EtcdVersion, upgrade.After.EtcdVersion) + + // We should flush the writer here at this stage; as the columns will now be of the right size, adjusted to the above content + tabw.Flush() + fmt.Fprintln(w, "") + } + if upgrade.CanUpgradeKubelets() { fmt.Fprintln(w, "Components that must be upgraded manually after you have upgraded the control plane with 'kubeadm upgrade apply':") fmt.Fprintln(tabw, "COMPONENT\tCURRENT\tAVAILABLE") @@ -177,7 +187,9 @@ func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGate } else { fmt.Fprintf(tabw, "Kube DNS\t%s\t%s\n", upgrade.Before.DNSVersion, upgrade.After.DNSVersion) } - fmt.Fprintf(tabw, "Etcd\t%s\t%s\n", upgrade.Before.EtcdVersion, upgrade.After.EtcdVersion) + if !isExternalEtcd { + fmt.Fprintf(tabw, "Etcd\t%s\t%s\n", upgrade.Before.EtcdVersion, upgrade.After.EtcdVersion) + } // The tabwriter should be flushed at this stage as we have now put in all the required content for this time. This is required for the tabs' size to be correct. tabw.Flush() diff --git a/cmd/kubeadm/app/cmd/upgrade/plan_test.go b/cmd/kubeadm/app/cmd/upgrade/plan_test.go index 370066310fb..f83c183f15a 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan_test.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan_test.go @@ -62,16 +62,27 @@ func TestSortedSliceFromStringIntMap(t *testing.T) { func TestPrintAvailableUpgrades(t *testing.T) { featureGates := make(map[string]bool) var tests = []struct { + name string upgrades []upgrade.Upgrade buf *bytes.Buffer expectedBytes []byte + externalEtcd bool }{ { + name: "Up to date", upgrades: []upgrade.Upgrade{}, expectedBytes: []byte(`Awesome, you're up-to-date! Enjoy! `), }, { + name: "Up to date external etcd", + externalEtcd: true, + upgrades: []upgrade.Upgrade{}, + expectedBytes: []byte(`Awesome, you're up-to-date! Enjoy! +`), + }, + { + name: "Patch version available", upgrades: []upgrade.Upgrade{ { Description: "version in the v1.8 series", @@ -117,6 +128,7 @@ _____________________________________________________________________ `), }, { + name: "minor version available", upgrades: []upgrade.Upgrade{ { Description: "stable version", @@ -160,6 +172,7 @@ _____________________________________________________________________ `), }, { + name: "patch and minor version available", upgrades: []upgrade.Upgrade{ { Description: "version in the v1.8 series", @@ -243,6 +256,7 @@ _____________________________________________________________________ `), }, { + name: "experimental version available", upgrades: []upgrade.Upgrade{ { Description: "experimental version", @@ -288,6 +302,7 @@ _____________________________________________________________________ `), }, { + name: "release candidate available", upgrades: []upgrade.Upgrade{ { Description: "release candidate version", @@ -333,6 +348,7 @@ _____________________________________________________________________ `), }, { + name: "multiple kubelet versions", upgrades: []upgrade.Upgrade{ { Description: "version in the v1.9 series", @@ -377,19 +393,71 @@ Note: Before you can perform this upgrade, you have to update kubeadm to v1.9.3. _____________________________________________________________________ +`), + }, + { + name: "external etcd upgrade available", + upgrades: []upgrade.Upgrade{ + { + Description: "version in the v1.9 series", + Before: upgrade.ClusterState{ + KubeVersion: "v1.9.2", + KubeletVersions: map[string]uint16{ + "v1.9.2": 1, + }, + KubeadmVersion: "v1.9.2", + DNSVersion: "1.14.5", + EtcdVersion: "3.0.17", + }, + After: upgrade.ClusterState{ + KubeVersion: "v1.9.3", + KubeadmVersion: "v1.9.3", + DNSVersion: "1.14.8", + EtcdVersion: "3.1.12", + }, + }, + }, + externalEtcd: true, + expectedBytes: []byte(`External components that should be upgraded manually before you upgrade the control plane with 'kubeadm upgrade apply': +COMPONENT CURRENT AVAILABLE +Etcd 3.0.17 3.1.12 + +Components that must be upgraded manually after you have upgraded the control plane with 'kubeadm upgrade apply': +COMPONENT CURRENT AVAILABLE +Kubelet 1 x v1.9.2 v1.9.3 + +Upgrade to the latest version in the v1.9 series: + +COMPONENT CURRENT AVAILABLE +API Server v1.9.2 v1.9.3 +Controller Manager v1.9.2 v1.9.3 +Scheduler v1.9.2 v1.9.3 +Kube Proxy v1.9.2 v1.9.3 +Kube DNS 1.14.5 1.14.8 + +You can now apply the upgrade by executing the following command: + + kubeadm upgrade apply v1.9.3 + +Note: Before you can perform this upgrade, you have to update kubeadm to v1.9.3. + +_____________________________________________________________________ + `), }, } for _, rt := range tests { - rt.buf = bytes.NewBufferString("") - printAvailableUpgrades(rt.upgrades, rt.buf, featureGates) - actualBytes := rt.buf.Bytes() - if !bytes.Equal(actualBytes, rt.expectedBytes) { - t.Errorf( - "failed PrintAvailableUpgrades:\n\texpected: %q\n\t actual: %q", - string(rt.expectedBytes), - string(actualBytes), - ) - } + t.Run(rt.name, func(t *testing.T) { + rt.buf = bytes.NewBufferString("") + printAvailableUpgrades(rt.upgrades, rt.buf, featureGates, rt.externalEtcd) + actualBytes := rt.buf.Bytes() + if !bytes.Equal(actualBytes, rt.expectedBytes) { + t.Errorf( + "failed PrintAvailableUpgrades:\n\texpected: %q\n\t actual: %q", + string(rt.expectedBytes), + string(actualBytes), + ) + } + }) } } diff --git a/cmd/kubeadm/app/phases/upgrade/compute.go b/cmd/kubeadm/app/phases/upgrade/compute.go index 8e4be781e5f..dca2c9939f8 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute.go +++ b/cmd/kubeadm/app/phases/upgrade/compute.go @@ -50,6 +50,11 @@ func (u *Upgrade) CanUpgradeKubelets() bool { return !sameVersionFound } +// CanUpgradeEtcd returns whether an upgrade of etcd is possible +func (u *Upgrade) CanUpgradeEtcd() bool { + return u.Before.EtcdVersion != u.After.EtcdVersion +} + // ActiveDNSAddon returns the version of CoreDNS or kube-dns func ActiveDNSAddon(featureGates map[string]bool) string { if features.Enabled(featureGates, features.CoreDNS) { @@ -83,13 +88,13 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA // Get the cluster version clusterVersionStr, clusterVersion, err := versionGetterImpl.ClusterVersion() if err != nil { - return nil, err + return upgrades, err } // Get current kubeadm CLI version kubeadmVersionStr, kubeadmVersion, err := versionGetterImpl.KubeadmVersion() if err != nil { - return nil, err + return upgrades, err } // Get and output the current latest stable version @@ -103,13 +108,13 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA // Get the kubelet versions in the cluster kubeletVersions, err := versionGetterImpl.KubeletVersions() if err != nil { - return nil, err + return upgrades, err } // Get current etcd version - etcdStatus, err := etcdClient.GetStatus() + etcdVersion, err := etcdClient.GetVersion() if err != nil { - return nil, err + return upgrades, err } // Construct a descriptor for the current state of the world @@ -118,7 +123,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA DNSVersion: dns.GetDNSVersion(clusterVersion, ActiveDNSAddon(featureGates)), KubeadmVersion: kubeadmVersionStr, KubeletVersions: kubeletVersions, - EtcdVersion: etcdStatus.Version, + EtcdVersion: etcdVersion, } // Do a "dumb guess" that a new minor upgrade is available just because the latest stable version is higher than the cluster version @@ -201,7 +206,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA // Get and output the current latest unstable version latestVersionStr, latestVersion, err := versionGetterImpl.VersionFromCILabel("latest", "experimental version") if err != nil { - return nil, err + return upgrades, err } minorUnstable := latestVersion.Components()[1] @@ -209,7 +214,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA previousBranch := fmt.Sprintf("latest-1.%d", minorUnstable-1) previousBranchLatestVersionStr, previousBranchLatestVersion, err := versionGetterImpl.VersionFromCILabel(previousBranch, "") if err != nil { - return nil, err + return upgrades, err } // If that previous latest version is an RC, RCs are allowed and the cluster version is lower than the RC version, show the upgrade diff --git a/cmd/kubeadm/app/phases/upgrade/compute_test.go b/cmd/kubeadm/app/phases/upgrade/compute_test.go index f99a7417af3..c5341a75f2f 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute_test.go +++ b/cmd/kubeadm/app/phases/upgrade/compute_test.go @@ -64,71 +64,44 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) { }, nil } -type fakeEtcdClient struct{ TLS bool } +type fakeEtcdClient struct { + TLS bool + mismatchedVersions bool +} func (f fakeEtcdClient) HasTLS() bool { return f.TLS } -func (f fakeEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { - // clusterStatus, err := f.GetClusterStatus() - // return clusterStatus[0], err - return &clientv3.StatusResponse{ - Version: "3.1.12", +func (f fakeEtcdClient) ClusterAvailable() (bool, error) { return true, nil } + +func (f fakeEtcdClient) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) { + return true, nil +} + +func (f fakeEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) { + return make(map[string]*clientv3.StatusResponse), nil +} + +func (f fakeEtcdClient) GetVersion() (string, error) { + versions, _ := f.GetClusterVersions() + if f.mismatchedVersions { + return "", fmt.Errorf("etcd cluster contains endpoints with mismatched versions: %v", versions) + } + return "3.1.12", nil +} + +func (f fakeEtcdClient) GetClusterVersions() (map[string]string, error) { + if f.mismatchedVersions { + return map[string]string{ + "foo": "3.1.12", + "bar": "3.2.0", + }, nil + } + return map[string]string{ + "foo": "3.1.12", + "bar": "3.1.12", }, nil } -func (f fakeEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { - var responses []*clientv3.StatusResponse - responses = append(responses, &clientv3.StatusResponse{ - Version: "3.1.12", - }) - return responses, nil -} - -func (f fakeEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { - return f.GetStatus() -} - -type mismatchEtcdClient struct{} - -func (f mismatchEtcdClient) HasTLS() bool { return true } - -func (f mismatchEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { - clusterStatus, err := f.GetClusterStatus() - return clusterStatus[0], err -} - -func (f mismatchEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { - return []*clientv3.StatusResponse{ - &clientv3.StatusResponse{ - Version: "3.1.12", - }, - &clientv3.StatusResponse{ - Version: "3.2.0", - }, - }, nil -} - -func (f mismatchEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { - return f.GetStatus() -} - -type degradedEtcdClient struct{} - -func (f degradedEtcdClient) HasTLS() bool { return true } - -func (f degradedEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { - return nil, fmt.Errorf("Degraded etcd cluster") -} - -func (f degradedEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { - var res []*clientv3.StatusResponse - return res, fmt.Errorf("Degraded etcd cluster") -} - -func (f degradedEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { - return f.GetStatus() -} - func TestGetAvailableUpgrades(t *testing.T) { featureGates := make(map[string]bool) etcdClient := fakeEtcdClient{} @@ -569,22 +542,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowRCs: false, allowExperimental: false, - etcdClient: mismatchEtcdClient{}, - expectedUpgrades: []Upgrade{}, - errExpected: true, - }, - { - name: "Upgrades with external etcd with a degraded status should not be allowed.", - vg: &fakeVersionGetter{ - clusterVersion: "v1.9.3", - kubeletVersion: "v1.9.3", - kubeadmVersion: "v1.9.3", - stablePatchVersion: "v1.9.3", - stableVersion: "v1.9.3", - }, - allowRCs: false, - allowExperimental: false, - etcdClient: degradedEtcdClient{}, + etcdClient: fakeEtcdClient{mismatchedVersions: true}, expectedUpgrades: []Upgrade{}, errExpected: true, }, @@ -624,12 +582,16 @@ func TestGetAvailableUpgrades(t *testing.T) { for _, rt := range tests { t.Run(rt.name, func(t *testing.T) { actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, rt.etcdClient, featureGates) + fmt.Printf("actualErr: %v\n", actualErr) + fmt.Printf("actualErr != nil: %v\n", actualErr != nil) + fmt.Printf("errExpected: %v\n", rt.errExpected) + if (actualErr != nil) != rt.errExpected { + fmt.Printf("Hello error") + t.Errorf("failed TestGetAvailableUpgrades\n\texpected error: %t\n\tgot error: %t", rt.errExpected, (actualErr != nil)) + } if !reflect.DeepEqual(actualUpgrades, rt.expectedUpgrades) { t.Errorf("failed TestGetAvailableUpgrades\n\texpected upgrades: %v\n\tgot: %v", rt.expectedUpgrades, actualUpgrades) } - if (actualErr != nil) != rt.errExpected { - t.Errorf("failed TestGetAvailableUpgrades\n\texpected error: %t\n\tgot error: %t", rt.errExpected, (actualErr != nil)) - } }) } } diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods.go b/cmd/kubeadm/app/phases/upgrade/staticpods.go index fd16a2d544c..488a850636f 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods.go @@ -231,7 +231,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM } // Checking health state of etcd before proceeding with the upgrade - etcdStatus, err := oldEtcdClient.GetStatus() + _, err := oldEtcdClient.GetClusterStatus() if err != nil { return true, fmt.Errorf("etcd cluster is not healthy: %v", err) } @@ -248,9 +248,13 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM if err != nil { return true, fmt.Errorf("failed to retrieve an etcd version for the target kubernetes version: %v", err) } - currentEtcdVersion, err := version.ParseSemantic(etcdStatus.Version) + currentEtcdVersionStr, err := oldEtcdClient.GetVersion() if err != nil { - return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", etcdStatus.Version, err) + return true, fmt.Errorf("failed to retrieve the current etcd version: %v", err) + } + currentEtcdVersion, err := version.ParseSemantic(currentEtcdVersionStr) + if err != nil { + return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", currentEtcdVersionStr, err) } // Comparing current etcd version with desired to catch the same version or downgrade condition and fail on them. @@ -292,7 +296,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM // Since upgrade component failed, the old etcd manifest has either been restored or was never touched // Now we need to check the health of etcd cluster if it is up with old manifest fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available") - if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil { + if _, err := oldEtcdClient.WaitForClusterAvailable(noDelay, retries, retryInterval); err != nil { fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err) // At this point we know that etcd cluster is dead and it is safe to copy backup datastore and to rollback old etcd manifest @@ -305,7 +309,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM // Now that we've rolled back the data, let's check if the cluster comes up fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available") - if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil { + if _, err := oldEtcdClient.WaitForClusterAvailable(noDelay, retries, retryInterval); err != nil { fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err) // Nothing else left to try to recover etcd cluster return true, fmt.Errorf("fatal error rolling back local etcd cluster manifest: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) @@ -334,7 +338,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM // Checking health state of etcd after the upgrade fmt.Println("[upgrade/etcd] Waiting for etcd to become available") - if _, err = newEtcdClient.WaitForStatus(podRestartDelay, retries, retryInterval); err != nil { + if _, err = newEtcdClient.WaitForClusterAvailable(podRestartDelay, retries, retryInterval); err != nil { fmt.Printf("[upgrade/etcd] Failed to healthcheck etcd: %v\n", err) // Despite the fact that upgradeComponent was successful, there is something wrong with the etcd cluster // First step is to restore back up of datastore @@ -352,7 +356,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM // Assuming rollback of the old etcd manifest was successful, check the status of etcd cluster again fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available") - if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil { + if _, err := oldEtcdClient.WaitForClusterAvailable(noDelay, retries, retryInterval); err != nil { fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err) // Nothing else left to try to recover etcd cluster return true, fmt.Errorf("fatal error rolling back local etcd cluster manifest: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir) diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index a5c26270e79..7561c323ab2 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -30,7 +30,7 @@ import ( "github.com/coreos/etcd/pkg/transport" "k8s.io/apimachinery/pkg/runtime" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" - kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" + kubeadmapiv1alpha1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" "k8s.io/kubernetes/cmd/kubeadm/app/constants" certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs" controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" @@ -215,20 +215,27 @@ func (c fakeTLSEtcdClient) HasTLS() bool { return c.TLS } -func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { - clusterStatus, err := c.GetClusterStatus() - return clusterStatus[0], err +func (c fakeTLSEtcdClient) ClusterAvailable() (bool, error) { return true, nil } + +func (c fakeTLSEtcdClient) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) { + return true, nil } -func (c fakeTLSEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { - client := &clientv3.StatusResponse{ - Version: "3.1.12", - } - return []*clientv3.StatusResponse{client}, nil +func (c fakeTLSEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) { + return map[string]*clientv3.StatusResponse{ + "foo": { + Version: "3.1.12", + }}, nil } -func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { - return c.GetStatus() +func (c fakeTLSEtcdClient) GetClusterVersions() (map[string]string, error) { + return map[string]string{ + "foo": "3.1.12", + }, nil +} + +func (c fakeTLSEtcdClient) GetVersion() (string, error) { + return "3.1.12", nil } type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string } @@ -238,12 +245,13 @@ func (c fakePodManifestEtcdClient) HasTLS() bool { return hasTLS } -func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { - clusterStatus, err := c.GetClusterStatus() - return clusterStatus[0], err +func (c fakePodManifestEtcdClient) ClusterAvailable() (bool, error) { return true, nil } + +func (c fakePodManifestEtcdClient) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) { + return true, nil } -func (c fakePodManifestEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { +func (c fakePodManifestEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) { // Make sure the certificates generated from the upgrade are readable from disk tlsInfo := transport.TLSInfo{ CertFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName), @@ -255,13 +263,19 @@ func (c fakePodManifestEtcdClient) GetClusterStatus() ([]*clientv3.StatusRespons return nil, err } - client := &clientv3.StatusResponse{} - client.Version = "3.1.12" - return []*clientv3.StatusResponse{client}, nil + return map[string]*clientv3.StatusResponse{ + "foo": {Version: "3.1.12"}, + }, nil } -func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { - return c.GetStatus() +func (c fakePodManifestEtcdClient) GetClusterVersions() (map[string]string, error) { + return map[string]string{ + "foo": "3.1.12", + }, nil +} + +func (c fakePodManifestEtcdClient) GetVersion() (string, error) { + return "3.1.12", nil } func TestStaticPodControlPlane(t *testing.T) { @@ -496,7 +510,7 @@ func getAPIServerHash(dir string) (string, error) { } func getConfig(version, certsDir, etcdDataDir string) (*kubeadmapi.MasterConfiguration, error) { - externalcfg := &kubeadmapiext.MasterConfiguration{} + externalcfg := &kubeadmapiv1alpha1.MasterConfiguration{} internalcfg := &kubeadmapi.MasterConfiguration{} if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, etcdDataDir, version)), externalcfg); err != nil { return nil, fmt.Errorf("unable to decode config: %v", err) diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index 78068ebf424..3bd3e8034a7 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -32,10 +32,12 @@ import ( // ClusterInterrogator is an interface to get etcd cluster related information type ClusterInterrogator interface { - GetStatus() (*clientv3.StatusResponse, error) - WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) + ClusterAvailable() (bool, error) + GetClusterStatus() (map[string]*clientv3.StatusResponse, error) + GetClusterVersions() (map[string]string, error) + GetVersion() (string, error) HasTLS() bool - GetClusterStatus() ([]*clientv3.StatusResponse, error) + WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) } // Client provides connection parameters for an etcd cluster @@ -84,54 +86,6 @@ FlagLoop: return true, nil } -// GetStatus gets server status -func (c Client) GetStatus() (*clientv3.StatusResponse, error) { - const dialTimeout = 5 * time.Second - cli, err := clientv3.New(clientv3.Config{ - Endpoints: c.Endpoints, - DialTimeout: dialTimeout, - TLS: c.TLS, - }) - if err != nil { - return nil, err - } - defer cli.Close() - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - resp, err := cli.Status(ctx, c.Endpoints[0]) - cancel() - if err != nil { - return nil, err - } - - return resp, nil -} - -// WaitForStatus returns a StatusResponse after an initial delay and retry attempts -func (c Client) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { - fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay) - time.Sleep(delay) - for i := 0; i < retries; i++ { - if i > 0 { - fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval) - time.Sleep(retryInterval) - } - fmt.Printf("[util/etcd] Attempting to get etcd status %d/%d\n", i+1, retries) - resp, err := c.GetStatus() - if err != nil { - switch err { - case context.DeadlineExceeded: - fmt.Println("[util/etcd] Attempt timed out") - default: - fmt.Printf("[util/etcd] Attempt failed with error: %v\n", err) - } - continue - } - return resp, nil - } - return nil, fmt.Errorf("timeout waiting for etcd cluster status") -} - // New creates a new EtcdCluster client func New(endpoints []string, ca, cert, key string) (*Client, error) { client := Client{Endpoints: endpoints} @@ -169,26 +123,100 @@ func NewFromStaticPod(endpoints []string, manifestDir string, certificatesDir st return New(endpoints, "", "", "") } -// GetClusterStatus returns nil for status Up or error for status Down -func (c Client) GetClusterStatus() ([]*clientv3.StatusResponse, error) { +// GetVersion returns the etcd version of the cluster. +// An error is returned if the version of all endpoints do not match +func (c Client) GetVersion() (string, error) { + var clusterVersion string - var resp []*clientv3.StatusResponse - for _, ep := range c.Endpoints { - cli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{ep}, - DialTimeout: 5 * time.Second, - TLS: c.TLS, - }) - if err != nil { - return nil, err - } - defer cli.Close() - - r, err := cli.Status(context.Background(), ep) - if err != nil { - return nil, err - } - resp = append(resp, r) + versions, err := c.GetClusterVersions() + if err != nil { + return "", err } - return resp, nil + for _, v := range versions { + if clusterVersion == "" { + // This is the first version we've seen + clusterVersion = v + } else if v != clusterVersion { + return "", fmt.Errorf("etcd cluster contains endpoints with mismatched versions: %v", versions) + } else { + clusterVersion = v + } + } + if clusterVersion == "" { + return "", fmt.Errorf("could not determine cluster etcd version") + } + return clusterVersion, nil +} + +// GetClusterVersions returns a map of the endpoints and their associated versions +func (c Client) GetClusterVersions() (map[string]string, error) { + versions := make(map[string]string) + statuses, err := c.GetClusterStatus() + if err != nil { + return versions, err + } + + for ep, status := range statuses { + versions[ep] = status.Version + } + return versions, nil +} + +// ClusterAvailable returns true if the cluster status indicates the cluster is available. +func (c Client) ClusterAvailable() (bool, error) { + _, err := c.GetClusterStatus() + if err != nil { + return false, err + } + return true, nil +} + +// GetClusterStatus returns nil for status Up or error for status Down +func (c Client) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: c.Endpoints, + DialTimeout: 5 * time.Second, + TLS: c.TLS, + }) + if err != nil { + return nil, err + } + defer cli.Close() + + clusterStatus := make(map[string]*clientv3.StatusResponse) + for _, ep := range c.Endpoints { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + resp, err := cli.Status(ctx, ep) + cancel() + if err != nil { + return nil, err + } + clusterStatus[ep] = resp + } + return clusterStatus, nil +} + +// WaitForClusterAvailable returns true if all endpoints in the cluster are available after an initial delay and retry attempts, an error is returned otherwise +func (c Client) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) { + fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay) + time.Sleep(delay) + for i := 0; i < retries; i++ { + if i > 0 { + fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval) + time.Sleep(retryInterval) + } + fmt.Printf("[util/etcd] Attempting to see if all cluster endpoints are available %d/%d\n", i+1, retries) + resp, err := c.ClusterAvailable() + if err != nil { + switch err { + case context.DeadlineExceeded: + fmt.Println("[util/etcd] Attempt timed out") + default: + fmt.Printf("[util/etcd] Attempt failed with error: %v\n", err) + } + continue + } + return resp, nil + } + return false, fmt.Errorf("timeout waiting for etcd cluster to be available") }