diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 46ed6d333fe..0c4fdc104c9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -66,6 +66,7 @@ import ( "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/atomic" "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/chmod" "k8s.io/kubernetes/pkg/util/chown" @@ -324,7 +325,7 @@ func NewMainKubelet( configureCBR0: configureCBR0, reconcileCIDR: reconcileCIDR, maxPods: maxPods, - syncLoopMonitor: util.AtomicValue{}, + syncLoopMonitor: atomic.Value{}, resolverConfig: resolverConfig, cpuCFSQuota: cpuCFSQuota, daemonEndpoints: daemonEndpoints, @@ -635,7 +636,7 @@ type Kubelet struct { maxPods int // Monitor Kubelet's sync loop - syncLoopMonitor util.AtomicValue + syncLoopMonitor atomic.Value // Container restart Backoff backOff *util.Backoff diff --git a/pkg/storage/etcd/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go index c2a057e68ef..20549ef3976 100644 --- a/pkg/storage/etcd/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -19,6 +19,7 @@ package etcd import ( "net/http" "sync" + "sync/atomic" "time" "k8s.io/kubernetes/pkg/api/unversioned" @@ -42,6 +43,23 @@ const ( EtcdExpire = "expire" ) +// HighWaterMark is a thread-safe object for tracking the maximum value seen +// for some quantity. +type HighWaterMark int64 + +// Update returns true if and only if 'current' is the highest value ever seen. +func (hwm *HighWaterMark) Update(current int64) bool { + for { + old := atomic.LoadInt64((*int64)(hwm)) + if current <= old { + return false + } + if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { + return true + } + } +} + // TransformFunc attempts to convert an object to another object for use with a watcher. type TransformFunc func(runtime.Object) (runtime.Object, error) @@ -169,7 +187,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming } var ( - watchChannelHWM util.HighWaterMark + watchChannelHWM HighWaterMark ) // translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be @@ -214,7 +232,7 @@ func (w *etcdWatcher) translate() { return case res, ok := <-w.etcdIncoming: if ok { - if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Check(curLen) { + if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) { // Monitor if this gets backed up, and how much. glog.V(2).Infof("watch: %v objects queued in channel.", curLen) } diff --git a/pkg/storage/etcd/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go index 561998bac22..12a7f6d07fb 100644 --- a/pkg/storage/etcd/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -17,7 +17,9 @@ limitations under the License. package etcd import ( + "math/rand" rt "runtime" + "sync" "testing" "github.com/coreos/go-etcd/etcd" @@ -463,3 +465,34 @@ func TestWatchPurposefulShutdown(t *testing.T) { t.Errorf("Channel should be closed") } } + +func TestHighWaterMark(t *testing.T) { + var h HighWaterMark + + for i := int64(10); i < 20; i++ { + if !h.Update(i) { + t.Errorf("unexpected false for %v", i) + } + if h.Update(i - 1) { + t.Errorf("unexpected true for %v", i-1) + } + } + + m := int64(0) + wg := sync.WaitGroup{} + for i := 0; i < 300; i++ { + wg.Add(1) + v := rand.Int63() + go func(v int64) { + defer wg.Done() + h.Update(v) + }(v) + if v > m { + m = v + } + } + wg.Wait() + if m != int64(h) { + t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) + } +} diff --git a/pkg/util/atomic_value.go b/pkg/util/atomic/value.go similarity index 64% rename from pkg/util/atomic_value.go rename to pkg/util/atomic/value.go index 546e19d713f..a9bc8cd813c 100644 --- a/pkg/util/atomic_value.go +++ b/pkg/util/atomic/value.go @@ -14,47 +14,29 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package atomic import ( "sync" - "sync/atomic" ) // TODO(ArtfulCoder) // sync/atomic/Value was added in golang 1.4 // Once support is dropped for go 1.3, this type must be deprecated in favor of sync/atomic/Value. // The functions are named Load/Store to match sync/atomic/Value function names. -type AtomicValue struct { +type Value struct { value interface{} valueMutex sync.RWMutex } -func (at *AtomicValue) Store(val interface{}) { +func (at *Value) Store(val interface{}) { at.valueMutex.Lock() defer at.valueMutex.Unlock() at.value = val } -func (at *AtomicValue) Load() interface{} { +func (at *Value) Load() interface{} { at.valueMutex.RLock() defer at.valueMutex.RUnlock() return at.value } - -// HighWaterMark is a thread-safe object for tracking the maximum value seen -// for some quantity. -type HighWaterMark int64 - -// Check returns true if and only if 'current' is the highest value ever seen. -func (hwm *HighWaterMark) Check(current int64) bool { - for { - old := atomic.LoadInt64((*int64)(hwm)) - if current <= old { - return false - } - if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) { - return true - } - } -} diff --git a/pkg/util/atomic_value_test.go b/pkg/util/atomic/value_test.go similarity index 62% rename from pkg/util/atomic_value_test.go rename to pkg/util/atomic/value_test.go index d3fb0cad7f5..0cb839dd5e5 100644 --- a/pkg/util/atomic_value_test.go +++ b/pkg/util/atomic/value_test.go @@ -14,16 +14,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package util +package atomic import ( - "math/rand" - "sync" "testing" "time" + + "k8s.io/kubernetes/pkg/util" ) -func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface{}) { +func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) { actualValue := atomicValue.Load() if actualValue != expectedValue { t.Errorf("Expected to find %v, found %v", expectedValue, actualValue) @@ -38,46 +38,15 @@ func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface t.Errorf("Expected to find %v, found %v", expectedValue, actualValue) return } - case <-time.After(ForeverTestTimeout): + case <-time.After(util.ForeverTestTimeout): t.Error("Value could not be read") return } } func TestAtomicValue(t *testing.T) { - atomicValue := &AtomicValue{} + atomicValue := &Value{} ExpectValue(t, atomicValue, nil) atomicValue.Store(10) ExpectValue(t, atomicValue, 10) } - -func TestHighWaterMark(t *testing.T) { - var h HighWaterMark - - for i := int64(10); i < 20; i++ { - if !h.Check(i) { - t.Errorf("unexpected false for %v", i) - } - if h.Check(i - 1) { - t.Errorf("unexpected true for %v", i-1) - } - } - - m := int64(0) - wg := sync.WaitGroup{} - for i := 0; i < 300; i++ { - wg.Add(1) - v := rand.Int63() - go func(v int64) { - defer wg.Done() - h.Check(v) - }(v) - if v > m { - m = v - } - } - wg.Wait() - if m != int64(h) { - t.Errorf("unexpected value, wanted %v, got %v", m, int64(h)) - } -}