Merge pull request #36253 from timstclair/klet-stream-config-pr

Automatic merge from submit-queue

Use indirect streaming path for remote CRI shim

Last step for https://github.com/kubernetes/kubernetes/issues/29579

- Wire through the remote indirect streaming methods in the docker remote shim
- Add the docker streaming server as a handler at `<node>:10250/cri/{exec,attach,portforward}`
- Disable legacy streaming for dockershim

Note: This requires PR https://github.com/kubernetes/kubernetes/pull/34987 to work.

Tested manually on an E2E cluster.

/cc @euank @feiskyer @kubernetes/sig-node
This commit is contained in:
Kubernetes Submit Queue 2016-11-09 23:29:18 -08:00 committed by GitHub
commit 9bdff48d5e
15 changed files with 142 additions and 165 deletions

View File

@ -77,6 +77,7 @@ go_library(
"//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/sysctl:go_default_library",
"//pkg/kubelet/types:go_default_library",

View File

@ -21,7 +21,6 @@ go_library(
"docker_service.go",
"docker_streaming.go",
"helpers.go",
"legacy.go",
"naming.go",
"security_context.go",
],

View File

@ -18,7 +18,7 @@ package dockershim
import (
"fmt"
"io"
"net/http"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network/cni"
"k8s.io/kubernetes/pkg/kubelet/network/kubenet"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/util/term"
)
const (
@ -132,23 +131,14 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str
return ds, nil
}
// DockerService is an interface that embeds both the new RuntimeService and
// ImageService interfaces, while including DockerLegacyService for backward
// compatibility.
// DockerService is an interface that embeds the new RuntimeService and
// ImageService interfaces.
type DockerService interface {
internalApi.RuntimeService
internalApi.ImageManagerService
DockerLegacyService
Start() error
}
// DockerLegacyService is an interface that embeds all legacy methods for
// backward compatibility.
type DockerLegacyService interface {
// Supporting legacy methods for docker.
LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error
LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error
// For serving streaming calls.
http.Handler
}
type dockerService struct {
@ -249,3 +239,11 @@ func (ds *dockerService) Status() (*runtimeApi.RuntimeStatus, error) {
}
return &runtimeApi.RuntimeStatus{Conditions: conditions}, nil
}
func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ds.streamingServer != nil {
ds.streamingServer.ServeHTTP(w, r)
} else {
http.NotFound(w, r)
}
}

View File

@ -1,41 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockershim
import (
"io"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/util/term"
)
// This file implements the functions that are needed for backward
// compatibility. Therefore, it imports various kubernetes packages
// directly.
// TODO: implement the methods in this file.
func (ds *dockerService) LegacyAttach(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
return ds.streamingRuntime.Attach(id.ID, stdin, stdout, stderr, resize)
}
func (ds *dockerService) LegacyPortForward(sandboxID string, port uint16, stream io.ReadWriteCloser) error {
return ds.streamingRuntime.PortForward(sandboxID, int32(port), stream)
}
func (ds *dockerService) LegacyExec(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ds.streamingRuntime.Exec(containerID.ID, cmd, stdin, stdout, stderr, tty, resize)
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package remote
import (
"fmt"
"time"
"golang.org/x/net/context"
@ -165,15 +164,15 @@ func (d *dockerService) ExecSync(ctx context.Context, r *runtimeApi.ExecSyncRequ
}
func (d *dockerService) Exec(ctx context.Context, r *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
return nil, fmt.Errorf("not implemented")
return d.runtimeService.Exec(r)
}
func (d *dockerService) Attach(ctx context.Context, r *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
return nil, fmt.Errorf("not implemented")
return d.runtimeService.Attach(r)
}
func (d *dockerService) PortForward(ctx context.Context, r *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
return nil, fmt.Errorf("not implemented")
return d.runtimeService.PortForward(r)
}
func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeApi.UpdateRuntimeConfigRequest) (*runtimeApi.UpdateRuntimeConfigResponse, error) {

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"path"
"sort"
@ -62,6 +63,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -527,8 +529,9 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
switch kubeCfg.ContainerRuntime {
case "docker":
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps)
// Use the new CRI shim for docker.
ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups)
ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, streamingConfig, &pluginSettings, kubeCfg.RuntimeCgroups)
if err != nil {
return nil, err
}
@ -538,6 +541,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
return nil, err
}
klet.criHandler = ds
rs := ds.(internalApi.RuntimeService)
is := ds.(internalApi.ImageManagerService)
// This is an internal knob to switch between grpc and non-grpc
@ -567,13 +571,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
// functions in CRI.
// TODO: Remove this hack after CRI is fully implemented.
// TODO: Move the instrumented interface wrapping into kuberuntime.
runtimeService = &struct {
internalApi.RuntimeService
dockershim.DockerLegacyService
}{
RuntimeService: kuberuntime.NewInstrumentedRuntimeService(rs),
DockerLegacyService: ds,
}
runtimeService = kuberuntime.NewInstrumentedRuntimeService(rs)
imageService = is
case "remote":
runtimeService, imageService, err = getRuntimeAndImageServices(kubeCfg)
@ -1086,6 +1084,9 @@ type Kubelet struct {
// The AppArmor validator for checking whether AppArmor is supported.
appArmorValidator apparmor.Validator
// The handler serving CRI streaming calls (exec/attach/port-forward).
criHandler http.Handler
}
// setupDataDirs creates:
@ -2076,7 +2077,7 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
// ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime)
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime, kl.criHandler)
}
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
@ -2142,3 +2143,20 @@ func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap)
}
return reservation, nil
}
// Gets the streaming server configuration to use with in-process CRI shims.
func getStreamingConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps) *streaming.Config {
config := &streaming.Config{
// Use a relative redirect (no scheme or host).
BaseURL: &url.URL{
Path: "/cri/",
},
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedProtocols: streaming.DefaultConfig.SupportedProtocols,
}
if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config
}
return config
}

