Update port forward e2e for go 1.6

Only close the stdout/stderr pipes from kubectl port-forward when we're truly done with the command,
instead of as soon as runPortForward exits.

Also try to gracefully stop kubectl port-forward via SIGINT, instead of always sending SIGKILL, as
this will help avoid spdy goroutine leaks in the Kubelet.
This commit is contained in:
Andy Goldstein 2016-03-24 14:56:27 -04:00
parent 4a6e6b3fbf
commit 66899a47a5
2 changed files with 61 additions and 20 deletions

View File

@ -511,11 +511,11 @@ var _ = KubeDescribe("Kubectl client", func() {
It("should support port-forward", func() {
By("forwarding the container port to a local port")
cmd, listenPort := runPortForward(ns, simplePodName, simplePodPort)
defer tryKill(cmd)
cmd := runPortForward(ns, simplePodName, simplePodPort)
defer cmd.Stop()
By("curling local port output")
localAddr := fmt.Sprintf("http://localhost:%d", listenPort)
localAddr := fmt.Sprintf("http://localhost:%d", cmd.port)
body, err := curl(localAddr)
Logf("got: %s", body)
if err != nil {

View File

@ -24,8 +24,11 @@ import (
"regexp"
"strconv"
"strings"
"syscall"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo"
)
@ -77,18 +80,53 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string) *a
}
}
func runPortForward(ns, podName string, port int) (*exec.Cmd, int) {
type portForwardCommand struct {
cmd *exec.Cmd
port int
}
// Stop attempts to gracefully stop `kubectl port-forward`, only killing it if necessary.
// This helps avoid spdy goroutine leaks in the Kubelet.
func (c *portForwardCommand) Stop() {
// SIGINT signals that kubectl port-forward should gracefully terminate
if err := c.cmd.Process.Signal(syscall.SIGINT); err != nil {
Logf("error sending SIGINT to kubectl port-forward: %v", err)
}
// try to wait for a clean exit
done := make(chan error)
go func() {
done <- c.cmd.Wait()
}()
expired := time.NewTimer(wait.ForeverTestTimeout)
defer expired.Stop()
select {
case err := <-done:
if err == nil {
// success
return
}
Logf("error waiting for kubectl port-forward to exit: %v", err)
case <-expired.C:
Logf("timed out waiting for kubectl port-forward to exit")
}
Logf("trying to forcibly kill kubectl port-forward")
tryKill(c.cmd)
}
func runPortForward(ns, podName string, port int) *portForwardCommand {
cmd := kubectlCmd("port-forward", fmt.Sprintf("--namespace=%v", ns), podName, fmt.Sprintf(":%d", port))
// This is somewhat ugly but is the only way to retrieve the port that was picked
// by the port-forward command. We don't want to hard code the port as we have no
// way of guaranteeing we can pick one that isn't in use, particularly on Jenkins.
Logf("starting port-forward command and streaming output")
stdout, stderr, err := startCmdAndStreamOutput(cmd)
_, stderr, err := startCmdAndStreamOutput(cmd)
if err != nil {
Failf("Failed to start port-forward command: %v", err)
}
defer stdout.Close()
defer stderr.Close()
buf := make([]byte, 128)
var n int
@ -107,7 +145,10 @@ func runPortForward(ns, podName string, port int) (*exec.Cmd, int) {
Failf("Error converting %s to an int: %v", match[1], err)
}
return cmd, listenPort
return &portForwardCommand{
cmd: cmd,
port: listenPort,
}
}
var _ = KubeDescribe("Port forwarding", func() {
@ -125,13 +166,13 @@ var _ = KubeDescribe("Port forwarding", func() {
}
By("Running 'kubectl port-forward'")
cmd, listenPort := runPortForward(framework.Namespace.Name, pod.Name, 80)
defer tryKill(cmd)
cmd := runPortForward(framework.Namespace.Name, pod.Name, 80)
defer cmd.Stop()
By("Dialing the local port")
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
if err != nil {
Failf("Couldn't connect to port %d: %v", listenPort, err)
Failf("Couldn't connect to port %d: %v", cmd.port, err)
}
By("Closing the connection to the local port")
@ -164,17 +205,17 @@ var _ = KubeDescribe("Port forwarding", func() {
}
By("Running 'kubectl port-forward'")
cmd, listenPort := runPortForward(framework.Namespace.Name, pod.Name, 80)
defer tryKill(cmd)
cmd := runPortForward(framework.Namespace.Name, pod.Name, 80)
defer cmd.Stop()
By("Dialing the local port")
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
if err != nil {
Failf("Error resolving tcp addr: %v", err)
}
conn, err := net.DialTCP("tcp", nil, addr)
if err != nil {
Failf("Couldn't connect to port %d: %v", listenPort, err)
Failf("Couldn't connect to port %d: %v", cmd.port, err)
}
defer func() {
By("Closing the connection to the local port")
@ -226,13 +267,13 @@ var _ = KubeDescribe("Port forwarding", func() {
}
By("Running 'kubectl port-forward'")
cmd, listenPort := runPortForward(framework.Namespace.Name, pod.Name, 80)
defer tryKill(cmd)
cmd := runPortForward(framework.Namespace.Name, pod.Name, 80)
defer cmd.Stop()
By("Dialing the local port")
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort))
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
if err != nil {
Failf("Couldn't connect to port %d: %v", listenPort, err)
Failf("Couldn't connect to port %d: %v", cmd.port, err)
}
defer func() {
By("Closing the connection to the local port")