Merge pull request #17030 from csrwng/pf_cmd_streams

Automatic merge from submit-queue

Port-forward: use out and error streams instead of glog

Switches use of glog with command out and error streams
This commit is contained in:
k8s-merge-robot 2016-04-29 09:27:47 -07:00
commit 2b7021add0
5 changed files with 36 additions and 19 deletions

View File

@ -27,7 +27,6 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/util/httpstream" "k8s.io/kubernetes/pkg/util/httpstream"
@ -46,6 +45,8 @@ type PortForwarder struct {
Ready chan struct{} Ready chan struct{}
requestIDLock sync.Mutex requestIDLock sync.Mutex
requestID int requestID int
out io.Writer
errOut io.Writer
} }
// ForwardedPort contains a Local:Remote port pairing. // ForwardedPort contains a Local:Remote port pairing.
@ -107,7 +108,7 @@ func parsePorts(ports []string) ([]ForwardedPort, error) {
} }
// New creates a new PortForwarder. // New creates a new PortForwarder.
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) { func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(ports) == 0 { if len(ports) == 0 {
return nil, errors.New("You must specify at least 1 port") return nil, errors.New("You must specify at least 1 port")
} }
@ -120,6 +121,8 @@ func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}) (*P
ports: parsedPorts, ports: parsedPorts,
stopChan: stopChan, stopChan: stopChan,
Ready: make(chan struct{}), Ready: make(chan struct{}),
out: out,
errOut: errOut,
}, nil }, nil
} }
@ -151,7 +154,9 @@ func (pf *PortForwarder) forward() error {
case err == nil: case err == nil:
listenSuccess = true listenSuccess = true
default: default:
glog.Warningf("Unable to listen on port %d: %v", port.Local, err) if pf.errOut != nil {
fmt.Fprintf(pf.errOut, "Unable to listen on port %d: %v\n", port.Local, err)
}
} }
} }
@ -210,7 +215,9 @@ func (pf *PortForwarder) getListener(protocol string, hostname string, port *For
return nil, fmt.Errorf("Error parsing local port: %s from %s (%s)", err, listenerAddress, host) return nil, fmt.Errorf("Error parsing local port: %s from %s (%s)", err, listenerAddress, host)
} }
port.Local = uint16(localPortUInt) port.Local = uint16(localPortUInt)
glog.Infof("Forwarding from %s:%d -> %d", hostname, localPortUInt, port.Remote) if pf.out != nil {
fmt.Fprintf(pf.out, "Forwarding from %s:%d -> %d\n", hostname, localPortUInt, port.Remote)
}
return listener, nil return listener, nil
} }
@ -244,7 +251,9 @@ func (pf *PortForwarder) nextRequestID() int {
func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) { func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer conn.Close() defer conn.Close()
glog.Infof("Handling connection for %d", port.Local) if pf.out != nil {
fmt.Fprintf(pf.out, "Handling connection for %d\n", port.Local)
}
requestID := pf.nextRequestID() requestID := pf.nextRequestID()

View File

@ -24,6 +24,7 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"os"
"reflect" "reflect"
"strings" "strings"
"sync" "sync"
@ -87,7 +88,7 @@ func TestParsePortsAndNew(t *testing.T) {
dialer := &fakeDialer{} dialer := &fakeDialer{}
expectedStopChan := make(chan struct{}) expectedStopChan := make(chan struct{})
pf, err := New(dialer, test.input, expectedStopChan) pf, err := New(dialer, test.input, expectedStopChan, os.Stdout, os.Stderr)
haveError = err != nil haveError = err != nil
if e, a := test.expectNewError, haveError; e != a { if e, a := test.expectNewError, haveError; e != a {
t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err) t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err)
@ -305,7 +306,7 @@ func TestForwardPorts(t *testing.T) {
stopChan := make(chan struct{}, 1) stopChan := make(chan struct{}, 1)
pf, err := New(exec, test.ports, stopChan) pf, err := New(exec, test.ports, stopChan, os.Stdout, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("%s: unexpected error calling New: %v", testName, err) t.Fatalf("%s: unexpected error calling New: %v", testName, err)
} }
@ -375,7 +376,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
stopChan1 := make(chan struct{}, 1) stopChan1 := make(chan struct{}, 1)
defer close(stopChan1) defer close(stopChan1)
pf1, err := New(exec, []string{"5555"}, stopChan1) pf1, err := New(exec, []string{"5555"}, stopChan1, os.Stdout, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("error creating pf1: %v", err) t.Fatalf("error creating pf1: %v", err)
} }
@ -383,7 +384,7 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
<-pf1.Ready <-pf1.Ready
stopChan2 := make(chan struct{}, 1) stopChan2 := make(chan struct{}, 1)
pf2, err := New(exec, []string{"5555"}, stopChan2) pf2, err := New(exec, []string{"5555"}, stopChan2, os.Stdout, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("error creating pf2: %v", err) t.Fatalf("error creating pf2: %v", err)
} }