View File

@ -36,7 +36,6 @@ go_library(
"//pkg/kubelet/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/images:go_default_library",
@ -56,7 +55,6 @@ go_library(
"//pkg/util/runtime:go_default_library",
"//pkg/util/selinux:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/term:go_default_library",
"//vendor:github.com/coreos/go-semver/semver",
"//vendor:github.com/docker/docker/pkg/jsonlog",
"//vendor:github.com/fsnotify/fsnotify",

View File

@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/types"
@ -42,7 +41,6 @@ import (
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/selinux"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
)
// startContainer starts a container and returns a message indicates why it is failed on error.
@ -653,17 +651,6 @@ func findNextInitContainerToRun(pod *api.Pod, podStatus *kubecontainer.PodStatus
return nil, &pod.Spec.InitContainers[0], false
}
// AttachContainer attaches to the container's console
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
// Use `docker attach` directly for in-process docker integration for
// now to unblock other tests.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyAttach(id, stdin, stdout, stderr, tty, resize)
}
return fmt.Errorf("not implemented")
}
// GetContainerLogs returns logs of a specific container.
func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error) {
status, err := m.runtimeService.ContainerStatus(containerID.ID)
@ -714,19 +701,6 @@ func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID,
return append(stdout, stderr...), err
}
// Runs the command in the container of the specified pod using nsenter.
// Attaches the processes stdin, stdout, and stderr. Optionally uses a
// tty.
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) ExecInContainer(containerID kubecontainer.ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error {
// Use `docker exec` directly for in-process docker integration for
// now to unblock other tests.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyExec(containerID, cmd, stdin, stdout, stderr, tty, resize)
}
return fmt.Errorf("not implemented")
}
// removeContainer removes the container and the container logs.
// Notice that we remove the container logs first, so that container will not be removed if
// container logs are failed to be removed, and kubelet will retry this later. This guarantees

View File

