Proxy container streaming in kubelet.

This commit is contained in:
Lantao Liu 2018-05-17 18:10:12 -07:00
parent aeb6cacf01
commit 174b6d0e2f
10 changed files with 138 additions and 183 deletions

View File

@ -45,12 +45,13 @@ func NewContainerRuntimeOptions() *config.ContainerRuntimeOptions {
} }
return &config.ContainerRuntimeOptions{ return &config.ContainerRuntimeOptions{
ContainerRuntime: kubetypes.DockerContainerRuntime, ContainerRuntime: kubetypes.DockerContainerRuntime,
DockerEndpoint: dockerEndpoint, RedirectContainerStreaming: false,
DockershimRootDirectory: "/var/lib/dockershim", DockerEndpoint: dockerEndpoint,
DockerDisableSharedPID: true, DockershimRootDirectory: "/var/lib/dockershim",
PodSandboxImage: defaultPodSandboxImage, DockerDisableSharedPID: true,
ImagePullProgressDeadline: metav1.Duration{Duration: 1 * time.Minute}, PodSandboxImage: defaultPodSandboxImage,
ExperimentalDockershim: false, ImagePullProgressDeadline: metav1.Duration{Duration: 1 * time.Minute},
ExperimentalDockershim: false,
} }
} }

View File

@ -1170,30 +1170,13 @@ func RunDockershim(f *options.KubeletFlags, c *kubeletconfiginternal.KubeletConf
SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
} }
// Standalone dockershim will always start the local streaming server.
ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings, ds, err := dockershim.NewDockerService(dockerClientConfig, r.PodSandboxImage, streamingConfig, &pluginSettings,
f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, r.DockerDisableSharedPID) f.RuntimeCgroups, c.CgroupDriver, r.DockershimRootDirectory, r.DockerDisableSharedPID, true /*startLocalStreamingServer*/)
if err != nil { if err != nil {
return err return err
} }
glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") glog.V(2).Infof("Starting the GRPC server for the docker CRI shim.")
server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds) server := dockerremote.NewDockerServer(f.RemoteRuntimeEndpoint, ds)
if err := server.Start(stopCh); err != nil { return server.Start(stopCh)
return err
}
streamingServer := &http.Server{
Addr: net.JoinHostPort(c.Address, strconv.Itoa(int(c.Port))),
Handler: ds,
}
go func() {
<-stopCh
streamingServer.Shutdown(context.Background())
}()
// Start the streaming server
if err := streamingServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
return err
}
return nil
} }

View File

@ -31,6 +31,15 @@ type ContainerRuntimeOptions struct {
ContainerRuntime string ContainerRuntime string
// RuntimeCgroups that container runtime is expected to be isolated in. // RuntimeCgroups that container runtime is expected to be isolated in.
RuntimeCgroups string RuntimeCgroups string
// RedirectContainerStreaming enables container streaming redirect.
// When RedirectContainerStreaming is false, kubelet will proxy container streaming data
// between apiserver and container runtime. This approach is more secure, but the proxy
// introduces some overhead.
// When RedirectContainerStreaming is true, kubelet will return an http redirect to apiserver,
// and apiserver will access container runtime directly. This approach is more performant,
// but less secure because the connection between apiserver and container runtime is not
// authenticated.
RedirectContainerStreaming bool
// Docker-specific options. // Docker-specific options.
@ -77,6 +86,7 @@ func (s *ContainerRuntimeOptions) AddFlags(fs *pflag.FlagSet) {
// General settings. // General settings.
fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'remote', 'rkt (deprecated)'.") fs.StringVar(&s.ContainerRuntime, "container-runtime", s.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'remote', 'rkt (deprecated)'.")
fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.") fs.StringVar(&s.RuntimeCgroups, "runtime-cgroups", s.RuntimeCgroups, "Optional absolute name of cgroups to create and run the runtime in.")
fs.BoolVar(&s.RedirectContainerStreaming, "redirect-container-streaming", s.RedirectContainerStreaming, "Enables container streaming redirect. If false, kubelet will proxy container streaming data between apiserver and container runtime; if true, kubelet will return an http redirect to apiserver, and apiserver will access container runtime directly. The proxy approach is more secure, but introduces some overhead. The redirect approach is more performant, but less secure because the connection between apiserver and container runtime is not authenticated.")
// Docker-specific settings. // Docker-specific settings.
fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]") fs.BoolVar(&s.ExperimentalDockershim, "experimental-dockershim", s.ExperimentalDockershim, "Enable dockershim only mode. In this mode, kubelet will only start dockershim without any other functionalities. This flag only serves test purpose, please do not use it unless you are conscious of what you are doing. [default=false]")

