mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
fix race
This commit is contained in:
parent
6835318d1e
commit
395d69641e
@ -218,8 +218,9 @@ func TestHammerController(t *testing.T) {
|
|||||||
go controller.Run(stop)
|
go controller.Run(stop)
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i := 0; i < 10; i++ {
|
const threads = 3
|
||||||
wg.Add(1)
|
wg.Add(threads)
|
||||||
|
for i := 0; i < threads; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
// Let's add a few objects to the source.
|
// Let's add a few objects to the source.
|
||||||
@ -227,7 +228,7 @@ func TestHammerController(t *testing.T) {
|
|||||||
rs := rand.NewSource(rand.Int63())
|
rs := rand.NewSource(rand.Int63())
|
||||||
f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
|
f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
|
||||||
r := rand.New(rs) // Mustn't use r and f concurrently!
|
r := rand.New(rs) // Mustn't use r and f concurrently!
|
||||||
for i := 0; i < 750; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
var name string
|
var name string
|
||||||
var isNew bool
|
var isNew bool
|
||||||
if currentNames.Len() == 0 || r.Intn(3) == 1 {
|
if currentNames.Len() == 0 || r.Intn(3) == 1 {
|
||||||
@ -335,48 +336,40 @@ func TestUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
tests := []func(string){
|
tests := []func(string){
|
||||||
func(name string) {
|
func(name string) {
|
||||||
defer wg.Done()
|
|
||||||
testDoneWG.Add(1)
|
|
||||||
name = "a-" + name
|
name = "a-" + name
|
||||||
source.Add(pod(name, FROM))
|
source.Add(pod(name, FROM))
|
||||||
source.Modify(pod(name, TO))
|
source.Modify(pod(name, TO))
|
||||||
},
|
},
|
||||||
func(name string) {
|
func(name string) {
|
||||||
defer wg.Done()
|
|
||||||
testDoneWG.Add(1)
|
|
||||||
name = "b-" + name
|
name = "b-" + name
|
||||||
source.Add(pod(name, FROM))
|
source.Add(pod(name, FROM))
|
||||||
source.ModifyDropWatch(pod(name, TO))
|
source.ModifyDropWatch(pod(name, TO))
|
||||||
},
|
},
|
||||||
func(name string) {
|
func(name string) {
|
||||||
defer wg.Done()
|
|
||||||
testDoneWG.Add(1)
|
|
||||||
name = "c-" + name
|
name = "c-" + name
|
||||||
source.AddDropWatch(pod(name, FROM))
|
source.AddDropWatch(pod(name, FROM))
|
||||||
source.Modify(pod(name, ADD_MISSED))
|
source.Modify(pod(name, ADD_MISSED))
|
||||||
source.Modify(pod(name, TO))
|
source.Modify(pod(name, TO))
|
||||||
},
|
},
|
||||||
func(name string) {
|
func(name string) {
|
||||||
defer wg.Done()
|
|
||||||
testDoneWG.Add(1)
|
|
||||||
name = "d-" + name
|
name = "d-" + name
|
||||||
source.Add(pod(name, FROM))
|
source.Add(pod(name, FROM))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// run every test a few times, in parallel
|
// run every test a few times, in parallel
|
||||||
fuzzer := fuzz.New()
|
const threads = 3
|
||||||
for i := 0; i < 20; i++ {
|
var wg sync.WaitGroup
|
||||||
for _, f := range tests {
|
wg.Add(threads * len(tests))
|
||||||
wg.Add(1)
|
testDoneWG.Add(threads * len(tests))
|
||||||
var name string
|
for i := 0; i < threads; i++ {
|
||||||
for len(name) < 10 {
|
for j, f := range tests {
|
||||||
fuzzer.Fuzz(&name)
|
go func(name string, f func(string)) {
|
||||||
}
|
defer wg.Done()
|
||||||
go f(name)
|
f(name)
|
||||||
|
}(fmt.Sprintf("%v-%v", i, j), f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
@ -125,8 +126,15 @@ func (f *FakeControllerSource) List() (runtime.Object, error) {
|
|||||||
defer f.lock.RUnlock()
|
defer f.lock.RUnlock()
|
||||||
list := make([]runtime.Object, 0, len(f.items))
|
list := make([]runtime.Object, 0, len(f.items))
|
||||||
for _, obj := range f.items {
|
for _, obj := range f.items {
|
||||||
// TODO: should copy obj first
|
// Must make a copy to allow clients to modify the object.
|
||||||
list = append(list, obj)
|
// Otherwise, if they make a change and write it back, they
|
||||||
|
// will inadvertantly change the our canonical copy (in
|
||||||
|
// addition to racing with other clients).
|
||||||
|
objCopy, err := conversion.DeepCopy(obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
list = append(list, objCopy.(runtime.Object))
|
||||||
}
|
}
|
||||||
listObj := &api.List{}
|
listObj := &api.List{}
|
||||||
if err := runtime.SetList(listObj, list); err != nil {
|
if err := runtime.SetList(listObj, list); err != nil {
|
||||||
@ -151,7 +159,20 @@ func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, e
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if rc < len(f.changes) {
|
if rc < len(f.changes) {
|
||||||
return f.broadcaster.WatchWithPrefix(f.changes[rc:]), nil
|
changes := []watch.Event{}
|
||||||
|
for _, c := range f.changes[rc:] {
|
||||||
|
// Must make a copy to allow clients to modify the
|
||||||
|
// object. Otherwise, if they make a change and write
|
||||||
|
// it back, they will inadvertantly change the our
|
||||||
|
// canonical copy (in addition to racing with other
|
||||||
|
// clients).
|
||||||
|
objCopy, err := conversion.DeepCopy(c.Object)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
changes = append(changes, watch.Event{c.Type, objCopy.(runtime.Object)})
|
||||||
|
}
|
||||||
|
return f.broadcaster.WatchWithPrefix(changes), nil
|
||||||
} else if rc > len(f.changes) {
|
} else if rc > len(f.changes) {
|
||||||
return nil, errors.New("resource version in the future not supported by this fake")
|
return nil, errors.New("resource version in the future not supported by this fake")
|
||||||
}
|
}
|
||||||
|
@ -17,22 +17,27 @@ limitations under the License.
|
|||||||
package conversion
|
package conversion
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/gob"
|
||||||
"reflect"
|
"reflect"
|
||||||
)
|
)
|
||||||
|
|
||||||
var deepCopier = NewConverter()
|
|
||||||
|
|
||||||
// DeepCopy makes a deep copy of source. Won't work for any private fields!
|
// DeepCopy makes a deep copy of source. Won't work for any private fields!
|
||||||
// For nil slices, will return 0-length slices. These are equivilent in
|
// For nil slices, will return 0-length slices. These are equivilent in
|
||||||
// basically every way except for the way that reflect.DeepEqual checks.
|
// basically every way except for the way that reflect.DeepEqual checks.
|
||||||
func DeepCopy(source interface{}) (interface{}, error) {
|
func DeepCopy(source interface{}) (interface{}, error) {
|
||||||
src := reflect.ValueOf(source)
|
v := reflect.New(reflect.TypeOf(source))
|
||||||
v := reflect.New(src.Type()).Elem()
|
|
||||||
s := &scope{
|
buff := &bytes.Buffer{}
|
||||||
converter: deepCopier,
|
enc := gob.NewEncoder(buff)
|
||||||
}
|
dec := gob.NewDecoder(buff)
|
||||||
if err := deepCopier.convert(src, v, s); err != nil {
|
err := enc.Encode(source)
|
||||||
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return v.Interface(), nil
|
err = dec.Decode(v.Interface())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return v.Elem().Interface(), nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user