kubeadm: switch from ExponentialBackoff() to PollUntilContextTimeout()

Switch to PollUntilContextTimeout() everywhere to allow
usage of the exposed timeouts in the kubeadm API. Exponential backoff
options are more difficult to expose in this regard and a bit too
detailed for the common user - i.e. have "steps", "factor" and so on.
This commit is contained in:
Lubomir I. Ivanov 2023-12-31 17:16:52 +02:00
parent caf5311413
commit 5f876b9d0a
5 changed files with 183 additions and 186 deletions

View File

@ -28,7 +28,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
apimachineryversion "k8s.io/apimachinery/pkg/version"
componentversion "k8s.io/component-base/version"
netutils "k8s.io/utils/net"
@ -495,15 +494,6 @@ var (
// the bootstrap tokens to access the kubeadm-certs Secret during the join of a new control-plane
KubeadmCertsClusterRoleName = fmt.Sprintf("kubeadm:%s", KubeadmCertsSecret)
// StaticPodMirroringDefaultRetry is used a backoff strategy for
// waiting for static pods to be mirrored to the apiserver.
StaticPodMirroringDefaultRetry = wait.Backoff{
Steps: 30,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 0.1,
}
// defaultKubernetesPlaceholderVersion is a placeholder version in case the component-base
// version was not populated during build.
defaultKubernetesPlaceholderVersion = version.MustParseSemantic("v1.0.0-placeholder-version")

View File

@ -22,6 +22,7 @@ import (
"fmt"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
@ -195,27 +196,31 @@ func getNodeNameFromKubeletConfig(fileName string) (string, error) {
}
func getAPIEndpoint(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint) error {
return getAPIEndpointWithBackoff(client, nodeName, apiEndpoint, constants.StaticPodMirroringDefaultRetry)
return getAPIEndpointWithRetry(client, nodeName, apiEndpoint,
constants.StaticPodMirroringRetryInterval, constants.StaticPodMirroringTimeout)
}
func getAPIEndpointWithBackoff(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint, backoff wait.Backoff) error {
func getAPIEndpointWithRetry(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint,
interval, timeout time.Duration) error {
var err error
var errs []error
if err = getAPIEndpointFromPodAnnotation(client, nodeName, apiEndpoint, backoff); err == nil {
if err = getAPIEndpointFromPodAnnotation(client, nodeName, apiEndpoint, interval, timeout); err == nil {
return nil
}
errs = append(errs, errors.WithMessagef(err, "could not retrieve API endpoints for node %q using pod annotations", nodeName))
return errorsutil.NewAggregate(errs)
}
func getAPIEndpointFromPodAnnotation(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint, backoff wait.Backoff) error {
func getAPIEndpointFromPodAnnotation(client clientset.Interface, nodeName string, apiEndpoint *kubeadmapi.APIEndpoint,
interval, timeout time.Duration) error {
var rawAPIEndpoint string
var lastErr error
// Let's tolerate some unexpected transient failures from the API server or load balancers. Also, if
// static pods were not yet mirrored into the API server we want to wait for this propagation.
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
rawAPIEndpoint, lastErr = getRawAPIEndpointFromPodAnnotationWithoutRetry(client, nodeName)
err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true,
func(ctx context.Context) (bool, error) {
rawAPIEndpoint, lastErr = getRawAPIEndpointFromPodAnnotationWithoutRetry(ctx, client, nodeName)
return lastErr == nil, nil
})
if err != nil {
@ -229,9 +234,9 @@ func getAPIEndpointFromPodAnnotation(client clientset.Interface, nodeName string
return nil
}
func getRawAPIEndpointFromPodAnnotationWithoutRetry(client clientset.Interface, nodeName string) (string, error) {
func getRawAPIEndpointFromPodAnnotationWithoutRetry(ctx context.Context, client clientset.Interface, nodeName string) (string, error) {
podList, err := client.CoreV1().Pods(metav1.NamespaceSystem).List(
context.TODO(),
ctx,
metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
LabelSelector: fmt.Sprintf("component=%s,tier=%s", constants.KubeAPIServer, constants.ControlPlaneTier),

View File

@ -25,6 +25,7 @@ import (
"strconv"
"strings"
"testing"
"time"
"github.com/pkg/errors"
@ -32,7 +33,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
@ -461,7 +461,8 @@ func TestGetAPIEndpointWithBackoff(t *testing.T) {
}
}
apiEndpoint := kubeadmapi.APIEndpoint{}
err := getAPIEndpointWithBackoff(client, rt.nodeName, &apiEndpoint, wait.Backoff{Duration: 0, Jitter: 0, Steps: 1})
err := getAPIEndpointWithRetry(client, rt.nodeName, &apiEndpoint,
time.Millisecond*10, time.Millisecond*100)
if err != nil && !rt.expectedErr {
t.Errorf("got error %q; was expecting no errors", err)
return
@ -718,7 +719,8 @@ func TestGetAPIEndpointFromPodAnnotation(t *testing.T) {
rt.clientSetup(client)
}
apiEndpoint := kubeadmapi.APIEndpoint{}
err := getAPIEndpointFromPodAnnotation(client, rt.nodeName, &apiEndpoint, wait.Backoff{Duration: 0, Jitter: 0, Steps: 1})
err := getAPIEndpointFromPodAnnotation(client, rt.nodeName, &apiEndpoint,
time.Millisecond*10, time.Millisecond*100)
if err != nil && !rt.expectedErr {
t.Errorf("got error %v, but wasn't expecting any error", err)
return
@ -832,7 +834,7 @@ func TestGetRawAPIEndpointFromPodAnnotationWithoutRetry(t *testing.T) {
if rt.clientSetup != nil {
rt.clientSetup(client)
}
endpoint, err := getRawAPIEndpointFromPodAnnotationWithoutRetry(client, rt.nodeName)
endpoint, err := getRawAPIEndpointFromPodAnnotationWithoutRetry(context.Background(), client, rt.nodeName)
if err != nil && !rt.expectedErr {
t.Errorf("got error %v, but wasn't expecting any error", err)
return

View File

@ -45,14 +45,6 @@ import (
const etcdTimeout = 2 * time.Second
// Exponential backoff for etcd operations (up to ~200 seconds)
var etcdBackoff = wait.Backoff{
Steps: 18,
Duration: 100 * time.Millisecond,
Factor: 1.5,
Jitter: 0.1,
}
// ErrNoMemberIDForPeerURL is returned when it is not possible to obtain a member ID
// from a given peer URL
var ErrNoMemberIDForPeerURL = errors.New("no member id found for peer URL")
@ -105,7 +97,7 @@ type Client struct {
newEtcdClient func(endpoints []string) (etcdClient, error)
listMembersFunc func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error)
listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error)
}
// New creates a new EtcdCluster client
@ -178,20 +170,22 @@ func NewFromCluster(client clientset.Interface, certificatesDir string) (*Client
// getEtcdEndpoints returns the list of etcd endpoints.
func getEtcdEndpoints(client clientset.Interface) ([]string, error) {
return getEtcdEndpointsWithBackoff(client, constants.StaticPodMirroringDefaultRetry)
return getEtcdEndpointsWithRetry(client,
constants.StaticPodMirroringRetryInterval, constants.StaticPodMirroringTimeout)
}
func getEtcdEndpointsWithBackoff(client clientset.Interface, backoff wait.Backoff) ([]string, error) {
return getRawEtcdEndpointsFromPodAnnotation(client, backoff)
func getEtcdEndpointsWithRetry(client clientset.Interface, interval, timeout time.Duration) ([]string, error) {
return getRawEtcdEndpointsFromPodAnnotation(client, interval, timeout)
}
// getRawEtcdEndpointsFromPodAnnotation returns the list of endpoints as reported on etcd's pod annotations using the given backoff
func getRawEtcdEndpointsFromPodAnnotation(client clientset.Interface, backoff wait.Backoff) ([]string, error) {
func getRawEtcdEndpointsFromPodAnnotation(client clientset.Interface, interval, timeout time.Duration) ([]string, error) {
etcdEndpoints := []string{}
var lastErr error
// Let's tolerate some unexpected transient failures from the API server or load balancers. Also, if
// static pods were not yet mirrored into the API server we want to wait for this propagation.
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true,
func(_ context.Context) (bool, error) {
var overallEtcdPodCount int
if etcdEndpoints, overallEtcdPodCount, lastErr = getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client); lastErr != nil {
return false, nil
@ -244,14 +238,15 @@ func (c *Client) Sync() error {
// Syncs the list of endpoints
var cli etcdClient
var lastError error
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
true, func(_ context.Context) (bool, error) {
var err error
cli, err = c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
defer func() { _ = cli.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
err = cli.Sync(ctx)
cancel()
@ -278,20 +273,21 @@ type Member struct {
PeerURL string
}
func (c *Client) listMembers(backoff *wait.Backoff) (*clientv3.MemberListResponse, error) {
func (c *Client) listMembers(timeout time.Duration) (*clientv3.MemberListResponse, error) {
// Gets the member list
var lastError error
var resp *clientv3.MemberListResponse
if backoff == nil {
backoff = &etcdBackoff
if timeout == 0 {
timeout = constants.EtcdAPICallTimeout
}
err := wait.ExponentialBackoff(*backoff, func() (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, timeout,
true, func(_ context.Context) (bool, error) {
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
defer func() { _ = cli.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberList(ctx)
@ -311,7 +307,7 @@ func (c *Client) listMembers(backoff *wait.Backoff) (*clientv3.MemberListRespons
// GetMemberID returns the member ID of the given peer URL
func (c *Client) GetMemberID(peerURL string) (uint64, error) {
resp, err := c.listMembersFunc(nil)
resp, err := c.listMembersFunc(0)
if err != nil {
return 0, err
}
@ -326,7 +322,7 @@ func (c *Client) GetMemberID(peerURL string) (uint64, error) {
// ListMembers returns the member list.
func (c *Client) ListMembers() ([]Member, error) {
resp, err := c.listMembersFunc(nil)
resp, err := c.listMembersFunc(0)
if err != nil {
return nil, err
}
@ -343,13 +339,14 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
// Remove an existing member from the cluster
var lastError error
var resp *clientv3.MemberRemoveResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
true, func(_ context.Context) (bool, error) {
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
defer func() { _ = cli.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.MemberRemove(ctx, id)
@ -407,7 +404,7 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
if err != nil {
return nil, err
}
defer cli.Close()
defer func() { _ = cli.Close() }()
// Adds a new member to the cluster
var (
@ -416,7 +413,8 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
learnerID uint64
resp *clientv3.MemberAddResponse
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
true, func(_ context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
if isLearner {
@ -487,7 +485,7 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
// isLearner returns true if the given member ID is a learner.
func (c *Client) isLearner(memberID uint64) (bool, error) {
resp, err := c.listMembersFunc(nil)
resp, err := c.listMembersFunc(0)
if err != nil {
return false, err
}
@ -517,7 +515,7 @@ func (c *Client) MemberPromote(learnerID uint64) error {
if err != nil {
return err
}
defer cli.Close()
defer func() { _ = cli.Close() }()
// TODO: warning logs from etcd client should be removed.
// The warning logs are printed by etcd client code for several reasons, including
@ -528,7 +526,8 @@ func (c *Client) MemberPromote(learnerID uint64) error {
var (
lastError error
)
err = wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err = wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
true, func(_ context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
@ -560,13 +559,14 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error)
// Gets the member status
var lastError error
var resp *clientv3.StatusResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
err := wait.PollUntilContextTimeout(context.Background(), constants.EtcdAPICallRetryInterval, constants.EtcdAPICallTimeout,
true, func(_ context.Context) (bool, error) {
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
defer func() { _ = cli.Close() }()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
resp, err = cli.Status(ctx, ep)

View File

@ -22,6 +22,7 @@ import (
"reflect"
"strconv"
"testing"
"time"
"github.com/pkg/errors"
@ -29,7 +30,6 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
@ -208,7 +208,7 @@ func TestGetEtcdEndpointsWithBackoff(t *testing.T) {
t.Errorf("error setting up test creating pod for node %q", pod.NodeName)
}
}
endpoints, err := getEtcdEndpointsWithBackoff(client, wait.Backoff{Duration: 0, Jitter: 0, Steps: 1})
endpoints, err := getEtcdEndpointsWithRetry(client, time.Microsecond*10, time.Millisecond*100)
if err != nil && !rt.expectedErr {
t.Errorf("got error %q; was expecting no errors", err)
return
@ -293,7 +293,7 @@ func TestGetRawEtcdEndpointsFromPodAnnotation(t *testing.T) {
if rt.clientSetup != nil {
rt.clientSetup(client)
}
endpoints, err := getRawEtcdEndpointsFromPodAnnotation(client, wait.Backoff{Duration: 0, Jitter: 0, Steps: 1})
endpoints, err := getRawEtcdEndpointsFromPodAnnotation(client, time.Microsecond*10, time.Millisecond*100)
if err != nil && !rt.expectedErr {
t.Errorf("got error %v, but wasn't expecting any error", err)
return
@ -482,9 +482,9 @@ func TestClient_GetMemberID(t *testing.T) {
Endpoints: tt.fields.Endpoints,
newEtcdClient: tt.fields.newEtcdClient,
}
c.listMembersFunc = func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error) {
c.listMembersFunc = func(_ time.Duration) (*clientv3.MemberListResponse, error) {
f, _ := c.newEtcdClient([]string{})
resp, _ := f.MemberList(context.TODO())
resp, _ := f.MemberList(context.Background())
return resp, nil
}
@ -504,7 +504,7 @@ func TestListMembers(t *testing.T) {
type fields struct {
Endpoints []string
newEtcdClient func(endpoints []string) (etcdClient, error)
listMembersFunc func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error)
listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error)
}
tests := []struct {
name string
@ -606,7 +606,7 @@ func TestListMembers(t *testing.T) {
}
return f, nil
},
listMembersFunc: func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error) {
listMembersFunc: func(_ time.Duration) (*clientv3.MemberListResponse, error) {
return nil, errNotImplemented
},
},
@ -622,8 +622,8 @@ func TestListMembers(t *testing.T) {
listMembersFunc: tt.fields.listMembersFunc,
}
if c.listMembersFunc == nil {
c.listMembersFunc = func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error) {
return c.listMembers(&wait.Backoff{Steps: 1})
c.listMembersFunc = func(_ time.Duration) (*clientv3.MemberListResponse, error) {
return c.listMembers(100 * time.Millisecond)
}
}
got, err := c.ListMembers()
@ -641,7 +641,7 @@ func TestIsLearner(t *testing.T) {
type fields struct {
Endpoints []string
newEtcdClient func(endpoints []string) (etcdClient, error)
listMembersFunc func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error)
listMembersFunc func(timeout time.Duration) (*clientv3.MemberListResponse, error)
}
tests := []struct {
name string
@ -756,7 +756,7 @@ func TestIsLearner(t *testing.T) {
}
return f, nil
},
listMembersFunc: func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error) {
listMembersFunc: func(_ time.Duration) (*clientv3.MemberListResponse, error) {
return nil, errNotImplemented
},
},
@ -772,9 +772,9 @@ func TestIsLearner(t *testing.T) {
listMembersFunc: tt.fields.listMembersFunc,
}
if c.listMembersFunc == nil {
c.listMembersFunc = func(backoff *wait.Backoff) (*clientv3.MemberListResponse, error) {
c.listMembersFunc = func(t_ time.Duration) (*clientv3.MemberListResponse, error) {
f, _ := c.newEtcdClient([]string{})
resp, _ := f.MemberList(context.TODO())
resp, _ := f.MemberList(context.Background())
return resp, nil
}
}