Merge pull request #30222 from hodovska/port-forward-cmd-struct

Automatic merge from submit-queue

kubectl/port-forward: complete/validate/run structure

```kubectl port-forward``` command is converted to a complete/validate/run kubectl command structure specified here: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/kubectl-conventions.md#command-conventions
In this PR is also exposed the ready and stop channel for API consumer.

Fixes #16504
This commit is contained in:
Kubernetes Submit Queue 2016-08-12 20:58:53 -07:00 committed by GitHub
commit 612e3c2634
4 changed files with 140 additions and 55 deletions

View File

@ -108,7 +108,7 @@ func parsePorts(ports []string) ([]ForwardedPort, error) {
} }
// New creates a new PortForwarder. // New creates a new PortForwarder.
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, out, errOut io.Writer) (*PortForwarder, error) { func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(ports) == 0 { if len(ports) == 0 {
return nil, errors.New("You must specify at least 1 port") return nil, errors.New("You must specify at least 1 port")
} }
@ -120,7 +120,7 @@ func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, out
dialer: dialer, dialer: dialer,
ports: parsedPorts, ports: parsedPorts,
stopChan: stopChan, stopChan: stopChan,
Ready: make(chan struct{}), Ready: readyChan,
out: out, out: out,
errOut: errOut, errOut: errOut,
}, nil }, nil
@ -164,7 +164,9 @@ func (pf *PortForwarder) forward() error {
return fmt.Errorf("Unable to listen on any of the requested ports: %v", pf.ports) return fmt.Errorf("Unable to listen on any of the requested ports: %v", pf.ports)
} }
close(pf.Ready) if pf.Ready != nil {
close(pf.Ready)
}
// wait for interrupt or conn closure // wait for interrupt or conn closure
select { select {

View File

@ -88,7 +88,8 @@ func TestParsePortsAndNew(t *testing.T) {
dialer := &fakeDialer{} dialer := &fakeDialer{}
expectedStopChan := make(chan struct{}) expectedStopChan := make(chan struct{})
pf, err := New(dialer, test.input, expectedStopChan, os.Stdout, os.Stderr) readyChan := make(chan struct{})
pf, err := New(dialer, test.input, expectedStopChan, readyChan, os.Stdout, os.Stderr)
haveError = err != nil haveError = err != nil
if e, a := test.expectNewError, haveError; e != a { if e, a := test.expectNewError, haveError; e != a {
t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err) t.Fatalf("%d: New: error expected=%t, got %t: %s", i, e, a, err)
@ -305,8 +306,9 @@ func TestForwardPorts(t *testing.T) {
} }
stopChan := make(chan struct{}, 1) stopChan := make(chan struct{}, 1)
readyChan := make(chan struct{})
pf, err := New(exec, test.ports, stopChan, os.Stdout, os.Stderr) pf, err := New(exec, test.ports, stopChan, readyChan, os.Stdout, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("%s: unexpected error calling New: %v", testName, err) t.Fatalf("%s: unexpected error calling New: %v", testName, err)
} }
@ -375,8 +377,9 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
stopChan1 := make(chan struct{}, 1) stopChan1 := make(chan struct{}, 1)
defer close(stopChan1) defer close(stopChan1)
readyChan1 := make(chan struct{})
pf1, err := New(exec, []string{"5555"}, stopChan1, os.Stdout, os.Stderr) pf1, err := New(exec, []string{"5555"}, stopChan1, readyChan1, os.Stdout, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("error creating pf1: %v", err) t.Fatalf("error creating pf1: %v", err)
} }
@ -384,7 +387,8 @@ func TestForwardPortsReturnsErrorWhenAllBindsFailed(t *testing.T) {
<-pf1.Ready <-pf1.Ready
stopChan2 := make(chan struct{}, 1) stopChan2 := make(chan struct{}, 1)
pf2, err := New(exec, []string{"5555"}, stopChan2, os.Stdout, os.Stderr) readyChan2 := make(chan struct{})
pf2, err := New(exec, []string{"5555"}, stopChan2, readyChan2, os.Stdout, os.Stderr)
if err != nil { if err != nil {
t.Fatalf("error creating pf2: %v", err) t.Fatalf("error creating pf2: %v", err)
} }

View File

@ -17,21 +17,34 @@ limitations under the License.
package cmd package cmd
import ( import (
"fmt"
"io" "io"
"net/url" "net/url"
"os" "os"
"os/signal" "os/signal"
"github.com/golang/glog"
"github.com/renstrom/dedent" "github.com/renstrom/dedent"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/portforward" "k8s.io/kubernetes/pkg/client/unversioned/portforward"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand" "k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
) )
// PortForwardOptions contains all the options for running the port-forward cli command.
type PortForwardOptions struct {
Namespace string
PodName string
Config *restclient.Config
Client *client.Client
Ports []string
PortForwarder portForwarder
StopChannel chan struct{}
ReadyChannel chan struct{}
}
var ( var (
portforward_example = dedent.Dedent(` portforward_example = dedent.Dedent(`
# Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in the pod # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in the pod
@ -48,18 +61,27 @@ var (
) )
func NewCmdPortForward(f *cmdutil.Factory, cmdOut, cmdErr io.Writer) *cobra.Command { func NewCmdPortForward(f *cmdutil.Factory, cmdOut, cmdErr io.Writer) *cobra.Command {
opts := &PortForwardOptions{
PortForwarder: &defaultPortForwarder{
cmdOut: cmdOut,
cmdErr: cmdErr,
},
}
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "port-forward POD [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]", Use: "port-forward POD [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
Short: "Forward one or more local ports to a pod", Short: "Forward one or more local ports to a pod",
Long: "Forward one or more local ports to a pod.", Long: "Forward one or more local ports to a pod.",
Example: portforward_example, Example: portforward_example,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
pf := &defaultPortForwarder{ if err := opts.Complete(f, cmd, args, cmdOut, cmdErr); err != nil {
cmdOut: cmdOut, cmdutil.CheckErr(err)
cmdErr: cmdErr, }
if err := opts.Validate(); err != nil {
cmdutil.CheckErr(cmdutil.UsageError(cmd, err.Error()))
}
if err := opts.RunPortForward(); err != nil {
cmdutil.CheckErr(err)
} }
err := RunPortForward(f, cmd, args, pf)
cmdutil.CheckErr(err)
}, },
} }
cmd.Flags().StringP("pod", "p", "", "Pod name") cmd.Flags().StringP("pod", "p", "", "Pod name")
@ -68,81 +90,104 @@ func NewCmdPortForward(f *cmdutil.Factory, cmdOut, cmdErr io.Writer) *cobra.Comm
} }
type portForwarder interface { type portForwarder interface {
ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error
} }
type defaultPortForwarder struct { type defaultPortForwarder struct {
cmdOut, cmdErr io.Writer cmdOut, cmdErr io.Writer
} }
func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error { func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
dialer, err := remotecommand.NewExecutor(config, method, url) dialer, err := remotecommand.NewExecutor(opts.Config, method, url)
if err != nil { if err != nil {
return err return err
} }
fw, err := portforward.New(dialer, ports, stopChan, f.cmdOut, f.cmdErr) fw, err := portforward.New(dialer, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.cmdOut, f.cmdErr)
if err != nil { if err != nil {
return err return err
} }
return fw.ForwardPorts() return fw.ForwardPorts()
} }
func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw portForwarder) error { // Complete completes all the required options for port-forward cmd.
podName := cmdutil.GetFlagString(cmd, "pod") func (o *PortForwardOptions) Complete(f *cmdutil.Factory, cmd *cobra.Command, args []string, cmdOut io.Writer, cmdErr io.Writer) error {
if len(podName) == 0 && len(args) == 0 { var err error
o.PodName = cmdutil.GetFlagString(cmd, "pod")
if len(o.PodName) == 0 && len(args) == 0 {
return cmdutil.UsageError(cmd, "POD is required for port-forward") return cmdutil.UsageError(cmd, "POD is required for port-forward")
} }
if len(podName) != 0 { if len(o.PodName) != 0 {
printDeprecationWarning("port-forward POD", "-p POD") printDeprecationWarning("port-forward POD", "-p POD")
o.Ports = args
} else { } else {
podName = args[0] o.PodName = args[0]
args = args[1:] o.Ports = args[1:]
} }
if len(args) < 1 { o.Namespace, _, err = f.DefaultNamespace()
return cmdutil.UsageError(cmd, "at least 1 PORT is required for port-forward")
}
namespace, _, err := f.DefaultNamespace()
if err != nil { if err != nil {
return err return err
} }
client, err := f.Client() o.Client, err = f.Client()
if err != nil { if err != nil {
return err return err
} }
pod, err := client.Pods(namespace).Get(podName) o.Config, err = f.ClientConfig()
if err != nil {
return err
}
o.StopChannel = make(chan struct{}, 1)
o.ReadyChannel = make(chan struct{})
return nil
}
// Validate validates all the required options for port-forward cmd.
func (o PortForwardOptions) Validate() error {
if len(o.PodName) == 0 {
return fmt.Errorf("pod name must be specified")
}
if len(o.Ports) < 1 {
return fmt.Errorf("at least 1 PORT is required for port-forward")
}
if o.PortForwarder == nil || o.Client == nil || o.Config == nil {
return fmt.Errorf("client, client config, and portforwarder must be provided")
}
return nil
}
// RunPortForward implements all the necessary functionality for port-forward cmd.
func (o PortForwardOptions) RunPortForward() error {
pod, err := o.Client.Pods(o.Namespace).Get(o.PodName)
if err != nil { if err != nil {
return err return err
} }
if pod.Status.Phase != api.PodRunning { if pod.Status.Phase != api.PodRunning {
glog.Fatalf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase) return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
}
config, err := f.ClientConfig()
if err != nil {
return err
} }
signals := make(chan os.Signal, 1) signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt) signal.Notify(signals, os.Interrupt)
defer signal.Stop(signals) defer signal.Stop(signals)
stopCh := make(chan struct{}, 1)
go func() { go func() {
<-signals <-signals
close(stopCh) if o.StopChannel != nil {
close(o.StopChannel)
}
}() }()
req := client.RESTClient.Post(). req := o.Client.RESTClient.Post().
Resource("pods"). Resource("pods").
Namespace(namespace). Namespace(o.Namespace).
Name(pod.Name). Name(pod.Name).
SubResource("portforward") SubResource("portforward")
return fw.ForwardPorts("POST", req.URL(), config, args, stopCh) return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
} }

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"os"
"testing" "testing"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -37,7 +38,7 @@ type fakePortForwarder struct {
pfErr error pfErr error
} }
func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, config *restclient.Config, ports []string, stopChan <-chan struct{}) error { func (f *fakePortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
f.method = method f.method = method
f.url = url f.url = url
return f.pfErr return f.pfErr
@ -68,6 +69,7 @@ func TestPortForward(t *testing.T) {
}, },
} }
for _, test := range tests { for _, test := range tests {
var err error
f, tf, codec, ns := NewAPIFactory() f, tf, codec, ns := NewAPIFactory()
tf.Client = &fake.RESTClient{ tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns, NegotiatedSerializer: ns,
@ -89,12 +91,24 @@ func TestPortForward(t *testing.T) {
if test.pfErr { if test.pfErr {
ff.pfErr = fmt.Errorf("pf error") ff.pfErr = fmt.Errorf("pf error")
} }
cmd := &cobra.Command{}
cmd.Flags().StringP("pod", "p", "", "Pod name") opts := &PortForwardOptions{}
err := RunPortForward(f, cmd, []string{"foo", ":5000", ":1000"}, ff) cmd := NewCmdPortForward(f, os.Stdout, os.Stderr)
cmd.Run = func(cmd *cobra.Command, args []string) {
if err = opts.Complete(f, cmd, args, os.Stdout, os.Stderr); err != nil {
return
}
opts.PortForwarder = ff
if err = opts.Validate(); err != nil {
return
}
err = opts.RunPortForward()
}
cmd.Run(cmd, []string{"foo", ":5000", ":1000"})
if test.pfErr && err != ff.pfErr { if test.pfErr && err != ff.pfErr {
t.Errorf("%s: Unexpected exec error: %v", test.name, err) t.Errorf("%s: Unexpected port-forward error: %v", test.name, err)
} }
if !test.pfErr && err != nil { if !test.pfErr && err != nil {
t.Errorf("%s: Unexpected error: %v", test.name, err) t.Errorf("%s: Unexpected error: %v", test.name, err)
@ -109,7 +123,6 @@ func TestPortForward(t *testing.T) {
if ff.method != "POST" { if ff.method != "POST" {
t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method) t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method)
} }
} }
} }
@ -138,6 +151,7 @@ func TestPortForwardWithPFlag(t *testing.T) {
}, },
} }
for _, test := range tests { for _, test := range tests {
var err error
f, tf, codec, ns := NewAPIFactory() f, tf, codec, ns := NewAPIFactory()
tf.Client = &fake.RESTClient{ tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns, NegotiatedSerializer: ns,
@ -159,18 +173,38 @@ func TestPortForwardWithPFlag(t *testing.T) {
if test.pfErr { if test.pfErr {
ff.pfErr = fmt.Errorf("pf error") ff.pfErr = fmt.Errorf("pf error")
} }
cmd := &cobra.Command{}
podPtr := cmd.Flags().StringP("pod", "p", "", "Pod name") opts := &PortForwardOptions{}
*podPtr = "foo" cmd := NewCmdPortForward(f, os.Stdout, os.Stderr)
err := RunPortForward(f, cmd, []string{":5000", ":1000"}, ff) cmd.Run = func(cmd *cobra.Command, args []string) {
if test.pfErr && err != ff.pfErr { if err = opts.Complete(f, cmd, args, os.Stdout, os.Stderr); err != nil {
t.Errorf("%s: Unexpected exec error: %v", test.name, err) return
}
opts.PortForwarder = ff
if err = opts.Validate(); err != nil {
return
}
err = opts.RunPortForward()
} }
if !test.pfErr && ff.url.Path != test.pfPath { cmd.Flags().Set("pod", "foo")
t.Errorf("%s: Did not get expected path for portforward request", test.name)
cmd.Run(cmd, []string{":5000", ":1000"})
if test.pfErr && err != ff.pfErr {
t.Errorf("%s: Unexpected port-forward error: %v", test.name, err)
} }
if !test.pfErr && err != nil { if !test.pfErr && err != nil {
t.Errorf("%s: Unexpected error: %v", test.name, err) t.Errorf("%s: Unexpected error: %v", test.name, err)
} }
if test.pfErr {
continue
}
if ff.url.Path != test.pfPath {
t.Errorf("%s: Did not get expected path for portforward request", test.name)
}
if ff.method != "POST" {
t.Errorf("%s: Did not get method for attach request: %s", test.name, ff.method)
}
} }
} }