@ -19,7 +19,6 @@ package kuberuntime
import (
"errors"
"fmt"
"io"
"os"
"time"
@ -33,7 +32,6 @@ import (
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -114,8 +112,6 @@ type KubeGenericRuntime interface {
kubecontainer.Runtime
kubecontainer.IndirectStreamingRuntime
kubecontainer.ContainerCommandRunner
// TODO(timstclair): Remove this once the indirect path is fully functional.
kubecontainer.DirectStreamingRuntime
}
// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
@ -919,24 +915,6 @@ func (m *kubeGenericRuntimeManager) GetPodContainerID(pod *kubecontainer.Pod) (k
return pod.Sandboxes[0].ID, nil
}
// Forward the specified port from the specified pod to the stream.
// TODO: Remove this method once the indirect streaming path is fully functional.
func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
formattedPod := kubecontainer.FormatPod(pod)
if len(pod.Sandboxes) == 0 {
glog.Errorf("No sandboxes are found for pod %q", formattedPod)
return fmt.Errorf("sandbox for pod %q not found", formattedPod)
}
// Use docker portforward directly for in-process docker integration
// now to unblock other tests.
if ds, ok := m.runtimeService.(dockershim.DockerLegacyService); ok {
return ds.LegacyPortForward(pod.Sandboxes[0].ID.ID, port, stream)
}
return fmt.Errorf("not implemented")
}
// UpdatePodCIDR is just a passthrough method to update the runtimeConfig of the shim
// with the podCIDR supplied by the kubelet.
func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error {

View File

@ -74,6 +74,7 @@ go_test(
"//pkg/util/httpstream/spdy:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/term:go_default_library",
"//pkg/util/testing:go_default_library",
"//pkg/volume:go_default_library",
"//vendor:github.com/google/cadvisor/info/v1",
"//vendor:github.com/google/cadvisor/info/v2",

View File

@ -118,9 +118,10 @@ func ListenAndServeKubeletServer(
tlsOptions *TLSOptions,
auth AuthInterface,
enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) {
runtime kubecontainer.Runtime,
criHandler http.Handler) {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime, criHandler)
s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
@ -137,7 +138,7 @@ func ListenAndServeKubeletServer(
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := NewServer(host, resourceAnalyzer, nil, false, runtime)
s := NewServer(host, resourceAnalyzer, nil, false, runtime, nil)
server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -191,7 +192,8 @@ func NewServer(
resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface,
enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) Server {
runtime kubecontainer.Runtime,
criHandler http.Handler) Server {
server := Server{
host: host,
resourceAnalyzer: resourceAnalyzer,
@ -204,7 +206,7 @@ func NewServer(
}
server.InstallDefaultHandlers()
if enableDebuggingHandlers {
server.InstallDebuggingHandlers()
server.InstallDebuggingHandlers(criHandler)
}
return server
}
@ -282,7 +284,7 @@ func (s *Server) InstallDefaultHandlers() {
const pprofBasePath = "/debug/pprof/"
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers() {
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
var ws *restful.WebService
ws = new(restful.WebService)
@ -393,14 +395,10 @@ func (s *Server) InstallDebuggingHandlers() {
To(s.getRunningPods).
Operation("getRunningPods"))
s.restfulCont.Add(ws)
}
type httpHandler struct {
f func(w http.ResponseWriter, r *http.Request)
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.f(w, r)
if criHandler != nil {
s.restfulCont.Handle("/cri/", criHandler)
}
}
// Checks if kubelet's sync loop that updates containers is working.
@ -701,8 +699,12 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return
}
if len(params.podUID) > 0 && pod.UID != params.podUID {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found"))
return
}
redirect, err := s.host.GetPortForward(params.podName, params.podNamespace, params.podUID)
redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID)
if err != nil {
response.WriteError(streaming.HTTPStatus(err), err)
return

View File

@ -53,9 +53,14 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume"
)
const (
testUID = "9b01b80f-8fb4-11e4-95ab-4200af06647"
)
type fakeKubelet struct {
podByNameFunc func(namespace, name string) (*api.Pod, bool)
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
@ -196,6 +201,7 @@ type serverTestFramework struct {
fakeKubelet *fakeKubelet
fakeAuth *fakeAuth
testHTTPServer *httptest.Server
criHandler *utiltesting.FakeHandler
}
func newServerTest() *serverTestFramework {
@ -209,6 +215,7 @@ func newServerTest() *serverTestFramework {
ObjectMeta: api.ObjectMeta{
Namespace: namespace,
Name: name,
UID: testUID,
},
}, true
},
@ -225,12 +232,16 @@ func newServerTest() *serverTestFramework {
return true, "", nil
},
}
fw.criHandler = &utiltesting.FakeHandler{
StatusCode: http.StatusOK,
}
server := NewServer(
fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &kubecontainertesting.FakeRuntime{}),
fw.fakeAuth,
true,
&kubecontainertesting.Mock{})
&kubecontainertesting.Mock{},
fw.criHandler)
fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw
@ -296,15 +307,14 @@ func TestContainerInfoWithUidNamespace(t *testing.T) {
expectedNamespace := "custom"
expectedPodID := getPodName(podID, expectedNamespace)
expectedContainerName := "goodcontainer"
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
if podID != expectedPodID || string(uid) != expectedUid || containerName != expectedContainerName {
if podID != expectedPodID || string(uid) != testUID || containerName != expectedContainerName {
return nil, fmt.Errorf("bad podID or uid or containerName: podID=%v; uid=%v; containerName=%v", podID, uid, containerName)
}
return expectedInfo, nil
}
resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName))
resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName))
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
@ -325,11 +335,10 @@ func TestContainerNotFound(t *testing.T) {
podID := "somepod"
expectedNamespace := "custom"
expectedContainerName := "slowstartcontainer"
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
return nil, kubecontainer.ErrContainerNotFound
}
resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName))
resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName))
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
@ -517,15 +526,14 @@ func TestServeRunInContainerWithUID(t *testing.T) {
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedUID := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720"
expectedContainerName := "baz"
expectedCommand := "ls -a"
fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
if podFullName != expectedPodName {
t.Errorf("expected %s, got %s", expectedPodName, podFullName)
}
if string(uid) != expectedUID {
t.Errorf("expected %s, got %s", expectedUID, uid)
if string(uid) != testUID {
t.Errorf("expected %s, got %s", testUID, uid)
}
if containerName != expectedContainerName {
t.Errorf("expected %s, got %s", expectedContainerName, containerName)
@ -537,7 +545,7 @@ func TestServeRunInContainerWithUID(t *testing.T) {
return []byte(output), nil
}
resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
if err != nil {
t.Fatalf("Got error POSTing: %v", err)
@ -645,7 +653,8 @@ func TestAuthFilters(t *testing.T) {
isSubpath(path, "/pods"),
isSubpath(path, "/portForward"),
isSubpath(path, "/run"),
isSubpath(path, "/runningpods"):
isSubpath(path, "/runningpods"),
isSubpath(path, "/cri"):
return "proxy"
default:
@ -1182,7 +1191,6 @@ func testExecAttach(t *testing.T, verb string) {
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
expectedContainerName := "baz"
expectedCommand := "ls -a"
expectedStdin := "stdin"
@ -1200,8 +1208,8 @@ func testExecAttach(t *testing.T, verb string) {
if podFullName != expectedPodName {
t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName)
}
if test.uid && string(uid) != expectedUid {
t.Fatalf("%d: uid: expected %v, got %v", i, expectedUid, uid)
if test.uid && string(uid) != testUID {
t.Fatalf("%d: uid: expected %v, got %v", i, testUID, uid)
}
if containerName != expectedContainerName {
t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName)
@ -1273,7 +1281,7 @@ func testExecAttach(t *testing.T, verb string) {
var url string
if test.uid {
url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedUid + "/" + expectedContainerName + "?ignore=1"
url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + testUID + "/" + expectedContainerName + "?ignore=1"
} else {
url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1"
}
@ -1491,7 +1499,6 @@ func TestServePortForward(t *testing.T) {
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
for i, test := range tests {
fw := newServerTest()
@ -1516,7 +1523,7 @@ func TestServePortForward(t *testing.T) {
t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a)
}
if e, a := expectedUid, uid; test.uid && e != string(a) {
if e, a := testUID, uid; test.uid && e != string(a) {
t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
}
@ -1551,7 +1558,7 @@ func TestServePortForward(t *testing.T) {
var url string
if test.uid {
url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, expectedUid)
url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, testUID)
} else {
url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
}
@ -1629,3 +1636,19 @@ func TestServePortForward(t *testing.T) {
<-portForwardFuncDone
}
}
func TestCRIHandler(t *testing.T) {
fw := newServerTest()
defer fw.testHTTPServer.Close()
const (
path = "/cri/exec/123456abcdef"
query = "cmd=echo+foo"
)
resp, err := http.Get(fw.testHTTPServer.URL + path + "?" + query)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "GET", fw.criHandler.RequestReceived.Method)
assert.Equal(t, path, fw.criHandler.RequestReceived.URL.Path)
assert.Equal(t, query, fw.criHandler.RequestReceived.URL.RawQuery)
}

View File

@ -18,6 +18,7 @@ go_library(
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
@ -26,7 +27,6 @@ go_library(
"//vendor:github.com/emicklei/go-restful",
"//vendor:google.golang.org/grpc",
"//vendor:google.golang.org/grpc/codes",
"//vendor:k8s.io/client-go/pkg/api",
],
)

View File

@ -19,15 +19,15 @@ package streaming
import (
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"time"
restful "github.com/emicklei/go-restful"
"k8s.io/client-go/pkg/api"
"k8s.io/kubernetes/pkg/api"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
@ -39,7 +39,7 @@ import (
type Server interface {
http.Handler
// Get the serving URL for the requests. Server must be started before these are called.
// Get the serving URL for the requests.
// Requests must not be nil. Responses may be nil iff an error is returned.
GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
GetAttach(req *runtimeapi.AttachRequest, tty bool) (*runtimeapi.AttachResponse, error)
@ -66,6 +66,9 @@ type Runtime interface {
type Config struct {
// The host:port address the server will listen on.
Addr string
// The optional base URL for constructing streaming URLs. If empty, the baseURL will be
// constructed from the serve address.
BaseURL *url.URL
// How long to leave idle connections open for.
StreamIdleTimeout time.Duration
@ -96,6 +99,16 @@ func NewServer(config Config, runtime Runtime) (Server, error) {
runtime: &criAdapter{runtime},
}
if s.config.BaseURL == nil {
s.config.BaseURL = &url.URL{
Scheme: "http",
Host: s.config.Addr,
}
if s.config.TLSConfig != nil {
s.config.BaseURL.Scheme = "https"
}
}
ws := &restful.WebService{}
endpoints := []struct {
path string
@ -105,11 +118,13 @@ func NewServer(config Config, runtime Runtime) (Server, error) {
{"/attach/{containerID}", s.serveAttach},
{"/portforward/{podSandboxID}", s.servePortForward},
}
// If serving relative to a base path, set that here.
pathPrefix := path.Dir(s.config.BaseURL.Path)
for _, e := range endpoints {
for _, method := range []string{"GET", "POST"} {
ws.Route(ws.
Method(method).
Path(e.path).
Path(path.Join(pathPrefix, e.path)).
To(e.handler))
}
}
@ -204,13 +219,8 @@ const (
)
func (s *server) buildURL(method, id string, opts streamOpts) string {
loc := url.URL{
Scheme: "http",
Host: s.config.Addr,
Path: fmt.Sprintf("/%s/%s", method, id),
}
if s.config.TLSConfig != nil {
loc.Scheme = "https"
loc := &url.URL{
Path: path.Join(method, id),
}
query := url.Values{}
@ -231,7 +241,7 @@ func (s *server) buildURL(method, id string, opts streamOpts) string {
}
loc.RawQuery = query.Encode()
return loc.String()
return s.config.BaseURL.ResolveReference(loc).String()
}
func (s *server) serveExec(req *restful.Request, resp *restful.Response) {

View File

@ -68,6 +68,17 @@ func TestGetExec(t *testing.T) {
}, nil)
assert.NoError(t, err)
const pathPrefix = "cri/shim"
prefixServer, err := NewServer(Config{
Addr: testAddr,
BaseURL: &url.URL{
Scheme: "http",
Host: testAddr,
Path: "/" + pathPrefix + "/",
},
}, nil)
assert.NoError(t, err)
containerID := testContainerID
for _, test := range testcases {
request := &runtimeapi.ExecRequest{
@ -87,6 +98,12 @@ func TestGetExec(t *testing.T) {
assert.NoError(t, err, "testcase=%+v", test)
expectedURL = "https://" + testAddr + "/exec/" + testContainerID + test.expectedQuery
assert.Equal(t, expectedURL, resp.GetUrl(), "testcase=%+v", test)
// Path prefix
resp, err = prefixServer.GetExec(request)
assert.NoError(t, err, "testcase=%+v", test)
expectedURL = "http://" + testAddr + "/" + pathPrefix + "/exec/" + testContainerID + test.expectedQuery
assert.Equal(t, expectedURL, resp.GetUrl(), "testcase=%+v", test)
}
}