Merge pull request #64006 from Random-Liu/streaming-auth

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Add proxy for container streaming in kubelet for streaming auth.

For https://github.com/kubernetes/kubernetes/issues/36666, option 2 of https://github.com/kubernetes/kubernetes/issues/36666#issuecomment-378440458.

This PR:
1. Removed the `DirectStreamingRuntime`, and changed `IndirectStreamingRuntime` to `StreamingRuntime`. All `DirectStreamingRuntime`s, `dockertools` and `rkt`, were removed.
2. Proxy container streaming in kubelet instead of returning redirect to apiserver. This solves the container runtime authentication issue, which is what we agreed on in https://github.com/kubernetes/kubernetes/issues/36666.

Please note that, this PR replaced the redirect with proxy directly instead of adding a knob to switch between the 2 behaviors. For existing CRI runtimes like containerd and cri-o, they should change to serve container streaming on localhost, so as to make the whole container streaming connection secure.

 If a general authentication mechanism proposed in https://github.com/kubernetes/kubernetes/issues/62747 is ready, we can switch back to redirect, and all code can be found in github history.

Please also note that this added some overhead in kubelet when there are container streaming connections. However, the actual bottleneck is in the apiserver anyway, because it does proxy for all container streaming happens in the cluster. So it seems fine to get security and simplicity with this overhead. @derekwaynecarr @mrunalp Are you ok with this? Or do you prefer a knob?

@yujuhong @timstclair @dchen1107 @mikebrow @feiskyer 
/cc @kubernetes/sig-node-pr-reviews 
**Release note**:

```release-note
Kubelet now proxies container streaming between apiserver and container runtime. The connection between kubelet and apiserver is authenticated. Container runtime should change streaming server to serve on localhost, to make the connection between kubelet and container runtime local.

In this way, the whole container streaming connection is secure. To switch back to the old behavior, set `--redirect-container-streaming=true` flag.
```
This commit is contained in:
Kubernetes Submit Queue 2018-05-31 22:45:29 -07:00 committed by GitHub
commit 8d10a8f74f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 759 additions and 1093 deletions

View File

@ -46,6 +46,7 @@ func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions {
return &config.ContainerRuntimeOptions{ return &config.ContainerRuntimeOptions{
ContainerRuntime: kubetypes.DockerContainerRuntime, ContainerRuntime: kubetypes.DockerContainerRuntime,
RedirectContainerStreaming: false,
DockerEndpoint: dockerEndpoint, DockerEndpoint: dockerEndpoint,
DockershimRootDirectory: "/var/lib/dockershim", DockershimRootDirectory: "/var/lib/dockershim",
DockerDisableSharedPID: true, DockerDisableSharedPID: true,

View File

@ -1173,30 +1173,13 @@ func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConf
SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
} }
// Standalone dockershim will always start the local streaming server.
ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings, ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, r.DockerDisableSharedPID) f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, r.DockerDisableSharedPID, true /*startLocalStreamingServer*/)
if err != nil { if err != nil {
return err return err
} }
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds) server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
if err := server.Start(stopCh); err != nil { return server.Start(stopCh)
return err
}
streamingServer := &http.Server{
Addr: net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))),
Handler: ds,
}
go func() {
<-stopCh
streamingServer.Shutdown(context.Background())
}()
// Start the streaming server
if err := streamingServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return err
}
return nil
} }

View File

@ -132,7 +132,6 @@ go_library(
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
"//vendor/k8s.io/client-go/util/certificate:go_default_library", "//vendor/k8s.io/client-go/util/certificate:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
"//vendor/k8s.io/client-go/util/integer:go_default_library", "//vendor/k8s.io/client-go/util/integer:go_default_library",

View File

@ -31,6 +31,15 @@ type ContainerRuntimeOptions struct {
ContainerRuntime string ContainerRuntime string
// RuntimeCgroups that container runtime is expected to be isolated in. // RuntimeCgroups that container runtime is expected to be isolated in.
RuntimeCgroups string RuntimeCgroups string
// RedirectContainerStreaming enables container streaming redirect.
// When RedirectContainerStreaming is false, kubelet will proxy container streaming data
// between apiserver and container runtime. This approach is more secure, but the proxy
// introduces some overhead.
// When RedirectContainerStreaming is true, kubelet will return an http redirect to apiserver,
// and apiserver will access container runtime directly. This approach is more performant,
// but less secure because the connection between apiserver and container runtime is not
// authenticated.
RedirectContainerStreaming bool
// Docker-specific options. // Docker-specific options.
@ -77,6 +86,7 @@ func (s *ContainerRuntimeOptions) AddFlags(fs *pflag.FlagSet) {
// General settings. // General settings.
fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'remote', 'rkt (deprecated)'.") fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'remote', 'rkt (deprecated)'.")
fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.") fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.")
fs.BoolVar(&s.RedirectContainerStreaming, "redirect-container-streaming", s.RedirectContainerStreaming, "Enables container streaming redirect. If false, kubelet will proxy container streaming data between apiserver and container runtime; if true, kubelet will return an http redirect to apiserver, and apiserver will access container runtime directly. The proxy approach is more secure, but introduces some overhead. The redirect approach is more performant, but less secure because the connection between apiserver and container runtime is not authenticated.")
// Docker-specific settings. // Docker-specific settings.
fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]") fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]")

View File

