From 00a717b572f3582d0d20633644e827dd60991dce Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 12 Sep 2018 11:54:14 +0200 Subject: [PATCH] apiserver: start only one compactor per unique storage transport config --- .../storage/storagebackend/factory/etcd3.go | 83 +++++++++++++++++-- test/integration/etcd/server.go | 4 +- 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index a41f09de1ff..6bac7424383 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -19,6 +19,7 @@ package factory import ( "context" "fmt" + "sync" "sync/atomic" "time" @@ -108,16 +109,88 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error) return clientv3.New(cfg) } +type runningCompactor struct { + interval time.Duration + cancel context.CancelFunc + client *clientv3.Client + refs int +} + +var ( + lock sync.Mutex + compactors = map[string]*runningCompactor{} +) + +// startCompactorOnce start one compactor per transport. If the interval get smaller on repeated calls, the +// compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called, +// the compactor is stopped. +func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) { + lock.Lock() + defer lock.Unlock() + + key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile} + if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval { + compactorClient, err := newETCD3Client(c) + if err != nil { + return nil, err + } + + if foundBefore { + // replace compactor + compactor.cancel() + compactor.client.Close() + } else { + // start new compactor + compactor = &runningCompactor{} + compactors[key] = compactor + } + + ctx, cancel := context.WithCancel(context.Background()) + + compactor.interval = interval + compactor.cancel = cancel + compactor.client = compactorClient + + etcd3.StartCompactor(ctx, compactorClient, interval) + } + + compactors[key].refs++ + + return func() { + lock.Lock() + defer lock.Unlock() + + compactor := compactors[key] + compactor.refs-- + if compactor.refs == 0 { + compactor.cancel() + compactor.client.Close() + delete(compactors, key) + } + }, nil +} + func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) { - client, err := newETCD3Client(c.Transport) + stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval) if err != nil { return nil, nil, err } - ctx, cancel := context.WithCancel(context.Background()) - etcd3.StartCompactor(ctx, client, c.CompactionInterval) + + client, err := newETCD3Client(c.Transport) + if err != nil { + stopCompactor() + return nil, nil, err + } + + var once sync.Once destroyFunc := func() { - cancel() - client.Close() + // we know that storage destroy funcs are called multiple times (due to reuse in subresources). + // Hence, we only destroy once. + // TODO: fix duplicated storage destroy calls higher level + once.Do(func() { + stopCompactor() + client.Close() + }) } transformer := c.Transformer if transformer == nil { diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index d86534ba292..8c433618014 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -74,7 +74,7 @@ func StartRealMasterOrDie(t *testing.T) *Master { kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.SecureServing.Listener = listener kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir - kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()} + kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"} @@ -88,7 +88,7 @@ func StartRealMasterOrDie(t *testing.T) *Master { } // get etcd client before starting API server - rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig) + rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig.Transport) if err != nil { t.Fatal(err) }