diff --git a/pkg/storage/etcd3/util.go b/pkg/storage/etcd3/util.go new file mode 100644 index 00000000000..11edd68b10f --- /dev/null +++ b/pkg/storage/etcd3/util.go @@ -0,0 +1,57 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/golang/glog" + "golang.org/x/net/context" +) + +// compactor periodically compacts historical versions of keys in etcd. +// After compaction, old versions of keys set before given interval will be gone. +// Any API call for the old versions of keys will return error. +// interval: the interval between each compaction. The first compaction happens after "interval". +func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) { + var curRev int64 + for { + select { + case <-time.After(interval): + case <-ctx.Done(): + return + } + + resp, err := client.Get(ctx, "/") + if err != nil { + glog.Errorf("compactor: Get failed: %v", err) + continue + } + oldRev := curRev + curRev = resp.Header.Revision + if oldRev == 0 { + continue + } + err = client.Compact(ctx, oldRev) + if err != nil { + glog.Errorf("compactor: Compact failed: %v", err) + continue + } + glog.V(4).Infof("compactor: Compacted rev %d", oldRev) + } +} diff --git a/pkg/storage/etcd3/util_test.go b/pkg/storage/etcd3/util_test.go new file mode 100644 index 00000000000..6e73c357ceb --- /dev/null +++ b/pkg/storage/etcd3/util_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/coreos/etcd/integration" + "golang.org/x/net/context" +) + +func TestCompactor(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + client := cluster.RandClient() + ctx := context.Background() + cancelableCtx, cancel := context.WithCancel(ctx) + + putResp, err := client.Put(ctx, "/somekey", "data") + if err != nil { + t.Fatalf("Put failed: %v", err) + } + + go compactor(cancelableCtx, client, 500*time.Millisecond) + time.Sleep(2 * time.Second) + cancel() + + _, err = client.Get(ctx, "/somekey", clientv3.WithRev(putResp.Header.Revision)) + if err != etcdrpc.ErrCompacted { + t.Errorf("Expecting ErrCompacted, but get=%v", err) + } +}