Expose exec and logs via WebSockets

Not all clients and systems can support SPDY protocols. This commit adds
support for two new websocket protocols, one to handle streaming of pod
logs from a pod, and the other to allow exec to be tunneled over
websocket.

Browser support for chunked encoding is still poor, and web consoles
that wish to show pod logs may need to make compromises to display the
output. The /pods/<name>/log endpoint now supports websocket upgrade to
the 'binary.k8s.io' subprotocol, which sends chunks of logs as binary to
the client. Messages are written as logs are streamed from the container
daemon, so flushing should be unaffected.

Browser support for raw communication over SDPY is not possible, and
some languages lack libraries for it and HTTP/2. The Kubelet supports
upgrade to WebSocket instead of SPDY, and will multiplex STDOUT/IN/ERR
over websockets by prepending each binary message with a single byte
representing the channel (0 for IN, 1 for OUT, and 2 for ERR). Because
framing on WebSockets suffers from head-of-line blocking, clients and
other server code should ensure that no particular stream blocks. An
alternative subprotocol 'base64.channel.k8s.io' base64 encodes the body
and uses '0'-'9' to represent the channel for ease of use in browsers.
This commit is contained in:
Clayton Coleman 2015-09-11 16:09:51 -04:00
parent 2f90f660c1
commit 363b616908
11 changed files with 1179 additions and 9 deletions

View File

