diff --git a/cmd/kubeadm/app/phases/etcd/local.go b/cmd/kubeadm/app/phases/etcd/local.go index 144cda6cdaa..dd62f78e9a3 100644 --- a/cmd/kubeadm/app/phases/etcd/local.go +++ b/cmd/kubeadm/app/phases/etcd/local.go @@ -144,42 +144,19 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest etcdPeerAddress := etcdutil.GetPeerURL(endpoint) - klog.V(1).Infoln("[etcd] Getting the list of existing members") - initialCluster, err := etcdClient.ListMembers() + klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress) + var cluster []etcdutil.Member + cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress) if err != nil { return err } - // only add the new member if it doesn't already exists - var exists bool - klog.V(1).Infof("[etcd] Checking if the etcd member already exists: %s", etcdPeerAddress) - for i := range initialCluster { - if initialCluster[i].PeerURL == etcdPeerAddress { - exists = true - if len(initialCluster[i].Name) == 0 { - klog.V(1).Infof("[etcd] etcd member name is empty. Setting it to the node name: %s", nodeName) - initialCluster[i].Name = nodeName - } - break - } - } - - if exists { - klog.V(1).Infof("[etcd] Etcd member already exists: %s", endpoint) - } else { - klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress) - initialCluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress) - if err != nil { - return err - } - - fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster") - klog.V(1).Infof("Updated etcd member list: %v", initialCluster) - } + fmt.Println("[etcd] Announced new etcd member joining to the existing etcd cluster") + klog.V(1).Infof("Updated etcd member list: %v", cluster) fmt.Printf("[etcd] Creating static Pod manifest for %q\n", kubeadmconstants.Etcd) - if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, initialCluster, isDryRun); err != nil { + if err := prepareAndWriteEtcdStaticPod(manifestDir, patchesDir, cfg, endpoint, nodeName, cluster, isDryRun); err != nil { return err } diff --git a/cmd/kubeadm/app/util/etcd/etcd.go b/cmd/kubeadm/app/util/etcd/etcd.go index f6e7391e6a9..56d201ffad7 100644 --- a/cmd/kubeadm/app/util/etcd/etcd.go +++ b/cmd/kubeadm/app/util/etcd/etcd.go @@ -36,6 +36,8 @@ import ( "k8s.io/klog/v2" "github.com/pkg/errors" + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/transport" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" @@ -339,7 +341,9 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) { return ret, nil } -// AddMember notifies an existing etcd cluster that a new member is joining +// AddMember notifies an existing etcd cluster that a new member is joining, and +// return the updated list of members. If the member has already been added to the +// cluster, this will return the existing list of etcd members. func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { // Parse the peer address, required to add the client URL later to the list // of endpoints for this client. Parsing as a first operation to make sure that @@ -350,8 +354,10 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { } // Adds a new member to the cluster - var lastError error - var resp *clientv3.MemberAddResponse + var ( + lastError error + respMembers []*etcdserverpb.Member + ) err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: c.Endpoints, @@ -368,11 +374,26 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { defer cli.Close() ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) + defer cancel() + var resp *clientv3.MemberAddResponse resp, err = cli.MemberAdd(ctx, []string{peerAddrs}) - cancel() if err == nil { + respMembers = resp.Members return true, nil } + + // If the error indicates that the peer already exists, exit early. In this situation, resp is nil, so + // call out to MemberList to fetch all the members before returning. + if errors.Is(err, rpctypes.ErrPeerURLExist) { + klog.V(5).Info("The peer URL for the added etcd member already exists. Fetching the existing etcd members") + var listResp *clientv3.MemberListResponse + listResp, err = cli.MemberList(ctx) + if err == nil { + respMembers = listResp.Members + return true, nil + } + } + klog.V(5).Infof("Failed to add etcd member: %v", err) lastError = err return false, nil @@ -383,7 +404,7 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) { // Returns the updated list of etcd members ret := []Member{} - for _, m := range resp.Members { + for _, m := range respMembers { // If the peer address matches, this is the member we are adding. // Use the name we passed to the function. if peerAddrs == m.PeerURLs[0] { diff --git a/go.mod b/go.mod index ba376d9f3e9..99c2741fee1 100644 --- a/go.mod +++ b/go.mod @@ -80,6 +80,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/vishvananda/netlink v1.1.0 github.com/vmware/govmomi v0.20.3 + go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 go.opentelemetry.io/otel/sdk v0.20.0 diff --git a/vendor/modules.txt b/vendor/modules.txt index bf835b9a062..0c534758957 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -772,6 +772,7 @@ github.com/xlab/treeprint # go.etcd.io/bbolt v1.3.6 => go.etcd.io/bbolt v1.3.6 go.etcd.io/bbolt # go.etcd.io/etcd/api/v3 v3.5.0 => go.etcd.io/etcd/api/v3 v3.5.0 +## explicit go.etcd.io/etcd/api/v3/authpb go.etcd.io/etcd/api/v3/etcdserverpb go.etcd.io/etcd/api/v3/etcdserverpb/gw