mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-19 00:31:00 +00:00
commit
91865f3576
@ -20,7 +20,8 @@ import (
|
||||
"flag"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"runtime"
|
||||
"path/filepath"
|
||||
goruntime "runtime"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -29,6 +30,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
@ -36,6 +38,7 @@ import (
|
||||
var (
|
||||
authConfig = flag.String("auth_config", os.Getenv("HOME")+"/.kubernetes_auth", "Path to the auth info file.")
|
||||
host = flag.String("host", "", "The host to connect to")
|
||||
repoRoot = flag.String("repo_root", "./", "Root directory of kubernetes repository, for finding test files. Default assumes working directory is repository root")
|
||||
)
|
||||
|
||||
func waitForPodRunning(c *client.Client, id string) {
|
||||
@ -53,7 +56,12 @@ func waitForPodRunning(c *client.Client, id string) {
|
||||
}
|
||||
}
|
||||
|
||||
func loadObjectOrDie(filePath string) interface{} {
|
||||
// assetPath returns a path to the requested file; safe on all OSes.
|
||||
func assetPath(pathElements ...string) string {
|
||||
return filepath.Join(*repoRoot, filepath.Join(pathElements...))
|
||||
}
|
||||
|
||||
func loadObjectOrDie(filePath string) runtime.Object {
|
||||
data, err := ioutil.ReadFile(filePath)
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to read pod: %v", err)
|
||||
@ -128,7 +136,7 @@ func TestKubernetesROService(c *client.Client) bool {
|
||||
func TestPodUpdate(c *client.Client) bool {
|
||||
podClient := c.Pods(api.NamespaceDefault)
|
||||
|
||||
pod := loadPodOrDie("./api/examples/pod.json")
|
||||
pod := loadPodOrDie(assetPath("api", "examples", "pod.json"))
|
||||
value := strconv.Itoa(time.Now().Nanosecond())
|
||||
pod.Labels["time"] = value
|
||||
|
||||
@ -208,7 +216,7 @@ func TestKubeletSendsEvent(c *client.Client) bool {
|
||||
|
||||
podClient := c.Pods(api.NamespaceDefault)
|
||||
|
||||
pod := loadPodOrDie("./cmd/e2e/pod.json")
|
||||
pod := loadPodOrDie(assetPath("cmd", "e2e", "pod.json"))
|
||||
value := strconv.Itoa(time.Now().Nanosecond())
|
||||
pod.Labels["time"] = value
|
||||
|
||||
@ -274,9 +282,78 @@ func TestKubeletSendsEvent(c *client.Client) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func TestNetwork(c *client.Client) bool {
|
||||
ns := api.NamespaceDefault
|
||||
svc, err := c.Services(ns).Create(loadObjectOrDie(assetPath(
|
||||
"contrib", "for-tests", "network-tester", "service.json",
|
||||
)).(*api.Service))
|
||||
if err != nil {
|
||||
glog.Errorf("unable to create test service: %v", err)
|
||||
return false
|
||||
}
|
||||
// Clean up service
|
||||
defer func() {
|
||||
if err = c.Services(ns).Delete(svc.Name); err != nil {
|
||||
glog.Errorf("unable to delete svc %v: %v", svc.Name, err)
|
||||
}
|
||||
}()
|
||||
|
||||
rc, err := c.ReplicationControllers(ns).Create(loadObjectOrDie(assetPath(
|
||||
"contrib", "for-tests", "network-tester", "rc.json",
|
||||
)).(*api.ReplicationController))
|
||||
if err != nil {
|
||||
glog.Errorf("unable to create test rc: %v", err)
|
||||
return false
|
||||
}
|
||||
// Clean up rc
|
||||
defer func() {
|
||||
rc.Spec.Replicas = 0
|
||||
rc, err = c.ReplicationControllers(ns).Update(rc)
|
||||
if err != nil {
|
||||
glog.Errorf("unable to modify replica count for rc %v: %v", rc.Name, err)
|
||||
return
|
||||
}
|
||||
if err = c.ReplicationControllers(ns).Delete(rc.Name); err != nil {
|
||||
glog.Errorf("unable to delete rc %v: %v", rc.Name, err)
|
||||
}
|
||||
}()
|
||||
|
||||
const maxAttempts = 60
|
||||
for i := 0; i < maxAttempts; i++ {
|
||||
time.Sleep(time.Second)
|
||||
body, err := c.Get().Path("proxy").Path("services").Path(svc.Name).Path("status").Do().Raw()
|
||||
if err != nil {
|
||||
glog.Infof("Attempt %v/%v: service/pod still starting. (error: '%v')", i, maxAttempts, err)
|
||||
continue
|
||||
}
|
||||
switch string(body) {
|
||||
case "pass":
|
||||
glog.Infof("Passed on attempt %v. Cleaning up.", i)
|
||||
return true
|
||||
case "running":
|
||||
glog.Infof("Attempt %v/%v: test still running", i, maxAttempts)
|
||||
case "fail":
|
||||
if body, err := c.Get().Path("proxy").Path("services").Path(svc.Name).Path("read").Do().Raw(); err != nil {
|
||||
glog.Infof("Failed on attempt %v. Cleaning up. Error reading details: %v", i, err)
|
||||
} else {
|
||||
glog.Infof("Failed on attempt %v. Cleaning up. Details:\n%v", i, string(body))
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if body, err := c.Get().Path("proxy").Path("services").Path(svc.Name).Path("read").Do().Raw(); err != nil {
|
||||
glog.Infof("Timed out. Cleaning up. Error reading details: %v", err)
|
||||
} else {
|
||||
glog.Infof("Timed out. Cleaning up. Details:\n%v", string(body))
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
goruntime.GOMAXPROCS(goruntime.NumCPU())
|
||||
util.ReallyCrash = true
|
||||
util.InitLogs()
|
||||
defer util.FlushLogs()
|
||||
@ -294,6 +371,7 @@ func main() {
|
||||
TestKubeletSendsEvent,
|
||||
TestImportantURLs,
|
||||
TestPodUpdate,
|
||||
TestNetwork,
|
||||
}
|
||||
|
||||
passed := true
|
||||
|
19
contrib/for-tests/network-tester/Dockerfile
Normal file
19
contrib/for-tests/network-tester/Dockerfile
Normal file
@ -0,0 +1,19 @@
|
||||
# Copyright 2014 Google Inc. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
FROM scratch
|
||||
MAINTAINER Daniel Smith <dbsmith@google.com>
|
||||
ADD webserver webserver
|
||||
EXPOSE 8080
|
||||
ENTRYPOINT ["/webserver"]
|
13
contrib/for-tests/network-tester/Makefile
Normal file
13
contrib/for-tests/network-tester/Makefile
Normal file
@ -0,0 +1,13 @@
|
||||
all: push
|
||||
|
||||
webserver: webserver.go
|
||||
CGO_ENABLED=0 GOOS=linux go build -a -tags netgo -ldflags '-w' ./webserver.go
|
||||
|
||||
container: webserver
|
||||
sudo docker build -t kubernetes/nettest .
|
||||
|
||||
push: container
|
||||
sudo docker push kubernetes/nettest
|
||||
|
||||
clean:
|
||||
rm -f webserver
|
29
contrib/for-tests/network-tester/rc.json
Normal file
29
contrib/for-tests/network-tester/rc.json
Normal file
@ -0,0 +1,29 @@
|
||||
{
|
||||
"id": "nettestController",
|
||||
"kind": "ReplicationController",
|
||||
"apiVersion": "v1beta1",
|
||||
"desiredState": {
|
||||
"replicas": 8,
|
||||
"replicaSelector": {"name": "nettest"},
|
||||
"podTemplate": {
|
||||
"desiredState": {
|
||||
"manifest": {
|
||||
"version": "v1beta1",
|
||||
"id": "nettest",
|
||||
"containers": [{
|
||||
"name": "webserver",
|
||||
"image": "kubernetes/nettest:latest",
|
||||
"command": ["-service=nettest"],
|
||||
"ports": [{
|
||||
"containerPort": 8080
|
||||
}]
|
||||
}]
|
||||
}
|
||||
},
|
||||
"labels": {
|
||||
"name": "nettest"
|
||||
}
|
||||
}
|
||||
},
|
||||
"labels": {"name": "nettest"}
|
||||
}
|
14
contrib/for-tests/network-tester/service.json
Normal file
14
contrib/for-tests/network-tester/service.json
Normal file
@ -0,0 +1,14 @@
|
||||
{
|
||||
"id": "nettest",
|
||||
"kind": "Service",
|
||||
"apiVersion": "v1beta1",
|
||||
"port": 8080,
|
||||
"containerPort": 8080,
|
||||
"selector": {
|
||||
"name": "nettest"
|
||||
},
|
||||
"labels": {
|
||||
"name": "nettest"
|
||||
}
|
||||
}
|
||||
|
256
contrib/for-tests/network-tester/webserver.go
Normal file
256
contrib/for-tests/network-tester/webserver.go
Normal file
@ -0,0 +1,256 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// A tiny web server for checking networking connectivity.
|
||||
//
|
||||
// Will dial out to, and expect to hear from, every pod that is a member of
|
||||
// the service passed in the flag -service.
|
||||
//
|
||||
// Will serve a webserver on given -port.
|
||||
//
|
||||
// Visit /read to see the current state, or /quit to shut down.
|
||||
//
|
||||
// Visit /status to see pass/running/fail determination. (literally, it will
|
||||
// return one of those words.)
|
||||
//
|
||||
// /write is used by other network test pods to register connectivity.
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
)
|
||||
|
||||
var (
|
||||
port = flag.Int("port", 8080, "Port number to serve at.")
|
||||
peerCount = flag.Int("peers", 8, "Must find at least this many peers for the test to pass.")
|
||||
service = flag.String("service", "nettest", "Service to find other network test pods in.")
|
||||
namespace = flag.String("namespace", "default", "Namespace of this pod. TODO: kubernetes should make this discoverable.")
|
||||
)
|
||||
|
||||
// State tracks the internal state of our little http server.
|
||||
// It's returned verbatim over the /read endpoint.
|
||||
type State struct {
|
||||
// Hostname is set once and never changed-- it's always safe to read.
|
||||
Hostname string
|
||||
|
||||
// The below fields require that lock is held before reading or writing.
|
||||
Sent map[string]int
|
||||
Received map[string]int
|
||||
Errors []string
|
||||
Log []string
|
||||
StillContactingPeers bool
|
||||
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func (s *State) doneContactingPeers() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
s.StillContactingPeers = false
|
||||
}
|
||||
|
||||
// serveStatus returns "pass", "running", or "fail".
|
||||
func (s *State) serveStatus(w http.ResponseWriter, r *http.Request) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if len(s.Sent) >= *peerCount && len(s.Received) >= *peerCount {
|
||||
fmt.Fprintf(w, "pass")
|
||||
return
|
||||
}
|
||||
if s.StillContactingPeers {
|
||||
fmt.Fprintf(w, "running")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "fail")
|
||||
}
|
||||
|
||||
// serveRead writes our json encoded state
|
||||
func (s *State) serveRead(w http.ResponseWriter, r *http.Request) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
b, err := json.MarshalIndent(s, "", "\t")
|
||||
s.appendErr(err)
|
||||
_, err = w.Write(b)
|
||||
s.appendErr(err)
|
||||
}
|
||||
|
||||
// WritePost is the format that (json encoded) requests to the /write handler should take.
|
||||
type WritePost struct {
|
||||
Source string
|
||||
Dest string
|
||||
}
|
||||
|
||||
// WriteResp is returned by /write
|
||||
type WriteResp struct {
|
||||
Hostname string
|
||||
}
|
||||
|
||||
// serveWrite records the contact in our state.
|
||||
func (s *State) serveWrite(w http.ResponseWriter, r *http.Request) {
|
||||
defer r.Body.Close()
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
var wp WritePost
|
||||
s.appendErr(json.NewDecoder(r.Body).Decode(&wp))
|
||||
if wp.Source == "" {
|
||||
s.appendErr(fmt.Errorf("%v: Got request with no source", s.Hostname))
|
||||
} else {
|
||||
if s.Received == nil {
|
||||
s.Received = map[string]int{}
|
||||
}
|
||||
s.Received[wp.Source] += 1
|
||||
}
|
||||
s.appendErr(json.NewEncoder(w).Encode(&WriteResp{Hostname: s.Hostname}))
|
||||
}
|
||||
|
||||
// appendErr adds err to the list, if err is not nil. s must be locked.
|
||||
func (s *State) appendErr(err error) {
|
||||
if err != nil {
|
||||
s.Errors = append(s.Errors, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Logf writes to the log message list. s must not be locked.
|
||||
// s's Log member will drop an old message if it would otherwise
|
||||
// become longer than 500 messages.
|
||||
func (s *State) Logf(format string, args ...interface{}) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
s.Log = append(s.Log, fmt.Sprintf(format, args...))
|
||||
if len(s.Log) > 500 {
|
||||
s.Log = s.Log[1:]
|
||||
}
|
||||
}
|
||||
|
||||
// s must not be locked
|
||||
func (s *State) appendSuccessfulSend(toHostname string) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if s.Sent == nil {
|
||||
s.Sent = map[string]int{}
|
||||
}
|
||||
s.Sent[toHostname] += 1
|
||||
}
|
||||
|
||||
var (
|
||||
// Our one and only state object
|
||||
state State
|
||||
)
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
if *service == "" {
|
||||
log.Fatal("Must provide -service flag.")
|
||||
}
|
||||
|
||||
hostname, err := os.Hostname()
|
||||
if err != nil {
|
||||
log.Fatalf("Error getting hostname: %v", err)
|
||||
}
|
||||
|
||||
state := State{
|
||||
Hostname: hostname,
|
||||
StillContactingPeers: true,
|
||||
}
|
||||
|
||||
go contactOthers(&state)
|
||||
|
||||
http.HandleFunc("/quit", func(w http.ResponseWriter, r *http.Request) {
|
||||
os.Exit(0)
|
||||
})
|
||||
|
||||
http.HandleFunc("/read", state.serveRead)
|
||||
http.HandleFunc("/write", state.serveWrite)
|
||||
http.HandleFunc("/status", state.serveStatus)
|
||||
|
||||
go log.Fatal(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", *port), nil))
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
// Find all sibling pods in the service and post to their /write handler.
|
||||
func contactOthers(state *State) {
|
||||
defer state.doneContactingPeers()
|
||||
masterRO := url.URL{
|
||||
Scheme: "http",
|
||||
Host: os.Getenv("KUBERNETES_RO_SERVICE_HOST") + ":" + os.Getenv("KUBERNETES_RO_SERVICE_PORT"),
|
||||
Path: "/api/v1beta1",
|
||||
}
|
||||
client := &client.Client{client.NewRESTClient(&masterRO, latest.Codec)}
|
||||
|
||||
// Do this repeatedly, in case there's some propagation delay with getting
|
||||
// newly started pods into the endpoints list.
|
||||
for i := 0; i < 15; i++ {
|
||||
endpoints, err := client.Endpoints(*namespace).Get(*service)
|
||||
if err != nil {
|
||||
state.Logf("Unable to read endpoints for %v/%v: %v; will try again.", *namespace, *service, err)
|
||||
time.Sleep(time.Duration(1+rand.Intn(10)) * time.Second)
|
||||
}
|
||||
|
||||
for _, e := range endpoints.Endpoints {
|
||||
contactSingle("http://"+e, state)
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// contactSingle dials the address 'e' and tries to POST to its /write address.
|
||||
func contactSingle(e string, state *State) {
|
||||
body, err := json.Marshal(&WritePost{
|
||||
Dest: e,
|
||||
Source: state.Hostname,
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("json marshal error: %v", err)
|
||||
}
|
||||
resp, err := http.Post(e+"/write", "application/json", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
state.Logf("Warning: unable to contact '%v': '%v'", e, err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err = ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
state.Logf("Warning: unable to read response from '%v': '%v'", e, err)
|
||||
return
|
||||
}
|
||||
var wr WriteResp
|
||||
err = json.Unmarshal(body, &wr)
|
||||
if err != nil {
|
||||
state.Logf("Warning: unable to unmarshal response (%v) from '%v': '%v'", string(body), e, err)
|
||||
return
|
||||
}
|
||||
state.appendSuccessfulSend(wr.Hostname)
|
||||
}
|
Loading…
Reference in New Issue
Block a user