wait for all webserver endpoints to come up before contacting peers

This commit is contained in:
Minhan Xia 2016-02-23 15:18:51 -08:00
parent cdbbeae3e5
commit 66940c2d44
6 changed files with 36 additions and 22 deletions

View File

@ -126,7 +126,7 @@ var _ = Describe("Networking", func() {
"Rerun it with at least two nodes to get complete coverage.")
}
podNames := LaunchNetTestPodPerNode(f, nodes, svcname, "1.7")
podNames := LaunchNetTestPodPerNode(f, nodes, svcname, "1.8")
// Clean up the pods
defer func() {
@ -171,7 +171,9 @@ var _ = Describe("Networking", func() {
DoRaw()
}
timeout := time.Now().Add(2 * time.Minute)
// The nettest containers wait up to 2 minutes for all service endpoints to come up,
// so observe for 3 minutes here to keep this test from timing out before nettest starts contacting its peers.
timeout := time.Now().Add(3 * time.Minute)
for i := 0; !passed && timeout.After(time.Now()); i++ {
time.Sleep(2 * time.Second)
Logf("About to make a proxy status call")

View File

@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
TAG = 1.7
TAG = 1.8
PREFIX = gcr.io/google_containers
all: push

View File

@ -22,7 +22,7 @@
"containers": [
{
"name": "webserver",
"image": "gcr.io/google_containers/nettest:1.7",
"image": "gcr.io/google_containers/nettest:1.8",
"imagePullPolicy": "Always",
"args": [
"-service=nettest",

View File

@ -11,7 +11,7 @@
"containers": [
{
"name": "webserver",
"image": "gcr.io/google_containers/nettest:1.7",
"image": "gcr.io/google_containers/nettest:1.8",
"args": [
"-service=nettest",
"-delay-shutdown=10"

View File

@ -23,7 +23,7 @@
"containers": [
{
"name": "webserver",
"image": "gcr.io/google_containers/nettest:1.7",
"image": "gcr.io/google_containers/nettest:1.8",
"args": [
"-service=nettest",
"-delay-shutdown=10"

View File

@ -36,7 +36,6 @@ import (
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
@ -215,6 +214,7 @@ func main() {
// Find all sibling pods in the service and post to their /write handler.
func contactOthers(state *State) {
const waitTimeout = 2 * time.Minute
defer state.doneContactingPeers()
client, err := client.NewInCluster()
if err != nil {
@ -227,32 +227,44 @@ func contactOthers(state *State) {
log.Printf("Server version: %#v\n", v)
}
for start := time.Now(); time.Since(start) < waitTimeout; time.Sleep(5 * time.Second) {
eps := getWebserverEndpoints(client)
if eps.Len() >= *peerCount {
break
}
state.Logf("%v/%v has %v endpoints, which is less than %v as expected. Waiting for all endpoints to come up.", *namespace, *service, len(eps), *peerCount)
}
// Do this repeatedly, in case there's some propagation delay with getting
// newly started pods into the endpoints list.
for i := 0; i < 15; i++ {
endpoints, err := client.Endpoints(*namespace).Get(*service)
if err != nil {
state.Logf("Unable to read the endpoints for %v/%v: %v; will try again.", *namespace, *service, err)
time.Sleep(time.Duration(1+rand.Intn(10)) * time.Second)
}
eps := sets.String{}
for _, ss := range endpoints.Subsets {
for _, a := range ss.Addresses {
for _, p := range ss.Ports {
eps.Insert(fmt.Sprintf("http://%s:%d", a.IP, p.Port))
}
}
}
eps := getWebserverEndpoints(client)
for ep := range eps {
state.Logf("Attempting to contact %s", ep)
contactSingle(ep, state)
}
time.Sleep(5 * time.Second)
}
}
// getWebserverEndpoints returns the webserver endpoints of the service as a
// set of strings, each in the format "http://{ip}:{port}".
//
// It reads the Endpoints object named *service in *namespace through the given
// client. If that lookup fails, the error is logged and an empty set is
// returned.
func getWebserverEndpoints(client *client.Client) sets.String {
	eps := sets.String{}
	endpoints, err := client.Endpoints(*namespace).Get(*service)
	if err != nil {
		// No *State is passed to this function, so report the failure through
		// the standard logger rather than through a state value.
		log.Printf("Unable to read the endpoints for %v/%v: %v.", *namespace, *service, err)
		return eps
	}
	// Emit one URL per (address, port) pair within each subset.
	for _, ss := range endpoints.Subsets {
		for _, a := range ss.Addresses {
			for _, p := range ss.Ports {
				eps.Insert(fmt.Sprintf("http://%s:%d", a.IP, p.Port))
			}
		}
	}
	return eps
}
// contactSingle dials the address 'e' and tries to POST to its /write address.
func contactSingle(e string, state *State) {
body, err := json.Marshal(&WritePost{