diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index 07419f41841..dad906a8407 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -511,11 +511,11 @@ var _ = KubeDescribe("Kubectl client", func() { It("should support port-forward", func() { By("forwarding the container port to a local port") - cmd, listenPort := runPortForward(ns, simplePodName, simplePodPort) - defer tryKill(cmd) + cmd := runPortForward(ns, simplePodName, simplePodPort) + defer cmd.Stop() By("curling local port output") - localAddr := fmt.Sprintf("http://localhost:%d", listenPort) + localAddr := fmt.Sprintf("http://localhost:%d", cmd.port) body, err := curl(localAddr) Logf("got: %s", body) if err != nil { diff --git a/test/e2e/portforward.go b/test/e2e/portforward.go index dcd95f06d98..b1f55aacdc1 100644 --- a/test/e2e/portforward.go +++ b/test/e2e/portforward.go @@ -24,8 +24,11 @@ import ( "regexp" "strconv" "strings" + "syscall" + "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/util/wait" . "github.com/onsi/ginkgo" ) @@ -77,18 +80,53 @@ func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string) *a } } -func runPortForward(ns, podName string, port int) (*exec.Cmd, int) { +type portForwardCommand struct { + cmd *exec.Cmd + port int +} + +// Stop attempts to gracefully stop `kubectl port-forward`, only killing it if necessary. +// This helps avoid spdy goroutine leaks in the Kubelet. +func (c *portForwardCommand) Stop() { + // SIGINT signals that kubectl port-forward should gracefully terminate + if err := c.cmd.Process.Signal(syscall.SIGINT); err != nil { + Logf("error sending SIGINT to kubectl port-forward: %v", err) + } + + // try to wait for a clean exit + done := make(chan error) + go func() { + done <- c.cmd.Wait() + }() + + expired := time.NewTimer(wait.ForeverTestTimeout) + defer expired.Stop() + + select { + case err := <-done: + if err == nil { + // success + return + } + Logf("error waiting for kubectl port-forward to exit: %v", err) + case <-expired.C: + Logf("timed out waiting for kubectl port-forward to exit") + } + + Logf("trying to forcibly kill kubectl port-forward") + tryKill(c.cmd) +} + +func runPortForward(ns, podName string, port int) *portForwardCommand { cmd := kubectlCmd("port-forward", fmt.Sprintf("--namespace=%v", ns), podName, fmt.Sprintf(":%d", port)) // This is somewhat ugly but is the only way to retrieve the port that was picked // by the port-forward command. We don't want to hard code the port as we have no // way of guaranteeing we can pick one that isn't in use, particularly on Jenkins. Logf("starting port-forward command and streaming output") - stdout, stderr, err := startCmdAndStreamOutput(cmd) + _, stderr, err := startCmdAndStreamOutput(cmd) if err != nil { Failf("Failed to start port-forward command: %v", err) } - defer stdout.Close() - defer stderr.Close() buf := make([]byte, 128) var n int @@ -107,7 +145,10 @@ func runPortForward(ns, podName string, port int) (*exec.Cmd, int) { Failf("Error converting %s to an int: %v", match[1], err) } - return cmd, listenPort + return &portForwardCommand{ + cmd: cmd, + port: listenPort, + } } var _ = KubeDescribe("Port forwarding", func() { @@ -125,13 +166,13 @@ var _ = KubeDescribe("Port forwarding", func() { } By("Running 'kubectl port-forward'") - cmd, listenPort := runPortForward(framework.Namespace.Name, pod.Name, 80) - defer tryKill(cmd) + cmd := runPortForward(framework.Namespace.Name, pod.Name, 80) + defer cmd.Stop() By("Dialing the local port") - conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort)) + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port)) if err != nil { - Failf("Couldn't connect to port %d: %v", listenPort, err) + Failf("Couldn't connect to port %d: %v", cmd.port, err) } By("Closing the connection to the local port") @@ -164,17 +205,17 @@ var _ = KubeDescribe("Port forwarding", func() { } By("Running 'kubectl port-forward'") - cmd, listenPort := runPortForward(framework.Namespace.Name, pod.Name, 80) - defer tryKill(cmd) + cmd := runPortForward(framework.Namespace.Name, pod.Name, 80) + defer cmd.Stop() By("Dialing the local port") - addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort)) + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port)) if err != nil { Failf("Error resolving tcp addr: %v", err) } conn, err := net.DialTCP("tcp", nil, addr) if err != nil { - Failf("Couldn't connect to port %d: %v", listenPort, err) + Failf("Couldn't connect to port %d: %v", cmd.port, err) } defer func() { By("Closing the connection to the local port") @@ -226,13 +267,13 @@ var _ = KubeDescribe("Port forwarding", func() { } By("Running 'kubectl port-forward'") - cmd, listenPort := runPortForward(framework.Namespace.Name, pod.Name, 80) - defer tryKill(cmd) + cmd := runPortForward(framework.Namespace.Name, pod.Name, 80) + defer cmd.Stop() By("Dialing the local port") - conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", listenPort)) + conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port)) if err != nil { - Failf("Couldn't connect to port %d: %v", listenPort, err) + Failf("Couldn't connect to port %d: %v", cmd.port, err) } defer func() { By("Closing the connection to the local port")