diff --git a/pkg/storage/etcd3/compact.go b/pkg/storage/etcd3/compact.go index afcb09f83af..cfe73c5970c 100644 --- a/pkg/storage/etcd3/compact.go +++ b/pkg/storage/etcd3/compact.go @@ -17,6 +17,7 @@ limitations under the License. package etcd3 import ( + "sync" "time" "github.com/coreos/etcd/clientv3" @@ -24,6 +25,43 @@ import ( "golang.org/x/net/context" ) +const compactInterval = 10 * time.Minute + +var ( + endpointsMapMu sync.Mutex + endpointsMap map[string]struct{} +) + +func init() { + endpointsMap = make(map[string]struct{}) +} + +// StartCompactor starts a compactor in the background in order to compact keys +// older than fixed time. +// We need to compact keys because we can't let on disk data grow forever. +// We save the most recent 10 minutes data. It should be enough for slow watchers and to tolerate burst. +// TODO: We might keep a longer history (12h) in the future once storage API can take +// advantage of multi-version key. +func StartCompactor(ctx context.Context, client *clientv3.Client) { + endpointsMapMu.Lock() + defer endpointsMapMu.Unlock() + + // We can't have multiple compaction jobs for the same cluster. + // Currently we rely on endpoints to differentiate clusters. + var emptyStruct struct{} + for _, ep := range client.Endpoints() { + if _, ok := endpointsMap[ep]; ok { + glog.V(4).Infof("compactor already exists for endpoints %v") + return + } + } + for _, ep := range client.Endpoints() { + endpointsMap[ep] = emptyStruct + } + + go compactor(ctx, client, compactInterval) +} + // compactor periodically compacts historical versions of keys in etcd. // After compaction, old versions of keys set before given interval will be gone. // Any API call for the old versions of keys will return error. @@ -43,7 +81,6 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat glog.Error(err) continue } - glog.Infof("compactor: Compacted rev %d", curRev) } } @@ -62,5 +99,6 @@ func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64, if err != nil { return curRev, err } + glog.Infof("etcd: Compacted rev %d, endpoints %v", oldRev, client.Endpoints()) return curRev, nil } diff --git a/pkg/storage/storagebackend/etcd3.go b/pkg/storage/storagebackend/etcd3.go index 5be33b02d1e..7699eec7b38 100644 --- a/pkg/storage/storagebackend/etcd3.go +++ b/pkg/storage/storagebackend/etcd3.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/coreos/etcd/clientv3" + "golang.org/x/net/context" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage/etcd3" ) @@ -36,5 +37,6 @@ func newETCD3Storage(c Config) (storage.Interface, error) { if err != nil { return nil, err } + etcd3.StartCompactor(context.Background(), client) return etcd3.New(client, c.Codec, c.Prefix), nil }