@ -73,6 +73,7 @@ using resources with kubectl can be found in [Working with resources](../user-gu
- [Events](#events)
- [Naming conventions](#naming-conventions)
- [Label, selector, and annotation conventions](#label-selector-and-annotation-conventions)
- [WebSockets and SPDY](#websockets-and-spdy)
<!-- END MUNGE: GENERATED_TOC -->
@ -721,6 +722,22 @@ Other advice regarding use of labels, annotations, and other generic map keys by
- Use annotations to store API extensions that the controller responsible for the resource doesn't need to know about, experimental fields that aren't intended to be generally used API fields, etc. Beware that annotations aren't automatically handled by the API conversion machinery.
## WebSockets and SPDY
Some of the API operations exposed by Kubernetes involve transfer of binary streams between the client and a container, including attach, exec, portforward, and logging. The API therefore exposes certain operations over upgradeable HTTP connections ([described in RFC 2817](https://tools.ietf.org/html/rfc2817)) via the WebSocket and SPDY protocols. These actions are exposed as subresources with their associated verbs (exec, log, attach, and portforward) and are requested via a GET (to support JavaScript in a browser) and POST (semantically accurate).
There are two primary protocols in use today:
1. Streamed channels
When dealing with multiple independent binary streams of data such as the remote execution of a shell command (writing to STDIN, reading from STDOUT and STDERR) or forwarding multiple ports the streams can be multiplexed onto a single TCP connection. Kubernetes supports a SPDY based framing protocol that leverages SPDY channels and a WebSocket framing protocol that multiplexes multiple channels onto the same stream by prefixing each binary chunk with a byte indicating its channel. The WebSocket protocol supports an optional subprotocol that handles base64-encoded bytes from the client and returns base64-encoded bytes from the server and character based channel prefixes ('0', '1', '2') for ease of use from JavaScript in a browser.
2. Streaming response
The default log output for a channel of streaming data is an HTTP Chunked Transfer-Encoding, which can return an arbitrary stream of binary data from the server. Browser-based JavaScript is limited in its ability to access the raw data from a chunked response, especially when very large amounts of logs are returned, and in future API calls it may be desirable to transfer large files. The streaming API endpoints support an optional WebSocket upgrade that provides a unidirectional channel from the server to the client and chunks data as binary WebSocket frames. An optional WebSocket subprotocol is exposed that base64 encodes the stream before returning it to the client.
Clients should use the SPDY protocols if their clients have native support, or WebSockets as a fallback. Note that WebSockets is susceptible to Head-of-Line blocking and so clients must read and process each message sequentionally. In the future, an HTTP/2 implementation will be exposed that deprecates SPDY.
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/docs/devel/api-conventions.md?pixel)]()

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flushwriter"
"k8s.io/kubernetes/pkg/util/wsstream"
"k8s.io/kubernetes/pkg/version"
"github.com/emicklei/go-restful"
@ -353,6 +354,15 @@ func write(statusCode int, apiVersion string, codec runtime.Codec, object runtim
return
}
defer out.Close()
if wsstream.IsWebSocketRequest(req) {
r := wsstream.NewReader(out, true)
if err := r.Copy(w, req); err != nil {
util.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err))
}
return
}
if len(contentType) == 0 {
contentType = "application/octet-stream"
}

View File

@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/limitwriter"
"k8s.io/kubernetes/pkg/util/wsstream"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
@ -255,9 +256,15 @@ func (s *Server) InstallDebuggingHandlers() {
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
@ -266,9 +273,15 @@ func (s *Server) InstallDebuggingHandlers() {
ws = new(restful.WebService)
ws.
Path("/attach")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
@ -533,6 +546,10 @@ func getContainerCoordinates(request *restful.Request) (namespace, pod string, u
const defaultStreamCreationTimeout = 30 * time.Second
type Closer interface {
Close() error
}
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
@ -600,17 +617,43 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
}
}
func (s *Server) createStreams(request *restful.Request, response *restful.Response) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, httpstream.Connection, bool, bool) {
// start at 1 for error stream
expectedStreams := 1
if request.QueryParameter(api.ExecStdinParam) == "1" {
expectedStreams++
// standardShellChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
// along with the approprxate duplex value
func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType {
// open three half-duplex channels
channels := []wsstream.ChannelType{wsstream.ReadChannel, wsstream.WriteChannel, wsstream.WriteChannel}
if !stdin {
channels[0] = wsstream.IgnoreChannel
}
if request.QueryParameter(api.ExecStdoutParam) == "1" {
expectedStreams++
if !stdout {
channels[1] = wsstream.IgnoreChannel
}
if !stderr {
channels[2] = wsstream.IgnoreChannel
}
return channels
}
func (s *Server) createStreams(request *restful.Request, response *restful.Response) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, Closer, bool, bool) {
tty := request.QueryParameter(api.ExecTTYParam) == "1"
if !tty && request.QueryParameter(api.ExecStderrParam) == "1" {
stdin := request.QueryParameter(api.ExecStdinParam) == "1"
stdout := request.QueryParameter(api.ExecStdoutParam) == "1"
stderr := request.QueryParameter(api.ExecStderrParam) == "1"
if tty && stderr {
// TODO: make this an error before we reach this method
glog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
stderr = false
}
// count the streams client asked for, starting with 1
expectedStreams := 1
if stdin {
expectedStreams++
}
if stdout {
expectedStreams++
}
if stderr {
expectedStreams++
}
@ -619,6 +662,29 @@ func (s *Server) createStreams(request *restful.Request, response *restful.Respo
return nil, nil, nil, nil, nil, false, false
}
if wsstream.IsWebSocketRequest(request.Request) {
// open the requested channels, and always open the error channel
channels := append(standardShellChannels(stdin, stdout, stderr), wsstream.WriteChannel)
conn := wsstream.NewConn(channels...)
conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout())
streams, err := conn.Open(httplog.Unlogged(response.ResponseWriter), request.Request)
if err != nil {
glog.Errorf("Unable to upgrade websocket connection: %v", err)
return nil, nil, nil, nil, nil, false, false
}
// Send an empty message to the lowest writable channel to notify the client the connection is established
// TODO: make generic to SDPY and WebSockets and do it outside of this method?
switch {
case stdout:
streams[1].Write([]byte{})
case stderr:
streams[2].Write([]byte{})
default:
streams[3].Write([]byte{})
}
return streams[0], streams[1], streams[2], streams[3], conn, tty, true
}
streamCh := make(chan httpstream.Stream)
upgrader := spdy.NewResponseUpgrader()

View File

@ -293,7 +293,7 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (re
return newUpgradeAwareProxyHandler(location, nil, false), nil
}
// Support both GET and POST methods. Over time, we want to move all clients to start using POST and then stop supporting GET.
// Support both GET and POST methods. We must support GET for browsers that want to use WebSockets.
var upgradeableMethods = []string{"GET", "POST"}
// AttachREST implements the attach subresource for a Pod

320
pkg/util/wsstream/conn.go Normal file
View File

@ -0,0 +1,320 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wsstream
import (
"encoding/base64"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"time"
"github.com/golang/glog"
"golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/util"
)
// The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating
// the channel number (zero indexed) the message was sent on. Messages in both directions should
// prefix their messages with this channel byte. When used for remote execution, the channel numbers
// are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR
// (0, 1, and 2). No other conversion is performed on the raw subprotocol - writes are sent as they
// are received by the server.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "channel.k8s.io"
// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
// CLOSE
//
const channelWebSocketProtocol = "channel.k8s.io"
// The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character
// indicating the channel number (zero indexed) the message was sent on. Messages in both directions
// should prefix their messages with this channel char. When used for remote execution, the channel
// numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT,
// and STDERR ('0', '1', and '2'). The data received on the server is base64 decoded (and must be
// be valid) and data written by the server to the client is base64 encoded.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "base64.channel.k8s.io"
// WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN)
// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
// CLOSE
//
const base64ChannelWebSocketProtocol = "base64.channel.k8s.io"
type codecType int
const (
rawCodec codecType = iota
base64Codec
)
type ChannelType int
const (
IgnoreChannel ChannelType = iota
ReadChannel
WriteChannel
ReadWriteChannel
)
var (
// connectionUpgradeRegex matches any Connection header value that includes upgrade
connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
)
// IsWebSocketRequest returns true if the incoming request contains connection upgrade headers
// for WebSockets.
func IsWebSocketRequest(req *http.Request) bool {
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
}
// ignoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
// read and write deadlines are pushed every time a new message is received.
func ignoreReceives(ws *websocket.Conn, timeout time.Duration) {
defer util.HandleCrash()
var data []byte
for {
resetTimeout(ws, timeout)
if err := websocket.Message.Receive(ws, &data); err != nil {
if err == io.EOF {
return
}
}
}
}
// handshake ensures the provided user protocol matches one of the allowed protocols. It returns
// no error if no protocol is specified.
func handshake(config *websocket.Config, req *http.Request, allowed []string) error {
protocols := config.Protocol
if len(protocols) == 0 {
return nil
}
for _, protocol := range protocols {
for _, allow := range allowed {
if allow == protocol {
config.Protocol = []string{protocol}
return nil
}
}
}
return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed)
}
// Conn supports sending multiple binary channels over a websocket connection.
// Supports only the "channel.k8s.io" subprotocol.
type Conn struct {
channels []*websocketChannel
codec codecType
ready chan struct{}
ws *websocket.Conn
timeout time.Duration
}
// NewConn creates a WebSocket connection that supports a set of channels. Channels begin each
// web socket message with a single byte indicating the channel number (0-N). 255 is reserved for
// future use. The channel types for each channel are passed as an array, supporting the different
// duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer.
func NewConn(channels ...ChannelType) *Conn {
conn := &Conn{
ready: make(chan struct{}),
channels: make([]*websocketChannel, len(channels)),
}
for i := range conn.channels {
switch channels[i] {
case ReadChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
case WriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
case ReadWriteChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
case IgnoreChannel:
conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
}
}
return conn
}
// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
// there is no timeout on the connection.
func (conn *Conn) SetIdleTimeout(duration time.Duration) {
conn.timeout = duration
}
// Open the connection and create channels for reading and writing.
func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) ([]io.ReadWriteCloser, error) {
go func() {
defer util.HandleCrash()
defer conn.Close()
websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
}()
<-conn.ready
rwc := make([]io.ReadWriteCloser, len(conn.channels))
for i := range conn.channels {
rwc[i] = conn.channels[i]
}
return rwc, nil
}
func (conn *Conn) initialize(ws *websocket.Conn) {
protocols := ws.Config().Protocol
switch {
case len(protocols) == 0, protocols[0] == channelWebSocketProtocol:
conn.codec = rawCodec
case protocols[0] == base64ChannelWebSocketProtocol:
conn.codec = base64Codec
}
conn.ws = ws
close(conn.ready)
}
func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error {
return handshake(config, req, []string{channelWebSocketProtocol, base64ChannelWebSocketProtocol})
}
func (conn *Conn) resetTimeout() {
if conn.timeout > 0 {
conn.ws.SetDeadline(time.Now().Add(conn.timeout))
}
}
// Close is only valid after Open has been called
func (conn *Conn) Close() error {
<-conn.ready
for _, s := range conn.channels {
s.Close()
}
conn.ws.Close()
return nil
}
// handle implements a websocket handler.
func (conn *Conn) handle(ws *websocket.Conn) {
defer conn.Close()
conn.initialize(ws)
for {
conn.resetTimeout()
var data []byte
if err := websocket.Message.Receive(ws, &data); err != nil {
if err != io.EOF {
glog.Errorf("Error on socket receive: %v", err)
}
break
}
if len(data) == 0 {
continue
}
channel := data[0]
if conn.codec == base64Codec {
channel = channel - '0'
}
data = data[1:]
if int(channel) >= len(conn.channels) {
glog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel)
continue
}
if _, err := conn.channels[channel].DataFromSocket(data); err != nil {
glog.Errorf("Unable to write frame to %d: %v\n%s", channel, err, string(data))
continue
}
}
}
// write multiplexes the specified channel onto the websocket
func (conn *Conn) write(num byte, data []byte) (int, error) {
conn.resetTimeout()
switch conn.codec {
case rawCodec:
frame := make([]byte, len(data)+1)
frame[0] = num
copy(frame[1:], data)
if err := websocket.Message.Send(conn.ws, frame); err != nil {
return 0, err
}
case base64Codec:
frame := string('0'+num) + base64.StdEncoding.EncodeToString(data)
if err := websocket.Message.Send(conn.ws, frame); err != nil {
return 0, err
}
}
return len(data), nil
}
// websocketChannel represents a channel in a connection
type websocketChannel struct {
conn *Conn
num byte
r io.Reader
w io.WriteCloser
read, write bool
}
// newWebsocketChannel creates a pipe for writing to a websocket. Do not write to this pipe
// prior to the connection being opened. It may be no, half, or full duplex depending on
// read and write.
func newWebsocketChannel(conn *Conn, num byte, read, write bool) *websocketChannel {
r, w := io.Pipe()
return &websocketChannel{conn, num, r, w, read, write}
}
func (p *websocketChannel) Write(data []byte) (int, error) {
if !p.write {
return len(data), nil
}
return p.conn.write(p.num, data)
}
// DataFromSocket is invoked by the connection receiver to move data from the connection
// into a specific channel.
func (p *websocketChannel) DataFromSocket(data []byte) (int, error) {
if !p.read {
return len(data), nil
}
switch p.conn.codec {
case rawCodec:
return p.w.Write(data)
case base64Codec:
dst := make([]byte, len(data))
n, err := base64.StdEncoding.Decode(dst, data)
if err != nil {
return 0, err
}
return p.w.Write(dst[:n])
}
return 0, nil
}
func (p *websocketChannel) Read(data []byte) (int, error) {
if !p.read {
return 0, io.EOF
}
return p.r.Read(data)
}
func (p *websocketChannel) Close() error {
return p.w.Close()
}

View File

@ -0,0 +1,169 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wsstream
import (
"encoding/base64"
"io"
"io/ioutil"
"net/http/httptest"
"reflect"
"sync"
"testing"
"golang.org/x/net/websocket"
)
func newServer(handler websocket.Handler) (*httptest.Server, string) {
server := httptest.NewServer(handler)
serverAddr := server.Listener.Addr().String()
return server, serverAddr
}
func TestRawConn(t *testing.T) {
conn := NewConn(ReadWriteChannel, ReadWriteChannel, IgnoreChannel, ReadChannel, WriteChannel)
s, addr := newServer(conn.handle)
defer s.Close()
client, err := websocket.Dial("ws://"+addr, "", "http://localhost/")
if err != nil {
t.Fatal(err)
}
defer client.Close()
<-conn.ready
wg := sync.WaitGroup{}
// verify we can read a client write
wg.Add(1)
go func() {
defer wg.Done()
data, err := ioutil.ReadAll(conn.channels[0])
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(data, []byte("client")) {
t.Errorf("unexpected server read: %v", data)
}
}()
if n, err := client.Write(append([]byte{0}, []byte("client")...)); err != nil || n != 7 {
t.Fatalf("%d: %v", n, err)
}
// verify we can read a server write
wg.Add(1)
go func() {
defer wg.Done()
if n, err := conn.channels[1].Write([]byte("server")); err != nil && n != 6 {
t.Fatalf("%d: %v", n, err)
}
}()
data := make([]byte, 1024)
if n, err := io.ReadAtLeast(client, data, 6); n != 7 || err != nil {
t.Fatalf("%d: %v", n, err)
}
if !reflect.DeepEqual(data[:7], append([]byte{1}, []byte("server")...)) {
t.Errorf("unexpected client read: %v", data[:7])
}
// verify that an ignore channel is empty in both directions.
if n, err := conn.channels[2].Write([]byte("test")); n != 4 || err != nil {
t.Errorf("writes should be ignored")
}
data = make([]byte, 1024)
if n, err := conn.channels[2].Read(data); n != 0 || err != io.EOF {
t.Errorf("reads should be ignored")
}
// verify that a write to a Read channel doesn't block
if n, err := conn.channels[3].Write([]byte("test")); n != 4 || err != nil {
t.Errorf("writes should be ignored")
}
// verify that a read from a Write channel doesn't block
data = make([]byte, 1024)
if n, err := conn.channels[4].Read(data); n != 0 || err != io.EOF {
t.Errorf("reads should be ignored")
}
// verify that a client write to a Write channel doesn't block (is dropped)
if n, err := client.Write(append([]byte{4}, []byte("ignored")...)); err != nil || n != 8 {
t.Fatalf("%d: %v", n, err)
}
client.Close()
wg.Wait()
}
func TestBase64Conn(t *testing.T) {
conn := NewConn(ReadWriteChannel, ReadWriteChannel)
s, addr := newServer(conn.handle)
defer s.Close()
config, err := websocket.NewConfig("ws://"+addr, "http://localhost/")
if err != nil {
t.Fatal(err)
}
config.Protocol = []string{"base64.channel.k8s.io"}
client, err := websocket.DialConfig(config)
if err != nil {
t.Fatal(err)
}
defer client.Close()
<-conn.ready
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
data, err := ioutil.ReadAll(conn.channels[0])
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(data, []byte("client")) {
t.Errorf("unexpected server read: %s", string(data))
}
}()
clientData := base64.StdEncoding.EncodeToString([]byte("client"))
if n, err := client.Write(append([]byte{'0'}, clientData...)); err != nil || n != len(clientData)+1 {
t.Fatalf("%d: %v", n, err)
}
wg.Add(1)
go func() {
defer wg.Done()
if n, err := conn.channels[1].Write([]byte("server")); err != nil && n != 6 {
t.Fatalf("%d: %v", n, err)
}
}()
data := make([]byte, 1024)
if n, err := io.ReadAtLeast(client, data, 9); n != 9 || err != nil {
t.Fatalf("%d: %v", n, err)
}
expect := []byte(base64.StdEncoding.EncodeToString([]byte("server")))
if !reflect.DeepEqual(data[:9], append([]byte{'1'}, expect...)) {
t.Errorf("unexpected client read: %v", data[:9])
}
client.Close()
wg.Wait()
}

21
pkg/util/wsstream/doc.go Normal file
View File

@ -0,0 +1,21 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package wsstream contains utilities for streaming content over WebSockets.
// The Conn type allows callers to multiplex multiple read/write channels over
// a single websocket. The Reader type allows an io.Reader to be copied over
// a websocket channel as binary content.
package wsstream

124
pkg/util/wsstream/stream.go Normal file
View File

@ -0,0 +1,124 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wsstream
import (
"encoding/base64"
"io"
"net/http"
"time"
"golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/util"
)
// The WebSocket subprotocol "binary.k8s.io" will only send messages to the
// client and ignore messages sent to the server. The received messages are
// the exact bytes written to the stream. Zero byte messages are possible.
const binaryWebSocketProtocol = "binary.k8s.io"
// The WebSocket subprotocol "base64.binary.k8s.io" will only send messages to the
// client and ignore messages sent to the server. The received messages are
// a base64 version of the bytes written to the stream. Zero byte messages are
// possible.
const base64BinaryWebSocketProtocol = "base64.binary.k8s.io"
// Reader supports returning an arbitrary byte stream over a websocket channel.
// Supports the "binary.k8s.io" and "base64.binary.k8s.io" subprotocols.
type Reader struct {
err chan error
r io.Reader
ping bool
timeout time.Duration
}
// NewReader creates a WebSocket pipe that will copy the contents of r to a provided
// WebSocket connection. If ping is true, a zero length message will be sent to the client
// before the stream begins reading.
func NewReader(r io.Reader, ping bool) *Reader {
return &Reader{
r: r,
err: make(chan error),
ping: ping,
}
}
// SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
// there is no timeout on the reader.
func (r *Reader) SetIdleTimeout(duration time.Duration) {
r.timeout = duration
}
func (r *Reader) handshake(config *websocket.Config, req *http.Request) error {
return handshake(config, req, []string{binaryWebSocketProtocol, base64BinaryWebSocketProtocol})
}
// Copy the reader to the response. The created WebSocket is closed after this
// method completes.
func (r *Reader) Copy(w http.ResponseWriter, req *http.Request) error {
go func() {
defer util.HandleCrash()
websocket.Server{Handshake: r.handshake, Handler: r.handle}.ServeHTTP(w, req)
}()
return <-r.err
}
// handle implements a WebSocket handler.
func (r *Reader) handle(ws *websocket.Conn) {
encode := len(ws.Config().Protocol) > 0 && ws.Config().Protocol[0] == base64BinaryWebSocketProtocol
defer close(r.err)
defer ws.Close()
go ignoreReceives(ws, r.timeout)
r.err <- messageCopy(ws, r.r, encode, r.ping, r.timeout)
}
func resetTimeout(ws *websocket.Conn, timeout time.Duration) {
if timeout > 0 {
ws.SetDeadline(time.Now().Add(timeout))
}
}
func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeout time.Duration) error {
buf := make([]byte, 2048)
if ping {
resetTimeout(ws, timeout)
if err := websocket.Message.Send(ws, []byte{}); err != nil {
return err
}
}
for {
resetTimeout(ws, timeout)
n, err := r.Read(buf)
if err != nil {
if err == io.EOF {
return nil
}
return err
}
if n > 0 {
if base64Encode {
if err := websocket.Message.Send(ws, base64.StdEncoding.EncodeToString(buf[:n])); err != nil {
return err
}
} else {
if err := websocket.Message.Send(ws, buf[:n]); err != nil {
return err
}
}
}
}
}

View File

@ -0,0 +1,232 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package wsstream
import (
"bytes"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"reflect"
"strings"
"testing"
"time"
"golang.org/x/net/websocket"
)
func TestStream(t *testing.T) {
input := "some random text"
r := NewReader(bytes.NewBuffer([]byte(input)), true)
r.SetIdleTimeout(time.Second)
data, err := readWebSocket(r, t, nil)
if !reflect.DeepEqual(data, []byte(input)) {
t.Errorf("unexpected server read: %v", data)
}
if err != nil {
t.Fatal(err)
}
}
func TestStreamPing(t *testing.T) {
input := "some random text"
r := NewReader(bytes.NewBuffer([]byte(input)), true)
r.SetIdleTimeout(time.Second)
err := expectWebSocketFrames(r, t, nil, [][]byte{
{},
[]byte(input),
})
if err != nil {
t.Fatal(err)
}
}
func TestStreamBase64(t *testing.T) {
input := "some random text"
encoded := base64.StdEncoding.EncodeToString([]byte(input))
r := NewReader(bytes.NewBuffer([]byte(input)), true)
data, err := readWebSocket(r, t, nil, base64BinaryWebSocketProtocol)
if !reflect.DeepEqual(data, []byte(encoded)) {
t.Errorf("unexpected server read: %v\n%v", data, []byte(encoded))
}
if err != nil {
t.Fatal(err)
}
}
func TestStreamError(t *testing.T) {
input := "some random text"
errs := &errorReader{
reads: [][]byte{
[]byte("some random"),
[]byte(" text"),
},
err: fmt.Errorf("bad read"),
}
r := NewReader(errs, false)
data, err := readWebSocket(r, t, nil)
if !reflect.DeepEqual(data, []byte(input)) {
t.Errorf("unexpected server read: %v", data)
}
if err == nil || err.Error() != "bad read" {
t.Fatal(err)
}
}
func TestStreamSurvivesPanic(t *testing.T) {
input := "some random text"
errs := &errorReader{
reads: [][]byte{
[]byte("some random"),
[]byte(" text"),
},
panicMessage: "bad read",
}
r := NewReader(errs, false)
data, err := readWebSocket(r, t, nil)
if !reflect.DeepEqual(data, []byte(input)) {
t.Errorf("unexpected server read: %v", data)
}
if err != nil {
t.Fatal(err)
}
}
func TestStreamClosedDuringRead(t *testing.T) {
ch := make(chan struct{})
input := "some random text"
errs := &errorReader{
reads: [][]byte{
[]byte("some random"),
[]byte(" text"),
},
err: fmt.Errorf("stuff"),
pause: ch,
}
r := NewReader(errs, false)
data, err := readWebSocket(r, t, func(c *websocket.Conn) {
c.Close()
time.Sleep(time.Millisecond)
close(ch)
})
if !reflect.DeepEqual(data, []byte(input)) {
t.Errorf("unexpected server read: %v", data)
}
if err == nil || !strings.Contains(err.Error(), "use of closed network connection") {
t.Fatal(err)
}
}
type errorReader struct {
reads [][]byte
err error
panicMessage string
pause chan struct{}
}
func (r *errorReader) Read(p []byte) (int, error) {
if len(r.reads) == 0 {
if r.pause != nil {
<-r.pause
}
if len(r.panicMessage) != 0 {
panic(r.panicMessage)
}
return 0, r.err
}
next := r.reads[0]
r.reads = r.reads[1:]
copy(p, next)
return len(next), nil
}
func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols ...string) ([]byte, error) {
errCh := make(chan error, 1)
s, addr := newServer(func(ws *websocket.Conn) {
cfg := ws.Config()
cfg.Protocol = protocols
go ignoreReceives(ws, 0)
go func() {
err := <-r.err
errCh <- err
}()
r.handle(ws)
})
defer s.Close()
config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr)
client, err := websocket.DialConfig(config)
if err != nil {
return nil, err
}
defer client.Close()
if fn != nil {
fn(client)
}
data, err := ioutil.ReadAll(client)
if err != nil {
return data, err
}
return data, <-errCh
}
func expectWebSocketFrames(r *Reader, t *testing.T, fn func(*websocket.Conn), frames [][]byte, protocols ...string) error {
errCh := make(chan error, 1)
s, addr := newServer(func(ws *websocket.Conn) {
cfg := ws.Config()
cfg.Protocol = protocols
go ignoreReceives(ws, 0)
go func() {
err := <-r.err
errCh <- err
}()
r.handle(ws)
})
defer s.Close()
config, _ := websocket.NewConfig("ws://"+addr, "http://"+addr)
ws, err := websocket.DialConfig(config)
if err != nil {
return err
}
defer ws.Close()
if fn != nil {
fn(ws)
}
for i := range frames {
var data []byte
if err := websocket.Message.Receive(ws, &data); err != nil {
return err
}
if !reflect.DeepEqual(frames[i], data) {
return fmt.Errorf("frame %d did not match expected: %v", data)
}
}
var data []byte
if err := websocket.Message.Receive(ws, &data); err != io.EOF {
return fmt.Errorf("expected no more frames: %v (%v)", err, data)
}
return <-errCh
}

