Merge pull request #18672 from bprashanth/netexec

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-03-08 06:10:02 -08:00
commit 999e6311b5
6 changed files with 115 additions and 10 deletions

View File

@ -272,6 +272,11 @@ func (f *Framework) WaitForPodRunning(podName string) error {
return waitForPodRunningInNamespace(f.Client, podName, f.Namespace.Name) return waitForPodRunningInNamespace(f.Client, podName, f.Namespace.Name)
} }
// WaitForPodReady waits for the pod to flip to ready in the namespace.
func (f *Framework) WaitForPodReady(podName string) error {
return waitTimeoutForPodReadyInNamespace(f.Client, podName, f.Namespace.Name, podStartTimeout)
}
// WaitForPodRunningSlow waits for the pod to run in the namespace. // WaitForPodRunningSlow waits for the pod to run in the namespace.
// It has a longer timeout then WaitForPodRunning (util.slowPodStartTimeout). // It has a longer timeout then WaitForPodRunning (util.slowPodStartTimeout).
func (f *Framework) WaitForPodRunningSlow(podName string) error { func (f *Framework) WaitForPodRunningSlow(podName string) error {

View File

@ -46,12 +46,16 @@ const (
nodeHttpPort = 32080 nodeHttpPort = 32080
nodeUdpPort = 32081 nodeUdpPort = 32081
loadBalancerHttpPort = 100 loadBalancerHttpPort = 100
netexecImageName = "gcr.io/google_containers/netexec:1.4" netexecImageName = "gcr.io/google_containers/netexec:1.5"
testPodName = "test-container-pod" testPodName = "test-container-pod"
hostTestPodName = "host-test-container-pod" hostTestPodName = "host-test-container-pod"
nodePortServiceName = "node-port-service" nodePortServiceName = "node-port-service"
loadBalancerServiceName = "load-balancer-service" loadBalancerServiceName = "load-balancer-service"
enableLoadBalancerTest = false enableLoadBalancerTest = false
hitEndpointRetryDelay = 1 * time.Second
// Number of retries to hit a given set of endpoints. Needs to be high
// because we verify iptables statistical rr loadbalancing.
testTries = 30
) )
type KubeProxyTestConfig struct { type KubeProxyTestConfig struct {
@ -150,7 +154,7 @@ func createHTTPClient(transport *http.Transport) *http.Client {
func (config *KubeProxyTestConfig) hitClusterIP(epCount int) { func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
clusterIP := config.nodePortService.Spec.ClusterIP clusterIP := config.nodePortService.Spec.ClusterIP
tries := epCount*epCount + 15 // if epCount == 0 tries := epCount*epCount + testTries // if epCount == 0
By("dialing(udp) node1 --> clusterIP:clusterUdpPort") By("dialing(udp) node1 --> clusterIP:clusterUdpPort")
config.dialFromNode("udp", clusterIP, clusterUdpPort, tries, epCount) config.dialFromNode("udp", clusterIP, clusterUdpPort, tries, epCount)
By("dialing(http) node1 --> clusterIP:clusterHttpPort") By("dialing(http) node1 --> clusterIP:clusterHttpPort")
@ -169,7 +173,7 @@ func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
func (config *KubeProxyTestConfig) hitNodePort(epCount int) { func (config *KubeProxyTestConfig) hitNodePort(epCount int) {
node1_IP := config.externalAddrs[0] node1_IP := config.externalAddrs[0]
tries := epCount*epCount + 15 // if epCount == 0 tries := epCount*epCount + testTries // if epCount == 0
By("dialing(udp) node1 --> node1:nodeUdpPort") By("dialing(udp) node1 --> node1:nodeUdpPort")
config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount) config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount)
By("dialing(http) node1 --> node1:nodeHttpPort") By("dialing(http) node1 --> node1:nodeHttpPort")
@ -248,7 +252,10 @@ func (config *KubeProxyTestConfig) dialFromNode(protocol, targetIP string, targe
} else { } else {
cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort) cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort)
} }
forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd) // TODO: This simply tells us that we can reach the endpoints. Check that
// the probability of hitting a specific endpoint is roughly the same as
// hitting any other.
forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; sleep %v; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd, hitEndpointRetryDelay)
By(fmt.Sprintf("Dialing from node. command:%s", forLoop)) By(fmt.Sprintf("Dialing from node. command:%s", forLoop))
stdout := RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop) stdout := RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop)
Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount)) Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount))
@ -262,6 +269,19 @@ func (config *KubeProxyTestConfig) getSelfURL(path string, expected string) {
} }
func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod { func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod {
probe := &api.Probe{
InitialDelaySeconds: 10,
TimeoutSeconds: 30,
PeriodSeconds: 10,
SuccessThreshold: 1,
FailureThreshold: 3,
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
Path: "/healthz",
Port: intstr.IntOrString{IntVal: endpointHttpPort},
},
},
}
pod := &api.Pod{ pod := &api.Pod{
TypeMeta: unversioned.TypeMeta{ TypeMeta: unversioned.TypeMeta{
Kind: "Pod", Kind: "Pod",
@ -293,6 +313,8 @@ func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node st
Protocol: api.ProtocolUDP, Protocol: api.ProtocolUDP,
}, },
}, },
LivenessProbe: probe,
ReadinessProbe: probe,
}, },
}, },
NodeName: node, NodeName: node,
@ -492,7 +514,7 @@ func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector m
// wait that all of them are up // wait that all of them are up
runningPods := make([]*api.Pod, 0, len(nodes.Items)) runningPods := make([]*api.Pod, 0, len(nodes.Items))
for _, p := range createdPods { for _, p := range createdPods {
expectNoError(config.f.WaitForPodRunning(p.Name)) expectNoError(config.f.WaitForPodReady(p.Name))
rp, err := config.getPodClient().Get(p.Name) rp, err := config.getPodClient().Get(p.Name)
expectNoError(err) expectNoError(err)
runningPods = append(runningPods, rp) runningPods = append(runningPods, rp)

View File

@ -930,6 +930,19 @@ func waitTimeoutForPodNoLongerRunningInNamespace(c *client.Client, podName strin
}) })
} }
func waitTimeoutForPodReadyInNamespace(c *client.Client, podName string, namespace string, timeout time.Duration) error {
return waitForPodCondition(c, namespace, podName, "running", timeout, func(pod *api.Pod) (bool, error) {
if pod.Status.Phase == api.PodRunning {
Logf("Found pod '%s' on node '%s'", podName, pod.Spec.NodeName)
return true, nil
}
if pod.Status.Phase == api.PodFailed {
return true, fmt.Errorf("Giving up; pod went into failed status: \n%s", spew.Sprintf("%#v", pod))
}
return podReady(pod), nil
})
}
// waitForPodNotPending returns an error if it took too long for the pod to go out of pending state. // waitForPodNotPending returns an error if it took too long for the pod to go out of pending state.
func waitForPodNotPending(c *client.Client, ns, podName string) error { func waitForPodNotPending(c *client.Client, ns, podName string) error {
return waitForPodCondition(c, ns, podName, "!pending", podStartTimeout, func(pod *api.Pod) (bool, error) { return waitForPodCondition(c, ns, podName, "!pending", podStartTimeout, func(pod *api.Pod) (bool, error) {
@ -2806,6 +2819,7 @@ func RunHostCmd(ns, name, cmd string) (string, error) {
// RunHostCmdOrDie calls RunHostCmd and dies on error. // RunHostCmdOrDie calls RunHostCmd and dies on error.
func RunHostCmdOrDie(ns, name, cmd string) string { func RunHostCmdOrDie(ns, name, cmd string) string {
stdout, err := RunHostCmd(ns, name, cmd) stdout, err := RunHostCmd(ns, name, cmd)
Logf("stdout: %v", stdout)
expectNoError(err) expectNoError(err)
return stdout return stdout
} }

View File

@ -14,7 +14,7 @@
.PHONY: all netexec image push clean .PHONY: all netexec image push clean
TAG = 1.4 TAG = 1.5
PREFIX = gcr.io/google_containers PREFIX = gcr.io/google_containers

View File

@ -30,15 +30,36 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"time" "time"
) )
var ( var (
httpPort = 8080 httpPort = 8080
udpPort = 8081 udpPort = 8081
shellPath = "/bin/sh" shellPath = "/bin/sh"
serverReady = &atomicBool{0}
) )
// atomicBool uses load/store operations on an int32 to simulate an atomic boolean.
type atomicBool struct {
v int32
}
// set sets the int32 to the given boolean.
func (a *atomicBool) set(value bool) {
if value {
atomic.StoreInt32(&a.v, 1)
return
}
atomic.StoreInt32(&a.v, 0)
}
// get returns true if the int32 == 1
func (a *atomicBool) get() bool {
return atomic.LoadInt32(&a.v) == 1
}
type output struct { type output struct {
responses []string responses []string
errors []string errors []string
@ -91,6 +112,18 @@ func exitHandler(w http.ResponseWriter, r *http.Request) {
func hostnameHandler(w http.ResponseWriter, r *http.Request) { func hostnameHandler(w http.ResponseWriter, r *http.Request) {
log.Printf("GET /hostname") log.Printf("GET /hostname")
fmt.Fprintf(w, getHostName()) fmt.Fprintf(w, getHostName())
http.HandleFunc("/healthz", healthzHandler)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", httpPort), nil))
}
// healthHandler response with a 200 if the UDP server is ready. It also serves
// as a health check of the HTTP server by virtue of being a HTTP handler.
func healthzHandler(w http.ResponseWriter, r *http.Request) {
if serverReady.get() {
w.WriteHeader(200)
return
}
w.WriteHeader(http.StatusPreconditionFailed)
} }
func shutdownHandler(w http.ResponseWriter, r *http.Request) { func shutdownHandler(w http.ResponseWriter, r *http.Request) {
@ -318,6 +351,13 @@ func startUDPServer(udpPort int) {
defer serverConn.Close() defer serverConn.Close()
buf := make([]byte, 1024) buf := make([]byte, 1024)
log.Printf("Started UDP server")
// Start responding to readiness probes.
serverReady.set(true)
defer func() {
log.Printf("UDP server exited")
serverReady.set(false)
}()
for { for {
n, clientAddress, err := serverConn.ReadFromUDP(buf) n, clientAddress, err := serverConn.ReadFromUDP(buf)
assertNoError(err) assertNoError(err)

View File

@ -7,9 +7,33 @@ metadata:
spec: spec:
containers: containers:
- name: netexec - name: netexec
image: gcr.io/google_containers/netexec:1.4 image: gcr.io/google_containers/netexec:1.5
ports: ports:
- containerPort: 8080 - containerPort: 8080
protocol: TCP protocol: TCP
- containerPort: 8081 - containerPort: 8081
protocol: UDP protocol: UDP
# give this pod the same liveness and readiness probe because
# we always want the kubelet to restart it if it becomes
# unready, and at the same time we want to observe readiness
# as a signal to start testing.
livenessProbe:
httpGet:
path: /healthz
port: 8080
scheme: HTTP
initialDelaySeconds: 10
timeoutSeconds: 5
failureThreshold: 3
periodSeconds: 10
successThreshold: 1
readinessProbe:
httpGet:
path: /healthz
port: 8080
scheme: HTTP
initialDelaySeconds: 10
timeoutSeconds: 5
failureThreshold: 3
periodSeconds: 10
successThreshold: 1