From 18e47ba056594841bcbee7389a9eba59b0b95e17 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 21 Nov 2014 17:05:27 -0800 Subject: [PATCH 1/2] Add network testing pod --- contrib/for-tests/network-tester/Dockerfile | 19 ++ contrib/for-tests/network-tester/Makefile | 13 + contrib/for-tests/network-tester/rc.json | 29 ++ contrib/for-tests/network-tester/service.json | 14 + contrib/for-tests/network-tester/webserver.go | 256 ++++++++++++++++++ 5 files changed, 331 insertions(+) create mode 100644 contrib/for-tests/network-tester/Dockerfile create mode 100644 contrib/for-tests/network-tester/Makefile create mode 100644 contrib/for-tests/network-tester/rc.json create mode 100644 contrib/for-tests/network-tester/service.json create mode 100644 contrib/for-tests/network-tester/webserver.go diff --git a/contrib/for-tests/network-tester/Dockerfile b/contrib/for-tests/network-tester/Dockerfile new file mode 100644 index 00000000000..7955f5a463a --- /dev/null +++ b/contrib/for-tests/network-tester/Dockerfile @@ -0,0 +1,19 @@ +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM scratch +MAINTAINER Daniel Smith +ADD webserver webserver +EXPOSE 8080 +ENTRYPOINT ["/webserver"] diff --git a/contrib/for-tests/network-tester/Makefile b/contrib/for-tests/network-tester/Makefile new file mode 100644 index 00000000000..d71bd381cf9 --- /dev/null +++ b/contrib/for-tests/network-tester/Makefile @@ -0,0 +1,13 @@ +all: push + +webserver: webserver.go + CGO_ENABLED=0 GOOS=linux go build -a -tags netgo -ldflags '-w' ./webserver.go + +container: webserver + sudo docker build -t kubernetes/nettest . + +push: container + sudo docker push kubernetes/nettest + +clean: + rm -f webserver diff --git a/contrib/for-tests/network-tester/rc.json b/contrib/for-tests/network-tester/rc.json new file mode 100644 index 00000000000..0a2712acfc2 --- /dev/null +++ b/contrib/for-tests/network-tester/rc.json @@ -0,0 +1,29 @@ +{ + "id": "nettestController", + "kind": "ReplicationController", + "apiVersion": "v1beta1", + "desiredState": { + "replicas": 8, + "replicaSelector": {"name": "nettest"}, + "podTemplate": { + "desiredState": { + "manifest": { + "version": "v1beta1", + "id": "nettest", + "containers": [{ + "name": "webserver", + "image": "kubernetes/nettest:latest", + "command": ["-service=nettest"], + "ports": [{ + "containerPort": 8080 + }] + }] + } + }, + "labels": { + "name": "nettest" + } + } + }, + "labels": {"name": "nettest"} +} diff --git a/contrib/for-tests/network-tester/service.json b/contrib/for-tests/network-tester/service.json new file mode 100644 index 00000000000..36b56331380 --- /dev/null +++ b/contrib/for-tests/network-tester/service.json @@ -0,0 +1,14 @@ +{ + "id": "nettest", + "kind": "Service", + "apiVersion": "v1beta1", + "port": 8080, + "containerPort": 8080, + "selector": { + "name": "nettest" + }, + "labels": { + "name": "nettest" + } +} + diff --git a/contrib/for-tests/network-tester/webserver.go b/contrib/for-tests/network-tester/webserver.go new file mode 100644 index 00000000000..eca05330cb5 --- /dev/null +++ b/contrib/for-tests/network-tester/webserver.go @@ -0,0 +1,256 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// A tiny web server for checking networking connectivity. +// +// Will dial out to, and expect to hear from, every pod that is a member of +// the service passed in the flag -service. +// +// Will serve a webserver on given -port. +// +// Visit /read to see the current state, or /quit to shut down. +// +// Visit /status to see pass/running/fail determination. (literally, it will +// return one of those words.) +// +// /write is used by other network test pods to register connectivity. +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "log" + "math/rand" + "net/http" + "net/url" + "os" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" +) + +var ( + port = flag.Int("port", 8080, "Port number to serve at.") + peerCount = flag.Int("peers", 8, "Must find at least this many peers for the test to pass.") + service = flag.String("service", "nettest", "Service to find other network test pods in.") + namespace = flag.String("namespace", "default", "Namespace of this pod. TODO: kubernetes should make this discoverable.") +) + +// State tracks the internal state of our little http server. +// It's returned verbatim over the /read endpoint. +type State struct { + // Hostname is set once and never changed-- it's always safe to read. + Hostname string + + // The below fields require that lock is held before reading or writing. + Sent map[string]int + Received map[string]int + Errors []string + Log []string + StillContactingPeers bool + + lock sync.Mutex +} + +func (s *State) doneContactingPeers() { + s.lock.Lock() + defer s.lock.Unlock() + s.StillContactingPeers = false +} + +// serveStatus returns "pass", "running", or "fail". +func (s *State) serveStatus(w http.ResponseWriter, r *http.Request) { + s.lock.Lock() + defer s.lock.Unlock() + if len(s.Sent) >= *peerCount && len(s.Received) >= *peerCount { + fmt.Fprintf(w, "pass") + return + } + if s.StillContactingPeers { + fmt.Fprintf(w, "running") + return + } + fmt.Fprintf(w, "fail") +} + +// serveRead writes our json encoded state +func (s *State) serveRead(w http.ResponseWriter, r *http.Request) { + s.lock.Lock() + defer s.lock.Unlock() + w.WriteHeader(http.StatusOK) + b, err := json.MarshalIndent(s, "", "\t") + s.appendErr(err) + _, err = w.Write(b) + s.appendErr(err) +} + +// WritePost is the format that (json encoded) requests to the /write handler should take. +type WritePost struct { + Source string + Dest string +} + +// WriteResp is returned by /write +type WriteResp struct { + Hostname string +} + +// serveWrite records the contact in our state. +func (s *State) serveWrite(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + s.lock.Lock() + defer s.lock.Unlock() + w.WriteHeader(http.StatusOK) + var wp WritePost + s.appendErr(json.NewDecoder(r.Body).Decode(&wp)) + if wp.Source == "" { + s.appendErr(fmt.Errorf("%v: Got request with no source", s.Hostname)) + } else { + if s.Received == nil { + s.Received = map[string]int{} + } + s.Received[wp.Source] += 1 + } + s.appendErr(json.NewEncoder(w).Encode(&WriteResp{Hostname: s.Hostname})) +} + +// appendErr adds err to the list, if err is not nil. s must be locked. +func (s *State) appendErr(err error) { + if err != nil { + s.Errors = append(s.Errors, err.Error()) + } +} + +// Logf writes to the log message list. s must not be locked. +// s's Log member will drop an old message if it would otherwise +// become longer than 500 messages. +func (s *State) Logf(format string, args ...interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + s.Log = append(s.Log, fmt.Sprintf(format, args...)) + if len(s.Log) > 500 { + s.Log = s.Log[1:] + } +} + +// s must not be locked +func (s *State) appendSuccessfulSend(toHostname string) { + s.lock.Lock() + defer s.lock.Unlock() + if s.Sent == nil { + s.Sent = map[string]int{} + } + s.Sent[toHostname] += 1 +} + +var ( + // Our one and only state object + state State +) + +func main() { + flag.Parse() + + if *service == "" { + log.Fatal("Must provide -service flag.") + } + + hostname, err := os.Hostname() + if err != nil { + log.Fatalf("Error getting hostname: %v", err) + } + + state := State{ + Hostname: hostname, + StillContactingPeers: true, + } + + go contactOthers(&state) + + http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) { + os.Exit(0) + }) + + http.HandleFunc("/read", state.serveRead) + http.HandleFunc("/write", state.serveWrite) + http.HandleFunc("/status", state.serveStatus) + + go log.Fatal(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", *port), nil)) + + select {} +} + +// Find all sibling pods in the service and post to their /write handler. +func contactOthers(state *State) { + defer state.doneContactingPeers() + masterRO := url.URL{ + Scheme: "http", + Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), + Path: "/api/v1beta1", + } + client := &client.Client{client.NewRESTClient(&masterRO, latest.Codec)} + + // Do this repeatedly, in case there's some propagation delay with getting + // newly started pods into the endpoints list. + for i := 0; i < 15; i++ { + endpoints, err := client.Endpoints(*namespace).Get(*service) + if err != nil { + state.Logf("Unable to read endpoints for %v/%v: %v; will try again.", *namespace, *service, err) + time.Sleep(time.Duration(1+rand.Intn(10)) * time.Second) + } + + for _, e := range endpoints.Endpoints { + contactSingle("http://"+e, state) + } + + time.Sleep(5 * time.Second) + } +} + +// contactSingle dials the address 'e' and tries to POST to its /write address. +func contactSingle(e string, state *State) { + body, err := json.Marshal(&WritePost{ + Dest: e, + Source: state.Hostname, + }) + if err != nil { + log.Fatalf("json marshal error: %v", err) + } + resp, err := http.Post(e+"/write", "application/json", bytes.NewReader(body)) + if err != nil { + state.Logf("Warning: unable to contact '%v': '%v'", e, err) + return + } + defer resp.Body.Close() + + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + state.Logf("Warning: unable to read response from '%v': '%v'", e, err) + return + } + var wr WriteResp + err = json.Unmarshal(body, &wr) + if err != nil { + state.Logf("Warning: unable to unmarshal response (%v) from '%v': '%v'", string(body), e, err) + return + } + state.appendSuccessfulSend(wr.Hostname) +} From 9c3f6a402da932b318ed57afd0bf317f791b940f Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 9 Dec 2014 13:10:59 -0800 Subject: [PATCH 2/2] add network test to e2e.go --- cmd/e2e/e2e.go | 88 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 83 insertions(+), 5 deletions(-) diff --git a/cmd/e2e/e2e.go b/cmd/e2e/e2e.go index 4b4dad7663f..d1ccb91ac07 100644 --- a/cmd/e2e/e2e.go +++ b/cmd/e2e/e2e.go @@ -20,7 +20,8 @@ import ( "flag" "io/ioutil" "os" - "runtime" + "path/filepath" + goruntime "runtime" "strconv" "time" @@ -29,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) @@ -36,6 +38,7 @@ import ( var ( authConfig = flag.String("auth_config", os.Getenv("HOME")+"/.kubernetes_auth", "Path to the auth info file.") host = flag.String("host", "", "The host to connect to") + repoRoot = flag.String("repo_root", "./", "Root directory of kubernetes repository, for finding test files. Default assumes working directory is repository root") ) func waitForPodRunning(c *client.Client, id string) { @@ -53,7 +56,12 @@ func waitForPodRunning(c *client.Client, id string) { } } -func loadObjectOrDie(filePath string) interface{} { +// assetPath returns a path to the requested file; safe on all OSes. +func assetPath(pathElements ...string) string { + return filepath.Join(*repoRoot, filepath.Join(pathElements...)) +} + +func loadObjectOrDie(filePath string) runtime.Object { data, err := ioutil.ReadFile(filePath) if err != nil { glog.Fatalf("Failed to read pod: %v", err) @@ -128,7 +136,7 @@ func TestKubernetesROService(c *client.Client) bool { func TestPodUpdate(c *client.Client) bool { podClient := c.Pods(api.NamespaceDefault) - pod := loadPodOrDie("./api/examples/pod.json") + pod := loadPodOrDie(assetPath("api", "examples", "pod.json")) value := strconv.Itoa(time.Now().Nanosecond()) pod.Labels["time"] = value @@ -208,7 +216,7 @@ func TestKubeletSendsEvent(c *client.Client) bool { podClient := c.Pods(api.NamespaceDefault) - pod := loadPodOrDie("./cmd/e2e/pod.json") + pod := loadPodOrDie(assetPath("cmd", "e2e", "pod.json")) value := strconv.Itoa(time.Now().Nanosecond()) pod.Labels["time"] = value @@ -274,9 +282,78 @@ func TestKubeletSendsEvent(c *client.Client) bool { return true } +func TestNetwork(c *client.Client) bool { + ns := api.NamespaceDefault + svc, err := c.Services(ns).Create(loadObjectOrDie(assetPath( + "contrib", "for-tests", "network-tester", "service.json", + )).(*api.Service)) + if err != nil { + glog.Errorf("unable to create test service: %v", err) + return false + } + // Clean up service + defer func() { + if err = c.Services(ns).Delete(svc.Name); err != nil { + glog.Errorf("unable to delete svc %v: %v", svc.Name, err) + } + }() + + rc, err := c.ReplicationControllers(ns).Create(loadObjectOrDie(assetPath( + "contrib", "for-tests", "network-tester", "rc.json", + )).(*api.ReplicationController)) + if err != nil { + glog.Errorf("unable to create test rc: %v", err) + return false + } + // Clean up rc + defer func() { + rc.Spec.Replicas = 0 + rc, err = c.ReplicationControllers(ns).Update(rc) + if err != nil { + glog.Errorf("unable to modify replica count for rc %v: %v", rc.Name, err) + return + } + if err = c.ReplicationControllers(ns).Delete(rc.Name); err != nil { + glog.Errorf("unable to delete rc %v: %v", rc.Name, err) + } + }() + + const maxAttempts = 60 + for i := 0; i < maxAttempts; i++ { + time.Sleep(time.Second) + body, err := c.Get().Path("proxy").Path("services").Path(svc.Name).Path("status").Do().Raw() + if err != nil { + glog.Infof("Attempt %v/%v: service/pod still starting. (error: '%v')", i, maxAttempts, err) + continue + } + switch string(body) { + case "pass": + glog.Infof("Passed on attempt %v. Cleaning up.", i) + return true + case "running": + glog.Infof("Attempt %v/%v: test still running", i, maxAttempts) + case "fail": + if body, err := c.Get().Path("proxy").Path("services").Path(svc.Name).Path("read").Do().Raw(); err != nil { + glog.Infof("Failed on attempt %v. Cleaning up. Error reading details: %v", i, err) + } else { + glog.Infof("Failed on attempt %v. Cleaning up. Details:\n%v", i, string(body)) + } + return false + } + } + + if body, err := c.Get().Path("proxy").Path("services").Path(svc.Name).Path("read").Do().Raw(); err != nil { + glog.Infof("Timed out. Cleaning up. Error reading details: %v", err) + } else { + glog.Infof("Timed out. Cleaning up. Details:\n%v", string(body)) + } + + return false +} + func main() { flag.Parse() - runtime.GOMAXPROCS(runtime.NumCPU()) + goruntime.GOMAXPROCS(goruntime.NumCPU()) util.ReallyCrash = true util.InitLogs() defer util.FlushLogs() @@ -294,6 +371,7 @@ func main() { TestKubeletSendsEvent, TestImportantURLs, TestPodUpdate, + TestNetwork, } passed := true