From 5405a5d98df40c10335ab641738be9aa372e25fe Mon Sep 17 00:00:00 2001 From: harry zhang Date: Sat, 24 Oct 2015 18:17:17 +0800 Subject: [PATCH] Move atomic_value into folder Change pkg to atomic --- pkg/kubelet/kubelet.go | 5 +- pkg/storage/etcd/etcd_watcher.go | 3 +- pkg/util/atomic/highwatermark.go | 39 ++++++++++++++ pkg/util/atomic/highwatermark_test.go | 54 +++++++++++++++++++ pkg/util/{atomic_value.go => atomic/value.go} | 25 ++------- .../value_test.go} | 6 ++- 6 files changed, 106 insertions(+), 26 deletions(-) create mode 100644 pkg/util/atomic/highwatermark.go create mode 100644 pkg/util/atomic/highwatermark_test.go rename pkg/util/{atomic_value.go => atomic/value.go} (65%) rename pkg/util/{atomic_value_test.go => atomic/value_test.go} (95%) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d4f98079ff3..c8c7cbc1e4e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -66,6 +66,7 @@ import ( "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/atomic" "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/chmod" "k8s.io/kubernetes/pkg/util/chown" @@ -324,7 +325,7 @@ func NewMainKubelet( configureCBR0: configureCBR0, reconcileCIDR: reconcileCIDR, maxPods: maxPods, - syncLoopMonitor: util.AtomicValue{}, + syncLoopMonitor: atomic.Value{}, resolverConfig: resolverConfig, cpuCFSQuota: cpuCFSQuota, daemonEndpoints: daemonEndpoints, @@ -635,7 +636,7 @@ type Kubelet struct { maxPods int // Monitor Kubelet's sync loop - syncLoopMonitor util.AtomicValue + syncLoopMonitor atomic.Value // Container restart Backoff backOff *util.Backoff diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index c2a057e68ef..be398b0cf7d 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/storage" etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/atomic" "k8s.io/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" @@ -169,7 +170,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming } var ( - watchChannelHWM util.HighWaterMark + watchChannelHWM atomic.HighWaterMark ) // translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be diff --git a/pkg/util/atomic/highwatermark.go b/pkg/util/atomic/highwatermark.go new file mode 100644 index 00000000000..bdf19ffc4f4 --- /dev/null +++ b/pkg/util/atomic/highwatermark.go @@ -0,0 +1,39 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atomic + +import ( + "sync" + "sync/atomic" +) + +// HighWaterMark is a thread-safe object for tracking the maximum value seen +// for some quantity. +type HighWaterMark int64 + +// Check returns true if and only if 'current' is the highest value ever seen. +func (hwm *HighWaterMark) Update(current int64) bool { + for { + old := atomic.LoadInt64((*int64)(hwm)) + if current <= old { + return false + } + if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { + return true + } + } +} diff --git a/pkg/util/atomic/highwatermark_test.go b/pkg/util/atomic/highwatermark_test.go new file mode 100644 index 00000000000..da724b52c79 --- /dev/null +++ b/pkg/util/atomic/highwatermark_test.go @@ -0,0 +1,54 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package atomic + +import ( + "math/rand" + "sync" + "testing" +) + +func TestHighWaterMark(t *testing.T) { + var h HighWaterMark + + for i := int64(10); i < 20; i++ { + if !h.Check(i) { + t.Errorf("unexpected false for %v", i) + } + if h.Check(i - 1) { + t.Errorf("unexpected true for %v", i-1) + } + } + + m := int64(0) + wg := sync.WaitGroup{} + for i := 0; i < 300; i++ { + wg.Add(1) + v := rand.Int63() + go func(v int64) { + defer wg.Done() + h.Check(v) + }(v) + if v > m { + m = v + } + } + wg.Wait() + if m != int64(h) { + t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) + } +} diff --git a/pkg/util/atomic_value.go b/pkg/util/atomic/value.go similarity index 65% rename from pkg/util/atomic_value.go rename to pkg/util/atomic/value.go index 546e19d713f..f87b9327d57 100644 --- a/pkg/util/atomic_value.go +++ b/pkg/util/atomic/value.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package atomic import ( "sync" @@ -25,36 +25,19 @@ import ( // sync/atomic/Value was added in golang 1.4 // Once support is dropped for go 1.3, this type must be deprecated in favor of sync/atomic/Value. // The functions are named Load/Store to match sync/atomic/Value function names. -type AtomicValue struct { +type Value struct { value interface{} valueMutex sync.RWMutex } -func (at *AtomicValue) Store(val interface{}) { +func (at *Value) Store(val interface{}) { at.valueMutex.Lock() defer at.valueMutex.Unlock() at.value = val } -func (at *AtomicValue) Load() interface{} { +func (at *Value) Load() interface{} { at.valueMutex.RLock() defer at.valueMutex.RUnlock() return at.value } - -// HighWaterMark is a thread-safe object for tracking the maximum value seen -// for some quantity. -type HighWaterMark int64 - -// Check returns true if and only if 'current' is the highest value ever seen. -func (hwm *HighWaterMark) Check(current int64) bool { - for { - old := atomic.LoadInt64((*int64)(hwm)) - if current <= old { - return false - } - if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { - return true - } - } -} diff --git a/pkg/util/atomic_value_test.go b/pkg/util/atomic/value_test.go similarity index 95% rename from pkg/util/atomic_value_test.go rename to pkg/util/atomic/value_test.go index d3fb0cad7f5..cbab545884d 100644 --- a/pkg/util/atomic_value_test.go +++ b/pkg/util/atomic/value_test.go @@ -14,13 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package atomic import ( "math/rand" "sync" "testing" "time" + + "k8s.io/kubernetes/pkg/util" ) func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface{}) { @@ -38,7 +40,7 @@ func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface t.Errorf("Expected to find %v, found %v", expectedValue, actualValue) return } - case <-time.After(ForeverTestTimeout): + case <-time.After(util.ForeverTestTimeout): t.Error("Value could not be read") return }