Merge pull request #115038 from mercedes-benz/tobiasgiese/kubeadmfix-etcd-learner-join

kubeadm: fix etcd learner join
This commit is contained in:
Kubernetes Prow Robot 2023-02-06 02:09:01 -08:00 committed by GitHub
commit 561a35f358
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 99 additions and 33 deletions

View File

@ -150,7 +150,7 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
} else { } else {
klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress) klog.V(1).Infof("[etcd] Adding etcd member: %s", etcdPeerAddress)
if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) { if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) {
cluster, err = etcdClient.AddMemberAsLeanerAndPromote(nodeName, etcdPeerAddress) cluster, err = etcdClient.AddMemberAsLearner(nodeName, etcdPeerAddress)
} else { } else {
cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress) cluster, err = etcdClient.AddMember(nodeName, etcdPeerAddress)
} }
@ -172,6 +172,17 @@ func CreateStackedEtcdStaticPodManifestFile(client clientset.Interface, manifest
return nil return nil
} }
if features.Enabled(cfg.FeatureGates, features.EtcdLearnerMode) {
learnerID, err := etcdClient.GetMemberID(etcdPeerAddress)
if err != nil {
return err
}
err = etcdClient.MemberPromote(learnerID)
if err != nil {
return err
}
}
fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries) fmt.Printf("[etcd] Waiting for the new etcd member to join the cluster. This can take up to %v\n", etcdHealthyCheckInterval*etcdHealthyCheckRetries)
if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil { if _, err := etcdClient.WaitForClusterAvailable(etcdHealthyCheckRetries, etcdHealthyCheckInterval); err != nil {
return err return err

View File

@ -251,7 +251,7 @@ func (c fakeTLSEtcdClient) ListMembers() ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
func (c fakeTLSEtcdClient) AddMemberAsLeanerAndPromote(name string, peerAddrs string) ([]etcdutil.Member, error) { func (c fakeTLSEtcdClient) AddMemberAsLearner(name string, peerAddrs string) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
@ -259,6 +259,10 @@ func (c fakeTLSEtcdClient) AddMember(name string, peerAddrs string) ([]etcdutil.
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
func (c fakeTLSEtcdClient) MemberPromote(learnerID uint64) error {
return nil
}
func (c fakeTLSEtcdClient) GetMemberID(peerURL string) (uint64, error) { func (c fakeTLSEtcdClient) GetMemberID(peerURL string) (uint64, error) {
return 0, nil return 0, nil
} }
@ -290,7 +294,7 @@ func (c fakePodManifestEtcdClient) ListMembers() ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
func (c fakePodManifestEtcdClient) AddMemberAsLeanerAndPromote(name string, peerAddrs string) ([]etcdutil.Member, error) { func (c fakePodManifestEtcdClient) AddMemberAsLearner(name string, peerAddrs string) ([]etcdutil.Member, error) {
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
@ -298,6 +302,10 @@ func (c fakePodManifestEtcdClient) AddMember(name string, peerAddrs string) ([]e
return []etcdutil.Member{}, nil return []etcdutil.Member{}, nil
} }
func (c fakePodManifestEtcdClient) MemberPromote(learnerID uint64) error {
return nil
}
func (c fakePodManifestEtcdClient) GetMemberID(peerURL string) (uint64, error) { func (c fakePodManifestEtcdClient) GetMemberID(peerURL string) (uint64, error) {
return 0, nil return 0, nil
} }

View File

@ -60,7 +60,8 @@ type ClusterInterrogator interface {
Sync() error Sync() error
ListMembers() ([]Member, error) ListMembers() ([]Member, error)
AddMember(name string, peerAddrs string) ([]Member, error) AddMember(name string, peerAddrs string) ([]Member, error)
AddMemberAsLeanerAndPromote(name string, peerAddrs string) ([]Member, error) AddMemberAsLearner(name string, peerAddrs string) ([]Member, error)
MemberPromote(learnerID uint64) error
GetMemberID(peerURL string) (uint64, error) GetMemberID(peerURL string) (uint64, error)
RemoveMember(id uint64) ([]Member, error) RemoveMember(id uint64) ([]Member, error)
} }
@ -347,8 +348,8 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
return c.addMember(name, peerAddrs, false) return c.addMember(name, peerAddrs, false)
} }
// AddMemberAsLeanerAndPromote adds a new learner member into the etcd cluster and promotes it to a voting member // AddMemberAsLearner adds a new learner member into the etcd cluster.
func (c *Client) AddMemberAsLeanerAndPromote(name string, peerAddrs string) ([]Member, error) { func (c *Client) AddMemberAsLearner(name string, peerAddrs string) ([]Member, error) {
return c.addMember(name, peerAddrs, true) return c.addMember(name, peerAddrs, true)
} }
@ -364,13 +365,6 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs) return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
} }
// Adds a new member to the cluster
var (
lastError error
respMembers []*etcdserverpb.Member
learnerID uint64
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{ cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints, Endpoints: c.Endpoints,
DialTimeout: etcdTimeout, DialTimeout: etcdTimeout,
@ -380,14 +374,20 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
TLS: c.TLS, TLS: c.TLS,
}) })
if err != nil { if err != nil {
lastError = err return nil, err
return false, nil
} }
defer cli.Close() defer cli.Close()
// Adds a new member to the cluster
var (
lastError error
respMembers []*etcdserverpb.Member
learnerID uint64
resp *clientv3.MemberAddResponse
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel() defer cancel()
var resp *clientv3.MemberAddResponse
if isLearner { if isLearner {
// if learnerID is set, it means the etcd member is already added successfully. // if learnerID is set, it means the etcd member is already added successfully.
if learnerID == 0 { if learnerID == 0 {
@ -399,11 +399,6 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
} }
learnerID = resp.Member.ID learnerID = resp.Member.ID
} }
err = memberPromote(ctx, cli, learnerID)
if err != nil {
lastError = err
return false, nil
}
respMembers = resp.Members respMembers = resp.Members
return true, nil return true, nil
} }
@ -459,21 +454,73 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
return ret, nil return ret, nil
} }
func memberPromote(ctx context.Context, cli *clientv3.Client, learnerID uint64) error { // isLearner returns true if the given member ID is a learner.
func (c *Client) isLearner(memberID uint64) (bool, error) {
resp, err := c.listMembers()
if err != nil {
return false, err
}
for _, member := range resp.Members {
if member.ID == memberID && member.IsLearner {
return true, nil
}
}
return false, nil
}
// MemberPromote promotes a member as a voting member. If the given member ID is already a voting member this method
// will return early and do nothing.
func (c *Client) MemberPromote(learnerID uint64) error {
isLearner, err := c.isLearner(learnerID)
if err != nil {
return err
}
if !isLearner {
klog.V(1).Infof("[etcd] Member %016x already promoted.", learnerID)
return nil
}
klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %016x", learnerID) klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %016x", learnerID)
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
if err != nil {
return err
}
defer cli.Close()
// TODO: warning logs from etcd client should be removed. // TODO: warning logs from etcd client should be removed.
// The warning logs are printed by etcd client code for several reasons, including // The warning logs are printed by etcd client code for several reasons, including
// 1. can not promote yet(no synced) // 1. can not promote yet(no synced)
// 2. context deadline exceeded // 2. context deadline exceeded
// 3. peer URLs already exists // 3. peer URLs already exists
// Once the client provides a way to check if the etcd learner is ready to promote, the retry logic can be revisited. // Once the client provides a way to check if the etcd learner is ready to promote, the retry logic can be revisited.
_, err := cli.MemberPromote(ctx, learnerID) var (
lastError error
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
_, err = cli.MemberPromote(ctx, learnerID)
if err == nil { if err == nil {
klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %016x", learnerID) klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %016x", learnerID)
return nil return true, nil
} }
klog.V(5).Infof("[etcd] Promoting the learner %016x failed: %v", learnerID, err) klog.V(5).Infof("[etcd] Promoting the learner %016x failed: %v", learnerID, err)
return err lastError = err
return false, nil
})
if err != nil {
return lastError
}
return nil
} }
// CheckClusterHealth returns nil for status Up or error for status Down // CheckClusterHealth returns nil for status Up or error for status Down