CRI Portforward needs to forward websocket ports

- adjust ports to int32
- CRI flows the websocket ports as query params

- Do not validate ports since the protocol is unknown
  SPDY flows the ports as headers and websockets uses query params
- Only flow query params if there is at least one port query param
This commit is contained in:
Michael Fraenkel 2017-01-07 00:06:19 -05:00
parent beb53fb71a
commit 93c11422e4
19 changed files with 231 additions and 92 deletions

View File

@ -42,16 +42,16 @@ import (
type fakePortForwarder struct { type fakePortForwarder struct {
lock sync.Mutex lock sync.Mutex
// stores data expected from the stream per port // stores data expected from the stream per port
expected map[uint16]string expected map[int32]string
// stores data received from the stream per port // stores data received from the stream per port
received map[uint16]string received map[int32]string
// data to be sent to the stream per port // data to be sent to the stream per port
send map[uint16]string send map[int32]string
} }
var _ portforward.PortForwarder = &fakePortForwarder{} var _ portforward.PortForwarder = &fakePortForwarder{}
func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
defer stream.Close() defer stream.Close()
// read from the client // read from the client
@ -77,14 +77,14 @@ func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16
// fakePortForwardServer creates an HTTP server that can handle port forwarding // fakePortForwardServer creates an HTTP server that can handle port forwarding
// requests. // requests.
func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[uint16]string) http.HandlerFunc { func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[int32]string) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
pf := &fakePortForwarder{ pf := &fakePortForwarder{
expected: expectedFromClient, expected: expectedFromClient,
received: make(map[uint16]string), received: make(map[int32]string),
send: serverSends, send: serverSends,
} }
portforward.ServePortForward(w, req, pf, "pod", "uid", 0, 10*time.Second, portforward.SupportedProtocols) portforward.ServePortForward(w, req, pf, "pod", "uid", nil, 0, 10*time.Second, portforward.SupportedProtocols)
for port, expected := range expectedFromClient { for port, expected := range expectedFromClient {
actual, ok := pf.received[port] actual, ok := pf.received[port]
@ -109,19 +109,19 @@ func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedF
func TestForwardPorts(t *testing.T) { func TestForwardPorts(t *testing.T) {
tests := map[string]struct { tests := map[string]struct {
ports []string ports []string
clientSends map[uint16]string clientSends map[int32]string
serverSends map[uint16]string serverSends map[int32]string
}{ }{
"forward 1 port with no data either direction": { "forward 1 port with no data either direction": {
ports: []string{"5000"}, ports: []string{"5000"},
}, },
"forward 2 ports with bidirectional data": { "forward 2 ports with bidirectional data": {
ports: []string{"5001", "6000"}, ports: []string{"5001", "6000"},
clientSends: map[uint16]string{ clientSends: map[int32]string{
5001: "abcd", 5001: "abcd",
6000: "ghij", 6000: "ghij",
}, },
serverSends: map[uint16]string{ serverSends: map[int32]string{
5001: "1234", 5001: "1234",
6000: "5678", 6000: "5678",
}, },

View File

@ -130,7 +130,7 @@ type DirectStreamingRuntime interface {
// tty. // tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
// Forward the specified port from the specified pod to the stream. // Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error
// ContainerAttach encapsulates the attaching to containers for testability // ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher ContainerAttacher
} }
@ -141,7 +141,7 @@ type DirectStreamingRuntime interface {
type IndirectStreamingRuntime interface { type IndirectStreamingRuntime interface {
GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
} }
type ImageService interface { type ImageService interface {

View File

@ -73,7 +73,7 @@ type FakeDirectStreamingRuntime struct {
TTY bool TTY bool
// Port-forward args // Port-forward args
Pod *Pod Pod *Pod
Port uint16 Port int32
Stream io.ReadWriteCloser Stream io.ReadWriteCloser
} }
} }
@ -394,7 +394,7 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error {
return f.Err return f.Err
} }
func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error { func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
@ -471,7 +471,7 @@ func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout,
return &url.URL{Host: FakeHost}, f.Err return &url.URL{Host: FakeHost}, f.Err
} }
func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()

View File

@ -64,7 +64,7 @@ func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream i
if port < 0 || port > math.MaxUint16 { if port < 0 || port > math.MaxUint16 {
return fmt.Errorf("invalid port %d", port) return fmt.Errorf("invalid port %d", port)
} }
return dockertools.PortForward(r.client, podSandboxID, uint16(port), stream) return dockertools.PortForward(r.client, podSandboxID, port, stream)
} }
// ExecSync executes a command in the container, and returns the stdout output. // ExecSync executes a command in the container, and returns the stdout output.

View File

@ -1353,7 +1353,7 @@ func noPodInfraContainerError(podName, podNamespace string) error {
// - match cgroups of container // - match cgroups of container
// - should we support nsenter + socat on the host? (current impl) // - should we support nsenter + socat on the host? (current impl)
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host? // - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port int32, stream io.ReadWriteCloser) error {
podInfraContainer := pod.FindContainerByName(PodInfraContainerName) podInfraContainer := pod.FindContainerByName(PodInfraContainerName)
if podInfraContainer == nil { if podInfraContainer == nil {
return noPodInfraContainerError(pod.Name, pod.Namespace) return noPodInfraContainerError(pod.Name, pod.Namespace)
@ -1369,7 +1369,7 @@ func (dm *DockerManager) UpdatePodCIDR(podCIDR string) error {
} }
// Temporarily export this function to share with dockershim. // Temporarily export this function to share with dockershim.
func PortForward(client DockerInterface, podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error { func PortForward(client DockerInterface, podInfraContainerID string, port int32, stream io.ReadWriteCloser) error {
container, err := client.InspectContainer(podInfraContainerID) container, err := client.InspectContainer(podInfraContainerID)
if err != nil { if err != nil {
return err return err

View File

@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/envvars" "k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -1394,7 +1395,7 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain
// PortForward connects to the pod's port and copies data between the port // PortForward connects to the pod's port and copies data between the port
// and the stream. // and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error { func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime) streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok { if !ok {
return fmt.Errorf("streaming methods not supported by runtime") return fmt.Errorf("streaming methods not supported by runtime")
@ -1467,7 +1468,7 @@ func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName
} }
// GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it. // GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) { switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime: case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly. // Kubelet will serve the attach directly.
@ -1484,7 +1485,7 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID
return nil, fmt.Errorf("pod not found (%q)", podFullName) return nil, fmt.Errorf("pod not found (%q)", podFullName)
} }
return streamingRuntime.GetPortForward(podName, podNamespace, podUID) return streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports)
default: default:
return nil, fmt.Errorf("container runtime does not support port-forward") return nil, fmt.Errorf("container runtime does not support port-forward")
} }

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
) )
@ -1607,7 +1608,7 @@ func TestPortForward(t *testing.T) {
podName = "podFoo" podName = "podFoo"
podNamespace = "nsFoo" podNamespace = "nsFoo"
podUID types.UID = "12345678" podUID types.UID = "12345678"
port uint16 = 5000 port int32 = 5000
) )
var ( var (
stream = &fakeReadWriteCloser{} stream = &fakeReadWriteCloser{}
@ -1646,7 +1647,7 @@ func TestPortForward(t *testing.T) {
podFullName := kubecontainer.GetPodFullName(podWithUidNameNs(podUID, tc.podName, podNamespace)) podFullName := kubecontainer.GetPodFullName(podWithUidNameNs(podUID, tc.podName, podNamespace))
{ // No streaming case { // No streaming case
description := "no streaming - " + tc.description description := "no streaming - " + tc.description
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
assert.Error(t, err, description) assert.Error(t, err, description)
assert.Nil(t, redirect, description) assert.Nil(t, redirect, description)
@ -1658,7 +1659,7 @@ func TestPortForward(t *testing.T) {
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
assert.NoError(t, err, description) assert.NoError(t, err, description)
assert.Nil(t, redirect, description) assert.Nil(t, redirect, description)
@ -1677,7 +1678,7 @@ func TestPortForward(t *testing.T) {
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
if tc.expectError { if tc.expectError {
assert.Error(t, err, description) assert.Error(t, err, description)
} else { } else {

View File

@ -237,7 +237,7 @@ func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, s
} }
// GetPortForward gets the endpoint the runtime will serve the port-forward request from. // GetPortForward gets the endpoint the runtime will serve the port-forward request from.
func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) { func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID, ports []int32) (*url.URL, error) {
sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil) sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err) return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)
@ -245,9 +245,9 @@ func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string,
if len(sandboxIDs) == 0 { if len(sandboxIDs) == 0 {
return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID)) return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))
} }
// TODO: Port is unused for now, but we may need it in the future.
req := &runtimeapi.PortForwardRequest{ req := &runtimeapi.PortForwardRequest{
PodSandboxId: sandboxIDs[0], PodSandboxId: sandboxIDs[0],
Port: ports,
} }
resp, err := m.runtimeService.PortForward(req) resp, err := m.runtimeService.PortForward(req)
if err != nil { if err != nil {

View File

@ -2107,7 +2107,7 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host? // - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
// //
// TODO(yifan): Merge with the same function in dockertools. // TODO(yifan): Merge with the same function in dockertools.
func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { func (r *Runtime) PortForward(pod *kubecontainer.Pod, port int32, stream io.ReadWriteCloser) error {
glog.V(4).Infof("Rkt port forwarding in container.") glog.V(4).Infof("Rkt port forwarding in container.")
ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout)

View File

@ -240,10 +240,10 @@ func (h *httpStreamHandler) portForward(p *httpStreamPair) {
defer p.errorStream.Close() defer p.errorStream.Close()
portString := p.dataStream.Headers().Get(api.PortHeader) portString := p.dataStream.Headers().Get(api.PortHeader)
port, _ := strconv.ParseUint(portString, 10, 16) port, _ := strconv.ParseInt(portString, 10, 32)
glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
err := h.forwarder.PortForward(h.pod, h.uid, uint16(port), p.dataStream) err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream)
glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
if err != nil { if err != nil {

View File

@ -30,7 +30,7 @@ import (
// in a pod. // in a pod.
type PortForwarder interface { type PortForwarder interface {
// PortForwarder copies data between a data stream and a port in a pod. // PortForwarder copies data between a data stream and a port in a pod.
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
} }
// ServePortForward handles a port forwarding request. A single request is // ServePortForward handles a port forwarding request. A single request is
@ -38,10 +38,10 @@ type PortForwarder interface {
// been timed out due to idleness. This function handles multiple forwarded // been timed out due to idleness. This function handles multiple forwarded
// connections; i.e., multiple `curl http://localhost:8888/` requests will be // connections; i.e., multiple `curl http://localhost:8888/` requests will be
// handled by a single invocation of ServePortForward. // handled by a single invocation of ServePortForward.
func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) { func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) {
var err error var err error
if wsstream.IsWebSocketRequest(req) { if wsstream.IsWebSocketRequest(req) {
err = handleWebSocketStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout)
} else { } else {
err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout)
} }

View File

@ -45,21 +45,25 @@ const (
// options contains details about which streams are required for // options contains details about which streams are required for
// port forwarding. // port forwarding.
type v4Options struct { type V4Options struct {
ports []uint16 Ports []int32
} }
// newOptions creates a new options from the Request. // newOptions creates a new options from the Request.
func newV4Options(req *http.Request) (*v4Options, error) { func NewV4Options(req *http.Request) (*V4Options, error) {
portStrings := req.URL.Query()[api.PortHeader] if !wsstream.IsWebSocketRequest(req) {
if len(portStrings) == 0 { return &V4Options{}, nil
return nil, fmt.Errorf("%q is required", api.PortHeader)
} }
ports := make([]uint16, 0, len(portStrings)) portStrings := req.URL.Query()[api.PortHeader]
if len(portStrings) == 0 {
return nil, fmt.Errorf("query parameter %q is required", api.PortHeader)
}
ports := make([]int32, 0, len(portStrings))
for _, portString := range portStrings { for _, portString := range portStrings {
if len(portString) == 0 { if len(portString) == 0 {
return nil, fmt.Errorf("%q is cannot be empty", api.PortHeader) return nil, fmt.Errorf("query parameter %q cannot be empty", api.PortHeader)
} }
for _, p := range strings.Split(portString, ",") { for _, p := range strings.Split(portString, ",") {
port, err := strconv.ParseUint(p, 10, 16) port, err := strconv.ParseUint(p, 10, 16)
@ -69,25 +73,22 @@ func newV4Options(req *http.Request) (*v4Options, error) {
if port < 1 { if port < 1 {
return nil, fmt.Errorf("port %q must be > 0", portString) return nil, fmt.Errorf("port %q must be > 0", portString)
} }
ports = append(ports, uint16(port)) ports = append(ports, int32(port))
} }
} }
return &v4Options{ return &V4Options{
ports: ports, Ports: ports,
}, nil }, nil
} }
func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { // handleWebSocketStreams handles requests to forward ports to a pod via
opts, err := newV4Options(req) // a PortForwarder. A pair of streams are created per port (DATA n,
if err != nil { // ERROR n+1). The associated port is written to each stream as a unsigned 16
w.WriteHeader(http.StatusBadRequest) // bit integer in little endian format.
fmt.Fprint(w, err.Error()) func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, opts *V4Options, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error {
return err channels := make([]wsstream.ChannelType, 0, len(opts.Ports)*2)
} for i := 0; i < len(opts.Ports); i++ {
channels := make([]wsstream.ChannelType, 0, len(opts.ports)*2)
for i := 0; i < len(opts.ports); i++ {
channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel) channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel)
} }
conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{ conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
@ -111,17 +112,18 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar
return err return err
} }
defer conn.Close() defer conn.Close()
streamPairs := make([]*websocketStreamPair, len(opts.ports)) streamPairs := make([]*websocketStreamPair, len(opts.Ports))
for i := range streamPairs { for i := range streamPairs {
streamPair := websocketStreamPair{ streamPair := websocketStreamPair{
port: opts.ports[i], port: opts.Ports[i],
dataStream: streams[i*2+dataChannel], dataStream: streams[i*2+dataChannel],
errorStream: streams[i*2+errorChannel], errorStream: streams[i*2+errorChannel],
} }
streamPairs[i] = &streamPair streamPairs[i] = &streamPair
portBytes := make([]byte, 2) portBytes := make([]byte, 2)
binary.LittleEndian.PutUint16(portBytes, streamPair.port) // port is always positive so conversion is allowable
binary.LittleEndian.PutUint16(portBytes, uint16(streamPair.port))
streamPair.dataStream.Write(portBytes) streamPair.dataStream.Write(portBytes)
streamPair.errorStream.Write(portBytes) streamPair.errorStream.Write(portBytes)
} }
@ -140,7 +142,7 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar
// websocketStreamPair represents the error and data streams for a port // websocketStreamPair represents the error and data streams for a port
// forwarding request. // forwarding request.
type websocketStreamPair struct { type websocketStreamPair struct {
port uint16 port int32
dataStream io.ReadWriteCloser dataStream io.ReadWriteCloser
errorStream io.WriteCloser errorStream io.WriteCloser
} }
@ -149,7 +151,7 @@ type websocketStreamPair struct {
// request over a websocket connection // request over a websocket connection
type websocketStreamHandler struct { type websocketStreamHandler struct {
conn *wsstream.Conn conn *wsstream.Conn
ports []uint16 ports []int32
streamPairs []*websocketStreamPair streamPairs []*websocketStreamPair
pod string pod string
uid types.UID uid types.UID

View File

@ -0,0 +1,101 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"net/http"
"reflect"
"testing"
)
func TestV4Options(t *testing.T) {
tests := map[string]struct {
url string
websocket bool
expectedOpts *V4Options
expectedError string
}{
"non-ws request": {
url: "http://example.com",
expectedOpts: &V4Options{},
},
"missing port": {
url: "http://example.com",
websocket: true,
expectedError: `query parameter "port" is required`,
},
"unable to parse port": {
url: "http://example.com?port=abc",
websocket: true,
expectedError: `unable to parse "abc" as a port: strconv.ParseUint: parsing "abc": invalid syntax`,
},
"negative port": {
url: "http://example.com?port=-1",
websocket: true,
expectedError: `unable to parse "-1" as a port: strconv.ParseUint: parsing "-1": invalid syntax`,
},
"one port": {
url: "http://example.com?port=80",
websocket: true,
expectedOpts: &V4Options{
Ports: []int32{80},
},
},
"multiple ports": {
url: "http://example.com?port=80,90,100",
websocket: true,
expectedOpts: &V4Options{
Ports: []int32{80, 90, 100},
},
},
"multiple port": {
url: "http://example.com?port=80&port=90",
websocket: true,
expectedOpts: &V4Options{
Ports: []int32{80, 90},
},
},
}
for name, test := range tests {
req, err := http.NewRequest(http.MethodGet, test.url, nil)
if err != nil {
t.Errorf("%s: invalid url %q err=%q", name, test.url, err)
continue
}
if test.websocket {
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", "websocket")
}
opts, err := NewV4Options(req)
if len(test.expectedError) > 0 {
if err == nil {
t.Errorf("%s: expected err=%q, but it was nil", name, test.expectedError)
}
if e, a := test.expectedError, err.Error(); e != a {
t.Errorf("%s: expected err=%q, got %q", name, e, a)
}
continue
}
if err != nil {
t.Errorf("%s: unexpected error %v", name, err)
continue
}
if !reflect.DeepEqual(test.expectedOpts, opts) {
t.Errorf("%s: expected options %#v, got %#v", name, test.expectedOpts, err)
}
}
}

View File

@ -172,7 +172,7 @@ type HostInterface interface {
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error
GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request) ServeLogs(w http.ResponseWriter, req *http.Request)
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
StreamingConnectionIdleTimeout() time.Duration StreamingConnectionIdleTimeout() time.Duration
ResyncInterval() time.Duration ResyncInterval() time.Duration
GetHostname() string GetHostname() string
@ -184,7 +184,7 @@ type HostInterface interface {
ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool)
GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error)
GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error)
} }
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests. // NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
@ -568,7 +568,7 @@ func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
response.WriteEntity(info) response.WriteEntity(info)
} }
type requestParams struct { type execRequestParams struct {
podNamespace string podNamespace string
podName string podName string
podUID types.UID podUID types.UID
@ -576,8 +576,8 @@ type requestParams struct {
cmd []string cmd []string
} }
func getRequestParams(req *restful.Request) requestParams { func getExecRequestParams(req *restful.Request) execRequestParams {
return requestParams{ return execRequestParams{
podNamespace: req.PathParameter("podNamespace"), podNamespace: req.PathParameter("podNamespace"),
podName: req.PathParameter("podID"), podName: req.PathParameter("podID"),
podUID: types.UID(req.PathParameter("uid")), podUID: types.UID(req.PathParameter("uid")),
@ -586,9 +586,23 @@ func getRequestParams(req *restful.Request) requestParams {
} }
} }
type portForwardRequestParams struct {
podNamespace string
podName string
podUID types.UID
}
func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams {
return portForwardRequestParams{
podNamespace: req.PathParameter("podNamespace"),
podName: req.PathParameter("podID"),
podUID: types.UID(req.PathParameter("uid")),
}
}
// getAttach handles requests to attach to a container. // getAttach handles requests to attach to a container.
func (s *Server) getAttach(request *restful.Request, response *restful.Response) { func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
params := getRequestParams(request) params := getExecRequestParams(request)
streamOpts, err := remotecommand.NewOptions(request.Request) streamOpts, err := remotecommand.NewOptions(request.Request)
if err != nil { if err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
@ -626,7 +640,7 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
// getExec handles requests to run a command inside a container. // getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) { func (s *Server) getExec(request *restful.Request, response *restful.Response) {
params := getRequestParams(request) params := getExecRequestParams(request)
streamOpts, err := remotecommand.NewOptions(request.Request) streamOpts, err := remotecommand.NewOptions(request.Request)
if err != nil { if err != nil {
utilruntime.HandleError(err) utilruntime.HandleError(err)
@ -665,7 +679,7 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
// getRun handles requests to run a command inside a container. // getRun handles requests to run a command inside a container.
func (s *Server) getRun(request *restful.Request, response *restful.Response) { func (s *Server) getRun(request *restful.Request, response *restful.Response) {
params := getRequestParams(request) params := getExecRequestParams(request)
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok { if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
@ -699,7 +713,14 @@ func writeJsonResponse(response *restful.Response, data []byte) {
// getPortForward handles a new restful port forward request. It determines the // getPortForward handles a new restful port forward request. It determines the
// pod name and uid and then calls ServePortForward. // pod name and uid and then calls ServePortForward.
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) { func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
params := getRequestParams(request) params := getPortForwardRequestParams(request)
portForwardOptions, err := portforward.NewV4Options(request.Request)
if err != nil {
utilruntime.HandleError(err)
response.WriteError(http.StatusBadRequest, err)
return
}
pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) pod, ok := s.host.GetPodByName(params.podNamespace, params.podName)
if !ok { if !ok {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
@ -710,7 +731,7 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
return return
} }
redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID) redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
if err != nil { if err != nil {
streaming.WriteError(err, response.ResponseWriter) streaming.WriteError(err, response.ResponseWriter)
return return
@ -725,6 +746,7 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
s.host, s.host,
kubecontainer.GetPodFullName(pod), kubecontainer.GetPodFullName(pod),
params.podUID, params.podUID,
portForwardOptions,
s.host.StreamingConnectionIdleTimeout(), s.host.StreamingConnectionIdleTimeout(),
remotecommand.DefaultStreamCreationTimeout, remotecommand.DefaultStreamCreationTimeout,
portforward.SupportedProtocols) portforward.SupportedProtocols)

View File

@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing" kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/term"
@ -73,7 +74,7 @@ type fakeKubelet struct {
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error
portForwardFunc func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error portForwardFunc func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
streamingConnectionIdleTimeoutFunc func() time.Duration streamingConnectionIdleTimeoutFunc func() time.Duration
hostnameFunc func() string hostnameFunc func() string
@ -139,7 +140,7 @@ func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container str
return fk.attachFunc(name, uid, container, in, out, err, tty) return fk.attachFunc(name, uid, container, in, out, err, tty)
} }
func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { func (fk *fakeKubelet) PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
return fk.portForwardFunc(name, uid, port, stream) return fk.portForwardFunc(name, uid, port, stream)
} }
@ -151,7 +152,7 @@ func (fk *fakeKubelet) GetAttach(podFullName string, podUID types.UID, container
return fk.redirectURL, nil return fk.redirectURL, nil
} }
func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
return fk.redirectURL, nil return fk.redirectURL, nil
} }
@ -1503,7 +1504,7 @@ func TestServePortForward(t *testing.T) {
portForwardFuncDone := make(chan struct{}) portForwardFuncDone := make(chan struct{})
fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
defer close(portForwardFuncDone) defer close(portForwardFuncDone)
if e, a := expectedPodName, name; e != a { if e, a := expectedPodName, name; e != a {
@ -1514,11 +1515,11 @@ func TestServePortForward(t *testing.T) {
t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a) t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
} }
p, err := strconv.ParseUint(test.port, 10, 16) p, err := strconv.ParseInt(test.port, 10, 32)
if err != nil { if err != nil {
t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err) t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err)
} }
if e, a := uint16(p), port; e != a { if e, a := int32(p), port; e != a {
t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a) t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a)
} }

View File

@ -70,7 +70,7 @@ func TestServeWSPortForward(t *testing.T) {
portForwardFuncDone := make(chan struct{}) portForwardFuncDone := make(chan struct{})
fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
defer close(portForwardFuncDone) defer close(portForwardFuncDone)
if e, a := expectedPodName, name; e != a { if e, a := expectedPodName, name; e != a {
@ -81,11 +81,11 @@ func TestServeWSPortForward(t *testing.T) {
t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a) t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
} }
p, err := strconv.ParseUint(test.port, 10, 16) p, err := strconv.ParseInt(test.port, 10, 32)
if err != nil { if err != nil {
t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err) t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err)
} }
if e, a := uint16(p), port; e != a { if e, a := int32(p), port; e != a {
t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a) t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a)
} }
@ -203,9 +203,9 @@ func TestServeWSMultiplePortForward(t *testing.T) {
portForwardWG.Add(len(ports)) portForwardWG.Add(len(ports))
portsMutex := sync.Mutex{} portsMutex := sync.Mutex{}
portsForwarded := map[uint16]struct{}{} portsForwarded := map[int32]struct{}{}
fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
defer portForwardWG.Done() defer portForwardWG.Done()
if e, a := expectedPodName, name; e != a { if e, a := expectedPodName, name; e != a {

View File

@ -302,12 +302,19 @@ func (s *server) servePortForward(req *restful.Request, resp *restful.Response)
return return
} }
portForwardOptions, err := portforward.NewV4Options(req.Request)
if err != nil {
resp.WriteError(http.StatusBadRequest, err)
return
}
portforward.ServePortForward( portforward.ServePortForward(
resp.ResponseWriter, resp.ResponseWriter,
req.Request, req.Request,
s.runtime, s.runtime,
pf.PodSandboxId, pf.PodSandboxId,
"", // unused: podUID "", // unused: podUID
portForwardOptions,
s.config.StreamIdleTimeout, s.config.StreamIdleTimeout,
s.config.StreamCreationTimeout, s.config.StreamCreationTimeout,
s.config.SupportedPortForwardProtocols) s.config.SupportedPortForwardProtocols)
@ -331,6 +338,6 @@ func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container
return a.Attach(container, in, out, err, tty, resize) return a.Attach(container, in, out, err, tty, resize)
} }
func (a *criAdapter) PortForward(podName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error { func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
return a.Runtime.PortForward(podName, int32(port), stream) return a.Runtime.PortForward(podName, port, stream)
} }