@ -21,7 +21,6 @@ go_library(
"//pkg/api/legacyscheme:go_default_library", "//pkg/api/legacyscheme:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/util/format:go_default_library", "//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/ioutils:go_default_library",
"//pkg/util/hash:go_default_library", "//pkg/util/hash:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//third_party/forked/golang/expansion:go_default_library", "//third_party/forked/golang/expansion:go_default_library",

View File

@ -17,11 +17,9 @@ limitations under the License.
package container package container
import ( import (
"bytes"
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"strings" "strings"
"time"
"github.com/golang/glog" "github.com/golang/glog"
@ -32,7 +30,6 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
"k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/ioutils"
hashutil "k8s.io/kubernetes/pkg/util/hash" hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/third_party/forked/golang/expansion" "k8s.io/kubernetes/third_party/forked/golang/expansion"
) )
@ -265,22 +262,6 @@ func FormatPod(pod *Pod) string {
return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.ID) return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.ID)
} }
type containerCommandRunnerWrapper struct {
DirectStreamingRuntime
}
var _ ContainerCommandRunner = &containerCommandRunnerWrapper{}
func (r *containerCommandRunnerWrapper) RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
var buffer bytes.Buffer
output := ioutils.WriteCloserWrapper(&buffer)
err := r.ExecInContainer(id, cmd, nil, output, output, false, nil, timeout)
// Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
// the command returned a nonzero exit code). Therefore, always return the output along with the
// error.
return buffer.Bytes(), err
}
// GetContainerSpec gets the container spec by containerName. // GetContainerSpec gets the container spec by containerName.
func GetContainerSpec(pod *v1.Pod, containerName string) *v1.Container { func GetContainerSpec(pod *v1.Pod, containerName string) *v1.Container {
for i, c := range pod.Spec.Containers { for i, c := range pod.Spec.Containers {

View File

@ -124,22 +124,10 @@ type Runtime interface {
UpdatePodCIDR(podCIDR string) error UpdatePodCIDR(podCIDR string) error
} }
// DirectStreamingRuntime is the interface implemented by runtimes for which the streaming calls // StreamingRuntime is the interface implemented by runtimes that handle the serving of the
// (exec/attach/port-forward) should be served directly by the Kubelet.
type DirectStreamingRuntime interface {
// Runs the command in the container of the specified pod. Attaches
// the processes stdin, stdout, and stderr. Optionally uses a tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
}
// IndirectStreamingRuntime is the interface implemented by runtimes that handle the serving of the
// streaming calls (exec/attach/port-forward) themselves. In this case, Kubelet should redirect to // streaming calls (exec/attach/port-forward) themselves. In this case, Kubelet should redirect to
// the runtime server. // the runtime server.
type IndirectStreamingRuntime interface { type StreamingRuntime interface {
GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)

View File

@ -26,7 +26,6 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/flowcontrol"
. "k8s.io/kubernetes/pkg/kubelet/container" . "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
@ -59,34 +58,13 @@ type FakeRuntime struct {
StatusErr error StatusErr error
} }
type FakeDirectStreamingRuntime struct {
*FakeRuntime
// Arguments to streaming method calls.
Args struct {
// Attach / Exec args
ContainerID ContainerID
Cmd []string
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
TTY bool
// Port-forward args
Pod *Pod
Port int32
Stream io.ReadWriteCloser
}
}
var _ DirectStreamingRuntime = &FakeDirectStreamingRuntime{}
const FakeHost = "localhost:12345" const FakeHost = "localhost:12345"
type FakeIndirectStreamingRuntime struct { type FakeStreamingRuntime struct {
*FakeRuntime *FakeRuntime
} }
var _ IndirectStreamingRuntime = &FakeIndirectStreamingRuntime{} var _ StreamingRuntime = &FakeStreamingRuntime{}
// FakeRuntime should implement Runtime. // FakeRuntime should implement Runtime.
var _ Runtime = &FakeRuntime{} var _ Runtime = &FakeRuntime{}
@ -311,35 +289,6 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
return &status, f.Err return &status, f.Err
} }
func (f *FakeDirectStreamingRuntime) ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "ExecInContainer")
f.Args.ContainerID = containerID
f.Args.Cmd = cmd
f.Args.Stdin = stdin
f.Args.Stdout = stdout
f.Args.Stderr = stderr
f.Args.TTY = tty
return f.Err
}
func (f *FakeDirectStreamingRuntime) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "AttachContainer")
f.Args.ContainerID = containerID
f.Args.Stdin = stdin
f.Args.Stdout = stdout
f.Args.Stderr = stderr
f.Args.TTY = tty
return f.Err
}
func (f *FakeRuntime) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { func (f *FakeRuntime) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
@ -394,18 +343,6 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error {
return f.Err return f.Err
} }
func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "PortForward")
f.Args.Pod = pod
f.Args.Port = port
f.Args.Stream = stream
return f.Err
}
func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) { func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
@ -455,7 +392,7 @@ func (f *FakeRuntime) ImageStats() (*ImageStats, error) {
return nil, f.Err return nil, f.Err
} }
func (f *FakeIndirectStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) { func (f *FakeStreamingRuntime) GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
@ -463,7 +400,7 @@ func (f *FakeIndirectStreamingRuntime) GetExec(id ContainerID, cmd []string, std
return &url.URL{Host: FakeHost}, f.Err return &url.URL{Host: FakeHost}, f.Err
} }
func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) { func (f *FakeStreamingRuntime) GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()
@ -471,7 +408,7 @@ func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout,
return &url.URL{Host: FakeHost}, f.Err return &url.URL{Host: FakeHost}, f.Err
} }
func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) { func (f *FakeStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) {
f.Lock() f.Lock()
defer f.Unlock() defer f.Unlock()

View File

@ -85,7 +85,7 @@ const (
type CRIService interface { type CRIService interface {
runtimeapi.RuntimeServiceServer runtimeapi.RuntimeServiceServer
runtimeapi.ImageServiceServer runtimeapi.ImageServiceServer
Start() error Start(<-chan struct{}) error
} }
// DockerService is an interface that embeds the new RuntimeService and // DockerService is an interface that embeds the new RuntimeService and
@ -188,7 +188,8 @@ func NewDockerClientFromConfig(config *ClientConfig) libdocker.Interface {
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config,
pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, disableSharedPID bool) (DockerService, error) { pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string,
disableSharedPID, startLocalStreamingServer bool) (DockerService, error) {
client := NewDockerClientFromConfig(config) client := NewDockerClientFromConfig(config)
@ -210,6 +211,7 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon
containerManager: cm.NewContainerManager(cgroupsName, client), containerManager: cm.NewContainerManager(cgroupsName, client),
checkpointManager: checkpointManager, checkpointManager: checkpointManager,
disableSharedPID: disableSharedPID, disableSharedPID: disableSharedPID,
startLocalStreamingServer: startLocalStreamingServer,
networkReady: make(map[string]bool), networkReady: make(map[string]bool),
} }
@ -307,6 +309,9 @@ type dockerService struct {
// See proposals/pod-pid-namespace.md for details. // See proposals/pod-pid-namespace.md for details.
// TODO: Remove once the escape hatch is no longer used (https://issues.k8s.io/41938) // TODO: Remove once the escape hatch is no longer used (https://issues.k8s.io/41938)
disableSharedPID bool disableSharedPID bool
// startLocalStreamingServer indicates whether dockershim should start a
// streaming server on localhost.
startLocalStreamingServer bool
} }
// TODO: handle context. // TODO: handle context.
@ -395,13 +400,25 @@ func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.Po
} }
// Start initializes and starts components in dockerService. // Start initializes and starts components in dockerService.
func (ds *dockerService) Start() error { func (ds *dockerService) Start(stopCh <-chan struct{}) error {
// Initialize the legacy cleanup flag. // Initialize the legacy cleanup flag.
if ds.startLocalStreamingServer {
go func() {
<-stopCh
if err := ds.streamingServer.Stop(); err != nil {
glog.Errorf("Failed to stop streaming server: %v", err)
}
}()
go func() {
if err := ds.streamingServer.Start(true); err != nil && err != http.ErrServerClosed {
glog.Fatalf("Failed to start streaming server: %v", err)
}
}()
}
return ds.containerManager.Start() return ds.containerManager.Start()
} }
// Status returns the status of the runtime. // Status returns the status of the runtime.
// TODO(random-liu): Set network condition accordingly here.
func (ds *dockerService) Status(_ context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) { func (ds *dockerService) Status(_ context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) {
runtimeReady := &runtimeapi.RuntimeCondition{ runtimeReady := &runtimeapi.RuntimeCondition{
Type: runtimeapi.RuntimeReady, Type: runtimeapi.RuntimeReady,

View File

@ -51,7 +51,7 @@ func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer {
// Start starts the dockershim grpc server. // Start starts the dockershim grpc server.
func (s *DockerServer) Start(stopCh <-chan struct{}) error { func (s *DockerServer) Start(stopCh <-chan struct{}) error {
// Start the internal service. // Start the internal service.
if err := s.service.Start(); err != nil { if err := s.service.Start(stopCh); err != nil {
glog.Errorf("Unable to start docker service") glog.Errorf("Unable to start docker service")
return err return err
} }

View File

@ -528,6 +528,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
daemonEndpoints: daemonEndpoints, daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager, containerManager: kubeDeps.ContainerManager,
containerRuntimeName: containerRuntime, containerRuntimeName: containerRuntime,
redirectContainerStreaming: crOptions.RedirectContainerStreaming,
nodeIP: parsedNodeIP, nodeIP: parsedNodeIP,
nodeIPValidator: validateNodeIP, nodeIPValidator: validateNodeIP,
clock: clock.RealClock{}, clock: clock.RealClock{},
@ -610,16 +611,16 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
switch containerRuntime { switch containerRuntime {
case kubetypes.DockerContainerRuntime: case kubetypes.DockerContainerRuntime:
// Create and start the CRI shim running as a grpc server. // Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps) streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig, ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory,
crOptions.DockerDisableSharedPID) crOptions.DockerDisableSharedPID, !crOptions.RedirectContainerStreaming)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// For now, the CRI shim redirects the streaming requests to the if crOptions.RedirectContainerStreaming {
// kubelet, which handles the requests using DockerService..
klet.criHandler = ds klet.criHandler = ds
}
// The unix socket for kubelet <-> dockershim communication. // The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q", glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
@ -675,6 +676,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err return nil, err
} }
klet.containerRuntime = runtime klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime klet.runner = runtime
if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) { if cadvisor.UsingLegacyCadvisorStats(containerRuntime, remoteRuntimeEndpoint) {
@ -1005,9 +1007,15 @@ type Kubelet struct {
// The name of the container runtime // The name of the container runtime
containerRuntimeName string containerRuntimeName string
// redirectContainerStreaming enables container streaming redirect.
redirectContainerStreaming bool
// Container runtime. // Container runtime.
containerRuntime kubecontainer.Runtime containerRuntime kubecontainer.Runtime
// Streaming runtime handles container streaming.
streamingRuntime kubecontainer.StreamingRuntime
// Container runtime service (needed by container runtime Start()). // Container runtime service (needed by container runtime Start()).
// TODO(CD): try to make this available without holding a reference in this // TODO(CD): try to make this available without holding a reference in this
// struct. For example, by adding a getter to generic runtime. // struct. For example, by adding a getter to generic runtime.
@ -2112,11 +2120,6 @@ func (kl *Kubelet) BirthCry() {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.") kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
} }
// StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
// ResyncInterval returns the interval used for periodic syncs. // ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration { func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval return kl.resyncInterval
@ -2124,12 +2127,12 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
// ListenAndServe runs the kubelet HTTP server. // ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) { func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.containerRuntime, kl.criHandler) server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.redirectContainerStreaming, kl.criHandler)
} }
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime) server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
} }
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
@ -2153,19 +2156,23 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
} }
// Gets the streaming server configuration to use with in-process CRI shims. // Gets the streaming server configuration to use with in-process CRI shims.
func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) *streaming.Config { func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config {
config := &streaming.Config{ config := &streaming.Config{
// Use a relative redirect (no scheme or host).
BaseURL: &url.URL{
Path: "/cri/",
},
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout, StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols, SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
} }
if !crOptions.RedirectContainerStreaming {
config.Addr = net.JoinHostPort("localhost", "0")
} else {
// Use a relative redirect (no scheme or host).
config.BaseURL = &url.URL{
Path: "/cri/",
}
if kubeDeps.TLSOptions != nil { if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config config.TLSConfig = kubeDeps.TLSOptions.Config
} }
}
return config return config
} }

View File

