From ab9ac70e56f569c8394d3f242a1c30584fd4259a Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Fri, 8 Apr 2016 21:48:56 -0700 Subject: [PATCH 1/2] etcd3 store: provide compactor util --- pkg/storage/etcd3/util.go | 57 ++++++++++++++++++++++++++++++++++ pkg/storage/etcd3/util_test.go | 49 +++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 pkg/storage/etcd3/util.go create mode 100644 pkg/storage/etcd3/util_test.go diff --git a/pkg/storage/etcd3/util.go b/pkg/storage/etcd3/util.go new file mode 100644 index 00000000000..11edd68b10f --- /dev/null +++ b/pkg/storage/etcd3/util.go @@ -0,0 +1,57 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/golang/glog" + "golang.org/x/net/context" +) + +// compactor periodically compacts historical versions of keys in etcd. +// After compaction, old versions of keys set before given interval will be gone. +// Any API call for the old versions of keys will return error. +// interval: the interval between each compaction. The first compaction happens after "interval". +func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) { + var curRev int64 + for { + select { + case <-time.After(interval): + case <-ctx.Done(): + return + } + + resp, err := client.Get(ctx, "/") + if err != nil { + glog.Errorf("compactor: Get failed: %v", err) + continue + } + oldRev := curRev + curRev = resp.Header.Revision + if oldRev == 0 { + continue + } + err = client.Compact(ctx, oldRev) + if err != nil { + glog.Errorf("compactor: Compact failed: %v", err) + continue + } + glog.V(4).Infof("compactor: Compacted rev %d", oldRev) + } +} diff --git a/pkg/storage/etcd3/util_test.go b/pkg/storage/etcd3/util_test.go new file mode 100644 index 00000000000..6e73c357ceb --- /dev/null +++ b/pkg/storage/etcd3/util_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd3 + +import ( + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/coreos/etcd/integration" + "golang.org/x/net/context" +) + +func TestCompactor(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + client := cluster.RandClient() + ctx := context.Background() + cancelableCtx, cancel := context.WithCancel(ctx) + + putResp, err := client.Put(ctx, "/somekey", "data") + if err != nil { + t.Fatalf("Put failed: %v", err) + } + + go compactor(cancelableCtx, client, 500*time.Millisecond) + time.Sleep(2 * time.Second) + cancel() + + _, err = client.Get(ctx, "/somekey", clientv3.WithRev(putResp.Header.Revision)) + if err != etcdrpc.ErrCompacted { + t.Errorf("Expecting ErrCompacted, but get=%v", err) + } +} From 9f43a110d9ce5dd313cac591b24c39567429a6c2 Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Sat, 16 Apr 2016 01:51:29 +0800 Subject: [PATCH 2/2] file rename; refactor --- pkg/storage/etcd3/{util.go => compact.go} | 35 ++++++++++++------- .../etcd3/{util_test.go => compact_test.go} | 11 +++--- 2 files changed, 27 insertions(+), 19 deletions(-) rename pkg/storage/etcd3/{util.go => compact.go} (67%) rename pkg/storage/etcd3/{util_test.go => compact_test.go} (86%) diff --git a/pkg/storage/etcd3/util.go b/pkg/storage/etcd3/compact.go similarity index 67% rename from pkg/storage/etcd3/util.go rename to pkg/storage/etcd3/compact.go index 11edd68b10f..afcb09f83af 100644 --- a/pkg/storage/etcd3/util.go +++ b/pkg/storage/etcd3/compact.go @@ -30,6 +30,7 @@ import ( // interval: the interval between each compaction. The first compaction happens after "interval". func compactor(ctx context.Context, client *clientv3.Client, interval time.Duration) { var curRev int64 + var err error for { select { case <-time.After(interval): @@ -37,21 +38,29 @@ func compactor(ctx context.Context, client *clientv3.Client, interval time.Durat return } - resp, err := client.Get(ctx, "/") + curRev, err = compact(ctx, client, curRev) if err != nil { - glog.Errorf("compactor: Get failed: %v", err) + glog.Error(err) continue } - oldRev := curRev - curRev = resp.Header.Revision - if oldRev == 0 { - continue - } - err = client.Compact(ctx, oldRev) - if err != nil { - glog.Errorf("compactor: Compact failed: %v", err) - continue - } - glog.V(4).Infof("compactor: Compacted rev %d", oldRev) + glog.Infof("compactor: Compacted rev %d", curRev) } } + +// compact compacts etcd store and returns current rev. +// If it couldn't get current revision, the old rev will be returned. +func compact(ctx context.Context, client *clientv3.Client, oldRev int64) (int64, error) { + resp, err := client.Get(ctx, "/") + if err != nil { + return oldRev, err + } + curRev := resp.Header.Revision + if oldRev == 0 { + return curRev, nil + } + err = client.Compact(ctx, oldRev) + if err != nil { + return curRev, err + } + return curRev, nil +} diff --git a/pkg/storage/etcd3/util_test.go b/pkg/storage/etcd3/compact_test.go similarity index 86% rename from pkg/storage/etcd3/util_test.go rename to pkg/storage/etcd3/compact_test.go index 6e73c357ceb..af5a3ed4d1d 100644 --- a/pkg/storage/etcd3/util_test.go +++ b/pkg/storage/etcd3/compact_test.go @@ -18,7 +18,6 @@ package etcd3 import ( "testing" - "time" "github.com/coreos/etcd/clientv3" etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" @@ -26,21 +25,21 @@ import ( "golang.org/x/net/context" ) -func TestCompactor(t *testing.T) { +func TestCompact(t *testing.T) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer cluster.Terminate(t) client := cluster.RandClient() ctx := context.Background() - cancelableCtx, cancel := context.WithCancel(ctx) putResp, err := client.Put(ctx, "/somekey", "data") if err != nil { t.Fatalf("Put failed: %v", err) } - go compactor(cancelableCtx, client, 500*time.Millisecond) - time.Sleep(2 * time.Second) - cancel() + _, err = compact(ctx, client, putResp.Header.Revision) + if err != nil { + t.Fatalf("compact failed: %v", err) + } _, err = client.Get(ctx, "/somekey", clientv3.WithRev(putResp.Header.Revision)) if err != etcdrpc.ErrCompacted {