Merge pull request #117792 from dlipovetsky/kubeadm-etcd-client-refactor

kubeadm: Add etcd client unit tests
This commit is contained in:
Kubernetes Prow Robot 2023-05-09 11:02:20 -07:00 committed by GitHub
commit 44a93d0b9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 200 additions and 58 deletions

View File

@ -66,27 +66,69 @@ type ClusterInterrogator interface {
RemoveMember(id uint64) ([]Member, error)
}
type etcdClient interface {
// Close shuts down the client's etcd connections.
Close() error
// Endpoints lists the registered endpoints for the client.
Endpoints() []string
// MemberList lists the current cluster membership.
MemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
// MemberAdd adds a new member into the cluster.
MemberAdd(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
// MemberAddAsLearner adds a new learner member into the cluster.
MemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
// MemberRemove removes an existing member from the cluster.
MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
MemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error)
// Status gets the status of the endpoint.
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
Sync(ctx context.Context) error
}
// Client provides connection parameters for an etcd cluster
type Client struct {
Endpoints []string
TLS *tls.Config
newEtcdClient func(endpoints []string) (etcdClient, error)
}
// New creates a new EtcdCluster client
func New(endpoints []string, ca, cert, key string) (*Client, error) {
client := Client{Endpoints: endpoints}
var err error
var tlsConfig *tls.Config
if ca != "" || cert != "" || key != "" {
tlsInfo := transport.TLSInfo{
CertFile: cert,
KeyFile: key,
TrustedCAFile: ca,
}
tlsConfig, err := tlsInfo.ClientConfig()
tlsConfig, err = tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
client.TLS = tlsConfig
}
client.newEtcdClient = func(endpoints []string) (etcdClient, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: tlsConfig,
})
}
return &client, nil
@ -192,24 +234,16 @@ func getRawEtcdEndpointsFromPodAnnotationWithoutRetry(client clientset.Interface
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (c *Client) Sync() error {
// Syncs the list of endpoints
var cli *clientv3.Client
var cli etcdClient
var lastError error
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
var err error
cli, err = clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err = c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
}
defer cli.Close()
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
err = cli.Sync(ctx)
cancel()
@ -241,14 +275,7 @@ func (c *Client) listMembers() (*clientv3.MemberListResponse, error) {
var lastError error
var resp *clientv3.MemberListResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
@ -306,14 +333,7 @@ func (c *Client) RemoveMember(id uint64) ([]Member, error) {
var lastError error
var resp *clientv3.MemberRemoveResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil
@ -365,14 +385,7 @@ func (c *Client) addMember(name string, peerAddrs string, isLearner bool) ([]Mem
return nil, errors.Wrapf(err, "error parsing peer address %s", peerAddrs)
}
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
return nil, err
}
@ -482,14 +495,7 @@ func (c *Client) MemberPromote(learnerID uint64) error {
}
klog.V(1).Infof("[etcd] Promoting a learner as a voting member: %016x", learnerID)
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
return err
}
@ -537,14 +543,7 @@ func (c *Client) getClusterStatus() (map[string]*clientv3.StatusResponse, error)
var lastError error
var resp *clientv3.StatusResponse
err := wait.ExponentialBackoff(etcdBackoff, func() (bool, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: c.Endpoints,
DialTimeout: etcdTimeout,
DialOptions: []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
},
TLS: c.TLS,
})
cli, err := c.newEtcdClient(c.Endpoints)
if err != nil {
lastError = err
return false, nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd
import (
"context"
"fmt"
"reflect"
"strconv"
@ -24,6 +25,8 @@ import (
"github.com/pkg/errors"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@ -35,9 +38,64 @@ import (
testresources "k8s.io/kubernetes/cmd/kubeadm/test/resources"
)
var errNotImplemented = errors.New("not implemented")
type fakeEtcdClient struct {
members []*pb.Member
endpoints []string
}
// Close shuts down the client's etcd connections.
func (f *fakeEtcdClient) Close() error {
f.members = []*pb.Member{}
return nil
}
// Endpoints lists the registered endpoints for the client.
func (f *fakeEtcdClient) Endpoints() []string {
return f.endpoints
}
// MemberList lists the current cluster membership.
func (f *fakeEtcdClient) MemberList(_ context.Context) (*clientv3.MemberListResponse, error) {
return &clientv3.MemberListResponse{
Members: f.members,
}, nil
}
// MemberAdd adds a new member into the cluster.
func (f *fakeEtcdClient) MemberAdd(_ context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
return nil, errNotImplemented
}
// MemberAddAsLearner adds a new learner member into the cluster.
func (f *fakeEtcdClient) MemberAddAsLearner(_ context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
return nil, errNotImplemented
}
// MemberRemove removes an existing member from the cluster.
func (f *fakeEtcdClient) MemberRemove(_ context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
return nil, errNotImplemented
}
// MemberPromote promotes a member from raft learner (non-voting) to raft voting member.
func (f *fakeEtcdClient) MemberPromote(_ context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
return nil, errNotImplemented
}
// Status gets the status of the endpoint.
func (f *fakeEtcdClient) Status(_ context.Context, endpoint string) (*clientv3.StatusResponse, error) {
return nil, errNotImplemented
}
// Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
func (f *fakeEtcdClient) Sync(_ context.Context) error {
return errNotImplemented
}
func testGetURL(t *testing.T, getURLFunc func(*kubeadmapi.APIEndpoint) string, port int) {
portStr := strconv.Itoa(port)
var tests = []struct {
tests := []struct {
name string
advertiseAddress string
expectedURL string
@ -82,7 +140,7 @@ func TestGetPeerURL(t *testing.T) {
func TestGetClientURLByIP(t *testing.T) {
portStr := strconv.Itoa(constants.EtcdListenClientPort)
var tests = []struct {
tests := []struct {
name string
ip string
expectedURL string
@ -118,7 +176,7 @@ func TestGetClientURLByIP(t *testing.T) {
}
func TestGetEtcdEndpointsWithBackoff(t *testing.T) {
var tests = []struct {
tests := []struct {
name string
pods []testresources.FakeStaticPod
expectedEndpoints []string
@ -169,7 +227,7 @@ func TestGetEtcdEndpointsWithBackoff(t *testing.T) {
}
func TestGetRawEtcdEndpointsFromPodAnnotation(t *testing.T) {
var tests = []struct {
tests := []struct {
name string
pods []testresources.FakeStaticPod
clientSetup func(*clientsetfake.Clientset)
@ -253,7 +311,7 @@ func TestGetRawEtcdEndpointsFromPodAnnotation(t *testing.T) {
}
func TestGetRawEtcdEndpointsFromPodAnnotationWithoutRetry(t *testing.T) {
var tests = []struct {
tests := []struct {
name string
pods []testresources.FakeStaticPod
clientSetup func(*clientsetfake.Clientset)
@ -351,3 +409,88 @@ func TestGetRawEtcdEndpointsFromPodAnnotationWithoutRetry(t *testing.T) {
})
}
}
func TestClient_GetMemberID(t *testing.T) {
type fields struct {
Endpoints []string
newEtcdClient func(endpoints []string) (etcdClient, error)
}
type args struct {
peerURL string
}
tests := []struct {
name string
fields fields
args args
want uint64
wantErr bool
}{
{
name: "member ID found",
fields: fields{
Endpoints: []string{},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClient{
members: []*pb.Member{
{
ID: 1,
Name: "member1",
PeerURLs: []string{
"https://member1:2380",
},
},
},
}
return f, nil
},
},
args: args{
peerURL: "https://member1:2380",
},
wantErr: false,
want: 1,
},
{
name: "member ID not found",
fields: fields{
Endpoints: []string{},
newEtcdClient: func(endpoints []string) (etcdClient, error) {
f := &fakeEtcdClient{
members: []*pb.Member{
{
ID: 1,
Name: "member1",
PeerURLs: []string{
"https://member1:2380",
},
},
},
}
return f, nil
},
},
args: args{
peerURL: "https://member2:2380",
},
wantErr: false,
want: 0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
Endpoints: tt.fields.Endpoints,
newEtcdClient: tt.fields.newEtcdClient,
}
got, err := c.GetMemberID(tt.args.peerURL)
if (err != nil) != tt.wantErr {
t.Errorf("Client.GetMemberID() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("Client.GetMemberID() = %v, want %v", got, tt.want)
}
})
}
}