From 18e47ba056594841bcbee7389a9eba59b0b95e17 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 21 Nov 2014 17:05:27 -0800 Subject: [PATCH] Add network testing pod --- contrib/for-tests/network-tester/Dockerfile | 19 ++ contrib/for-tests/network-tester/Makefile | 13 + contrib/for-tests/network-tester/rc.json | 29 ++ contrib/for-tests/network-tester/service.json | 14 + contrib/for-tests/network-tester/webserver.go | 256 ++++++++++++++++++ 5 files changed, 331 insertions(+) create mode 100644 contrib/for-tests/network-tester/Dockerfile create mode 100644 contrib/for-tests/network-tester/Makefile create mode 100644 contrib/for-tests/network-tester/rc.json create mode 100644 contrib/for-tests/network-tester/service.json create mode 100644 contrib/for-tests/network-tester/webserver.go diff --git a/contrib/for-tests/network-tester/Dockerfile b/contrib/for-tests/network-tester/Dockerfile new file mode 100644 index 00000000000..7955f5a463a --- /dev/null +++ b/contrib/for-tests/network-tester/Dockerfile @@ -0,0 +1,19 @@ +# Copyright 2014 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM scratch +MAINTAINER Daniel Smith +ADD webserver webserver +EXPOSE 8080 +ENTRYPOINT ["/webserver"] diff --git a/contrib/for-tests/network-tester/Makefile b/contrib/for-tests/network-tester/Makefile new file mode 100644 index 00000000000..d71bd381cf9 --- /dev/null +++ b/contrib/for-tests/network-tester/Makefile @@ -0,0 +1,13 @@ +all: push + +webserver: webserver.go + CGO_ENABLED=0 GOOS=linux go build -a -tags netgo -ldflags '-w' ./webserver.go + +container: webserver + sudo docker build -t kubernetes/nettest . + +push: container + sudo docker push kubernetes/nettest + +clean: + rm -f webserver diff --git a/contrib/for-tests/network-tester/rc.json b/contrib/for-tests/network-tester/rc.json new file mode 100644 index 00000000000..0a2712acfc2 --- /dev/null +++ b/contrib/for-tests/network-tester/rc.json @@ -0,0 +1,29 @@ +{ + "id": "nettestController", + "kind": "ReplicationController", + "apiVersion": "v1beta1", + "desiredState": { + "replicas": 8, + "replicaSelector": {"name": "nettest"}, + "podTemplate": { + "desiredState": { + "manifest": { + "version": "v1beta1", + "id": "nettest", + "containers": [{ + "name": "webserver", + "image": "kubernetes/nettest:latest", + "command": ["-service=nettest"], + "ports": [{ + "containerPort": 8080 + }] + }] + } + }, + "labels": { + "name": "nettest" + } + } + }, + "labels": {"name": "nettest"} +} diff --git a/contrib/for-tests/network-tester/service.json b/contrib/for-tests/network-tester/service.json new file mode 100644 index 00000000000..36b56331380 --- /dev/null +++ b/contrib/for-tests/network-tester/service.json @@ -0,0 +1,14 @@ +{ + "id": "nettest", + "kind": "Service", + "apiVersion": "v1beta1", + "port": 8080, + "containerPort": 8080, + "selector": { + "name": "nettest" + }, + "labels": { + "name": "nettest" + } +} + diff --git a/contrib/for-tests/network-tester/webserver.go b/contrib/for-tests/network-tester/webserver.go new file mode 100644 index 00000000000..eca05330cb5 --- /dev/null +++ b/contrib/for-tests/network-tester/webserver.go @@ -0,0 +1,256 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// A tiny web server for checking networking connectivity. +// +// Will dial out to, and expect to hear from, every pod that is a member of +// the service passed in the flag -service. +// +// Will serve a webserver on given -port. +// +// Visit /read to see the current state, or /quit to shut down. +// +// Visit /status to see pass/running/fail determination. (literally, it will +// return one of those words.) +// +// /write is used by other network test pods to register connectivity. +package main + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "log" + "math/rand" + "net/http" + "net/url" + "os" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" +) + +var ( + port = flag.Int("port", 8080, "Port number to serve at.") + peerCount = flag.Int("peers", 8, "Must find at least this many peers for the test to pass.") + service = flag.String("service", "nettest", "Service to find other network test pods in.") + namespace = flag.String("namespace", "default", "Namespace of this pod. TODO: kubernetes should make this discoverable.") +) + +// State tracks the internal state of our little http server. +// It's returned verbatim over the /read endpoint. +type State struct { + // Hostname is set once and never changed-- it's always safe to read. + Hostname string + + // The below fields require that lock is held before reading or writing. + Sent map[string]int + Received map[string]int + Errors []string + Log []string + StillContactingPeers bool + + lock sync.Mutex +} + +func (s *State) doneContactingPeers() { + s.lock.Lock() + defer s.lock.Unlock() + s.StillContactingPeers = false +} + +// serveStatus returns "pass", "running", or "fail". +func (s *State) serveStatus(w http.ResponseWriter, r *http.Request) { + s.lock.Lock() + defer s.lock.Unlock() + if len(s.Sent) >= *peerCount && len(s.Received) >= *peerCount { + fmt.Fprintf(w, "pass") + return + } + if s.StillContactingPeers { + fmt.Fprintf(w, "running") + return + } + fmt.Fprintf(w, "fail") +} + +// serveRead writes our json encoded state +func (s *State) serveRead(w http.ResponseWriter, r *http.Request) { + s.lock.Lock() + defer s.lock.Unlock() + w.WriteHeader(http.StatusOK) + b, err := json.MarshalIndent(s, "", "\t") + s.appendErr(err) + _, err = w.Write(b) + s.appendErr(err) +} + +// WritePost is the format that (json encoded) requests to the /write handler should take. +type WritePost struct { + Source string + Dest string +} + +// WriteResp is returned by /write +type WriteResp struct { + Hostname string +} + +// serveWrite records the contact in our state. +func (s *State) serveWrite(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + s.lock.Lock() + defer s.lock.Unlock() + w.WriteHeader(http.StatusOK) + var wp WritePost + s.appendErr(json.NewDecoder(r.Body).Decode(&wp)) + if wp.Source == "" { + s.appendErr(fmt.Errorf("%v: Got request with no source", s.Hostname)) + } else { + if s.Received == nil { + s.Received = map[string]int{} + } + s.Received[wp.Source] += 1 + } + s.appendErr(json.NewEncoder(w).Encode(&WriteResp{Hostname: s.Hostname})) +} + +// appendErr adds err to the list, if err is not nil. s must be locked. +func (s *State) appendErr(err error) { + if err != nil { + s.Errors = append(s.Errors, err.Error()) + } +} + +// Logf writes to the log message list. s must not be locked. +// s's Log member will drop an old message if it would otherwise +// become longer than 500 messages. +func (s *State) Logf(format string, args ...interface{}) { + s.lock.Lock() + defer s.lock.Unlock() + s.Log = append(s.Log, fmt.Sprintf(format, args...)) + if len(s.Log) > 500 { + s.Log = s.Log[1:] + } +} + +// s must not be locked +func (s *State) appendSuccessfulSend(toHostname string) { + s.lock.Lock() + defer s.lock.Unlock() + if s.Sent == nil { + s.Sent = map[string]int{} + } + s.Sent[toHostname] += 1 +} + +var ( + // Our one and only state object + state State +) + +func main() { + flag.Parse() + + if *service == "" { + log.Fatal("Must provide -service flag.") + } + + hostname, err := os.Hostname() + if err != nil { + log.Fatalf("Error getting hostname: %v", err) + } + + state := State{ + Hostname: hostname, + StillContactingPeers: true, + } + + go contactOthers(&state) + + http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) { + os.Exit(0) + }) + + http.HandleFunc("/read", state.serveRead) + http.HandleFunc("/write", state.serveWrite) + http.HandleFunc("/status", state.serveStatus) + + go log.Fatal(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", *port), nil)) + + select {} +} + +// Find all sibling pods in the service and post to their /write handler. +func contactOthers(state *State) { + defer state.doneContactingPeers() + masterRO := url.URL{ + Scheme: "http", + Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"), + Path: "/api/v1beta1", + } + client := &client.Client{client.NewRESTClient(&masterRO, latest.Codec)} + + // Do this repeatedly, in case there's some propagation delay with getting + // newly started pods into the endpoints list. + for i := 0; i < 15; i++ { + endpoints, err := client.Endpoints(*namespace).Get(*service) + if err != nil { + state.Logf("Unable to read endpoints for %v/%v: %v; will try again.", *namespace, *service, err) + time.Sleep(time.Duration(1+rand.Intn(10)) * time.Second) + } + + for _, e := range endpoints.Endpoints { + contactSingle("http://"+e, state) + } + + time.Sleep(5 * time.Second) + } +} + +// contactSingle dials the address 'e' and tries to POST to its /write address. +func contactSingle(e string, state *State) { + body, err := json.Marshal(&WritePost{ + Dest: e, + Source: state.Hostname, + }) + if err != nil { + log.Fatalf("json marshal error: %v", err) + } + resp, err := http.Post(e+"/write", "application/json", bytes.NewReader(body)) + if err != nil { + state.Logf("Warning: unable to contact '%v': '%v'", e, err) + return + } + defer resp.Body.Close() + + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + state.Logf("Warning: unable to read response from '%v': '%v'", e, err) + return + } + var wr WriteResp + err = json.Unmarshal(body, &wr) + if err != nil { + state.Logf("Warning: unable to unmarshal response (%v) from '%v': '%v'", string(body), e, err) + return + } + state.appendSuccessfulSend(wr.Hostname) +}