diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 8ad927bbe8f..baa1eafcab3 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -155,12 +155,13 @@ func TestAddFlags(t *testing.T) { TrustedCAFile: "/var/run/kubernetes/etcdca.crt", CertFile: "/var/run/kubernetes/etcdce.crt", }, - Paging: true, - Prefix: "/registry", - CompactionInterval: storagebackend.DefaultCompactInterval, - CountMetricPollPeriod: time.Minute, - DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, - HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout, + Paging: true, + Prefix: "/registry", + CompactionInterval: storagebackend.DefaultCompactInterval, + CountMetricPollPeriod: time.Minute, + DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, + HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout, + LeaseReuseDurationSeconds: storagebackend.DefaultLeaseReuseDurationSeconds, }, DefaultStorageMediaType: "application/vnd.kubernetes.protobuf", DeleteCollectionWorkers: 1, diff --git a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go index d9c6f9e54bb..87fa32ad17c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go +++ b/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go @@ -183,6 +183,9 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout, "The timeout to use when checking etcd health.") + + fs.Int64Var(&s.StorageConfig.LeaseReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseReuseDurationSeconds, + "The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.") } func (s *EtcdOptions) ApplyTo(c *server.Config) error { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 982455986f3..1c13b56b084 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -35,6 +35,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go index 6b5a5700a9e..b34c7fb0950 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go @@ -42,8 +42,8 @@ type leaseManager struct { } // newDefaultLeaseManager creates a new lease manager using default setting. -func newDefaultLeaseManager(client *clientv3.Client) *leaseManager { - return newLeaseManager(client, 60, 0.05) +func newDefaultLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64) *leaseManager { + return newLeaseManager(client, leaseReuseDurationSeconds, 0.05) } // newLeaseManager creates a new lease manager with the number of buffered @@ -57,14 +57,6 @@ func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, l } } -// setLeaseReuseDurationSeconds is used for testing purpose. It is used to -// reduce the extra lease duration to avoid unnecessary timeout in testing. -func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) { - l.leaseMu.Lock() - defer l.leaseMu.Unlock() - l.leaseReuseDurationSeconds = duration -} - // GetLease returns a lease based on requested ttl: if the cached previous // lease can be reused, reuse it; otherwise request a new one from etcd. func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go index e63a8e65e70..122453a72d5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package etcd3 import ( + "k8s.io/apiserver/pkg/storage/storagebackend" "testing" ) @@ -34,7 +35,7 @@ func TestGetReuseDurationSeconds(t *testing.T) { duration: 50, }, } - lm := newDefaultLeaseManager(nil) + lm := newDefaultLeaseManager(nil, storagebackend.DefaultLeaseReuseDurationSeconds) for i := 0; i < len(testCases); i++ { dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl) if dur != testCases[i].duration { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 0cff6b3fc9d..f1b5725338e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -83,11 +83,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface { - return newStore(c, newFunc, pagingEnabled, codec, prefix, transformer) +func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseReuseDurationSeconds int64) storage.Interface { + return newStore(c, newFunc, pagingEnabled, leaseReuseDurationSeconds, codec, prefix, transformer) } -func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store { +func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, leaseReuseDurationSeconds int64, codec runtime.Codec, prefix string, transformer value.Transformer) *store { versioner := APIObjectVersioner{} result := &store{ client: c, @@ -100,7 +100,7 @@ func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled b // keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' pathPrefix: path.Join("/", prefix), watcher: newWatcher(c, codec, newFunc, versioner, transformer), - leaseManager: newDefaultLeaseManager(c), + leaseManager: newDefaultLeaseManager(c, leaseReuseDurationSeconds), } return result } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 8496e03a7fd..a42121ede7f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -47,6 +47,7 @@ import ( examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -823,7 +824,7 @@ func TestTransformationFailure(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() preset := []struct { @@ -900,8 +901,8 @@ func TestList(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) - disablePagingStore := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + disablePagingStore := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1399,7 +1400,7 @@ func TestListContinuation(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, newPod, true, codec, "", transformer) + store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) ctx := context.Background() // Setup storage with the following structure: @@ -1561,7 +1562,7 @@ func TestListContinuationWithFilter(t *testing.T) { etcdClient := cluster.RandClient() recorder := &clientRecorder{KV: etcdClient.KV} etcdClient.KV = recorder - store := newStore(etcdClient, newPod, true, codec, "", transformer) + store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) ctx := context.Background() preset := []struct { @@ -1664,7 +1665,7 @@ func TestListInconsistentContinuation(t *testing.T) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() // Setup storage with the following structure: @@ -1809,12 +1810,11 @@ func TestListInconsistentContinuation(t *testing.T) { func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) - store := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) - ctx := context.Background() // As 30s is the default timeout for testing in glboal configuration, // we cannot wait longer than that in a single time: change it to 10 // for testing purposes. See apimachinery/pkg/util/wait/wait.go - store.leaseManager.setLeaseReuseDurationSeconds(1) + store := newStore(cluster.RandClient(), newPod, true, 1, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + ctx := context.Background() return ctx, store, cluster } @@ -1855,7 +1855,7 @@ func TestPrefix(t *testing.T) { "/registry": "/registry", } for configuredPrefix, effectivePrefix := range testcases { - store := newStore(cluster.RandClient(), nil, true, codec, configuredPrefix, transformer) + store := newStore(cluster.RandClient(), nil, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, configuredPrefix, transformer) if store.pathPrefix != effectivePrefix { t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix) } @@ -2022,7 +2022,7 @@ func TestConsistentList(t *testing.T) { transformer := &fancyTransformer{ transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)}, } - store := newStore(cluster.RandClient(), newPod, true, codec, "", transformer) + store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer) transformer.store = store for i := 0; i < 5; i++ { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 3bf47011dfe..8294778edd4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" ) func TestWatch(t *testing.T) { @@ -225,13 +226,13 @@ func TestWatchError(t *testing.T) { codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)} cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) - invalidStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + invalidStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } - validStore := newStore(cluster.RandClient(), newPod, true, codec, "", &prefixTransformer{prefix: []byte("test!")}) + validStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")}) validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate( func(runtime.Object) (runtime.Object, error) { return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil @@ -321,7 +322,7 @@ func TestProgressNotify(t *testing.T) { } cluster := integration.NewClusterV3(t, clusterConfig) defer cluster.Terminate(t) - store := newStore(cluster.RandClient(), newPod, false, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) + store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() key := "/somekey" diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index af94efcea88..73868a8c493 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -28,9 +28,10 @@ const ( StorageTypeUnset = "" StorageTypeETCD3 = "etcd3" - DefaultCompactInterval = 5 * time.Minute - DefaultDBMetricPollInterval = 30 * time.Second - DefaultHealthcheckTimeout = 2 * time.Second + DefaultCompactInterval = 5 * time.Minute + DefaultDBMetricPollInterval = 30 * time.Second + DefaultHealthcheckTimeout = 2 * time.Second + DefaultLeaseReuseDurationSeconds = 60 ) // TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to. @@ -77,15 +78,18 @@ type Config struct { DBMetricPollInterval time.Duration // HealthcheckTimeout specifies the timeout used when checking health HealthcheckTimeout time.Duration + // LeaseReuseDurationSeconds specifies time in seconds that each lease is reused. See pkg/storage/etcd3/lease_manager.go + LeaseReuseDurationSeconds int64 } func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { return &Config{ - Paging: true, - Prefix: prefix, - Codec: codec, - CompactionInterval: DefaultCompactInterval, - DBMetricPollInterval: DefaultDBMetricPollInterval, - HealthcheckTimeout: DefaultHealthcheckTimeout, + Paging: true, + Prefix: prefix, + Codec: codec, + CompactionInterval: DefaultCompactInterval, + DBMetricPollInterval: DefaultDBMetricPollInterval, + HealthcheckTimeout: DefaultHealthcheckTimeout, + LeaseReuseDurationSeconds: DefaultLeaseReuseDurationSeconds, } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index 9a1618df8dd..83fdebd58ad 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -254,7 +254,7 @@ func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (st if transformer == nil { transformer = value.IdentityTransformer } - return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging), destroyFunc, nil + return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseReuseDurationSeconds), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD index fdd648ff6a5..37cd06ae1a2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD @@ -27,6 +27,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index fbc1d27455e..7d6360a7d2d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -47,6 +47,7 @@ import ( cacherstorage "k8s.io/apiserver/pkg/storage/cacher" "k8s.io/apiserver/pkg/storage/etcd3" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/storagebackend" storagetesting "k8s.io/apiserver/pkg/storage/testing" "k8s.io/apiserver/pkg/storage/value" utilfeature "k8s.io/apiserver/pkg/util/feature" @@ -105,7 +106,7 @@ func newPodList() runtime.Object { return &example.PodList{} } func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, storagebackend.DefaultLeaseReuseDurationSeconds) return server, storage }