diff --git a/pkg/client/tests/portfoward_test.go b/pkg/client/tests/portfoward_test.go index aaf7dba8bcf..077901ae41b 100644 --- a/pkg/client/tests/portfoward_test.go +++ b/pkg/client/tests/portfoward_test.go @@ -42,16 +42,16 @@ import ( type fakePortForwarder struct { lock sync.Mutex // stores data expected from the stream per port - expected map[uint16]string + expected map[int32]string // stores data received from the stream per port - received map[uint16]string + received map[int32]string // data to be sent to the stream per port - send map[uint16]string + send map[int32]string } var _ portforward.PortForwarder = &fakePortForwarder{} -func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { +func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { defer stream.Close() // read from the client @@ -77,14 +77,14 @@ func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16 // fakePortForwardServer creates an HTTP server that can handle port forwarding // requests. -func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[uint16]string) http.HandlerFunc { +func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[int32]string) http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { pf := &fakePortForwarder{ expected: expectedFromClient, - received: make(map[uint16]string), + received: make(map[int32]string), send: serverSends, } - portforward.ServePortForward(w, req, pf, "pod", "uid", 0, 10*time.Second, portforward.SupportedProtocols) + portforward.ServePortForward(w, req, pf, "pod", "uid", nil, 0, 10*time.Second, portforward.SupportedProtocols) for port, expected := range expectedFromClient { actual, ok := pf.received[port] @@ -109,19 +109,19 @@ func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedF func TestForwardPorts(t *testing.T) { tests := map[string]struct { ports []string - clientSends map[uint16]string - serverSends map[uint16]string + clientSends map[int32]string + serverSends map[int32]string }{ "forward 1 port with no data either direction": { ports: []string{"5000"}, }, "forward 2 ports with bidirectional data": { ports: []string{"5001", "6000"}, - clientSends: map[uint16]string{ + clientSends: map[int32]string{ 5001: "abcd", 6000: "ghij", }, - serverSends: map[uint16]string{ + serverSends: map[int32]string{ 5001: "1234", 6000: "5678", }, diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 3997d02f756..16417901e3d 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -130,7 +130,7 @@ type DirectStreamingRuntime interface { // tty. ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error // Forward the specified port from the specified pod to the stream. - PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error + PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error // ContainerAttach encapsulates the attaching to containers for testability ContainerAttacher } @@ -141,7 +141,7 @@ type DirectStreamingRuntime interface { type IndirectStreamingRuntime interface { GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) - GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) + GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) } type ImageService interface { diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 5ae3f2fd65b..21ede430502 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -73,7 +73,7 @@ type FakeDirectStreamingRuntime struct { TTY bool // Port-forward args Pod *Pod - Port uint16 + Port int32 Stream io.ReadWriteCloser } } @@ -394,7 +394,7 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error { return f.Err } -func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error { +func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error { f.Lock() defer f.Unlock() @@ -471,7 +471,7 @@ func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, return &url.URL{Host: FakeHost}, f.Err } -func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { +func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/dockershim/docker_streaming.go b/pkg/kubelet/dockershim/docker_streaming.go index e7bfb3e2222..9f9f3bef2ba 100644 --- a/pkg/kubelet/dockershim/docker_streaming.go +++ b/pkg/kubelet/dockershim/docker_streaming.go @@ -64,7 +64,7 @@ func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream i if port < 0 || port > math.MaxUint16 { return fmt.Errorf("invalid port %d", port) } - return dockertools.PortForward(r.client, podSandboxID, uint16(port), stream) + return dockertools.PortForward(r.client, podSandboxID, port, stream) } // ExecSync executes a command in the container, and returns the stdout output. diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 122965d2b34..019f1317a58 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -1353,7 +1353,7 @@ func noPodInfraContainerError(podName, podNamespace string) error { // - match cgroups of container // - should we support nsenter + socat on the host? (current impl) // - should we support nsenter + socat in a container, running with elevated privs and --pid=host? -func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { +func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port int32, stream io.ReadWriteCloser) error { podInfraContainer := pod.FindContainerByName(PodInfraContainerName) if podInfraContainer == nil { return noPodInfraContainerError(pod.Name, pod.Namespace) @@ -1369,7 +1369,7 @@ func (dm *DockerManager) UpdatePodCIDR(podCIDR string) error { } // Temporarily export this function to share with dockershim. -func PortForward(client DockerInterface, podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error { +func PortForward(client DockerInterface, podInfraContainerID string, port int32, stream io.ReadWriteCloser) error { container, err := client.InspectContainer(podInfraContainerID) if err != nil { return err diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 34fdd8d6787..c662f1a2ce0 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -50,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/envvars" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/qos" + "k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -1394,7 +1395,7 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain // PortForward connects to the pod's port and copies data between the port // and the stream. -func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error { +func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error { streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime) if !ok { return fmt.Errorf("streaming methods not supported by runtime") @@ -1467,7 +1468,7 @@ func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName } // GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it. -func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { +func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) { switch streamingRuntime := kl.containerRuntime.(type) { case kubecontainer.DirectStreamingRuntime: // Kubelet will serve the attach directly. @@ -1484,7 +1485,7 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID return nil, fmt.Errorf("pod not found (%q)", podFullName) } - return streamingRuntime.GetPortForward(podName, podNamespace, podUID) + return streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports) default: return nil, fmt.Errorf("container runtime does not support port-forward") } diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go index c5b1d2bba1c..ba8d0e854fc 100644 --- a/pkg/kubelet/kubelet_pods_test.go +++ b/pkg/kubelet/kubelet_pods_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" ) @@ -1607,7 +1608,7 @@ func TestPortForward(t *testing.T) { podName = "podFoo" podNamespace = "nsFoo" podUID types.UID = "12345678" - port uint16 = 5000 + port int32 = 5000 ) var ( stream = &fakeReadWriteCloser{} @@ -1646,7 +1647,7 @@ func TestPortForward(t *testing.T) { podFullName := kubecontainer.GetPodFullName(podWithUidNameNs(podUID, tc.podName, podNamespace)) { // No streaming case description := "no streaming - " + tc.description - redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) + redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{}) assert.Error(t, err, description) assert.Nil(t, redirect, description) @@ -1658,7 +1659,7 @@ func TestPortForward(t *testing.T) { fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} kubelet.containerRuntime = fakeRuntime - redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) + redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{}) assert.NoError(t, err, description) assert.Nil(t, redirect, description) @@ -1677,7 +1678,7 @@ func TestPortForward(t *testing.T) { fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime} kubelet.containerRuntime = fakeRuntime - redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID) + redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{}) if tc.expectError { assert.Error(t, err, description) } else { diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go index dc115e41f70..ec9ecd1d64d 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go @@ -237,7 +237,7 @@ func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, s } // GetPortForward gets the endpoint the runtime will serve the port-forward request from. -func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) { +func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID, ports []int32) (*url.URL, error) { sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil) if err != nil { return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err) @@ -245,9 +245,9 @@ func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, if len(sandboxIDs) == 0 { return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID)) } - // TODO: Port is unused for now, but we may need it in the future. req := &runtimeapi.PortForwardRequest{ PodSandboxId: sandboxIDs[0], + Port: ports, } resp, err := m.runtimeService.PortForward(req) if err != nil { diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index e3fea86420b..31ff226c814 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -2107,7 +2107,7 @@ func (r *Runtime) ExecInContainer(containerID kubecontainer.ContainerID, cmd []s // - should we support nsenter + socat in a container, running with elevated privs and --pid=host? // // TODO(yifan): Merge with the same function in dockertools. -func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { +func (r *Runtime) PortForward(pod *kubecontainer.Pod, port int32, stream io.ReadWriteCloser) error { glog.V(4).Infof("Rkt port forwarding in container.") ctx, cancel := context.WithTimeout(context.Background(), r.requestTimeout) diff --git a/pkg/kubelet/server/portforward/httpstream.go b/pkg/kubelet/server/portforward/httpstream.go index 8af23b966ef..5f872c82010 100644 --- a/pkg/kubelet/server/portforward/httpstream.go +++ b/pkg/kubelet/server/portforward/httpstream.go @@ -240,10 +240,10 @@ func (h *httpStreamHandler) portForward(p *httpStreamPair) { defer p.errorStream.Close() portString := p.dataStream.Headers().Get(api.PortHeader) - port, _ := strconv.ParseUint(portString, 10, 16) + port, _ := strconv.ParseInt(portString, 10, 32) glog.V(5).Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) - err := h.forwarder.PortForward(h.pod, h.uid, uint16(port), p.dataStream) + err := h.forwarder.PortForward(h.pod, h.uid, int32(port), p.dataStream) glog.V(5).Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) if err != nil { diff --git a/pkg/kubelet/server/portforward/portforward.go b/pkg/kubelet/server/portforward/portforward.go index a812d61420d..60a96e51a23 100644 --- a/pkg/kubelet/server/portforward/portforward.go +++ b/pkg/kubelet/server/portforward/portforward.go @@ -30,7 +30,7 @@ import ( // in a pod. type PortForwarder interface { // PortForwarder copies data between a data stream and a port in a pod. - PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error + PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error } // ServePortForward handles a port forwarding request. A single request is @@ -38,10 +38,10 @@ type PortForwarder interface { // been timed out due to idleness. This function handles multiple forwarded // connections; i.e., multiple `curl http://localhost:8888/` requests will be // handled by a single invocation of ServePortForward. -func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) { +func ServePortForward(w http.ResponseWriter, req *http.Request, portForwarder PortForwarder, podName string, uid types.UID, portForwardOptions *V4Options, idleTimeout time.Duration, streamCreationTimeout time.Duration, supportedProtocols []string) { var err error if wsstream.IsWebSocketRequest(req) { - err = handleWebSocketStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) + err = handleWebSocketStreams(req, w, portForwarder, podName, uid, portForwardOptions, supportedProtocols, idleTimeout, streamCreationTimeout) } else { err = handleHttpStreams(req, w, portForwarder, podName, uid, supportedProtocols, idleTimeout, streamCreationTimeout) } diff --git a/pkg/kubelet/server/portforward/websocket.go b/pkg/kubelet/server/portforward/websocket.go index f201062735c..801d7b37ef6 100644 --- a/pkg/kubelet/server/portforward/websocket.go +++ b/pkg/kubelet/server/portforward/websocket.go @@ -45,21 +45,25 @@ const ( // options contains details about which streams are required for // port forwarding. -type v4Options struct { - ports []uint16 +type V4Options struct { + Ports []int32 } // newOptions creates a new options from the Request. -func newV4Options(req *http.Request) (*v4Options, error) { - portStrings := req.URL.Query()[api.PortHeader] - if len(portStrings) == 0 { - return nil, fmt.Errorf("%q is required", api.PortHeader) +func NewV4Options(req *http.Request) (*V4Options, error) { + if !wsstream.IsWebSocketRequest(req) { + return &V4Options{}, nil } - ports := make([]uint16, 0, len(portStrings)) + portStrings := req.URL.Query()[api.PortHeader] + if len(portStrings) == 0 { + return nil, fmt.Errorf("query parameter %q is required", api.PortHeader) + } + + ports := make([]int32, 0, len(portStrings)) for _, portString := range portStrings { if len(portString) == 0 { - return nil, fmt.Errorf("%q is cannot be empty", api.PortHeader) + return nil, fmt.Errorf("query parameter %q cannot be empty", api.PortHeader) } for _, p := range strings.Split(portString, ",") { port, err := strconv.ParseUint(p, 10, 16) @@ -69,25 +73,22 @@ func newV4Options(req *http.Request) (*v4Options, error) { if port < 1 { return nil, fmt.Errorf("port %q must be > 0", portString) } - ports = append(ports, uint16(port)) + ports = append(ports, int32(port)) } } - return &v4Options{ - ports: ports, + return &V4Options{ + Ports: ports, }, nil } -func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { - opts, err := newV4Options(req) - if err != nil { - w.WriteHeader(http.StatusBadRequest) - fmt.Fprint(w, err.Error()) - return err - } - - channels := make([]wsstream.ChannelType, 0, len(opts.ports)*2) - for i := 0; i < len(opts.ports); i++ { +// handleWebSocketStreams handles requests to forward ports to a pod via +// a PortForwarder. A pair of streams are created per port (DATA n, +// ERROR n+1). The associated port is written to each stream as a unsigned 16 +// bit integer in little endian format. +func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwarder PortForwarder, podName string, uid types.UID, opts *V4Options, supportedPortForwardProtocols []string, idleTimeout, streamCreationTimeout time.Duration) error { + channels := make([]wsstream.ChannelType, 0, len(opts.Ports)*2) + for i := 0; i < len(opts.Ports); i++ { channels = append(channels, wsstream.ReadWriteChannel, wsstream.WriteChannel) } conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{ @@ -111,17 +112,18 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar return err } defer conn.Close() - streamPairs := make([]*websocketStreamPair, len(opts.ports)) + streamPairs := make([]*websocketStreamPair, len(opts.Ports)) for i := range streamPairs { streamPair := websocketStreamPair{ - port: opts.ports[i], + port: opts.Ports[i], dataStream: streams[i*2+dataChannel], errorStream: streams[i*2+errorChannel], } streamPairs[i] = &streamPair portBytes := make([]byte, 2) - binary.LittleEndian.PutUint16(portBytes, streamPair.port) + // port is always positive so conversion is allowable + binary.LittleEndian.PutUint16(portBytes, uint16(streamPair.port)) streamPair.dataStream.Write(portBytes) streamPair.errorStream.Write(portBytes) } @@ -140,7 +142,7 @@ func handleWebSocketStreams(req *http.Request, w http.ResponseWriter, portForwar // websocketStreamPair represents the error and data streams for a port // forwarding request. type websocketStreamPair struct { - port uint16 + port int32 dataStream io.ReadWriteCloser errorStream io.WriteCloser } @@ -149,7 +151,7 @@ type websocketStreamPair struct { // request over a websocket connection type websocketStreamHandler struct { conn *wsstream.Conn - ports []uint16 + ports []int32 streamPairs []*websocketStreamPair pod string uid types.UID diff --git a/pkg/kubelet/server/portforward/websocket_test.go b/pkg/kubelet/server/portforward/websocket_test.go new file mode 100644 index 00000000000..8f508d9bd85 --- /dev/null +++ b/pkg/kubelet/server/portforward/websocket_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package portforward + +import ( + "net/http" + "reflect" + "testing" +) + +func TestV4Options(t *testing.T) { + tests := map[string]struct { + url string + websocket bool + expectedOpts *V4Options + expectedError string + }{ + "non-ws request": { + url: "http://example.com", + expectedOpts: &V4Options{}, + }, + "missing port": { + url: "http://example.com", + websocket: true, + expectedError: `query parameter "port" is required`, + }, + "unable to parse port": { + url: "http://example.com?port=abc", + websocket: true, + expectedError: `unable to parse "abc" as a port: strconv.ParseUint: parsing "abc": invalid syntax`, + }, + "negative port": { + url: "http://example.com?port=-1", + websocket: true, + expectedError: `unable to parse "-1" as a port: strconv.ParseUint: parsing "-1": invalid syntax`, + }, + "one port": { + url: "http://example.com?port=80", + websocket: true, + expectedOpts: &V4Options{ + Ports: []int32{80}, + }, + }, + "multiple ports": { + url: "http://example.com?port=80,90,100", + websocket: true, + expectedOpts: &V4Options{ + Ports: []int32{80, 90, 100}, + }, + }, + "multiple port": { + url: "http://example.com?port=80&port=90", + websocket: true, + expectedOpts: &V4Options{ + Ports: []int32{80, 90}, + }, + }, + } + for name, test := range tests { + req, err := http.NewRequest(http.MethodGet, test.url, nil) + if err != nil { + t.Errorf("%s: invalid url %q err=%q", name, test.url, err) + continue + } + if test.websocket { + req.Header.Set("Connection", "Upgrade") + req.Header.Set("Upgrade", "websocket") + } + opts, err := NewV4Options(req) + if len(test.expectedError) > 0 { + if err == nil { + t.Errorf("%s: expected err=%q, but it was nil", name, test.expectedError) + } + if e, a := test.expectedError, err.Error(); e != a { + t.Errorf("%s: expected err=%q, got %q", name, e, a) + } + continue + } + if err != nil { + t.Errorf("%s: unexpected error %v", name, err) + continue + } + if !reflect.DeepEqual(test.expectedOpts, opts) { + t.Errorf("%s: expected options %#v, got %#v", name, test.expectedOpts, err) + } + } +} diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 6918b8e68fb..8f77f8befc3 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -172,7 +172,7 @@ type HostInterface interface { AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) - PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error + PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error StreamingConnectionIdleTimeout() time.Duration ResyncInterval() time.Duration GetHostname() string @@ -184,7 +184,7 @@ type HostInterface interface { ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommand.Options) (*url.URL, error) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommand.Options) (*url.URL, error) - GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) + GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) } // NewServer initializes and configures a kubelet.Server object to handle HTTP requests. @@ -568,7 +568,7 @@ func (s *Server) getSpec(request *restful.Request, response *restful.Response) { response.WriteEntity(info) } -type requestParams struct { +type execRequestParams struct { podNamespace string podName string podUID types.UID @@ -576,8 +576,8 @@ type requestParams struct { cmd []string } -func getRequestParams(req *restful.Request) requestParams { - return requestParams{ +func getExecRequestParams(req *restful.Request) execRequestParams { + return execRequestParams{ podNamespace: req.PathParameter("podNamespace"), podName: req.PathParameter("podID"), podUID: types.UID(req.PathParameter("uid")), @@ -586,9 +586,23 @@ func getRequestParams(req *restful.Request) requestParams { } } +type portForwardRequestParams struct { + podNamespace string + podName string + podUID types.UID +} + +func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams { + return portForwardRequestParams{ + podNamespace: req.PathParameter("podNamespace"), + podName: req.PathParameter("podID"), + podUID: types.UID(req.PathParameter("uid")), + } +} + // getAttach handles requests to attach to a container. func (s *Server) getAttach(request *restful.Request, response *restful.Response) { - params := getRequestParams(request) + params := getExecRequestParams(request) streamOpts, err := remotecommand.NewOptions(request.Request) if err != nil { utilruntime.HandleError(err) @@ -626,7 +640,7 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response) // getExec handles requests to run a command inside a container. func (s *Server) getExec(request *restful.Request, response *restful.Response) { - params := getRequestParams(request) + params := getExecRequestParams(request) streamOpts, err := remotecommand.NewOptions(request.Request) if err != nil { utilruntime.HandleError(err) @@ -665,7 +679,7 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) { // getRun handles requests to run a command inside a container. func (s *Server) getRun(request *restful.Request, response *restful.Response) { - params := getRequestParams(request) + params := getExecRequestParams(request) pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) @@ -699,7 +713,14 @@ func writeJsonResponse(response *restful.Response, data []byte) { // getPortForward handles a new restful port forward request. It determines the // pod name and uid and then calls ServePortForward. func (s *Server) getPortForward(request *restful.Request, response *restful.Response) { - params := getRequestParams(request) + params := getPortForwardRequestParams(request) + + portForwardOptions, err := portforward.NewV4Options(request.Request) + if err != nil { + utilruntime.HandleError(err) + response.WriteError(http.StatusBadRequest, err) + return + } pod, ok := s.host.GetPodByName(params.podNamespace, params.podName) if !ok { response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) @@ -710,7 +731,7 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp return } - redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID) + redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions) if err != nil { streaming.WriteError(err, response.ResponseWriter) return @@ -725,6 +746,7 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp s.host, kubecontainer.GetPodFullName(pod), params.podUID, + portForwardOptions, s.host.StreamingConnectionIdleTimeout(), remotecommand.DefaultStreamCreationTimeout, portforward.SupportedProtocols) diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 053cae4893e..98062ae512e 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -52,6 +52,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/util/term" @@ -73,7 +74,7 @@ type fakeKubelet struct { runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error - portForwardFunc func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error + portForwardFunc func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error streamingConnectionIdleTimeoutFunc func() time.Duration hostnameFunc func() string @@ -139,7 +140,7 @@ func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container str return fk.attachFunc(name, uid, container, in, out, err, tty) } -func (fk *fakeKubelet) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { +func (fk *fakeKubelet) PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { return fk.portForwardFunc(name, uid, port, stream) } @@ -151,7 +152,7 @@ func (fk *fakeKubelet) GetAttach(podFullName string, podUID types.UID, container return fk.redirectURL, nil } -func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) { +func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) { return fk.redirectURL, nil } @@ -1503,7 +1504,7 @@ func TestServePortForward(t *testing.T) { portForwardFuncDone := make(chan struct{}) - fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { + fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { defer close(portForwardFuncDone) if e, a := expectedPodName, name; e != a { @@ -1514,11 +1515,11 @@ func TestServePortForward(t *testing.T) { t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a) } - p, err := strconv.ParseUint(test.port, 10, 16) + p, err := strconv.ParseInt(test.port, 10, 32) if err != nil { t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err) } - if e, a := uint16(p), port; e != a { + if e, a := int32(p), port; e != a { t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a) } diff --git a/pkg/kubelet/server/server_websocket_test.go b/pkg/kubelet/server/server_websocket_test.go index 2d55100bfec..080e09fb6b9 100644 --- a/pkg/kubelet/server/server_websocket_test.go +++ b/pkg/kubelet/server/server_websocket_test.go @@ -70,7 +70,7 @@ func TestServeWSPortForward(t *testing.T) { portForwardFuncDone := make(chan struct{}) - fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { + fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { defer close(portForwardFuncDone) if e, a := expectedPodName, name; e != a { @@ -81,11 +81,11 @@ func TestServeWSPortForward(t *testing.T) { t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a) } - p, err := strconv.ParseUint(test.port, 10, 16) + p, err := strconv.ParseInt(test.port, 10, 32) if err != nil { t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err) } - if e, a := uint16(p), port; e != a { + if e, a := int32(p), port; e != a { t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a) } @@ -203,9 +203,9 @@ func TestServeWSMultiplePortForward(t *testing.T) { portForwardWG.Add(len(ports)) portsMutex := sync.Mutex{} - portsForwarded := map[uint16]struct{}{} + portsForwarded := map[int32]struct{}{} - fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { + fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { defer portForwardWG.Done() if e, a := expectedPodName, name; e != a { diff --git a/pkg/kubelet/server/streaming/server.go b/pkg/kubelet/server/streaming/server.go index 9d70d4cb21b..05cae998560 100644 --- a/pkg/kubelet/server/streaming/server.go +++ b/pkg/kubelet/server/streaming/server.go @@ -302,12 +302,19 @@ func (s *server) servePortForward(req *restful.Request, resp *restful.Response) return } + portForwardOptions, err := portforward.NewV4Options(req.Request) + if err != nil { + resp.WriteError(http.StatusBadRequest, err) + return + } + portforward.ServePortForward( resp.ResponseWriter, req.Request, s.runtime, pf.PodSandboxId, "", // unused: podUID + portForwardOptions, s.config.StreamIdleTimeout, s.config.StreamCreationTimeout, s.config.SupportedPortForwardProtocols) @@ -331,6 +338,6 @@ func (a *criAdapter) AttachContainer(podName string, podUID types.UID, container return a.Attach(container, in, out, err, tty, resize) } -func (a *criAdapter) PortForward(podName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error { - return a.Runtime.PortForward(podName, int32(port), stream) +func (a *criAdapter) PortForward(podName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error { + return a.Runtime.PortForward(podName, port, stream) } diff --git a/pkg/registry/core/pod/strategy.go b/pkg/registry/core/pod/strategy.go index 5ef5de96d24..3f5c1f64fe6 100644 --- a/pkg/registry/core/pod/strategy.go +++ b/pkg/registry/core/pod/strategy.go @@ -384,14 +384,13 @@ func streamParams(params url.Values, opts runtime.Object) error { params.Add(api.ExecTTYParam, "1") } case *api.PodPortForwardOptions: - if len(opts.Ports) == 0 { - return errors.NewBadRequest("at least one port must be specified") + if len(opts.Ports) > 0 { + ports := make([]string, len(opts.Ports)) + for i, p := range opts.Ports { + ports[i] = strconv.FormatInt(int64(p), 10) + } + params.Add(api.PortHeader, strings.Join(ports, ",")) } - ports := make([]string, len(opts.Ports)) - for i, p := range opts.Ports { - ports[i] = strconv.FormatInt(int64(p), 10) - } - params.Add(api.PortHeader, strings.Join(ports, ",")) default: return fmt.Errorf("Unknown object for streaming: %v", opts) } diff --git a/pkg/registry/core/pod/strategy_test.go b/pkg/registry/core/pod/strategy_test.go index 6c5d140fc68..1f9e78c25cc 100644 --- a/pkg/registry/core/pod/strategy_test.go +++ b/pkg/registry/core/pod/strategy_test.go @@ -363,16 +363,21 @@ func TestPortForwardLocation(t *testing.T) { }, { in: &api.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "pod1", + }, Spec: api.PodSpec{ NodeName: "node1", }, }, + info: &client.ConnectionInfo{}, opts: &api.PodPortForwardOptions{}, - expectedErr: errors.NewBadRequest("at least one port must be specified"), + expectedURL: &url.URL{Host: ":", Path: "/portForward/ns/pod1"}, }, { in: &api.Pod{ - ObjectMeta: api.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Namespace: "ns", Name: "pod1", },