mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 23:15:14 +00:00
start etcd compactor in background
This commit is contained in:
parent
93e3df8e55
commit
3144ebc7fc
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package etcd3
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
@ -24,6 +25,43 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
const compactInterval = 10 * time.Minute
|
||||
|
||||
var (
|
||||
endpointsMapMu sync.Mutex
|
||||
endpointsMap map[string]struct{}
|
||||
)
|
||||
|
||||
func init() {
|
||||
endpointsMap = make(map[string]struct{})
|
||||
}
|
||||
|
||||
// StartCompactor starts a compactor in the background in order to compact keys
|
||||
// older than fixed time.
|
||||
// We need to compact keys because we can't let on disk data grow forever.
|
||||
// We save the most recent 10 minutes data. It should be enough for slow watchers and to tolerate burst.
|
||||
// TODO: We might keep a longer history (12h) in the future once storage API can take
|
||||
// advantage of multi-version key.
|
||||
func StartCompactor(ctx context.Context, client *clientv3.Client) {
|
||||
endpointsMapMu.Lock()
|
||||
defer endpointsMapMu.Unlock()
|
||||
|
||||
// We can't have multiple compaction jobs for the same cluster.
|
||||
// Currently we rely on endpoints to differentiate clusters.
|
||||
var emptyStruct struct{}
|
||||
for _, ep := range client.Endpoints() {
|
||||
if _, ok := endpointsMap[ep]; ok {
|
||||
glog.V(4).Infof("compactor already exists for endpoints %v")
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, ep := range client.Endpoints() {
|
||||
endpointsMap[ep] = emptyStruct
|
||||
}
|
||||
|
||||
go compactor(ctx, client, compactInterval)
|
||||
}
|
||||
|
||||
// compactor periodically compacts historical versions of keys in etcd.
|
||||
// After compaction, old versions of keys set before given interval will be gone.
|
||||
// Any API call for the old versions of keys will return error.
|
||||
@ -43,7 +81,6 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat
|
||||
glog.Error(err)
|
||||
continue
|
||||
}
|
||||
glog.Infof("compactor: Compacted rev %d", curRev)
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,5 +99,6 @@ func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64,
|
||||
if err != nil {
|
||||
return curRev, err
|
||||
}
|
||||
glog.Infof("etcd: Compacted rev %d, endpoints %v", oldRev, client.Endpoints())
|
||||
return curRev, nil
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd3"
|
||||
)
|
||||
@ -36,5 +37,6 @@ func newETCD3Storage(c Config) (storage.Interface, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
etcd3.StartCompactor(context.Background(), client)
|
||||
return etcd3.New(client, c.Codec, c.Prefix), nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user