View File

@ -17,10 +17,15 @@ limitations under the License.
package e2e
import (
"bytes"
"fmt"
"io"
"strconv"
"strings"
"time"
"golang.org/x/net/websocket"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
@ -602,6 +607,154 @@ var _ = Describe("Pods", func() {
}, 0, defaultObservationTimeout)
})
It("should support remote command execution over websockets", func() {
config, err := loadConfig()
if err != nil {
Failf("Unable to get base config: %v", err)
}
podClient := framework.Client.Pods(framework.Namespace.Name)
By("creating the pod")
name := "pod-exec-websocket-" + string(util.NewUUID())
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "main",
Image: "gcr.io/google_containers/busybox",
Command: []string{"/bin/sh", "-c", "echo container is alive; sleep 600"},
},
},
},
}
By("submitting the pod to kubernetes")
defer func() {
By("deleting the pod")
podClient.Delete(pod.Name, api.NewDeleteOptions(0))
}()
pod, err = podClient.Create(pod)
if err != nil {
Failf("Failed to create pod: %v", err)
}
expectNoError(framework.WaitForPodRunning(pod.Name))
req := framework.Client.Get().
Namespace(framework.Namespace.Name).
Resource("pods").
Name(pod.Name).
Suffix("exec").
Param("stderr", "1").
Param("stdout", "1").
Param("container", pod.Spec.Containers[0].Name).
Param("command", "cat").
Param("command", "/etc/resolv.conf")
url := req.URL()
ws, err := OpenWebSocketForURL(url, config, []string{"channel.k8s.io"})
if err != nil {
Failf("Failed to open websocket to %s: %v", url.String(), err)
}
defer ws.Close()
buf := &bytes.Buffer{}
for {
var msg []byte
if err := websocket.Message.Receive(ws, &msg); err != nil {
if err == io.EOF {
break
}
Failf("Failed to read completely from websocket %s: %v", url.String(), err)
}
if len(msg) == 0 {
continue
}
if msg[0] != 1 {
Failf("Got message from server that didn't start with channel 1 (STDOUT): %v", msg)
}
buf.Write(msg[1:])
}
if buf.Len() == 0 {
Failf("Unexpected output from server")
}
if !strings.Contains(buf.String(), "nameserver") {
Failf("Expected to find 'nameserver' in %q", buf.String())
}
})
It("should support retrieving logs from the container over websockets", func() {
config, err := loadConfig()
if err != nil {
Failf("Unable to get base config: %v", err)
}
podClient := framework.Client.Pods(framework.Namespace.Name)
By("creating the pod")
name := "pod-logs-websocket-" + string(util.NewUUID())
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "main",
Image: "gcr.io/google_containers/busybox",
Command: []string{"/bin/sh", "-c", "echo container is alive; sleep 600"},
},
},
},
}
By("submitting the pod to kubernetes")
defer func() {
By("deleting the pod")
podClient.Delete(pod.Name, api.NewDeleteOptions(0))
}()
pod, err = podClient.Create(pod)
if err != nil {
Failf("Failed to create pod: %v", err)
}
expectNoError(framework.WaitForPodRunning(pod.Name))
req := framework.Client.Get().
Namespace(framework.Namespace.Name).
Resource("pods").
Name(pod.Name).
Suffix("log").
Param("container", pod.Spec.Containers[0].Name)
url := req.URL()
ws, err := OpenWebSocketForURL(url, config, []string{"binary.k8s.io"})
if err != nil {
Failf("Failed to open websocket to %s: %v", url.String(), err)
}
defer ws.Close()
buf := &bytes.Buffer{}
for {
var msg []byte
if err := websocket.Message.Receive(ws, &msg); err != nil {
if err == io.EOF {
break
}
Failf("Failed to read completely from websocket %s: %v", url.String(), err)
}
if len(msg) == 0 {
continue
}
buf.Write(msg)
}
if buf.String() != "container is alive\n" {
Failf("Unexpected websocket logs:\n%s", buf.String())
}
})
// The following tests for remote command execution and port forwarding are
// commented out because the GCE environment does not currently have nsenter
// in the kubelet's PATH, nor does it have socat installed. Once we figure

