From 5f3aff2c43fe2ba7c3c88e8a51b2d53833f7bd34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Fri, 21 Aug 2015 14:15:09 -0700 Subject: [PATCH 1/7] Fix struct input test in jsonpath --- pkg/util/jsonpath/jsonpath.go | 2 +- pkg/util/jsonpath/jsonpath_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/util/jsonpath/jsonpath.go b/pkg/util/jsonpath/jsonpath.go index 817af6307e1..7d4b7620995 100644 --- a/pkg/util/jsonpath/jsonpath.go +++ b/pkg/util/jsonpath/jsonpath.go @@ -217,7 +217,7 @@ func (j *JSONPath) evalArray(input []reflect.Value, node *ArrayNode) ([]reflect. value, isNil := template.Indirect(value) if isNil || (value.Kind() != reflect.Array && value.Kind() != reflect.Slice) { - return input, fmt.Errorf("%v is not array or slice", value) + return input, fmt.Errorf("%v is not array or slice", value.Type()) } params := node.Params if !params[0].Known { diff --git a/pkg/util/jsonpath/jsonpath_test.go b/pkg/util/jsonpath/jsonpath_test.go index 9922a8fee21..6a79d9fdfc6 100644 --- a/pkg/util/jsonpath/jsonpath_test.go +++ b/pkg/util/jsonpath/jsonpath_test.go @@ -162,7 +162,7 @@ func TestStructInput(t *testing.T) { failStoreTests := []jsonpathTest{ {"invalid identfier", "{hello}", storeData, "unrecongnized identifier hello"}, {"nonexistent field", "{.hello}", storeData, "hello is not found"}, - {"invalid array", "{.Labels[0]}", storeData, " is not array or slice"}, + {"invalid array", "{.Labels[0]}", storeData, "map[string]int is not array or slice"}, {"invalid filter operator", "{.Book[?(@.Price<>10)]}", storeData, "unrecognized filter operator <>"}, {"redundent end", "{range .Labels.*}{@}{end}{end}", storeData, "not in range, nothing to end"}, } From 0b10c6cfae0853935c571aa5f1b16128ca88950d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Fri, 21 Aug 2015 16:00:44 -0700 Subject: [PATCH 2/7] Fix failing test in probe/http The error that Go 1.5 returns is different compared to 1.4. Support both. --- pkg/probe/http/http_test.go | 40 +++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/pkg/probe/http/http_test.go b/pkg/probe/http/http_test.go index 987f3103f88..65cd0398282 100644 --- a/pkg/probe/http/http_test.go +++ b/pkg/probe/http/http_test.go @@ -29,6 +29,15 @@ import ( "k8s.io/kubernetes/pkg/probe" ) +func containsAny(s string, substrs []string) bool { + for _, substr := range substrs { + if strings.Contains(s, substr) { + return true + } + } + return false +} + func TestHTTPProbeChecker(t *testing.T) { handleReq := func(s int, body string) func(w http.ResponseWriter) { return func(w http.ResponseWriter) { @@ -41,12 +50,31 @@ func TestHTTPProbeChecker(t *testing.T) { testCases := []struct { handler func(w http.ResponseWriter) health probe.Result - body string + // go1.5: error message changed for timeout, need to support + // both old and new + accBodies []string }{ // The probe will be filled in below. This is primarily testing that an HTTP GET happens. 
- {handleReq(http.StatusOK, "ok body"), probe.Success, "ok body"}, - {handleReq(-1, "fail body"), probe.Failure, "fail body"}, - {func(w http.ResponseWriter) { time.Sleep(3 * time.Second) }, probe.Failure, "use of closed network connection"}, + { + handleReq(http.StatusOK, "ok body"), + probe.Success, + []string{"ok body"}, + }, + { + handleReq(-1, "fail body"), + probe.Failure, + []string{"fail body"}, + }, + { + func(w http.ResponseWriter) { + time.Sleep(3 * time.Second) + }, + probe.Failure, + []string{ + "use of closed network connection", + "request canceled (Client.Timeout exceeded while awaiting headers)", + }, + }, } for _, test := range testCases { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -74,8 +102,8 @@ func TestHTTPProbeChecker(t *testing.T) { if health != test.health { t.Errorf("Expected %v, got %v", test.health, health) } - if !strings.Contains(output, test.body) { - t.Errorf("Expected %v, got %v", test.body, output) + if !containsAny(output, test.accBodies) { + t.Errorf("Expected one of %#v, got %v", test.accBodies, output) } } } From 7aca60f636ecaf43c2246f1d4767b2a333257083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Fri, 21 Aug 2015 22:20:47 -0700 Subject: [PATCH 3/7] Fix probe/tcp test on some systems Depending on your system, the error might be "Servname not supported for ai_socktype" instead of "unknown port". Accept both. For example: http://www.ducea.com/2006/09/11/error-servname-not-supported-for-ai_socktype/ --- pkg/probe/tcp/tcp_test.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/probe/tcp/tcp_test.go b/pkg/probe/tcp/tcp_test.go index d30f088a78f..8b4c80e03c0 100644 --- a/pkg/probe/tcp/tcp_test.go +++ b/pkg/probe/tcp/tcp_test.go @@ -29,17 +29,28 @@ import ( "k8s.io/kubernetes/pkg/probe" ) +func containsAny(s string, substrs []string) bool { + for _, substr := range substrs { + if strings.Contains(s, substr) { + return true + } + } + return false +} + func TestTcpHealthChecker(t *testing.T) { prober := New() tests := []struct { expectedStatus probe.Result usePort bool expectError bool - output string + // Some errors are different depending on your system, make + // the test pass on all of them + accOutputs []string }{ // The probe will be filled in below. This is primarily testing that a connection is made. - {probe.Success, true, false, ""}, - {probe.Failure, false, false, "tcp: unknown port"}, + {probe.Success, true, false, []string{""}}, + {probe.Failure, false, false, []string{"unknown port", "Servname not supported for ai_socktype"}}, } server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -72,8 +83,8 @@ func TestTcpHealthChecker(t *testing.T) { if err == nil && test.expectError { t.Errorf("unexpected non-error.") } - if !strings.Contains(output, test.output) { - t.Errorf("expected %s, got %s", test.output, output) + if !containsAny(output, test.accOutputs) { + t.Errorf("expected one of %#v, got %s", test.accOutputs, output) } } } From c054b201485feefacd72bb44dd7d08083327642c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Fri, 21 Aug 2015 23:34:17 -0700 Subject: [PATCH 4/7] Avoid using two periods in templates Since $id_field already starts with a period, .$id_field would result in the following in Go 1.5: error: error parsing template {{range.items}}{{..metadata.name}}:{{end}}, template: output:1: unexpected . after term "." Apparently, Go 1.4 allowed this. 
Unnecessary anyway. --- hack/test-cmd.sh | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 6209b39ebc2..2ffa525c381 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -711,19 +711,19 @@ __EOF__ ### Create and delete persistent volume examples # Pre-condition: no persistent volumes currently exist - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" '' # Command kubectl create -f docs/user-guide/persistent-volumes/volumes/local-01.yaml "${kube_flags[@]}" - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0001:' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0001:' kubectl delete pv pv0001 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/volumes/local-02.yaml "${kube_flags[@]}" - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0002:' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0002:' kubectl delete pv pv0002 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/volumes/gce.yaml "${kube_flags[@]}" - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" 'pv0003:' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" 'pv0003:' kubectl delete pv pv0003 "${kube_flags[@]}" # Post-condition: no PVs - kube::test::get_object_assert pv "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pv "{{range.items}}{{$id_field}}:{{end}}" '' ############################ # Persistent Volume Claims # @@ -731,21 +731,21 @@ __EOF__ ### Create and delete persistent volume claim examples # Pre-condition: no persistent volume claims currently exist - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" '' # Command kubectl create -f docs/user-guide/persistent-volumes/claims/claim-01.yaml "${kube_flags[@]}" - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-1:' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-1:' kubectl delete pvc myclaim-1 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/claims/claim-02.yaml "${kube_flags[@]}" - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-2:' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-2:' kubectl delete pvc myclaim-2 "${kube_flags[@]}" kubectl create -f docs/user-guide/persistent-volumes/claims/claim-03.json "${kube_flags[@]}" - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" 'myclaim-3:' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" 'myclaim-3:' kubectl delete pvc myclaim-3 "${kube_flags[@]}" # Post-condition: no PVCs - kube::test::get_object_assert pvc "{{range.items}}{{.$id_field}}:{{end}}" '' + kube::test::get_object_assert pvc "{{range.items}}{{$id_field}}:{{end}}" '' From ad243edaa3bfcefbb69ab3ff42cb6fe39938599d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Sat, 22 Aug 2015 15:01:56 -0700 Subject: [PATCH 5/7] Fix race condition in watch The Shutdown() call returned immediately, without waiting for all event distributions to be completed. Even worse, it would close all the watcher result channels before all the info was sent to them. 
Properly wait for all distributor goroutines - currently only one - to be finished. This fixes the flaky test TestBroadcasterDropIfChannelFull. Bonus cleanup on said test too. --- pkg/watch/mux.go | 14 +++++++++++--- pkg/watch/mux_test.go | 13 +++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 5d1f71768de..ccae32264fb 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -41,8 +41,9 @@ const incomingQueueLength = 25 type Broadcaster struct { lock sync.Mutex - watchers map[int64]*broadcasterWatcher - nextWatcher int64 + watchers map[int64]*broadcasterWatcher + nextWatcher int64 + distributing sync.WaitGroup incoming chan Event @@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B watchQueueLength: queueLength, fullChannelBehavior: fullChannelBehavior, } + m.distributing.Add(1) go m.loop() return m } @@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) { } // Shutdown disconnects all watchers (but any queued events will still be distributed). -// You must not call Action after calling Shutdown. +// You must not call Action or Watch* after calling Shutdown. This call blocks +// until all events have been distributed through the outbound channels. Note +// that since they can be buffered, this means that the watchers might not +// have received the data yet as it can remain sitting in the buffered +// channel. func (m *Broadcaster) Shutdown() { close(m.incoming) + m.distributing.Wait() } // loop receives from m.incoming and distributes to all watchers. @@ -163,6 +170,7 @@ func (m *Broadcaster) loop() { m.distribute(event) } m.closeAll() + m.distributing.Done() } // distribute sends event to all watchers. Blocking. diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index fd31910060c..d3e48279cc6 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { event2 := Event{Added, &myType{"bar", "hello world 2"}} // Add a couple watchers - const testWatchers = 2 - watches := make([]Interface, testWatchers) - for i := 0; i < testWatchers; i++ { + watches := make([]Interface, 2) + for i := range watches { watches[i] = m.Watch() } @@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { // Pull events from the queue. wg := sync.WaitGroup{} - wg.Add(testWatchers) - for i := 0; i < testWatchers; i++ { + wg.Add(len(watches)) + for i := range watches { // Verify that each watcher only gets the first event because its watch // queue of length one was full from the first one. 
go func(watcher int, w Interface) { @@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { e1, ok := <-w.ResultChan() if !ok { t.Errorf("Watcher %v failed to retrieve first event.", watcher) - return } if e, a := event1, e1; !reflect.DeepEqual(e, a) { t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", watcher, e.Type, e.Object, a.Type, a.Object) - } else { - t.Logf("Got (%v, %#v)", e1.Type, e1.Object) } + t.Logf("Got (%v, %#v)", e1.Type, e1.Object) e2, ok := <-w.ResultChan() if ok { t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.", From 71ca503d30b91d01da8af0db0fd582c75b54ff57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Sat, 22 Aug 2015 17:51:09 -0700 Subject: [PATCH 6/7] Try to fix flaky test in record/event The error messages were inconsistent with what was actually being tested in regards to timestamps being equal or not. --- pkg/client/unversioned/record/event_test.go | 33 +++++++++++---------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/pkg/client/unversioned/record/event_test.go b/pkg/client/unversioned/record/event_test.go index 90bb082496e..5234513c466 100644 --- a/pkg/client/unversioned/record/event_test.go +++ b/pkg/client/unversioned/record/event_test.go @@ -21,6 +21,7 @@ import ( "reflect" "strconv" "strings" + "sync" "testing" "k8s.io/kubernetes/pkg/api" @@ -271,39 +272,41 @@ func TestEventf(t *testing.T) { } for _, item := range table { - called := make(chan struct{}) + var wg sync.WaitGroup + // We expect only one callback + wg.Add(1) testEvents := testEventSink{ OnCreate: func(event *api.Event) (*api.Event, error) { + defer wg.Done() returnEvent, _ := validateEvent(event, item.expect, t) if item.expectUpdate { t.Errorf("Expected event update(), got event create()") } - called <- struct{}{} return returnEvent, nil }, OnUpdate: func(event *api.Event) (*api.Event, error) { + defer wg.Done() returnEvent, _ := validateEvent(event, item.expect, t) if !item.expectUpdate { t.Errorf("Expected event create(), got event update()") } - called <- struct{}{} return returnEvent, nil }, } eventBroadcaster := NewBroadcaster() sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents) logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful + wg.Add(1) logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) { + defer wg.Done() if e, a := item.expectLog, fmt.Sprintf(formatter, args...); e != a { t.Errorf("Expected '%v', got '%v'", e, a) } - called <- struct{}{} }) recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "eventTest"}) recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...) - <-called - <-called + wg.Wait() sinkWatcher.Stop() logWatcher1.Stop() logWatcher2.Stop() @@ -316,17 +319,17 @@ func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing. if actualEvent.FirstTimestamp.IsZero() || actualEvent.LastTimestamp.IsZero() { t.Errorf("timestamp wasn't set: %#v", *actualEvent) } - if actualEvent.FirstTimestamp.Equal(actualEvent.LastTimestamp) { - if expectCompression { - t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent) - } - } else { - if !expectCompression { - t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. 
Actual Event: %#v", actualEvent.FirstTimestamp, actualEvent.LastTimestamp, *actualEvent) - } - } actualFirstTimestamp := actualEvent.FirstTimestamp actualLastTimestamp := actualEvent.LastTimestamp + if actualFirstTimestamp.Equal(actualLastTimestamp) { + if expectCompression { + t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent) + } + } else { + if expectedEvent.Count == 1 { + t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, *actualEvent) + } + } // Temp clear time stamps for comparison because actual values don't matter for comparison actualEvent.FirstTimestamp = expectedEvent.FirstTimestamp actualEvent.LastTimestamp = expectedEvent.LastTimestamp From d15de72a92c8841d069b1265e433eb52edc29822 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Tue, 25 Aug 2015 22:51:05 -0700 Subject: [PATCH 7/7] Fix race condition in controller_test Our WaitGroup.Add() call might happen after some WaitGroup.Done() calls done by the controller, so make sure that doesn't happen by doing the Add() calls before letting the controller run. --- FAIL: TestUpdate (2.00s) panic: sync: WaitGroup is reused before previous Wait has returned [recovered] panic: sync: WaitGroup is reused before previous Wait has returned --- pkg/controller/framework/controller_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/controller/framework/controller_test.go b/pkg/controller/framework/controller_test.go index 619b994accf..e5d4fb1a7a0 100644 --- a/pkg/controller/framework/controller_test.go +++ b/pkg/controller/framework/controller_test.go @@ -335,10 +335,6 @@ func TestUpdate(t *testing.T) { }, ) - // Run the controller and run it until we close stop. - stop := make(chan struct{}) - go controller.Run(stop) - pod := func(name, check string) *api.Pod { return &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -371,11 +367,18 @@ func TestUpdate(t *testing.T) { }, } - // run every test a few times, in parallel const threads = 3 + testDoneWG.Add(threads * len(tests)) + + // Run the controller and run it until we close stop. + // Once Run() is called, calls to testDoneWG.Done() might start, so + // all testDoneWG.Add() calls must happen before this point + stop := make(chan struct{}) + go controller.Run(stop) + + // run every test a few times, in parallel var wg sync.WaitGroup wg.Add(threads * len(tests)) - testDoneWG.Add(threads * len(tests)) for i := 0; i < threads; i++ { for j, f := range tests { go func(name string, f func(string)) {
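
A closing note on PATCH 7: it hinges on a general sync.WaitGroup ordering rule — all Add() calls must complete before any goroutine that can call Done() is started, otherwise the counter can reach zero early and release Wait() (and reusing the WaitGroup in that state is what produces the "WaitGroup is reused before previous Wait has returned" panic quoted in the commit message). The following is a minimal standalone sketch of the correct ordering, not code taken from any of the patches above; the worker loop and names like "workers" are invented for illustration.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 3

	var wg sync.WaitGroup
	// Correct ordering: account for every expected Done() up front...
	wg.Add(workers)

	// ...and only then start the goroutines that will call Done().
	// If Add() instead raced with already-running Done() calls, the
	// counter could hit zero prematurely and release Wait() before
	// all workers had actually finished.
	for i := 0; i < workers; i++ {
		go func(id int) {
			defer wg.Done()
			fmt.Printf("worker %d done\n", id)
		}(i)
	}

	wg.Wait() // blocks until all three Done() calls have happened
}

This is the same shape as the fix in controller_test.go: testDoneWG.Add(threads * len(tests)) is moved ahead of go controller.Run(stop), so no Done() can fire before the count is fully registered.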