file rename; refactor

This commit is contained in:
Hongchao Deng 2016-04-16 01:51:29 +08:00
parent ab9ac70e56
commit 9f43a110d9
2 changed files with 27 additions and 19 deletions

View File

@ -30,6 +30,7 @@ import (
// interval: the interval between each compaction. The first compaction happens after "interval". // interval: the interval between each compaction. The first compaction happens after "interval".
func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) { func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) {
var curRev int64 var curRev int64
var err error
for { for {
select { select {
case <-time.After(interval): case <-time.After(interval):
@ -37,21 +38,29 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat
return return
} }
resp, err := client.Get(ctx, "/") curRev, err = compact(ctx, client, curRev)
if err != nil { if err != nil {
glog.Errorf("compactor: Get failed: %v", err) glog.Error(err)
continue continue
} }
oldRev := curRev glog.Infof("compactor: Compacted rev %d", curRev)
curRev = resp.Header.Revision }
}
// compact compacts etcd store and returns current rev.
// If it couldn't get current revision, the old rev will be returned.
func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64, error) {
resp, err := client.Get(ctx, "/")
if err != nil {
return oldRev, err
}
curRev := resp.Header.Revision
if oldRev == 0 { if oldRev == 0 {
continue return curRev, nil
} }
err = client.Compact(ctx, oldRev) err = client.Compact(ctx, oldRev)
if err != nil { if err != nil {
glog.Errorf("compactor: Compact failed: %v", err) return curRev, err
continue
}
glog.V(4).Infof("compactor: Compacted rev %d", oldRev)
} }
return curRev, nil
} }

View File

@ -18,7 +18,6 @@ package etcd3
import ( import (
"testing" "testing"
"time"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
@ -26,21 +25,21 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
func TestCompactor(t *testing.T) { func TestCompact(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t) defer cluster.Terminate(t)
client := cluster.RandClient() client := cluster.RandClient()
ctx := context.Background() ctx := context.Background()
cancelableCtx, cancel := context.WithCancel(ctx)
putResp, err := client.Put(ctx, "/somekey", "data") putResp, err := client.Put(ctx, "/somekey", "data")
if err != nil { if err != nil {
t.Fatalf("Put failed: %v", err) t.Fatalf("Put failed: %v", err)
} }
go compactor(cancelableCtx, client, 500*time.Millisecond) _, err = compact(ctx, client, putResp.Header.Revision)
time.Sleep(2 * time.Second) if err != nil {
cancel() t.Fatalf("compact failed: %v", err)
}
_, err = client.Get(ctx, "/somekey", clientv3.WithRev(putResp.Header.Revision)) _, err = client.Get(ctx, "/somekey", clientv3.WithRev(putResp.Header.Revision))
if err != etcdrpc.ErrCompacted { if err != etcdrpc.ErrCompacted {