mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #97480 from lingsamuel/etcd-lease-max-size
apiserver add lease object count metric
This commit is contained in:
commit
e054aa268e
@ -58,6 +58,7 @@ go_test(
|
|||||||
"//pkg/kubelet/client:go_default_library",
|
"//pkg/kubelet/client:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
|
"//staging/src/k8s.io/apiserver/plugin/pkg/audit/buffered:go_default_library",
|
||||||
|
@ -25,15 +25,15 @@ import (
|
|||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
"k8s.io/component-base/logs"
|
|
||||||
|
|
||||||
"k8s.io/apiserver/pkg/admission"
|
"k8s.io/apiserver/pkg/admission"
|
||||||
apiserveroptions "k8s.io/apiserver/pkg/server/options"
|
apiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||||
|
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
||||||
audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
|
audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
cliflag "k8s.io/component-base/cli/flag"
|
cliflag "k8s.io/component-base/cli/flag"
|
||||||
|
"k8s.io/component-base/logs"
|
||||||
"k8s.io/component-base/metrics"
|
"k8s.io/component-base/metrics"
|
||||||
kapi "k8s.io/kubernetes/pkg/apis/core"
|
kapi "k8s.io/kubernetes/pkg/apis/core"
|
||||||
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
||||||
@ -116,6 +116,7 @@ func TestAddFlags(t *testing.T) {
|
|||||||
"--request-timeout=2m",
|
"--request-timeout=2m",
|
||||||
"--storage-backend=etcd3",
|
"--storage-backend=etcd3",
|
||||||
"--service-cluster-ip-range=192.168.128.0/17",
|
"--service-cluster-ip-range=192.168.128.0/17",
|
||||||
|
"--lease-reuse-duration-seconds=100",
|
||||||
}
|
}
|
||||||
fs.Parse(args)
|
fs.Parse(args)
|
||||||
|
|
||||||
@ -161,7 +162,9 @@ func TestAddFlags(t *testing.T) {
|
|||||||
CountMetricPollPeriod: time.Minute,
|
CountMetricPollPeriod: time.Minute,
|
||||||
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
|
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
|
||||||
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
|
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
|
||||||
LeaseReuseDurationSeconds: storagebackend.DefaultLeaseReuseDurationSeconds,
|
LeaseManagerConfig: etcd3.LeaseManagerConfig{
|
||||||
|
ReuseDurationSeconds: 100,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
DefaultStorageMediaType: "application/vnd.kubernetes.protobuf",
|
DefaultStorageMediaType: "application/vnd.kubernetes.protobuf",
|
||||||
DeleteCollectionWorkers: 1,
|
DeleteCollectionWorkers: 1,
|
||||||
|
@ -184,7 +184,7 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout,
|
fs.DurationVar(&s.StorageConfig.HealthcheckTimeout, "etcd-healthcheck-timeout", s.StorageConfig.HealthcheckTimeout,
|
||||||
"The timeout to use when checking etcd health.")
|
"The timeout to use when checking etcd health.")
|
||||||
|
|
||||||
fs.Int64Var(&s.StorageConfig.LeaseReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseReuseDurationSeconds,
|
fs.Int64Var(&s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds, "lease-reuse-duration-seconds", s.StorageConfig.LeaseManagerConfig.ReuseDurationSeconds,
|
||||||
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
|
"The time in seconds that each lease is reused. A lower value could avoid large number of objects reusing the same lease. Notice that a too small value may cause performance problems at storage layer.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,7 +35,6 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
|
@ -22,8 +22,28 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.etcd.io/etcd/clientv3"
|
"go.etcd.io/etcd/clientv3"
|
||||||
|
"k8s.io/apiserver/pkg/storage/etcd3/metrics"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
defaultLeaseReuseDurationSeconds = 60
|
||||||
|
largeLeaseThreshold = 5000
|
||||||
|
)
|
||||||
|
|
||||||
|
// LeaseManagerConfig is configuration for creating a lease manager.
|
||||||
|
type LeaseManagerConfig struct {
|
||||||
|
// ReuseDurationSeconds specifies time in seconds that each lease is reused
|
||||||
|
ReuseDurationSeconds int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDefaultLeaseManagerConfig creates a LeaseManagerConfig with default values
|
||||||
|
func NewDefaultLeaseManagerConfig() LeaseManagerConfig {
|
||||||
|
return LeaseManagerConfig{
|
||||||
|
ReuseDurationSeconds: defaultLeaseReuseDurationSeconds,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// leaseManager is used to manage leases requested from etcd. If a new write
|
// leaseManager is used to manage leases requested from etcd. If a new write
|
||||||
// needs a lease that has similar expiration time to the previous one, the old
|
// needs a lease that has similar expiration time to the previous one, the old
|
||||||
// lease will be reused to reduce the overhead of etcd, since lease operations
|
// lease will be reused to reduce the overhead of etcd, since lease operations
|
||||||
@ -36,14 +56,15 @@ type leaseManager struct {
|
|||||||
prevLeaseExpirationTime time.Time
|
prevLeaseExpirationTime time.Time
|
||||||
// The period of time in seconds and percent of TTL that each lease is
|
// The period of time in seconds and percent of TTL that each lease is
|
||||||
// reused. The minimum of them is used to avoid unreasonably large
|
// reused. The minimum of them is used to avoid unreasonably large
|
||||||
// numbers. We use var instead of const for testing purposes.
|
// numbers.
|
||||||
leaseReuseDurationSeconds int64
|
leaseReuseDurationSeconds int64
|
||||||
leaseReuseDurationPercent float64
|
leaseReuseDurationPercent float64
|
||||||
|
leaseAttachedObjectCount int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// newDefaultLeaseManager creates a new lease manager using default setting.
|
// newDefaultLeaseManager creates a new lease manager using default setting.
|
||||||
func newDefaultLeaseManager(client *clientv3.Client, leaseReuseDurationSeconds int64) *leaseManager {
|
func newDefaultLeaseManager(client *clientv3.Client, config LeaseManagerConfig) *leaseManager {
|
||||||
return newLeaseManager(client, leaseReuseDurationSeconds, 0.05)
|
return newLeaseManager(client, config.ReuseDurationSeconds, 0.05)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLeaseManager creates a new lease manager with the number of buffered
|
// newLeaseManager creates a new lease manager with the number of buffered
|
||||||
@ -67,9 +88,15 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI
|
|||||||
reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
|
reuseDurationSeconds := l.getReuseDurationSecondsLocked(ttl)
|
||||||
valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
|
valid := now.Add(time.Duration(ttl) * time.Second).Before(l.prevLeaseExpirationTime)
|
||||||
sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
|
sufficient := now.Add(time.Duration(ttl+reuseDurationSeconds) * time.Second).After(l.prevLeaseExpirationTime)
|
||||||
|
|
||||||
|
// We count all operations that happened in the same lease, regardless of success or failure.
|
||||||
|
// Currently each GetLease call only attach 1 object
|
||||||
|
l.leaseAttachedObjectCount++
|
||||||
|
|
||||||
if valid && sufficient {
|
if valid && sufficient {
|
||||||
return l.prevLeaseID, nil
|
return l.prevLeaseID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// request a lease with a little extra ttl from etcd
|
// request a lease with a little extra ttl from etcd
|
||||||
ttl += reuseDurationSeconds
|
ttl += reuseDurationSeconds
|
||||||
lcr, err := l.client.Lease.Grant(ctx, ttl)
|
lcr, err := l.client.Lease.Grant(ctx, ttl)
|
||||||
@ -79,6 +106,12 @@ func (l *leaseManager) GetLease(ctx context.Context, ttl int64) (clientv3.LeaseI
|
|||||||
// cache the new lease id
|
// cache the new lease id
|
||||||
l.prevLeaseID = lcr.ID
|
l.prevLeaseID = lcr.ID
|
||||||
l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
|
l.prevLeaseExpirationTime = now.Add(time.Duration(ttl) * time.Second)
|
||||||
|
// refresh count
|
||||||
|
if l.leaseAttachedObjectCount > largeLeaseThreshold {
|
||||||
|
klog.Infof("The object count for lease %x is large: %v", l.prevLeaseID, l.leaseAttachedObjectCount)
|
||||||
|
}
|
||||||
|
metrics.UpdateLeaseObjectCount(l.leaseAttachedObjectCount)
|
||||||
|
l.leaseAttachedObjectCount = 1
|
||||||
return lcr.ID, nil
|
return lcr.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||||||
package etcd3
|
package etcd3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -35,7 +34,7 @@ func TestGetReuseDurationSeconds(t *testing.T) {
|
|||||||
duration: 50,
|
duration: 50,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
lm := newDefaultLeaseManager(nil, storagebackend.DefaultLeaseReuseDurationSeconds)
|
lm := newDefaultLeaseManager(nil, NewDefaultLeaseManagerConfig())
|
||||||
for i := 0; i < len(testCases); i++ {
|
for i := 0; i < len(testCases); i++ {
|
||||||
dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl)
|
dur := lm.getReuseDurationSecondsLocked(testCases[i].ttl)
|
||||||
if dur != testCases[i].duration {
|
if dur != testCases[i].duration {
|
||||||
|
@ -67,6 +67,15 @@ var (
|
|||||||
},
|
},
|
||||||
[]string{"resource"},
|
[]string{"resource"},
|
||||||
)
|
)
|
||||||
|
etcdLeaseObjectCounts = compbasemetrics.NewHistogramVec(
|
||||||
|
&compbasemetrics.HistogramOpts{
|
||||||
|
Name: "etcd_lease_object_counts",
|
||||||
|
Help: "Number of objects attached to a single etcd lease.",
|
||||||
|
Buckets: []float64{10, 50, 100, 500, 1000, 2500, 5000},
|
||||||
|
StabilityLevel: compbasemetrics.ALPHA,
|
||||||
|
},
|
||||||
|
[]string{},
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
var registerMetrics sync.Once
|
var registerMetrics sync.Once
|
||||||
@ -79,6 +88,7 @@ func Register() {
|
|||||||
legacyregistry.MustRegister(objectCounts)
|
legacyregistry.MustRegister(objectCounts)
|
||||||
legacyregistry.MustRegister(dbTotalSize)
|
legacyregistry.MustRegister(dbTotalSize)
|
||||||
legacyregistry.MustRegister(etcdBookmarkCounts)
|
legacyregistry.MustRegister(etcdBookmarkCounts)
|
||||||
|
legacyregistry.MustRegister(etcdLeaseObjectCounts)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -111,3 +121,10 @@ func sinceInSeconds(start time.Time) float64 {
|
|||||||
func UpdateEtcdDbSize(ep string, size int64) {
|
func UpdateEtcdDbSize(ep string, size int64) {
|
||||||
dbTotalSize.WithLabelValues(ep).Set(float64(size))
|
dbTotalSize.WithLabelValues(ep).Set(float64(size))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateLeaseObjectCount sets the etcd_lease_object_counts metric.
|
||||||
|
func UpdateLeaseObjectCount(count int64) {
|
||||||
|
// Currently we only store one previous lease, since all the events have the same ttl.
|
||||||
|
// See pkg/storage/etcd3/lease_manager.go
|
||||||
|
etcdLeaseObjectCounts.WithLabelValues().Observe(float64(count))
|
||||||
|
}
|
||||||
|
@ -83,11 +83,11 @@ type objState struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// New returns an etcd3 implementation of storage.Interface.
|
// New returns an etcd3 implementation of storage.Interface.
|
||||||
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseReuseDurationSeconds int64) storage.Interface {
|
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
|
||||||
return newStore(c, newFunc, pagingEnabled, leaseReuseDurationSeconds, codec, prefix, transformer)
|
return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled bool, leaseReuseDurationSeconds int64, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
|
func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
|
||||||
versioner := APIObjectVersioner{}
|
versioner := APIObjectVersioner{}
|
||||||
result := &store{
|
result := &store{
|
||||||
client: c,
|
client: c,
|
||||||
@ -100,7 +100,7 @@ func newStore(c *clientv3.Client, newFunc func() runtime.Object, pagingEnabled b
|
|||||||
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
||||||
pathPrefix: path.Join("/", prefix),
|
pathPrefix: path.Join("/", prefix),
|
||||||
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
|
watcher: newWatcher(c, codec, newFunc, versioner, transformer),
|
||||||
leaseManager: newDefaultLeaseManager(c, leaseReuseDurationSeconds),
|
leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,6 @@ import (
|
|||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/features"
|
"k8s.io/apiserver/pkg/features"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
|
||||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
@ -990,7 +989,7 @@ func TestTransformationFailure(t *testing.T) {
|
|||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
preset := []struct {
|
preset := []struct {
|
||||||
@ -1067,8 +1066,8 @@ func TestList(t *testing.T) {
|
|||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
|
||||||
disablePagingStore := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
disablePagingStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Setup storage with the following structure:
|
// Setup storage with the following structure:
|
||||||
@ -1566,7 +1565,7 @@ func TestListContinuation(t *testing.T) {
|
|||||||
etcdClient := cluster.RandClient()
|
etcdClient := cluster.RandClient()
|
||||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||||
etcdClient.KV = recorder
|
etcdClient.KV = recorder
|
||||||
store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
|
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Setup storage with the following structure:
|
// Setup storage with the following structure:
|
||||||
@ -1728,7 +1727,7 @@ func TestListContinuationWithFilter(t *testing.T) {
|
|||||||
etcdClient := cluster.RandClient()
|
etcdClient := cluster.RandClient()
|
||||||
recorder := &clientRecorder{KV: etcdClient.KV}
|
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||||
etcdClient.KV = recorder
|
etcdClient.KV = recorder
|
||||||
store := newStore(etcdClient, newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
|
store := newStore(etcdClient, codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
preset := []struct {
|
preset := []struct {
|
||||||
@ -1831,7 +1830,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, NewDefaultLeaseManagerConfig())
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Setup storage with the following structure:
|
// Setup storage with the following structure:
|
||||||
@ -1979,7 +1978,9 @@ func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
|||||||
// As 30s is the default timeout for testing in glboal configuration,
|
// As 30s is the default timeout for testing in glboal configuration,
|
||||||
// we cannot wait longer than that in a single time: change it to 10
|
// we cannot wait longer than that in a single time: change it to 10
|
||||||
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
|
// for testing purposes. See apimachinery/pkg/util/wait/wait.go
|
||||||
store := newStore(cluster.RandClient(), newPod, true, 1, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, true, LeaseManagerConfig{
|
||||||
|
ReuseDurationSeconds: 1,
|
||||||
|
})
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
return ctx, store, cluster
|
return ctx, store, cluster
|
||||||
}
|
}
|
||||||
@ -2021,7 +2022,7 @@ func TestPrefix(t *testing.T) {
|
|||||||
"/registry": "/registry",
|
"/registry": "/registry",
|
||||||
}
|
}
|
||||||
for configuredPrefix, effectivePrefix := range testcases {
|
for configuredPrefix, effectivePrefix := range testcases {
|
||||||
store := newStore(cluster.RandClient(), nil, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, configuredPrefix, transformer)
|
store := newStore(cluster.RandClient(), codec, nil, configuredPrefix, transformer, true, NewDefaultLeaseManagerConfig())
|
||||||
if store.pathPrefix != effectivePrefix {
|
if store.pathPrefix != effectivePrefix {
|
||||||
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
|
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
|
||||||
}
|
}
|
||||||
@ -2188,7 +2189,7 @@ func TestConsistentList(t *testing.T) {
|
|||||||
transformer := &fancyTransformer{
|
transformer := &fancyTransformer{
|
||||||
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
|
transformer: &prefixTransformer{prefix: []byte(defaultTestPrefix)},
|
||||||
}
|
}
|
||||||
store := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", transformer)
|
store := newStore(cluster.RandClient(), codec, newPod, "", transformer, true, NewDefaultLeaseManagerConfig())
|
||||||
transformer.store = store
|
transformer.store = store
|
||||||
|
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
|
@ -38,7 +38,6 @@ import (
|
|||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWatch(t *testing.T) {
|
func TestWatch(t *testing.T) {
|
||||||
@ -226,13 +225,13 @@ func TestWatchError(t *testing.T) {
|
|||||||
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
|
||||||
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
invalidStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")})
|
invalidStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Watch failed: %v", err)
|
t.Fatalf("Watch failed: %v", err)
|
||||||
}
|
}
|
||||||
validStore := newStore(cluster.RandClient(), newPod, true, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte("test!")})
|
validStore := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte("test!")}, true, NewDefaultLeaseManagerConfig())
|
||||||
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
|
||||||
func(runtime.Object) (runtime.Object, error) {
|
func(runtime.Object) (runtime.Object, error) {
|
||||||
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil
|
||||||
@ -322,7 +321,7 @@ func TestProgressNotify(t *testing.T) {
|
|||||||
}
|
}
|
||||||
cluster := integration.NewClusterV3(t, clusterConfig)
|
cluster := integration.NewClusterV3(t, clusterConfig)
|
||||||
defer cluster.Terminate(t)
|
defer cluster.Terminate(t)
|
||||||
store := newStore(cluster.RandClient(), newPod, false, storagebackend.DefaultLeaseReuseDurationSeconds, codec, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
store := newStore(cluster.RandClient(), codec, newPod, "", &prefixTransformer{prefix: []byte(defaultTestPrefix)}, false, NewDefaultLeaseManagerConfig())
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
key := "/somekey"
|
key := "/somekey"
|
||||||
|
@ -13,6 +13,7 @@ go_library(
|
|||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/server/egressselector:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apiserver/pkg/server/egressselector"
|
"k8s.io/apiserver/pkg/server/egressselector"
|
||||||
|
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -31,7 +32,6 @@ const (
|
|||||||
DefaultCompactInterval = 5 * time.Minute
|
DefaultCompactInterval = 5 * time.Minute
|
||||||
DefaultDBMetricPollInterval = 30 * time.Second
|
DefaultDBMetricPollInterval = 30 * time.Second
|
||||||
DefaultHealthcheckTimeout = 2 * time.Second
|
DefaultHealthcheckTimeout = 2 * time.Second
|
||||||
DefaultLeaseReuseDurationSeconds = 60
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
|
// TransportConfig holds all connection related info, i.e. equal TransportConfig means equal servers we talk to.
|
||||||
@ -78,8 +78,8 @@ type Config struct {
|
|||||||
DBMetricPollInterval time.Duration
|
DBMetricPollInterval time.Duration
|
||||||
// HealthcheckTimeout specifies the timeout used when checking health
|
// HealthcheckTimeout specifies the timeout used when checking health
|
||||||
HealthcheckTimeout time.Duration
|
HealthcheckTimeout time.Duration
|
||||||
// LeaseReuseDurationSeconds specifies time in seconds that each lease is reused. See pkg/storage/etcd3/lease_manager.go
|
|
||||||
LeaseReuseDurationSeconds int64
|
LeaseManagerConfig etcd3.LeaseManagerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
||||||
@ -90,6 +90,6 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
|||||||
CompactionInterval: DefaultCompactInterval,
|
CompactionInterval: DefaultCompactInterval,
|
||||||
DBMetricPollInterval: DefaultDBMetricPollInterval,
|
DBMetricPollInterval: DefaultDBMetricPollInterval,
|
||||||
HealthcheckTimeout: DefaultHealthcheckTimeout,
|
HealthcheckTimeout: DefaultHealthcheckTimeout,
|
||||||
LeaseReuseDurationSeconds: DefaultLeaseReuseDurationSeconds,
|
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -254,7 +254,7 @@ func newETCD3Storage(c storagebackend.Config, newFunc func() runtime.Object) (st
|
|||||||
if transformer == nil {
|
if transformer == nil {
|
||||||
transformer = value.IdentityTransformer
|
transformer = value.IdentityTransformer
|
||||||
}
|
}
|
||||||
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseReuseDurationSeconds), destroyFunc, nil
|
return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
// startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the
|
||||||
|
@ -27,7 +27,6 @@ go_test(
|
|||||||
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
|
||||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||||
|
@ -47,7 +47,6 @@ import (
|
|||||||
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
|
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||||
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
|
||||||
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
storagetesting "k8s.io/apiserver/pkg/storage/testing"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||||
@ -106,7 +105,7 @@ func newPodList() runtime.Object { return &example.PodList{} }
|
|||||||
|
|
||||||
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) {
|
||||||
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
|
||||||
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, storagebackend.DefaultLeaseReuseDurationSeconds)
|
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), newPod, prefix, value.IdentityTransformer, true, etcd3.NewDefaultLeaseManagerConfig())
|
||||||
return server, storage
|
return server, storage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user