View File

@ -384,14 +384,13 @@ func streamParams(params url.Values, opts runtime.Object) error {
params.Add(api.ExecTTYParam, "1") params.Add(api.ExecTTYParam, "1")
} }
case *api.PodPortForwardOptions: case *api.PodPortForwardOptions:
if len(opts.Ports) == 0 { if len(opts.Ports) > 0 {
return errors.NewBadRequest("at least one port must be specified") ports := make([]string, len(opts.Ports))
for i, p := range opts.Ports {
ports[i] = strconv.FormatInt(int64(p), 10)
}
params.Add(api.PortHeader, strings.Join(ports, ","))
} }
ports := make([]string, len(opts.Ports))
for i, p := range opts.Ports {
ports[i] = strconv.FormatInt(int64(p), 10)
}
params.Add(api.PortHeader, strings.Join(ports, ","))
default: default:
return fmt.Errorf("Unknown object for streaming: %v", opts) return fmt.Errorf("Unknown object for streaming: %v", opts)
} }

View File

@ -363,16 +363,21 @@ func TestPortForwardLocation(t *testing.T) {
}, },
{ {
in: &api.Pod{ in: &api.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns",
Name: "pod1",
},
Spec: api.PodSpec{ Spec: api.PodSpec{
NodeName: "node1", NodeName: "node1",
}, },
}, },
info: &client.ConnectionInfo{},
opts: &api.PodPortForwardOptions{}, opts: &api.PodPortForwardOptions{},
expectedErr: errors.NewBadRequest("at least one port must be specified"), expectedURL: &url.URL{Host: ":", Path: "/portForward/ns/pod1"},
}, },
{ {
in: &api.Pod{ in: &api.Pod{
ObjectMeta: api.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: "ns", Namespace: "ns",
Name: "pod1", Name: "pod1",
}, },