@ -30,7 +30,6 @@ import (
"sort" "sort"
"strings" "strings"
"sync" "sync"
"time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@ -41,7 +40,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilvalidation "k8s.io/apimachinery/pkg/util/validation" utilvalidation "k8s.io/apimachinery/pkg/util/validation"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/remotecommand"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/api/v1/resource"
podshelper "k8s.io/kubernetes/pkg/apis/core/pods" podshelper "k8s.io/kubernetes/pkg/apis/core/pods"
@ -1595,71 +1593,8 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
return kl.runner.RunInContainer(container.ID, cmd, 0) return kl.runner.RunInContainer(container.ID, cmd, 0)
} }
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize, timeout)
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container not found (%q)", containerName)
}
return streamingRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
}
pods, err := kl.containerRuntime.GetPods(false)
if err != nil {
return err
}
// Resolve and type convert back again.
// We need the static pod UID but the kubecontainer API works with types.UID.
podUID = types.UID(kl.podManager.TranslatePodUID(podUID))
pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
if pod.IsEmpty() {
return fmt.Errorf("pod not found (%q)", podFullName)
}
return streamingRuntime.PortForward(&pod, port, stream)
}
// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it. // GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) { func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the exec directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName) container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1667,19 +1602,11 @@ func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName s
if container == nil { if container == nil {
return nil, fmt.Errorf("container not found (%q)", containerName) return nil, fmt.Errorf("container not found (%q)", containerName)
} }
return streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY) return kl.streamingRuntime.GetExec(container.ID, cmd, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, streamOpts.TTY)
default:
return nil, fmt.Errorf("container runtime does not support exec")
}
} }
// GetAttach gets the URL the attach will be served from, or nil if the Kubelet will serve it. // GetAttach gets the URL the attach will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) { func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
container, err := kl.findContainer(podFullName, podUID, containerName) container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1701,19 +1628,11 @@ func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName
} }
tty := containerSpec.TTY tty := containerSpec.TTY
return streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, tty) return kl.streamingRuntime.GetAttach(container.ID, streamOpts.Stdin, streamOpts.Stdout, streamOpts.Stderr, tty)
default:
return nil, fmt.Errorf("container runtime does not support attach")
}
} }
// GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it. // GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) { func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
return nil, nil
case kubecontainer.IndirectStreamingRuntime:
pods, err := kl.containerRuntime.GetPods(false) pods, err := kl.containerRuntime.GetPods(false)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1727,10 +1646,7 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID
return nil, fmt.Errorf("pod not found (%q)", podFullName) return nil, fmt.Errorf("pod not found (%q)", podFullName)
} }
return streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports) return kl.streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports)
default:
return nil, fmt.Errorf("container runtime does not support port-forward")
}
} }
// cleanupOrphanedPodCgroups removes cgroups that should no longer exist. // cleanupOrphanedPodCgroups removes cgroups that should no longer exist.

View File

@ -17,7 +17,6 @@ limitations under the License.
package kubelet package kubelet
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -2095,7 +2094,7 @@ func (f *fakeReadWriteCloser) Close() error {
return nil return nil
} }
func TestExec(t *testing.T) { func TestGetExec(t *testing.T) {
const ( const (
podName = "podFoo" podName = "podFoo"
podNamespace = "nsFoo" podNamespace = "nsFoo"
@ -2106,9 +2105,6 @@ func TestExec(t *testing.T) {
var ( var (
podFullName = kubecontainer.GetPodFullName(podWithUIDNameNs(podUID, podName, podNamespace)) podFullName = kubecontainer.GetPodFullName(podWithUIDNameNs(podUID, podName, podNamespace))
command = []string{"ls"} command = []string{"ls"}
stdin = &bytes.Buffer{}
stdout = &fakeReadWriteCloser{}
stderr = &fakeReadWriteCloser{}
) )
testcases := []struct { testcases := []struct {
@ -2149,41 +2145,10 @@ func TestExec(t *testing.T) {
}}, }},
} }
{ // No streaming case description := "streaming - " + tc.description
description := "no streaming - " + tc.description fakeRuntime := &containertest.FakeStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
assert.Equal(t, fakeRuntime.Args.ContainerID.ID, containerID, description+": ID")
assert.Equal(t, fakeRuntime.Args.Cmd, command, description+": Command")
assert.Equal(t, fakeRuntime.Args.Stdin, stdin, description+": Stdin")
assert.Equal(t, fakeRuntime.Args.Stdout, stdout, description+": Stdout")
assert.Equal(t, fakeRuntime.Args.Stderr, stderr, description+": Stderr")
assert.Equal(t, fakeRuntime.Args.TTY, tty, description+": TTY")
}
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime kubelet.containerRuntime = fakeRuntime
kubelet.streamingRuntime = fakeRuntime
redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{}) redirect, err := kubelet.GetExec(tc.podFullName, podUID, tc.container, command, remotecommand.Options{})
if tc.expectError { if tc.expectError {
@ -2192,23 +2157,16 @@ func TestExec(t *testing.T) {
assert.NoError(t, err, description) assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect") assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
} }
err = kubelet.ExecInContainer(tc.podFullName, podUID, tc.container, command, stdin, stdout, stderr, tty, nil, 0)
assert.Error(t, err, description)
}
} }
} }
func TestPortForward(t *testing.T) { func TestGetPortForward(t *testing.T) {
const ( const (
podName = "podFoo" podName = "podFoo"
podNamespace = "nsFoo" podNamespace = "nsFoo"
podUID types.UID = "12345678" podUID types.UID = "12345678"
port int32 = 5000 port int32 = 5000
) )
var (
stream = &fakeReadWriteCloser{}
)
testcases := []struct { testcases := []struct {
description string description string
@ -2240,39 +2198,10 @@ func TestPortForward(t *testing.T) {
}}, }},
} }
podFullName := kubecontainer.GetPodFullName(podWithUIDNameNs(podUID, tc.podName, podNamespace)) description := "streaming - " + tc.description
{ // No streaming case fakeRuntime := &containertest.FakeStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
description := "no streaming - " + tc.description
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
assert.Error(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
{ // Direct streaming case
description := "direct streaming - " + tc.description
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)
err = kubelet.PortForward(podFullName, podUID, port, stream)
if tc.expectError {
assert.Error(t, err, description)
} else {
assert.NoError(t, err, description)
require.Equal(t, fakeRuntime.Args.Pod.ID, podUID, description+": Pod UID")
require.Equal(t, fakeRuntime.Args.Port, port, description+": Port")
require.Equal(t, fakeRuntime.Args.Stream, stream, description+": stream")
}
}
{ // Indirect streaming case
description := "indirect streaming - " + tc.description
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime kubelet.containerRuntime = fakeRuntime
kubelet.streamingRuntime = fakeRuntime
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{}) redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
if tc.expectError { if tc.expectError {
@ -2281,10 +2210,6 @@ func TestPortForward(t *testing.T) {
assert.NoError(t, err, description) assert.NoError(t, err, description)
assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect") assert.Equal(t, containertest.FakeHost, redirect.Host, description+": redirect")
} }
err = kubelet.PortForward(podFullName, podUID, port, stream)
assert.Error(t, err, description)
}
} }
} }

View File

@ -120,7 +120,7 @@ type kubeGenericRuntimeManager struct {
type KubeGenericRuntime interface { type KubeGenericRuntime interface {
kubecontainer.Runtime kubecontainer.Runtime
kubecontainer.IndirectStreamingRuntime kubecontainer.StreamingRuntime
kubecontainer.ContainerCommandRunner kubecontainer.ContainerCommandRunner
} }

View File

@ -37,7 +37,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/remotecommand:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/proxy:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library", "//vendor/k8s.io/apiserver/pkg/authentication/authenticator:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
@ -45,7 +45,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/httplog:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/flushwriter:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/flushwriter:go_default_library",
"//vendor/k8s.io/client-go/tools/remotecommand:go_default_library",
], ],
) )
@ -60,13 +59,14 @@ go_test(
deps = [ deps = [
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/apis/core/install:go_default_library", "//pkg/apis/core/install:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/stats/v1alpha1:go_default_library", "//pkg/kubelet/apis/stats/v1alpha1:go_default_library",
"//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library", "//pkg/kubelet/server/portforward:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library", "//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library",

View File

@ -42,14 +42,13 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/proxy"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/httplog" "k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/flushwriter" "k8s.io/apiserver/pkg/util/flushwriter"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/v1/validation" "k8s.io/kubernetes/pkg/apis/core/v1/validation"
@ -78,7 +77,7 @@ type Server struct {
host HostInterface host HostInterface
restfulCont containerInterface restfulCont containerInterface
resourceAnalyzer stats.ResourceAnalyzer resourceAnalyzer stats.ResourceAnalyzer
runtime kubecontainer.Runtime redirectContainerStreaming bool
} }
type TLSOptions struct { type TLSOptions struct {
@ -124,11 +123,11 @@ func ListenAndServeKubeletServer(
tlsOptions *TLSOptions, tlsOptions *TLSOptions,
auth AuthInterface, auth AuthInterface,
enableDebuggingHandlers, enableDebuggingHandlers,
enableContentionProfiling bool, enableContentionProfiling,
runtime kubecontainer.Runtime, redirectContainerStreaming bool,
criHandler http.Handler) { criHandler http.Handler) {
glog.Infof("Starting to listen on %s:%d", address, port) glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, runtime, criHandler) handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, redirectContainerStreaming, criHandler)
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler, Handler: &handler,
@ -146,9 +145,9 @@ func ListenAndServeKubeletServer(
} }
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) { func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := NewServer(host, resourceAnalyzer, nil, false, false, runtime, nil) s := NewServer(host, resourceAnalyzer, nil, false, false, false, nil)
server := &http.Server{ server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -173,12 +172,8 @@ type HostInterface interface {
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetRunningPods() ([]*v1.Pod, error) GetRunningPods() ([]*v1.Pod, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request) ServeLogs(w http.ResponseWriter, req *http.Request)
PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
StreamingConnectionIdleTimeout() time.Duration
ResyncInterval() time.Duration ResyncInterval() time.Duration
GetHostname() string GetHostname() string
LatestLoopEntryTime() time.Time LatestLoopEntryTime() time.Time
@ -193,15 +188,15 @@ func NewServer(
resourceAnalyzer stats.ResourceAnalyzer, resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface, auth AuthInterface,
enableDebuggingHandlers, enableDebuggingHandlers,
enableContentionProfiling bool, enableContentionProfiling,
runtime kubecontainer.Runtime, redirectContainerStreaming bool,
criHandler http.Handler) Server { criHandler http.Handler) Server {
server := Server{ server := Server{
host: host, host: host,
resourceAnalyzer: resourceAnalyzer, resourceAnalyzer: resourceAnalyzer,
auth: auth, auth: auth,
restfulCont: &filteringContainer{Container: restful.NewContainer()}, restfulCont: &filteringContainer{Container: restful.NewContainer()},
runtime: runtime, redirectContainerStreaming: redirectContainerStreaming,
} }
if auth != nil { if auth != nil {
server.InstallAuthFilter() server.InstallAuthFilter()
@ -627,6 +622,15 @@ func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams
} }
} }
type responder struct {
errorMessage string
}
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
glog.Errorf("Error while proxying request: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
// getAttach handles requests to attach to a container. // getAttach handles requests to attach to a container.
func (s *Server) getAttach(request *restful.Request, response *restful.Response) { func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
params := getExecRequestParams(request) params := getExecRequestParams(request)
@ -643,26 +647,18 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
} }
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts) url, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts)
if err != nil { if err != nil {
streaming.WriteError(err, response.ResponseWriter) streaming.WriteError(err, response.ResponseWriter)
return return
} }
if redirect != nil {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) if s.redirectContainerStreaming {
http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
return return
} }
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{})
remotecommandserver.ServeAttach(response.ResponseWriter, handler.ServeHTTP(response.ResponseWriter, request.Request)
request.Request,
s.host,
podFullName,
params.podUID,
params.containerName,
streamOpts,
s.host.StreamingConnectionIdleTimeout(),
remotecommandconsts.DefaultStreamCreationTimeout,
remotecommandconsts.SupportedStreamingProtocols)
} }
// getExec handles requests to run a command inside a container. // getExec handles requests to run a command inside a container.
@ -681,27 +677,17 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
} }
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
if err != nil { if err != nil {
streaming.WriteError(err, response.ResponseWriter) streaming.WriteError(err, response.ResponseWriter)
return return
} }
if redirect != nil { if s.redirectContainerStreaming {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
return return
} }
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{})
remotecommandserver.ServeExec(response.ResponseWriter, handler.ServeHTTP(response.ResponseWriter, request.Request)
request.Request,
s.host,
podFullName,
params.podUID,
params.containerName,
params.cmd,
streamOpts,
s.host.StreamingConnectionIdleTimeout(),
remotecommandconsts.DefaultStreamCreationTimeout,
remotecommandconsts.SupportedStreamingProtocols)
} }
// getRun handles requests to run a command inside a container. // getRun handles requests to run a command inside a container.
@ -758,25 +744,17 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
return return
} }
redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions) url, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
if err != nil { if err != nil {
streaming.WriteError(err, response.ResponseWriter) streaming.WriteError(err, response.ResponseWriter)
return return
} }
if redirect != nil { if s.redirectContainerStreaming {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
return return
} }
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{})
portforward.ServePortForward(response.ResponseWriter, handler.ServeHTTP(response.ResponseWriter, request.Request)
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
params.podUID,
portForwardOptions,
s.host.StreamingConnectionIdleTimeout(),
remotecommandconsts.DefaultStreamCreationTimeout,
portforward.SupportedProtocols)
} }
// ServeHTTP responds to HTTP requests on the Kubelet. // ServeHTTP responds to HTTP requests on the Kubelet.

