mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #64539 from cfork/lease
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. etcd: reuse leases for keys in a time window Reuse leases for keys in a time window, to reduce the overhead to etcd caused by using massive number of leases Fixes #47532 ```release-note NONE ```
This commit is contained in:
commit
8ba32978b7
@ -10,6 +10,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"compact_test.go",
|
||||
"lease_manager_test.go",
|
||||
"store_test.go",
|
||||
"watcher_test.go",
|
||||
],
|
||||
@ -44,6 +45,7 @@ go_library(
|
||||
"compact.go",
|
||||
"errors.go",
|
||||
"event.go",
|
||||
"lease_manager.go",
|
||||
"store.go",
|
||||
"watcher.go",
|
||||
],
|
||||
|
102
staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go
Normal file
102
staging/src/k8s.io/apiserver/pkg/storage/etcd3/lease_manager.go
Normal file
@ -0,0 +1,102 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
)
|
||||
|
||||
// leaseManager is used to manage leases requested from etcd. If a new write
|
||||
// needs a lease that has similar expiration time to the previous one, the old
|
||||
// lease will be reused to reduce the overhead of etcd, since lease operations
|
||||
// are expensive. In the implementation, we only store one previous lease,
|
||||
// since all the events have the same ttl.
|
||||
type leaseManager struct {
|
||||
client *clientv3.Client // etcd client used to grant leases
|
||||
leaseMu sync.Mutex
|
||||
prevLeaseID clientv3.LeaseID
|
||||
prevLeaseExpirationTime time.Time
|
||||
// The period of time in seconds and percent of TTL that each lease is
|
||||
// reused. The minimum of them is used to avoid unreasonably large
|
||||
// numbers. We use var instead of const for testing purposes.
|
||||
leaseReuseDurationSeconds int64
|
||||
leaseReuseDurationPercent float64
|
||||
}
|
||||
|
||||
// newDefaultLeaseManager creates a new lease manager using default setting.
|
||||
func newDefaultLeaseManager(client *clientv3.Client) *leaseManager {
|
||||
return newLeaseManager(client, 60, 0.05)
|
||||
}
|
||||
|
||||
// newLeaseManager creates a new lease manager with the number of buffered
|
||||
// leases, lease reuse duration in seconds and percentage. The percentage
|
||||
// value x means x*100%.
|
||||
func newLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64, leaseReuseDurationPercent float64) *leaseManager {
|
||||
return &leaseManager{
|
||||
client: client,
|
||||
leaseReuseDurationSeconds: leaseReuseDurationSeconds,
|
||||
leaseReuseDurationPercent: leaseReuseDurationPercent,
|
||||
}
|
||||
}
|
||||
|
||||
// setLeaseReuseDurationSeconds is used for testing purpose. It is used to
|
||||
// reduce the extra lease duration to avoid unnecessary timeout in testing.
|
||||
func (l *leaseManager) setLeaseReuseDurationSeconds(duration int64) {
|
||||
l.leaseMu.Lock()
|
||||
defer l.leaseMu.Unlock()
|
||||
l.leaseReuseDurationSeconds = duration
|
||||
}
|
||||
|
||||
// GetLease returns a lease based on requested ttl: if the cached previous
|
||||
// lease can be reused, reuse it; otherwise request a new one from etcd.
|
||||
func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseID, error) {
|
||||
now := time.Now()
|
||||
l.leaseMu.Lock()
|
||||
defer l.leaseMu.Unlock()
|
||||
// check if previous lease can be reused
|
||||
reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
|
||||
valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
|
||||
sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
|
||||
if valid && sufficient {
|
||||
return l.prevLeaseID, nil
|
||||
}
|
||||
// request a lease with a little extra ttl from etcd
|
||||
ttl += reuseDurationSeconds
|
||||
lcr, err := l.client.Lease.Grant(ctx, ttl)
|
||||
if err != nil {
|
||||
return clientv3.LeaseID(0), err
|
||||
}
|
||||
// cache the new lease id
|
||||
l.prevLeaseID = lcr.ID
|
||||
l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
|
||||
return lcr.ID, nil
|
||||
}
|
||||
|
||||
// getReuseDurationSecondsLocked returns the reusable duration in seconds
|
||||
// based on the configuration. Lock has to be acquired before calling this
|
||||
// function.
|
||||
func (l *leaseManager) getReuseDurationSecondsLocked(ttl int64) int64 {
|
||||
reuseDurationSeconds := int64(l.leaseReuseDurationPercent * float64(ttl))
|
||||
if reuseDurationSeconds > l.leaseReuseDurationSeconds {
|
||||
reuseDurationSeconds = l.leaseReuseDurationSeconds
|
||||
}
|
||||
return reuseDurationSeconds
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestGetReuseDurationSeconds(t *testing.T) {
|
||||
testCases := []struct {
|
||||
ttl int64
|
||||
duration int64
|
||||
}{
|
||||
{
|
||||
ttl: 3600,
|
||||
duration: 60,
|
||||
},
|
||||
{
|
||||
ttl: 1000,
|
||||
duration: 50,
|
||||
},
|
||||
}
|
||||
lm := newDefaultLeaseManager(nil)
|
||||
for i := 0; i < len(testCases); i++ {
|
||||
dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl)
|
||||
if dur != testCases[i].duration {
|
||||
t.Errorf("Duration error: ttl %v, expected duration %v, get %v\n", testCases[i].ttl, testCases[i].duration, dur)
|
||||
}
|
||||
}
|
||||
}
|
@ -70,6 +70,7 @@ type store struct {
|
||||
pathPrefix string
|
||||
watcher *watcher
|
||||
pagingEnabled bool
|
||||
leaseManager *leaseManager
|
||||
}
|
||||
|
||||
type elemForDecode struct {
|
||||
@ -107,8 +108,9 @@ func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.
|
||||
// for compatibility with etcd2 impl.
|
||||
// no-op for default prefix of '/registry'.
|
||||
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
watcher: newWatcher(c, codec, versioner, transformer),
|
||||
pathPrefix: path.Join("/", prefix),
|
||||
watcher: newWatcher(c, codec, versioner, transformer),
|
||||
leaseManager: newDefaultLeaseManager(c),
|
||||
}
|
||||
if !quorumRead {
|
||||
// In case of non-quorum reads, we can set WithSerializable()
|
||||
@ -758,13 +760,11 @@ func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, er
|
||||
if ttl == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
// TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to
|
||||
// put keys within into same lease. We shall benchmark this and optimize the performance.
|
||||
lcr, err := s.client.Lease.Grant(ctx, ttl)
|
||||
id, err := s.leaseManager.GetLease(ctx, ttl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil
|
||||
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
|
||||
}
|
||||
|
||||
// decode decodes value of bytes into object. It will also set the object resource version to rev.
|
||||
|
@ -1186,6 +1186,10 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||
ctx := context.Background()
|
||||
// As 30s is the default timeout for testing in glboal configuration,
|
||||
// we cannot wait longer than that in a single time: change it to 10
|
||||
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
|
||||
store.leaseManager.setLeaseReuseDurationSeconds(1)
|
||||
return ctx, store, cluster
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user