View File

@ -22,6 +22,8 @@ import (
"io"
"math"
"math/rand"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
@ -49,6 +51,7 @@ import (
"github.com/davecgh/go-spew/spew"
"golang.org/x/crypto/ssh"
"golang.org/x/net/websocket"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -2021,3 +2024,58 @@ func getHostExternalAddress(client *client.Client, p *api.Pod) (externalAddress
}
return
}
type extractRT struct {
http.Header
}
func (rt *extractRT) RoundTrip(req *http.Request) (*http.Response, error) {
rt.Header = req.Header
return nil, nil
}
// headersForConfig extracts any http client logic necessary for the provided
// config.
func headersForConfig(c *client.Config) (http.Header, error) {
extract := &extractRT{}
rt, err := client.HTTPWrappersForConfig(c, extract)
if err != nil {
return nil, err
}
if _, err := rt.RoundTrip(&http.Request{}); err != nil {
return nil, err
}
return extract.Header, nil
}
// OpenWebSocketForURL constructs a websocket connection to the provided URL, using the client
// config, with the specified protocols.
func OpenWebSocketForURL(url *url.URL, config *client.Config, protocols []string) (*websocket.Conn, error) {
tlsConfig, err := client.TLSConfigFor(config)
if err != nil {
return nil, fmt.Errorf("failed to create tls config: %v", err)
}
if tlsConfig != nil {
url.Scheme = "wss"
if !strings.Contains(url.Host, ":") {
url.Host += ":443"
}
} else {
url.Scheme = "ws"
if !strings.Contains(url.Host, ":") {
url.Host += ":80"
}
}
headers, err := headersForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to load http headers: %v", err)
}
cfg, err := websocket.NewConfig(url.String(), "http://localhost")
if err != nil {
return nil, fmt.Errorf("failed to create websocket config: %v", err)
}
cfg.Header = headers
cfg.TlsConfig = tlsConfig
cfg.Protocol = protocols
return websocket.DialConfig(cfg)
}