kubeadm: waiting for etcd learner member to be started before promoting during 'kubeadm join'

This commit is contained in:
SataQiu
2026-01-10 15:34:22 +08:00
committed by Daniel Lipovetsky
parent 8825a37c13
commit ecc28f9d84
2 changed files with 135 additions and 66 deletions

View File

@@ -284,31 +284,42 @@ type Member struct {
PeerURL string
}
func (c *Client) listMembersOnce() (*clientv3.MemberListResponse, error) {
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
return nil, err
}
defer func() { _ = cli.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err := cli.MemberList(ctx)
cancel()
if err == nil {
return resp, nil
}
klog.V(5).Infof("Failed to get etcd member list: %v", err)
return nil, err
}
func (c *Client) listMembers(timeout time.Duration) (*clientv3.MemberListResponse, error) {
// Gets the member list
var lastError error
var resp *clientv3.MemberListResponse
var (
err error
lastError error
resp *clientv3.MemberListResponse
)
if timeout == 0 {
timeout = kubeadmapi.GetActiveTimeouts().EtcdAPICall.Duration
}
err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, timeout,
err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, timeout,
true, func(_ context.Context) (bool, error) {
cli, err := c.newEtcdClient(c.Endpoints)
resp, err = c.listMembersOnce()
if err != nil {
lastError = err
return false, nil
return false, err
}
defer func() { _ = cli.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberList(ctx)
cancel()
if err == nil {
return true, nil
}
klog.V(5).Infof("Failed to get etcd member list: %v", err)
lastError = err
return false, nil
return true, nil
})
if err != nil {
return nil, lastError
@@ -528,38 +539,74 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
return ret, nil
}
// isLearner returns true if the given member ID is a learner.
func (c *Client) isLearner(memberID uint64) (bool, error) {
resp, err := c.listMembersFunc(0)
// getMemberStatus returns the status of the given member ID.
// It returns whether the member is a learner and whether it is started.
func (c *Client) getMemberStatus(memberID uint64) (isLearner bool, started bool, err error) {
resp, err := c.listMembersOnce()
if err != nil {
return false, err
return false, false, err
}
var m *etcdserverpb.Member
for _, member := range resp.Members {
if member.ID == memberID && member.IsLearner {
return true, nil
if member.ID == memberID {
m = member
break
}
}
return false, nil
if m == nil {
return false, false, fmt.Errorf("member %s not found", strconv.FormatUint(memberID, 16))
}
started = true
// There is no field for "started".
// If the member is not started, the Name and ClientURLs fields are set to their respective zero values.
if len(m.Name) == 0 {
started = false
}
return m.IsLearner, started, nil
}
// MemberPromote promotes a member as a voting member. If the given member ID is already a voting member this method
// will return early and do nothing.
// will return early and do nothing. It waits for the member to be started before attempting to promote.
func (c *Client) MemberPromote(learnerID uint64) error {
isLearner, err := c.isLearner(learnerID)
var (
lastError error
learnerIDUint = strconv.FormatUint(learnerID, 16)
)
klog.V(1).Infof("[etcd] Waiting for a learner to start: %s", learnerIDUint)
err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().EtcdAPICall.Duration,
true, func(_ context.Context) (bool, error) {
isLearner, started, err := c.getMemberStatus(learnerID)
if err != nil {
lastError = errors.WithMessagef(err, "failed to get member %s status", learnerIDUint)
return false, nil
}
if !isLearner {
klog.V(1).Infof("[etcd] Member %s was already promoted.", learnerIDUint)
return true, nil
}
if !started {
klog.V(1).Infof("[etcd] Member %s is not started yet. Waiting for it to be started.", learnerIDUint)
lastError = errors.Errorf("the etcd member %s is not started", learnerIDUint)
return false, nil
}
return true, nil
})
if err != nil {
return err
}
if !isLearner {
klog.V(1).Infof("[etcd] Member %s already promoted.", strconv.FormatUint(learnerID, 16))
return nil
return lastError
}
klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %s", strconv.FormatUint(learnerID, 16))
klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %s", learnerIDUint)
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
return err
}
defer func() { _ = cli.Close() }()
// TODO: warning logs from etcd client should be removed.
@@ -568,29 +615,16 @@ func (c *Client) MemberPromote(learnerID uint64) error {
// 2. context deadline exceeded
// 3. peer URLs already exists
// Once the client provides a way to check if the etcd learner is ready to promote, the retry logic can be revisited.
var (
lastError error
)
err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, kubeadmapi.GetActiveTimeouts().EtcdAPICall.Duration,
true, func(_ context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
isLearner, err := c.isLearner(learnerID)
if err != nil {
return false, err
}
if !isLearner {
klog.V(1).Infof("[etcd] Member %s was already promoted.", strconv.FormatUint(learnerID, 16))
return true, nil
}
_, err = cli.MemberPromote(ctx, learnerID)
if err == nil {
klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %s", strconv.FormatUint(learnerID, 16))
klog.V(1).Infof("[etcd] The learner was promoted as a voting member: %s", learnerIDUint)
return true, nil
}
klog.V(5).Infof("[etcd] Promoting the learner %s failed: %v", strconv.FormatUint(learnerID, 16), err)
klog.V(5).Infof("[etcd] Promoting the learner %s failed: %v", learnerIDUint, err)
lastError = err
return false, nil
})

View File

@@ -637,18 +637,19 @@ func TestListMembers(t *testing.T) {
}
}
func TestIsLearner(t *testing.T) {
func TestGetMemberStatus(t *testing.T) {
type fields struct {
Endpoints []string
newEtcdClient func(endpoints []string) (etcdClient, error)
listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error)
}
tests := []struct {
name string
fields fields
memberID uint64
want bool
wantError bool
name string
fields fields
memberID uint64
wantLearner bool
wantStarted bool
wantError bool
}{
{
name: "The specified member is not a learner",
@@ -670,8 +671,9 @@ func TestIsLearner(t *testing.T) {
return f, nil
},
},
memberID: 1,
want: false,
memberID: 1,
wantLearner: false,
wantStarted: true,
},
{
name: "The specified member is a learner",
@@ -700,8 +702,9 @@ func TestIsLearner(t *testing.T) {
return f, nil
},
},
memberID: 1,
want: true,
memberID: 1,
wantLearner: true,
wantStarted: true,
},
{
name: "The specified member does not exist",
@@ -714,8 +717,10 @@ func TestIsLearner(t *testing.T) {
return f, nil
},
},
memberID: 3,
want: false,
memberID: 3,
wantLearner: false,
wantStarted: false,
wantError: true,
},
{
name: "Learner ID is empty",
@@ -736,7 +741,32 @@ func TestIsLearner(t *testing.T) {
return f, nil
},
},
want: true,
wantLearner: true,
wantStarted: true,
},
{
name: "Learner member is not started (no name)",
fields: fields{
Endpoints: []string{},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClient{
members: []*pb.Member{
{
ID: 1,
Name: "",
PeerURLs: []string{
"https://member2:2380",
},
IsLearner: true,
},
},
}
return f, nil
},
},
memberID: 1,
wantLearner: true,
wantStarted: false,
},
{
name: "ListMembers returns an error",
@@ -760,8 +790,10 @@ func TestIsLearner(t *testing.T) {
return nil, errNotImplemented
},
},
want: false,
wantError: true,
memberID: 1,
wantLearner: false,
wantStarted: false,
wantError: true,
},
}
for _, tt := range tests {
@@ -778,12 +810,15 @@ func TestIsLearner(t *testing.T) {
return resp, nil
}
}
got, err := c.isLearner(tt.memberID)
if got != tt.want {
t.Errorf("isLearner() = %v, want %v", got, tt.want)
gotLearner, gotStarted, err := c.getMemberStatus(tt.memberID)
if gotLearner != tt.wantLearner {
t.Errorf("getMemberStatus() isLearner = %v, want %v", gotLearner, tt.wantLearner)
}
if (err != nil) != (tt.wantError) {
t.Errorf("isLearner() error = %v, wantError %v", err, tt.wantError)
if gotStarted != tt.wantStarted {
t.Errorf("getMemberStatus() started = %v, want %v", gotStarted, tt.wantStarted)
}
if (err != nil) != tt.wantError {
t.Errorf("getMemberStatus() error = %v, wantError %v", err, tt.wantError)
}
})
}