From 174b6d0e2fc99d9964a7d5a7484aa0b7d50b4be1 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Thu, 17 May 2018 18:10:12 -0700 Subject: [PATCH] Proxy container streaming in kubelet. --- cmd/kubelet/app/options/container_runtime.go | 15 +-- cmd/kubelet/app/server.go | 23 +--- pkg/kubelet/config/flags.go | 10 ++ pkg/kubelet/dockershim/docker_service.go | 33 ++++-- .../dockershim/remote/docker_server.go | 2 +- pkg/kubelet/kubelet.go | 71 ++++++------ pkg/kubelet/kubelet_pods.go | 23 ---- pkg/kubelet/server/server.go | 108 +++++++----------- pkg/kubelet/server/server_test.go | 24 +--- pkg/kubelet/server/streaming/server.go | 12 +- 10 files changed, 138 insertions(+), 183 deletions(-) diff --git a/cmd/kubelet/app/options/container_runtime.go b/cmd/kubelet/app/options/container_runtime.go index 2f04036bffb..d07cfb699f4 100644 --- a/cmd/kubelet/app/options/container_runtime.go +++ b/cmd/kubelet/app/options/container_runtime.go @@ -45,12 +45,13 @@ func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions { } return &config.ContainerRuntimeOptions{ - ContainerRuntime: kubetypes.DockerContainerRuntime, - DockerEndpoint: dockerEndpoint, - DockershimRootDirectory: "/var/lib/dockershim", - DockerDisableSharedPID: true, - PodSandboxImage: defaultPodSandboxImage, - ImagePullProgressDeadline: metav1.Duration{Duration: 1 * time.Minute}, - ExperimentalDockershim: false, + ContainerRuntime: kubetypes.DockerContainerRuntime, + RedirectContainerStreaming: false, + DockerEndpoint: dockerEndpoint, + DockershimRootDirectory: "/var/lib/dockershim", + DockerDisableSharedPID: true, + PodSandboxImage: defaultPodSandboxImage, + ImagePullProgressDeadline: metav1.Duration{Duration: 1 * time.Minute}, + ExperimentalDockershim: false, } } diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index ca13dc51d30..2d354375ba2 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -1170,30 +1170,13 @@ func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConf SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, } + // Standalone dockershim will always start the local streaming server. ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings, - f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, r.DockerDisableSharedPID) + f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, r.DockerDisableSharedPID, true /*startLocalStreamingServer*/) if err != nil { return err } glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds) - if err := server.Start(stopCh); err != nil { - return err - } - - streamingServer := &http.Server{ - Addr: net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))), - Handler: ds, - } - - go func() { - <-stopCh - streamingServer.Shutdown(context.Background()) - }() - - // Start the streaming server - if err := streamingServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - return err - } - return nil + return server.Start(stopCh) } diff --git a/pkg/kubelet/config/flags.go b/pkg/kubelet/config/flags.go index c51b4de8444..377d64e4574 100644 --- a/pkg/kubelet/config/flags.go +++ b/pkg/kubelet/config/flags.go @@ -31,6 +31,15 @@ type ContainerRuntimeOptions struct { ContainerRuntime string // RuntimeCgroups that container runtime is expected to be isolated in. RuntimeCgroups string + // RedirectContainerStreaming enables container streaming redirect. + // When RedirectContainerStreaming is false, kubelet will proxy container streaming data + // between apiserver and container runtime. This approach is more secure, but the proxy + // introduces some overhead. + // When RedirectContainerStreaming is true, kubelet will return an http redirect to apiserver, + // and apiserver will access container runtime directly. This approach is more performant, + // but less secure because the connection between apiserver and container runtime is not + // authenticated. + RedirectContainerStreaming bool // Docker-specific options. @@ -77,6 +86,7 @@ func (s *ContainerRuntimeOptions) AddFlags(fs *pflag.FlagSet) { // General settings. fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'remote', 'rkt (deprecated)'.") fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.") + fs.BoolVar(&s.RedirectContainerStreaming, "redirect-container-streaming", s.RedirectContainerStreaming, "Enables container streaming redirect. If false, kubelet will proxy container streaming data between apiserver and container runtime; if true, kubelet will return an http redirect to apiserver, and apiserver will access container runtime directly. The proxy approach is more secure, but introduces some overhead. The redirect approach is more performant, but less secure because the connection between apiserver and container runtime is not authenticated.") // Docker-specific settings. fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]") diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index 5399094cc3c..2a7cce57f0d 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -85,7 +85,7 @@ const ( type CRIService interface { runtimeapi.RuntimeServiceServer runtimeapi.ImageServiceServer - Start() error + Start(<-chan struct{}) error } // DockerService is an interface that embeds the new RuntimeService and @@ -188,7 +188,8 @@ func NewDockerClientFromConfig(config *ClientConfig) libdocker.Interface { // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, - pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, disableSharedPID bool) (DockerService, error) { + pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, + disableSharedPID, startLocalStreamingServer bool) (DockerService, error) { client := NewDockerClientFromConfig(config) @@ -207,10 +208,11 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon client: client, execHandler: &NativeExecHandler{}, }, - containerManager: cm.NewContainerManager(cgroupsName, client), - checkpointManager: checkpointManager, - disableSharedPID: disableSharedPID, - networkReady: make(map[string]bool), + containerManager: cm.NewContainerManager(cgroupsName, client), + checkpointManager: checkpointManager, + disableSharedPID: disableSharedPID, + startLocalStreamingServer: startLocalStreamingServer, + networkReady: make(map[string]bool), } // check docker version compatibility. @@ -307,6 +309,9 @@ type dockerService struct { // See proposals/pod-pid-namespace.md for details. // TODO: Remove once the escape hatch is no longer used (https://issues.k8s.io/41938) disableSharedPID bool + // startLocalStreamingServer indicates whether dockershim should start a + // streaming server on localhost. + startLocalStreamingServer bool } // TODO: handle context. @@ -395,13 +400,25 @@ func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.Po } // Start initializes and starts components in dockerService. -func (ds *dockerService) Start() error { +func (ds *dockerService) Start(stopCh <-chan struct{}) error { // Initialize the legacy cleanup flag. + if ds.startLocalStreamingServer { + go func() { + <-stopCh + if err := ds.streamingServer.Stop(); err != nil { + glog.Errorf("Failed to stop streaming server: %v", err) + } + }() + go func() { + if err := ds.streamingServer.Start(true); err != nil && err != http.ErrServerClosed { + glog.Fatalf("Failed to start streaming server: %v", err) + } + }() + } return ds.containerManager.Start() } // Status returns the status of the runtime. -// TODO(random-liu): Set network condition accordingly here. func (ds *dockerService) Status(_ context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) { runtimeReady := &runtimeapi.RuntimeCondition{ Type: runtimeapi.RuntimeReady, diff --git a/pkg/kubelet/dockershim/remote/docker_server.go b/pkg/kubelet/dockershim/remote/docker_server.go index 1ac7560d41b..5e8967a8d5f 100644 --- a/pkg/kubelet/dockershim/remote/docker_server.go +++ b/pkg/kubelet/dockershim/remote/docker_server.go @@ -51,7 +51,7 @@ func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer { // Start starts the dockershim grpc server. func (s *DockerServer) Start(stopCh <-chan struct{}) error { // Start the internal service. - if err := s.service.Start(); err != nil { + if err := s.service.Start(stopCh); err != nil { glog.Errorf("Unable to start docker service") return err } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7b31fdeffbb..2d84aac0ae3 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -512,21 +512,22 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, nodeRef: nodeRef, nodeLabels: nodeLabels, nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration, - os: kubeDeps.OSInterface, - oomWatcher: oomWatcher, - cgroupsPerQOS: kubeCfg.CgroupsPerQOS, - cgroupRoot: kubeCfg.CgroupRoot, - mounter: kubeDeps.Mounter, - writer: kubeDeps.Writer, - maxPods: int(kubeCfg.MaxPods), - podsPerCore: int(kubeCfg.PodsPerCore), - syncLoopMonitor: atomic.Value{}, - daemonEndpoints: daemonEndpoints, - containerManager: kubeDeps.ContainerManager, - containerRuntimeName: containerRuntime, - nodeIP: parsedNodeIP, - nodeIPValidator: validateNodeIP, - clock: clock.RealClock{}, + os: kubeDeps.OSInterface, + oomWatcher: oomWatcher, + cgroupsPerQOS: kubeCfg.CgroupsPerQOS, + cgroupRoot: kubeCfg.CgroupRoot, + mounter: kubeDeps.Mounter, + writer: kubeDeps.Writer, + maxPods: int(kubeCfg.MaxPods), + podsPerCore: int(kubeCfg.PodsPerCore), + syncLoopMonitor: atomic.Value{}, + daemonEndpoints: daemonEndpoints, + containerManager: kubeDeps.ContainerManager, + containerRuntimeName: containerRuntime, + redirectContainerStreaming: crOptions.RedirectContainerStreaming, + nodeIP: parsedNodeIP, + nodeIPValidator: validateNodeIP, + clock: clock.RealClock{}, enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach, iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4), makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains, @@ -605,16 +606,16 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, switch containerRuntime { case kubetypes.DockerContainerRuntime: // Create and start the CRI shim running as a grpc server. - streamingConfig := getStreamingConfig(kubeCfg, kubeDeps) + streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions) ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig, &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, - crOptions.DockerDisableSharedPID) + crOptions.DockerDisableSharedPID, !crOptions.RedirectContainerStreaming) if err != nil { return nil, err } - // For now, the CRI shim redirects the streaming requests to the - // kubelet, which handles the requests using DockerService.. - klet.criHandler = ds + if crOptions.RedirectContainerStreaming { + klet.criHandler = ds + } // The unix socket for kubelet <-> dockershim communication. glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q", @@ -1000,6 +1001,9 @@ type Kubelet struct { // The name of the container runtime containerRuntimeName string + // redirectContainerStreaming enables container streaming redirect. + redirectContainerStreaming bool + // Container runtime. containerRuntime kubecontainer.Runtime @@ -2097,11 +2101,6 @@ func (kl *Kubelet) BirthCry() { kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.") } -// StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server. -func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration { - return kl.streamingConnectionIdleTimeout -} - // ResyncInterval returns the interval used for periodic syncs. func (kl *Kubelet) ResyncInterval() time.Duration { return kl.resyncInterval @@ -2109,12 +2108,12 @@ func (kl *Kubelet) ResyncInterval() time.Duration { // ListenAndServe runs the kubelet HTTP server. func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) { - server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.containerRuntime, kl.criHandler) + server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.redirectContainerStreaming, kl.criHandler) } // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { - server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime) + server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port) } // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. @@ -2138,19 +2137,23 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { } // Gets the streaming server configuration to use with in-process CRI shims. -func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) *streaming.Config { +func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config { config := &streaming.Config{ - // Use a relative redirect (no scheme or host). - BaseURL: &url.URL{ - Path: "/cri/", - }, StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout, SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols, SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, } - if kubeDeps.TLSOptions != nil { - config.TLSConfig = kubeDeps.TLSOptions.Config + if !crOptions.RedirectContainerStreaming { + config.Addr = net.JoinHostPort("localhost", "0") + } else { + // Use a relative redirect (no scheme or host). + config.BaseURL = &url.URL{ + Path: "/cri/", + } + if kubeDeps.TLSOptions != nil { + config.TLSConfig = kubeDeps.TLSOptions.Config + } } return config } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 99694d0b5fc..38cc6d5559e 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -30,7 +30,6 @@ import ( "sort" "strings" "sync" - "time" "github.com/golang/glog" "k8s.io/api/core/v1" @@ -41,7 +40,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" utilvalidation "k8s.io/apimachinery/pkg/util/validation" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/tools/remotecommand" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/api/v1/resource" podshelper "k8s.io/kubernetes/pkg/apis/core/pods" @@ -1589,27 +1587,6 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe return kl.runner.RunInContainer(container.ID, cmd, 0) } -// ExecInContainer executes a command in a container, connecting the supplied -// stdin/stdout/stderr to the command's IO streams. -func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { - // TODO(random-liu): Remove this. - return fmt.Errorf("unimplemented") -} - -// AttachContainer uses the container runtime to attach the given streams to -// the given container. -func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { - // TODO(random-liu): Remove this. - return fmt.Errorf("unimplemented") -} - -// PortForward connects to the pod's port and copies data between the port -// and the stream. -func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error { - // TODO(random-liu): Remove this. - return fmt.Errorf("unimplemented") -} - // GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it. func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) { container, err := kl.findContainer(podFullName, podUID, containerName) diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index fdde1fee4e8..2e5bbde211f 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -42,14 +42,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" + "k8s.io/apimachinery/pkg/util/proxy" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/httplog" "k8s.io/apiserver/pkg/util/flushwriter" - "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/v1/validation" @@ -74,11 +73,11 @@ const ( // Server is a http.Handler which exposes kubelet functionality over HTTP. type Server struct { - auth AuthInterface - host HostInterface - restfulCont containerInterface - resourceAnalyzer stats.ResourceAnalyzer - runtime kubecontainer.Runtime + auth AuthInterface + host HostInterface + restfulCont containerInterface + resourceAnalyzer stats.ResourceAnalyzer + redirectContainerStreaming bool } type TLSOptions struct { @@ -124,11 +123,11 @@ func ListenAndServeKubeletServer( tlsOptions *TLSOptions, auth AuthInterface, enableDebuggingHandlers, - enableContentionProfiling bool, - runtime kubecontainer.Runtime, + enableContentionProfiling, + redirectContainerStreaming bool, criHandler http.Handler) { glog.Infof("Starting to listen on %s:%d", address, port) - handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, runtime, criHandler) + handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, redirectContainerStreaming, criHandler) s := &http.Server{ Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Handler: &handler, @@ -146,9 +145,9 @@ func ListenAndServeKubeletServer( } // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. -func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) { +func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) { glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) - s := NewServer(host, resourceAnalyzer, nil, false, false, runtime, nil) + s := NewServer(host, resourceAnalyzer, nil, false, false, false, nil) server := &http.Server{ Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), @@ -173,12 +172,8 @@ type HostInterface interface { GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) GetRunningPods() ([]*v1.Pod, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) - ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error - AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) - PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error - StreamingConnectionIdleTimeout() time.Duration ResyncInterval() time.Duration GetHostname() string LatestLoopEntryTime() time.Time @@ -193,15 +188,15 @@ func NewServer( resourceAnalyzer stats.ResourceAnalyzer, auth AuthInterface, enableDebuggingHandlers, - enableContentionProfiling bool, - runtime kubecontainer.Runtime, + enableContentionProfiling, + redirectContainerStreaming bool, criHandler http.Handler) Server { server := Server{ - host: host, - resourceAnalyzer: resourceAnalyzer, - auth: auth, - restfulCont: &filteringContainer{Container: restful.NewContainer()}, - runtime: runtime, + host: host, + resourceAnalyzer: resourceAnalyzer, + auth: auth, + restfulCont: &filteringContainer{Container: restful.NewContainer()}, + redirectContainerStreaming: redirectContainerStreaming, } if auth != nil { server.InstallAuthFilter() @@ -627,6 +622,15 @@ func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams } } +type responder struct { + errorMessage string +} + +func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { + glog.Errorf("Error while proxying request: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) +} + // getAttach handles requests to attach to a container. func (s *Server) getAttach(request *restful.Request, response *restful.Response) { params := getExecRequestParams(request) @@ -643,26 +647,18 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response) } podFullName := kubecontainer.GetPodFullName(pod) - redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts) + url, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts) if err != nil { streaming.WriteError(err, response.ResponseWriter) return } - if redirect != nil { - http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) + + if s.redirectContainerStreaming { + http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound) return } - - remotecommandserver.ServeAttach(response.ResponseWriter, - request.Request, - s.host, - podFullName, - params.podUID, - params.containerName, - streamOpts, - s.host.StreamingConnectionIdleTimeout(), - remotecommandconsts.DefaultStreamCreationTimeout, - remotecommandconsts.SupportedStreamingProtocols) + handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{}) + handler.ServeHTTP(response.ResponseWriter, request.Request) } // getExec handles requests to run a command inside a container. @@ -681,27 +677,17 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) { } podFullName := kubecontainer.GetPodFullName(pod) - redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) + url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) if err != nil { streaming.WriteError(err, response.ResponseWriter) return } - if redirect != nil { - http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) + if s.redirectContainerStreaming { + http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound) return } - - remotecommandserver.ServeExec(response.ResponseWriter, - request.Request, - s.host, - podFullName, - params.podUID, - params.containerName, - params.cmd, - streamOpts, - s.host.StreamingConnectionIdleTimeout(), - remotecommandconsts.DefaultStreamCreationTimeout, - remotecommandconsts.SupportedStreamingProtocols) + handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{}) + handler.ServeHTTP(response.ResponseWriter, request.Request) } // getRun handles requests to run a command inside a container. @@ -758,25 +744,17 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp return } - redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions) + url, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions) if err != nil { streaming.WriteError(err, response.ResponseWriter) return } - if redirect != nil { - http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) + if s.redirectContainerStreaming { + http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound) return } - - portforward.ServePortForward(response.ResponseWriter, - request.Request, - s.host, - kubecontainer.GetPodFullName(pod), - params.podUID, - portForwardOptions, - s.host.StreamingConnectionIdleTimeout(), - remotecommandconsts.DefaultStreamCreationTimeout, - portforward.SupportedProtocols) + handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{}) + handler.ServeHTTP(response.ResponseWriter, request.Request) } // ServeHTTP responds to HTTP requests on the Kubelet. diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index cdb978078cf..25776273a1b 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -46,7 +46,6 @@ import ( "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/client-go/tools/remotecommand" - utiltesting "k8s.io/client-go/util/testing" api "k8s.io/kubernetes/pkg/apis/core" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" // Do some initialization to decode the query parameters correctly. @@ -203,7 +202,6 @@ type serverTestFramework struct { fakeKubelet *fakeKubelet fakeAuth *fakeAuth testHTTPServer *httptest.Server - criHandler *utiltesting.FakeHandler } func newServerTest() *serverTestFramework { @@ -238,17 +236,13 @@ func newServerTestWithDebug(enableDebugging bool) *serverTestFramework { return authorizer.DecisionAllow, "", nil }, } - fw.criHandler = &utiltesting.FakeHandler{ - StatusCode: http.StatusOK, - } server := NewServer( fw.fakeKubelet, stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute), fw.fakeAuth, enableDebugging, false, - &kubecontainertesting.Mock{}, - fw.criHandler) + &kubecontainertesting.Mock{}) fw.serverUnderTest = &server fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) return fw @@ -1599,22 +1593,6 @@ func TestServePortForward(t *testing.T) { } } -func TestCRIHandler(t *testing.T) { - fw := newServerTest() - defer fw.testHTTPServer.Close() - - const ( - path = "/cri/exec/123456abcdef" - query = "cmd=echo+foo" - ) - resp, err := http.Get(fw.testHTTPServer.URL + path + "?" + query) - require.NoError(t, err) - assert.Equal(t, http.StatusOK, resp.StatusCode) - assert.Equal(t, "GET", fw.criHandler.RequestReceived.Method) - assert.Equal(t, path, fw.criHandler.RequestReceived.URL.Path) - assert.Equal(t, query, fw.criHandler.RequestReceived.URL.RawQuery) -} - func TestDebuggingDisabledHandlers(t *testing.T) { fw := newServerTestWithDebug(false) defer fw.testHTTPServer.Close() diff --git a/pkg/kubelet/server/streaming/server.go b/pkg/kubelet/server/streaming/server.go index ae1c046b025..7cbc424c41e 100644 --- a/pkg/kubelet/server/streaming/server.go +++ b/pkg/kubelet/server/streaming/server.go @@ -20,6 +20,7 @@ import ( "crypto/tls" "errors" "io" + "net" "net/http" "net/url" "path" @@ -71,6 +72,7 @@ type Config struct { Addr string // The optional base URL for constructing streaming URLs. If empty, the baseURL will be // constructed from the serve address. + // Note that for port "0", the URL port will be set to actual port in use. BaseURL *url.URL // How long to leave idle connections open for. @@ -233,10 +235,16 @@ func (s *server) Start(stayUp bool) error { return errors.New("stayUp=false is not yet implemented") } + listener, err := net.Listen("tcp", s.config.Addr) + if err != nil { + return err + } + // Use the actual address as baseURL host. This handles the "0" port case. + s.config.BaseURL.Host = listener.Addr().String() if s.config.TLSConfig != nil { - return s.server.ListenAndServeTLS("", "") // Use certs from TLSConfig. + return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig. } else { - return s.server.ListenAndServe() + return s.server.Serve(listener) } }