mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 11:13:48 +00:00
apiserver: start only one compactor per unique storage transport config
This commit is contained in:
parent
7b242533a2
commit
00a717b572
@ -19,6 +19,7 @@ package factory
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -108,16 +109,88 @@ func newETCD3Client(c storagebackend.TransportConfig) (*clientv3.Client, error)
|
||||
return clientv3.New(cfg)
|
||||
}
|
||||
|
||||
type runningCompactor struct {
|
||||
interval time.Duration
|
||||
cancel context.CancelFunc
|
||||
client *clientv3.Client
|
||||
refs int
|
||||
}
|
||||
|
||||
var (
|
||||
lock sync.Mutex
|
||||
compactors = map[string]*runningCompactor{}
|
||||
)
|
||||
|
||||
// startCompactorOnce start one compactor per transport. If the interval get smaller on repeated calls, the
|
||||
// compactor is replaced. A destroy func is returned. If all destroy funcs with the same transport are called,
|
||||
// the compactor is stopped.
|
||||
func startCompactorOnce(c storagebackend.TransportConfig, interval time.Duration) (func(), error) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
key := fmt.Sprintf("%v", c) // gives: {[server1 server2] keyFile certFile caFile}
|
||||
if compactor, foundBefore := compactors[key]; !foundBefore || compactor.interval > interval {
|
||||
compactorClient, err := newETCD3Client(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if foundBefore {
|
||||
// replace compactor
|
||||
compactor.cancel()
|
||||
compactor.client.Close()
|
||||
} else {
|
||||
// start new compactor
|
||||
compactor = &runningCompactor{}
|
||||
compactors[key] = compactor
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
compactor.interval = interval
|
||||
compactor.cancel = cancel
|
||||
compactor.client = compactorClient
|
||||
|
||||
etcd3.StartCompactor(ctx, compactorClient, interval)
|
||||
}
|
||||
|
||||
compactors[key].refs++
|
||||
|
||||
return func() {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
compactor := compactors[key]
|
||||
compactor.refs--
|
||||
if compactor.refs == 0 {
|
||||
compactor.cancel()
|
||||
compactor.client.Close()
|
||||
delete(compactors, key)
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
|
||||
client, err := newETCD3Client(c.Transport)
|
||||
stopCompactor, err := startCompactorOnce(c.Transport, c.CompactionInterval)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
etcd3.StartCompactor(ctx, client, c.CompactionInterval)
|
||||
|
||||
client, err := newETCD3Client(c.Transport)
|
||||
if err != nil {
|
||||
stopCompactor()
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var once sync.Once
|
||||
destroyFunc := func() {
|
||||
cancel()
|
||||
client.Close()
|
||||
// we know that storage destroy funcs are called multiple times (due to reuse in subresources).
|
||||
// Hence, we only destroy once.
|
||||
// TODO: fix duplicated storage destroy calls higher level
|
||||
once.Do(func() {
|
||||
stopCompactor()
|
||||
client.Close()
|
||||
})
|
||||
}
|
||||
transformer := c.Transformer
|
||||
if transformer == nil {
|
||||
|
@ -74,7 +74,7 @@ func StartRealMasterOrDie(t *testing.T) *Master {
|
||||
kubeAPIServerOptions.InsecureServing.BindPort = 0
|
||||
kubeAPIServerOptions.SecureServing.Listener = listener
|
||||
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
|
||||
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
|
||||
kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()}
|
||||
kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd
|
||||
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
|
||||
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
|
||||
@ -88,7 +88,7 @@ func StartRealMasterOrDie(t *testing.T) *Master {
|
||||
}
|
||||
|
||||
// get etcd client before starting API server
|
||||
rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig)
|
||||
rawClient, kvClient, err := integration.GetEtcdClients(completedOptions.Etcd.StorageConfig.Transport)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user