mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Drop etcd v2 support from test util
This commit is contained in:
parent
5f1983d871
commit
77ab4bdbbc
@ -17,274 +17,29 @@ limitations under the License.
|
||||
package testing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testing/testingcert"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
|
||||
"context"
|
||||
|
||||
etcd "go.etcd.io/etcd/client"
|
||||
"go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/etcdserver"
|
||||
"go.etcd.io/etcd/etcdserver/api/etcdhttp"
|
||||
"go.etcd.io/etcd/etcdserver/api/v2http"
|
||||
"go.etcd.io/etcd/integration"
|
||||
"go.etcd.io/etcd/pkg/testutil"
|
||||
"go.etcd.io/etcd/pkg/transport"
|
||||
"go.etcd.io/etcd/pkg/types"
|
||||
"go.uber.org/zap"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
||||
type EtcdTestServer struct {
|
||||
// The following are lumped etcd2 test server params
|
||||
// TODO: Deprecate in a post 1.5 release
|
||||
etcdserver.ServerConfig
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
Client etcd.Client
|
||||
|
||||
CertificatesDir string
|
||||
CertFile string
|
||||
KeyFile string
|
||||
CAFile string
|
||||
|
||||
raftHandler http.Handler
|
||||
s *etcdserver.EtcdServer
|
||||
hss []*httptest.Server
|
||||
|
||||
// The following are lumped etcd3 test server params
|
||||
v3Cluster *integration.ClusterV3
|
||||
V3Client *clientv3.Client
|
||||
}
|
||||
|
||||
// newLocalListener opens a port localhost using any port
|
||||
func newLocalListener(t *testing.T) net.Listener {
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// newSecuredLocalListener opens a port localhost using any port
|
||||
// with SSL enable
|
||||
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
|
||||
var l net.Listener
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
TrustedCAFile: caFile,
|
||||
}
|
||||
tlscfg, err := tlsInfo.ServerConfig()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected serverConfig error: %v", err)
|
||||
}
|
||||
l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
// newHTTPTransport create a new tls-based transport.
|
||||
func newHTTPTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
|
||||
tlsInfo := transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
TrustedCAFile: caFile,
|
||||
}
|
||||
tr, err := transport.NewTransport(tlsInfo, time.Second)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return tr
|
||||
}
|
||||
|
||||
// configureTestCluster will set the params to start an etcd server
|
||||
func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
|
||||
var err error
|
||||
m := &EtcdTestServer{}
|
||||
|
||||
pln := newLocalListener(t)
|
||||
m.PeerListeners = []net.Listener{pln}
|
||||
m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Allow test launches to control where etcd data goes, for space or performance reasons
|
||||
baseDir := os.Getenv("TEST_ETCD_DIR")
|
||||
if len(baseDir) == 0 {
|
||||
baseDir = os.TempDir()
|
||||
}
|
||||
|
||||
if https {
|
||||
m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
|
||||
if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
|
||||
if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
|
||||
if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
|
||||
m.ClientListeners = []net.Listener{cln}
|
||||
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
cln := newLocalListener(t)
|
||||
m.ClientListeners = []net.Listener{cln}
|
||||
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
m.AuthToken = "simple"
|
||||
m.Name = name
|
||||
m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
|
||||
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
m.InitialClusterToken = "TestEtcd"
|
||||
m.NewCluster = true
|
||||
m.ForceNewCluster = false
|
||||
m.ElectionTicks = 10
|
||||
m.TickMs = uint(10)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// launch will attempt to start the etcd server
|
||||
func (m *EtcdTestServer) launch(t *testing.T) error {
|
||||
var err error
|
||||
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
|
||||
return fmt.Errorf("failed to initialize the etcd server: %v", err)
|
||||
}
|
||||
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
|
||||
m.s.Start()
|
||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(zap.NewExample(), m.s)}
|
||||
for _, ln := range m.PeerListeners {
|
||||
hs := &httptest.Server{
|
||||
Listener: ln,
|
||||
Config: &http.Server{Handler: m.raftHandler},
|
||||
}
|
||||
hs.Start()
|
||||
m.hss = append(m.hss, hs)
|
||||
}
|
||||
for _, ln := range m.ClientListeners {
|
||||
hs := &httptest.Server{
|
||||
Listener: ln,
|
||||
Config: &http.Server{Handler: v2http.NewClientHandler(zap.NewExample(), m.s, m.ServerConfig.ReqTimeout())},
|
||||
}
|
||||
hs.Start()
|
||||
m.hss = append(m.hss, hs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitForEtcd wait until etcd is propagated correctly
|
||||
func (m *EtcdTestServer) waitUntilUp() error {
|
||||
membersAPI := etcd.NewMembersAPI(m.Client)
|
||||
for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
|
||||
members, err := membersAPI.List(context.TODO())
|
||||
if err != nil {
|
||||
klog.Errorf("Error when getting etcd cluster members")
|
||||
continue
|
||||
}
|
||||
if len(members) == 1 && len(members[0].ClientURLs) > 0 {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("timeout on waiting for etcd cluster")
|
||||
}
|
||||
|
||||
// Terminate will shutdown the running etcd server
|
||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
if m.v3Cluster != nil {
|
||||
m.v3Cluster.Terminate(t)
|
||||
} else {
|
||||
m.Client = nil
|
||||
m.s.Stop()
|
||||
// TODO: This is a pretty ugly hack to workaround races during closing
|
||||
// in-memory etcd server in unit tests - see #18928 for more details.
|
||||
// We should get rid of it as soon as we have a proper fix - etcd clients
|
||||
// have overwritten transport counting opened connections (probably by
|
||||
// overwriting Dial function) and termination function waiting for all
|
||||
// connections to be closed and stopping accepting new ones.
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
for _, hs := range m.hss {
|
||||
hs.CloseClientConnections()
|
||||
hs.Close()
|
||||
}
|
||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(m.CertificatesDir) > 0 {
|
||||
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
|
||||
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||
server := configureTestCluster(t, "foo", true)
|
||||
err := server.launch(t)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to start etcd server error=%v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
cfg := etcd.Config{
|
||||
Endpoints: server.ClientURLs.StringSlice(),
|
||||
Transport: newHTTPTransport(t, server.CertFile, server.KeyFile, server.CAFile),
|
||||
}
|
||||
server.Client, err = etcd.New(cfg)
|
||||
if err != nil {
|
||||
server.Terminate(t)
|
||||
t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
|
||||
return nil
|
||||
}
|
||||
if err := server.waitUntilUp(); err != nil {
|
||||
server.Terminate(t)
|
||||
t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
|
||||
return nil
|
||||
}
|
||||
return server
|
||||
m.v3Cluster.Terminate(t)
|
||||
}
|
||||
|
||||
// NewUnsecuredEtcd3TestClientServer creates a new client and server for testing
|
||||
|
Loading…
Reference in New Issue
Block a user