View File

@ -85,7 +85,7 @@ const (
type CRIService interface { type CRIService interface {
runtimeapi.RuntimeServiceServer runtimeapi.RuntimeServiceServer
runtimeapi.ImageServiceServer runtimeapi.ImageServiceServer
Start() error Start(<-chan struct{}) error
} }
// DockerService is an interface that embeds the new RuntimeService and // DockerService is an interface that embeds the new RuntimeService and
@ -188,7 +188,8 @@ func NewDockerClientFromConfig(config *ClientConfig) libdocker.Interface {
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process. // NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config,
pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, disableSharedPID bool) (DockerService, error) { pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string,
disableSharedPID, startLocalStreamingServer bool) (DockerService, error) {
client := NewDockerClientFromConfig(config) client := NewDockerClientFromConfig(config)
@ -207,10 +208,11 @@ func NewDockerService(config *ClientConfig, podSandboxImage string, streamingCon
client: client, client: client,
execHandler: &NativeExecHandler{}, execHandler: &NativeExecHandler{},
}, },
containerManager: cm.NewContainerManager(cgroupsName, client), containerManager: cm.NewContainerManager(cgroupsName, client),
checkpointManager: checkpointManager, checkpointManager: checkpointManager,
disableSharedPID: disableSharedPID, disableSharedPID: disableSharedPID,
networkReady: make(map[string]bool), startLocalStreamingServer: startLocalStreamingServer,
networkReady: make(map[string]bool),
} }
// check docker version compatibility. // check docker version compatibility.
@ -307,6 +309,9 @@ type dockerService struct {
// See proposals/pod-pid-namespace.md for details. // See proposals/pod-pid-namespace.md for details.
// TODO: Remove once the escape hatch is no longer used (https://issues.k8s.io/41938) // TODO: Remove once the escape hatch is no longer used (https://issues.k8s.io/41938)
disableSharedPID bool disableSharedPID bool
// startLocalStreamingServer indicates whether dockershim should start a
// streaming server on localhost.
startLocalStreamingServer bool
} }
// TODO: handle context. // TODO: handle context.
@ -395,13 +400,25 @@ func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.Po
} }
// Start initializes and starts components in dockerService. // Start initializes and starts components in dockerService.
func (ds *dockerService) Start() error { func (ds *dockerService) Start(stopCh <-chan struct{}) error {
// Initialize the legacy cleanup flag. // Initialize the legacy cleanup flag.
if ds.startLocalStreamingServer {
go func() {
<-stopCh
if err := ds.streamingServer.Stop(); err != nil {
glog.Errorf("Failed to stop streaming server: %v", err)
}
}()
go func() {
if err := ds.streamingServer.Start(true); err != nil && err != http.ErrServerClosed {
glog.Fatalf("Failed to start streaming server: %v", err)
}
}()
}
return ds.containerManager.Start() return ds.containerManager.Start()
} }
// Status returns the status of the runtime. // Status returns the status of the runtime.
// TODO(random-liu): Set network condition accordingly here.
func (ds *dockerService) Status(_ context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) { func (ds *dockerService) Status(_ context.Context, r *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) {
runtimeReady := &runtimeapi.RuntimeCondition{ runtimeReady := &runtimeapi.RuntimeCondition{
Type: runtimeapi.RuntimeReady, Type: runtimeapi.RuntimeReady,

View File

@ -51,7 +51,7 @@ func NewDockerServer(endpoint string, s dockershim.CRIService) *DockerServer {
// Start starts the dockershim grpc server. // Start starts the dockershim grpc server.
func (s *DockerServer) Start(stopCh <-chan struct{}) error { func (s *DockerServer) Start(stopCh <-chan struct{}) error {
// Start the internal service. // Start the internal service.
if err := s.service.Start(); err != nil { if err := s.service.Start(stopCh); err != nil {
glog.Errorf("Unable to start docker service") glog.Errorf("Unable to start docker service")
return err return err
} }

View File

@ -512,21 +512,22 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
nodeRef: nodeRef, nodeRef: nodeRef,
nodeLabels: nodeLabels, nodeLabels: nodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration, nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
os: kubeDeps.OSInterface, os: kubeDeps.OSInterface,
oomWatcher: oomWatcher, oomWatcher: oomWatcher,
cgroupsPerQOS: kubeCfg.CgroupsPerQOS, cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
cgroupRoot: kubeCfg.CgroupRoot, cgroupRoot: kubeCfg.CgroupRoot,
mounter: kubeDeps.Mounter, mounter: kubeDeps.Mounter,
writer: kubeDeps.Writer, writer: kubeDeps.Writer,
maxPods: int(kubeCfg.MaxPods), maxPods: int(kubeCfg.MaxPods),
podsPerCore: int(kubeCfg.PodsPerCore), podsPerCore: int(kubeCfg.PodsPerCore),
syncLoopMonitor: atomic.Value{}, syncLoopMonitor: atomic.Value{},
daemonEndpoints: daemonEndpoints, daemonEndpoints: daemonEndpoints,
containerManager: kubeDeps.ContainerManager, containerManager: kubeDeps.ContainerManager,
containerRuntimeName: containerRuntime, containerRuntimeName: containerRuntime,
nodeIP: parsedNodeIP, redirectContainerStreaming: crOptions.RedirectContainerStreaming,
nodeIPValidator: validateNodeIP, nodeIP: parsedNodeIP,
clock: clock.RealClock{}, nodeIPValidator: validateNodeIP,
clock: clock.RealClock{},
enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach, enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4), iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains, makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
@ -605,16 +606,16 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
switch containerRuntime { switch containerRuntime {
case kubetypes.DockerContainerRuntime: case kubetypes.DockerContainerRuntime:
// Create and start the CRI shim running as a grpc server. // Create and start the CRI shim running as a grpc server.
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps) streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions)
ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig, ds, err := dockershim.NewDockerService(kubeDeps.DockerClientConfig, crOptions.PodSandboxImage, streamingConfig,
&pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory,
crOptions.DockerDisableSharedPID) crOptions.DockerDisableSharedPID, !crOptions.RedirectContainerStreaming)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// For now, the CRI shim redirects the streaming requests to the if crOptions.RedirectContainerStreaming {
// kubelet, which handles the requests using DockerService.. klet.criHandler = ds
klet.criHandler = ds }
// The unix socket for kubelet <-> dockershim communication. // The unix socket for kubelet <-> dockershim communication.
glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q", glog.V(5).Infof("RemoteRuntimeEndpoint: %q, RemoteImageEndpoint: %q",
@ -1000,6 +1001,9 @@ type Kubelet struct {
// The name of the container runtime // The name of the container runtime
containerRuntimeName string containerRuntimeName string
// redirectContainerStreaming enables container streaming redirect.
redirectContainerStreaming bool
// Container runtime. // Container runtime.
containerRuntime kubecontainer.Runtime containerRuntime kubecontainer.Runtime
@ -2097,11 +2101,6 @@ func (kl *Kubelet) BirthCry() {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.") kl.recorder.Eventf(kl.nodeRef, v1.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
} }
// StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}
// ResyncInterval returns the interval used for periodic syncs. // ResyncInterval returns the interval used for periodic syncs.
func (kl *Kubelet) ResyncInterval() time.Duration { func (kl *Kubelet) ResyncInterval() time.Duration {
return kl.resyncInterval return kl.resyncInterval
@ -2109,12 +2108,12 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
// ListenAndServe runs the kubelet HTTP server. // ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) { func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers, enableContentionProfiling bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.containerRuntime, kl.criHandler) server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, enableContentionProfiling, kl.redirectContainerStreaming, kl.criHandler)
} }
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime) server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port)
} }
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
@ -2138,19 +2137,23 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
} }
// Gets the streaming server configuration to use with in-process CRI shims. // Gets the streaming server configuration to use with in-process CRI shims.
func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) *streaming.Config { func getStreamingConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions) *streaming.Config {
config := &streaming.Config{ config := &streaming.Config{
// Use a relative redirect (no scheme or host).
BaseURL: &url.URL{
Path: "/cri/",
},
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout, StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols, SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols, SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
} }
if kubeDeps.TLSOptions != nil { if !crOptions.RedirectContainerStreaming {
config.TLSConfig = kubeDeps.TLSOptions.Config config.Addr = net.JoinHostPort("localhost", "0")
} else {
// Use a relative redirect (no scheme or host).
config.BaseURL = &url.URL{
Path: "/cri/",
}
if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config
}
} }
return config return config
} }

