Merge pull request #16451 from ncdc/exec-interop-testing

Automatic merge from submit-queue

Refactor streaming code to support interop testing

Refactor exec/attach/port forward client and server code to better
support interop testing of different client and server subprotocol
versions.

Fixes #16119
This commit is contained in:
k8s-merge-robot
2016-04-01 17:11:26 -07:00
14 changed files with 894 additions and 804 deletions

View File

@@ -43,12 +43,12 @@ import (
"k8s.io/kubernetes/pkg/api/validation"
"k8s.io/kubernetes/pkg/auth/authenticator"
"k8s.io/kubernetes/pkg/auth/authorizer"
"k8s.io/kubernetes/pkg/client/unversioned/remotecommand"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
@@ -58,7 +58,6 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/limitwriter"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wsstream"
"k8s.io/kubernetes/pkg/volume"
)
@@ -540,12 +539,7 @@ func getContainerCoordinates(request *restful.Request) (namespace, pod string, u
return
}
const defaultStreamCreationTimeout = 30 * time.Second
type Closer interface {
Close() error
}
// getAttach handles requests to attach to a container.
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
@@ -554,21 +548,35 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
return
}
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(request, response)
if conn != nil {
defer conn.Close()
}
remotecommand.ServeAttach(response.ResponseWriter,
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
uid,
container,
s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout,
remotecommand.SupportedStreamingProtocols)
}
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
// error is handled in the createStreams function
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
err := s.host.AttachContainer(kubecontainer.GetPodFullName(pod), uid, container, stdinStream, stdoutStream, stderrStream, tty)
if err != nil {
msg := fmt.Sprintf("Error executing command in container: %v", err)
glog.Error(msg)
errorStream.Write([]byte(msg))
}
remotecommand.ServeExec(response.ResponseWriter,
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
uid,
container,
s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout,
remotecommand.SupportedStreamingProtocols)
}
// getRun handles requests to run a command inside a container.
@@ -588,187 +596,6 @@ func (s *Server) getRun(request *restful.Request, response *restful.Response) {
writeJsonResponse(response, data)
}
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(request, response)
if conn != nil {
defer conn.Close()
}
if !ok {
// error is handled in the createStreams function
return
}
cmd := request.Request.URL.Query()[api.ExecCommandParamm]
err := s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, cmd, stdinStream, stdoutStream, stderrStream, tty)
if err != nil {
msg := fmt.Sprintf("Error executing command in container: %v", err)
glog.Error(msg)
errorStream.Write([]byte(msg))
}
}
// standardShellChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
// along with the approprxate duplex value
func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType {
// open three half-duplex channels
channels := []wsstream.ChannelType{wsstream.ReadChannel, wsstream.WriteChannel, wsstream.WriteChannel}
if !stdin {
channels[0] = wsstream.IgnoreChannel
}
if !stdout {
channels[1] = wsstream.IgnoreChannel
}
if !stderr {
channels[2] = wsstream.IgnoreChannel
}
return channels
}
// streamAndReply holds both a Stream and a channel that is closed when the stream's reply frame is
// enqueued. Consumers can wait for replySent to be closed prior to proceeding, to ensure that the
// replyFrame is enqueued before the connection's goaway frame is sent (e.g. if a stream was
// received and right after, the connection gets closed).
type streamAndReply struct {
httpstream.Stream
replySent <-chan struct{}
}
func (s *Server) createStreams(request *restful.Request, response *restful.Response) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, Closer, bool, bool) {
tty := request.QueryParameter(api.ExecTTYParam) == "1"
stdin := request.QueryParameter(api.ExecStdinParam) == "1"
stdout := request.QueryParameter(api.ExecStdoutParam) == "1"
stderr := request.QueryParameter(api.ExecStderrParam) == "1"
if tty && stderr {
// TODO: make this an error before we reach this method
glog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
stderr = false
}
// count the streams client asked for, starting with 1
expectedStreams := 1
if stdin {
expectedStreams++
}
if stdout {
expectedStreams++
}
if stderr {
expectedStreams++
}
if expectedStreams == 1 {
response.WriteError(http.StatusBadRequest, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr"))
return nil, nil, nil, nil, nil, false, false
}
if wsstream.IsWebSocketRequest(request.Request) {
// open the requested channels, and always open the error channel
channels := append(standardShellChannels(stdin, stdout, stderr), wsstream.WriteChannel)
conn := wsstream.NewConn(channels...)
conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout())
streams, err := conn.Open(httplog.Unlogged(response.ResponseWriter), request.Request)
if err != nil {
glog.Errorf("Unable to upgrade websocket connection: %v", err)
return nil, nil, nil, nil, nil, false, false
}
// Send an empty message to the lowest writable channel to notify the client the connection is established
// TODO: make generic to SDPY and WebSockets and do it outside of this method?
switch {
case stdout:
streams[1].Write([]byte{})
case stderr:
streams[2].Write([]byte{})
default:
streams[3].Write([]byte{})
}
return streams[0], streams[1], streams[2], streams[3], conn, tty, true
}
supportedStreamProtocols := []string{remotecommand.StreamProtocolV2Name, remotecommand.StreamProtocolV1Name}
_, err := httpstream.Handshake(request.Request, response.ResponseWriter, supportedStreamProtocols, remotecommand.StreamProtocolV1Name)
// negotiated protocol isn't used server side at the moment, but could be in the future
if err != nil {
return nil, nil, nil, nil, nil, false, false
}
streamCh := make(chan streamAndReply)
upgrader := spdy.NewResponseUpgrader()
conn := upgrader.UpgradeResponse(response.ResponseWriter, request.Request, func(stream httpstream.Stream, replySent <-chan struct{}) error {
streamCh <- streamAndReply{Stream: stream, replySent: replySent}
return nil
})
// from this point on, we can no longer call methods on response
if conn == nil {
// The upgrader is responsible for notifying the client of any errors that
// occurred during upgrading. All we can do is return here at this point
// if we weren't successful in upgrading.
return nil, nil, nil, nil, nil, false, false
}
conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout())
// TODO make it configurable?
expired := time.NewTimer(defaultStreamCreationTimeout)
var errorStream, stdinStream, stdoutStream, stderrStream httpstream.Stream
receivedStreams := 0
replyChan := make(chan struct{})
stop := make(chan struct{})
defer close(stop)
WaitForStreams:
for {
select {
case stream := <-streamCh:
streamType := stream.Headers().Get(api.StreamType)
switch streamType {
case api.StreamTypeError:
errorStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdin:
stdinStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStdout:
stdoutStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
case api.StreamTypeStderr:
stderrStream = stream
go waitStreamReply(stream.replySent, replyChan, stop)
default:
glog.Errorf("Unexpected stream type: '%s'", streamType)
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
case <-expired.C:
// TODO find a way to return the error to the user. Maybe use a separate
// stream to report errors?
glog.Error("Timed out waiting for client to create streams")
return nil, nil, nil, nil, nil, false, false
}
}
return stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, true
}
// waitStreamReply waits until either replySent or stop is closed. If replySent is closed, it sends
// an empty struct to the notify channel.
func waitStreamReply(replySent <-chan struct{}, notify chan<- struct{}, stop <-chan struct{}) {
select {
case <-replySent:
notify <- struct{}{}
case <-stop:
}
}
func getPodCoordinates(request *restful.Request) (namespace, pod string, uid types.UID) {
namespace = request.PathParameter("podNamespace")
pod = request.PathParameter("podID")
@@ -811,7 +638,7 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
podName := kubecontainer.GetPodFullName(pod)
ServePortForward(response.ResponseWriter, request.Request, s.host, podName, uid, s.host.StreamingConnectionIdleTimeout(), defaultStreamCreationTimeout)
ServePortForward(response.ResponseWriter, request.Request, s.host, podName, uid, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout)
}
// ServePortForward handles a port forwarding request. A single request is
@@ -821,7 +648,7 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
// handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, idleTimeout time.Duration, streamCreationTimeout time.Duration) {
supportedPortForwardProtocols := []string{portforward.PortForwardProtocolV1Name}
_, err := httpstream.Handshake(req, w, supportedPortForwardProtocols, portforward.PortForwardProtocolV1Name)
_, err := httpstream.Handshake(req, w, supportedPortForwardProtocols)
// negotiated protocol isn't currently used server side, but could be in the future
if err != nil {
// Handshake writes the error to the client