Update Exec and Portforward client to use pod subresource

This commit is contained in:
Cesar Wong 2015-05-04 13:13:55 -04:00
parent 619332d58e
commit d954c31b22
5 changed files with 342 additions and 56 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion/queryparams"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
"github.com/golang/glog"
@ -39,7 +40,8 @@ func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (h
return req.Upgrade(config, spdy.NewRoundTripper)
}
type RemoteCommandExecutor struct {
// Executor executes a command on a pod container
type Executor struct {
req *client.Request
config *client.Config
command []string
@ -51,8 +53,9 @@ type RemoteCommandExecutor struct {
upgrader upgrader
}
func New(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) *RemoteCommandExecutor {
return &RemoteCommandExecutor{
// New creates a new RemoteCommandExecutor
func New(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) *Executor {
return &Executor{
req: req,
config: config,
command: command,
@ -66,26 +69,27 @@ func New(req *client.Request, config *client.Config, command []string, stdin io.
// Execute sends a remote command execution request, upgrading the
// connection and creating streams to represent stdin/stdout/stderr. Data is
// copied between these streams and the supplied stdin/stdout/stderr parameters.
func (e *RemoteCommandExecutor) Execute() error {
doStdin := (e.stdin != nil)
doStdout := (e.stdout != nil)
doStderr := (!e.tty && e.stderr != nil)
if doStdin {
e.req.Param(api.ExecStdinParam, "1")
}
if doStdout {
e.req.Param(api.ExecStdoutParam, "1")
}
if doStderr {
e.req.Param(api.ExecStderrParam, "1")
}
if e.tty {
e.req.Param(api.ExecTTYParam, "1")
func (e *Executor) Execute() error {
opts := api.PodExecOptions{
Stdin: (e.stdin != nil),
Stdout: (e.stdout != nil),
Stderr: (!e.tty && e.stderr != nil),
TTY: e.tty,
Command: e.command,
}
for _, s := range e.command {
e.req.Param(api.ExecCommandParamm, s)
versioned, err := api.Scheme.ConvertToVersion(&opts, e.config.Version)
if err != nil {
return err
}
params, err := queryparams.Convert(versioned)
if err != nil {
return err
}
for k, v := range params {
for _, vv := range v {
e.req.Param(k, vv)
}
}
if e.upgrader == nil {
@ -130,7 +134,7 @@ func (e *RemoteCommandExecutor) Execute() error {
}()
defer errorStream.Reset()
if doStdin {
if opts.Stdin {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err := conn.CreateStream(headers)
if err != nil {
@ -147,7 +151,7 @@ func (e *RemoteCommandExecutor) Execute() error {
waitCount := 0
completedStreams := 0
if doStdout {
if opts.Stdout {
waitCount++
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err := conn.CreateStream(headers)
@ -158,7 +162,7 @@ func (e *RemoteCommandExecutor) Execute() error {
go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
}
if doStderr && !e.tty {
if opts.Stderr && !e.tty {
waitCount++
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err := conn.CreateStream(headers)

View File

@ -23,6 +23,7 @@ import (
"syscall"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/remotecommand"
cmdutil "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/cmd/util"
"github.com/docker/docker/pkg/term"
@ -39,36 +40,53 @@ $ kubectl exec -p 123456-7890 -c ruby-container -i -t -- bash -il`
)
func NewCmdExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *cobra.Command {
params := &execParams{}
cmd := &cobra.Command{
Use: "exec -p POD -c CONTAINER -- COMMAND [args...]",
Short: "Execute a command in a container.",
Long: "Execute a command in a container.",
Example: exec_example,
Run: func(cmd *cobra.Command, args []string) {
err := RunExec(f, cmdIn, cmdOut, cmdErr, cmd, args)
err := RunExec(f, cmd, cmdIn, cmdOut, cmdErr, params, args, &defaultRemoteExecutor{})
cmdutil.CheckErr(err)
},
}
cmd.Flags().StringP("pod", "p", "", "Pod name")
cmd.Flags().StringVarP(&params.podName, "pod", "p", "", "Pod name")
cmd.MarkFlagRequired("pod")
// TODO support UID
cmd.Flags().StringP("container", "c", "", "Container name")
cmd.Flags().StringVarP(&params.containerName, "container", "c", "", "Container name")
cmd.MarkFlagRequired("container")
cmd.Flags().BoolP("stdin", "i", false, "Pass stdin to the container")
cmd.Flags().BoolP("tty", "t", false, "Stdin is a TTY")
cmd.Flags().BoolVarP(&params.stdin, "stdin", "i", false, "Pass stdin to the container")
cmd.Flags().BoolVarP(&params.tty, "tty", "t", false, "Stdin is a TTY")
return cmd
}
func RunExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd *cobra.Command, args []string) error {
podName := cmdutil.GetFlagString(cmd, "pod")
if len(podName) == 0 {
type remoteExecutor interface {
Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
}
type defaultRemoteExecutor struct{}
func (*defaultRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
executor := remotecommand.New(req, config, command, stdin, stdout, stderr, tty)
return executor.Execute()
}
type execParams struct {
podName string
containerName string
stdin bool
tty bool
}
func RunExec(f *cmdutil.Factory, cmd *cobra.Command, cmdIn io.Reader, cmdOut, cmdErr io.Writer, p *execParams, args []string, re remoteExecutor) error {
if len(p.podName) == 0 {
return cmdutil.UsageError(cmd, "POD is required for exec")
}
if len(args) < 1 {
return cmdutil.UsageError(cmd, "COMMAND is required for exec")
}
namespace, err := f.DefaultNamespace()
if err != nil {
return err
@ -79,7 +97,7 @@ func RunExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd
return err
}
pod, err := client.Pods(namespace).Get(podName)
pod, err := client.Pods(namespace).Get(p.podName)
if err != nil {
return err
}
@ -88,14 +106,14 @@ func RunExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd
glog.Fatalf("Unable to execute command because pod is not running. Current status=%v", pod.Status.Phase)
}
containerName := cmdutil.GetFlagString(cmd, "container")
containerName := p.containerName
if len(containerName) == 0 {
containerName = pod.Spec.Containers[0].Name
}
var stdin io.Reader
tty := cmdutil.GetFlagBool(cmd, "tty")
if cmdutil.GetFlagBool(cmd, "stdin") {
tty := p.tty
if p.stdin {
stdin = cmdIn
if tty {
if file, ok := cmdIn.(*os.File); ok {
@ -135,11 +153,11 @@ func RunExec(f *cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer, cmd
}
req := client.RESTClient.Get().
Prefix("proxy").
Resource("nodes").
Name(pod.Spec.Host).
Suffix("exec", namespace, podName, containerName)
Resource("pods").
Name(pod.Name).
Namespace(namespace).
SubResource("exec").
Param("container", containerName)
e := remotecommand.New(req, config, args, stdin, cmdOut, cmdErr, tty)
return e.Execute()
return re.Execute(req, config, args, stdin, cmdOut, cmdErr, tty)
}

View File

@ -0,0 +1,140 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"bytes"
"fmt"
"io"
"net/http"
"testing"
"github.com/spf13/cobra"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)
type fakeRemoteExecutor struct {
req *client.Request
execErr error
}
func (f *fakeRemoteExecutor) Execute(req *client.Request, config *client.Config, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
f.req = req
return f.execErr
}
func TestExec(t *testing.T) {
tests := []struct {
name, version, podPath, execPath, container string
nsInQuery bool
pod *api.Pod
execErr bool
}{
{
name: "v1beta1 - pod exec",
version: "v1beta1",
podPath: "/api/v1beta1/pods/foo",
execPath: "/api/v1beta1/pods/foo/exec",
nsInQuery: true,
pod: execPod(),
},
{
name: "v1beta3 - pod exec",
version: "v1beta3",
podPath: "/api/v1beta3/namespaces/test/pods/foo",
execPath: "/api/v1beta3/namespaces/test/pods/foo/exec",
nsInQuery: false,
pod: execPod(),
},
{
name: "v1beta3 - pod exec error",
version: "v1beta3",
podPath: "/api/v1beta3/namespaces/test/pods/foo",
execPath: "/api/v1beta3/namespaces/test/pods/foo/exec",
nsInQuery: false,
pod: execPod(),
execErr: true,
},
}
for _, test := range tests {
f, tf, codec := NewAPIFactory()
tf.Client = &client.FakeRESTClient{
Codec: codec,
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == test.podPath && m == "GET":
if test.nsInQuery {
if ns := req.URL.Query().Get("namespace"); ns != "test" {
t.Errorf("%s: did not get expected namespace: %s\n", test.name, ns)
}
}
body := objBody(codec, test.pod)
return &http.Response{StatusCode: 200, Body: body}, nil
default:
// Ensures no GET is performed when deleting by name
t.Errorf("%s: unexpected request: %#v\n%#v", test.name, req.URL, req)
return nil, nil
}
}),
}
tf.Namespace = "test"
tf.ClientConfig = &client.Config{Version: test.version}
bufOut := bytes.NewBuffer([]byte{})
bufErr := bytes.NewBuffer([]byte{})
bufIn := bytes.NewBuffer([]byte{})
ex := &fakeRemoteExecutor{}
if test.execErr {
ex.execErr = fmt.Errorf("exec error")
}
params := &execParams{
podName: "foo",
containerName: "bar",
}
cmd := &cobra.Command{}
err := RunExec(f, cmd, bufIn, bufOut, bufErr, params, []string{"test", "command"}, ex)
if test.execErr && err != ex.execErr {
t.Errorf("%s: Unexpected exec error: %v", test.name, err)
}
if !test.execErr && ex.req.URL().Path != test.execPath {
t.Errorf("%s: Did not get expected path for exec request", test.name)
}
if !test.execErr && err != nil {
t.Errorf("%s: Unexpected error: %v", test.name, err)
}
}
}
func execPod() *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "test", ResourceVersion: "10"},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
Containers: []api.Container{
{
Name: "bar",
},
},
},
Status: api.PodStatus{
Phase: api.PodRunning,
},
}
}

View File

@ -21,6 +21,7 @@ import (
"os/signal"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/portforward"
cmdutil "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/cmd/util"
"github.com/golang/glog"
@ -49,7 +50,7 @@ func NewCmdPortForward(f *cmdutil.Factory) *cobra.Command {
Long: "Forward one or more local ports to a pod.",
Example: portforward_example,
Run: func(cmd *cobra.Command, args []string) {
err := RunPortForward(f, cmd, args)
err := RunPortForward(f, cmd, args, &defaultPortForwarder{})
cmdutil.CheckErr(err)
},
}
@ -59,12 +60,25 @@ func NewCmdPortForward(f *cmdutil.Factory) *cobra.Command {
return cmd
}
func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string) error {
type portForwarder interface {
ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error
}
type defaultPortForwarder struct{}
func (*defaultPortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error {
fw, err := portforward.New(req, config, ports, stopChan)
if err != nil {
return err
}
return fw.ForwardPorts()
}
func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string, fw portForwarder) error {
podName := cmdutil.GetFlagString(cmd, "pod")
if len(podName) == 0 {
return cmdutil.UsageError(cmd, "POD is required for exec")
}
if len(args) < 1 {
return cmdutil.UsageError(cmd, "at least 1 PORT is required for port-forward")
}
@ -104,15 +118,10 @@ func RunPortForward(f *cmdutil.Factory, cmd *cobra.Command, args []string) error
}()
req := client.RESTClient.Get().
Prefix("proxy").
Resource("nodes").
Name(pod.Spec.Host).
Suffix("portForward", namespace, podName)
Resource("pods").
Namespace(namespace).
Name(pod.Name).
SubResource("portforward")
pf, err := portforward.New(req, config, args, stopCh)
if err != nil {
return err
}
return pf.ForwardPorts()
return fw.ForwardPorts(req, config, args, stopCh)
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"fmt"
"net/http"
"testing"
"github.com/spf13/cobra"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
)
type fakePortForwarder struct {
req *client.Request
pfErr error
}
func (f *fakePortForwarder) ForwardPorts(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) error {
f.req = req
return f.pfErr
}
func TestPortForward(t *testing.T) {
tests := []struct {
name, version, podPath, pfPath, container string
nsInQuery bool
pod *api.Pod
pfErr bool
}{
{
name: "v1beta1 - pod portforward",
version: "v1beta1",
podPath: "/api/v1beta1/pods/foo",
pfPath: "/api/v1beta1/pods/foo/portforward",
nsInQuery: true,
pod: execPod(),
},
{
name: "v1beta3 - pod portforward",
version: "v1beta3",
podPath: "/api/v1beta3/namespaces/test/pods/foo",
pfPath: "/api/v1beta3/namespaces/test/pods/foo/portforward",
nsInQuery: false,
pod: execPod(),
},
{
name: "v1beta3 - pod portforward error",
version: "v1beta3",
podPath: "/api/v1beta3/namespaces/test/pods/foo",
pfPath: "/api/v1beta3/namespaces/test/pods/foo/portforward",
nsInQuery: false,
pod: execPod(),
pfErr: true,
},
}
for _, test := range tests {
f, tf, codec := NewAPIFactory()
tf.Client = &client.FakeRESTClient{
Codec: codec,
Client: client.HTTPClientFunc(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == test.podPath && m == "GET":
if test.nsInQuery {
if ns := req.URL.Query().Get("namespace"); ns != "test" {
t.Errorf("%s: did not get expected namespace: %s\n", test.name, ns)
}
}
body := objBody(codec, test.pod)
return &http.Response{StatusCode: 200, Body: body}, nil
default:
// Ensures no GET is performed when deleting by name
t.Errorf("%s: unexpected request: %#v\n%#v", test.name, req.URL, req)
return nil, nil
}
}),
}
tf.Namespace = "test"
tf.ClientConfig = &client.Config{Version: test.version}
ff := &fakePortForwarder{}
if test.pfErr {
ff.pfErr = fmt.Errorf("pf error")
}
cmd := &cobra.Command{}
podPtr := cmd.Flags().StringP("pod", "p", "", "Pod name")
*podPtr = "foo"
err := RunPortForward(f, cmd, []string{":5000", ":1000"}, ff)
if test.pfErr && err != ff.pfErr {
t.Errorf("%s: Unexpected exec error: %v", test.name, err)
}
if !test.pfErr && ff.req.URL().Path != test.pfPath {
t.Errorf("%s: Did not get expected path for portforward request", test.name)
}
if !test.pfErr && err != nil {
t.Errorf("%s: Unexpected error: %v", test.name, err)
}
}
}