add retry to etcd operations

This commit is contained in:
fabriziopandini 2019-11-14 09:27:03 +01:00
parent c558e7f1f2
commit 0573a2227f

View File

@ -38,10 +38,11 @@ import (
"k8s.io/kubernetes/cmd/kubeadm/app/util/config"
)
// Exponential backoff for MemberAdd/Remove (values exclude jitter):
// 0, 50, 150, 350, 750, 1550, 3150, 6350, 12750 ms
var addRemoveBackoff = wait.Backoff{
Steps: 8,
const etcdTimeout = 2 * time.Second
// Exponential backoff for etcd operations
var etcdBackoff = wait.Backoff{
Steps: 9,
Duration: 50 * time.Millisecond,
Factor: 2.0,
Jitter: 0.1,
@ -146,11 +147,21 @@ func (c *Client) Sync() error {
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = cli.Sync(ctx)
cancel()
// Syncs the list of endpoints
var lastError error
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
err = cli.Sync(ctx)
cancel()
if err == nil {
return true, nil
}
klog.V(5).Infof("Failed to sync etcd endpoints: %v", err)
lastError = err
return false, nil
})
if err != nil {
return err
return lastError
}
klog.V(1).Infof("etcd endpoints read from etcd: %s", strings.Join(cli.Endpoints(), ","))
@ -180,11 +191,22 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) {
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
resp, err := cli.MemberList(ctx)
cancel()
// Gets the member list
var lastError error
var resp *clientv3.MemberListResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberList(ctx)
cancel()
if err == nil {
return true, nil
}
klog.V(5).Infof("Failed to get etcd member list: %v", err)
lastError = err
return false, nil
})
if err != nil {
return 0, err
return 0, lastError
}
for _, member := range resp.Members {
@ -213,11 +235,14 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
// Remove an existing member from the cluster
var lastError error
var resp *clientv3.MemberRemoveResponse
err = wait.ExponentialBackoff(addRemoveBackoff, func() (bool, error) {
resp, err = cli.MemberRemove(context.Background(), id)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberRemove(ctx, id)
cancel()
if err == nil {
return true, nil
}
klog.V(5).Infof("Failed to remove etcd member: %v", err)
lastError = err
return false, nil
})
@ -260,11 +285,14 @@ func (c *Client) AddMember(name string, peerAddrs string) ([]Member, error) {
// Adds a new member to the cluster
var lastError error
var resp *clientv3.MemberAddResponse
err = wait.ExponentialBackoff(addRemoveBackoff, func() (bool, error) {
resp, err = cli.MemberAdd(context.Background(), []string{peerAddrs})
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberAdd(ctx, []string{peerAddrs})
cancel()
if err == nil {
return true, nil
}
klog.V(5).Infof("Failed to add etcd member: %v", err)
lastError = err
return false, nil
})
@ -347,12 +375,24 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error)
clusterStatus := make(map[string]*clientv3.StatusResponse)
for _, ep := range c.Endpoints {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := cli.Status(ctx, ep)
cancel()
// Gets the member status
var lastError error
var resp *clientv3.StatusResponse
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.Status(ctx, ep)
cancel()
if err == nil {
return true, nil
}
klog.V(5).Infof("Failed to get etcd status for %s: %v", ep, err)
lastError = err
return false, nil
})
if err != nil {
return nil, err
return nil, lastError
}
clusterStatus[ep] = resp
}
return clusterStatus, nil