Simplify running embedded etcd server in tests

This commit is contained in:
Jordan Liggitt 2021-06-15 12:07:46 -04:00
parent 56efa75173
commit 01760927b8
6 changed files with 207 additions and 177 deletions

View File

@ -22,14 +22,12 @@ import (
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
)
func TestCompact(t *testing.T) {
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
client := testserver.RunEtcd(t, nil)
ctx := context.Background()
putResp, err := client.Put(ctx, "/somekey", "data")
@ -58,10 +56,7 @@ func TestCompact(t *testing.T) {
// - C1 compacts first. It will succeed.
// - C2 compacts after. It will fail. But it will get latest logical time, which should be larger by one.
func TestCompactConflict(t *testing.T) {
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
client := cluster.RandClient()
client := testserver.RunEtcd(t, nil)
ctx := context.Background()
putResp, err := client.Put(ctx, "/somekey", "data")

View File

@ -33,7 +33,6 @@ import (
"testing"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc/grpclog"
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
@ -50,6 +49,7 @@ import (
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@ -107,9 +107,7 @@ func newPod() runtime.Object {
}
func TestCreate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
etcdClient := cluster.RandClient()
ctx, store, etcdClient := testSetup(t)
key := "/testkey"
out := &example.Pod{}
@ -164,8 +162,7 @@ func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clien
}
func TestCreateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey"
@ -183,8 +180,7 @@ func TestCreateWithTTL(t *testing.T) {
}
func TestCreateWithKeyExist(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key, _ := testPropogateStore(ctx, t, store, obj)
out := &example.Pod{}
@ -195,8 +191,7 @@ func TestCreateWithKeyExist(t *testing.T) {
}
func TestGet(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
// create an object to test
key, createdObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
// update the object once to allow get by exact resource version to be tested
@ -300,8 +295,7 @@ func TestGet(t *testing.T) {
}
func TestUnconditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
tests := []struct {
@ -337,8 +331,7 @@ func TestUnconditionalDelete(t *testing.T) {
}
func TestConditionalDelete(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
tests := []struct {
@ -397,8 +390,7 @@ func TestConditionalDelete(t *testing.T) {
// [DONE] Added TestPreconditionalDeleteWithSuggestion
func TestDeleteWithSuggestion(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
@ -413,8 +405,7 @@ func TestDeleteWithSuggestion(t *testing.T) {
}
func TestDeleteWithSuggestionAndConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
@ -440,8 +431,7 @@ func TestDeleteWithSuggestionAndConflict(t *testing.T) {
}
func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
@ -459,8 +449,7 @@ func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
}
func TestValidateDeletionWithSuggestion(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
@ -509,8 +498,7 @@ func TestValidateDeletionWithSuggestion(t *testing.T) {
}
func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
@ -538,8 +526,7 @@ func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
}
func TestGetToList(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
prevKey, prevStoredObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
@ -653,9 +640,7 @@ func TestGetToList(t *testing.T) {
}
func TestGuaranteedUpdate(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
etcdClient := cluster.RandClient()
ctx, store, etcdClient := testSetup(t)
key := "/testkey"
tests := []struct {
@ -797,8 +782,7 @@ func TestGuaranteedUpdate(t *testing.T) {
}
func TestGuaranteedUpdateWithTTL(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey"
@ -821,8 +805,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
}
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
key := "/somekey"
@ -888,8 +871,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
}
func TestGuaranteedUpdateWithConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
errChan := make(chan error, 1)
@ -935,8 +917,7 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
}
func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
// First, update without a suggestion so originalPod is outdated
@ -983,11 +964,9 @@ func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
}
func TestTransformationFailure(t *testing.T) {
integration.BeforeTestExternal(t)
client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background()
preset := []struct {
@ -1060,13 +1039,11 @@ func TestTransformationFailure(t *testing.T) {
}
func TestList(t *testing.T) {
integration.BeforeTestExternal(t)
client := testserver.RunEtcd(t, nil)
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)()
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
disablePagingStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
disablePagingStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background()
// Setup storage with the following structure:
@ -1557,12 +1534,9 @@ func TestList(t *testing.T) {
}
func TestListContinuation(t *testing.T) {
integration.BeforeTestExternal(t)
etcdClient := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
@ -1720,12 +1694,9 @@ func (r *clientRecorder) resetReads() {
}
func TestListContinuationWithFilter(t *testing.T) {
integration.BeforeTestExternal(t)
etcdClient := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
etcdClient := cluster.RandClient()
recorder := &clientRecorder{KV: etcdClient.KV}
etcdClient.KV = recorder
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
@ -1828,11 +1799,9 @@ func TestListContinuationWithFilter(t *testing.T) {
}
func TestListInconsistentContinuation(t *testing.T) {
integration.BeforeTestExternal(t)
client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
ctx := context.Background()
// Setup storage with the following structure:
@ -1924,7 +1893,7 @@ func TestListInconsistentContinuation(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if _, err := cluster.Client(0).KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
t.Fatalf("Unable to compact, %v", err)
}
@ -1974,19 +1943,18 @@ func TestListInconsistentContinuation(t *testing.T) {
}
}
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
integration.BeforeTestExternal(t)
func testSetup(t *testing.T) (context.Context, *store, *clientv3.Client) {
client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
// As 30s is the default timeout for testing in glboal configuration,
// we cannot wait longer than that in a single time: change it to 10
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
ReuseDurationSeconds: 1,
MaxObjectCount: defaultLeaseMaxObjectCount,
})
ctx := context.Background()
return ctx, store, cluster
return ctx, store, client
}
// testPropogateStore helps propagates store with objects, automates key generation, and returns
@ -2016,10 +1984,8 @@ func testPropogateStoreWithKey(ctx context.Context, t *testing.T, store *store,
}
func TestPrefix(t *testing.T) {
integration.BeforeTestExternal(t)
client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
testcases := map[string]string{
"custom/prefix": "/custom/prefix",
@ -2027,7 +1993,7 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
store := newStore(cluster.RandClient(), codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig())
if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
}
@ -2186,15 +2152,13 @@ func (t *fancyTransformer) createObject() error {
}
func TestConsistentList(t *testing.T) {
integration.BeforeTestExternal(t)
client := testserver.RunEtcd(t, nil)
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
transformer := &fancyTransformer{
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
}
store := newStore(cluster.RandClient(), codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
store := newStore(client, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
transformer.store = store
for i := 0; i < 5; i++ {
@ -2259,8 +2223,7 @@ func TestConsistentList(t *testing.T) {
}
func TestCount(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
resourceA := "/foo.bar.io/abc"
@ -2298,14 +2261,12 @@ func TestCount(t *testing.T) {
func TestLeaseMaxObjectCount(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
client := testserver.RunEtcd(t, nil)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
MaxObjectCount: 2,
})
ctx := context.Background()
defer cluster.Terminate(t)
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}}
out := &example.Pod{}

View File

@ -17,49 +17,27 @@ limitations under the License.
package testing
import (
"io/ioutil"
"testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
grpclogsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc/grpclog"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
var grpc_logger grpclogsettable.SettableLoggerV2
func init() {
// override logger set up by etcd integration test package
grpc_logger = grpclogsettable.ReplaceGrpcLoggerV2()
}
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
type EtcdTestServer struct {
CertificatesDir string
CertFile string
KeyFile string
CAFile string
// The following are lumped etcd3 test server params
v3Cluster *integration.ClusterV3
V3Client *clientv3.Client
}
// Terminate will shutdown the running etcd server
func (m *EtcdTestServer) Terminate(t *testing.T) {
m.v3Cluster.Terminate(t)
func (e *EtcdTestServer) Terminate(t *testing.T) {
// no-op, server termination moved to test cleanup
}
// NewUnsecuredEtcd3TestClientServer creates a new client and server for testing
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
integration.BeforeTestExternal(t)
grpc_logger.Set(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, &testErrorWriter{t}))
server := &EtcdTestServer{
v3Cluster: integration.NewClusterV3(&noLogT{t}, &integration.ClusterConfig{Size: 1}),
}
server.V3Client = server.v3Cluster.RandClient()
server := &EtcdTestServer{}
server.V3Client = testserver.RunEtcd(t, nil)
config := &storagebackend.Config{
Type: "etcd3",
Prefix: PathPrefix(),
@ -70,21 +48,3 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb
}
return server, config
}
type noLogT struct {
testing.TB
}
func (q *noLogT) Log(s ...interface{}) {
}
func (q *noLogT) Logf(s string, params ...interface{}) {
}
type testErrorWriter struct {
testing.TB
}
func (t *testErrorWriter) Write(b []byte) (int, error) {
t.TB.Error(string(b))
return len(b), nil
}

View File

@ -0,0 +1,126 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testserver
import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"testing"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
)
// getAvailablePort returns a TCP port that is available for binding.
func getAvailablePorts(count int) ([]int, error) {
ports := []int{}
for i := 0; i < count; i++ {
l, err := net.Listen("tcp", ":0")
if err != nil {
return nil, fmt.Errorf("could not bind to a port: %v", err)
}
// It is possible but unlikely that someone else will bind this port before we get a chance to use it.
defer l.Close()
ports = append(ports, l.Addr().(*net.TCPAddr).Port)
}
return ports, nil
}
// NewTestConfig returns a configuration for an embedded etcd server.
// The configuration is based on embed.NewConfig(), with the following adjustments:
// * sets UnsafeNoFsync = true to improve test performance (only reasonable in a test-only
// single-member server we never intend to restart or keep data from)
// * uses free ports for client and peer listeners
// * cleans up the data directory on test termination
// * silences server logs other than errors
func NewTestConfig(t *testing.T) *embed.Config {
cfg := embed.NewConfig()
cfg.UnsafeNoFsync = true
ports, err := getAvailablePorts(2)
if err != nil {
t.Fatal(err)
}
clientURL := url.URL{Scheme: "http", Host: net.JoinHostPort("localhost", strconv.Itoa(ports[0]))}
peerURL := url.URL{Scheme: "http", Host: net.JoinHostPort("localhost", strconv.Itoa(ports[1]))}
cfg.LPUrls = []url.URL{peerURL}
cfg.APUrls = []url.URL{peerURL}
cfg.LCUrls = []url.URL{clientURL}
cfg.ACUrls = []url.URL{clientURL}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel)).Named("etcd-server"))
cfg.Dir = t.TempDir()
os.Chmod(cfg.Dir, 0700)
return cfg
}
// RunEtcd starts an embedded etcd server with the provided config
// (or NewTestConfig(t) if nil), and returns a client connected to the server.
// The server is terminated when the test ends.
func RunEtcd(t *testing.T, cfg *embed.Config) *clientv3.Client {
t.Helper()
if cfg == nil {
cfg = NewTestConfig(t)
}
e, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
t.Cleanup(e.Close)
select {
case <-e.Server.ReadyNotify():
case <-time.After(60 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Fatal("server took too long to start")
}
go func() {
err := <-e.Err()
if err != nil {
t.Error(err)
}
}()
tlsConfig, err := cfg.ClientTLSInfo.ClientConfig()
if err != nil {
t.Fatal(err)
}
client, err := clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: e.Server.Cluster().ClientURLs(),
DialTimeout: 10 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
Logger: zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel)).Named("etcd-client"),
})
if err != nil {
t.Fatal(err)
}
return client
}

View File

@ -25,7 +25,6 @@ import (
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/integration"
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -38,6 +37,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
)
@ -54,8 +54,7 @@ func TestWatchList(t *testing.T) {
// - update should trigger Modified event
// - update that gets filtered should trigger Deleted event
func testWatch(t *testing.T, recursive bool) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
@ -129,8 +128,7 @@ func testWatch(t *testing.T, recursive bool) {
}
func TestDeleteTriggerWatch(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil {
@ -146,8 +144,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
// - watch from 0 should sync up and grab the object added before
// - watch from 0 is able to return events for objects whose previous version has been compacted
func TestWatchFromZero(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, client := testSetup(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
@ -190,7 +187,7 @@ func TestWatchFromZero(t *testing.T) {
if err != nil {
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
}
_, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
_, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
if err != nil {
t.Fatalf("Error compacting: %v", err)
}
@ -206,8 +203,7 @@ func TestWatchFromZero(t *testing.T) {
// TestWatchFromNoneZero tests that
// - watch from non-0 should just watch changes after given version
func TestWatchFromNoneZero(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
@ -224,16 +220,14 @@ func TestWatchFromNoneZero(t *testing.T) {
func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
client := testserver.RunEtcd(t, nil)
invalidStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
validStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
@ -242,8 +236,7 @@ func TestWatchError(t *testing.T) {
}
func TestWatchContextCancel(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, _ := testSetup(t)
canceledCtx, cancel := context.WithCancel(ctx)
cancel()
// When we watch with a canceled context, we should detect that it's context canceled.
@ -264,8 +257,7 @@ func TestWatchContextCancel(t *testing.T) {
}
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
origCtx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
origCtx, store, _ := testSetup(t)
ctx, cancel := context.WithCancel(origCtx)
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
// make resutlChan and errChan blocking to ensure ordering.
@ -287,15 +279,14 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
}
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
ctx, store, client := testSetup(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix())
etcdW := client.Watch(ctx, "/", clientv3.WithPrefix())
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
t.Fatalf("Delete failed: %v", err)
@ -316,8 +307,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
}
func TestWatchInitializationSignal(t *testing.T) {
_, store, cluster := testSetup(t)
defer cluster.Terminate(t)
_, store, _ := testSetup(t)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
initSignal := utilflowcontrol.NewInitializationSignal()
@ -334,14 +324,10 @@ func TestWatchInitializationSignal(t *testing.T) {
func TestProgressNotify(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
clusterConfig := &integration.ClusterConfig{
Size: 1,
WatchProgressNotifyInterval: time.Second,
}
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, clusterConfig)
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
clusterConfig := testserver.NewTestConfig(t)
clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second
client := testserver.RunEtcd(t, clusterConfig)
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
ctx := context.Background()
key := "/somekey"

View File

@ -25,7 +25,6 @@ import (
"testing"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/tests/v3/integration"
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -35,6 +34,7 @@ import (
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage/etcd3/testing/testingcert"
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
"k8s.io/apiserver/pkg/storage/storagebackend"
)
@ -53,23 +53,25 @@ func TestTLSConnection(t *testing.T) {
certFile, keyFile, caFile := configureTLSCerts(t)
defer os.RemoveAll(filepath.Dir(certFile))
tlsInfo := &transport.TLSInfo{
// override server config to be TLS-enabled
etcdConfig := testserver.NewTestConfig(t)
etcdConfig.ClientTLSInfo = transport.TLSInfo{
CertFile: certFile,
KeyFile: keyFile,
TrustedCAFile: caFile,
}
for i := range etcdConfig.LCUrls {
etcdConfig.LCUrls[i].Scheme = "https"
}
for i := range etcdConfig.ACUrls {
etcdConfig.ACUrls[i].Scheme = "https"
}
integration.BeforeTestExternal(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{
Size: 1,
ClientTLS: tlsInfo,
})
defer cluster.Terminate(t)
client := testserver.RunEtcd(t, etcdConfig)
cfg := storagebackend.Config{
Type: storagebackend.StorageTypeETCD3,
Transport: storagebackend.TransportConfig{
ServerList: []string{cluster.Members[0].GRPCAddr()},
ServerList: client.Endpoints(),
CertFile: certFile,
KeyFile: keyFile,
TrustedCAFile: caFile,