mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Simplify running embedded etcd server in tests
This commit is contained in:
parent
56efa75173
commit
01760927b8
@ -22,14 +22,12 @@ import (
|
||||
|
||||
etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||
)
|
||||
|
||||
func TestCompact(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
client := cluster.RandClient()
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
ctx := context.Background()
|
||||
|
||||
putResp, err := client.Put(ctx, "/somekey", "data")
|
||||
@ -58,10 +56,7 @@ func TestCompact(t *testing.T) {
|
||||
// - C1 compacts first. It will succeed.
|
||||
// - C2 compacts after. It will fail. But it will get latest logical time, which should be larger by one.
|
||||
func TestCompactConflict(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
client := cluster.RandClient()
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
ctx := context.Background()
|
||||
|
||||
putResp, err := client.Put(ctx, "/somekey", "data")
|
||||
|
@ -33,7 +33,6 @@ import (
|
||||
"testing"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
|
||||
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
|
||||
@ -50,6 +49,7 @@ import (
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
@ -107,9 +107,7 @@ func newPod() runtime.Object {
|
||||
}
|
||||
|
||||
func TestCreate(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
etcdClient := cluster.RandClient()
|
||||
ctx, store, etcdClient := testSetup(t)
|
||||
|
||||
key := "/testkey"
|
||||
out := &example.Pod{}
|
||||
@ -164,8 +162,7 @@ func checkStorageInvariants(ctx context.Context, t *testing.T, etcdClient *clien
|
||||
}
|
||||
|
||||
func TestCreateWithTTL(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key := "/somekey"
|
||||
@ -183,8 +180,7 @@ func TestCreateWithTTL(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCreateWithKeyExist(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key, _ := testPropogateStore(ctx, t, store, obj)
|
||||
out := &example.Pod{}
|
||||
@ -195,8 +191,7 @@ func TestCreateWithKeyExist(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGet(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
// create an object to test
|
||||
key, createdObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
// update the object once to allow get by exact resource version to be tested
|
||||
@ -300,8 +295,7 @@ func TestGet(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUnconditionalDelete(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
tests := []struct {
|
||||
@ -337,8 +331,7 @@ func TestUnconditionalDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConditionalDelete(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: "A"}})
|
||||
|
||||
tests := []struct {
|
||||
@ -397,8 +390,7 @@ func TestConditionalDelete(t *testing.T) {
|
||||
// [DONE] Added TestPreconditionalDeleteWithSuggestion
|
||||
|
||||
func TestDeleteWithSuggestion(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
@ -413,8 +405,7 @@ func TestDeleteWithSuggestion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeleteWithSuggestionAndConflict(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
@ -440,8 +431,7 @@ func TestDeleteWithSuggestionAndConflict(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
@ -459,8 +449,7 @@ func TestDeleteWithSuggestionOfDeletedObject(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestValidateDeletionWithSuggestion(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
@ -509,8 +498,7 @@ func TestValidateDeletionWithSuggestion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "name"}})
|
||||
|
||||
@ -538,8 +526,7 @@ func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGetToList(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
prevKey, prevStoredObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "prev"}})
|
||||
|
||||
prevRV, _ := strconv.Atoi(prevStoredObj.ResourceVersion)
|
||||
@ -653,9 +640,7 @@ func TestGetToList(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdate(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
etcdClient := cluster.RandClient()
|
||||
ctx, store, etcdClient := testSetup(t)
|
||||
key := "/testkey"
|
||||
|
||||
tests := []struct {
|
||||
@ -797,8 +782,7 @@ func TestGuaranteedUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key := "/somekey"
|
||||
@ -821,8 +805,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
input := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
key := "/somekey"
|
||||
@ -888,8 +871,7 @@ func TestGuaranteedUpdateChecksStoredData(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
key, _ := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
@ -935,8 +917,7 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
// First, update without a suggestion so originalPod is outdated
|
||||
@ -983,11 +964,9 @@ func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTransformationFailure(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
preset := []struct {
|
||||
@ -1060,13 +1039,11 @@ func TestTransformationFailure(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)()
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
|
||||
disablePagingStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
|
||||
disablePagingStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
@ -1557,12 +1534,9 @@ func TestList(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListContinuation(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
etcdClient := testserver.RunEtcd(t, nil)
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
||||
etcdClient := cluster.RandClient()
|
||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||
etcdClient.KV = recorder
|
||||
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
|
||||
@ -1720,12 +1694,9 @@ func (r *clientRecorder) resetReads() {
|
||||
}
|
||||
|
||||
func TestListContinuationWithFilter(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
etcdClient := testserver.RunEtcd(t, nil)
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
||||
etcdClient := cluster.RandClient()
|
||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||
etcdClient.KV = recorder
|
||||
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
|
||||
@ -1828,11 +1799,9 @@ func TestListContinuationWithFilter(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestListInconsistentContinuation(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
|
||||
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
// Setup storage with the following structure:
|
||||
@ -1924,7 +1893,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := cluster.Client(0).KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
|
||||
if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
|
||||
t.Fatalf("Unable to compact, %v", err)
|
||||
}
|
||||
|
||||
@ -1974,19 +1943,18 @@ func TestListInconsistentContinuation(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
integration.BeforeTestExternal(t)
|
||||
func testSetup(t *testing.T) (context.Context, *store, *clientv3.Client) {
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
// As 30s is the default timeout for testing in glboal configuration,
|
||||
// we cannot wait longer than that in a single time: change it to 10
|
||||
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
|
||||
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
|
||||
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
|
||||
ReuseDurationSeconds: 1,
|
||||
MaxObjectCount: defaultLeaseMaxObjectCount,
|
||||
})
|
||||
ctx := context.Background()
|
||||
return ctx, store, cluster
|
||||
return ctx, store, client
|
||||
}
|
||||
|
||||
// testPropogateStore helps propagates store with objects, automates key generation, and returns
|
||||
@ -2016,10 +1984,8 @@ func testPropogateStoreWithKey(ctx context.Context, t *testing.T, store *store,
|
||||
}
|
||||
|
||||
func TestPrefix(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
||||
testcases := map[string]string{
|
||||
"custom/prefix": "/custom/prefix",
|
||||
@ -2027,7 +1993,7 @@ func TestPrefix(t *testing.T) {
|
||||
"/registry": "/registry",
|
||||
}
|
||||
for configuredPrefix, effectivePrefix := range testcases {
|
||||
store := newStore(cluster.RandClient(), codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig())
|
||||
store := newStore(client, codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig())
|
||||
if store.pathPrefix != effectivePrefix {
|
||||
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
|
||||
}
|
||||
@ -2186,15 +2152,13 @@ func (t *fancyTransformer) createObject() error {
|
||||
}
|
||||
|
||||
func TestConsistentList(t *testing.T) {
|
||||
integration.BeforeTestExternal(t)
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
transformer := &fancyTransformer{
|
||||
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
|
||||
}
|
||||
store := newStore(cluster.RandClient(), codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
|
||||
store := newStore(client, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
|
||||
transformer.store = store
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
@ -2259,8 +2223,7 @@ func TestConsistentList(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestCount(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
|
||||
resourceA := "/foo.bar.io/abc"
|
||||
|
||||
@ -2298,14 +2261,12 @@ func TestCount(t *testing.T) {
|
||||
|
||||
func TestLeaseMaxObjectCount(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
integration.BeforeTestExternal(t)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
|
||||
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
|
||||
MaxObjectCount: 2,
|
||||
})
|
||||
ctx := context.Background()
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", SelfLink: "testlink"}}
|
||||
out := &example.Pod{}
|
||||
|
@ -17,49 +17,27 @@ limitations under the License.
|
||||
package testing
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
|
||||
grpclogsettable "github.com/grpc-ecosystem/go-grpc-middleware/logging/settable"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
var grpc_logger grpclogsettable.SettableLoggerV2
|
||||
|
||||
func init() {
|
||||
// override logger set up by etcd integration test package
|
||||
grpc_logger = grpclogsettable.ReplaceGrpcLoggerV2()
|
||||
}
|
||||
|
||||
// EtcdTestServer encapsulates the datastructures needed to start local instance for testing
|
||||
type EtcdTestServer struct {
|
||||
CertificatesDir string
|
||||
CertFile string
|
||||
KeyFile string
|
||||
CAFile string
|
||||
|
||||
// The following are lumped etcd3 test server params
|
||||
v3Cluster *integration.ClusterV3
|
||||
V3Client *clientv3.Client
|
||||
V3Client *clientv3.Client
|
||||
}
|
||||
|
||||
// Terminate will shutdown the running etcd server
|
||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
m.v3Cluster.Terminate(t)
|
||||
func (e *EtcdTestServer) Terminate(t *testing.T) {
|
||||
// no-op, server termination moved to test cleanup
|
||||
}
|
||||
|
||||
// NewUnsecuredEtcd3TestClientServer creates a new client and server for testing
|
||||
func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storagebackend.Config) {
|
||||
integration.BeforeTestExternal(t)
|
||||
grpc_logger.Set(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, &testErrorWriter{t}))
|
||||
server := &EtcdTestServer{
|
||||
v3Cluster: integration.NewClusterV3(&noLogT{t}, &integration.ClusterConfig{Size: 1}),
|
||||
}
|
||||
server.V3Client = server.v3Cluster.RandClient()
|
||||
server := &EtcdTestServer{}
|
||||
server.V3Client = testserver.RunEtcd(t, nil)
|
||||
config := &storagebackend.Config{
|
||||
Type: "etcd3",
|
||||
Prefix: PathPrefix(),
|
||||
@ -70,21 +48,3 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T) (*EtcdTestServer, *storageb
|
||||
}
|
||||
return server, config
|
||||
}
|
||||
|
||||
type noLogT struct {
|
||||
testing.TB
|
||||
}
|
||||
|
||||
func (q *noLogT) Log(s ...interface{}) {
|
||||
}
|
||||
func (q *noLogT) Logf(s string, params ...interface{}) {
|
||||
}
|
||||
|
||||
type testErrorWriter struct {
|
||||
testing.TB
|
||||
}
|
||||
|
||||
func (t *testErrorWriter) Write(b []byte) (int, error) {
|
||||
t.TB.Error(string(b))
|
||||
return len(b), nil
|
||||
}
|
||||
|
@ -0,0 +1,126 @@
|
||||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/server/v3/embed"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"go.uber.org/zap/zaptest"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// getAvailablePort returns a TCP port that is available for binding.
|
||||
func getAvailablePorts(count int) ([]int, error) {
|
||||
ports := []int{}
|
||||
for i := 0; i < count; i++ {
|
||||
l, err := net.Listen("tcp", ":0")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not bind to a port: %v", err)
|
||||
}
|
||||
// It is possible but unlikely that someone else will bind this port before we get a chance to use it.
|
||||
defer l.Close()
|
||||
ports = append(ports, l.Addr().(*net.TCPAddr).Port)
|
||||
}
|
||||
return ports, nil
|
||||
}
|
||||
|
||||
// NewTestConfig returns a configuration for an embedded etcd server.
|
||||
// The configuration is based on embed.NewConfig(), with the following adjustments:
|
||||
// * sets UnsafeNoFsync = true to improve test performance (only reasonable in a test-only
|
||||
// single-member server we never intend to restart or keep data from)
|
||||
// * uses free ports for client and peer listeners
|
||||
// * cleans up the data directory on test termination
|
||||
// * silences server logs other than errors
|
||||
func NewTestConfig(t *testing.T) *embed.Config {
|
||||
cfg := embed.NewConfig()
|
||||
|
||||
cfg.UnsafeNoFsync = true
|
||||
|
||||
ports, err := getAvailablePorts(2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
clientURL := url.URL{Scheme: "http", Host: net.JoinHostPort("localhost", strconv.Itoa(ports[0]))}
|
||||
peerURL := url.URL{Scheme: "http", Host: net.JoinHostPort("localhost", strconv.Itoa(ports[1]))}
|
||||
|
||||
cfg.LPUrls = []url.URL{peerURL}
|
||||
cfg.APUrls = []url.URL{peerURL}
|
||||
cfg.LCUrls = []url.URL{clientURL}
|
||||
cfg.ACUrls = []url.URL{clientURL}
|
||||
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
|
||||
|
||||
cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel)).Named("etcd-server"))
|
||||
cfg.Dir = t.TempDir()
|
||||
os.Chmod(cfg.Dir, 0700)
|
||||
return cfg
|
||||
}
|
||||
|
||||
// RunEtcd starts an embedded etcd server with the provided config
|
||||
// (or NewTestConfig(t) if nil), and returns a client connected to the server.
|
||||
// The server is terminated when the test ends.
|
||||
func RunEtcd(t *testing.T, cfg *embed.Config) *clientv3.Client {
|
||||
t.Helper()
|
||||
|
||||
if cfg == nil {
|
||||
cfg = NewTestConfig(t)
|
||||
}
|
||||
|
||||
e, err := embed.StartEtcd(cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(e.Close)
|
||||
|
||||
select {
|
||||
case <-e.Server.ReadyNotify():
|
||||
case <-time.After(60 * time.Second):
|
||||
e.Server.Stop() // trigger a shutdown
|
||||
t.Fatal("server took too long to start")
|
||||
}
|
||||
go func() {
|
||||
err := <-e.Err()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
|
||||
tlsConfig, err := cfg.ClientTLSInfo.ClientConfig()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
client, err := clientv3.New(clientv3.Config{
|
||||
TLS: tlsConfig,
|
||||
Endpoints: e.Server.Cluster().ClientURLs(),
|
||||
DialTimeout: 10 * time.Second,
|
||||
DialOptions: []grpc.DialOption{grpc.WithBlock()},
|
||||
Logger: zaptest.NewLogger(t, zaptest.Level(zapcore.ErrorLevel)).Named("etcd-client"),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return client
|
||||
}
|
@ -25,7 +25,6 @@ import (
|
||||
"time"
|
||||
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
|
||||
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -38,6 +37,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
|
||||
)
|
||||
|
||||
@ -54,8 +54,7 @@ func TestWatchList(t *testing.T) {
|
||||
// - update should trigger Modified event
|
||||
// - update that gets filtered should trigger Deleted event
|
||||
func testWatch(t *testing.T, recursive bool) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
podFoo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
|
||||
podBar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}
|
||||
|
||||
@ -129,8 +128,7 @@ func testWatch(t *testing.T, recursive bool) {
|
||||
}
|
||||
|
||||
func TestDeleteTriggerWatch(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
@ -146,8 +144,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
||||
// - watch from 0 should sync up and grab the object added before
|
||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||
func TestWatchFromZero(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, client := testSetup(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
@ -190,7 +187,7 @@ func TestWatchFromZero(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
||||
}
|
||||
_, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
|
||||
_, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
|
||||
if err != nil {
|
||||
t.Fatalf("Error compacting: %v", err)
|
||||
}
|
||||
@ -206,8 +203,7 @@ func TestWatchFromZero(t *testing.T) {
|
||||
// TestWatchFromNoneZero tests that
|
||||
// - watch from non-0 should just watch changes after given version
|
||||
func TestWatchFromNoneZero(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||
@ -224,16 +220,14 @@ func TestWatchFromNoneZero(t *testing.T) {
|
||||
|
||||
func TestWatchError(t *testing.T) {
|
||||
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
||||
integration.BeforeTestExternal(t)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
defer cluster.Terminate(t)
|
||||
invalidStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
|
||||
client := testserver.RunEtcd(t, nil)
|
||||
invalidStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
validStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
|
||||
validStore := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
|
||||
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
||||
@ -242,8 +236,7 @@ func TestWatchError(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchContextCancel(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, _ := testSetup(t)
|
||||
canceledCtx, cancel := context.WithCancel(ctx)
|
||||
cancel()
|
||||
// When we watch with a canceled context, we should detect that it's context canceled.
|
||||
@ -264,8 +257,7 @@ func TestWatchContextCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
origCtx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
origCtx, store, _ := testSetup(t)
|
||||
ctx, cancel := context.WithCancel(origCtx)
|
||||
w := store.watcher.createWatchChan(ctx, "/abc", 0, false, false, storage.Everything)
|
||||
// make resutlChan and errChan blocking to ensure ordering.
|
||||
@ -287,15 +279,14 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
ctx, store, client := testSetup(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix())
|
||||
etcdW := client.Watch(ctx, "/", clientv3.WithPrefix())
|
||||
|
||||
if err := store.Delete(ctx, key, &example.Pod{}, &storage.Preconditions{}, storage.ValidateAllObjectFunc, nil); err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
@ -316,8 +307,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestWatchInitializationSignal(t *testing.T) {
|
||||
_, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
_, store, _ := testSetup(t)
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
initSignal := utilflowcontrol.NewInitializationSignal()
|
||||
@ -334,14 +324,10 @@ func TestWatchInitializationSignal(t *testing.T) {
|
||||
|
||||
func TestProgressNotify(t *testing.T) {
|
||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||
clusterConfig := &integration.ClusterConfig{
|
||||
Size: 1,
|
||||
WatchProgressNotifyInterval: time.Second,
|
||||
}
|
||||
integration.BeforeTestExternal(t)
|
||||
cluster := integration.NewClusterV3(t, clusterConfig)
|
||||
defer cluster.Terminate(t)
|
||||
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||
clusterConfig := testserver.NewTestConfig(t)
|
||||
clusterConfig.ExperimentalWatchProgressNotifyInterval = time.Second
|
||||
client := testserver.RunEtcd(t, clusterConfig)
|
||||
store := newStore(client, codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||
ctx := context.Background()
|
||||
|
||||
key := "/somekey"
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
"go.etcd.io/etcd/tests/v3/integration"
|
||||
|
||||
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -35,6 +34,7 @@ import (
|
||||
"k8s.io/apiserver/pkg/apis/example"
|
||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testing/testingcert"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/testserver"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
@ -53,23 +53,25 @@ func TestTLSConnection(t *testing.T) {
|
||||
certFile, keyFile, caFile := configureTLSCerts(t)
|
||||
defer os.RemoveAll(filepath.Dir(certFile))
|
||||
|
||||
tlsInfo := &transport.TLSInfo{
|
||||
// override server config to be TLS-enabled
|
||||
etcdConfig := testserver.NewTestConfig(t)
|
||||
etcdConfig.ClientTLSInfo = transport.TLSInfo{
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
TrustedCAFile: caFile,
|
||||
}
|
||||
for i := range etcdConfig.LCUrls {
|
||||
etcdConfig.LCUrls[i].Scheme = "https"
|
||||
}
|
||||
for i := range etcdConfig.ACUrls {
|
||||
etcdConfig.ACUrls[i].Scheme = "https"
|
||||
}
|
||||
|
||||
integration.BeforeTestExternal(t)
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{
|
||||
Size: 1,
|
||||
ClientTLS: tlsInfo,
|
||||
})
|
||||
defer cluster.Terminate(t)
|
||||
|
||||
client := testserver.RunEtcd(t, etcdConfig)
|
||||
cfg := storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{
|
||||
ServerList: []string{cluster.Members[0].GRPCAddr()},
|
||||
ServerList: client.Endpoints(),
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
TrustedCAFile: caFile,
|
||||
|
Loading…
Reference in New Issue
Block a user