mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Merge pull request #25010 from hongchaodeng/cp
Automatic merge from submit-queue start etcd compactor in background ref: #22448 What's in this PR? - StartCompactor starts a compactor in the background in order to compact keys older than fixed time. We need to compact keys because we can't let on disk data grow forever. We save the most recent 10 minutes data. It should be enough for slow watchers and to tolerate burst. We might keep a longer history (12h) in the future once storage API can take advantage of multi-version key. - Have only one compaction job for each cluster. Use endpoints from user input to differentiate clusters.
This commit is contained in:
commit
79a9a14c6f
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package etcd3
|
package etcd3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
@ -24,6 +25,43 @@ import (
|
|||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const compactInterval = 10 * time.Minute
|
||||||
|
|
||||||
|
var (
|
||||||
|
endpointsMapMu sync.Mutex
|
||||||
|
endpointsMap map[string]struct{}
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
endpointsMap = make(map[string]struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartCompactor starts a compactor in the background in order to compact keys
|
||||||
|
// older than fixed time.
|
||||||
|
// We need to compact keys because we can't let on disk data grow forever.
|
||||||
|
// We save the most recent 10 minutes data. It should be enough for slow watchers and to tolerate burst.
|
||||||
|
// TODO: We might keep a longer history (12h) in the future once storage API can take
|
||||||
|
// advantage of multi-version key.
|
||||||
|
func StartCompactor(ctx context.Context, client *clientv3.Client) {
|
||||||
|
endpointsMapMu.Lock()
|
||||||
|
defer endpointsMapMu.Unlock()
|
||||||
|
|
||||||
|
// We can't have multiple compaction jobs for the same cluster.
|
||||||
|
// Currently we rely on endpoints to differentiate clusters.
|
||||||
|
var emptyStruct struct{}
|
||||||
|
for _, ep := range client.Endpoints() {
|
||||||
|
if _, ok := endpointsMap[ep]; ok {
|
||||||
|
glog.V(4).Infof("compactor already exists for endpoints %v")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, ep := range client.Endpoints() {
|
||||||
|
endpointsMap[ep] = emptyStruct
|
||||||
|
}
|
||||||
|
|
||||||
|
go compactor(ctx, client, compactInterval)
|
||||||
|
}
|
||||||
|
|
||||||
// compactor periodically compacts historical versions of keys in etcd.
|
// compactor periodically compacts historical versions of keys in etcd.
|
||||||
// After compaction, old versions of keys set before given interval will be gone.
|
// After compaction, old versions of keys set before given interval will be gone.
|
||||||
// Any API call for the old versions of keys will return error.
|
// Any API call for the old versions of keys will return error.
|
||||||
@ -43,7 +81,6 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat
|
|||||||
glog.Error(err)
|
glog.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
glog.Infof("compactor: Compacted rev %d", curRev)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,5 +99,6 @@ func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return curRev, err
|
return curRev, err
|
||||||
}
|
}
|
||||||
|
glog.Infof("etcd: Compacted rev %d, endpoints %v", oldRev, client.Endpoints())
|
||||||
return curRev, nil
|
return curRev, nil
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/coreos/etcd/clientv3"
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
"golang.org/x/net/context"
|
||||||
"k8s.io/kubernetes/pkg/storage"
|
"k8s.io/kubernetes/pkg/storage"
|
||||||
"k8s.io/kubernetes/pkg/storage/etcd3"
|
"k8s.io/kubernetes/pkg/storage/etcd3"
|
||||||
)
|
)
|
||||||
@ -36,5 +37,6 @@ func newETCD3Storage(c Config) (storage.Interface, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
etcd3.StartCompactor(context.Background(), client)
|
||||||
return etcd3.New(client, c.Codec, c.Prefix), nil
|
return etcd3.New(client, c.Codec, c.Prefix), nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user