diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go index 77254b7fd09..04157ce1733 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/compact_test.go @@ -22,14 +22,12 @@ import ( etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/integration" + + "k8s.io/apiserver/pkg/storage/etcd3/testserver" ) func TestCompact(t *testing.T) { - integration.BeforeTestExternal(t) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) - client := cluster.RandClient() + client := testserver.RunEtcd(t, nil) ctx := context.Background() putResp, err := client.Put(ctx, "/somekey", "data") @@ -58,10 +56,7 @@ func TestCompact(t *testing.T) { // - C1 compacts first. It will succeed. // - C2 compacts after. It will fail. But it will get latest logical time, which should be larger by one. func TestCompactConflict(t *testing.T) { - integration.BeforeTestExternal(t) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) - client := cluster.RandClient() + client := testserver.RunEtcd(t, nil) ctx := context.Background() putResp, err := client.Put(ctx, "/somekey", "data") diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index fded608e74a..041dc1708d0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -33,7 +33,6 @@ import ( "testing" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/integration" "google.golang.org/grpc/grpclog" apitesting "k8s.io/apimachinery/pkg/api/apitesting" @@ -50,6 +49,7 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3/testserver" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -107,9 +107,7 @@ func newPod() runtime.Object { } func TestCreate(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) - etcdClient := cluster.RandClient() + ctx, store, etcdClient := testSetup(t) key := "/testkey" out := &example.Pod{} @@ -164,8 +162,7 @@ func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clien } func TestCreateWithTTL(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/somekey" @@ -183,8 +180,7 @@ func TestCreateWithTTL(t *testing.T) { } func TestCreateWithKeyExist(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key, _ := testPropogateStore(ctx, t, store, obj) out := &example.Pod{} @@ -195,8 +191,7 @@ func TestCreateWithKeyExist(t *testing.T) { } func TestGet(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) // create an object to test key, createdObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) // update the object once to allow get by exact resource version to be tested @@ -300,8 +295,7 @@ func TestGet(t *testing.T) { } func TestUnconditionalDelete(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) tests := []struct { @@ -337,8 +331,7 @@ func TestUnconditionalDelete(t *testing.T) { } func TestConditionalDelete(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}}) tests := []struct { @@ -397,8 +390,7 @@ func TestConditionalDelete(t *testing.T) { // [DONE] Added TestPreconditionalDeleteWithSuggestion func TestDeleteWithSuggestion(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) @@ -413,8 +405,7 @@ func TestDeleteWithSuggestion(t *testing.T) { } func TestDeleteWithSuggestionAndConflict(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) @@ -440,8 +431,7 @@ func TestDeleteWithSuggestionAndConflict(t *testing.T) { } func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) @@ -459,8 +449,7 @@ func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) { } func TestValidateDeletionWithSuggestion(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) @@ -509,8 +498,7 @@ func TestValidateDeletionWithSuggestion(t *testing.T) { } func TestPreconditionalDeleteWithSuggestion(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}}) @@ -538,8 +526,7 @@ func TestPreconditionalDeleteWithSuggestion(t *testing.T) { } func TestGetToList(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) prevKey, prevStoredObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}}) prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion) @@ -653,9 +640,7 @@ func TestGetToList(t *testing.T) { } func TestGuaranteedUpdate(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) - etcdClient := cluster.RandClient() + ctx, store, etcdClient := testSetup(t) key := "/testkey" tests := []struct { @@ -797,8 +782,7 @@ func TestGuaranteedUpdate(t *testing.T) { } func TestGuaranteedUpdateWithTTL(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/somekey" @@ -821,8 +805,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) { } func TestGuaranteedUpdateChecksStoredData(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} key := "/somekey" @@ -888,8 +871,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) { } func TestGuaranteedUpdateWithConflict(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) errChan := make(chan error, 1) @@ -935,8 +917,7 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) { } func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) // First, update without a suggestion so originalPod is outdated @@ -983,11 +964,9 @@ func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) { } func TestTransformationFailure(t *testing.T) { - integration.BeforeTestExternal(t) + client := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) ctx := context.Background() preset := []struct { @@ -1060,13 +1039,11 @@ func TestTransformationFailure(t *testing.T) { } func TestList(t *testing.T) { - integration.BeforeTestExternal(t) + client := testserver.RunEtcd(t, nil) defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)() codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) - disablePagingStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) + disablePagingStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1557,12 +1534,9 @@ func TestList(t *testing.T) { } func TestListContinuation(t *testing.T) { - integration.BeforeTestExternal(t) + etcdClient := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} - etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) @@ -1720,12 +1694,9 @@ func (r *clientRecorder) resetReads() { } func TestListContinuationWithFilter(t *testing.T) { - integration.BeforeTestExternal(t) + etcdClient := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} - etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) @@ -1828,11 +1799,9 @@ func TestListContinuationWithFilter(t *testing.T) { } func TestListInconsistentContinuation(t *testing.T) { - integration.BeforeTestExternal(t) + client := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig()) ctx := context.Background() // Setup storage with the following structure: @@ -1924,7 +1893,7 @@ func TestListInconsistentContinuation(t *testing.T) { if err != nil { t.Fatal(err) } - if _, err := cluster.Client(0).KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil { + if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil { t.Fatalf("Unable to compact, %v", err) } @@ -1974,19 +1943,18 @@ func TestListInconsistentContinuation(t *testing.T) { } } -func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { - integration.BeforeTestExternal(t) +func testSetup(t *testing.T) (context.Context, *store, *clientv3.Client) { + client := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 // for testing purposes. See apimachinery/pkg/util/wait/wait.go - store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ + store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ ReuseDurationSeconds: 1, MaxObjectCount: defaultLeaseMaxObjectCount, }) ctx := context.Background() - return ctx, store, cluster + return ctx, store, client } // testPropogateStore helps propagates store with objects, automates key generation, and returns @@ -2016,10 +1984,8 @@ func testPropogateStoreWithKey(ctx context.Context, t *testing.T, store *store, } func TestPrefix(t *testing.T) { - integration.BeforeTestExternal(t) + client := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} testcases := map[string]string{ "custom/prefix": "/custom/prefix", @@ -2027,7 +1993,7 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(cluster.RandClient(), codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig()) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } @@ -2186,15 +2152,13 @@ func (t *fancyTransformer) createObject() error { } func TestConsistentList(t *testing.T) { - integration.BeforeTestExternal(t) + client := testserver.RunEtcd(t, nil) codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) transformer := &fancyTransformer{ transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, } - store := newStore(cluster.RandClient(), codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) + store := newStore(client, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig()) transformer.store = store for i := 0; i < 5; i++ { @@ -2259,8 +2223,7 @@ func TestConsistentList(t *testing.T) { } func TestCount(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) resourceA := "/foo.bar.io/abc" @@ -2298,14 +2261,12 @@ func TestCount(t *testing.T) { func TestLeaseMaxObjectCount(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - integration.BeforeTestExternal(t) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ + client := testserver.RunEtcd(t, nil) + store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{ ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, MaxObjectCount: 2, }) ctx := context.Background() - defer cluster.Terminate(t) obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}} out := &example.Pod{} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go index bc22d1a6ef3..f696192b6ff 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing/test_server.go @@ -17,49 +17,27 @@ limitations under the License. package testing import ( - "io/ioutil" "testing" - "k8s.io/apiserver/pkg/storage/storagebackend" - - grpclogsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/integration" - "google.golang.org/grpc/grpclog" + + "k8s.io/apiserver/pkg/storage/etcd3/testserver" + "k8s.io/apiserver/pkg/storage/storagebackend" ) -var grpc_logger grpclogsettable.SettableLoggerV2 - -func init() { - // override logger set up by etcd integration test package - grpc_logger = grpclogsettable.ReplaceGrpcLoggerV2() -} - // EtcdTestServer encapsulates the datastructures needed to start local instance for testing type EtcdTestServer struct { - CertificatesDir string - CertFile string - KeyFile string - CAFile string - - // The following are lumped etcd3 test server params - v3Cluster *integration.ClusterV3 - V3Client *clientv3.Client + V3Client *clientv3.Client } -// Terminate will shutdown the running etcd server -func (m *EtcdTestServer) Terminate(t *testing.T) { - m.v3Cluster.Terminate(t) +func (e *EtcdTestServer) Terminate(t *testing.T) { + // no-op, server termination moved to test cleanup } // NewUnsecuredEtcd3TestClientServer creates a new client and server for testing func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) { - integration.BeforeTestExternal(t) - grpc_logger.Set(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, &testErrorWriter{t})) - server := &EtcdTestServer{ - v3Cluster: integration.NewClusterV3(&noLogT{t}, &integration.ClusterConfig{Size: 1}), - } - server.V3Client = server.v3Cluster.RandClient() + server := &EtcdTestServer{} + server.V3Client = testserver.RunEtcd(t, nil) config := &storagebackend.Config{ Type: "etcd3", Prefix: PathPrefix(), @@ -70,21 +48,3 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb } return server, config } - -type noLogT struct { - testing.TB -} - -func (q *noLogT) Log(s ...interface{}) { -} -func (q *noLogT) Logf(s string, params ...interface{}) { -} - -type testErrorWriter struct { - testing.TB -} - -func (t *testErrorWriter) Write(b []byte) (int, error) { - t.TB.Error(string(b)) - return len(b), nil -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go new file mode 100644 index 00000000000..41a411bdf41 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/testserver/test_server.go @@ -0,0 +1,126 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testserver + +import ( + "fmt" + "net" + "net/url" + "os" + "strconv" + "testing" + "time" + + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "google.golang.org/grpc" +) + +// getAvailablePort returns a TCP port that is available for binding. +func getAvailablePorts(count int) ([]int, error) { + ports := []int{} + for i := 0; i < count; i++ { + l, err := net.Listen("tcp", ":0") + if err != nil { + return nil, fmt.Errorf("could not bind to a port: %v", err) + } + // It is possible but unlikely that someone else will bind this port before we get a chance to use it. + defer l.Close() + ports = append(ports, l.Addr().(*net.TCPAddr).Port) + } + return ports, nil +} + +// NewTestConfig returns a configuration for an embedded etcd server. +// The configuration is based on embed.NewConfig(), with the following adjustments: +// * sets UnsafeNoFsync = true to improve test performance (only reasonable in a test-only +// single-member server we never intend to restart or keep data from) +// * uses free ports for client and peer listeners +// * cleans up the data directory on test termination +// * silences server logs other than errors +func NewTestConfig(t *testing.T) *embed.Config { + cfg := embed.NewConfig() + + cfg.UnsafeNoFsync = true + + ports, err := getAvailablePorts(2) + if err != nil { + t.Fatal(err) + } + clientURL := url.URL{Scheme: "http", Host: net.JoinHostPort("localhost", strconv.Itoa(ports[0]))} + peerURL := url.URL{Scheme: "http", Host: net.JoinHostPort("localhost", strconv.Itoa(ports[1]))} + + cfg.LPUrls = []url.URL{peerURL} + cfg.APUrls = []url.URL{peerURL} + cfg.LCUrls = []url.URL{clientURL} + cfg.ACUrls = []url.URL{clientURL} + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + + cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel)).Named("etcd-server")) + cfg.Dir = t.TempDir() + os.Chmod(cfg.Dir, 0700) + return cfg +} + +// RunEtcd starts an embedded etcd server with the provided config +// (or NewTestConfig(t) if nil), and returns a client connected to the server. +// The server is terminated when the test ends. +func RunEtcd(t *testing.T, cfg *embed.Config) *clientv3.Client { + t.Helper() + + if cfg == nil { + cfg = NewTestConfig(t) + } + + e, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + t.Cleanup(e.Close) + + select { + case <-e.Server.ReadyNotify(): + case <-time.After(60 * time.Second): + e.Server.Stop() // trigger a shutdown + t.Fatal("server took too long to start") + } + go func() { + err := <-e.Err() + if err != nil { + t.Error(err) + } + }() + + tlsConfig, err := cfg.ClientTLSInfo.ClientConfig() + if err != nil { + t.Fatal(err) + } + + client, err := clientv3.New(clientv3.Config{ + TLS: tlsConfig, + Endpoints: e.Server.Cluster().ClientURLs(), + DialTimeout: 10 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + Logger: zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel)).Named("etcd-client"), + }) + if err != nil { + t.Fatal(err) + } + return client +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 4c072517aba..068e612814d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -25,7 +25,6 @@ import ( "time" clientv3 "go.etcd.io/etcd/client/v3" - "go.etcd.io/etcd/tests/v3/integration" apitesting "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +37,7 @@ import ( "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/etcd3/testserver" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" ) @@ -54,8 +54,7 @@ func TestWatchList(t *testing.T) { // - update should trigger Modified event // - update that gets filtered should trigger Deleted event func testWatch(t *testing.T, recursive bool) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}} podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}} @@ -129,8 +128,7 @@ func testWatch(t *testing.T, recursive bool) { } func TestDeleteTriggerWatch(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { @@ -146,8 +144,7 @@ func TestDeleteTriggerWatch(t *testing.T) { // - watch from 0 should sync up and grab the object added before // - watch from 0 is able to return events for objects whose previous version has been compacted func TestWatchFromZero(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, client := testSetup(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) @@ -190,7 +187,7 @@ func TestWatchFromZero(t *testing.T) { if err != nil { t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) } - _, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) + _, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) if err != nil { t.Fatalf("Error compacting: %v", err) } @@ -206,8 +203,7 @@ func TestWatchFromZero(t *testing.T) { // TestWatchFromNoneZero tests that // - watch from non-0 should just watch changes after given version func TestWatchFromNoneZero(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) @@ -224,16 +220,14 @@ func TestWatchFromNoneZero(t *testing.T) { func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} - integration.BeforeTestExternal(t) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) + client := testserver.RunEtcd(t, nil) + invalidStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) + validStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig()) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil @@ -242,8 +236,7 @@ func TestWatchError(t *testing.T) { } func TestWatchContextCancel(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, _ := testSetup(t) canceledCtx, cancel := context.WithCancel(ctx) cancel() // When we watch with a canceled context, we should detect that it's context canceled. @@ -264,8 +257,7 @@ func TestWatchContextCancel(t *testing.T) { } func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { - origCtx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + origCtx, store, _ := testSetup(t) ctx, cancel := context.WithCancel(origCtx) w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything) // make resutlChan and errChan blocking to ensure ordering. @@ -287,15 +279,14 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) { } func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { - ctx, store, cluster := testSetup(t) - defer cluster.Terminate(t) + ctx, store, client := testSetup(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix()) + etcdW := client.Watch(ctx, "/", clientv3.WithPrefix()) if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil { t.Fatalf("Delete failed: %v", err) @@ -316,8 +307,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { } func TestWatchInitializationSignal(t *testing.T) { - _, store, cluster := testSetup(t) - defer cluster.Terminate(t) + _, store, _ := testSetup(t) ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) initSignal := utilflowcontrol.NewInitializationSignal() @@ -334,14 +324,10 @@ func TestWatchInitializationSignal(t *testing.T) { func TestProgressNotify(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - clusterConfig := &integration.ClusterConfig{ - Size: 1, - WatchProgressNotifyInterval: time.Second, - } - integration.BeforeTestExternal(t) - cluster := integration.NewClusterV3(t, clusterConfig) - defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) + clusterConfig := testserver.NewTestConfig(t) + clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second + client := testserver.RunEtcd(t, clusterConfig) + store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig()) ctx := context.Background() key := "/somekey" diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go index 981d2c6d6f3..6e831c49627 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/tls_test.go @@ -25,7 +25,6 @@ import ( "testing" "go.etcd.io/etcd/client/pkg/v3/transport" - "go.etcd.io/etcd/tests/v3/integration" apitesting "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,6 +34,7 @@ import ( "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage/etcd3/testing/testingcert" + "k8s.io/apiserver/pkg/storage/etcd3/testserver" "k8s.io/apiserver/pkg/storage/storagebackend" ) @@ -53,23 +53,25 @@ func TestTLSConnection(t *testing.T) { certFile, keyFile, caFile := configureTLSCerts(t) defer os.RemoveAll(filepath.Dir(certFile)) - tlsInfo := &transport.TLSInfo{ + // override server config to be TLS-enabled + etcdConfig := testserver.NewTestConfig(t) + etcdConfig.ClientTLSInfo = transport.TLSInfo{ CertFile: certFile, KeyFile: keyFile, TrustedCAFile: caFile, } + for i := range etcdConfig.LCUrls { + etcdConfig.LCUrls[i].Scheme = "https" + } + for i := range etcdConfig.ACUrls { + etcdConfig.ACUrls[i].Scheme = "https" + } - integration.BeforeTestExternal(t) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{ - Size: 1, - ClientTLS: tlsInfo, - }) - defer cluster.Terminate(t) - + client := testserver.RunEtcd(t, etcdConfig) cfg := storagebackend.Config{ Type: storagebackend.StorageTypeETCD3, Transport: storagebackend.TransportConfig{ - ServerList: []string{cluster.Members[0].GRPCAddr()}, + ServerList: client.Endpoints(), CertFile: certFile, KeyFile: keyFile, TrustedCAFile: caFile,