View File

@ -30,7 +30,6 @@ import (
"sort" "sort"
"strings" "strings"
"sync" "sync"
"time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
@ -41,7 +40,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
utilvalidation "k8s.io/apimachinery/pkg/util/validation" utilvalidation "k8s.io/apimachinery/pkg/util/validation"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/remotecommand"
podutil "k8s.io/kubernetes/pkg/api/v1/pod" podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/api/v1/resource"
podshelper "k8s.io/kubernetes/pkg/apis/core/pods" podshelper "k8s.io/kubernetes/pkg/apis/core/pods"
@ -1589,27 +1587,6 @@ func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containe
return kl.runner.RunInContainer(container.ID, cmd, 0) return kl.runner.RunInContainer(container.ID, cmd, 0)
} }
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
// TODO(random-liu): Remove this.
return fmt.Errorf("unimplemented")
}
// AttachContainer uses the container runtime to attach the given streams to
// the given container.
func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
// TODO(random-liu): Remove this.
return fmt.Errorf("unimplemented")
}
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
// TODO(random-liu): Remove this.
return fmt.Errorf("unimplemented")
}
// GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it. // GetExec gets the URL the exec will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) { func (kl *Kubelet) GetExec(podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
container, err := kl.findContainer(podFullName, podUID, containerName) container, err := kl.findContainer(podFullName, podUID, containerName)