View File

@ -213,7 +213,7 @@ Find more information at https://github.com/kubernetes/kubernetes.`,
cmds.AddCommand(NewCmdAttach(f, in, out, err)) cmds.AddCommand(NewCmdAttach(f, in, out, err))
cmds.AddCommand(NewCmdExec(f, in, out, err)) cmds.AddCommand(NewCmdExec(f, in, out, err))
cmds.AddCommand(NewCmdPortForward(f)) cmds.AddCommand(NewCmdPortForward(f, out, err))
cmds.AddCommand(NewCmdProxy(f, out)) cmds.AddCommand(NewCmdProxy(f, out))
cmds.AddCommand(NewCmdRun(f, in, out, err)) cmds.AddCommand(NewCmdRun(f, in, out, err))

View File

@ -17,6 +17,7 @@ limitations under the License.
package cmd package cmd
import ( import (
"io"
"net/url" "net/url"
"os" "os"
"os/signal" "os/signal"
@ -45,14 +46,18 @@ kubectl port-forward mypod :5000
kubectl port-forward mypod 0:5000` kubectl port-forward mypod 0:5000`
) )
func NewCmdPortForward(f *cmdutil.Factory) *cobra.Command { func NewCmdPortForward(f *cmdutil.Factory, cmdOut, cmdErr io.Writer) *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "port-forward POD [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]", Use: "port-forward POD [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
Short: "Forward one or more local ports to a pod.", Short: "Forward one or more local ports to a pod.",
Long: "Forward one or more local ports to a pod.", Long: "Forward one or more local ports to a pod.",
Example: portforward_example, Example: portforward_example,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
err := RunPortForward(f, cmd, args, &defaultPortForwarder{}) pf := &defaultPortForwarder{
cmdOut: cmdOut,
cmdErr: cmdErr,
}
err := RunPortForward(f, cmd, args, pf)
cmdutil.CheckErr(err) cmdutil.CheckErr(err)
}, },
} }
@ -65,14 +70,16 @@ type portForwarder interface {
ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error
} }
type defaultPortForwarder struct{} type defaultPortForwarder struct {
cmdOut, cmdErr io.Writer
}
func (*defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error { func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error {
dialer, err := remotecommand.NewExecutor(config, method, url) dialer, err := remotecommand.NewExecutor(config, method, url)
if err != nil { if err != nil {
return err return err
} }
fw, err := portforward.New(dialer, ports, stopChan) fw, err := portforward.New(dialer, ports, stopChan, f.cmdOut, f.cmdErr)
if err != nil { if err != nil {
return err return err
} }

View File

@ -124,16 +124,16 @@ func runPortForward(ns, podName string, port int) *portForwardCommand {
// by the port-forward command. We don't want to hard code the port as we have no // by the port-forward command. We don't want to hard code the port as we have no
// way of guaranteeing we can pick one that isn't in use, particularly on Jenkins. // way of guaranteeing we can pick one that isn't in use, particularly on Jenkins.
framework.Logf("starting port-forward command and streaming output") framework.Logf("starting port-forward command and streaming output")
_, stderr, err := framework.StartCmdAndStreamOutput(cmd) stdout, _, err := framework.StartCmdAndStreamOutput(cmd)
if err != nil { if err != nil {
framework.Failf("Failed to start port-forward command: %v", err) framework.Failf("Failed to start port-forward command: %v", err)
} }
buf := make([]byte, 128) buf := make([]byte, 128)
var n int var n int
framework.Logf("reading from `kubectl port-forward` command's stderr") framework.Logf("reading from `kubectl port-forward` command's stdout")
if n, err = stderr.Read(buf); err != nil { if n, err = stdout.Read(buf); err != nil {
framework.Failf("Failed to read from kubectl port-forward stderr: %v", err) framework.Failf("Failed to read from kubectl port-forward stdout: %v", err)
} }
portForwardOutput := string(buf[:n]) portForwardOutput := string(buf[:n])
match := portForwardRegexp.FindStringSubmatch(portForwardOutput) match := portForwardRegexp.FindStringSubmatch(portForwardOutput)