diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD index 5e6974f8fb4..2e95c4acee4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD @@ -10,6 +10,7 @@ go_test( name = "go_default_test", srcs = [ "compact_test.go", + "lease_manager_test.go", "store_test.go", "watcher_test.go", ], @@ -44,6 +45,7 @@ go_library( "compact.go", "errors.go", "event.go", + "lease_manager.go", "store.go", "watcher.go", ], diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go new file mode 100644 index 00000000000..dc06ac52077 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go @@ -0,0 +1,102 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "context" + "sync" + "time" + + "github.com/coreos/etcd/clientv3" +) + +// leaseManager is used to manage leases requested from etcd. If a new write +// needs a lease that has similar expiration time to the previous one, the old +// lease will be reused to reduce the overhead of etcd, since lease operations +// are expensive. In the implementation, we only store one previous lease, +// since all the events have the same ttl. 
+type leaseManager struct { + client *clientv3.Client // etcd client used to grant leases + leaseMu sync.Mutex + prevLeaseID clientv3.LeaseID + prevLeaseExpirationTime time.Time + // The period of time in seconds and percent of TTL that each lease is + // reused. The minimum of them is used to avoid unreasonably large + // numbers. They are fields rather than constants for testing purposes. + leaseReuseDurationSeconds int64 + leaseReuseDurationPercent float64 +} + +// newDefaultLeaseManager creates a new lease manager using the default settings. +func newDefaultLeaseManager(client *clientv3.Client) *leaseManager { + return newLeaseManager(client, 60, 0.05) +} + +// newLeaseManager creates a new lease manager with the given etcd client and +// lease reuse duration in seconds and percentage. The percentage +// value x means x*100%. +func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager { + return &leaseManager{ + client: client, + leaseReuseDurationSeconds: leaseReuseDurationSeconds, + leaseReuseDurationPercent: leaseReuseDurationPercent, + } +} + +// setLeaseReuseDurationSeconds is used for testing purposes. It is used to +// reduce the extra lease duration to avoid unnecessary timeouts in testing. +func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) { + l.leaseMu.Lock() + defer l.leaseMu.Unlock() + l.leaseReuseDurationSeconds = duration +} + +// GetLease returns a lease based on requested ttl: if the cached previous +// lease can be reused, reuse it; otherwise request a new one from etcd.
+func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) { + now := time.Now() + l.leaseMu.Lock() + defer l.leaseMu.Unlock() + // check if previous lease can be reused + reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl) + valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime) + sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime) + if valid && sufficient { + return l.prevLeaseID, nil + } + // request a lease with a little extra ttl from etcd + ttl += reuseDurationSeconds + lcr, err := l.client.Lease.Grant(ctx, ttl) + if err != nil { + return clientv3.LeaseID(0), err + } + // cache the new lease id + l.prevLeaseID = lcr.ID + l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second) + return lcr.ID, nil +} + +// getReuseDurationSecondsLocked returns the reusable duration in seconds +// based on the configuration. Lock has to be acquired before calling this +// function. +func (l *leaseManager) getReuseDurationSecondsLocked(ttl int64) int64 { + reuseDurationSeconds := int64(l.leaseReuseDurationPercent * float64(ttl)) + if reuseDurationSeconds > l.leaseReuseDurationSeconds { + reuseDurationSeconds = l.leaseReuseDurationSeconds + } + return reuseDurationSeconds +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go new file mode 100644 index 00000000000..e63a8e65e70 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager_test.go @@ -0,0 +1,44 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "testing" +) + +func TestGetReuseDurationSeconds(t *testing.T) { + testCases := []struct { + ttl int64 + duration int64 + }{ + { + ttl: 3600, + duration: 60, + }, + { + ttl: 1000, + duration: 50, + }, + } + lm := newDefaultLeaseManager(nil) + for i := 0; i < len(testCases); i++ { + dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl) + if dur != testCases[i].duration { + t.Errorf("Duration error: ttl %v, expected duration %v, get %v\n", testCases[i].ttl, testCases[i].duration, dur) + } + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index fabb083e0ce..1513af8e7c2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -70,6 +70,7 @@ type store struct { pathPrefix string watcher *watcher pagingEnabled bool + leaseManager *leaseManager } type elemForDecode struct { @@ -107,8 +108,9 @@ func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime. // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. 
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/' - pathPrefix: path.Join("/", prefix), - watcher: newWatcher(c, codec, versioner, transformer), + pathPrefix: path.Join("/", prefix), + watcher: newWatcher(c, codec, versioner, transformer), + leaseManager: newDefaultLeaseManager(c), } if !quorumRead { // In case of non-quorum reads, we can set WithSerializable() @@ -758,13 +760,11 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er if ttl == 0 { return nil, nil } - // TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to - // put keys within into same lease. We shall benchmark this and optimize the performance. - lcr, err := s.client.Lease.Grant(ctx, ttl) + id, err := s.leaseManager.GetLease(ctx, ttl) if err != nil { return nil, err } - return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil + return []clientv3.OpOption{clientv3.WithLease(id)}, nil } // decode decodes value of bytes into object. It will also set the object resource version to rev. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index b3e34be3989..1624a6829c4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1186,6 +1186,10 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)}) ctx := context.Background() + // As 30s is the default timeout for testing in the global configuration, + // we cannot wait longer than that at one time: change the lease reuse duration + // to 1 second for testing purposes.
See apimachinery/pkg/util/wait/wait.go + store.leaseManager.setLeaseReuseDurationSeconds(1) return ctx, store, cluster }