View File

@ -42,14 +42,13 @@ import (
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand" "k8s.io/apimachinery/pkg/util/proxy"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/httplog" "k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/flushwriter" "k8s.io/apiserver/pkg/util/flushwriter"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/v1/validation" "k8s.io/kubernetes/pkg/apis/core/v1/validation"
@ -74,11 +73,11 @@ const (
// Server is a http.Handler which exposes kubelet functionality over HTTP. // Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct { type Server struct {
auth AuthInterface auth AuthInterface
host HostInterface host HostInterface
restfulCont containerInterface restfulCont containerInterface
resourceAnalyzer stats.ResourceAnalyzer resourceAnalyzer stats.ResourceAnalyzer
runtime kubecontainer.Runtime redirectContainerStreaming bool
} }
type TLSOptions struct { type TLSOptions struct {
@ -124,11 +123,11 @@ func ListenAndServeKubeletServer(
tlsOptions *TLSOptions, tlsOptions *TLSOptions,
auth AuthInterface, auth AuthInterface,
enableDebuggingHandlers, enableDebuggingHandlers,
enableContentionProfiling bool, enableContentionProfiling,
runtime kubecontainer.Runtime, redirectContainerStreaming bool,
criHandler http.Handler) { criHandler http.Handler) {
glog.Infof("Starting to listen on %s:%d", address, port) glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, runtime, criHandler) handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, enableContentionProfiling, redirectContainerStreaming, criHandler)
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler, Handler: &handler,
@ -146,9 +145,9 @@ func ListenAndServeKubeletServer(
} }
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) { func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := NewServer(host, resourceAnalyzer, nil, false, false, runtime, nil) s := NewServer(host, resourceAnalyzer, nil, false, false, false, nil)
server := &http.Server{ server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -173,12 +172,8 @@ type HostInterface interface {
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetRunningPods() ([]*v1.Pod, error) GetRunningPods() ([]*v1.Pod, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error
AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error
GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request) ServeLogs(w http.ResponseWriter, req *http.Request)
PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error
StreamingConnectionIdleTimeout() time.Duration
ResyncInterval() time.Duration ResyncInterval() time.Duration
GetHostname() string GetHostname() string
LatestLoopEntryTime() time.Time LatestLoopEntryTime() time.Time
@ -193,15 +188,15 @@ func NewServer(
resourceAnalyzer stats.ResourceAnalyzer, resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface, auth AuthInterface,
enableDebuggingHandlers, enableDebuggingHandlers,
enableContentionProfiling bool, enableContentionProfiling,
runtime kubecontainer.Runtime, redirectContainerStreaming bool,
criHandler http.Handler) Server { criHandler http.Handler) Server {
server := Server{ server := Server{
host: host, host: host,
resourceAnalyzer: resourceAnalyzer, resourceAnalyzer: resourceAnalyzer,
auth: auth, auth: auth,
restfulCont: &filteringContainer{Container: restful.NewContainer()}, restfulCont: &filteringContainer{Container: restful.NewContainer()},
runtime: runtime, redirectContainerStreaming: redirectContainerStreaming,
} }
if auth != nil { if auth != nil {
server.InstallAuthFilter() server.InstallAuthFilter()
@ -627,6 +622,15 @@ func getPortForwardRequestParams(req *restful.Request) portForwardRequestParams
} }
} }
type responder struct {
errorMessage string
}
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
glog.Errorf("Error while proxying request: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
// getAttach handles requests to attach to a container. // getAttach handles requests to attach to a container.
func (s *Server) getAttach(request *restful.Request, response *restful.Response) { func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
params := getExecRequestParams(request) params := getExecRequestParams(request)
@ -643,26 +647,18 @@ func (s *Server) getAttach(request *restful.Request, response *restful.Response)
} }
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
redirect, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts) url, err := s.host.GetAttach(podFullName, params.podUID, params.containerName, *streamOpts)
if err != nil { if err != nil {
streaming.WriteError(err, response.ResponseWriter) streaming.WriteError(err, response.ResponseWriter)
return return
} }
if redirect != nil {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) if s.redirectContainerStreaming {
http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
return return
} }
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{})
remotecommandserver.ServeAttach(response.ResponseWriter, handler.ServeHTTP(response.ResponseWriter, request.Request)
request.Request,
s.host,
podFullName,
params.podUID,
params.containerName,
streamOpts,
s.host.StreamingConnectionIdleTimeout(),
remotecommandconsts.DefaultStreamCreationTimeout,
remotecommandconsts.SupportedStreamingProtocols)
} }
// getExec handles requests to run a command inside a container. // getExec handles requests to run a command inside a container.
@ -681,27 +677,17 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
} }
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
redirect, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts) url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
if err != nil { if err != nil {
streaming.WriteError(err, response.ResponseWriter) streaming.WriteError(err, response.ResponseWriter)
return return
} }
if redirect != nil { if s.redirectContainerStreaming {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
return return
} }
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{})
remotecommandserver.ServeExec(response.ResponseWriter, handler.ServeHTTP(response.ResponseWriter, request.Request)
request.Request,
s.host,
podFullName,
params.podUID,
params.containerName,
params.cmd,
streamOpts,
s.host.StreamingConnectionIdleTimeout(),
remotecommandconsts.DefaultStreamCreationTimeout,
remotecommandconsts.SupportedStreamingProtocols)
} }
// getRun handles requests to run a command inside a container. // getRun handles requests to run a command inside a container.
@ -758,25 +744,17 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
return return
} }
redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions) url, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID, *portForwardOptions)
if err != nil { if err != nil {
streaming.WriteError(err, response.ResponseWriter) streaming.WriteError(err, response.ResponseWriter)
return return
} }
if redirect != nil { if s.redirectContainerStreaming {
http.Redirect(response.ResponseWriter, request.Request, redirect.String(), http.StatusFound) http.Redirect(response.ResponseWriter, request.Request, url.String(), http.StatusFound)
return return
} }
handler := proxy.NewUpgradeAwareHandler(url, nil /*transport*/, false /*wrapTransport*/, false /*upgradeRequired*/, &responder{})
portforward.ServePortForward(response.ResponseWriter, handler.ServeHTTP(response.ResponseWriter, request.Request)
request.Request,
s.host,
kubecontainer.GetPodFullName(pod),
params.podUID,
portForwardOptions,
s.host.StreamingConnectionIdleTimeout(),
remotecommandconsts.DefaultStreamCreationTimeout,
portforward.SupportedProtocols)
} }
// ServeHTTP responds to HTTP requests on the Kubelet. // ServeHTTP responds to HTTP requests on the Kubelet.

