diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 800ba6ad5f4..eaade638c31 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -77,6 +77,7 @@ go_library( "//pkg/kubelet/server:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/stats:go_default_library", + "//pkg/kubelet/server/streaming:go_default_library", "//pkg/kubelet/status:go_default_library", "//pkg/kubelet/sysctl:go_default_library", "//pkg/kubelet/types:go_default_library", diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index 324acb06592..cba56f896e4 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -21,7 +21,6 @@ go_library( "docker_service.go", "docker_streaming.go", "helpers.go", - "legacy.go", "naming.go", "security_context.go", ], diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index 5ffe8829312..1cf1cb13bdd 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -18,7 +18,7 @@ package dockershim import ( "fmt" - "io" + "net/http" "github.com/golang/glog" "github.com/golang/protobuf/proto" @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network/cni" "k8s.io/kubernetes/pkg/kubelet/network/kubenet" "k8s.io/kubernetes/pkg/kubelet/server/streaming" - "k8s.io/kubernetes/pkg/util/term" ) const ( @@ -132,23 +131,14 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str return ds, nil } -// DockerService is an interface that embeds both the new RuntimeService and -// ImageService interfaces, while including DockerLegacyService for backward -// compatibility. +// DockerService is an interface that embeds the new RuntimeService and +// ImageService interfaces. type DockerService interface { internalApi.RuntimeService internalApi.ImageManagerService - DockerLegacyService Start() error -} - -// DockerLegacyService is an interface that embeds all legacy methods for -// backward compatibility. -type DockerLegacyService interface { - // Supporting legacy methods for docker. - LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error - LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error - LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error + // For serving streaming calls. + http.Handler } type dockerService struct { @@ -249,3 +239,11 @@ func (ds *dockerService) Status() (*runtimeApi.RuntimeStatus, error) { } return &runtimeApi.RuntimeStatus{Conditions: conditions}, nil } + +func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if ds.streamingServer != nil { + ds.streamingServer.ServeHTTP(w, r) + } else { + http.NotFound(w, r) + } +} diff --git a/pkg/kubelet/dockershim/legacy.go b/pkg/kubelet/dockershim/legacy.go deleted file mode 100644 index 683e3755310..00000000000 --- a/pkg/kubelet/dockershim/legacy.go +++ /dev/null @@ -1,41 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dockershim - -import ( - "io" - - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/util/term" -) - -// This file implements the functions that are needed for backward -// compatibility. Therefore, it imports various kubernetes packages -// directly. - -// TODO: implement the methods in this file. -func (ds *dockerService) LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) { - return ds.streamingRuntime.Attach(id.ID, stdin, stdout, stderr, resize) -} - -func (ds *dockerService) LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error { - return ds.streamingRuntime.PortForward(sandboxID, int32(port), stream) -} - -func (ds *dockerService) LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error { - return ds.streamingRuntime.Exec(containerID.ID, cmd, stdin, stdout, stderr, tty, resize) -} diff --git a/pkg/kubelet/dockershim/remote/docker_service.go b/pkg/kubelet/dockershim/remote/docker_service.go index fdca2f1ec65..1cef919f61d 100644 --- a/pkg/kubelet/dockershim/remote/docker_service.go +++ b/pkg/kubelet/dockershim/remote/docker_service.go @@ -17,7 +17,6 @@ limitations under the License. package remote import ( - "fmt" "time" "golang.org/x/net/context" @@ -165,15 +164,15 @@ func (d *dockerService) ExecSync(ctx context.Context, r *runtimeApi.ExecSyncRequ } func (d *dockerService) Exec(ctx context.Context, r *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { - return nil, fmt.Errorf("not implemented") + return d.runtimeService.Exec(r) } func (d *dockerService) Attach(ctx context.Context, r *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { - return nil, fmt.Errorf("not implemented") + return d.runtimeService.Attach(r) } func (d *dockerService) PortForward(ctx context.Context, r *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) { - return nil, fmt.Errorf("not implemented") + return d.runtimeService.PortForward(r) } func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeApi.UpdateRuntimeConfigRequest) (*runtimeApi.UpdateRuntimeConfigResponse, error) { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index bc63f07d473..5e2b5e9375a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "net/http" + "net/url" "os" "path" "sort" @@ -62,6 +63,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/server" "k8s.io/kubernetes/pkg/kubelet/server/stats" + "k8s.io/kubernetes/pkg/kubelet/server/streaming" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/sysctl" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" @@ -527,8 +529,9 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub switch kubeCfg.ContainerRuntime { case "docker": + streamingConfig := getStreamingConfig(kubeCfg, kubeDeps) // Use the new CRI shim for docker. - ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups) + ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, streamingConfig, &pluginSettings, kubeCfg.RuntimeCgroups) if err != nil { return nil, err } @@ -538,6 +541,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub return nil, err } + klet.criHandler = ds rs := ds.(internalApi.RuntimeService) is := ds.(internalApi.ImageManagerService) // This is an internal knob to switch between grpc and non-grpc @@ -567,13 +571,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub // functions in CRI. // TODO: Remove this hack after CRI is fully implemented. // TODO: Move the instrumented interface wrapping into kuberuntime. - runtimeService = &struct { - internalApi.RuntimeService - dockershim.DockerLegacyService - }{ - RuntimeService: kuberuntime.NewInstrumentedRuntimeService(rs), - DockerLegacyService: ds, - } + runtimeService = kuberuntime.NewInstrumentedRuntimeService(rs) imageService = is case "remote": runtimeService, imageService, err = getRuntimeAndImageServices(kubeCfg) @@ -1086,6 +1084,9 @@ type Kubelet struct { // The AppArmor validator for checking whether AppArmor is supported. appArmorValidator apparmor.Validator + + // The handler serving CRI streaming calls (exec/attach/port-forward). + criHandler http.Handler } // setupDataDirs creates: @@ -2076,7 +2077,7 @@ func (kl *Kubelet) ResyncInterval() time.Duration { // ListenAndServe runs the kubelet HTTP server. func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) { - server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime) + server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime, kl.criHandler) } // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. @@ -2142,3 +2143,20 @@ func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) } return reservation, nil } + +// Gets the streaming server configuration to use with in-process CRI shims. +func getStreamingConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps) *streaming.Config { + config := &streaming.Config{ + // Use a relative redirect (no scheme or host). + BaseURL: &url.URL{ + Path: "/cri/", + }, + StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration, + StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout, + SupportedProtocols: streaming.DefaultConfig.SupportedProtocols, + } + if kubeDeps.TLSOptions != nil { + config.TLSConfig = kubeDeps.TLSOptions.Config + } + return config +} diff --git a/pkg/kubelet/kuberuntime/BUILD b/pkg/kubelet/kuberuntime/BUILD index ffa596fdace..38844e71891 100644 --- a/pkg/kubelet/kuberuntime/BUILD +++ b/pkg/kubelet/kuberuntime/BUILD @@ -36,7 +36,6 @@ go_library( "//pkg/kubelet/api:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/container:go_default_library", - "//pkg/kubelet/dockershim:go_default_library", "//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/images:go_default_library", @@ -56,7 +55,6 @@ go_library( "//pkg/util/runtime:go_default_library", "//pkg/util/selinux:go_default_library", "//pkg/util/sets:go_default_library", - "//pkg/util/term:go_default_library", "//vendor:github.com/coreos/go-semver/semver", "//vendor:github.com/docker/docker/pkg/jsonlog", "//vendor:github.com/fsnotify/fsnotify", diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 65610b2f0df..dc0ffe33dd4 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/dockershim" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/types" @@ -42,7 +41,6 @@ import ( utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/selinux" "k8s.io/kubernetes/pkg/util/sets" - "k8s.io/kubernetes/pkg/util/term" ) // startContainer starts a container and returns a message indicates why it is failed on error. @@ -653,17 +651,6 @@ func findNextInitContainerToRun(pod *api.Pod, podStatus *kubecontainer.PodStatus return nil, &pod.Spec.InitContainers[0], false } -// AttachContainer attaches to the container's console -// TODO: Remove this method once the indirect streaming path is fully functional. -func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) { - // Use `docker attach` directly for in-process docker integration for - // now to unblock other tests. - if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { - return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize) - } - return fmt.Errorf("not implemented") -} - // GetContainerLogs returns logs of a specific container. func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) { status, err := m.runtimeService.ContainerStatus(containerID.ID) @@ -714,19 +701,6 @@ func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, return append(stdout, stderr...), err } -// Runs the command in the container of the specified pod using nsenter. -// Attaches the processes stdin, stdout, and stderr. Optionally uses a -// tty. -// TODO: Remove this method once the indirect streaming path is fully functional. -func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error { - // Use `docker exec` directly for in-process docker integration for - // now to unblock other tests. - if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { - return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize) - } - return fmt.Errorf("not implemented") -} - // removeContainer removes the container and the container logs. // Notice that we remove the container logs first, so that container will not be removed if // container logs are failed to be removed, and kubelet will retry this later. This guarantees diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 159f4dce33c..587695f2df5 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -19,7 +19,6 @@ package kuberuntime import ( "errors" "fmt" - "io" "os" "time" @@ -33,7 +32,6 @@ import ( internalApi "k8s.io/kubernetes/pkg/kubelet/api" runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/dockershim" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -114,8 +112,6 @@ type KubeGenericRuntime interface { kubecontainer.Runtime kubecontainer.IndirectStreamingRuntime kubecontainer.ContainerCommandRunner - // TODO(timstclair): Remove this once the indirect path is fully functional. - kubecontainer.DirectStreamingRuntime } // NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager @@ -919,24 +915,6 @@ func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (k return pod.Sandboxes[0].ID, nil } -// Forward the specified port from the specified pod to the stream. -// TODO: Remove this method once the indirect streaming path is fully functional. -func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error { - formattedPod := kubecontainer.FormatPod(pod) - if len(pod.Sandboxes) == 0 { - glog.Errorf("No sandboxes are found for pod %q", formattedPod) - return fmt.Errorf("sandbox for pod %q not found", formattedPod) - } - - // Use docker portforward directly for in-process docker integration - // now to unblock other tests. - if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok { - return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream) - } - - return fmt.Errorf("not implemented") -} - // UpdatePodCIDR is just a passthrough method to update the runtimeConfig of the shim // with the podCIDR supplied by the kubelet. func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error { diff --git a/pkg/kubelet/server/BUILD b/pkg/kubelet/server/BUILD index bed803086fe..b6bc30809ab 100644 --- a/pkg/kubelet/server/BUILD +++ b/pkg/kubelet/server/BUILD @@ -74,6 +74,7 @@ go_test( "//pkg/util/httpstream/spdy:go_default_library", "//pkg/util/sets:go_default_library", "//pkg/util/term:go_default_library", + "//pkg/util/testing:go_default_library", "//pkg/volume:go_default_library", "//vendor:github.com/google/cadvisor/info/v1", "//vendor:github.com/google/cadvisor/info/v2", diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 4e7d6ebe883..ededae92e3c 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -118,9 +118,10 @@ func ListenAndServeKubeletServer( tlsOptions *TLSOptions, auth AuthInterface, enableDebuggingHandlers bool, - runtime kubecontainer.Runtime) { + runtime kubecontainer.Runtime, + criHandler http.Handler) { glog.Infof("Starting to listen on %s:%d", address, port) - handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime) + handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime, criHandler) s := &http.Server{ Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Handler: &handler, @@ -137,7 +138,7 @@ func ListenAndServeKubeletServer( // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) { glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) - s := NewServer(host, resourceAnalyzer, nil, false, runtime) + s := NewServer(host, resourceAnalyzer, nil, false, runtime, nil) server := &http.Server{ Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), @@ -191,7 +192,8 @@ func NewServer( resourceAnalyzer stats.ResourceAnalyzer, auth AuthInterface, enableDebuggingHandlers bool, - runtime kubecontainer.Runtime) Server { + runtime kubecontainer.Runtime, + criHandler http.Handler) Server { server := Server{ host: host, resourceAnalyzer: resourceAnalyzer, @@ -204,7 +206,7 @@ func NewServer( } server.InstallDefaultHandlers() if enableDebuggingHandlers { - server.InstallDebuggingHandlers() + server.InstallDebuggingHandlers(criHandler) } return server } @@ -282,7 +284,7 @@ func (s *Server) InstallDefaultHandlers() { const pprofBasePath = "/debug/pprof/" // InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers -func (s *Server) InstallDebuggingHandlers() { +func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) { var ws *restful.WebService ws = new(restful.WebService) @@ -393,14 +395,10 @@ func (s *Server) InstallDebuggingHandlers() { To(s.getRunningPods). Operation("getRunningPods")) s.restfulCont.Add(ws) -} -type httpHandler struct { - f func(w http.ResponseWriter, r *http.Request) -} - -func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.f(w, r) + if criHandler != nil { + s.restfulCont.Handle("/cri/", criHandler) + } } // Checks if kubelet's sync loop that updates containers is working. @@ -701,8 +699,12 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) return } + if len(params.podUID) > 0 && pod.UID != params.podUID { + response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found")) + return + } - redirect, err := s.host.GetPortForward(params.podName, params.podNamespace, params.podUID) + redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID) if err != nil { response.WriteError(streaming.HTTPStatus(err), err) return diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index 0fa92bdc895..648eaa30d18 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -53,9 +53,14 @@ import ( "k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/term" + utiltesting "k8s.io/kubernetes/pkg/util/testing" "k8s.io/kubernetes/pkg/volume" ) +const ( + testUID = "9b01b80f-8fb4-11e4-95ab-4200af06647" +) + type fakeKubelet struct { podByNameFunc func(namespace, name string) (*api.Pod, bool) containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) @@ -196,6 +201,7 @@ type serverTestFramework struct { fakeKubelet *fakeKubelet fakeAuth *fakeAuth testHTTPServer *httptest.Server + criHandler *utiltesting.FakeHandler } func newServerTest() *serverTestFramework { @@ -209,6 +215,7 @@ func newServerTest() *serverTestFramework { ObjectMeta: api.ObjectMeta{ Namespace: namespace, Name: name, + UID: testUID, }, }, true }, @@ -225,12 +232,16 @@ func newServerTest() *serverTestFramework { return true, "", nil }, } + fw.criHandler = &utiltesting.FakeHandler{ + StatusCode: http.StatusOK, + } server := NewServer( fw.fakeKubelet, stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &kubecontainertesting.FakeRuntime{}), fw.fakeAuth, true, - &kubecontainertesting.Mock{}) + &kubecontainertesting.Mock{}, + fw.criHandler) fw.serverUnderTest = &server fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) return fw @@ -296,15 +307,14 @@ func TestContainerInfoWithUidNamespace(t *testing.T) { expectedNamespace := "custom" expectedPodID := getPodName(podID, expectedNamespace) expectedContainerName := "goodcontainer" - expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647" fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) { - if podID != expectedPodID || string(uid) != expectedUid || containerName != expectedContainerName { + if podID != expectedPodID || string(uid) != testUID || containerName != expectedContainerName { return nil, fmt.Errorf("bad podID or uid or containerName: podID=%v; uid=%v; containerName=%v", podID, uid, containerName) } return expectedInfo, nil } - resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName)) + resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName)) if err != nil { t.Fatalf("Got error GETing: %v", err) } @@ -325,11 +335,10 @@ func TestContainerNotFound(t *testing.T) { podID := "somepod" expectedNamespace := "custom" expectedContainerName := "slowstartcontainer" - expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647" fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) { return nil, kubecontainer.ErrContainerNotFound } - resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName)) + resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName)) if err != nil { t.Fatalf("Got error GETing: %v", err) } @@ -517,15 +526,14 @@ func TestServeRunInContainerWithUID(t *testing.T) { podNamespace := "other" podName := "foo" expectedPodName := getPodName(podName, podNamespace) - expectedUID := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720" expectedContainerName := "baz" expectedCommand := "ls -a" fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) { if podFullName != expectedPodName { t.Errorf("expected %s, got %s", expectedPodName, podFullName) } - if string(uid) != expectedUID { - t.Errorf("expected %s, got %s", expectedUID, uid) + if string(uid) != testUID { + t.Errorf("expected %s, got %s", testUID, uid) } if containerName != expectedContainerName { t.Errorf("expected %s, got %s", expectedContainerName, containerName) @@ -537,7 +545,7 @@ func TestServeRunInContainerWithUID(t *testing.T) { return []byte(output), nil } - resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil) + resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil) if err != nil { t.Fatalf("Got error POSTing: %v", err) @@ -645,7 +653,8 @@ func TestAuthFilters(t *testing.T) { isSubpath(path, "/pods"), isSubpath(path, "/portForward"), isSubpath(path, "/run"), - isSubpath(path, "/runningpods"): + isSubpath(path, "/runningpods"), + isSubpath(path, "/cri"): return "proxy" default: @@ -1182,7 +1191,6 @@ func testExecAttach(t *testing.T, verb string) { podNamespace := "other" podName := "foo" expectedPodName := getPodName(podName, podNamespace) - expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647" expectedContainerName := "baz" expectedCommand := "ls -a" expectedStdin := "stdin" @@ -1200,8 +1208,8 @@ func testExecAttach(t *testing.T, verb string) { if podFullName != expectedPodName { t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName) } - if test.uid && string(uid) != expectedUid { - t.Fatalf("%d: uid: expected %v, got %v", i, expectedUid, uid) + if test.uid && string(uid) != testUID { + t.Fatalf("%d: uid: expected %v, got %v", i, testUID, uid) } if containerName != expectedContainerName { t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName) @@ -1273,7 +1281,7 @@ func testExecAttach(t *testing.T, verb string) { var url string if test.uid { - url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedUid + "/" + expectedContainerName + "?ignore=1" + url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + testUID + "/" + expectedContainerName + "?ignore=1" } else { url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1" } @@ -1491,7 +1499,6 @@ func TestServePortForward(t *testing.T) { podNamespace := "other" podName := "foo" expectedPodName := getPodName(podName, podNamespace) - expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647" for i, test := range tests { fw := newServerTest() @@ -1516,7 +1523,7 @@ func TestServePortForward(t *testing.T) { t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a) } - if e, a := expectedUid, uid; test.uid && e != string(a) { + if e, a := testUID, uid; test.uid && e != string(a) { t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a) } @@ -1551,7 +1558,7 @@ func TestServePortForward(t *testing.T) { var url string if test.uid { - url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, expectedUid) + url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, testUID) } else { url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName) } @@ -1629,3 +1636,19 @@ func TestServePortForward(t *testing.T) { <-portForwardFuncDone } } + +func TestCRIHandler(t *testing.T) { + fw := newServerTest() + defer fw.testHTTPServer.Close() + + const ( + path = "/cri/exec/123456abcdef" + query = "cmd=echo+foo" + ) + resp, err := http.Get(fw.testHTTPServer.URL + path + "?" + query) + require.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "GET", fw.criHandler.RequestReceived.Method) + assert.Equal(t, path, fw.criHandler.RequestReceived.URL.Path) + assert.Equal(t, query, fw.criHandler.RequestReceived.URL.RawQuery) +} diff --git a/pkg/kubelet/server/streaming/BUILD b/pkg/kubelet/server/streaming/BUILD index 089ab2c0a07..89cc3e63a25 100644 --- a/pkg/kubelet/server/streaming/BUILD +++ b/pkg/kubelet/server/streaming/BUILD @@ -18,6 +18,7 @@ go_library( ], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/server/portforward:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library", @@ -26,7 +27,6 @@ go_library( "//vendor:github.com/emicklei/go-restful", "//vendor:google.golang.org/grpc", "//vendor:google.golang.org/grpc/codes", - "//vendor:k8s.io/client-go/pkg/api", ], ) diff --git a/pkg/kubelet/server/streaming/server.go b/pkg/kubelet/server/streaming/server.go index 8a3a6b2ca42..5161d133e0c 100644 --- a/pkg/kubelet/server/streaming/server.go +++ b/pkg/kubelet/server/streaming/server.go @@ -19,15 +19,15 @@ package streaming import ( "crypto/tls" "errors" - "fmt" "io" "net/http" "net/url" + "path" "time" restful "github.com/emicklei/go-restful" - "k8s.io/client-go/pkg/api" + "k8s.io/kubernetes/pkg/api" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand" @@ -39,7 +39,7 @@ import ( type Server interface { http.Handler - // Get the serving URL for the requests. Server must be started before these are called. + // Get the serving URL for the requests. // Requests must not be nil. Responses may be nil iff an error is returned. GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) GetAttach(req *runtimeapi.AttachRequest, tty bool) (*runtimeapi.AttachResponse, error) @@ -66,6 +66,9 @@ type Runtime interface { type Config struct { // The host:port address the server will listen on. Addr string + // The optional base URL for constructing streaming URLs. If empty, the baseURL will be + // constructed from the serve address. + BaseURL *url.URL // How long to leave idle connections open for. StreamIdleTimeout time.Duration @@ -96,6 +99,16 @@ func NewServer(config Config, runtime Runtime) (Server, error) { runtime: &criAdapter{runtime}, } + if s.config.BaseURL == nil { + s.config.BaseURL = &url.URL{ + Scheme: "http", + Host: s.config.Addr, + } + if s.config.TLSConfig != nil { + s.config.BaseURL.Scheme = "https" + } + } + ws := &restful.WebService{} endpoints := []struct { path string @@ -105,11 +118,13 @@ func NewServer(config Config, runtime Runtime) (Server, error) { {"/attach/{containerID}", s.serveAttach}, {"/portforward/{podSandboxID}", s.servePortForward}, } + // If serving relative to a base path, set that here. + pathPrefix := path.Dir(s.config.BaseURL.Path) for _, e := range endpoints { for _, method := range []string{"GET", "POST"} { ws.Route(ws. Method(method). - Path(e.path). + Path(path.Join(pathPrefix, e.path)). To(e.handler)) } } @@ -204,13 +219,8 @@ const ( ) func (s *server) buildURL(method, id string, opts streamOpts) string { - loc := url.URL{ - Scheme: "http", - Host: s.config.Addr, - Path: fmt.Sprintf("/%s/%s", method, id), - } - if s.config.TLSConfig != nil { - loc.Scheme = "https" + loc := &url.URL{ + Path: path.Join(method, id), } query := url.Values{} @@ -231,7 +241,7 @@ func (s *server) buildURL(method, id string, opts streamOpts) string { } loc.RawQuery = query.Encode() - return loc.String() + return s.config.BaseURL.ResolveReference(loc).String() } func (s *server) serveExec(req *restful.Request, resp *restful.Response) { diff --git a/pkg/kubelet/server/streaming/server_test.go b/pkg/kubelet/server/streaming/server_test.go index 9922449ea6a..1bd4e3f609a 100644 --- a/pkg/kubelet/server/streaming/server_test.go +++ b/pkg/kubelet/server/streaming/server_test.go @@ -68,6 +68,17 @@ func TestGetExec(t *testing.T) { }, nil) assert.NoError(t, err) + const pathPrefix = "cri/shim" + prefixServer, err := NewServer(Config{ + Addr: testAddr, + BaseURL: &url.URL{ + Scheme: "http", + Host: testAddr, + Path: "/" + pathPrefix + "/", + }, + }, nil) + assert.NoError(t, err) + containerID := testContainerID for _, test := range testcases { request := &runtimeapi.ExecRequest{ @@ -87,6 +98,12 @@ func TestGetExec(t *testing.T) { assert.NoError(t, err, "testcase=%+v", test) expectedURL = "https://" + testAddr + "/exec/" + testContainerID + test.expectedQuery assert.Equal(t, expectedURL, resp.GetUrl(), "testcase=%+v", test) + + // Path prefix + resp, err = prefixServer.GetExec(request) + assert.NoError(t, err, "testcase=%+v", test) + expectedURL = "http://" + testAddr + "/" + pathPrefix + "/exec/" + testContainerID + test.expectedQuery + assert.Equal(t, expectedURL, resp.GetUrl(), "testcase=%+v", test) } }