diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 6209b39ebc2..2ffa525c381 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -711,19 +711,19 @@ __EOF__ ### Create and delete persistent volume examples # Pre-condition: no persistent volumes currently exist - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" '' # Command kubectl create -f docs/user-guide/persistent-volumes/volumes/local-01.yaml "${kube_flags[@]}" - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0001:' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0001:' kubectl delete pv pv0001 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/volumes/local-02.yaml "${kube_flags[@]}" - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0002:' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0002:' kubectl delete pv pv0002 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/volumes/gce.yaml "${kube_flags[@]}" - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0003:' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0003:' kubectl delete pv pv0003 "${kube_flags[@]}" # Post-condition: no PVs - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" '' ############################ # Persistent Volume Claims # @@ -731,21 +731,21 @@ __EOF__ ### Create and delete persistent volume claim examples # Pre-condition: no persistent volume claims currently exist - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" '' # Command kubectl create -f docs/user-guide/persistent-volumes/claims/claim-01.yaml "${kube_flags[@]}" - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-1:' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-1:' kubectl delete pvc myclaim-1 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/claims/claim-02.yaml "${kube_flags[@]}" - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-2:' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-2:' kubectl delete pvc myclaim-2 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/claims/claim-03.json "${kube_flags[@]}" - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-3:' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-3:' kubectl delete pvc myclaim-3 "${kube_flags[@]}" # Post-condition: no PVCs - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" '' diff --git a/pkg/client/unversioned/record/event_test.go b/pkg/client/unversioned/record/event_test.go index 90bb082496e..5234513c466 100644 --- a/pkg/client/unversioned/record/event_test.go +++ b/pkg/client/unversioned/record/event_test.go @@ -21,6 +21,7 @@ import ( "reflect" "strconv" "strings" + "sync" "testing" "k8s.io/kubernetes/pkg/api" @@ -271,39 +272,41 @@ func TestEventf(t *testing.T) { } for _, item := range table { - called := make(chan struct{}) + var wg sync.WaitGroup + // We expect only one callback + wg.Add(1) testEvents := testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { + defer wg.Done() returnEvent, _ := validateEvent(event, item.expect, t) if item.expectUpdate { t.Errorf("Expected event update(), got event create()") } - called <- struct{}{} return returnEvent, nil }, OnUpdate: func(event *api.Event) (*api.Event, error) { + defer wg.Done() returnEvent, _ := validateEvent(event, item.expect, t) if !item.expectUpdate { t.Errorf("Expected event create(), got event update()") } - called <- struct{}{} return returnEvent, nil }, } eventBroadcaster := NewBroadcaster() sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful + wg.Add(1) logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { + defer wg.Done() if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } - called <- struct{}{} }) recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) - <-called - <-called + wg.Wait() sinkWatcher.Stop() logWatcher1.Stop() logWatcher2.Stop() @@ -316,17 +319,17 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing. if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() { t.Errorf("timestamp wasn't set: %#v", *actualEvent) } - if actualEvent.FirstTimestamp.Equal(actualEvent.LastTimestamp) { - if expectCompression { - t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent) - } - } else { - if !expectCompression { - t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent) - } - } actualFirstTimestamp := actualEvent.FirstTimestamp actualLastTimestamp := actualEvent.LastTimestamp + if actualFirstTimestamp.Equal(actualLastTimestamp) { + if expectCompression { + t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent) + } + } else { + if expectedEvent.Count == 1 { + t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent) + } + } // Temp clear time stamps for comparison because actual values don't matter for comparison actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp actualEvent.LastTimestamp = expectedEvent.LastTimestamp diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index 836790b3c83..7241c784fd5 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -335,10 +335,6 @@ func TestUpdate(t *testing.T) { }, ) - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - go controller.Run(stop) - pod := func(name, check string) *api.Pod { return &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -371,11 +367,18 @@ func TestUpdate(t *testing.T) { }, } - // run every test a few times, in parallel const threads = 3 + testDoneWG.Add(threads * len(tests)) + + // Run the controller and run it until we close stop. + // Once Run() is called, calls to testDoneWG.Done() might start, so + // all testDoneWG.Add() calls must happen before this point + stop := make(chan struct{}) + go controller.Run(stop) + + // run every test a few times, in parallel var wg sync.WaitGroup wg.Add(threads * len(tests)) - testDoneWG.Add(threads * len(tests)) for i := 0; i < threads; i++ { for j, f := range tests { go func(name string, f func(string)) { diff --git a/pkg/probe/http/http_test.go b/pkg/probe/http/http_test.go index 987f3103f88..65cd0398282 100644 --- a/pkg/probe/http/http_test.go +++ b/pkg/probe/http/http_test.go @@ -29,6 +29,15 @@ import ( "k8s.io/kubernetes/pkg/probe" ) +func containsAny(s string, substrs []string) bool { + for _, substr := range substrs { + if strings.Contains(s, substr) { + return true + } + } + return false +} + func TestHTTPProbeChecker(t *testing.T) { handleReq := func(s int, body string) func(w http.ResponseWriter) { return func(w http.ResponseWriter) { @@ -41,12 +50,31 @@ func TestHTTPProbeChecker(t *testing.T) { testCases := []struct { handler func(w http.ResponseWriter) health probe.Result - body string + // go1.5: error message changed for timeout, need to support + // both old and new + accBodies []string }{ // The probe will be filled in below. This is primarily testing that an HTTP GET happens. - {handleReq(http.StatusOK, "ok body"), probe.Success, "ok body"}, - {handleReq(-1, "fail body"), probe.Failure, "fail body"}, - {func(w http.ResponseWriter) { time.Sleep(3 * time.Second) }, probe.Failure, "use of closed network connection"}, + { + handleReq(http.StatusOK, "ok body"), + probe.Success, + []string{"ok body"}, + }, + { + handleReq(-1, "fail body"), + probe.Failure, + []string{"fail body"}, + }, + { + func(w http.ResponseWriter) { + time.Sleep(3 * time.Second) + }, + probe.Failure, + []string{ + "use of closed network connection", + "request canceled (Client.Timeout exceeded while awaiting headers)", + }, + }, } for _, test := range testCases { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -74,8 +102,8 @@ func TestHTTPProbeChecker(t *testing.T) { if health != test.health { t.Errorf("Expected %v, got %v", test.health, health) } - if !strings.Contains(output, test.body) { - t.Errorf("Expected %v, got %v", test.body, output) + if !containsAny(output, test.accBodies) { + t.Errorf("Expected one of %#v, got %v", test.accBodies, output) } } } diff --git a/pkg/probe/tcp/tcp_test.go b/pkg/probe/tcp/tcp_test.go index d30f088a78f..8b4c80e03c0 100644 --- a/pkg/probe/tcp/tcp_test.go +++ b/pkg/probe/tcp/tcp_test.go @@ -29,17 +29,28 @@ import ( "k8s.io/kubernetes/pkg/probe" ) +func containsAny(s string, substrs []string) bool { + for _, substr := range substrs { + if strings.Contains(s, substr) { + return true + } + } + return false +} + func TestTcpHealthChecker(t *testing.T) { prober := New() tests := []struct { expectedStatus probe.Result usePort bool expectError bool - output string + // Some errors are different depending on your system, make + // the test pass on all of them + accOutputs []string }{ // The probe will be filled in below. This is primarily testing that a connection is made. - {probe.Success, true, false, ""}, - {probe.Failure, false, false, "tcp: unknown port"}, + {probe.Success, true, false, []string{""}}, + {probe.Failure, false, false, []string{"unknown port", "Servname not supported for ai_socktype"}}, } server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -72,8 +83,8 @@ func TestTcpHealthChecker(t *testing.T) { if err == nil && test.expectError { t.Errorf("unexpected non-error.") } - if !strings.Contains(output, test.output) { - t.Errorf("expected %s, got %s", test.output, output) + if !containsAny(output, test.accOutputs) { + t.Errorf("expected one of %#v, got %s", test.accOutputs, output) } } } diff --git a/pkg/util/jsonpath/jsonpath.go b/pkg/util/jsonpath/jsonpath.go index 817af6307e1..7d4b7620995 100644 --- a/pkg/util/jsonpath/jsonpath.go +++ b/pkg/util/jsonpath/jsonpath.go @@ -217,7 +217,7 @@ func (j *JSONPath) evalArray(input []reflect.Value, node *ArrayNode) ([]reflect. value, isNil := template.Indirect(value) if isNil || (value.Kind() != reflect.Array && value.Kind() != reflect.Slice) { - return input, fmt.Errorf("%v is not array or slice", value) + return input, fmt.Errorf("%v is not array or slice", value.Type()) } params := node.Params if !params[0].Known { diff --git a/pkg/util/jsonpath/jsonpath_test.go b/pkg/util/jsonpath/jsonpath_test.go index 9922a8fee21..6a79d9fdfc6 100644 --- a/pkg/util/jsonpath/jsonpath_test.go +++ b/pkg/util/jsonpath/jsonpath_test.go @@ -162,7 +162,7 @@ func TestStructInput(t *testing.T) { failStoreTests := []jsonpathTest{ {"invalid identfier", "{hello}", storeData, "unrecongnized identifier hello"}, {"nonexistent field", "{.hello}", storeData, "hello is not found"}, - {"invalid array", "{.Labels[0]}", storeData, " is not array or slice"}, + {"invalid array", "{.Labels[0]}", storeData, "map[string]int is not array or slice"}, {"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>"}, {"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end"}, } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 5d1f71768de..ccae32264fb 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -41,8 +41,9 @@ const incomingQueueLength = 25 type Broadcaster struct { lock sync.Mutex - watchers map[int64]*broadcasterWatcher - nextWatcher int64 + watchers map[int64]*broadcasterWatcher + nextWatcher int64 + distributing sync.WaitGroup incoming chan Event @@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B watchQueueLength: queueLength, fullChannelBehavior: fullChannelBehavior, } + m.distributing.Add(1) go m.loop() return m } @@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) { } // Shutdown disconnects all watchers (but any queued events will still be distributed). -// You must not call Action after calling Shutdown. +// You must not call Action or Watch* after calling Shutdown. This call blocks +// until all events have been distributed through the outbound channels. Note +// that since they can be buffered, this means that the watchers might not +// have received the data yet as it can remain sitting in the buffered +// channel. func (m *Broadcaster) Shutdown() { close(m.incoming) + m.distributing.Wait() } // loop receives from m.incoming and distributes to all watchers. @@ -163,6 +170,7 @@ func (m *Broadcaster) loop() { m.distribute(event) } m.closeAll() + m.distributing.Done() } // distribute sends event to all watchers. Blocking. diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index fd31910060c..d3e48279cc6 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { event2 := Event{Added, &myType{"bar", "hello world 2"}} // Add a couple watchers - const testWatchers = 2 - watches := make([]Interface, testWatchers) - for i := 0; i < testWatchers; i++ { + watches := make([]Interface, 2) + for i := range watches { watches[i] = m.Watch() } @@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { // Pull events from the queue. wg := sync.WaitGroup{} - wg.Add(testWatchers) - for i := 0; i < testWatchers; i++ { + wg.Add(len(watches)) + for i := range watches { // Verify that each watcher only gets the first event because its watch // queue of length one was full from the first one. go func(watcher int, w Interface) { @@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { e1, ok := <-w.ResultChan() if !ok { t.Errorf("Watcher %v failed to retrieve first event.", watcher) - return } if e, a := event1, e1; !reflect.DeepEqual(e, a) { t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", watcher, e.Type, e.Object, a.Type, a.Object) - } else { - t.Logf("Got (%v, %#v)", e1.Type, e1.Object) } + t.Logf("Got (%v, %#v)", e1.Type, e1.Object) e2, ok := <-w.ResultChan() if ok { t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",