Support kubeadm upgrade with remote etcd cluster

Currently kubeadm only performs an upgrade if the etcd cluster is
colocated with the control plane node. As this is only one possible
configuration, kubeadm should support upgrades with etcd clusters
that are not local to the node.

Signed-off-by: Craig Tracey <craigtracey@gmail.com>
This commit is contained in:
Craig Tracey 2018-04-04 15:35:15 -04:00 committed by Jason DeTiberus
parent ab180d808e
commit ac1e940c75
No known key found for this signature in database
GPG Key ID: CBD7D7A4B41437BC
6 changed files with 201 additions and 51 deletions

View File

@ -89,14 +89,31 @@ func RunPlan(flags *planFlags) error {
return err
}
// Define Local Etcd cluster to be able to retrieve information
etcdClient, err := etcdutil.NewStaticPodClient(
[]string{"localhost:2379"},
constants.GetStaticPodDirectory(),
upgradeVars.cfg.CertificatesDir,
)
if err != nil {
return err
var etcdClient etcdutil.ClusterInterrogator
// Currently this is the only method we have for distinguishing
// external etcd vs static pod etcd
isExternalEtcd := len(upgradeVars.cfg.Etcd.Endpoints) > 0
if isExternalEtcd {
client, err := etcdutil.New(
upgradeVars.cfg.Etcd.Endpoints,
upgradeVars.cfg.Etcd.CAFile,
upgradeVars.cfg.Etcd.CertFile,
upgradeVars.cfg.Etcd.KeyFile)
if err != nil {
return err
}
etcdClient = client
} else {
client, err := etcdutil.NewFromStaticPod(
[]string{"localhost:2379"},
constants.GetStaticPodDirectory(),
upgradeVars.cfg.CertificatesDir,
)
if err != nil {
return err
}
etcdClient = client
}
// Compute which upgrade possibilities there are

View File

@ -74,7 +74,7 @@ type ClusterState struct {
// GetAvailableUpgrades fetches all versions from the specified VersionGetter and computes which
// kinds of upgrades can be performed
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.Client, featureGates map[string]bool) ([]Upgrade, error) {
func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesAllowed, rcUpgradesAllowed bool, etcdClient etcdutil.ClusterInterrogator, featureGates map[string]bool) ([]Upgrade, error) {
fmt.Println("[upgrade] Fetching available versions to upgrade to")
// Collect the upgrades kubeadm can do in this list

View File

@ -17,11 +17,13 @@ limitations under the License.
package upgrade
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd"
versionutil "k8s.io/kubernetes/pkg/util/version"
)
@ -62,28 +64,81 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) {
}, nil
}
type fakeEtcdCluster struct{ TLS bool }
type fakeEtcdClient struct{ TLS bool }
func (f fakeEtcdCluster) HasTLS() bool { return f.TLS }
func (f fakeEtcdClient) HasTLS() bool { return f.TLS }
func (f fakeEtcdCluster) GetStatus() (*clientv3.StatusResponse, error) {
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
func (f fakeEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
// clusterStatus, err := f.GetClusterStatus()
// return clusterStatus[0], err
return &clientv3.StatusResponse{
Version: "3.1.12",
}, nil
}
func (f fakeEtcdCluster) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
func (f fakeEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
var responses []*clientv3.StatusResponse
responses = append(responses, &clientv3.StatusResponse{
Version: "3.1.12",
})
return responses, nil
}
func (f fakeEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
return f.GetStatus()
}
type mismatchEtcdClient struct{}
func (f mismatchEtcdClient) HasTLS() bool { return true }
func (f mismatchEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
clusterStatus, err := f.GetClusterStatus()
return clusterStatus[0], err
}
func (f mismatchEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
return []*clientv3.StatusResponse{
&clientv3.StatusResponse{
Version: "3.1.12",
},
&clientv3.StatusResponse{
Version: "3.2.0",
},
}, nil
}
func (f mismatchEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
return f.GetStatus()
}
type degradedEtcdClient struct{}
func (f degradedEtcdClient) HasTLS() bool { return true }
func (f degradedEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
return nil, fmt.Errorf("Degraded etcd cluster")
}
func (f degradedEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
var res []*clientv3.StatusResponse
return res, fmt.Errorf("Degraded etcd cluster")
}
func (f degradedEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
return f.GetStatus()
}
func TestGetAvailableUpgrades(t *testing.T) {
featureGates := make(map[string]bool)
etcdClient := fakeEtcdClient{}
tests := []struct {
name string
vg VersionGetter
expectedUpgrades []Upgrade
allowExperimental, allowRCs bool
errExpected bool
etcdClient etcdutil.ClusterInterrogator
}{
{
name: "no action needed, already up-to-date",
@ -98,6 +153,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
expectedUpgrades: []Upgrade{},
allowExperimental: false,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "simple patch version upgrade",
@ -131,6 +187,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: false,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "no version provided to offline version getter does not change behavior",
@ -164,6 +221,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: false,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "minor version upgrade only",
@ -197,6 +255,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: false,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "both minor version upgrade and patch version upgrade available",
@ -248,6 +307,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: false,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "allow experimental upgrades, but no upgrade available",
@ -263,6 +323,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
expectedUpgrades: []Upgrade{},
allowExperimental: true,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "upgrade to an unstable version should be supported",
@ -297,6 +358,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: true,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "upgrade from an unstable version to an unstable version should be supported",
@ -331,6 +393,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: true,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "v1.X.0-alpha.0 should be ignored",
@ -366,6 +429,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: true,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "upgrade to an RC version should be supported",
@ -401,6 +465,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowRCs: true,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "it is possible (but very uncommon) that the latest version from the previous branch is an rc and the current latest version is alpha.0. In that case, show the RC",
@ -436,6 +501,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
},
allowExperimental: true,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "upgrade to an RC version should be supported. There may also be an even newer unstable version.",
@ -490,6 +556,37 @@ func TestGetAvailableUpgrades(t *testing.T) {
allowRCs: true,
allowExperimental: true,
errExpected: false,
etcdClient: etcdClient,
},
{
name: "Upgrades with external etcd with mismatched versions should not be allowed.",
vg: &fakeVersionGetter{
clusterVersion: "v1.9.3",
kubeletVersion: "v1.9.3",
kubeadmVersion: "v1.9.3",
stablePatchVersion: "v1.9.3",
stableVersion: "v1.9.3",
},
allowRCs: false,
allowExperimental: false,
etcdClient: mismatchEtcdClient{},
expectedUpgrades: []Upgrade{},
errExpected: true,
},
{
name: "Upgrades with external etcd with a degraded status should not be allowed.",
vg: &fakeVersionGetter{
clusterVersion: "v1.9.3",
kubeletVersion: "v1.9.3",
kubeadmVersion: "v1.9.3",
stablePatchVersion: "v1.9.3",
stableVersion: "v1.9.3",
},
allowRCs: false,
allowExperimental: false,
etcdClient: degradedEtcdClient{},
expectedUpgrades: []Upgrade{},
errExpected: true,
},
{
name: "offline version getter",
@ -498,6 +595,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
kubeletVersion: "v1.10.0",
kubeadmVersion: "v1.10.1",
}, "v1.11.1"),
etcdClient: etcdClient,
expectedUpgrades: []Upgrade{
{
Description: "version in the v1.1 series",
@ -523,10 +621,9 @@ func TestGetAvailableUpgrades(t *testing.T) {
// Instantiating a fake etcd cluster for being able to get etcd version for a corresponding
// kubernetes release.
testCluster := fakeEtcdCluster{}
for _, rt := range tests {
t.Run(rt.name, func(t *testing.T) {
actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, testCluster, featureGates)
actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, rt.etcdClient, featureGates)
if !reflect.DeepEqual(actualUpgrades, rt.expectedUpgrades) {
t.Errorf("failed TestGetAvailableUpgrades\n\texpected upgrades: %v\n\tgot: %v", rt.expectedUpgrades, actualUpgrades)
}

View File

@ -224,7 +224,7 @@ func upgradeComponent(component string, waiter apiclient.Waiter, pathMgr StaticP
}
// performEtcdStaticPodUpgrade performs upgrade of etcd, it returns bool which indicates fatal error or not and the actual error.
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) (bool, error) {
func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, recoverManifests map[string]string, isTLSUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) (bool, error) {
// Add etcd static pod spec only if external etcd is not configured
if len(cfg.Etcd.Endpoints) != 0 {
return false, fmt.Errorf("external etcd detected, won't try to change any etcd state")
@ -321,7 +321,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
// Initialize the new etcd client if it wasn't pre-initialized
if newEtcdClient == nil {
client, err := etcdutil.NewStaticPodClient(
client, err := etcdutil.NewFromStaticPod(
[]string{"localhost:2379"},
constants.GetStaticPodDirectory(),
cfg.CertificatesDir,
@ -367,7 +367,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
}
// StaticPodControlPlane upgrades a static pod-hosted control plane
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.Client) error {
func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager, cfg *kubeadmapi.MasterConfiguration, etcdUpgrade bool, oldEtcdClient, newEtcdClient etcdutil.ClusterInterrogator) error {
recoverManifests := map[string]string{}
var isTLSUpgrade bool
var isExternalEtcd bool
@ -381,7 +381,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
if len(cfg.Etcd.Endpoints) > 0 {
// External etcd
isExternalEtcd = true
client, err := etcdutil.NewClient(
client, err := etcdutil.New(
cfg.Etcd.Endpoints,
cfg.Etcd.CAFile,
cfg.Etcd.CertFile,
@ -397,7 +397,7 @@ func StaticPodControlPlane(waiter apiclient.Waiter, pathMgr StaticPodPathManager
}
} else {
// etcd Static Pod
client, err := etcdutil.NewStaticPodClient(
client, err := etcdutil.NewFromStaticPod(
[]string{"localhost:2379"},
constants.GetStaticPodDirectory(),
cfg.CertificatesDir,

View File

@ -30,7 +30,7 @@ import (
"github.com/coreos/etcd/pkg/transport"
"k8s.io/apimachinery/pkg/runtime"
kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
kubeadmapiv1alpha1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
"k8s.io/kubernetes/cmd/kubeadm/app/constants"
certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs"
controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
@ -216,9 +216,15 @@ func (c fakeTLSEtcdClient) HasTLS() bool {
}
func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
clusterStatus, err := c.GetClusterStatus()
return clusterStatus[0], err
}
func (c fakeTLSEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
client := &clientv3.StatusResponse{
Version: "3.1.12",
}
return []*clientv3.StatusResponse{client}, nil
}
func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
@ -233,6 +239,11 @@ func (c fakePodManifestEtcdClient) HasTLS() bool {
}
func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
clusterStatus, err := c.GetClusterStatus()
return clusterStatus[0], err
}
func (c fakePodManifestEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
// Make sure the certificates generated from the upgrade are readable from disk
tlsInfo := transport.TLSInfo{
CertFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName),
@ -246,7 +257,7 @@ func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error)
client := &clientv3.StatusResponse{}
client.Version = "3.1.12"
return client, nil
return []*clientv3.StatusResponse{client}, nil
}
func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
@ -485,7 +496,7 @@ func getAPIServerHash(dir string) (string, error) {
}
func getConfig(version, certsDir, etcdDataDir string) (*kubeadmapi.MasterConfiguration, error) {
externalcfg := &kubeadmapiv1alpha1.MasterConfiguration{}
externalcfg := &kubeadmapiext.MasterConfiguration{}
internalcfg := &kubeadmapi.MasterConfiguration{}
if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, etcdDataDir, version)), externalcfg); err != nil {
return nil, fmt.Errorf("unable to decode config: %v", err)

View File

@ -30,22 +30,23 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/util/staticpod"
)
// Client is an interface to get etcd cluster related information
type Client interface {
// ClusterInterrogator is an interface to get etcd cluster related information
type ClusterInterrogator interface {
GetStatus() (*clientv3.StatusResponse, error)
WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error)
HasTLS() bool
GetClusterStatus() ([]*clientv3.StatusResponse, error)
}
// GenericClient is a common etcd client for supported etcd servers
type GenericClient struct {
// Client provides connection parameters for an etcd cluster
type Client struct {
Endpoints []string
TLSConfig *tls.Config
TLS *tls.Config
}
// HasTLS returns true if etcd is configured for TLS
func (c GenericClient) HasTLS() bool {
return c.TLSConfig != nil
func (c Client) HasTLS() bool {
return c.TLS != nil
}
// PodManifestsHaveTLS reads the etcd staticpod manifest from disk and returns false if the TLS flags
@ -84,12 +85,12 @@ FlagLoop:
}
// GetStatus gets server status
func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) {
func (c Client) GetStatus() (*clientv3.StatusResponse, error) {
const dialTimeout = 5 * time.Second
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: dialTimeout,
TLS: c.TLSConfig,
TLS: c.TLS,
})
if err != nil {
return nil, err
@ -107,7 +108,7 @@ func (c GenericClient) GetStatus() (*clientv3.StatusResponse, error) {
}
// WaitForStatus returns a StatusResponse after an initial delay and retry attempts
func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
func (c Client) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay)
time.Sleep(delay)
for i := 0; i < retries; i++ {
@ -131,39 +132,63 @@ func (c GenericClient) WaitForStatus(delay time.Duration, retries int, retryInte
return nil, fmt.Errorf("timeout waiting for etcd cluster status")
}
// NewClient creates a new EtcdCluster client
func NewClient(endpoints []string, caFile string, certFile string, keyFile string) (*GenericClient, error) {
client := GenericClient{Endpoints: endpoints}
// New creates a new EtcdCluster client
func New(endpoints []string, ca, cert, key string) (*Client, error) {
client := Client{Endpoints: endpoints}
if caFile != "" || certFile != "" || keyFile != "" {
if ca != "" || cert != "" || key != "" {
tlsInfo := transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
TrustedCAFile: caFile,
CertFile: cert,
KeyFile: key,
TrustedCAFile: ca,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
client.TLSConfig = tlsConfig
client.TLS = tlsConfig
}
return &client, nil
}
// NewStaticPodClient creates a GenericClient from the given endpoints, manifestDir, and certificatesDir
func NewStaticPodClient(endpoints []string, manifestDir string, certificatesDir string) (*GenericClient, error) {
// NewFromStaticPod creates a GenericClient from the given endpoints, manifestDir, and certificatesDir
func NewFromStaticPod(endpoints []string, manifestDir string, certificatesDir string) (*Client, error) {
hasTLS, err := PodManifestsHaveTLS(manifestDir)
if err != nil {
return nil, fmt.Errorf("could not read manifests from: %s, error: %v", manifestDir, err)
}
if hasTLS {
return NewClient(
return New(
endpoints,
filepath.Join(certificatesDir, constants.EtcdCACertName),
filepath.Join(certificatesDir, constants.EtcdHealthcheckClientCertName),
filepath.Join(certificatesDir, constants.EtcdHealthcheckClientKeyName),
)
}
return NewClient(endpoints, "", "", "")
return New(endpoints, "", "", "")
}
// GetClusterStatus returns nil for status Up or error for status Down
func (c Client) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
var resp []*clientv3.StatusResponse
for _, ep := range c.Endpoints {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
DialTimeout: 5 * time.Second,
TLS: c.TLS,
})
if err != nil {
return nil, err
}
defer cli.Close()
r, err := cli.Status(context.Background(), ep)
if err != nil {
return nil, err
}
resp = append(resp, r)
}
return resp, nil
}