mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Move etcdtest to testing
This commit is contained in:
parent
762518fd47
commit
28fa6de629
@ -1,29 +0,0 @@
|
|||||||
package(default_visibility = ["//visibility:public"])
|
|
||||||
|
|
||||||
load(
|
|
||||||
"@io_bazel_rules_go//go:def.bzl",
|
|
||||||
"go_library",
|
|
||||||
)
|
|
||||||
|
|
||||||
go_library(
|
|
||||||
name = "go_default_library",
|
|
||||||
srcs = [
|
|
||||||
"doc.go",
|
|
||||||
"etcdtest.go",
|
|
||||||
],
|
|
||||||
importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storage/etcd/etcdtest",
|
|
||||||
importpath = "k8s.io/apiserver/pkg/storage/etcd/etcdtest",
|
|
||||||
)
|
|
||||||
|
|
||||||
filegroup(
|
|
||||||
name = "package-srcs",
|
|
||||||
srcs = glob(["**"]),
|
|
||||||
tags = ["automanaged"],
|
|
||||||
visibility = ["//visibility:private"],
|
|
||||||
)
|
|
||||||
|
|
||||||
filegroup(
|
|
||||||
name = "all-srcs",
|
|
||||||
srcs = [":package-srcs"],
|
|
||||||
tags = ["automanaged"],
|
|
||||||
)
|
|
@ -1,17 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2014 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package etcdtest // import "k8s.io/apiserver/pkg/storage/etcd/etcdtest"
|
|
@ -1,36 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2014 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package etcdtest
|
|
||||||
|
|
||||||
import (
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Returns the prefix set via the ETCD_PREFIX environment variable (if any).
|
|
||||||
func PathPrefix() string {
|
|
||||||
pref := os.Getenv("ETCD_PREFIX")
|
|
||||||
if pref == "" {
|
|
||||||
pref = "registry"
|
|
||||||
}
|
|
||||||
return path.Join("/", pref)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Adds the ETCD_PREFIX to the provided key
|
|
||||||
func AddPrefix(in string) string {
|
|
||||||
return path.Join(PathPrefix(), in)
|
|
||||||
}
|
|
@ -0,0 +1,303 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package testing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apiserver/pkg/storage/etcd/testing/testingcert"
|
||||||
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
|
|
||||||
|
"context"
|
||||||
|
|
||||||
|
etcd "github.com/coreos/etcd/client"
|
||||||
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
"github.com/coreos/etcd/etcdserver"
|
||||||
|
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
||||||
|
"github.com/coreos/etcd/etcdserver/api/v2http"
|
||||||
|
"github.com/coreos/etcd/integration"
|
||||||
|
"github.com/coreos/etcd/pkg/testutil"
|
||||||
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
|
"github.com/coreos/etcd/pkg/types"
|
||||||
|
"k8s.io/klog"
|
||||||
|
)
|
||||||
|
|
||||||
|
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
||||||
|
type EtcdTestServer struct {
|
||||||
|
// The following are lumped etcd2 test server params
|
||||||
|
// TODO: Deprecate in a post 1.5 release
|
||||||
|
etcdserver.ServerConfig
|
||||||
|
PeerListeners, ClientListeners []net.Listener
|
||||||
|
Client etcd.Client
|
||||||
|
|
||||||
|
CertificatesDir string
|
||||||
|
CertFile string
|
||||||
|
KeyFile string
|
||||||
|
CAFile string
|
||||||
|
|
||||||
|
raftHandler http.Handler
|
||||||
|
s *etcdserver.EtcdServer
|
||||||
|
hss []*httptest.Server
|
||||||
|
|
||||||
|
// The following are lumped etcd3 test server params
|
||||||
|
v3Cluster *integration.ClusterV3
|
||||||
|
V3Client *clientv3.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// newLocalListener opens a port localhost using any port
|
||||||
|
func newLocalListener(t *testing.T) net.Listener {
|
||||||
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
// newSecuredLocalListener opens a port localhost using any port
|
||||||
|
// with SSL enable
|
||||||
|
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
|
||||||
|
var l net.Listener
|
||||||
|
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
tlsInfo := transport.TLSInfo{
|
||||||
|
CertFile: certFile,
|
||||||
|
KeyFile: keyFile,
|
||||||
|
CAFile: caFile,
|
||||||
|
}
|
||||||
|
tlscfg, err := tlsInfo.ServerConfig()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected serverConfig error: %v", err)
|
||||||
|
}
|
||||||
|
l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
|
||||||
|
tlsInfo := transport.TLSInfo{
|
||||||
|
CertFile: certFile,
|
||||||
|
KeyFile: keyFile,
|
||||||
|
CAFile: caFile,
|
||||||
|
}
|
||||||
|
tr, err := transport.NewTransport(tlsInfo, time.Second)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return tr
|
||||||
|
}
|
||||||
|
|
||||||
|
// configureTestCluster will set the params to start an etcd server
|
||||||
|
func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
|
||||||
|
var err error
|
||||||
|
m := &EtcdTestServer{}
|
||||||
|
|
||||||
|
pln := newLocalListener(t)
|
||||||
|
m.PeerListeners = []net.Listener{pln}
|
||||||
|
m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Allow test launches to control where etcd data goes, for space or performance reasons
|
||||||
|
baseDir := os.Getenv("TEST_ETCD_DIR")
|
||||||
|
if len(baseDir) == 0 {
|
||||||
|
baseDir = os.TempDir()
|
||||||
|
}
|
||||||
|
|
||||||
|
if https {
|
||||||
|
m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
|
||||||
|
if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
|
||||||
|
if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
|
||||||
|
if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
|
||||||
|
m.ClientListeners = []net.Listener{cln}
|
||||||
|
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cln := newLocalListener(t)
|
||||||
|
m.ClientListeners = []net.Listener{cln}
|
||||||
|
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m.AuthToken = "simple"
|
||||||
|
m.Name = name
|
||||||
|
m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
|
||||||
|
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
m.InitialClusterToken = "TestEtcd"
|
||||||
|
m.NewCluster = true
|
||||||
|
m.ForceNewCluster = false
|
||||||
|
m.ElectionTicks = 10
|
||||||
|
m.TickMs = uint(10)
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// launch will attempt to start the etcd server
|
||||||
|
func (m *EtcdTestServer) launch(t *testing.T) error {
|
||||||
|
var err error
|
||||||
|
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize the etcd server: %v", err)
|
||||||
|
}
|
||||||
|
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
|
||||||
|
m.s.Start()
|
||||||
|
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
|
||||||
|
for _, ln := range m.PeerListeners {
|
||||||
|
hs := &httptest.Server{
|
||||||
|
Listener: ln,
|
||||||
|
Config: &http.Server{Handler: m.raftHandler},
|
||||||
|
}
|
||||||
|
hs.Start()
|
||||||
|
m.hss = append(m.hss, hs)
|
||||||
|
}
|
||||||
|
for _, ln := range m.ClientListeners {
|
||||||
|
hs := &httptest.Server{
|
||||||
|
Listener: ln,
|
||||||
|
Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
|
||||||
|
}
|
||||||
|
hs.Start()
|
||||||
|
m.hss = append(m.hss, hs)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForEtcd wait until etcd is propagated correctly
|
||||||
|
func (m *EtcdTestServer) waitUntilUp() error {
|
||||||
|
membersAPI := etcd.NewMembersAPI(m.Client)
|
||||||
|
for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
|
||||||
|
members, err := membersAPI.List(context.TODO())
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Error when getting etcd cluster members")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(members) == 1 && len(members[0].ClientURLs) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("timeout on waiting for etcd cluster")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate will shutdown the running etcd server
|
||||||
|
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||||
|
if m.v3Cluster != nil {
|
||||||
|
m.v3Cluster.Terminate(t)
|
||||||
|
} else {
|
||||||
|
m.Client = nil
|
||||||
|
m.s.Stop()
|
||||||
|
// TODO: This is a pretty ugly hack to workaround races during closing
|
||||||
|
// in-memory etcd server in unit tests - see #18928 for more details.
|
||||||
|
// We should get rid of it as soon as we have a proper fix - etcd clients
|
||||||
|
// have overwritten transport counting opened connections (probably by
|
||||||
|
// overwriting Dial function) and termination function waiting for all
|
||||||
|
// connections to be closed and stopping accepting new ones.
|
||||||
|
time.Sleep(250 * time.Millisecond)
|
||||||
|
for _, hs := range m.hss {
|
||||||
|
hs.CloseClientConnections()
|
||||||
|
hs.Close()
|
||||||
|
}
|
||||||
|
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(m.CertificatesDir) > 0 {
|
||||||
|
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
|
||||||
|
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
||||||
|
server := configureTestCluster(t, "foo", true)
|
||||||
|
err := server.launch(t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to start etcd server error=%v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := etcd.Config{
|
||||||
|
Endpoints: server.ClientURLs.StringSlice(),
|
||||||
|
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
|
||||||
|
}
|
||||||
|
server.Client, err = etcd.New(cfg)
|
||||||
|
if err != nil {
|
||||||
|
server.Terminate(t)
|
||||||
|
t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := server.waitUntilUp(); err != nil {
|
||||||
|
server.Terminate(t)
|
||||||
|
t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewEtcd3TestClientServer creates a new client and server for testing
|
||||||
|
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
|
||||||
|
server := &EtcdTestServer{
|
||||||
|
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
|
||||||
|
}
|
||||||
|
server.V3Client = server.v3Cluster.RandClient()
|
||||||
|
config := &storagebackend.Config{
|
||||||
|
Type: "etcd3",
|
||||||
|
Prefix: PathPrefix(),
|
||||||
|
Transport: storagebackend.TransportConfig{
|
||||||
|
ServerList: server.V3Client.Endpoints(),
|
||||||
|
},
|
||||||
|
Paging: true,
|
||||||
|
}
|
||||||
|
return server, config
|
||||||
|
}
|
@ -17,288 +17,20 @@ limitations under the License.
|
|||||||
package testing
|
package testing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"net/http/httptest"
|
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
|
|
||||||
"k8s.io/apiserver/pkg/storage/etcd/testing/testingcert"
|
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
|
||||||
|
|
||||||
"context"
|
|
||||||
|
|
||||||
etcd "github.com/coreos/etcd/client"
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
|
||||||
"github.com/coreos/etcd/etcdserver"
|
|
||||||
"github.com/coreos/etcd/etcdserver/api/etcdhttp"
|
|
||||||
"github.com/coreos/etcd/etcdserver/api/v2http"
|
|
||||||
"github.com/coreos/etcd/integration"
|
|
||||||
"github.com/coreos/etcd/pkg/testutil"
|
|
||||||
"github.com/coreos/etcd/pkg/transport"
|
|
||||||
"github.com/coreos/etcd/pkg/types"
|
|
||||||
"k8s.io/klog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
// Returns the prefix set via the ETCD_PREFIX environment variable (if any).
|
||||||
type EtcdTestServer struct {
|
func PathPrefix() string {
|
||||||
// The following are lumped etcd2 test server params
|
pref := os.Getenv("ETCD_PREFIX")
|
||||||
// TODO: Deprecate in a post 1.5 release
|
if pref == "" {
|
||||||
etcdserver.ServerConfig
|
pref = "registry"
|
||||||
PeerListeners, ClientListeners []net.Listener
|
}
|
||||||
Client etcd.Client
|
return path.Join("/", pref)
|
||||||
|
|
||||||
CertificatesDir string
|
|
||||||
CertFile string
|
|
||||||
KeyFile string
|
|
||||||
CAFile string
|
|
||||||
|
|
||||||
raftHandler http.Handler
|
|
||||||
s *etcdserver.EtcdServer
|
|
||||||
hss []*httptest.Server
|
|
||||||
|
|
||||||
// The following are lumped etcd3 test server params
|
|
||||||
v3Cluster *integration.ClusterV3
|
|
||||||
V3Client *clientv3.Client
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLocalListener opens a port localhost using any port
|
// Adds the ETCD_PREFIX to the provided key
|
||||||
func newLocalListener(t *testing.T) net.Listener {
|
func AddPrefix(in string) string {
|
||||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
return path.Join(PathPrefix(), in)
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
|
|
||||||
// newSecuredLocalListener opens a port localhost using any port
|
|
||||||
// with SSL enable
|
|
||||||
func newSecuredLocalListener(t *testing.T, certFile, keyFile, caFile string) net.Listener {
|
|
||||||
var l net.Listener
|
|
||||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
tlsInfo := transport.TLSInfo{
|
|
||||||
CertFile: certFile,
|
|
||||||
KeyFile: keyFile,
|
|
||||||
CAFile: caFile,
|
|
||||||
}
|
|
||||||
tlscfg, err := tlsInfo.ServerConfig()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("unexpected serverConfig error: %v", err)
|
|
||||||
}
|
|
||||||
l, err = transport.NewKeepAliveListener(l, "https", tlscfg)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return l
|
|
||||||
}
|
|
||||||
|
|
||||||
func newHttpTransport(t *testing.T, certFile, keyFile, caFile string) etcd.CancelableTransport {
|
|
||||||
tlsInfo := transport.TLSInfo{
|
|
||||||
CertFile: certFile,
|
|
||||||
KeyFile: keyFile,
|
|
||||||
CAFile: caFile,
|
|
||||||
}
|
|
||||||
tr, err := transport.NewTransport(tlsInfo, time.Second)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
return tr
|
|
||||||
}
|
|
||||||
|
|
||||||
// configureTestCluster will set the params to start an etcd server
|
|
||||||
func configureTestCluster(t *testing.T, name string, https bool) *EtcdTestServer {
|
|
||||||
var err error
|
|
||||||
m := &EtcdTestServer{}
|
|
||||||
|
|
||||||
pln := newLocalListener(t)
|
|
||||||
m.PeerListeners = []net.Listener{pln}
|
|
||||||
m.PeerURLs, err = types.NewURLs([]string{"http://" + pln.Addr().String()})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allow test launches to control where etcd data goes, for space or performance reasons
|
|
||||||
baseDir := os.Getenv("TEST_ETCD_DIR")
|
|
||||||
if len(baseDir) == 0 {
|
|
||||||
baseDir = os.TempDir()
|
|
||||||
}
|
|
||||||
|
|
||||||
if https {
|
|
||||||
m.CertificatesDir, err = ioutil.TempDir(baseDir, "etcd_certificates")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
m.CertFile = path.Join(m.CertificatesDir, "etcdcert.pem")
|
|
||||||
if err = ioutil.WriteFile(m.CertFile, []byte(testingcert.CertFileContent), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
m.KeyFile = path.Join(m.CertificatesDir, "etcdkey.pem")
|
|
||||||
if err = ioutil.WriteFile(m.KeyFile, []byte(testingcert.KeyFileContent), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
m.CAFile = path.Join(m.CertificatesDir, "ca.pem")
|
|
||||||
if err = ioutil.WriteFile(m.CAFile, []byte(testingcert.CAFileContent), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cln := newSecuredLocalListener(t, m.CertFile, m.KeyFile, m.CAFile)
|
|
||||||
m.ClientListeners = []net.Listener{cln}
|
|
||||||
m.ClientURLs, err = types.NewURLs([]string{"https://" + cln.Addr().String()})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
cln := newLocalListener(t)
|
|
||||||
m.ClientListeners = []net.Listener{cln}
|
|
||||||
m.ClientURLs, err = types.NewURLs([]string{"http://" + cln.Addr().String()})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
m.AuthToken = "simple"
|
|
||||||
m.Name = name
|
|
||||||
m.DataDir, err = ioutil.TempDir(baseDir, "etcd")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
clusterStr := fmt.Sprintf("%s=http://%s", name, pln.Addr().String())
|
|
||||||
m.InitialPeerURLsMap, err = types.NewURLsMap(clusterStr)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
m.InitialClusterToken = "TestEtcd"
|
|
||||||
m.NewCluster = true
|
|
||||||
m.ForceNewCluster = false
|
|
||||||
m.ElectionTicks = 10
|
|
||||||
m.TickMs = uint(10)
|
|
||||||
|
|
||||||
return m
|
|
||||||
}
|
|
||||||
|
|
||||||
// launch will attempt to start the etcd server
|
|
||||||
func (m *EtcdTestServer) launch(t *testing.T) error {
|
|
||||||
var err error
|
|
||||||
if m.s, err = etcdserver.NewServer(m.ServerConfig); err != nil {
|
|
||||||
return fmt.Errorf("failed to initialize the etcd server: %v", err)
|
|
||||||
}
|
|
||||||
m.s.SyncTicker = time.NewTicker(500 * time.Millisecond)
|
|
||||||
m.s.Start()
|
|
||||||
m.raftHandler = &testutil.PauseableHandler{Next: etcdhttp.NewPeerHandler(m.s)}
|
|
||||||
for _, ln := range m.PeerListeners {
|
|
||||||
hs := &httptest.Server{
|
|
||||||
Listener: ln,
|
|
||||||
Config: &http.Server{Handler: m.raftHandler},
|
|
||||||
}
|
|
||||||
hs.Start()
|
|
||||||
m.hss = append(m.hss, hs)
|
|
||||||
}
|
|
||||||
for _, ln := range m.ClientListeners {
|
|
||||||
hs := &httptest.Server{
|
|
||||||
Listener: ln,
|
|
||||||
Config: &http.Server{Handler: v2http.NewClientHandler(m.s, m.ServerConfig.ReqTimeout())},
|
|
||||||
}
|
|
||||||
hs.Start()
|
|
||||||
m.hss = append(m.hss, hs)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitForEtcd wait until etcd is propagated correctly
|
|
||||||
func (m *EtcdTestServer) waitUntilUp() error {
|
|
||||||
membersAPI := etcd.NewMembersAPI(m.Client)
|
|
||||||
for start := time.Now(); time.Since(start) < wait.ForeverTestTimeout; time.Sleep(10 * time.Millisecond) {
|
|
||||||
members, err := membersAPI.List(context.TODO())
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Error when getting etcd cluster members")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if len(members) == 1 && len(members[0].ClientURLs) > 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return fmt.Errorf("timeout on waiting for etcd cluster")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Terminate will shutdown the running etcd server
|
|
||||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
|
||||||
if m.v3Cluster != nil {
|
|
||||||
m.v3Cluster.Terminate(t)
|
|
||||||
} else {
|
|
||||||
m.Client = nil
|
|
||||||
m.s.Stop()
|
|
||||||
// TODO: This is a pretty ugly hack to workaround races during closing
|
|
||||||
// in-memory etcd server in unit tests - see #18928 for more details.
|
|
||||||
// We should get rid of it as soon as we have a proper fix - etcd clients
|
|
||||||
// have overwritten transport counting opened connections (probably by
|
|
||||||
// overwriting Dial function) and termination function waiting for all
|
|
||||||
// connections to be closed and stopping accepting new ones.
|
|
||||||
time.Sleep(250 * time.Millisecond)
|
|
||||||
for _, hs := range m.hss {
|
|
||||||
hs.CloseClientConnections()
|
|
||||||
hs.Close()
|
|
||||||
}
|
|
||||||
if err := os.RemoveAll(m.ServerConfig.DataDir); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if len(m.CertificatesDir) > 0 {
|
|
||||||
if err := os.RemoveAll(m.CertificatesDir); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEtcdTestClientServer DEPRECATED creates a new client and server for testing
|
|
||||||
func NewEtcdTestClientServer(t *testing.T) *EtcdTestServer {
|
|
||||||
server := configureTestCluster(t, "foo", true)
|
|
||||||
err := server.launch(t)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to start etcd server error=%v", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg := etcd.Config{
|
|
||||||
Endpoints: server.ClientURLs.StringSlice(),
|
|
||||||
Transport: newHttpTransport(t, server.CertFile, server.KeyFile, server.CAFile),
|
|
||||||
}
|
|
||||||
server.Client, err = etcd.New(cfg)
|
|
||||||
if err != nil {
|
|
||||||
server.Terminate(t)
|
|
||||||
t.Fatalf("Unexpected error in NewEtcdTestClientServer (%v)", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if err := server.waitUntilUp(); err != nil {
|
|
||||||
server.Terminate(t)
|
|
||||||
t.Fatalf("Unexpected error in waitUntilUp (%v)", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return server
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewEtcd3TestClientServer creates a new client and server for testing
|
|
||||||
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
|
|
||||||
server := &EtcdTestServer{
|
|
||||||
v3Cluster: integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}),
|
|
||||||
}
|
|
||||||
server.V3Client = server.v3Cluster.RandClient()
|
|
||||||
config := &storagebackend.Config{
|
|
||||||
Type: "etcd3",
|
|
||||||
Prefix: etcdtest.PathPrefix(),
|
|
||||||
Transport: storagebackend.TransportConfig{
|
|
||||||
ServerList: server.V3Client.Endpoints(),
|
|
||||||
},
|
|
||||||
Paging: true,
|
|
||||||
}
|
|
||||||
return server, config
|
|
||||||
}
|
}
|
||||||
|
@ -44,7 +44,6 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
|
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
|
||||||
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
|
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd/etcdtest"
|
|
||||||
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
@ -148,7 +147,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGet(t *testing.T) {
|
func TestGet(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -179,7 +178,7 @@ func TestGet(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGetToList(t *testing.T) {
|
func TestGetToList(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -235,7 +234,7 @@ func TestGetToList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -316,7 +315,7 @@ func TestList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestInfiniteList(t *testing.T) {
|
func TestInfiniteList(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -370,7 +369,7 @@ func (self *injectListError) List(ctx context.Context, key string, resourceVersi
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
// Inject one list error to make sure we test the relist case.
|
// Inject one list error to make sure we test the relist case.
|
||||||
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
@ -447,7 +446,7 @@ func TestWatch(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestWatcherTimeout(t *testing.T) {
|
func TestWatcherTimeout(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -489,7 +488,7 @@ func TestWatcherTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestFiltering(t *testing.T) {
|
func TestFiltering(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, _ := newTestCacher(etcdStorage, 10)
|
cacher, _ := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -551,7 +550,7 @@ func TestFiltering(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestStartingResourceVersion(t *testing.T) {
|
func TestStartingResourceVersion(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -599,7 +598,7 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestEmptyWatchEventCache(t *testing.T) {
|
func TestEmptyWatchEventCache(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
|
|
||||||
// add a few objects
|
// add a few objects
|
||||||
@ -663,7 +662,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestRandomWatchDeliver(t *testing.T) {
|
func TestRandomWatchDeliver(t *testing.T) {
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -789,7 +788,7 @@ func TestCacherListerWatcherPagination(t *testing.T) {
|
|||||||
func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
||||||
|
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
@ -851,7 +850,7 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
|
|||||||
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
|
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
|
||||||
|
|
||||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
server, etcdStorage := newEtcdTestStorage(t, etcdtesting.PathPrefix())
|
||||||
defer server.Terminate(t)
|
defer server.Terminate(t)
|
||||||
cacher, v := newTestCacher(etcdStorage, 10)
|
cacher, v := newTestCacher(etcdStorage, 10)
|
||||||
defer cacher.Stop()
|
defer cacher.Stop()
|
||||||
|
Loading…
Reference in New Issue
Block a user