diff --git a/pkg/storage/etcd3/util.go b/pkg/storage/etcd3/compact.go similarity index 67% rename from pkg/storage/etcd3/util.go rename to pkg/storage/etcd3/compact.go index 11edd68b10f..afcb09f83af 100644 --- a/pkg/storage/etcd3/util.go +++ b/pkg/storage/etcd3/compact.go @@ -30,6 +30,7 @@ import ( // interval: the interval between each compaction. The first compaction happens after "interval". func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) { var curRev int64 + var err error for { select { case <-time.After(interval): @@ -37,21 +38,29 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat return } - resp, err := client.Get(ctx, "/") + curRev, err = compact(ctx, client, curRev) if err != nil { - glog.Errorf("compactor: Get failed: %v", err) + glog.Error(err) continue } - oldRev := curRev - curRev = resp.Header.Revision - if oldRev == 0 { - continue - } - err = client.Compact(ctx, oldRev) - if err != nil { - glog.Errorf("compactor: Compact failed: %v", err) - continue - } - glog.V(4).Infof("compactor: Compacted rev %d", oldRev) + glog.Infof("compactor: Compacted rev %d", curRev) } } + +// compact compacts etcd store and returns current rev. +// If it couldn't get current revision, the old rev will be returned. +func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64, error) { + resp, err := client.Get(ctx, "/") + if err != nil { + return oldRev, err + } + curRev := resp.Header.Revision + if oldRev == 0 { + return curRev, nil + } + err = client.Compact(ctx, oldRev) + if err != nil { + return curRev, err + } + return curRev, nil +} diff --git a/pkg/storage/etcd3/util_test.go b/pkg/storage/etcd3/compact_test.go similarity index 86% rename from pkg/storage/etcd3/util_test.go rename to pkg/storage/etcd3/compact_test.go index 6e73c357ceb..af5a3ed4d1d 100644 --- a/pkg/storage/etcd3/util_test.go +++ b/pkg/storage/etcd3/compact_test.go @@ -18,7 +18,6 @@ package etcd3 import ( "testing" - "time" "github.com/coreos/etcd/clientv3" etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" @@ -26,21 +25,21 @@ import ( "golang.org/x/net/context" ) -func TestCompactor(t *testing.T) { +func TestCompact(t *testing.T) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) client := cluster.RandClient() ctx := context.Background() - cancelableCtx, cancel := context.WithCancel(ctx) putResp, err := client.Put(ctx, "/somekey", "data") if err != nil { t.Fatalf("Put failed: %v", err) } - go compactor(cancelableCtx, client, 500*time.Millisecond) - time.Sleep(2 * time.Second) - cancel() + _, err = compact(ctx, client, putResp.Header.Revision) + if err != nil { + t.Fatalf("compact failed: %v", err) + } _, err = client.Get(ctx, "/somekey", clientv3.WithRev(putResp.Header.Revision)) if err != etcdrpc.ErrCompacted {