Merge pull request #72984 from ereslibre/wait-for-etcd-when-growing

kubeadm: wait for the etcd cluster to be available when growing it
commit f2b133d7b4
Kubernetes Prow Robot, 2019-01-19 23:57:46 -08:00 (committed by GitHub)
2 changed files with 30 additions and 9 deletions

cmd/kubeadm/app/phases/etcd/local.go

@@ -21,6 +21,7 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 	"k8s.io/klog"
@@ -36,8 +37,10 @@ import (
 )
 
 const (
-	etcdVolumeName  = "etcd-data"
-	certsVolumeName = "etcd-certs"
+	etcdVolumeName           = "etcd-data"
+	certsVolumeName          = "etcd-certs"
+	etcdHealthyCheckInterval = 5 * time.Second
+	etcdHealthyCheckRetries  = 20
 )
 
 // CreateLocalEtcdStaticPodManifestFile will write local etcd static pod manifest file.
@@ -61,13 +64,13 @@ func CreateLocalEtcdStaticPodManifestFile(manifestDir string, cfg *kubeadmapi.In
 		return err
 	}
 
-	klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
+	klog.V(1).Infof("[etcd] wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
 	return nil
 }
 
 // CheckLocalEtcdClusterStatus verifies health state of local/stacked etcd cluster before installing a new etcd member
 func CheckLocalEtcdClusterStatus(client clientset.Interface, cfg *kubeadmapi.InitConfiguration) error {
-	fmt.Println("[etcd] Checking Etcd cluster health")
+	fmt.Println("[etcd] Checking etcd cluster health")
 
 	// creates an etcd client that connects to all the local/stacked etcd members
 	klog.V(1).Info("creating etcd client that connects to etcd pods")
@@ -120,7 +123,13 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
 		return err
 	}
 
-	fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd instance to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
+	fmt.Printf("[etcd] Wrote Static Pod manifest for a local etcd member to %q\n", kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestDir))
+
+	fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries)
+	if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
+		return err
+	}
+
 	return nil
 }
@@ -172,7 +181,7 @@ func getEtcdCommand(cfg *kubeadmapi.InitConfiguration, initialCluster []etcdutil
 	if len(initialCluster) == 0 {
 		defaultArguments["initial-cluster"] = fmt.Sprintf("%s=%s", cfg.GetNodeName(), etcdutil.GetPeerURL(cfg))
 	} else {
-		// NB. the joining etcd instance should be part of the initialCluster list
+		// NB. the joining etcd member should be part of the initialCluster list
 		endpoints := []string{}
 		for _, member := range initialCluster {
 			endpoints = append(endpoints, fmt.Sprintf("%s=%s", member.Name, member.PeerURL))
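
The key change in this file is that CreateStackedEtcdStaticPodManifestFile now blocks after writing the manifest until the grown etcd cluster answers health checks, retrying at most etcdHealthyCheckRetries times with etcdHealthyCheckInterval between attempts (20 retries at 5 seconds, so up to roughly 100 seconds). A minimal, self-contained sketch of that bounded-retry pattern follows; waitForAvailable and the simulated check are illustrative names, not kubeadm's actual API.

// A sketch (not kubeadm's implementation) of the bounded retry pattern the new
// wait step follows: call a health check up to `retries` times, sleeping
// `interval` between attempts.
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForAvailable keeps calling check until it succeeds or the retries run out.
func waitForAvailable(retries int, interval time.Duration, check func() error) error {
	var lastErr error
	for i := 0; i < retries; i++ {
		if i > 0 {
			fmt.Printf("waiting %v until next retry\n", interval)
			time.Sleep(interval)
		}
		fmt.Printf("checking cluster availability %d/%d\n", i+1, retries)
		if lastErr = check(); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("cluster did not become available after %d retries: %v", retries, lastErr)
}

func main() {
	// kubeadm uses 20 retries at a 5 second interval; a short interval keeps this
	// demo quick. The fake check succeeds on the third attempt.
	attempts := 0
	err := waitForAvailable(20, 100*time.Millisecond, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("etcd cluster not ready yet")
		}
		return nil
	})
	fmt.Println("result:", err)
}

The real WaitForClusterAvailable runs the same kind of loop against every endpoint the client knows about, which is why AddMember (in the next file) also records the joining member's client URL.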

cmd/kubeadm/app/util/etcd/etcd.go

@@ -21,6 +21,7 @@ import (
 	"crypto/tls"
 	"fmt"
 	"net"
+	"net/url"
 	"path/filepath"
 	"strconv"
 	"strings"
@@ -73,7 +74,7 @@ func New(endpoints []string, ca, cert, key string) (*Client, error) {
 	return &client, nil
 }
 
-// NewFromCluster creates an etcd client for the the etcd endpoints defined in the ClusterStatus value stored in
+// NewFromCluster creates an etcd client for the etcd endpoints defined in the ClusterStatus value stored in
 // the kubeadm-config ConfigMap in kube-system namespace.
 // Once created, the client synchronizes client's endpoints with the known endpoints from the etcd membership API (reality check).
 func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client, error) {
@@ -146,7 +147,15 @@ type Member struct {
 }
 
 // AddMember notifies an existing etcd cluster that a new member is joining
-func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) {
+func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
+	// Parse the peer address, required to add the client URL later to the list
+	// of endpoints for this client. Parsing as a first operation to make sure that
+	// if this fails no member addition is performed on the etcd cluster.
+	parsedPeerAddrs, err := url.Parse(peerAddrs)
+	if err != nil {
+		return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
+	}
+
 	cli, err := clientv3.New(clientv3.Config{
 		Endpoints:   c.Endpoints,
 		DialTimeout: 20 * time.Second,
@@ -176,6 +185,9 @@ func (c Client) AddMember(name string, peerAddrs string) ([]Member, error) {
 		}
 	}
 
+	// Add the new member client address to the list of endpoints
+	c.Endpoints = append(c.Endpoints, GetClientURLByIP(parsedPeerAddrs.Hostname()))
+
 	return ret, nil
 }
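
Taken together, the two AddMember hunks switch to a pointer receiver so the endpoint change survives the call, parse peerAddrs before any member addition is attempted (a malformed address fails early and leaves the cluster untouched), and finally append the new member's client URL to c.Endpoints so that later calls, such as the availability wait above, also probe the joining member. A rough, self-contained illustration of that parse-then-append flow follows; clientURLByIP is a hypothetical stand-in for kubeadm's GetClientURLByIP and assumes the default etcd client port 2379.

// Illustrative sketch of the peer-address handling introduced by the AddMember change.
package main

import (
	"fmt"
	"log"
	"net"
	"net/url"
)

// clientURLByIP is a stand-in for kubeadm's GetClientURLByIP helper; it assumes
// the default etcd client port 2379.
func clientURLByIP(ip string) string {
	return "https://" + net.JoinHostPort(ip, "2379")
}

func main() {
	peerAddrs := "https://10.0.0.2:2380"

	// Parse first: if the peer address is malformed we fail before asking the
	// cluster to add a member.
	parsed, err := url.Parse(peerAddrs)
	if err != nil {
		log.Fatalf("error parsing peer address %s: %v", peerAddrs, err)
	}

	endpoints := []string{"https://10.0.0.1:2379"}
	// Append the new member's client URL so subsequent calls on this client also
	// probe the member that is about to join.
	endpoints = append(endpoints, clientURLByIP(parsed.Hostname()))

	fmt.Println(endpoints) // [https://10.0.0.1:2379 https://10.0.0.2:2379]
}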
@@ -255,7 +267,7 @@ func (c Client) WaitForClusterAvailable(retries int, retryInterval time.Duration
 			fmt.Printf("[util/etcd] Waiting %v until next retry\n", retryInterval)
 			time.Sleep(retryInterval)
 		}
-		fmt.Printf("[util/etcd] Attempting to see if all cluster endpoints are available %d/%d\n", i+1, retries)
+		klog.V(2).Infof("attempting to see if all cluster endpoints (%s) are available %d/%d", c.Endpoints, i+1, retries)
 		resp, err := c.ClusterAvailable()
 		if err != nil {
 			switch err {