kubeadm - fix external etcd upgrades

- Update upgrade plan output when configured for external etcd
  - Move etcd to a separate section and show available upgrades
Jason DeTiberus 2018-05-11 16:20:03 -04:00
parent ac1e940c75
commit f40b7f389e
7 changed files with 291 additions and 198 deletions
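For context, the reworked 'kubeadm upgrade plan' output for clusters configured with external etcd now leads with a dedicated etcd section and drops etcd from the control-plane table. An excerpt, taken from the expected output in the new test case below (versions are the test fixture values; column padding elided):

External components that should be upgraded manually before you upgrade the control plane with 'kubeadm upgrade apply':
COMPONENT   CURRENT   AVAILABLE
Etcd        3.0.17    3.1.12

Components that must be upgraded manually after you have upgraded the control plane with 'kubeadm upgrade apply':
COMPONENT   CURRENT      AVAILABLE
Kubelet     1 x v1.9.2   v1.9.3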

View File

@@ -124,13 +124,13 @@ func RunPlan(flags *planFlags) error {
 	}
 
 	// Tell the user which upgrades are available
-	printAvailableUpgrades(availUpgrades, os.Stdout, upgradeVars.cfg.FeatureGates)
+	printAvailableUpgrades(availUpgrades, os.Stdout, upgradeVars.cfg.FeatureGates, isExternalEtcd)
 	return nil
 }
 
 // printAvailableUpgrades prints a UX-friendly overview of what versions are available to upgrade to
 // TODO look into columnize or some other formatter when time permits instead of using the tabwriter
-func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGates map[string]bool) {
+func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGates map[string]bool, isExternalEtcd bool) {
 	// Return quickly if no upgrades can be made
 	if len(upgrades) == 0 {
@@ -143,6 +143,16 @@ func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGates map[string]bool) {
 	// Loop through the upgrade possibilities and output text to the command line
 	for _, upgrade := range upgrades {
+		if isExternalEtcd && upgrade.CanUpgradeEtcd() {
+			fmt.Fprintln(w, "External components that should be upgraded manually before you upgrade the control plane with 'kubeadm upgrade apply':")
+			fmt.Fprintln(tabw, "COMPONENT\tCURRENT\tAVAILABLE")
+			fmt.Fprintf(tabw, "Etcd\t%s\t%s\n", upgrade.Before.EtcdVersion, upgrade.After.EtcdVersion)
+			// We should flush the writer here at this stage; as the columns will now be of the right size, adjusted to the above content
+			tabw.Flush()
+			fmt.Fprintln(w, "")
+		}
+
 		if upgrade.CanUpgradeKubelets() {
 			fmt.Fprintln(w, "Components that must be upgraded manually after you have upgraded the control plane with 'kubeadm upgrade apply':")
 			fmt.Fprintln(tabw, "COMPONENT\tCURRENT\tAVAILABLE")
@@ -177,7 +187,9 @@ func printAvailableUpgrades(upgrades []upgrade.Upgrade, w io.Writer, featureGates map[string]bool) {
 		} else {
 			fmt.Fprintf(tabw, "Kube DNS\t%s\t%s\n", upgrade.Before.DNSVersion, upgrade.After.DNSVersion)
 		}
-		fmt.Fprintf(tabw, "Etcd\t%s\t%s\n", upgrade.Before.EtcdVersion, upgrade.After.EtcdVersion)
+		if !isExternalEtcd {
+			fmt.Fprintf(tabw, "Etcd\t%s\t%s\n", upgrade.Before.EtcdVersion, upgrade.After.EtcdVersion)
+		}
 		// The tabwriter should be flushed at this stage as we have now put in all the required content for this time. This is required for the tabs' size to be correct.
 		tabw.Flush()
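The flush-in-the-middle comment above is the key detail: text/tabwriter only computes column widths when Flush is called, so flushing right after the etcd table keeps its widths independent of the component table that follows. A minimal, self-contained sketch of that pattern (plain Go, not kubeadm code; the width and padding values are arbitrary):

package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

func main() {
	// minwidth=10, tabwidth=4, padding=3, pad with spaces.
	tabw := tabwriter.NewWriter(os.Stdout, 10, 4, 3, ' ', 0)

	// First table: sized on its own because we flush right after it.
	fmt.Fprintln(tabw, "COMPONENT\tCURRENT\tAVAILABLE")
	fmt.Fprintf(tabw, "Etcd\t%s\t%s\n", "3.0.17", "3.1.12")
	tabw.Flush()

	fmt.Println()

	// Second table: its wider first column does not stretch the table above.
	fmt.Fprintln(tabw, "COMPONENT\tCURRENT\tAVAILABLE")
	fmt.Fprintf(tabw, "Controller Manager\t%s\t%s\n", "v1.9.2", "v1.9.3")
	tabw.Flush()
}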

View File

@@ -62,16 +62,27 @@ func TestSortedSliceFromStringIntMap(t *testing.T) {
 func TestPrintAvailableUpgrades(t *testing.T) {
 	featureGates := make(map[string]bool)
 	var tests = []struct {
+		name          string
 		upgrades      []upgrade.Upgrade
 		buf           *bytes.Buffer
 		expectedBytes []byte
+		externalEtcd  bool
 	}{
 		{
+			name:     "Up to date",
 			upgrades: []upgrade.Upgrade{},
 			expectedBytes: []byte(`Awesome, you're up-to-date! Enjoy!
 `),
 		},
 		{
+			name:         "Up to date external etcd",
+			externalEtcd: true,
+			upgrades:     []upgrade.Upgrade{},
+			expectedBytes: []byte(`Awesome, you're up-to-date! Enjoy!
+`),
+		},
+		{
+			name: "Patch version available",
 			upgrades: []upgrade.Upgrade{
 				{
 					Description: "version in the v1.8 series",
@@ -117,6 +128,7 @@ _____________________________________________________________________
 `),
 		},
 		{
+			name: "minor version available",
 			upgrades: []upgrade.Upgrade{
 				{
 					Description: "stable version",
@@ -160,6 +172,7 @@ _____________________________________________________________________
 `),
 		},
 		{
+			name: "patch and minor version available",
 			upgrades: []upgrade.Upgrade{
 				{
 					Description: "version in the v1.8 series",
@@ -243,6 +256,7 @@ _____________________________________________________________________
 `),
 		},
 		{
+			name: "experimental version available",
 			upgrades: []upgrade.Upgrade{
 				{
 					Description: "experimental version",
@@ -288,6 +302,7 @@ _____________________________________________________________________
 `),
 		},
 		{
+			name: "release candidate available",
 			upgrades: []upgrade.Upgrade{
 				{
 					Description: "release candidate version",
@@ -333,6 +348,7 @@ _____________________________________________________________________
 `),
 		},
 		{
+			name: "multiple kubelet versions",
 			upgrades: []upgrade.Upgrade{
 				{
 					Description: "version in the v1.9 series",
@@ -377,19 +393,71 @@ Note: Before you can perform this upgrade, you have to update kubeadm to v1.9.3.
 _____________________________________________________________________
+`),
+		},
+		{
+			name: "external etcd upgrade available",
+			upgrades: []upgrade.Upgrade{
+				{
+					Description: "version in the v1.9 series",
+					Before: upgrade.ClusterState{
+						KubeVersion: "v1.9.2",
+						KubeletVersions: map[string]uint16{
+							"v1.9.2": 1,
+						},
+						KubeadmVersion: "v1.9.2",
+						DNSVersion:     "1.14.5",
+						EtcdVersion:    "3.0.17",
+					},
+					After: upgrade.ClusterState{
+						KubeVersion:    "v1.9.3",
+						KubeadmVersion: "v1.9.3",
+						DNSVersion:     "1.14.8",
+						EtcdVersion:    "3.1.12",
+					},
+				},
+			},
+			externalEtcd: true,
+			expectedBytes: []byte(`External components that should be upgraded manually before you upgrade the control plane with 'kubeadm upgrade apply':
+COMPONENT CURRENT AVAILABLE
+Etcd 3.0.17 3.1.12
+Components that must be upgraded manually after you have upgraded the control plane with 'kubeadm upgrade apply':
+COMPONENT CURRENT AVAILABLE
+Kubelet 1 x v1.9.2 v1.9.3
+Upgrade to the latest version in the v1.9 series:
+COMPONENT CURRENT AVAILABLE
+API Server v1.9.2 v1.9.3
+Controller Manager v1.9.2 v1.9.3
+Scheduler v1.9.2 v1.9.3
+Kube Proxy v1.9.2 v1.9.3
+Kube DNS 1.14.5 1.14.8
+You can now apply the upgrade by executing the following command:
+kubeadm upgrade apply v1.9.3
+Note: Before you can perform this upgrade, you have to update kubeadm to v1.9.3.
+_____________________________________________________________________
 `),
 		},
 	}
 	for _, rt := range tests {
-		rt.buf = bytes.NewBufferString("")
-		printAvailableUpgrades(rt.upgrades, rt.buf, featureGates)
-		actualBytes := rt.buf.Bytes()
-		if !bytes.Equal(actualBytes, rt.expectedBytes) {
-			t.Errorf(
-				"failed PrintAvailableUpgrades:\n\texpected: %q\n\t actual: %q",
-				string(rt.expectedBytes),
-				string(actualBytes),
-			)
-		}
+		t.Run(rt.name, func(t *testing.T) {
+			rt.buf = bytes.NewBufferString("")
+			printAvailableUpgrades(rt.upgrades, rt.buf, featureGates, rt.externalEtcd)
+			actualBytes := rt.buf.Bytes()
+			if !bytes.Equal(actualBytes, rt.expectedBytes) {
+				t.Errorf(
+					"failed PrintAvailableUpgrades:\n\texpected: %q\n\t actual: %q",
+					string(rt.expectedBytes),
+					string(actualBytes),
+				)
+			}
+		})
 	}
 }
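The switch to named t.Run subtests is worth the churn: a failure is now reported by case name (for example TestPrintAvailableUpgrades/external_etcd_upgrade_available) rather than only by the byte diff. A generic sketch of the pattern (plain Go, not kubeadm code):

package main

import "testing"

func TestExample(t *testing.T) {
	tests := []struct {
		name string
		in   int
		want int
	}{
		{name: "zero", in: 0, want: 0},
		{name: "doubles two", in: 2, want: 4},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// A failure here is reported as TestExample/doubles_two.
			if got := tt.in * 2; got != tt.want {
				t.Errorf("got %d, want %d", got, tt.want)
			}
		})
	}
}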

View File

@@ -50,6 +50,11 @@ func (u *Upgrade) CanUpgradeKubelets() bool {
 	return !sameVersionFound
 }
 
+// CanUpgradeEtcd returns whether an upgrade of etcd is possible
+func (u *Upgrade) CanUpgradeEtcd() bool {
+	return u.Before.EtcdVersion != u.After.EtcdVersion
+}
+
 // ActiveDNSAddon returns the version of CoreDNS or kube-dns
 func ActiveDNSAddon(featureGates map[string]bool) string {
 	if features.Enabled(featureGates, features.CoreDNS) {
@@ -83,13 +88,13 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
 	// Get the cluster version
 	clusterVersionStr, clusterVersion, err := versionGetterImpl.ClusterVersion()
 	if err != nil {
-		return nil, err
+		return upgrades, err
 	}
 
 	// Get current kubeadm CLI version
 	kubeadmVersionStr, kubeadmVersion, err := versionGetterImpl.KubeadmVersion()
 	if err != nil {
-		return nil, err
+		return upgrades, err
 	}
 
 	// Get and output the current latest stable version
@@ -103,13 +108,13 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
 	// Get the kubelet versions in the cluster
 	kubeletVersions, err := versionGetterImpl.KubeletVersions()
 	if err != nil {
-		return nil, err
+		return upgrades, err
 	}
 
 	// Get current etcd version
-	etcdStatus, err := etcdClient.GetStatus()
+	etcdVersion, err := etcdClient.GetVersion()
 	if err != nil {
-		return nil, err
+		return upgrades, err
 	}
 
 	// Construct a descriptor for the current state of the world
@@ -118,7 +123,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
 		DNSVersion:      dns.GetDNSVersion(clusterVersion, ActiveDNSAddon(featureGates)),
 		KubeadmVersion:  kubeadmVersionStr,
 		KubeletVersions: kubeletVersions,
-		EtcdVersion:     etcdStatus.Version,
+		EtcdVersion:     etcdVersion,
 	}
 
 	// Do a "dumb guess" that a new minor upgrade is available just because the latest stable version is higher than the cluster version
@@ -201,7 +206,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
 		// Get and output the current latest unstable version
 		latestVersionStr, latestVersion, err := versionGetterImpl.VersionFromCILabel("latest", "experimental version")
 		if err != nil {
-			return nil, err
+			return upgrades, err
 		}
 
 		minorUnstable := latestVersion.Components()[1]
@@ -209,7 +214,7 @@ func GetAvailableUpgrades(versionGetterImpl VersionGetter, experimentalUpgradesA
 		previousBranch := fmt.Sprintf("latest-1.%d", minorUnstable-1)
 		previousBranchLatestVersionStr, previousBranchLatestVersion, err := versionGetterImpl.VersionFromCILabel(previousBranch, "")
 		if err != nil {
-			return nil, err
+			return upgrades, err
 		}
 
 		// If that previous latest version is an RC, RCs are allowed and the cluster version is lower than the RC version, show the upgrade
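The change from `return nil, err` to `return upgrades, err` is subtle but deliberate: `upgrades` is presumably initialized as an empty, non-nil slice earlier in the function, and callers (including the tests below) compare results with reflect.DeepEqual, for which a nil slice and an empty slice are not equal. A standalone illustration (plain Go, not kubeadm code):

package main

import (
	"fmt"
	"reflect"
)

// Upgrade is a stand-in for the kubeadm upgrade.Upgrade struct.
type Upgrade struct{ Description string }

func main() {
	var nilSlice []Upgrade    // nil: no backing array
	emptySlice := []Upgrade{} // non-nil, length 0

	fmt.Println(len(nilSlice) == len(emptySlice))           // true
	fmt.Println(reflect.DeepEqual(nilSlice, emptySlice))    // false: nil vs empty
	fmt.Println(reflect.DeepEqual(emptySlice, []Upgrade{})) // true
}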

View File

@@ -64,71 +64,44 @@ func (f *fakeVersionGetter) KubeletVersions() (map[string]uint16, error) {
 	}, nil
 }
 
-type fakeEtcdClient struct{ TLS bool }
+type fakeEtcdClient struct {
+	TLS                bool
+	mismatchedVersions bool
+}
 
 func (f fakeEtcdClient) HasTLS() bool { return f.TLS }
 
-func (f fakeEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
-	// clusterStatus, err := f.GetClusterStatus()
-	// return clusterStatus[0], err
-	return &clientv3.StatusResponse{
-		Version: "3.1.12",
-	}, nil
-}
-
-func (f fakeEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
-	var responses []*clientv3.StatusResponse
-	responses = append(responses, &clientv3.StatusResponse{
-		Version: "3.1.12",
-	})
-	return responses, nil
-}
-
-func (f fakeEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
-	return f.GetStatus()
-}
-
-type mismatchEtcdClient struct{}
-
-func (f mismatchEtcdClient) HasTLS() bool { return true }
-
-func (f mismatchEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
-	clusterStatus, err := f.GetClusterStatus()
-	return clusterStatus[0], err
-}
-
-func (f mismatchEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
-	return []*clientv3.StatusResponse{
-		&clientv3.StatusResponse{
-			Version: "3.1.12",
-		},
-		&clientv3.StatusResponse{
-			Version: "3.2.0",
-		},
-	}, nil
-}
-
-func (f mismatchEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
-	return f.GetStatus()
-}
-
-type degradedEtcdClient struct{}
-
-func (f degradedEtcdClient) HasTLS() bool { return true }
-
-func (f degradedEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
-	return nil, fmt.Errorf("Degraded etcd cluster")
-}
-
-func (f degradedEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
-	var res []*clientv3.StatusResponse
-	return res, fmt.Errorf("Degraded etcd cluster")
-}
-
-func (f degradedEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
-	return f.GetStatus()
-}
+func (f fakeEtcdClient) ClusterAvailable() (bool, error) { return true, nil }
+
+func (f fakeEtcdClient) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) {
+	return true, nil
+}
+
+func (f fakeEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
+	return make(map[string]*clientv3.StatusResponse), nil
+}
+
+func (f fakeEtcdClient) GetVersion() (string, error) {
+	versions, _ := f.GetClusterVersions()
+	if f.mismatchedVersions {
+		return "", fmt.Errorf("etcd cluster contains endpoints with mismatched versions: %v", versions)
+	}
+	return "3.1.12", nil
+}
+
+func (f fakeEtcdClient) GetClusterVersions() (map[string]string, error) {
+	if f.mismatchedVersions {
+		return map[string]string{
+			"foo": "3.1.12",
+			"bar": "3.2.0",
+		}, nil
+	}
+	return map[string]string{
+		"foo": "3.1.12",
+		"bar": "3.1.12",
+	}, nil
+}
 
 func TestGetAvailableUpgrades(t *testing.T) {
 	featureGates := make(map[string]bool)
 	etcdClient := fakeEtcdClient{}
@@ -569,22 +542,7 @@ func TestGetAvailableUpgrades(t *testing.T) {
 			},
 			allowRCs:          false,
 			allowExperimental: false,
-			etcdClient:        mismatchEtcdClient{},
-			expectedUpgrades:  []Upgrade{},
-			errExpected:       true,
-		},
-		{
-			name: "Upgrades with external etcd with a degraded status should not be allowed.",
-			vg: &fakeVersionGetter{
-				clusterVersion:     "v1.9.3",
-				kubeletVersion:     "v1.9.3",
-				kubeadmVersion:     "v1.9.3",
-				stablePatchVersion: "v1.9.3",
-				stableVersion:      "v1.9.3",
-			},
-			allowRCs:          false,
-			allowExperimental: false,
-			etcdClient:        degradedEtcdClient{},
+			etcdClient:        fakeEtcdClient{mismatchedVersions: true},
 			expectedUpgrades:  []Upgrade{},
 			errExpected:       true,
 		},
@@ -624,12 +582,16 @@ func TestGetAvailableUpgrades(t *testing.T) {
 	for _, rt := range tests {
 		t.Run(rt.name, func(t *testing.T) {
 			actualUpgrades, actualErr := GetAvailableUpgrades(rt.vg, rt.allowExperimental, rt.allowRCs, rt.etcdClient, featureGates)
+			fmt.Printf("actualErr: %v\n", actualErr)
+			fmt.Printf("actualErr != nil: %v\n", actualErr != nil)
+			fmt.Printf("errExpected: %v\n", rt.errExpected)
+			if (actualErr != nil) != rt.errExpected {
+				fmt.Printf("Hello error")
+				t.Errorf("failed TestGetAvailableUpgrades\n\texpected error: %t\n\tgot error: %t", rt.errExpected, (actualErr != nil))
+			}
 			if !reflect.DeepEqual(actualUpgrades, rt.expectedUpgrades) {
 				t.Errorf("failed TestGetAvailableUpgrades\n\texpected upgrades: %v\n\tgot: %v", rt.expectedUpgrades, actualUpgrades)
 			}
-			if (actualErr != nil) != rt.errExpected {
-				t.Errorf("failed TestGetAvailableUpgrades\n\texpected error: %t\n\tgot error: %t", rt.errExpected, (actualErr != nil))
-			}
 		})
 	}
 }
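With the fake now covering the full ClusterInterrogator surface, one cheap guard against drift is a compile-time interface assertion. This is not part of this commit, only a common Go idiom; the etcdutil import alias and its placement in the test file are assumed here:

// Hypothetical addition to the test file: the build fails if fakeEtcdClient
// stops satisfying the etcd util ClusterInterrogator interface.
var _ etcdutil.ClusterInterrogator = fakeEtcdClient{}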

View File

@@ -231,7 +231,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 	}
 
 	// Checking health state of etcd before proceeding with the upgrade
-	etcdStatus, err := oldEtcdClient.GetStatus()
+	_, err := oldEtcdClient.GetClusterStatus()
 	if err != nil {
 		return true, fmt.Errorf("etcd cluster is not healthy: %v", err)
 	}
@@ -248,9 +248,13 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 	if err != nil {
 		return true, fmt.Errorf("failed to retrieve an etcd version for the target kubernetes version: %v", err)
 	}
-	currentEtcdVersion, err := version.ParseSemantic(etcdStatus.Version)
+	currentEtcdVersionStr, err := oldEtcdClient.GetVersion()
 	if err != nil {
-		return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", etcdStatus.Version, err)
+		return true, fmt.Errorf("failed to retrieve the current etcd version: %v", err)
+	}
+	currentEtcdVersion, err := version.ParseSemantic(currentEtcdVersionStr)
+	if err != nil {
+		return true, fmt.Errorf("failed to parse the current etcd version(%s): %v", currentEtcdVersionStr, err)
 	}
 
 	// Comparing current etcd version with desired to catch the same version or downgrade condition and fail on them.
@@ -292,7 +296,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 		// Since upgrade component failed, the old etcd manifest has either been restored or was never touched
 		// Now we need to check the health of etcd cluster if it is up with old manifest
 		fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available")
-		if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil {
+		if _, err := oldEtcdClient.WaitForClusterAvailable(noDelay, retries, retryInterval); err != nil {
 			fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err)
 			// At this point we know that etcd cluster is dead and it is safe to copy backup datastore and to rollback old etcd manifest
@@ -305,7 +309,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 			// Now that we've rolled back the data, let's check if the cluster comes up
 			fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available")
-			if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil {
+			if _, err := oldEtcdClient.WaitForClusterAvailable(noDelay, retries, retryInterval); err != nil {
 				fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err)
 				// Nothing else left to try to recover etcd cluster
 				return true, fmt.Errorf("fatal error rolling back local etcd cluster manifest: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
@@ -334,7 +338,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 	// Checking health state of etcd after the upgrade
 	fmt.Println("[upgrade/etcd] Waiting for etcd to become available")
-	if _, err = newEtcdClient.WaitForStatus(podRestartDelay, retries, retryInterval); err != nil {
+	if _, err = newEtcdClient.WaitForClusterAvailable(podRestartDelay, retries, retryInterval); err != nil {
 		fmt.Printf("[upgrade/etcd] Failed to healthcheck etcd: %v\n", err)
 		// Despite the fact that upgradeComponent was successful, there is something wrong with the etcd cluster
 		// First step is to restore back up of datastore
@@ -352,7 +356,7 @@ func performEtcdStaticPodUpgrade(waiter apiclient.Waiter, pathMgr StaticPodPathM
 		// Assuming rollback of the old etcd manifest was successful, check the status of etcd cluster again
 		fmt.Println("[upgrade/etcd] Waiting for previous etcd to become available")
-		if _, err := oldEtcdClient.WaitForStatus(noDelay, retries, retryInterval); err != nil {
+		if _, err := oldEtcdClient.WaitForClusterAvailable(noDelay, retries, retryInterval); err != nil {
 			fmt.Printf("[upgrade/etcd] Failed to healthcheck previous etcd: %v\n", err)
 			// Nothing else left to try to recover etcd cluster
 			return true, fmt.Errorf("fatal error rolling back local etcd cluster manifest: %v, the backup of etcd database is stored here:(%s)", err, backupEtcdDir)
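The same-version/downgrade guard mentioned in the comment above is built on the semantic version helpers from k8s.io/apimachinery. A simplified sketch of that comparison (illustrative values, not the exact kubeadm code):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/version"
)

func main() {
	// e.g. what oldEtcdClient.GetVersion() reported vs. the etcd version
	// mandated by the target Kubernetes release (values are made up).
	current, _ := version.ParseSemantic("3.1.12")
	desired, _ := version.ParseSemantic("3.2.18")

	// Refuse a no-op "upgrade" or an accidental downgrade.
	if !desired.LessThan(current) && desired.String() != current.String() {
		fmt.Printf("upgrading etcd from %s to %s\n", current, desired)
		return
	}
	fmt.Println("refusing etcd downgrade or same-version upgrade")
}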

View File

@@ -30,7 +30,7 @@ import (
 	"github.com/coreos/etcd/pkg/transport"
 	"k8s.io/apimachinery/pkg/runtime"
 	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
-	kubeadmapiext "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
+	kubeadmapiv1alpha1 "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm/v1alpha1"
 	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
 	certsphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/certs"
 	controlplanephase "k8s.io/kubernetes/cmd/kubeadm/app/phases/controlplane"
@@ -215,20 +215,27 @@ func (c fakeTLSEtcdClient) HasTLS() bool {
 	return c.TLS
 }
 
-func (c fakeTLSEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
-	clusterStatus, err := c.GetClusterStatus()
-	return clusterStatus[0], err
-}
-
-func (c fakeTLSEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
-	client := &clientv3.StatusResponse{
-		Version: "3.1.12",
-	}
-	return []*clientv3.StatusResponse{client}, nil
-}
-
-func (c fakeTLSEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
-	return c.GetStatus()
+func (c fakeTLSEtcdClient) ClusterAvailable() (bool, error) { return true, nil }
+
+func (c fakeTLSEtcdClient) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) {
+	return true, nil
+}
+
+func (c fakeTLSEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
+	return map[string]*clientv3.StatusResponse{
+		"foo": {
+			Version: "3.1.12",
+		}}, nil
+}
+
+func (c fakeTLSEtcdClient) GetClusterVersions() (map[string]string, error) {
+	return map[string]string{
+		"foo": "3.1.12",
+	}, nil
+}
+
+func (c fakeTLSEtcdClient) GetVersion() (string, error) {
+	return "3.1.12", nil
 }
 
 type fakePodManifestEtcdClient struct{ ManifestDir, CertificatesDir string }
@@ -238,12 +245,13 @@ func (c fakePodManifestEtcdClient) HasTLS() bool {
 	return hasTLS
 }
 
-func (c fakePodManifestEtcdClient) GetStatus() (*clientv3.StatusResponse, error) {
-	clusterStatus, err := c.GetClusterStatus()
-	return clusterStatus[0], err
-}
+func (c fakePodManifestEtcdClient) ClusterAvailable() (bool, error) { return true, nil }
+
+func (c fakePodManifestEtcdClient) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) {
+	return true, nil
+}
 
-func (c fakePodManifestEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
+func (c fakePodManifestEtcdClient) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
 	// Make sure the certificates generated from the upgrade are readable from disk
 	tlsInfo := transport.TLSInfo{
 		CertFile: filepath.Join(c.CertificatesDir, constants.EtcdCACertName),
@@ -255,13 +263,19 @@ func (c fakePodManifestEtcdClient) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
 		return nil, err
 	}
 
-	client := &clientv3.StatusResponse{}
-	client.Version = "3.1.12"
-	return []*clientv3.StatusResponse{client}, nil
+	return map[string]*clientv3.StatusResponse{
+		"foo": {Version: "3.1.12"},
+	}, nil
 }
 
-func (c fakePodManifestEtcdClient) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
-	return c.GetStatus()
+func (c fakePodManifestEtcdClient) GetClusterVersions() (map[string]string, error) {
+	return map[string]string{
+		"foo": "3.1.12",
+	}, nil
+}
+
+func (c fakePodManifestEtcdClient) GetVersion() (string, error) {
+	return "3.1.12", nil
 }
 
 func TestStaticPodControlPlane(t *testing.T) {
@@ -496,7 +510,7 @@ func getAPIServerHash(dir string) (string, error) {
 }
 
 func getConfig(version, certsDir, etcdDataDir string) (*kubeadmapi.MasterConfiguration, error) {
-	externalcfg := &kubeadmapiext.MasterConfiguration{}
+	externalcfg := &kubeadmapiv1alpha1.MasterConfiguration{}
 	internalcfg := &kubeadmapi.MasterConfiguration{}
 	if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), []byte(fmt.Sprintf(testConfiguration, certsDir, etcdDataDir, version)), externalcfg); err != nil {
 		return nil, fmt.Errorf("unable to decode config: %v", err)

View File

@@ -32,10 +32,12 @@ import (
 // ClusterInterrogator is an interface to get etcd cluster related information
 type ClusterInterrogator interface {
-	GetStatus() (*clientv3.StatusResponse, error)
-	WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error)
+	ClusterAvailable() (bool, error)
+	GetClusterStatus() (map[string]*clientv3.StatusResponse, error)
+	GetClusterVersions() (map[string]string, error)
+	GetVersion() (string, error)
 	HasTLS() bool
-	GetClusterStatus() ([]*clientv3.StatusResponse, error)
+	WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error)
 }
 
 // Client provides connection parameters for an etcd cluster
@@ -84,54 +86,6 @@ FlagLoop:
 	return true, nil
 }
 
-// GetStatus gets server status
-func (c Client) GetStatus() (*clientv3.StatusResponse, error) {
-	const dialTimeout = 5 * time.Second
-	cli, err := clientv3.New(clientv3.Config{
-		Endpoints:   c.Endpoints,
-		DialTimeout: dialTimeout,
-		TLS:         c.TLS,
-	})
-	if err != nil {
-		return nil, err
-	}
-	defer cli.Close()
-
-	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-	resp, err := cli.Status(ctx, c.Endpoints[0])
-	cancel()
-	if err != nil {
-		return nil, err
-	}
-
-	return resp, nil
-}
-
-// WaitForStatus returns a StatusResponse after an initial delay and retry attempts
-func (c Client) WaitForStatus(delay time.Duration, retries int, retryInterval time.Duration) (*clientv3.StatusResponse, error) {
-	fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay)
-	time.Sleep(delay)
-	for i := 0; i < retries; i++ {
-		if i > 0 {
-			fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval)
-			time.Sleep(retryInterval)
-		}
-		fmt.Printf("[util/etcd] Attempting to get etcd status %d/%d\n", i+1, retries)
-		resp, err := c.GetStatus()
-		if err != nil {
-			switch err {
-			case context.DeadlineExceeded:
-				fmt.Println("[util/etcd] Attempt timed out")
-			default:
-				fmt.Printf("[util/etcd] Attempt failed with error: %v\n", err)
-			}
-			continue
-		}
-		return resp, nil
-	}
-	return nil, fmt.Errorf("timeout waiting for etcd cluster status")
-}
-
 // New creates a new EtcdCluster client
 func New(endpoints []string, ca, cert, key string) (*Client, error) {
 	client := Client{Endpoints: endpoints}
@@ -169,26 +123,100 @@ func NewFromStaticPod(endpoints []string, manifestDir string, certificatesDir st
 	return New(endpoints, "", "", "")
 }
 
-// GetClusterStatus returns nil for status Up or error for status Down
-func (c Client) GetClusterStatus() ([]*clientv3.StatusResponse, error) {
-	var resp []*clientv3.StatusResponse
-	for _, ep := range c.Endpoints {
-		cli, err := clientv3.New(clientv3.Config{
-			Endpoints:   []string{ep},
-			DialTimeout: 5 * time.Second,
-			TLS:         c.TLS,
-		})
-		if err != nil {
-			return nil, err
-		}
-		defer cli.Close()
-
-		r, err := cli.Status(context.Background(), ep)
-		if err != nil {
-			return nil, err
-		}
-		resp = append(resp, r)
-	}
-	return resp, nil
+// GetVersion returns the etcd version of the cluster.
+// An error is returned if the version of all endpoints do not match
+func (c Client) GetVersion() (string, error) {
+	var clusterVersion string
+
+	versions, err := c.GetClusterVersions()
+	if err != nil {
+		return "", err
+	}
+	for _, v := range versions {
+		if clusterVersion == "" {
+			// This is the first version we've seen
+			clusterVersion = v
+		} else if v != clusterVersion {
+			return "", fmt.Errorf("etcd cluster contains endpoints with mismatched versions: %v", versions)
+		} else {
+			clusterVersion = v
+		}
+	}
+	if clusterVersion == "" {
+		return "", fmt.Errorf("could not determine cluster etcd version")
+	}
+	return clusterVersion, nil
+}
+
+// GetClusterVersions returns a map of the endpoints and their associated versions
+func (c Client) GetClusterVersions() (map[string]string, error) {
+	versions := make(map[string]string)
+	statuses, err := c.GetClusterStatus()
+	if err != nil {
+		return versions, err
+	}
+
+	for ep, status := range statuses {
+		versions[ep] = status.Version
+	}
+	return versions, nil
+}
+
+// ClusterAvailable returns true if the cluster status indicates the cluster is available.
+func (c Client) ClusterAvailable() (bool, error) {
+	_, err := c.GetClusterStatus()
+	if err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
+// GetClusterStatus returns nil for status Up or error for status Down
+func (c Client) GetClusterStatus() (map[string]*clientv3.StatusResponse, error) {
+	cli, err := clientv3.New(clientv3.Config{
+		Endpoints:   c.Endpoints,
+		DialTimeout: 5 * time.Second,
+		TLS:         c.TLS,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer cli.Close()
+
+	clusterStatus := make(map[string]*clientv3.StatusResponse)
+	for _, ep := range c.Endpoints {
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		resp, err := cli.Status(ctx, ep)
+		cancel()
+		if err != nil {
+			return nil, err
+		}
+		clusterStatus[ep] = resp
+	}
+	return clusterStatus, nil
+}
+
+// WaitForClusterAvailable returns true if all endpoints in the cluster are available after an initial delay and retry attempts, an error is returned otherwise
+func (c Client) WaitForClusterAvailable(delay time.Duration, retries int, retryInterval time.Duration) (bool, error) {
+	fmt.Printf("[util/etcd] Waiting %v for initial delay\n", delay)
+	time.Sleep(delay)
+	for i := 0; i < retries; i++ {
+		if i > 0 {
+			fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval)
+			time.Sleep(retryInterval)
+		}
+		fmt.Printf("[util/etcd] Attempting to see if all cluster endpoints are available %d/%d\n", i+1, retries)
+		resp, err := c.ClusterAvailable()
+		if err != nil {
+			switch err {
+			case context.DeadlineExceeded:
+				fmt.Println("[util/etcd] Attempt timed out")
+			default:
+				fmt.Printf("[util/etcd] Attempt failed with error: %v\n", err)
+			}
+			continue
+		}
+		return resp, nil
+	}
+	return false, fmt.Errorf("timeout waiting for etcd cluster to be available")
 }
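For orientation, a rough sketch of how a caller could drive the reworked client API defined above. The import path and alias for the kubeadm etcd util package are assumed, and the endpoint, certificate paths, and timing values are illustrative only:

package main

import (
	"fmt"
	"time"

	etcdutil "k8s.io/kubernetes/cmd/kubeadm/app/util/etcd" // assumed import path for the package above
)

func main() {
	// Endpoint and certificate paths are made up for illustration.
	client, err := etcdutil.New([]string{"https://127.0.0.1:2379"},
		"/etc/kubernetes/pki/etcd/ca.crt",
		"/etc/kubernetes/pki/etcd/client.crt",
		"/etc/kubernetes/pki/etcd/client.key")
	if err != nil {
		fmt.Println(err)
		return
	}

	// Cluster-wide version; errors out if endpoints report different versions.
	if v, err := client.GetVersion(); err == nil {
		fmt.Println("etcd version:", v)
	}

	// Liveness with retries: initial delay, retry count, interval between retries.
	if ok, err := client.WaitForClusterAvailable(10*time.Second, 5, 5*time.Second); err == nil && ok {
		fmt.Println("etcd cluster is available")
	}
}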