From 7049bb6b55dfb3cf197cea031b4e59699dc48a8f Mon Sep 17 00:00:00 2001 From: Satnam Singh Date: Sat, 28 Mar 2015 12:28:48 -0700 Subject: [PATCH] Issue queries in parallel --- test/soak/serve_hostnames/Makefile | 5 +- test/soak/serve_hostnames/serve_hostnames.go | 81 ++++++++++++++------ 2 files changed, 61 insertions(+), 25 deletions(-) diff --git a/test/soak/serve_hostnames/Makefile b/test/soak/serve_hostnames/Makefile index c05c9c9f062..9458e3d0982 100644 --- a/test/soak/serve_hostnames/Makefile +++ b/test/soak/serve_hostnames/Makefile @@ -1,2 +1,5 @@ all: - go build ./serve_hostnames.go + go build serve_hostnames.go + +clean: + rm -rf serve_hostnames diff --git a/test/soak/serve_hostnames/serve_hostnames.go b/test/soak/serve_hostnames/serve_hostnames.go index 3e08dd044b7..048c2a04e6d 100644 --- a/test/soak/serve_hostnames/serve_hostnames.go +++ b/test/soak/serve_hostnames/serve_hostnames.go @@ -40,14 +40,16 @@ import ( ) var ( - queriesAverage = flag.Int("queries", 10, "Number of hostname queries to make in each iteration per pod on average") + queriesAverage = flag.Int("queries", 20, "Number of hostname queries to make in each iteration per pod on average") podsPerNode = flag.Int("pods_per_node", 1, "Number of serve_hostname pods per node") upTo = flag.Int("up_to", 1, "Number of iterations or -1 for no limit") + maxPar = flag.Int("max_par", 500, "Maximum number of queries in flight") ) const ( deleteTimeout = 2 * time.Minute endpointTimeout = 5 * time.Minute + nodeListTimeout = 2 * time.Minute podCreateTimeout = 2 * time.Minute podStartTimeout = 30 * time.Minute serviceCreateTimeout = 2 * time.Minute @@ -73,16 +75,23 @@ func main() { glog.Fatalf("Failed to make client: %v", err) } - nodes, err := c.Nodes().List() + var nodes *api.NodeList + for start := time.Now(); time.Since(start) < nodeListTimeout; time.Sleep(2 * time.Second) { + nodes, err = c.Nodes().List() + if err == nil { + break + } + glog.Warningf("Failed to list nodes: %v", err) + } if err != nil { - glog.Fatalf("Failed to list nodes: %v", err) + glog.Fatalf("Giving up trying to list nodes: %v", err) } if len(nodes.Items) == 0 { glog.Fatalf("Failed to find any nodes.") } - glog.Infof("Nodes found on this cluster:") + glog.Infof("Found %d nodes on this cluster:", len(nodes.Items)) for i, node := range nodes.Items { glog.Infof("%d: %s", i, node.Name) } @@ -130,7 +139,7 @@ func main() { } // Clean up service defer func() { - glog.Infof("Cleaning up service %s/server-hostnames", ns) + glog.Infof("Cleaning up service %s/serve-hostnames", ns) // Make several attempts to delete the service. for start := time.Now(); time.Since(start) < deleteTimeout; time.Sleep(1 * time.Second) { if err := c.Services(ns).Delete(svc.Name); err == nil { @@ -209,9 +218,9 @@ func main() { } if pod.Status.Phase != api.PodRunning { glog.Warningf("Gave up waiting on pod %s/%s to be running (saw %v)", ns, podName, pod.Status.Phase) - return + } else { + glog.Infof("%s/%s is running", ns, podName) } - glog.Infof("%s/%s is running", ns, podName) } // Wait for the endpoints to propagate. @@ -239,26 +248,50 @@ func main() { // Repeatedly make requests. for iteration := 0; iteration != *upTo; iteration++ { - responses := make(map[string]int, *podsPerNode*len(nodes.Items)) + responseChan := make(chan string, queries) + // Use a channel of size *maxPar to throttle the number + // of in-flight requests to avoid overloading the service. + inFlight := make(chan struct{}, *maxPar) start := time.Now() for q := 0; q < queries; q++ { - t := time.Now() - hostname, err := c.Get(). - Namespace(ns). - Prefix("proxy"). - Resource("services"). - Name("serve-hostnames"). - DoRaw() - glog.V(4).Infof("Proxy call in namespace %s took %v", ns, time.Since(t)) - if err != nil { - glog.Infof("Call failed during iteration %d query %d : %v", iteration, q, err) - } else { - responses[string(hostname)]++ - } - + go func() { + inFlight <- struct{}{} + t := time.Now() + hostname, err := c.Get(). + Namespace(ns). + Prefix("proxy"). + Resource("services"). + Name("serve-hostnames"). + DoRaw() + glog.V(4).Infof("Proxy call in namespace %s took %v", ns, time.Since(t)) + i := iteration + query := q + if err != nil { + glog.Warningf("Call failed during iteration %d query %d : %v", i, query, err) + // If the query failed return a string which starts with a character + // that can't be part of a hostname. + responseChan <- fmt.Sprintf("!failed in iteration %d to issue query %d: %v", i, query, err) + } else { + responseChan <- string(hostname) + } + <-inFlight + }() } - for k, v := range responses { - glog.V(4).Infof("%s: %d ", k, v) + responses := make(map[string]int, *podsPerNode*len(nodes.Items)) + missing := 0 + for q := 0; q < queries; q++ { + r := <-responseChan + glog.V(4).Infof("Got response from %s", r) + responses[r]++ + // If the returned hostname starts with '!' then it indicates + // an error response. + if len(r) > 0 && r[0] == '!' { + glog.V(3).Infof("Got response %s", r) + missing++ + } + } + if missing > 0 { + glog.Warningf("Missing %d responses out of %d", missing, queries) } // Report any nodes that did not respond. for n, node := range nodes.Items {