View File

@ -46,7 +46,6 @@ import (
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/authorization/authorizer" "k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
utiltesting "k8s.io/client-go/util/testing"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1"
// Do some initialization to decode the query parameters correctly. // Do some initialization to decode the query parameters correctly.
@ -203,7 +202,6 @@ type serverTestFramework struct {
fakeKubelet *fakeKubelet fakeKubelet *fakeKubelet
fakeAuth *fakeAuth fakeAuth *fakeAuth
testHTTPServer *httptest.Server testHTTPServer *httptest.Server
criHandler *utiltesting.FakeHandler
} }
func newServerTest() *serverTestFramework { func newServerTest() *serverTestFramework {
@ -238,17 +236,13 @@ func newServerTestWithDebug(enableDebugging bool) *serverTestFramework {
return authorizer.DecisionAllow, "", nil return authorizer.DecisionAllow, "", nil
}, },
} }
fw.criHandler = &utiltesting.FakeHandler{
StatusCode: http.StatusOK,
}
server := NewServer( server := NewServer(
fw.fakeKubelet, fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute), stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute),
fw.fakeAuth, fw.fakeAuth,
enableDebugging, enableDebugging,
false, false,
&kubecontainertesting.Mock{}, &kubecontainertesting.Mock{})
fw.criHandler)
fw.serverUnderTest = &server fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw return fw
@ -1599,22 +1593,6 @@ func TestServePortForward(t *testing.T) {
} }
} }
func TestCRIHandler(t *testing.T) {
fw := newServerTest()
defer fw.testHTTPServer.Close()
const (
path = "/cri/exec/123456abcdef"
query = "cmd=echo+foo"
)
resp, err := http.Get(fw.testHTTPServer.URL + path + "?" + query)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "GET", fw.criHandler.RequestReceived.Method)
assert.Equal(t, path, fw.criHandler.RequestReceived.URL.Path)
assert.Equal(t, query, fw.criHandler.RequestReceived.URL.RawQuery)
}
func TestDebuggingDisabledHandlers(t *testing.T) { func TestDebuggingDisabledHandlers(t *testing.T) {
fw := newServerTestWithDebug(false) fw := newServerTestWithDebug(false)
defer fw.testHTTPServer.Close() defer fw.testHTTPServer.Close()

View File

@ -20,6 +20,7 @@ import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"io" "io"
"net"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
@ -71,6 +72,7 @@ type Config struct {
Addr string Addr string
// The optional base URL for constructing streaming URLs. If empty, the baseURL will be // The optional base URL for constructing streaming URLs. If empty, the baseURL will be
// constructed from the serve address. // constructed from the serve address.
// Note that for port "0", the URL port will be set to actual port in use.
BaseURL *url.URL BaseURL *url.URL
// How long to leave idle connections open for. // How long to leave idle connections open for.
@ -233,10 +235,16 @@ func (s *server) Start(stayUp bool) error {
return errors.New("stayUp=false is not yet implemented") return errors.New("stayUp=false is not yet implemented")
} }
listener, err := net.Listen("tcp", s.config.Addr)
if err != nil {
return err
}
// Use the actual address as baseURL host. This handles the "0" port case.
s.config.BaseURL.Host = listener.Addr().String()
if s.config.TLSConfig != nil { if s.config.TLSConfig != nil {
return s.server.ListenAndServeTLS("", "") // Use certs from TLSConfig. return s.server.ServeTLS(listener, "", "") // Use certs from TLSConfig.
} else { } else {
return s.server.ListenAndServe() return s.server.Serve(listener)
} }
} }