View File

@ -48,20 +48,23 @@ import (
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
utiltesting "k8s.io/client-go/util/testing" utiltesting "k8s.io/client-go/util/testing"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
// Do some initialization to decode the query parameters correctly. // Do some initialization to decode the query parameters correctly.
_ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubecontainertesting "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/portforward"
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
const ( const (
testUID = "9b01b80f-8fb4-11e4-95ab-4200af06647" testUID = "9b01b80f-8fb4-11e4-95ab-4200af06647"
testContainerID = "container789"
testPodSandboxID = "pod0987"
) )
type fakeKubelet struct { type fakeKubelet struct {
@ -73,16 +76,16 @@ type fakeKubelet struct {
runningPodsFunc func() ([]*v1.Pod, error) runningPodsFunc func() ([]*v1.Pod, error)
logFunc func(w http.ResponseWriter, req *http.Request) logFunc func(w http.ResponseWriter, req *http.Request)
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
execFunc func(pod string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error getExecCheck func(string, types.UID, string, []string, remotecommandserver.Options)
attachFunc func(pod string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error getAttachCheck func(string, types.UID, string, remotecommandserver.Options)
portForwardFunc func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error getPortForwardCheck func(string, string, types.UID, portforward.V4Options)
containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
streamingConnectionIdleTimeoutFunc func() time.Duration
hostnameFunc func() string hostnameFunc func() string
resyncInterval time.Duration resyncInterval time.Duration
loopEntryTime time.Time loopEntryTime time.Time
plegHealth bool plegHealth bool
redirectURL *url.URL streamingRuntime streaming.Server
} }
func (fk *fakeKubelet) ResyncInterval() time.Duration { func (fk *fakeKubelet) ResyncInterval() time.Duration {
@ -137,32 +140,109 @@ func (fk *fakeKubelet) RunInContainer(podFullName string, uid types.UID, contain
return fk.runFunc(podFullName, uid, containerName, cmd) return fk.runFunc(podFullName, uid, containerName, cmd)
} }
func (fk *fakeKubelet) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { type fakeRuntime struct {
return fk.execFunc(name, uid, container, cmd, in, out, err, tty) execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
portForwardFunc func(string, int32, io.ReadWriteCloser) error
} }
func (fk *fakeKubelet) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { func (f *fakeRuntime) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
return fk.attachFunc(name, uid, container, in, out, err, tty) return f.execFunc(containerID, cmd, stdin, stdout, stderr, tty, resize)
} }
func (fk *fakeKubelet) PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { func (f *fakeRuntime) Attach(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
return fk.portForwardFunc(name, uid, port, stream) return f.attachFunc(containerID, stdin, stdout, stderr, tty, resize)
}
func (f *fakeRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
return f.portForwardFunc(podSandboxID, port, stream)
}
type testStreamingServer struct {
streaming.Server
fakeRuntime *fakeRuntime
testHTTPServer *httptest.Server
}
func newTestStreamingServer(streamIdleTimeout time.Duration) (s *testStreamingServer, err error) {
s = &testStreamingServer{}
s.testHTTPServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.ServeHTTP(w, r)
}))
defer func() {
if err != nil {
s.testHTTPServer.Close()
}
}()
testURL, err := url.Parse(s.testHTTPServer.URL)
if err != nil {
return nil, err
}
s.fakeRuntime = &fakeRuntime{}
config := streaming.DefaultConfig
config.BaseURL = testURL
if streamIdleTimeout != 0 {
config.StreamIdleTimeout = streamIdleTimeout
}
s.Server, err = streaming.NewServer(config, s.fakeRuntime)
if err != nil {
return nil, err
}
return s, nil
} }
func (fk *fakeKubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) { func (fk *fakeKubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
return fk.redirectURL, nil if fk.getExecCheck != nil {
fk.getExecCheck(podFullName, podUID, containerName, cmd, streamOpts)
}
// Always use testContainerID
resp, err := fk.streamingRuntime.GetExec(&runtimeapi.ExecRequest{
ContainerId: testContainerID,
Cmd: cmd,
Tty: streamOpts.TTY,
Stdin: streamOpts.Stdin,
Stdout: streamOpts.Stdout,
Stderr: streamOpts.Stderr,
})
if err != nil {
return nil, err
}
return url.Parse(resp.GetUrl())
} }
func (fk *fakeKubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) { func (fk *fakeKubelet) GetAttach(podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
return fk.redirectURL, nil if fk.getAttachCheck != nil {
fk.getAttachCheck(podFullName, podUID, containerName, streamOpts)
}
// Always use testContainerID
resp, err := fk.streamingRuntime.GetAttach(&runtimeapi.AttachRequest{
ContainerId: testContainerID,
Tty: streamOpts.TTY,
Stdin: streamOpts.Stdin,
Stdout: streamOpts.Stdout,
Stderr: streamOpts.Stderr,
})
if err != nil {
return nil, err
}
return url.Parse(resp.GetUrl())
} }
func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) { func (fk *fakeKubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
return fk.redirectURL, nil if fk.getPortForwardCheck != nil {
} fk.getPortForwardCheck(podName, podNamespace, podUID, portForwardOpts)
}
func (fk *fakeKubelet) StreamingConnectionIdleTimeout() time.Duration { // Always use testPodSandboxID
return fk.streamingConnectionIdleTimeoutFunc() resp, err := fk.streamingRuntime.GetPortForward(&runtimeapi.PortForwardRequest{
PodSandboxId: testPodSandboxID,
Port: portForwardOpts.Ports,
})
if err != nil {
return nil, err
}
return url.Parse(resp.GetUrl())
} }
// Unused functions // Unused functions
@ -203,14 +283,16 @@ type serverTestFramework struct {
fakeKubelet *fakeKubelet fakeKubelet *fakeKubelet
fakeAuth *fakeAuth fakeAuth *fakeAuth
testHTTPServer *httptest.Server testHTTPServer *httptest.Server
fakeRuntime *fakeRuntime
testStreamingHTTPServer *httptest.Server
criHandler *utiltesting.FakeHandler criHandler *utiltesting.FakeHandler
} }
func newServerTest() *serverTestFramework { func newServerTest() *serverTestFramework {
return newServerTestWithDebug(true) return newServerTestWithDebug(true, false, nil)
} }
func newServerTestWithDebug(enableDebugging bool) *serverTestFramework { func newServerTestWithDebug(enableDebugging, redirectContainerStreaming bool, streamingServer streaming.Server) *serverTestFramework {
fw := &serverTestFramework{} fw := &serverTestFramework{}
fw.fakeKubelet = &fakeKubelet{ fw.fakeKubelet = &fakeKubelet{
hostnameFunc: func() string { hostnameFunc: func() string {
@ -226,6 +308,7 @@ func newServerTestWithDebug(enableDebugging bool) *serverTestFramework {
}, true }, true
}, },
plegHealth: true, plegHealth: true,
streamingRuntime: streamingServer,
} }
fw.fakeAuth = &fakeAuth{ fw.fakeAuth = &fakeAuth{
authenticateFunc: func(req *http.Request) (user.Info, bool, error) { authenticateFunc: func(req *http.Request) (user.Info, bool, error) {
@ -247,7 +330,7 @@ func newServerTestWithDebug(enableDebugging bool) *serverTestFramework {
fw.fakeAuth, fw.fakeAuth,
enableDebugging, enableDebugging,
false, false,
&kubecontainertesting.Mock{}, redirectContainerStreaming,
fw.criHandler) fw.criHandler)
fw.serverUnderTest = &server fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
@ -1070,13 +1153,12 @@ func TestContainerLogsWithFollow(t *testing.T) {
} }
func TestServeExecInContainerIdleTimeout(t *testing.T) { func TestServeExecInContainerIdleTimeout(t *testing.T) {
fw := newServerTest() ss, err := newTestStreamingServer(100 * time.Millisecond)
require.NoError(t, err)
defer ss.testHTTPServer.Close()
fw := newServerTestWithDebug(true, false, ss)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()
fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
return 100 * time.Millisecond
}
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
expectedContainerName := "baz" expectedContainerName := "baz"
@ -1108,38 +1190,35 @@ func TestServeExecInContainerIdleTimeout(t *testing.T) {
} }
func testExecAttach(t *testing.T, verb string) { func testExecAttach(t *testing.T, verb string) {
tests := []struct { tests := map[string]struct {
stdin bool stdin bool
stdout bool stdout bool
stderr bool stderr bool
tty bool tty bool
responseStatusCode int responseStatusCode int
uid bool uid bool
responseLocation string redirect bool
}{ }{
{responseStatusCode: http.StatusBadRequest}, "no input or output": {responseStatusCode: http.StatusBadRequest},
{stdin: true, responseStatusCode: http.StatusSwitchingProtocols}, "stdin": {stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, responseStatusCode: http.StatusSwitchingProtocols}, "stdout": {stdout: true, responseStatusCode: http.StatusSwitchingProtocols},
{stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, "stderr": {stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, "stdout and stderr": {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols}, "stdout stderr and tty": {stdout: true, stderr: true, tty: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols}, "stdin stdout and stderr": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
{stdout: true, responseStatusCode: http.StatusFound, responseLocation: "http://localhost:12345/" + verb}, "stdin stdout stderr with uid": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols, uid: true},
"stdout with redirect": {stdout: true, responseStatusCode: http.StatusFound, redirect: true},
} }
for i, test := range tests { for desc, test := range tests {
fw := newServerTest() test := test
defer fw.testHTTPServer.Close() t.Run(desc, func(t *testing.T) {
ss, err := newTestStreamingServer(0)
fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
return 0
}
if test.responseLocation != "" {
var err error
fw.fakeKubelet.redirectURL, err = url.Parse(test.responseLocation)
require.NoError(t, err) require.NoError(t, err)
} defer ss.testHTTPServer.Close()
fw := newServerTestWithDebug(true, test.redirect, ss)
defer fw.testHTTPServer.Close()
fmt.Println(desc)
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
@ -1155,81 +1234,67 @@ func testExecAttach(t *testing.T, verb string) {
execInvoked := false execInvoked := false
attachInvoked := false attachInvoked := false
testStreamFunc := func(podFullName string, uid types.UID, containerName string, cmd []string, in io.Reader, out, stderr io.WriteCloser, tty bool, done chan struct{}) error { checkStream := func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
defer close(done) assert.Equal(t, expectedPodName, podFullName, "podFullName")
if test.uid {
assert.Equal(t, testUID, string(uid), "uid")
}
assert.Equal(t, expectedContainerName, containerName, "containerName")
assert.Equal(t, test.stdin, streamOpts.Stdin, "stdin")
assert.Equal(t, test.stdout, streamOpts.Stdout, "stdout")
assert.Equal(t, test.tty, streamOpts.TTY, "tty")
assert.Equal(t, !test.tty && test.stderr, streamOpts.Stderr, "stderr")
}
if podFullName != expectedPodName { fw.fakeKubelet.getExecCheck = func(podFullName string, uid types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) {
t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName) execInvoked = true
assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
checkStream(podFullName, uid, containerName, streamOpts)
} }
if test.uid && string(uid) != testUID {
t.Fatalf("%d: uid: expected %v, got %v", i, testUID, uid) fw.fakeKubelet.getAttachCheck = func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
} attachInvoked = true
if containerName != expectedContainerName { checkStream(podFullName, uid, containerName, streamOpts)
t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName)
} }
testStream := func(containerID string, in io.Reader, out, stderr io.WriteCloser, tty bool, done chan struct{}) error {
close(done)
assert.Equal(t, testContainerID, containerID, "containerID")
assert.Equal(t, test.tty, tty, "tty")
require.Equal(t, test.stdin, in != nil, "in")
require.Equal(t, test.stdout, out != nil, "out")
require.Equal(t, !test.tty && test.stderr, stderr != nil, "err")
if test.stdin { if test.stdin {
if in == nil {
t.Fatalf("%d: stdin: expected non-nil", i)
}
b := make([]byte, 10) b := make([]byte, 10)
n, err := in.Read(b) n, err := in.Read(b)
if err != nil { assert.NoError(t, err, "reading from stdin")
t.Fatalf("%d: error reading from stdin: %v", i, err) assert.Equal(t, expectedStdin, string(b[0:n]), "content from stdin")
}
if e, a := expectedStdin, string(b[0:n]); e != a {
t.Fatalf("%d: stdin: expected to read %v, got %v", i, e, a)
}
} else if in != nil {
t.Fatalf("%d: stdin: expected nil: %#v", i, in)
} }
if test.stdout { if test.stdout {
if out == nil {
t.Fatalf("%d: stdout: expected non-nil", i)
}
_, err := out.Write([]byte(expectedStdout)) _, err := out.Write([]byte(expectedStdout))
if err != nil { assert.NoError(t, err, "writing to stdout")
t.Fatalf("%d:, error writing to stdout: %v", i, err)
}
out.Close() out.Close()
<-clientStdoutReadDone <-clientStdoutReadDone
} else if out != nil {
t.Fatalf("%d: stdout: expected nil: %#v", i, out)
} }
if tty { if !test.tty && test.stderr {
if stderr != nil {
t.Fatalf("%d: tty set but received non-nil stderr: %v", i, stderr)
}
} else if test.stderr {
if stderr == nil {
t.Fatalf("%d: stderr: expected non-nil", i)
}
_, err := stderr.Write([]byte(expectedStderr)) _, err := stderr.Write([]byte(expectedStderr))
if err != nil { assert.NoError(t, err, "writing to stderr")
t.Fatalf("%d:, error writing to stderr: %v", i, err)
}
stderr.Close() stderr.Close()
<-clientStderrReadDone <-clientStderrReadDone
} else if stderr != nil {
t.Fatalf("%d: stderr: expected nil: %#v", i, stderr)
} }
return nil return nil
} }
fw.fakeKubelet.execFunc = func(podFullName string, uid types.UID, containerName string, cmd []string, in io.Reader, out, stderr io.WriteCloser, tty bool) error { ss.fakeRuntime.execFunc = func(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
execInvoked = true assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
if strings.Join(cmd, " ") != expectedCommand { return testStream(containerID, stdin, stdout, stderr, tty, done)
t.Fatalf("%d: cmd: expected: %s, got %v", i, expectedCommand, cmd)
}
return testStreamFunc(podFullName, uid, containerName, cmd, in, out, stderr, tty, done)
} }
fw.fakeKubelet.attachFunc = func(podFullName string, uid types.UID, containerName string, in io.Reader, out, stderr io.WriteCloser, tty bool) error { ss.fakeRuntime.attachFunc = func(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
attachInvoked = true return testStream(containerID, stdin, stdout, stderr, tty, done)
return testStreamFunc(podFullName, uid, containerName, nil, in, out, stderr, tty, done)
} }
var url string var url string
@ -1256,12 +1321,10 @@ func testExecAttach(t *testing.T, verb string) {
var ( var (
resp *http.Response resp *http.Response
err error
upgradeRoundTripper httpstream.UpgradeRoundTripper upgradeRoundTripper httpstream.UpgradeRoundTripper
c *http.Client c *http.Client
) )
if test.redirect {
if test.responseStatusCode != http.StatusSwitchingProtocols {
c = &http.Client{} c = &http.Client{}
// Don't follow redirects, since we want to inspect the redirect response. // Don't follow redirects, since we want to inspect the redirect response.
c.CheckRedirect = func(*http.Request, []*http.Request) error { c.CheckRedirect = func(*http.Request, []*http.Request) error {
@ -1273,115 +1336,75 @@ func testExecAttach(t *testing.T, verb string) {
} }
resp, err = c.Post(url, "", nil) resp, err = c.Post(url, "", nil)
if err != nil { require.NoError(t, err, "POSTing")
t.Fatalf("%d: Got error POSTing: %v", i, err)
}
defer resp.Body.Close() defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body) _, err = ioutil.ReadAll(resp.Body)
if err != nil { assert.NoError(t, err, "reading response body")
t.Errorf("%d: Error reading response body: %v", i, err)
}
if e, a := test.responseStatusCode, resp.StatusCode; e != a {
t.Fatalf("%d: response status: expected %v, got %v", i, e, a)
}
if e, a := test.responseLocation, resp.Header.Get("Location"); e != a {
t.Errorf("%d: response location: expected %v, got %v", i, e, a)
}
require.Equal(t, test.responseStatusCode, resp.StatusCode, "response status")
if test.responseStatusCode != http.StatusSwitchingProtocols { if test.responseStatusCode != http.StatusSwitchingProtocols {
continue return
} }
conn, err := upgradeRoundTripper.NewConnection(resp) conn, err := upgradeRoundTripper.NewConnection(resp)
if err != nil { require.NoError(t, err, "creating streaming connection")
t.Fatalf("Unexpected error creating streaming connection: %s", err)
}
if conn == nil {
t.Fatalf("%d: unexpected nil conn", i)
}
defer conn.Close() defer conn.Close()
h := http.Header{} h := http.Header{}
h.Set(api.StreamType, api.StreamTypeError) h.Set(api.StreamType, api.StreamTypeError)
if _, err := conn.CreateStream(h); err != nil { _, err = conn.CreateStream(h)
t.Fatalf("%d: error creating error stream: %v", i, err) require.NoError(t, err, "creating error stream")
}
if test.stdin { if test.stdin {
h.Set(api.StreamType, api.StreamTypeStdin) h.Set(api.StreamType, api.StreamTypeStdin)
stream, err := conn.CreateStream(h) stream, err := conn.CreateStream(h)
if err != nil { require.NoError(t, err, "creating stdin stream")
t.Fatalf("%d: error creating stdin stream: %v", i, err)
}
_, err = stream.Write([]byte(expectedStdin)) _, err = stream.Write([]byte(expectedStdin))
if err != nil { require.NoError(t, err, "writing to stdin stream")
t.Fatalf("%d: error writing to stdin stream: %v", i, err)
}
} }
var stdoutStream httpstream.Stream var stdoutStream httpstream.Stream
if test.stdout { if test.stdout {
h.Set(api.StreamType, api.StreamTypeStdout) h.Set(api.StreamType, api.StreamTypeStdout)
stdoutStream, err = conn.CreateStream(h) stdoutStream, err = conn.CreateStream(h)
if err != nil { require.NoError(t, err, "creating stdout stream")
t.Fatalf("%d: error creating stdout stream: %v", i, err)
}
} }
var stderrStream httpstream.Stream var stderrStream httpstream.Stream
if test.stderr && !test.tty { if test.stderr && !test.tty {
h.Set(api.StreamType, api.StreamTypeStderr) h.Set(api.StreamType, api.StreamTypeStderr)
stderrStream, err = conn.CreateStream(h) stderrStream, err = conn.CreateStream(h)
if err != nil { require.NoError(t, err, "creating stderr stream")
t.Fatalf("%d: error creating stderr stream: %v", i, err)
}
} }
if test.stdout { if test.stdout {
output := make([]byte, 10) output := make([]byte, 10)
n, err := stdoutStream.Read(output) n, err := stdoutStream.Read(output)
close(clientStdoutReadDone) close(clientStdoutReadDone)
if err != nil { assert.NoError(t, err, "reading from stdout stream")
t.Fatalf("%d: error reading from stdout stream: %v", i, err) assert.Equal(t, expectedStdout, string(output[0:n]), "stdout")
}
if e, a := expectedStdout, string(output[0:n]); e != a {
t.Fatalf("%d: stdout: expected '%v', got '%v'", i, e, a)
}
} }
if test.stderr && !test.tty { if test.stderr && !test.tty {
output := make([]byte, 10) output := make([]byte, 10)
n, err := stderrStream.Read(output) n, err := stderrStream.Read(output)
close(clientStderrReadDone) close(clientStderrReadDone)
if err != nil { assert.NoError(t, err, "reading from stderr stream")
t.Fatalf("%d: error reading from stderr stream: %v", i, err) assert.Equal(t, expectedStderr, string(output[0:n]), "stderr")
}
if e, a := expectedStderr, string(output[0:n]); e != a {
t.Fatalf("%d: stderr: expected '%v', got '%v'", i, e, a)
}
} }
// wait for the server to finish before checking if the attach/exec funcs were invoked // wait for the server to finish before checking if the attach/exec funcs were invoked
<-done <-done
if verb == "exec" { if verb == "exec" {
if !execInvoked { assert.True(t, execInvoked, "exec should be invoked")
t.Errorf("%d: exec was not invoked", i) assert.False(t, attachInvoked, "attach should not be invoked")
}
if attachInvoked {
t.Errorf("%d: attach should not have been invoked", i)
}
} else { } else {
if !attachInvoked { assert.True(t, attachInvoked, "attach should be invoked")
t.Errorf("%d: attach was not invoked", i) assert.False(t, execInvoked, "exec should not be invoked")
}
if execInvoked {
t.Errorf("%d: exec should not have been invoked", i)
}
} }
})
} }
} }
@ -1394,13 +1417,12 @@ func TestServeAttachContainer(t *testing.T) {
} }
func TestServePortForwardIdleTimeout(t *testing.T) { func TestServePortForwardIdleTimeout(t *testing.T) {
fw := newServerTest() ss, err := newTestStreamingServer(100 * time.Millisecond)
require.NoError(t, err)
defer ss.testHTTPServer.Close()
fw := newServerTestWithDebug(true, false, ss)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()
fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
return 100 * time.Millisecond
}
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
@ -1428,82 +1450,67 @@ func TestServePortForwardIdleTimeout(t *testing.T) {
} }
func TestServePortForward(t *testing.T) { func TestServePortForward(t *testing.T) {
tests := []struct { tests := map[string]struct {
port string port string
uid bool uid bool
clientData string clientData string
containerData string containerData string
redirect bool
shouldError bool shouldError bool
responseLocation string
}{ }{
{port: "", shouldError: true}, "no port": {port: "", shouldError: true},
{port: "abc", shouldError: true}, "none number port": {port: "abc", shouldError: true},
{port: "-1", shouldError: true}, "negative port": {port: "-1", shouldError: true},
{port: "65536", shouldError: true}, "too large port": {port: "65536", shouldError: true},
{port: "0", shouldError: true}, "0 port": {port: "0", shouldError: true},
{port: "1", shouldError: false}, "min port": {port: "1", shouldError: false},
{port: "8000", shouldError: false}, "normal port": {port: "8000", shouldError: false},
{port: "8000", clientData: "client data", containerData: "container data", shouldError: false}, "normal port with data forward": {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
{port: "65535", shouldError: false}, "max port": {port: "65535", shouldError: false},
{port: "65535", uid: true, shouldError: false}, "normal port with uid": {port: "8000", uid: true, shouldError: false},
{port: "65535", responseLocation: "http://localhost:12345/portforward", shouldError: false}, "normal port with redirect": {port: "8000", redirect: true, shouldError: false},
} }
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
for i, test := range tests { for desc, test := range tests {
fw := newServerTest() test := test
defer fw.testHTTPServer.Close() t.Run(desc, func(t *testing.T) {
ss, err := newTestStreamingServer(0)
fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
return 0
}
if test.responseLocation != "" {
var err error
fw.fakeKubelet.redirectURL, err = url.Parse(test.responseLocation)
require.NoError(t, err) require.NoError(t, err)
} defer ss.testHTTPServer.Close()
fw := newServerTestWithDebug(true, test.redirect, ss)
defer fw.testHTTPServer.Close()
portForwardFuncDone := make(chan struct{}) portForwardFuncDone := make(chan struct{})
fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { fw.fakeKubelet.getPortForwardCheck = func(name, namespace string, uid types.UID, opts portforward.V4Options) {
assert.Equal(t, podName, name, "pod name")
assert.Equal(t, podNamespace, namespace, "pod namespace")
if test.uid {
assert.Equal(t, testUID, string(uid), "uid")
}
}
ss.fakeRuntime.portForwardFunc = func(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
defer close(portForwardFuncDone) defer close(portForwardFuncDone)
assert.Equal(t, testPodSandboxID, podSandboxID, "pod sandbox id")
if e, a := expectedPodName, name; e != a { // The port should be valid if it reaches here.
t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a) testPort, err := strconv.ParseInt(test.port, 10, 32)
} require.NoError(t, err, "parse port")
assert.Equal(t, int32(testPort), port, "port")
if e, a := testUID, uid; test.uid && e != string(a) {
t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
}
p, err := strconv.ParseInt(test.port, 10, 32)
if err != nil {
t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err)
}
if e, a := int32(p), port; e != a {
t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a)
}
if test.clientData != "" { if test.clientData != "" {
fromClient := make([]byte, 32) fromClient := make([]byte, 32)
n, err := stream.Read(fromClient) n, err := stream.Read(fromClient)
if err != nil { assert.NoError(t, err, "reading client data")
t.Fatalf("%d: error reading client data: %v", i, err) assert.Equal(t, test.clientData, string(fromClient[0:n]), "client data")
}
if e, a := test.clientData, string(fromClient[0:n]); e != a {
t.Fatalf("%d: client data: expected to receive '%v', got '%v'", i, e, a)
}
} }
if test.containerData != "" { if test.containerData != "" {
_, err := stream.Write([]byte(test.containerData)) _, err := stream.Write([]byte(test.containerData))
if err != nil { assert.NoError(t, err, "writing container data")
t.Fatalf("%d: error writing container data: %v", i, err)
}
} }
return nil return nil
@ -1521,7 +1528,7 @@ func TestServePortForward(t *testing.T) {
c *http.Client c *http.Client
) )
if len(test.responseLocation) > 0 { if test.redirect {
c = &http.Client{} c = &http.Client{}
// Don't follow redirects, since we want to inspect the redirect response. // Don't follow redirects, since we want to inspect the redirect response.
c.CheckRedirect = func(*http.Request, []*http.Request) error { c.CheckRedirect = func(*http.Request, []*http.Request) error {
@ -1533,69 +1540,49 @@ func TestServePortForward(t *testing.T) {
} }
resp, err := c.Post(url, "", nil) resp, err := c.Post(url, "", nil)
if err != nil { require.NoError(t, err, "POSTing")
t.Fatalf("%d: Got error POSTing: %v", i, err)
}
defer resp.Body.Close() defer resp.Body.Close()
if test.responseLocation != "" { if test.redirect {
assert.Equal(t, http.StatusFound, resp.StatusCode, "%d: status code", i) assert.Equal(t, http.StatusFound, resp.StatusCode, "status code")
assert.Equal(t, test.responseLocation, resp.Header.Get("Location"), "%d: location", i) return
continue
} else { } else {
assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "%d: status code", i) assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
} }
conn, err := upgradeRoundTripper.NewConnection(resp) conn, err := upgradeRoundTripper.NewConnection(resp)
if err != nil { require.NoError(t, err, "creating streaming connection")
t.Fatalf("Unexpected error creating streaming connection: %s", err)
}
if conn == nil {
t.Fatalf("%d: Unexpected nil connection", i)
}
defer conn.Close() defer conn.Close()
headers := http.Header{} headers := http.Header{}
headers.Set("streamType", "error") headers.Set("streamType", "error")
headers.Set("port", test.port) headers.Set("port", test.port)
errorStream, err := conn.CreateStream(headers) _, err = conn.CreateStream(headers)
_ = errorStream assert.Equal(t, test.shouldError, err != nil, "expect error")
haveErr := err != nil
if e, a := test.shouldError, haveErr; e != a {
t.Fatalf("%d: create stream: expected err=%t, got %t: %v", i, e, a, err)
}
if test.shouldError { if test.shouldError {
continue return
} }
headers.Set("streamType", "data") headers.Set("streamType", "data")
headers.Set("port", test.port) headers.Set("port", test.port)
dataStream, err := conn.CreateStream(headers) dataStream, err := conn.CreateStream(headers)
haveErr = err != nil require.NoError(t, err, "create stream")
if e, a := test.shouldError, haveErr; e != a {
t.Fatalf("%d: create stream: expected err=%t, got %t: %v", i, e, a, err)
}
if test.clientData != "" { if test.clientData != "" {
_, err := dataStream.Write([]byte(test.clientData)) _, err := dataStream.Write([]byte(test.clientData))
if err != nil { assert.NoError(t, err, "writing client data")
t.Fatalf("%d: unexpected error writing client data: %v", i, err)
}
} }
if test.containerData != "" { if test.containerData != "" {
fromContainer := make([]byte, 32) fromContainer := make([]byte, 32)
n, err := dataStream.Read(fromContainer) n, err := dataStream.Read(fromContainer)
if err != nil { assert.NoError(t, err, "reading container data")
t.Fatalf("%d: unexpected error reading container data: %v", i, err) assert.Equal(t, test.containerData, string(fromContainer[0:n]), "container data")
}
if e, a := test.containerData, string(fromContainer[0:n]); e != a {
t.Fatalf("%d: expected to receive '%v' from container, got '%v'", i, e, a)
}
} }
<-portForwardFuncDone <-portForwardFuncDone
})
} }
} }
@ -1616,7 +1603,7 @@ func TestCRIHandler(t *testing.T) {
} }
func TestDebuggingDisabledHandlers(t *testing.T) { func TestDebuggingDisabledHandlers(t *testing.T) {
fw := newServerTestWithDebug(false) fw := newServerTestWithDebug(false, false, nil)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()
paths := []string{ paths := []string{

View File

@ -23,11 +23,13 @@ import (
"strconv" "strconv"
"sync" "sync"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/websocket" "golang.org/x/net/websocket"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
) )
const ( const (
@ -36,75 +38,65 @@ const (
) )
func TestServeWSPortForward(t *testing.T) { func TestServeWSPortForward(t *testing.T) {
tests := []struct { tests := map[string]struct {
port string port string
uid bool uid bool
clientData string clientData string
containerData string containerData string
shouldError bool shouldError bool
}{ }{
{port: "", shouldError: true}, "no port": {port: "", shouldError: true},
{port: "abc", shouldError: true}, "none number port": {port: "abc", shouldError: true},
{port: "-1", shouldError: true}, "negative port": {port: "-1", shouldError: true},
{port: "65536", shouldError: true}, "too large port": {port: "65536", shouldError: true},
{port: "0", shouldError: true}, "0 port": {port: "0", shouldError: true},
{port: "1", shouldError: false}, "min port": {port: "1", shouldError: false},
{port: "8000", shouldError: false}, "normal port": {port: "8000", shouldError: false},
{port: "8000", clientData: "client data", containerData: "container data", shouldError: false}, "normal port with data forward": {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
{port: "65535", shouldError: false}, "max port": {port: "65535", shouldError: false},
{port: "65535", uid: true, shouldError: false}, "normal port with uid": {port: "8000", uid: true, shouldError: false},
} }
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
for i, test := range tests { for desc, test := range tests {
fw := newServerTest() test := test
t.Run(desc, func(t *testing.T) {
ss, err := newTestStreamingServer(0)
require.NoError(t, err)
defer ss.testHTTPServer.Close()
fw := newServerTestWithDebug(true, false, ss)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()
fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
return 0
}
portForwardFuncDone := make(chan struct{}) portForwardFuncDone := make(chan struct{})
fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { fw.fakeKubelet.getPortForwardCheck = func(name, namespace string, uid types.UID, opts portforward.V4Options) {
assert.Equal(t, podName, name, "pod name")
assert.Equal(t, podNamespace, namespace, "pod namespace")
if test.uid {
assert.Equal(t, testUID, string(uid), "uid")
}
}
ss.fakeRuntime.portForwardFunc = func(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
defer close(portForwardFuncDone) defer close(portForwardFuncDone)
assert.Equal(t, testPodSandboxID, podSandboxID, "pod sandbox id")
if e, a := expectedPodName, name; e != a { // The port should be valid if it reaches here.
t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a) testPort, err := strconv.ParseInt(test.port, 10, 32)
} require.NoError(t, err, "parse port")
assert.Equal(t, int32(testPort), port, "port")
if e, a := expectedUid, uid; test.uid && e != string(a) {
t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
}
p, err := strconv.ParseInt(test.port, 10, 32)
if err != nil {
t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err)
}
if e, a := int32(p), port; e != a {
t.Fatalf("%d: port: expected '%v', got '%v'", i, e, a)
}
if test.clientData != "" { if test.clientData != "" {
fromClient := make([]byte, 32) fromClient := make([]byte, 32)
n, err := stream.Read(fromClient) n, err := stream.Read(fromClient)
if err != nil { assert.NoError(t, err, "reading client data")
t.Fatalf("%d: error reading client data: %v", i, err) assert.Equal(t, test.clientData, string(fromClient[0:n]), "client data")
}
if e, a := test.clientData, string(fromClient[0:n]); e != a {
t.Fatalf("%d: client data: expected to receive '%v', got '%v'", i, e, a)
}
} }
if test.containerData != "" { if test.containerData != "" {
_, err := stream.Write([]byte(test.containerData)) _, err := stream.Write([]byte(test.containerData))
if err != nil { assert.NoError(t, err, "writing container data")
t.Fatalf("%d: error writing container data: %v", i, err)
}
} }
return nil return nil
@ -112,76 +104,48 @@ func TestServeWSPortForward(t *testing.T) {
var url string var url string
if test.uid { if test.uid {
url = fmt.Sprintf("ws://%s/portForward/%s/%s/%s?port=%s", fw.testHTTPServer.Listener.Addr().String(), podNamespace, podName, expectedUid, test.port) url = fmt.Sprintf("ws://%s/portForward/%s/%s/%s?port=%s", fw.testHTTPServer.Listener.Addr().String(), podNamespace, podName, testUID, test.port)
} else { } else {
url = fmt.Sprintf("ws://%s/portForward/%s/%s?port=%s", fw.testHTTPServer.Listener.Addr().String(), podNamespace, podName, test.port) url = fmt.Sprintf("ws://%s/portForward/%s/%s?port=%s", fw.testHTTPServer.Listener.Addr().String(), podNamespace, podName, test.port)
} }
ws, err := websocket.Dial(url, "", "http://127.0.0.1/") ws, err := websocket.Dial(url, "", "http://127.0.0.1/")
assert.Equal(t, test.shouldError, err != nil, "websocket dial")
if test.shouldError { if test.shouldError {
if err == nil { return
t.Fatalf("%d: websocket dial expected err", i)
} }
continue
} else if err != nil {
t.Fatalf("%d: websocket dial unexpected err: %v", i, err)
}
defer ws.Close() defer ws.Close()
p, err := strconv.ParseUint(test.port, 10, 16) p, err := strconv.ParseUint(test.port, 10, 16)
if err != nil { require.NoError(t, err, "parse port")
t.Fatalf("%d: error parsing port string '%s': %v", i, test.port, err)
}
p16 := uint16(p) p16 := uint16(p)
channel, data, err := wsRead(ws) channel, data, err := wsRead(ws)
if err != nil { require.NoError(t, err, "read")
t.Fatalf("%d: read failed: expected no error: got %v", i, err) assert.Equal(t, dataChannel, int(channel), "channel")
} assert.Len(t, data, binary.Size(p16), "data size")
if channel != dataChannel { assert.Equal(t, p16, binary.LittleEndian.Uint16(data), "data")
t.Fatalf("%d: wrong channel: got %q: expected %q", i, channel, dataChannel)
}
if len(data) != binary.Size(p16) {
t.Fatalf("%d: wrong data size: got %q: expected %d", i, data, binary.Size(p16))
}
if e, a := p16, binary.LittleEndian.Uint16(data); e != a {
t.Fatalf("%d: wrong data: got %q: expected %s", i, data, test.port)
}
channel, data, err = wsRead(ws) channel, data, err = wsRead(ws)
if err != nil { assert.NoError(t, err, "read")
t.Fatalf("%d: read succeeded: expected no error: got %v", i, err) assert.Equal(t, errorChannel, int(channel), "channel")
} assert.Len(t, data, binary.Size(p16), "data size")
if channel != errorChannel { assert.Equal(t, p16, binary.LittleEndian.Uint16(data), "data")
t.Fatalf("%d: wrong channel: got %q: expected %q", i, channel, errorChannel)
}
if len(data) != binary.Size(p16) {
t.Fatalf("%d: wrong data size: got %q: expected %d", i, data, binary.Size(p16))
}
if e, a := p16, binary.LittleEndian.Uint16(data); e != a {
t.Fatalf("%d: wrong data: got %q: expected %s", i, data, test.port)
}
if test.clientData != "" { if test.clientData != "" {
println("writing the client data") println("writing the client data")
err := wsWrite(ws, dataChannel, []byte(test.clientData)) err := wsWrite(ws, dataChannel, []byte(test.clientData))
if err != nil { assert.NoError(t, err, "writing client data")
t.Fatalf("%d: unexpected error writing client data: %v", i, err)
}
} }
if test.containerData != "" { if test.containerData != "" {
_, data, err = wsRead(ws) _, data, err = wsRead(ws)
if err != nil { assert.NoError(t, err, "reading container data")
t.Fatalf("%d: unexpected error reading container data: %v", i, err) assert.Equal(t, test.containerData, string(data), "container data")
}
if e, a := test.containerData, string(data); e != a {
t.Fatalf("%d: expected to receive '%v' from container, got '%v'", i, e, a)
}
} }
<-portForwardFuncDone <-portForwardFuncDone
})
} }
} }
@ -190,45 +154,39 @@ func TestServeWSMultiplePortForward(t *testing.T) {
ports := []uint16{7000, 8000, 9000} ports := []uint16{7000, 8000, 9000}
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
fw := newServerTest() ss, err := newTestStreamingServer(0)
require.NoError(t, err)
defer ss.testHTTPServer.Close()
fw := newServerTestWithDebug(true, false, ss)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()
fw.fakeKubelet.streamingConnectionIdleTimeoutFunc = func() time.Duration {
return 0
}
portForwardWG := sync.WaitGroup{} portForwardWG := sync.WaitGroup{}
portForwardWG.Add(len(ports)) portForwardWG.Add(len(ports))
portsMutex := sync.Mutex{} portsMutex := sync.Mutex{}
portsForwarded := map[int32]struct{}{} portsForwarded := map[int32]struct{}{}
fw.fakeKubelet.portForwardFunc = func(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error { fw.fakeKubelet.getPortForwardCheck = func(name, namespace string, uid types.UID, opts portforward.V4Options) {
defer portForwardWG.Done() assert.Equal(t, podName, name, "pod name")
assert.Equal(t, podNamespace, namespace, "pod namespace")
if e, a := expectedPodName, name; e != a {
t.Fatalf("%d: pod name: expected '%v', got '%v'", port, e, a)
} }
ss.fakeRuntime.portForwardFunc = func(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
defer portForwardWG.Done()
assert.Equal(t, testPodSandboxID, podSandboxID, "pod sandbox id")
portsMutex.Lock() portsMutex.Lock()
portsForwarded[port] = struct{}{} portsForwarded[port] = struct{}{}
portsMutex.Unlock() portsMutex.Unlock()
fromClient := make([]byte, 32) fromClient := make([]byte, 32)
n, err := stream.Read(fromClient) n, err := stream.Read(fromClient)
if err != nil { assert.NoError(t, err, "reading client data")
t.Fatalf("%d: error reading client data: %v", port, err) assert.Equal(t, fmt.Sprintf("client data on port %d", port), string(fromClient[0:n]), "client data")
}
if e, a := fmt.Sprintf("client data on port %d", port), string(fromClient[0:n]); e != a {
t.Fatalf("%d: client data: expected to receive '%v', got '%v'", port, e, a)
}
_, err = stream.Write([]byte(fmt.Sprintf("container data on port %d", port))) _, err = stream.Write([]byte(fmt.Sprintf("container data on port %d", port)))
if err != nil { assert.NoError(t, err, "writing container data")
t.Fatalf("%d: error writing container data: %v", port, err)
}
return nil return nil
} }
@ -239,70 +197,42 @@ func TestServeWSMultiplePortForward(t *testing.T) {
} }
ws, err := websocket.Dial(url, "", "http://127.0.0.1/") ws, err := websocket.Dial(url, "", "http://127.0.0.1/")
if err != nil { require.NoError(t, err, "websocket dial")
t.Fatalf("websocket dial unexpected err: %v", err)
}
defer ws.Close() defer ws.Close()
for i, port := range ports { for i, port := range ports {
channel, data, err := wsRead(ws) channel, data, err := wsRead(ws)
if err != nil { assert.NoError(t, err, "port %d read", port)
t.Fatalf("%d: read failed: expected no error: got %v", i, err) assert.Equal(t, i*2+dataChannel, int(channel), "port %d channel", port)
} assert.Len(t, data, binary.Size(port), "port %d data size", port)
if int(channel) != i*2+dataChannel { assert.Equal(t, binary.LittleEndian.Uint16(data), port, "port %d data", port)
t.Fatalf("%d: wrong channel: got %q: expected %q", i, channel, i*2+dataChannel)
}
if len(data) != binary.Size(port) {
t.Fatalf("%d: wrong data size: got %q: expected %d", i, data, binary.Size(port))
}
if e, a := port, binary.LittleEndian.Uint16(data); e != a {
t.Fatalf("%d: wrong data: got %q: expected %d", i, data, port)
}
channel, data, err = wsRead(ws) channel, data, err = wsRead(ws)
if err != nil { assert.NoError(t, err, "port %d read", port)
t.Fatalf("%d: read succeeded: expected no error: got %v", i, err) assert.Equal(t, i*2+errorChannel, int(channel), "port %d channel", port)
} assert.Len(t, data, binary.Size(port), "port %d data size", port)
if int(channel) != i*2+errorChannel { assert.Equal(t, binary.LittleEndian.Uint16(data), port, "port %d data", port)
t.Fatalf("%d: wrong channel: got %q: expected %q", i, channel, i*2+errorChannel)
}
if len(data) != binary.Size(port) {
t.Fatalf("%d: wrong data size: got %q: expected %d", i, data, binary.Size(port))
}
if e, a := port, binary.LittleEndian.Uint16(data); e != a {
t.Fatalf("%d: wrong data: got %q: expected %d", i, data, port)
}
} }
for i, port := range ports { for i, port := range ports {
println("writing the client data", port) t.Logf("port %d writing the client data", port)
err := wsWrite(ws, byte(i*2+dataChannel), []byte(fmt.Sprintf("client data on port %d", port))) err := wsWrite(ws, byte(i*2+dataChannel), []byte(fmt.Sprintf("client data on port %d", port)))
if err != nil { assert.NoError(t, err, "port %d write client data", port)
t.Fatalf("%d: unexpected error writing client data: %v", i, err)
}
channel, data, err := wsRead(ws) channel, data, err := wsRead(ws)
if err != nil { assert.NoError(t, err, "port %d read container data", port)
t.Fatalf("%d: unexpected error reading container data: %v", i, err) assert.Equal(t, i*2+dataChannel, int(channel), "port %d channel", port)
} assert.Equal(t, fmt.Sprintf("container data on port %d", port), string(data), "port %d container data", port)
if int(channel) != i*2+dataChannel {
t.Fatalf("%d: wrong channel: got %q: expected %q", port, channel, i*2+dataChannel)
}
if e, a := fmt.Sprintf("container data on port %d", port), string(data); e != a {
t.Fatalf("%d: expected to receive '%v' from container, got '%v'", i, e, a)
}
} }
portForwardWG.Wait() portForwardWG.Wait()
portsMutex.Lock() portsMutex.Lock()
defer portsMutex.Unlock() defer portsMutex.Unlock()
if len(ports) != len(portsForwarded) { assert.Len(t, portsForwarded, len(ports), "all ports forwarded")
t.Fatalf("expected to forward %d ports; got %v", len(ports), portsForwarded)
}
} }
func wsWrite(conn *websocket.Conn, channel byte, data []byte) error { func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
frame := make([]byte, len(data)+1) frame := make([]byte, len(data)+1)
frame[0] = channel frame[0] = channel

View File

@ -20,6 +20,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"io" "io"
"net"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
@ -71,6 +72,7 @@ type Config struct {
Addr string Addr string
// The optional base URL for constructing streaming URLs. If empty, the baseURL will be // The optional base URL for constructing streaming URLs. If empty, the baseURL will be
// constructed from the serve address. // constructed from the serve address.
// Note that for port "0", the URL port will be set to actual port in use.
BaseURL *url.URL BaseURL *url.URL
// How long to leave idle connections open for. // How long to leave idle connections open for.
@ -233,10 +235,16 @@ func (s *server) Start(stayUp bool) error {
return errors.New("stayUp=false is not yet implemented") return errors.New("stayUp=false is not yet implemented")
} }
listener, err := net.Listen("tcp", s.config.Addr)
if err != nil {
return err
}
// Use the actual address as baseURL host. This handles the "0" port case.
s.config.BaseURL.Host = listener.Addr().String()
if s.config.TLSConfig != nil { if s.config.TLSConfig != nil {
return s.server.ListenAndServeTLS("", "") // Use certs from TLSConfig. return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
} else { } else {
return s.server.ListenAndServe() return s.server.Serve(listener)
} }
} }