mirror of
https://github.com/kubernetes/client-go.git
synced 2025-07-19 17:39:56 +00:00
Move client/unversioned/remotecommand to client-go
Module remotecommand originally part of kubernetes/pkg/client/unversioned was moved to client-go/tools, and will be used as authoritative in kubectl, e2e and other places. Module remotecommand relies on util/exec module which will be copied to client-go/pkg/util Kubernetes-commit: 2612e0c78ad18ac87bbd200d547100cf99f36089
This commit is contained in:
parent
29b5bff0fa
commit
20e59c60f8
14
pkg/util/exec/BUILD
Normal file
14
pkg/util/exec/BUILD
Normal file
@ -0,0 +1,14 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["exec.go"],
|
||||
tags = ["automanaged"],
|
||||
)
|
188
pkg/util/exec/exec.go
Normal file
188
pkg/util/exec/exec.go
Normal file
@ -0,0 +1,188 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package exec
|
||||
|
||||
import (
|
||||
"io"
|
||||
osexec "os/exec"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErrExecutableNotFound is returned if the executable is not found.
|
||||
var ErrExecutableNotFound = osexec.ErrNotFound
|
||||
|
||||
// Interface is an interface that presents a subset of the os/exec API. Use this
|
||||
// when you want to inject fakeable/mockable exec behavior.
|
||||
type Interface interface {
|
||||
// Command returns a Cmd instance which can be used to run a single command.
|
||||
// This follows the pattern of package os/exec.
|
||||
Command(cmd string, args ...string) Cmd
|
||||
|
||||
// LookPath wraps os/exec.LookPath
|
||||
LookPath(file string) (string, error)
|
||||
}
|
||||
|
||||
// Cmd is an interface that presents an API that is very similar to Cmd from os/exec.
|
||||
// As more functionality is needed, this can grow. Since Cmd is a struct, we will have
|
||||
// to replace fields with get/set method pairs.
|
||||
type Cmd interface {
|
||||
// CombinedOutput runs the command and returns its combined standard output
|
||||
// and standard error. This follows the pattern of package os/exec.
|
||||
CombinedOutput() ([]byte, error)
|
||||
// Output runs the command and returns standard output, but not standard err
|
||||
Output() ([]byte, error)
|
||||
SetDir(dir string)
|
||||
SetStdin(in io.Reader)
|
||||
SetStdout(out io.Writer)
|
||||
// Stops the command by sending SIGTERM. It is not guaranteed the
|
||||
// process will stop before this function returns. If the process is not
|
||||
// responding, an internal timer function will send a SIGKILL to force
|
||||
// terminate after 10 seconds.
|
||||
Stop()
|
||||
}
|
||||
|
||||
// ExitError is an interface that presents an API similar to os.ProcessState, which is
|
||||
// what ExitError from os/exec is. This is designed to make testing a bit easier and
|
||||
// probably loses some of the cross-platform properties of the underlying library.
|
||||
type ExitError interface {
|
||||
String() string
|
||||
Error() string
|
||||
Exited() bool
|
||||
ExitStatus() int
|
||||
}
|
||||
|
||||
// Implements Interface in terms of really exec()ing.
|
||||
type executor struct{}
|
||||
|
||||
// New returns a new Interface which will os/exec to run commands.
|
||||
func New() Interface {
|
||||
return &executor{}
|
||||
}
|
||||
|
||||
// Command is part of the Interface interface.
|
||||
func (executor *executor) Command(cmd string, args ...string) Cmd {
|
||||
return (*cmdWrapper)(osexec.Command(cmd, args...))
|
||||
}
|
||||
|
||||
// LookPath is part of the Interface interface
|
||||
func (executor *executor) LookPath(file string) (string, error) {
|
||||
return osexec.LookPath(file)
|
||||
}
|
||||
|
||||
// Wraps exec.Cmd so we can capture errors.
|
||||
type cmdWrapper osexec.Cmd
|
||||
|
||||
func (cmd *cmdWrapper) SetDir(dir string) {
|
||||
cmd.Dir = dir
|
||||
}
|
||||
|
||||
func (cmd *cmdWrapper) SetStdin(in io.Reader) {
|
||||
cmd.Stdin = in
|
||||
}
|
||||
|
||||
func (cmd *cmdWrapper) SetStdout(out io.Writer) {
|
||||
cmd.Stdout = out
|
||||
}
|
||||
|
||||
// CombinedOutput is part of the Cmd interface.
|
||||
func (cmd *cmdWrapper) CombinedOutput() ([]byte, error) {
|
||||
out, err := (*osexec.Cmd)(cmd).CombinedOutput()
|
||||
if err != nil {
|
||||
return out, handleError(err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (cmd *cmdWrapper) Output() ([]byte, error) {
|
||||
out, err := (*osexec.Cmd)(cmd).Output()
|
||||
if err != nil {
|
||||
return out, handleError(err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Stop is part of the Cmd interface.
|
||||
func (cmd *cmdWrapper) Stop() {
|
||||
c := (*osexec.Cmd)(cmd)
|
||||
if c.ProcessState.Exited() {
|
||||
return
|
||||
}
|
||||
c.Process.Signal(syscall.SIGTERM)
|
||||
time.AfterFunc(10*time.Second, func() {
|
||||
if c.ProcessState.Exited() {
|
||||
return
|
||||
}
|
||||
c.Process.Signal(syscall.SIGKILL)
|
||||
})
|
||||
}
|
||||
|
||||
func handleError(err error) error {
|
||||
if ee, ok := err.(*osexec.ExitError); ok {
|
||||
// Force a compile fail if exitErrorWrapper can't convert to ExitError.
|
||||
var x ExitError = &ExitErrorWrapper{ee}
|
||||
return x
|
||||
}
|
||||
if ee, ok := err.(*osexec.Error); ok {
|
||||
if ee.Err == osexec.ErrNotFound {
|
||||
return ErrExecutableNotFound
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ExitErrorWrapper is an implementation of ExitError in terms of os/exec ExitError.
|
||||
// Note: standard exec.ExitError is type *os.ProcessState, which already implements Exited().
|
||||
type ExitErrorWrapper struct {
|
||||
*osexec.ExitError
|
||||
}
|
||||
|
||||
var _ ExitError = ExitErrorWrapper{}
|
||||
|
||||
// ExitStatus is part of the ExitError interface.
|
||||
func (eew ExitErrorWrapper) ExitStatus() int {
|
||||
ws, ok := eew.Sys().(syscall.WaitStatus)
|
||||
if !ok {
|
||||
panic("can't call ExitStatus() on a non-WaitStatus exitErrorWrapper")
|
||||
}
|
||||
return ws.ExitStatus()
|
||||
}
|
||||
|
||||
// CodeExitError is an implementation of ExitError consisting of an error object
|
||||
// and an exit code (the upper bits of os.exec.ExitStatus).
|
||||
type CodeExitError struct {
|
||||
Err error
|
||||
Code int
|
||||
}
|
||||
|
||||
var _ ExitError = CodeExitError{}
|
||||
|
||||
func (e CodeExitError) Error() string {
|
||||
return e.Err.Error()
|
||||
}
|
||||
|
||||
func (e CodeExitError) String() string {
|
||||
return e.Err.Error()
|
||||
}
|
||||
|
||||
func (e CodeExitError) Exited() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (e CodeExitError) ExitStatus() int {
|
||||
return e.Code
|
||||
}
|
51
tools/remotecommand/BUILD
Normal file
51
tools/remotecommand/BUILD
Normal file
@ -0,0 +1,51 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"v2_test.go",
|
||||
"v4_test.go",
|
||||
],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/pkg/api:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"errorstream.go",
|
||||
"remotecommand.go",
|
||||
"resize.go",
|
||||
"v1.go",
|
||||
"v2.go",
|
||||
"v3.go",
|
||||
"v4.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/httpstream:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/client-go/pkg/api:go_default_library",
|
||||
"//vendor/k8s.io/client-go/pkg/util/exec:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/k8s.io/client-go/transport:go_default_library",
|
||||
],
|
||||
)
|
20
tools/remotecommand/doc.go
Normal file
20
tools/remotecommand/doc.go
Normal file
@ -0,0 +1,20 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package remotecommand adds support for executing commands in containers,
|
||||
// with support for separate stdin, stdout, and stderr streams, as well as
|
||||
// TTY.
|
||||
package remotecommand // import "k8s.io/client-go/tools/remotecommand"
|
55
tools/remotecommand/errorstream.go
Normal file
55
tools/remotecommand/errorstream.go
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
)
|
||||
|
||||
// errorStreamDecoder interprets the data on the error channel and creates a go error object from it.
|
||||
type errorStreamDecoder interface {
|
||||
decode(message []byte) error
|
||||
}
|
||||
|
||||
// watchErrorStream watches the errorStream for remote command error data,
|
||||
// decodes it with the given errorStreamDecoder, sends the decoded error (or nil if the remote
|
||||
// command exited successfully) to the returned error channel, and closes it.
|
||||
// This function returns immediately.
|
||||
func watchErrorStream(errorStream io.Reader, d errorStreamDecoder) chan error {
|
||||
errorChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
message, err := ioutil.ReadAll(errorStream)
|
||||
switch {
|
||||
case err != nil && err != io.EOF:
|
||||
errorChan <- fmt.Errorf("error reading from error stream: %s", err)
|
||||
case len(message) > 0:
|
||||
errorChan <- d.decode(message)
|
||||
default:
|
||||
errorChan <- nil
|
||||
}
|
||||
close(errorChan)
|
||||
}()
|
||||
|
||||
return errorChan
|
||||
}
|
178
tools/remotecommand/remotecommand.go
Normal file
178
tools/remotecommand/remotecommand.go
Normal file
@ -0,0 +1,178 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
|
||||
"k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport"
|
||||
)
|
||||
|
||||
// StreamOptions holds information pertaining to the current streaming session: supported stream
|
||||
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
|
||||
// support terminal resizing.
|
||||
type StreamOptions struct {
|
||||
SupportedProtocols []string
|
||||
Stdin io.Reader
|
||||
Stdout io.Writer
|
||||
Stderr io.Writer
|
||||
Tty bool
|
||||
TerminalSizeQueue TerminalSizeQueue
|
||||
}
|
||||
|
||||
// Executor is an interface for transporting shell-style streams.
|
||||
type Executor interface {
|
||||
// Stream initiates the transport of the standard shell streams. It will transport any
|
||||
// non-nil stream to a remote system, and return an error if a problem occurs. If tty
|
||||
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
|
||||
// stdout stream).
|
||||
Stream(options StreamOptions) error
|
||||
}
|
||||
|
||||
// StreamExecutor supports the ability to dial an httpstream connection and the ability to
|
||||
// run a command line stream protocol over that dialer.
|
||||
type StreamExecutor interface {
|
||||
Executor
|
||||
httpstream.Dialer
|
||||
}
|
||||
|
||||
// streamExecutor handles transporting standard shell streams over an httpstream connection.
|
||||
type streamExecutor struct {
|
||||
upgrader httpstream.UpgradeRoundTripper
|
||||
transport http.RoundTripper
|
||||
|
||||
method string
|
||||
url *url.URL
|
||||
}
|
||||
|
||||
// NewExecutor connects to the provided server and upgrades the connection to
|
||||
// multiplexed bidirectional streams. The current implementation uses SPDY,
|
||||
// but this could be replaced with HTTP/2 once it's available, or something else.
|
||||
// TODO: the common code between this and portforward could be abstracted.
|
||||
func NewExecutor(config *restclient.Config, method string, url *url.URL) (StreamExecutor, error) {
|
||||
tlsConfig, err := restclient.TLSConfigFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
upgradeRoundTripper := spdy.NewRoundTripper(tlsConfig, true)
|
||||
wrapper, err := restclient.HTTPWrappersForConfig(config, upgradeRoundTripper)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &streamExecutor{
|
||||
upgrader: upgradeRoundTripper,
|
||||
transport: wrapper,
|
||||
method: method,
|
||||
url: url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// NewStreamExecutor upgrades the request so that it supports multiplexed bidirectional
|
||||
// streams. This method takes a stream upgrader and an optional function that is invoked
|
||||
// to wrap the round tripper. This method may be used by clients that are lower level than
|
||||
// Kubernetes clients or need to provide their own upgrade round tripper.
|
||||
func NewStreamExecutor(upgrader httpstream.UpgradeRoundTripper, fn func(http.RoundTripper) http.RoundTripper, method string, url *url.URL) (StreamExecutor, error) {
|
||||
rt := http.RoundTripper(upgrader)
|
||||
if fn != nil {
|
||||
rt = fn(rt)
|
||||
}
|
||||
return &streamExecutor{
|
||||
upgrader: upgrader,
|
||||
transport: rt,
|
||||
method: method,
|
||||
url: url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Dial opens a connection to a remote server and attempts to negotiate a SPDY
|
||||
// connection. Upon success, it returns the connection and the protocol
|
||||
// selected by the server.
|
||||
func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, string, error) {
|
||||
rt := transport.DebugWrappers(e.transport)
|
||||
|
||||
// TODO the client probably shouldn't be created here, as it doesn't allow
|
||||
// flexibility to allow callers to configure it.
|
||||
client := &http.Client{Transport: rt}
|
||||
|
||||
req, err := http.NewRequest(e.method, e.url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("error creating request: %v", err)
|
||||
}
|
||||
for i := range protocols {
|
||||
req.Header.Add(httpstream.HeaderProtocolVersion, protocols[i])
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("error sending request: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
conn, err := e.upgrader.NewConnection(resp)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
|
||||
}
|
||||
|
||||
type streamCreator interface {
|
||||
CreateStream(headers http.Header) (httpstream.Stream, error)
|
||||
}
|
||||
|
||||
type streamProtocolHandler interface {
|
||||
stream(conn streamCreator) error
|
||||
}
|
||||
|
||||
// Stream opens a protocol streamer to the server and streams until a client closes
|
||||
// the connection or the server disconnects.
|
||||
func (e *streamExecutor) Stream(options StreamOptions) error {
|
||||
conn, protocol, err := e.Dial(options.SupportedProtocols...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
var streamer streamProtocolHandler
|
||||
|
||||
switch protocol {
|
||||
case remotecommand.StreamProtocolV4Name:
|
||||
streamer = newStreamProtocolV4(options)
|
||||
case remotecommand.StreamProtocolV3Name:
|
||||
streamer = newStreamProtocolV3(options)
|
||||
case remotecommand.StreamProtocolV2Name:
|
||||
streamer = newStreamProtocolV2(options)
|
||||
case "":
|
||||
glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
|
||||
fallthrough
|
||||
case remotecommand.StreamProtocolV1Name:
|
||||
streamer = newStreamProtocolV1(options)
|
||||
}
|
||||
|
||||
return streamer.stream(conn)
|
||||
}
|
33
tools/remotecommand/resize.go
Normal file
33
tools/remotecommand/resize.go
Normal file
@ -0,0 +1,33 @@
|
||||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
// TerminalSize and TerminalSizeQueue was a part of k8s.io/kubernetes/pkg/util/term
|
||||
// and were moved in order to decouple client from other term dependencies
|
||||
|
||||
// TerminalSize represents the width and height of a terminal.
|
||||
type TerminalSize struct {
|
||||
Width uint16
|
||||
Height uint16
|
||||
}
|
||||
|
||||
// TerminalSizeQueue is capable of returning terminal resize events as they occur.
|
||||
type TerminalSizeQueue interface {
|
||||
// Next returns the new terminal size after the terminal has been resized. It returns nil when
|
||||
// monitoring has been stopped.
|
||||
Next() *TerminalSize
|
||||
}
|
160
tools/remotecommand/v1.go
Normal file
160
tools/remotecommand/v1.go
Normal file
@ -0,0 +1,160 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/client-go/pkg/api"
|
||||
)
|
||||
|
||||
// streamProtocolV1 implements the first version of the streaming exec & attach
|
||||
// protocol. This version has some bugs, such as not being able to detect when
|
||||
// non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
|
||||
// http://issues.k8s.io/13395 for more details.
|
||||
type streamProtocolV1 struct {
|
||||
StreamOptions
|
||||
|
||||
errorStream httpstream.Stream
|
||||
remoteStdin httpstream.Stream
|
||||
remoteStdout httpstream.Stream
|
||||
remoteStderr httpstream.Stream
|
||||
}
|
||||
|
||||
var _ streamProtocolHandler = &streamProtocolV1{}
|
||||
|
||||
func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
|
||||
return &streamProtocolV1{
|
||||
StreamOptions: options,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV1) stream(conn streamCreator) error {
|
||||
doneChan := make(chan struct{}, 2)
|
||||
errorChan := make(chan error)
|
||||
|
||||
cp := func(s string, dst io.Writer, src io.Reader) {
|
||||
glog.V(6).Infof("Copying %s", s)
|
||||
defer glog.V(6).Infof("Done copying %s", s)
|
||||
if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
|
||||
glog.Errorf("Error copying %s: %v", s, err)
|
||||
}
|
||||
if s == api.StreamTypeStdout || s == api.StreamTypeStderr {
|
||||
doneChan <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// set up all the streams first
|
||||
var err error
|
||||
headers := http.Header{}
|
||||
headers.Set(api.StreamType, api.StreamTypeError)
|
||||
p.errorStream, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer p.errorStream.Reset()
|
||||
|
||||
// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
|
||||
// goroutines until it's received all of the streams. If the client creates the stdin stream and
|
||||
// immediately begins copying stdin data to the server, it's possible to overwhelm and wedge the
|
||||
// spdy frame handler in the server so that it is full of unprocessed frames. The frames aren't
|
||||
// getting processed because the server hasn't started its copying, and it won't do that until it
|
||||
// gets all the streams. By creating all the streams first, we ensure that the server is ready to
|
||||
// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
|
||||
if p.Stdin != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdin)
|
||||
p.remoteStdin, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer p.remoteStdin.Reset()
|
||||
}
|
||||
|
||||
if p.Stdout != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdout)
|
||||
p.remoteStdout, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer p.remoteStdout.Reset()
|
||||
}
|
||||
|
||||
if p.Stderr != nil && !p.Tty {
|
||||
headers.Set(api.StreamType, api.StreamTypeStderr)
|
||||
p.remoteStderr, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer p.remoteStderr.Reset()
|
||||
}
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
// always read from errorStream
|
||||
go func() {
|
||||
message, err := ioutil.ReadAll(p.errorStream)
|
||||
if err != nil && err != io.EOF {
|
||||
errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
|
||||
return
|
||||
}
|
||||
if len(message) > 0 {
|
||||
errorChan <- fmt.Errorf("Error executing remote command: %s", message)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if p.Stdin != nil {
|
||||
// TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
|
||||
// because stdin is not closed until the process exits. If we try to call
|
||||
// stdin.Close(), it returns no error but doesn't unblock the copy. It will
|
||||
// exit when the process exits, instead.
|
||||
go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin)
|
||||
}
|
||||
|
||||
waitCount := 0
|
||||
completedStreams := 0
|
||||
|
||||
if p.Stdout != nil {
|
||||
waitCount++
|
||||
go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout)
|
||||
}
|
||||
|
||||
if p.Stderr != nil && !p.Tty {
|
||||
waitCount++
|
||||
go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr)
|
||||
}
|
||||
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-doneChan:
|
||||
completedStreams++
|
||||
if completedStreams == waitCount {
|
||||
break Loop
|
||||
}
|
||||
case err := <-errorChan:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
195
tools/remotecommand/v2.go
Normal file
195
tools/remotecommand/v2.go
Normal file
@ -0,0 +1,195 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/pkg/api"
|
||||
)
|
||||
|
||||
// streamProtocolV2 implements version 2 of the streaming protocol for attach
|
||||
// and exec. The original streaming protocol was metav1. As a result, this
|
||||
// version is referred to as version 2, even though it is the first actual
|
||||
// numbered version.
|
||||
type streamProtocolV2 struct {
|
||||
StreamOptions
|
||||
|
||||
errorStream io.Reader
|
||||
remoteStdin io.ReadWriteCloser
|
||||
remoteStdout io.Reader
|
||||
remoteStderr io.Reader
|
||||
}
|
||||
|
||||
var _ streamProtocolHandler = &streamProtocolV2{}
|
||||
|
||||
func newStreamProtocolV2(options StreamOptions) streamProtocolHandler {
|
||||
return &streamProtocolV2{
|
||||
StreamOptions: options,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) createStreams(conn streamCreator) error {
|
||||
var err error
|
||||
headers := http.Header{}
|
||||
|
||||
// set up error stream
|
||||
headers.Set(api.StreamType, api.StreamTypeError)
|
||||
p.errorStream, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// set up stdin stream
|
||||
if p.Stdin != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdin)
|
||||
p.remoteStdin, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// set up stdout stream
|
||||
if p.Stdout != nil {
|
||||
headers.Set(api.StreamType, api.StreamTypeStdout)
|
||||
p.remoteStdout, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// set up stderr stream
|
||||
if p.Stderr != nil && !p.Tty {
|
||||
headers.Set(api.StreamType, api.StreamTypeStderr)
|
||||
p.remoteStderr, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) copyStdin() {
|
||||
if p.Stdin != nil {
|
||||
var once sync.Once
|
||||
|
||||
// copy from client's stdin to container's stdin
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
// if p.stdin is noninteractive, p.g. `echo abc | kubectl exec -i <pod> -- cat`, make sure
|
||||
// we close remoteStdin as soon as the copy from p.stdin to remoteStdin finishes. Otherwise
|
||||
// the executed command will remain running.
|
||||
defer once.Do(func() { p.remoteStdin.Close() })
|
||||
|
||||
if _, err := io.Copy(p.remoteStdin, p.Stdin); err != nil {
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
|
||||
// read from remoteStdin until the stream is closed. this is essential to
|
||||
// be able to exit interactive sessions cleanly and not leak goroutines or
|
||||
// hang the client's terminal.
|
||||
//
|
||||
// TODO we aren't using go-dockerclient any more; revisit this to determine if it's still
|
||||
// required by engine-api.
|
||||
//
|
||||
// go-dockerclient's current hijack implementation
|
||||
// (https://github.com/fsouza/go-dockerclient/blob/89f3d56d93788dfe85f864a44f85d9738fca0670/client.go#L564)
|
||||
// waits for all three streams (stdin/stdout/stderr) to finish copying
|
||||
// before returning. When hijack finishes copying stdout/stderr, it calls
|
||||
// Close() on its side of remoteStdin, which allows this copy to complete.
|
||||
// When that happens, we must Close() on our side of remoteStdin, to
|
||||
// allow the copy in hijack to complete, and hijack to return.
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
defer once.Do(func() { p.remoteStdin.Close() })
|
||||
|
||||
// this "copy" doesn't actually read anything - it's just here to wait for
|
||||
// the server to close remoteStdin.
|
||||
if _, err := io.Copy(ioutil.Discard, p.remoteStdin); err != nil {
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) copyStdout(wg *sync.WaitGroup) {
|
||||
if p.Stdout == nil {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
defer wg.Done()
|
||||
|
||||
if _, err := io.Copy(p.Stdout, p.remoteStdout); err != nil {
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
|
||||
if p.Stderr == nil || p.Tty {
|
||||
return
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
defer wg.Done()
|
||||
|
||||
if _, err := io.Copy(p.Stderr, p.remoteStderr); err != nil {
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV2) stream(conn streamCreator) error {
|
||||
if err := p.createStreams(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})
|
||||
|
||||
p.copyStdin()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.copyStdout(&wg)
|
||||
p.copyStderr(&wg)
|
||||
|
||||
// we're waiting for stdout/stderr to finish copying
|
||||
wg.Wait()
|
||||
|
||||
// waits for errorStream to finish reading with an error or nil
|
||||
return <-errorChan
|
||||
}
|
||||
|
||||
// errorDecoderV2 interprets the error channel data as plain text.
|
||||
type errorDecoderV2 struct{}
|
||||
|
||||
func (d *errorDecoderV2) decode(message []byte) error {
|
||||
return fmt.Errorf("error executing remote command: %s", message)
|
||||
}
|
228
tools/remotecommand/v2_test.go
Normal file
228
tools/remotecommand/v2_test.go
Normal file
@ -0,0 +1,228 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/pkg/api"
|
||||
)
|
||||
|
||||
type fakeReader struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (r *fakeReader) Read([]byte) (int, error) { return 0, r.err }
|
||||
|
||||
type fakeWriter struct{}
|
||||
|
||||
func (*fakeWriter) Write([]byte) (int, error) { return 0, nil }
|
||||
|
||||
type fakeStreamCreator struct {
|
||||
created map[string]bool
|
||||
errors map[string]error
|
||||
}
|
||||
|
||||
var _ streamCreator = &fakeStreamCreator{}
|
||||
|
||||
func (f *fakeStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) {
|
||||
streamType := headers.Get(api.StreamType)
|
||||
f.created[streamType] = true
|
||||
return nil, f.errors[streamType]
|
||||
}
|
||||
|
||||
func TestV2CreateStreams(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
stdin bool
|
||||
stdinError error
|
||||
stdout bool
|
||||
stdoutError error
|
||||
stderr bool
|
||||
stderrError error
|
||||
errorError error
|
||||
tty bool
|
||||
expectError bool
|
||||
}{
|
||||
{
|
||||
name: "stdin error",
|
||||
stdin: true,
|
||||
stdinError: errors.New("stdin error"),
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "stdout error",
|
||||
stdout: true,
|
||||
stdoutError: errors.New("stdout error"),
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "stderr error",
|
||||
stderr: true,
|
||||
stderrError: errors.New("stderr error"),
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "error stream error",
|
||||
stdin: true,
|
||||
stdout: true,
|
||||
stderr: true,
|
||||
errorError: errors.New("error stream error"),
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "no errors",
|
||||
stdin: true,
|
||||
stdout: true,
|
||||
stderr: true,
|
||||
expectError: false,
|
||||
},
|
||||
{
|
||||
name: "no errors, stderr & tty set, don't expect stderr",
|
||||
stdin: true,
|
||||
stdout: true,
|
||||
stderr: true,
|
||||
tty: true,
|
||||
expectError: false,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
conn := &fakeStreamCreator{
|
||||
created: make(map[string]bool),
|
||||
errors: map[string]error{
|
||||
api.StreamTypeStdin: test.stdinError,
|
||||
api.StreamTypeStdout: test.stdoutError,
|
||||
api.StreamTypeStderr: test.stderrError,
|
||||
api.StreamTypeError: test.errorError,
|
||||
},
|
||||
}
|
||||
|
||||
opts := StreamOptions{Tty: test.tty}
|
||||
if test.stdin {
|
||||
opts.Stdin = &fakeReader{}
|
||||
}
|
||||
if test.stdout {
|
||||
opts.Stdout = &fakeWriter{}
|
||||
}
|
||||
if test.stderr {
|
||||
opts.Stderr = &fakeWriter{}
|
||||
}
|
||||
|
||||
h := newStreamProtocolV2(opts).(*streamProtocolV2)
|
||||
err := h.createStreams(conn)
|
||||
|
||||
if test.expectError {
|
||||
if err == nil {
|
||||
t.Errorf("%s: expected error", test.name)
|
||||
continue
|
||||
}
|
||||
if e, a := test.stdinError, err; test.stdinError != nil && e != a {
|
||||
t.Errorf("%s: expected %v, got %v", test.name, e, a)
|
||||
}
|
||||
if e, a := test.stdoutError, err; test.stdoutError != nil && e != a {
|
||||
t.Errorf("%s: expected %v, got %v", test.name, e, a)
|
||||
}
|
||||
if e, a := test.stderrError, err; test.stderrError != nil && e != a {
|
||||
t.Errorf("%s: expected %v, got %v", test.name, e, a)
|
||||
}
|
||||
if e, a := test.errorError, err; test.errorError != nil && e != a {
|
||||
t.Errorf("%s: expected %v, got %v", test.name, e, a)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if !test.expectError && err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", test.name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if test.stdin && !conn.created[api.StreamTypeStdin] {
|
||||
t.Errorf("%s: expected stdin stream", test.name)
|
||||
}
|
||||
if test.stdout && !conn.created[api.StreamTypeStdout] {
|
||||
t.Errorf("%s: expected stdout stream", test.name)
|
||||
}
|
||||
if test.stderr {
|
||||
if test.tty && conn.created[api.StreamTypeStderr] {
|
||||
t.Errorf("%s: unexpected stderr stream because tty is set", test.name)
|
||||
} else if !test.tty && !conn.created[api.StreamTypeStderr] {
|
||||
t.Errorf("%s: expected stderr stream", test.name)
|
||||
}
|
||||
}
|
||||
if !conn.created[api.StreamTypeError] {
|
||||
t.Errorf("%s: expected error stream", test.name)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func TestV2ErrorStreamReading(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
stream io.Reader
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "error reading from stream",
|
||||
stream: &fakeReader{errors.New("foo")},
|
||||
expectedError: errors.New("error reading from error stream: foo"),
|
||||
},
|
||||
{
|
||||
name: "stream returns an error",
|
||||
stream: strings.NewReader("some error"),
|
||||
expectedError: errors.New("error executing remote command: some error"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
h := newStreamProtocolV2(StreamOptions{}).(*streamProtocolV2)
|
||||
h.errorStream = test.stream
|
||||
|
||||
ch := watchErrorStream(h.errorStream, &errorDecoderV2{})
|
||||
if ch == nil {
|
||||
t.Fatalf("%s: unexpected nil channel", test.name)
|
||||
}
|
||||
|
||||
var err error
|
||||
select {
|
||||
case err = <-ch:
|
||||
case <-time.After(wait.ForeverTestTimeout):
|
||||
t.Fatalf("%s: timed out", test.name)
|
||||
}
|
||||
|
||||
if test.expectedError != nil {
|
||||
if err == nil {
|
||||
t.Errorf("%s: expected an error", test.name)
|
||||
} else if e, a := test.expectedError, err; e.Error() != a.Error() {
|
||||
t.Errorf("%s: expected %q, got %q", test.name, e, a)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if test.expectedError == nil && err != nil {
|
||||
t.Errorf("%s: unexpected error: %v", test.name, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
111
tools/remotecommand/v3.go
Normal file
111
tools/remotecommand/v3.go
Normal file
@ -0,0 +1,111 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/pkg/api"
|
||||
)
|
||||
|
||||
// streamProtocolV3 implements version 3 of the streaming protocol for attach
|
||||
// and exec. This version adds support for resizing the container's terminal.
|
||||
type streamProtocolV3 struct {
|
||||
*streamProtocolV2
|
||||
|
||||
resizeStream io.Writer
|
||||
}
|
||||
|
||||
var _ streamProtocolHandler = &streamProtocolV3{}
|
||||
|
||||
func newStreamProtocolV3(options StreamOptions) streamProtocolHandler {
|
||||
return &streamProtocolV3{
|
||||
streamProtocolV2: newStreamProtocolV2(options).(*streamProtocolV2),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV3) createStreams(conn streamCreator) error {
|
||||
// set up the streams from v2
|
||||
if err := p.streamProtocolV2.createStreams(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// set up resize stream
|
||||
if p.Tty {
|
||||
headers := http.Header{}
|
||||
headers.Set(api.StreamType, api.StreamTypeResize)
|
||||
var err error
|
||||
p.resizeStream, err = conn.CreateStream(headers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *streamProtocolV3) handleResizes() {
|
||||
if p.resizeStream == nil || p.TerminalSizeQueue == nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
|
||||
encoder := json.NewEncoder(p.resizeStream)
|
||||
for {
|
||||
size := p.TerminalSizeQueue.Next()
|
||||
if size == nil {
|
||||
return
|
||||
}
|
||||
if err := encoder.Encode(&size); err != nil {
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV3) stream(conn streamCreator) error {
|
||||
if err := p.createStreams(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})
|
||||
|
||||
p.handleResizes()
|
||||
|
||||
p.copyStdin()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.copyStdout(&wg)
|
||||
p.copyStderr(&wg)
|
||||
|
||||
// we're waiting for stdout/stderr to finish copying
|
||||
wg.Wait()
|
||||
|
||||
// waits for errorStream to finish reading with an error or nil
|
||||
return <-errorChan
|
||||
}
|
||||
|
||||
type errorDecoderV3 struct {
|
||||
errorDecoderV2
|
||||
}
|
119
tools/remotecommand/v4.go
Normal file
119
tools/remotecommand/v4.go
Normal file
@ -0,0 +1,119 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
"k8s.io/client-go/pkg/util/exec"
|
||||
)
|
||||
|
||||
// streamProtocolV4 implements version 4 of the streaming protocol for attach
|
||||
// and exec. This version adds support for exit codes on the error stream through
|
||||
// the use of metav1.Status instead of plain text messages.
|
||||
type streamProtocolV4 struct {
|
||||
*streamProtocolV3
|
||||
}
|
||||
|
||||
var _ streamProtocolHandler = &streamProtocolV4{}
|
||||
|
||||
func newStreamProtocolV4(options StreamOptions) streamProtocolHandler {
|
||||
return &streamProtocolV4{
|
||||
streamProtocolV3: newStreamProtocolV3(options).(*streamProtocolV3),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV4) createStreams(conn streamCreator) error {
|
||||
return p.streamProtocolV3.createStreams(conn)
|
||||
}
|
||||
|
||||
func (p *streamProtocolV4) handleResizes() {
|
||||
p.streamProtocolV3.handleResizes()
|
||||
}
|
||||
|
||||
func (p *streamProtocolV4) stream(conn streamCreator) error {
|
||||
if err := p.createStreams(conn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// now that all the streams have been created, proceed with reading & copying
|
||||
|
||||
errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})
|
||||
|
||||
p.handleResizes()
|
||||
|
||||
p.copyStdin()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
p.copyStdout(&wg)
|
||||
p.copyStderr(&wg)
|
||||
|
||||
// we're waiting for stdout/stderr to finish copying
|
||||
wg.Wait()
|
||||
|
||||
// waits for errorStream to finish reading with an error or nil
|
||||
return <-errorChan
|
||||
}
|
||||
|
||||
// errorDecoderV4 interprets the json-marshaled metav1.Status on the error channel
|
||||
// and creates an exec.ExitError from it.
|
||||
type errorDecoderV4 struct{}
|
||||
|
||||
func (d *errorDecoderV4) decode(message []byte) error {
|
||||
status := metav1.Status{}
|
||||
err := json.Unmarshal(message, &status)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error stream protocol error: %v in %q", err, string(message))
|
||||
}
|
||||
switch status.Status {
|
||||
case metav1.StatusSuccess:
|
||||
return nil
|
||||
case metav1.StatusFailure:
|
||||
if status.Reason == remotecommand.NonZeroExitCodeReason {
|
||||
if status.Details == nil {
|
||||
return errors.New("error stream protocol error: details must be set")
|
||||
}
|
||||
for i := range status.Details.Causes {
|
||||
c := &status.Details.Causes[i]
|
||||
if c.Type != remotecommand.ExitCodeCauseType {
|
||||
continue
|
||||
}
|
||||
|
||||
rc, err := strconv.ParseUint(c.Message, 10, 8)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error stream protocol error: invalid exit code value %q", c.Message)
|
||||
}
|
||||
return exec.CodeExitError{
|
||||
Err: fmt.Errorf("command terminated with exit code %d", rc),
|
||||
Code: int(rc),
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("error stream protocol error: no %s cause given", remotecommand.ExitCodeCauseType)
|
||||
}
|
||||
default:
|
||||
return errors.New("error stream protocol error: unknown error")
|
||||
}
|
||||
|
||||
return fmt.Errorf(status.Message)
|
||||
}
|
71
tools/remotecommand/v4_test.go
Normal file
71
tools/remotecommand/v4_test.go
Normal file
@ -0,0 +1,71 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestV4ErrorDecoder(t *testing.T) {
|
||||
dec := errorDecoderV4{}
|
||||
|
||||
type Test struct {
|
||||
message string
|
||||
err string
|
||||
}
|
||||
|
||||
for _, test := range []Test{
|
||||
{
|
||||
message: "{}",
|
||||
err: "error stream protocol error: unknown error",
|
||||
},
|
||||
{
|
||||
message: "{",
|
||||
err: "error stream protocol error: unexpected end of JSON input in \"{\"",
|
||||
},
|
||||
{
|
||||
message: `{"status": "Success" }`,
|
||||
err: "",
|
||||
},
|
||||
{
|
||||
message: `{"status": "Failure", "message": "foobar" }`,
|
||||
err: "foobar",
|
||||
},
|
||||
{
|
||||
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "foo"}] } }`,
|
||||
err: "error stream protocol error: no ExitCode cause given",
|
||||
},
|
||||
{
|
||||
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode"}] } }`,
|
||||
err: "error stream protocol error: invalid exit code value \"\"",
|
||||
},
|
||||
{
|
||||
message: `{"status": "Failure", "message": "foobar", "reason": "NonZeroExitCode", "details": {"causes": [{"reason": "ExitCode", "message": "42"}] } }`,
|
||||
err: "command terminated with exit code 42",
|
||||
},
|
||||
} {
|
||||
err := dec.decode([]byte(test.message))
|
||||
want := test.err
|
||||
if want == "" {
|
||||
want = "<nil>"
|
||||
}
|
||||
if got := fmt.Sprintf("%v", err); got != want {
|
||||
t.Errorf("wrong error for message %q: want=%q, got=%q", test.message, want, got)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user