Merge pull request #16237 from ZJU-SEL/fix-util

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2015-12-14 18:41:14 -08:00
commit 1f0e46abb8
5 changed files with 66 additions and 63 deletions

View File

@ -66,6 +66,7 @@ import (
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/atomic"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/chmod"
"k8s.io/kubernetes/pkg/util/chown"
@ -324,7 +325,7 @@ func NewMainKubelet(
configureCBR0: configureCBR0,
reconcileCIDR: reconcileCIDR,
maxPods: maxPods,
syncLoopMonitor: util.AtomicValue{},
syncLoopMonitor: atomic.Value{},
resolverConfig: resolverConfig,
cpuCFSQuota: cpuCFSQuota,
daemonEndpoints: daemonEndpoints,
@ -635,7 +636,7 @@ type Kubelet struct {
maxPods int
// Monitor Kubelet's sync loop
syncLoopMonitor util.AtomicValue
syncLoopMonitor atomic.Value
// Container restart Backoff
backOff *util.Backoff

View File

@ -19,6 +19,7 @@ package etcd
import (
"net/http"
"sync"
"sync/atomic"
"time"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -42,6 +43,23 @@ const (
EtcdExpire = "expire"
)
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Update returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Update(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)
@ -169,7 +187,7 @@ func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming
}
var (
watchChannelHWM util.HighWaterMark
watchChannelHWM HighWaterMark
)
// translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
@ -214,7 +232,7 @@ func (w *etcdWatcher) translate() {
return
case res, ok := <-w.etcdIncoming:
if ok {
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Check(curLen) {
if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
}

View File

@ -17,7 +17,9 @@ limitations under the License.
package etcd
import (
"math/rand"
rt "runtime"
"sync"
"testing"
"github.com/coreos/go-etcd/etcd"
@ -463,3 +465,34 @@ func TestWatchPurposefulShutdown(t *testing.T) {
t.Errorf("Channel should be closed")
}
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Update(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Update(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Update(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}

View File

@ -14,47 +14,29 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package atomic
import (
"sync"
"sync/atomic"
)
// TODO(ArtfulCoder)
// sync/atomic/Value was added in golang 1.4
// Once support is dropped for go 1.3, this type must be deprecated in favor of sync/atomic/Value.
// The functions are named Load/Store to match sync/atomic/Value function names.
type AtomicValue struct {
type Value struct {
value interface{}
valueMutex sync.RWMutex
}
func (at *AtomicValue) Store(val interface{}) {
func (at *Value) Store(val interface{}) {
at.valueMutex.Lock()
defer at.valueMutex.Unlock()
at.value = val
}
func (at *AtomicValue) Load() interface{} {
func (at *Value) Load() interface{} {
at.valueMutex.RLock()
defer at.valueMutex.RUnlock()
return at.value
}
// HighWaterMark is a thread-safe object for tracking the maximum value seen
// for some quantity.
type HighWaterMark int64
// Check returns true if and only if 'current' is the highest value ever seen.
func (hwm *HighWaterMark) Check(current int64) bool {
for {
old := atomic.LoadInt64((*int64)(hwm))
if current <= old {
return false
}
if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
return true
}
}
}

View File

@ -14,16 +14,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package atomic
import (
"math/rand"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/util"
)
func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface{}) {
func ExpectValue(t *testing.T, atomicValue *Value, expectedValue interface{}) {
actualValue := atomicValue.Load()
if actualValue != expectedValue {
t.Errorf("Expected to find %v, found %v", expectedValue, actualValue)
@ -38,46 +38,15 @@ func ExpectValue(t *testing.T, atomicValue *AtomicValue, expectedValue interface
t.Errorf("Expected to find %v, found %v", expectedValue, actualValue)
return
}
case <-time.After(ForeverTestTimeout):
case <-time.After(util.ForeverTestTimeout):
t.Error("Value could not be read")
return
}
}
func TestAtomicValue(t *testing.T) {
atomicValue := &AtomicValue{}
atomicValue := &Value{}
ExpectValue(t, atomicValue, nil)
atomicValue.Store(10)
ExpectValue(t, atomicValue, 10)
}
func TestHighWaterMark(t *testing.T) {
var h HighWaterMark
for i := int64(10); i < 20; i++ {
if !h.Check(i) {
t.Errorf("unexpected false for %v", i)
}
if h.Check(i - 1) {
t.Errorf("unexpected true for %v", i-1)
}
}
m := int64(0)
wg := sync.WaitGroup{}
for i := 0; i < 300; i++ {
wg.Add(1)
v := rand.Int63()
go func(v int64) {
defer wg.Done()
h.Check(v)
}(v)
if v > m {
m = v
}
}
wg.Wait()
if m != int64(h) {
t.Errorf("unexpected value, wanted %v, got %v", m, int64(h))
}
}