Issue queries in parallel

This commit is contained in:
Satnam Singh 2015-03-28 12:28:48 -07:00
parent 6bdb3bf4fe
commit 7049bb6b55
2 changed files with 61 additions and 25 deletions

View File

@ -1,2 +1,5 @@
all: all:
go build ./serve_hostnames.go go build serve_hostnames.go
clean:
rm -rf serve_hostnames

View File

@ -40,14 +40,16 @@ import (
) )
var ( var (
queriesAverage = flag.Int("queries", 10, "Number of hostname queries to make in each iteration per pod on average") queriesAverage = flag.Int("queries", 20, "Number of hostname queries to make in each iteration per pod on average")
podsPerNode = flag.Int("pods_per_node", 1, "Number of serve_hostname pods per node") podsPerNode = flag.Int("pods_per_node", 1, "Number of serve_hostname pods per node")
upTo = flag.Int("up_to", 1, "Number of iterations or -1 for no limit") upTo = flag.Int("up_to", 1, "Number of iterations or -1 for no limit")
maxPar = flag.Int("max_par", 500, "Maximum number of queries in flight")
) )
const ( const (
deleteTimeout = 2 * time.Minute deleteTimeout = 2 * time.Minute
endpointTimeout = 5 * time.Minute endpointTimeout = 5 * time.Minute
nodeListTimeout = 2 * time.Minute
podCreateTimeout = 2 * time.Minute podCreateTimeout = 2 * time.Minute
podStartTimeout = 30 * time.Minute podStartTimeout = 30 * time.Minute
serviceCreateTimeout = 2 * time.Minute serviceCreateTimeout = 2 * time.Minute
@ -73,16 +75,23 @@ func main() {
glog.Fatalf("Failed to make client: %v", err) glog.Fatalf("Failed to make client: %v", err)
} }
nodes, err := c.Nodes().List() var nodes *api.NodeList
for start := time.Now(); time.Since(start) < nodeListTimeout; time.Sleep(2 * time.Second) {
nodes, err = c.Nodes().List()
if err == nil {
break
}
glog.Warningf("Failed to list nodes: %v", err)
}
if err != nil { if err != nil {
glog.Fatalf("Failed to list nodes: %v", err) glog.Fatalf("Giving up trying to list nodes: %v", err)
} }
if len(nodes.Items) == 0 { if len(nodes.Items) == 0 {
glog.Fatalf("Failed to find any nodes.") glog.Fatalf("Failed to find any nodes.")
} }
glog.Infof("Nodes found on this cluster:") glog.Infof("Found %d nodes on this cluster:", len(nodes.Items))
for i, node := range nodes.Items { for i, node := range nodes.Items {
glog.Infof("%d: %s", i, node.Name) glog.Infof("%d: %s", i, node.Name)
} }
@ -130,7 +139,7 @@ func main() {
} }
// Clean up service // Clean up service
defer func() { defer func() {
glog.Infof("Cleaning up service %s/server-hostnames", ns) glog.Infof("Cleaning up service %s/serve-hostnames", ns)
// Make several attempts to delete the service. // Make several attempts to delete the service.
for start := time.Now(); time.Since(start) < deleteTimeout; time.Sleep(1 * time.Second) { for start := time.Now(); time.Since(start) < deleteTimeout; time.Sleep(1 * time.Second) {
if err := c.Services(ns).Delete(svc.Name); err == nil { if err := c.Services(ns).Delete(svc.Name); err == nil {
@ -209,10 +218,10 @@ func main() {
} }
if pod.Status.Phase != api.PodRunning { if pod.Status.Phase != api.PodRunning {
glog.Warningf("Gave up waiting on pod %s/%s to be running (saw %v)", ns, podName, pod.Status.Phase) glog.Warningf("Gave up waiting on pod %s/%s to be running (saw %v)", ns, podName, pod.Status.Phase)
return } else {
}
glog.Infof("%s/%s is running", ns, podName) glog.Infof("%s/%s is running", ns, podName)
} }
}
// Wait for the endpoints to propagate. // Wait for the endpoints to propagate.
for start := time.Now(); time.Since(start) < endpointTimeout; time.Sleep(10 * time.Second) { for start := time.Now(); time.Since(start) < endpointTimeout; time.Sleep(10 * time.Second) {
@ -239,9 +248,14 @@ func main() {
// Repeatedly make requests. // Repeatedly make requests.
for iteration := 0; iteration != *upTo; iteration++ { for iteration := 0; iteration != *upTo; iteration++ {
responses := make(map[string]int, *podsPerNode*len(nodes.Items)) responseChan := make(chan string, queries)
// Use a channel of size *maxPar to throttle the number
// of in-flight requests to avoid overloading the service.
inFlight := make(chan struct{}, *maxPar)
start := time.Now() start := time.Now()
for q := 0; q < queries; q++ { for q := 0; q < queries; q++ {
go func() {
inFlight <- struct{}{}
t := time.Now() t := time.Now()
hostname, err := c.Get(). hostname, err := c.Get().
Namespace(ns). Namespace(ns).
@ -250,15 +264,34 @@ func main() {
Name("serve-hostnames"). Name("serve-hostnames").
DoRaw() DoRaw()
glog.V(4).Infof("Proxy call in namespace %s took %v", ns, time.Since(t)) glog.V(4).Infof("Proxy call in namespace %s took %v", ns, time.Since(t))
i := iteration
query := q
if err != nil { if err != nil {
glog.Infof("Call failed during iteration %d query %d : %v", iteration, q, err) glog.Warningf("Call failed during iteration %d query %d : %v", i, query, err)
// If the query failed return a string which starts with a character
// that can't be part of a hostname.
responseChan <- fmt.Sprintf("!failed in iteration %d to issue query %d: %v", i, query, err)
} else { } else {
responses[string(hostname)]++ responseChan <- string(hostname)
} }
<-inFlight
}()
} }
for k, v := range responses { responses := make(map[string]int, *podsPerNode*len(nodes.Items))
glog.V(4).Infof("%s: %d ", k, v) missing := 0
for q := 0; q < queries; q++ {
r := <-responseChan
glog.V(4).Infof("Got response from %s", r)
responses[r]++
// If the returned hostname starts with '!' then it indicates
// an error response.
if len(r) > 0 && r[0] == '!' {
glog.V(3).Infof("Got response %s", r)
missing++
}
}
if missing > 0 {
glog.Warningf("Missing %d responses out of %d", missing, queries)
} }
// Report any nodes that did not respond. // Report any nodes that did not respond.
for n, node := range nodes.Items { for n, node := range nodes.Items {