diff --git a/cmd/kubeadm/app/cmd/upgrade/plan.go b/cmd/kubeadm/app/cmd/upgrade/plan.go index b2e20ec927b..fa139966283 100644 --- a/cmd/kubeadm/app/cmd/upgrade/plan.go +++ b/cmd/kubeadm/app/cmd/upgrade/plan.go @@ -89,14 +89,31 @@ func RunPlan(flags *planFlags) error { return err } - // Define Local Etcd cluster to be able to retrieve information - etcdClient, err := etcdutil.NewStaticPodClient( - []string{"localhost:2379"}, - constants.GetStaticPodDirectory(), - upgradeVars.cfg.CertificatesDir, - ) - if err != nil { - return err + var etcdClient etcdutil.ClusterInterrogator + + // Currently this is the only method we have for distinguishing + // external etcd vs static pod etcd + isExternalEtcd := len(upgradeVars.cfg.Etcd.Endpoints) > 0 + if isExternalEtcd { + client, err := etcdutil.New( + upgradeVars.cfg.Etcd.Endpoints, + upgradeVars.cfg.Etcd.CAFile, + upgradeVars.cfg.Etcd.CertFile, + upgradeVars.cfg.Etcd.KeyFile) + if err != nil { + return err + } + etcdClient = client + } else { + client, err := etcdutil.NewFromStaticPod( + []string{"localhost:2379"}, + constants.GetStaticPodDirectory(), + upgradeVars.cfg.CertificatesDir, + ) + if err != nil { + return err + } + etcdClient = client } // Compute which upgrade possibilities there are diff --git a/cmd/kubeadm/app/phases/upgrade/compute.go b/cmd/kubeadm/app/phases/upgrade/compute.go index e36bb0acc17..8e4be781e5f 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute.go +++ b/cmd/kubeadm/app/phases/upgrade/compute.go @@ -74,7 +74,7 @@ type ClusterState struct { // GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which // kinds of upgrades can be performed -func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.Client, featureGates map[string]bool) ([]Upgrade, error) { +func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.ClusterInterrogator, featureGates map[string]bool) ([]Upgrade, error) { fmt.Println("[upgrade] Fetching available versions to upgrade to") // Collect the upgrades kubeadm can do in this list diff --git a/cmd/kubeadm/app/phases/upgrade/compute_test.go b/cmd/kubeadm/app/phases/upgrade/compute_test.go index 071284a56cd..f99a7417af3 100644 --- a/cmd/kubeadm/app/phases/upgrade/compute_test.go +++ b/cmd/kubeadm/app/phases/upgrade/compute_test.go @@ -17,11 +17,13 @@ limitations under the License. package upgrade import ( + "fmt" "reflect" "testing" "time" "github.com/coreos/etcd/clientv3" + etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd" versionutil "k8s.io/kubernetes/pkg/util/version" ) @@ -62,28 +64,81 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) { }, nil } -type fakeEtcdCluster struct{ TLS bool } +type fakeEtcdClient struct{ TLS bool } -func (f fakeEtcdCluster) HasTLS() bool { return f.TLS } +func (f fakeEtcdClient) HasTLS() bool { return f.TLS } -func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) { - client := &clientv3.StatusResponse{} - client.Version = "3.1.12" - return client, nil +func (f fakeEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + // clusterStatus, err := f.GetClusterStatus() + // return clusterStatus[0], err + return &clientv3.StatusResponse{ + Version: "3.1.12", + }, nil } -func (f fakeEtcdCluster) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { +func (f fakeEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + var responses []*clientv3.StatusResponse + responses = append(responses, &clientv3.StatusResponse{ + Version: "3.1.12", + }) + return responses, nil +} + +func (f fakeEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + return f.GetStatus() +} + +type mismatchEtcdClient struct{} + +func (f mismatchEtcdClient) HasTLS() bool { return true } + +func (f mismatchEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + clusterStatus, err := f.GetClusterStatus() + return clusterStatus[0], err +} + +func (f mismatchEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + return []*clientv3.StatusResponse{ + &clientv3.StatusResponse{ + Version: "3.1.12", + }, + &clientv3.StatusResponse{ + Version: "3.2.0", + }, + }, nil +} + +func (f mismatchEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { + return f.GetStatus() +} + +type degradedEtcdClient struct{} + +func (f degradedEtcdClient) HasTLS() bool { return true } + +func (f degradedEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + return nil, fmt.Errorf("Degraded etcd cluster") +} + +func (f degradedEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + var res []*clientv3.StatusResponse + return res, fmt.Errorf("Degraded etcd cluster") +} + +func (f degradedEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { return f.GetStatus() } func TestGetAvailableUpgrades(t *testing.T) { featureGates := make(map[string]bool) + etcdClient := fakeEtcdClient{} tests := []struct { name string vg VersionGetter expectedUpgrades []Upgrade allowExperimental, allowRCs bool errExpected bool + etcdClient etcdutil.ClusterInterrogator }{ { name: "no action needed, already up-to-date", @@ -98,6 +153,7 @@ func TestGetAvailableUpgrades(t *testing.T) { expectedUpgrades: []Upgrade{}, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "simple patch version upgrade", @@ -131,6 +187,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "no version provided to offline version getter does not change behavior", @@ -164,6 +221,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "minor version upgrade only", @@ -197,6 +255,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "both minor version upgrade and patch version upgrade available", @@ -248,6 +307,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: false, errExpected: false, + etcdClient: etcdClient, }, { name: "allow experimental upgrades, but no upgrade available", @@ -263,6 +323,7 @@ func TestGetAvailableUpgrades(t *testing.T) { expectedUpgrades: []Upgrade{}, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade to an unstable version should be supported", @@ -297,6 +358,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade from an unstable version to an unstable version should be supported", @@ -331,6 +393,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "v1.X.0-alpha.0 should be ignored", @@ -366,6 +429,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade to an RC version should be supported", @@ -401,6 +465,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowRCs: true, errExpected: false, + etcdClient: etcdClient, }, { name: "it is possible (but very uncommon) that the latest version from the previous branch is an rc and the current latest version is alpha.0. In that case, show the RC", @@ -436,6 +501,7 @@ func TestGetAvailableUpgrades(t *testing.T) { }, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, }, { name: "upgrade to an RC version should be supported. There may also be an even newer unstable version.", @@ -490,6 +556,37 @@ func TestGetAvailableUpgrades(t *testing.T) { allowRCs: true, allowExperimental: true, errExpected: false, + etcdClient: etcdClient, + }, + { + name: "Upgrades with external etcd with mismatched versions should not be allowed.", + vg: &fakeVersionGetter{ + clusterVersion: "v1.9.3", + kubeletVersion: "v1.9.3", + kubeadmVersion: "v1.9.3", + stablePatchVersion: "v1.9.3", + stableVersion: "v1.9.3", + }, + allowRCs: false, + allowExperimental: false, + etcdClient: mismatchEtcdClient{}, + expectedUpgrades: []Upgrade{}, + errExpected: true, + }, + { + name: "Upgrades with external etcd with a degraded status should not be allowed.", + vg: &fakeVersionGetter{ + clusterVersion: "v1.9.3", + kubeletVersion: "v1.9.3", + kubeadmVersion: "v1.9.3", + stablePatchVersion: "v1.9.3", + stableVersion: "v1.9.3", + }, + allowRCs: false, + allowExperimental: false, + etcdClient: degradedEtcdClient{}, + expectedUpgrades: []Upgrade{}, + errExpected: true, }, { name: "offline version getter", @@ -498,6 +595,7 @@ func TestGetAvailableUpgrades(t *testing.T) { kubeletVersion: "v1.10.0", kubeadmVersion: "v1.10.1", }, "v1.11.1"), + etcdClient: etcdClient, expectedUpgrades: []Upgrade{ { Description: "version in the v1.1 series", @@ -523,10 +621,9 @@ func TestGetAvailableUpgrades(t *testing.T) { // Instantiating a fake etcd cluster for being able to get etcd version for a corresponding // kubernetes release. - testCluster := fakeEtcdCluster{} for _, rt := range tests { t.Run(rt.name, func(t *testing.T) { - actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, testCluster, featureGates) + actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, rt.etcdClient, featureGates) if !reflect.DeepEqual(actualUpgrades, rt.expectedUpgrades) { t.Errorf("failed TestGetAvailableUpgrades\n\texpected upgrades: %v\n\tgot: %v", rt.expectedUpgrades, actualUpgrades) } diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods.go b/cmd/kubeadm/app/phases/upgrade/staticpods.go index 255d1b4461e..fd16a2d544c 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods.go @@ -224,7 +224,7 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP } // performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error. -func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) (bool, error) { +func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) (bool, error) { // Add etcd static pod spec only if external etcd is not configured if len(cfg.Etcd.Endpoints) != 0 { return false, fmt.Errorf("external etcd detected, won't try to change any etcd state") @@ -321,7 +321,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM // Initialize the new etcd client if it wasn't pre-initialized if newEtcdClient == nil { - client, err := etcdutil.NewStaticPodClient( + client, err := etcdutil.NewFromStaticPod( []string{"localhost:2379"}, constants.GetStaticPodDirectory(), cfg.CertificatesDir, @@ -367,7 +367,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM } // StaticPodControlPlane upgrades a static pod-hosted control plane -func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) error { +func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) error { recoverManifests := map[string]string{} var isTLSUpgrade bool var isExternalEtcd bool @@ -381,7 +381,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager if len(cfg.Etcd.Endpoints) > 0 { // External etcd isExternalEtcd = true - client, err := etcdutil.NewClient( + client, err := etcdutil.New( cfg.Etcd.Endpoints, cfg.Etcd.CAFile, cfg.Etcd.CertFile, @@ -397,7 +397,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager } } else { // etcd Static Pod - client, err := etcdutil.NewStaticPodClient( + client, err := etcdutil.NewFromStaticPod( []string{"localhost:2379"}, constants.GetStaticPodDirectory(), cfg.CertificatesDir, diff --git a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go index be6de8b6285..a5c26270e79 100644 --- a/cmd/kubeadm/app/phases/upgrade/staticpods_test.go +++ b/cmd/kubeadm/app/phases/upgrade/staticpods_test.go @@ -30,7 +30,7 @@ import ( "github.com/coreos/etcd/pkg/transport" "k8s.io/apimachinery/pkg/runtime" kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm" - kubeadmapiv1alpha1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" + kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1" "k8s.io/kubernetes/cmd/kubeadm/app/constants" certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs" controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane" @@ -216,9 +216,15 @@ func (c fakeTLSEtcdClient) HasTLS() bool { } func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { - client := &clientv3.StatusResponse{} - client.Version = "3.1.12" - return client, nil + clusterStatus, err := c.GetClusterStatus() + return clusterStatus[0], err +} + +func (c fakeTLSEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + client := &clientv3.StatusResponse{ + Version: "3.1.12", + } + return []*clientv3.StatusResponse{client}, nil } func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { @@ -233,6 +239,11 @@ func (c fakePodManifestEtcdClient) HasTLS() bool { } func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) { + clusterStatus, err := c.GetClusterStatus() + return clusterStatus[0], err +} + +func (c fakePodManifestEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) { // Make sure the certificates generated from the upgrade are readable from disk tlsInfo := transport.TLSInfo{ CertFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName), @@ -246,7 +257,7 @@ func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) client := &clientv3.StatusResponse{} client.Version = "3.1.12" - return client, nil + return []*clientv3.StatusResponse{client}, nil } func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { @@ -485,7 +496,7 @@ func getAPIServerHash(dir string) (string, error) { } func getConfig(version, certsDir, etcdDataDir string) (*kubeadmapi.MasterConfiguration, error) { - externalcfg := &kubeadmapiv1alpha1.MasterConfiguration{} + externalcfg := &kubeadmapiext.MasterConfiguration{} internalcfg := &kubeadmapi.MasterConfiguration{} if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, etcdDataDir, version)), externalcfg); err != nil { return nil, fmt.Errorf("unable to decode config: %v", err) diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index 9eec25c4595..78068ebf424 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -30,22 +30,23 @@ import ( "k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod" ) -// Client is an interface to get etcd cluster related information -type Client interface { +// ClusterInterrogator is an interface to get etcd cluster related information +type ClusterInterrogator interface { GetStatus() (*clientv3.StatusResponse, error) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) HasTLS() bool + GetClusterStatus() ([]*clientv3.StatusResponse, error) } -// GenericClient is a common etcd client for supported etcd servers -type GenericClient struct { +// Client provides connection parameters for an etcd cluster +type Client struct { Endpoints []string - TLSConfig *tls.Config + TLS *tls.Config } // HasTLS returns true if etcd is configured for TLS -func (c GenericClient) HasTLS() bool { - return c.TLSConfig != nil +func (c Client) HasTLS() bool { + return c.TLS != nil } // PodManifestsHaveTLS reads the etcd staticpod manifest from disk and returns false if the TLS flags @@ -84,12 +85,12 @@ FlagLoop: } // GetStatus gets server status -func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) { +func (c Client) GetStatus() (*clientv3.StatusResponse, error) { const dialTimeout = 5 * time.Second cli, err := clientv3.New(clientv3.Config{ Endpoints: c.Endpoints, DialTimeout: dialTimeout, - TLS: c.TLSConfig, + TLS: c.TLS, }) if err != nil { return nil, err @@ -107,7 +108,7 @@ func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) { } // WaitForStatus returns a StatusResponse after an initial delay and retry attempts -func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { +func (c Client) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) { fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay) time.Sleep(delay) for i := 0; i < retries; i++ { @@ -131,39 +132,63 @@ func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInte return nil, fmt.Errorf("timeout waiting for etcd cluster status") } -// NewClient creates a new EtcdCluster client -func NewClient(endpoints []string, caFile string, certFile string, keyFile string) (*GenericClient, error) { - client := GenericClient{Endpoints: endpoints} +// New creates a new EtcdCluster client +func New(endpoints []string, ca, cert, key string) (*Client, error) { + client := Client{Endpoints: endpoints} - if caFile != "" || certFile != "" || keyFile != "" { + if ca != "" || cert != "" || key != "" { tlsInfo := transport.TLSInfo{ - CertFile: certFile, - KeyFile: keyFile, - TrustedCAFile: caFile, + CertFile: cert, + KeyFile: key, + TrustedCAFile: ca, } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { return nil, err } - client.TLSConfig = tlsConfig + client.TLS = tlsConfig } return &client, nil } -// NewStaticPodClient creates a GenericClient from the given endpoints, manifestDir, and certificatesDir -func NewStaticPodClient(endpoints []string, manifestDir string, certificatesDir string) (*GenericClient, error) { +// NewFromStaticPod creates a GenericClient from the given endpoints, manifestDir, and certificatesDir +func NewFromStaticPod(endpoints []string, manifestDir string, certificatesDir string) (*Client, error) { hasTLS, err := PodManifestsHaveTLS(manifestDir) if err != nil { return nil, fmt.Errorf("could not read manifests from: %s, error: %v", manifestDir, err) } if hasTLS { - return NewClient( + return New( endpoints, filepath.Join(certificatesDir, constants.EtcdCACertName), filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName), filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName), ) } - return NewClient(endpoints, "", "", "") + return New(endpoints, "", "", "") +} + +// GetClusterStatus returns nil for status Up or error for status Down +func (c Client) GetClusterStatus() ([]*clientv3.StatusResponse, error) { + + var resp []*clientv3.StatusResponse + for _, ep := range c.Endpoints { + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + DialTimeout: 5 * time.Second, + TLS: c.TLS, + }) + if err != nil { + return nil, err + } + defer cli.Close() + + r, err := cli.Status(context.Background(), ep) + if err != nil { + return nil, err + } + resp = append(resp, r) + } + return resp, nil }