Use indirect streaming path for dockershim & remote CRI runtime

This commit is contained in:
Tim St. Clair 2016-11-04 11:50:51 -07:00
parent 0f028ff660
commit 7badc1d226
No known key found for this signature in database
GPG Key ID: 434D16BCEF479EAB
12 changed files with 139 additions and 54 deletions

View File

@ -77,6 +77,7 @@ go_library(
"//pkg/kubelet/server:go_default_library", "//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library", "//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
"//pkg/kubelet/status:go_default_library", "//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/sysctl:go_default_library", "//pkg/kubelet/sysctl:go_default_library",
"//pkg/kubelet/types:go_default_library", "//pkg/kubelet/types:go_default_library",

View File

@ -21,7 +21,6 @@ go_library(
"docker_service.go", "docker_service.go",
"docker_streaming.go", "docker_streaming.go",
"helpers.go", "helpers.go",
"legacy.go",
"naming.go", "naming.go",
"security_context.go", "security_context.go",
], ],

View File

@ -18,6 +18,7 @@ package dockershim
import ( import (
"fmt" "fmt"
"net/http"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -136,6 +137,8 @@ type DockerService interface {
internalApi.RuntimeService internalApi.RuntimeService
internalApi.ImageManagerService internalApi.ImageManagerService
Start() error Start() error
// For serving streaming calls.
http.Handler
} }
type dockerService struct { type dockerService struct {
@ -236,3 +239,11 @@ func (ds *dockerService) Status() (*runtimeApi.RuntimeStatus, error) {
} }
return &runtimeApi.RuntimeStatus{Conditions: conditions}, nil return &runtimeApi.RuntimeStatus{Conditions: conditions}, nil
} }
func (ds *dockerService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ds.streamingServer != nil {
ds.streamingServer.ServeHTTP(w, r)
} else {
http.NotFound(w, r)
}
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package remote package remote
import ( import (
"fmt"
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -165,15 +164,15 @@ func (d *dockerService) ExecSync(ctx context.Context, r *runtimeApi.ExecSyncRequ
} }
func (d *dockerService) Exec(ctx context.Context, r *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) { func (d *dockerService) Exec(ctx context.Context, r *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
return nil, fmt.Errorf("not implemented") return d.runtimeService.Exec(r)
} }
func (d *dockerService) Attach(ctx context.Context, r *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) { func (d *dockerService) Attach(ctx context.Context, r *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
return nil, fmt.Errorf("not implemented") return d.runtimeService.Attach(r)
} }
func (d *dockerService) PortForward(ctx context.Context, r *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) { func (d *dockerService) PortForward(ctx context.Context, r *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
return nil, fmt.Errorf("not implemented") return d.runtimeService.PortForward(r)
} }
func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeApi.UpdateRuntimeConfigRequest) (*runtimeApi.UpdateRuntimeConfigResponse, error) { func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeApi.UpdateRuntimeConfigRequest) (*runtimeApi.UpdateRuntimeConfigResponse, error) {

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"net/url"
"os" "os"
"path" "path"
"sort" "sort"
@ -62,6 +63,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/server" "k8s.io/kubernetes/pkg/kubelet/server"
"k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/sysctl" "k8s.io/kubernetes/pkg/kubelet/sysctl"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -527,8 +529,9 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
switch kubeCfg.ContainerRuntime { switch kubeCfg.ContainerRuntime {
case "docker": case "docker":
streamingConfig := getStreamingConfig(kubeCfg, kubeDeps)
// Use the new CRI shim for docker. // Use the new CRI shim for docker.
ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings, kubeCfg.RuntimeCgroups) ds, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, streamingConfig, &pluginSettings, kubeCfg.RuntimeCgroups)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -538,6 +541,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
return nil, err return nil, err
} }
klet.criHandler = ds
rs := ds.(internalApi.RuntimeService) rs := ds.(internalApi.RuntimeService)
is := ds.(internalApi.ImageManagerService) is := ds.(internalApi.ImageManagerService)
// This is an internal knob to switch between grpc and non-grpc // This is an internal knob to switch between grpc and non-grpc
@ -1074,6 +1078,9 @@ type Kubelet struct {
// The AppArmor validator for checking whether AppArmor is supported. // The AppArmor validator for checking whether AppArmor is supported.
appArmorValidator apparmor.Validator appArmorValidator apparmor.Validator
// The handler serving CRI streaming calls (exec/attach/port-forward).
criHandler http.Handler
} }
// setupDataDirs creates: // setupDataDirs creates:
@ -2064,7 +2071,7 @@ func (kl *Kubelet) ResyncInterval() time.Duration {
// ListenAndServe runs the kubelet HTTP server. // ListenAndServe runs the kubelet HTTP server.
func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) { func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime) server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime, kl.criHandler)
} }
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode. // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
@ -2130,3 +2137,20 @@ func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap)
} }
return reservation, nil return reservation, nil
} }
// Gets the streaming server configuration to use with in-process CRI shims.
func getStreamingConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps) *streaming.Config {
config := &streaming.Config{
// Use a relative redirect (no scheme or host).
BaseURL: &url.URL{
Path: "/cri/",
},
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedProtocols: streaming.DefaultConfig.SupportedProtocols,
}
if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config
}
return config
}

View File

@ -36,7 +36,6 @@ go_library(
"//pkg/kubelet/api:go_default_library", "//pkg/kubelet/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/events:go_default_library", "//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/images:go_default_library", "//pkg/kubelet/images:go_default_library",
@ -56,7 +55,6 @@ go_library(
"//pkg/util/runtime:go_default_library", "//pkg/util/runtime:go_default_library",
"//pkg/util/selinux:go_default_library", "//pkg/util/selinux:go_default_library",
"//pkg/util/sets:go_default_library", "//pkg/util/sets:go_default_library",
"//pkg/util/term:go_default_library",
"//vendor:github.com/coreos/go-semver/semver", "//vendor:github.com/coreos/go-semver/semver",
"//vendor:github.com/docker/docker/pkg/jsonlog", "//vendor:github.com/docker/docker/pkg/jsonlog",
"//vendor:github.com/fsnotify/fsnotify", "//vendor:github.com/fsnotify/fsnotify",

View File

@ -74,6 +74,7 @@ go_test(
"//pkg/util/httpstream/spdy:go_default_library", "//pkg/util/httpstream/spdy:go_default_library",
"//pkg/util/sets:go_default_library", "//pkg/util/sets:go_default_library",
"//pkg/util/term:go_default_library", "//pkg/util/term:go_default_library",
"//pkg/util/testing:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//vendor:github.com/google/cadvisor/info/v1", "//vendor:github.com/google/cadvisor/info/v1",
"//vendor:github.com/google/cadvisor/info/v2", "//vendor:github.com/google/cadvisor/info/v2",

View File

@ -118,9 +118,10 @@ func ListenAndServeKubeletServer(
tlsOptions *TLSOptions, tlsOptions *TLSOptions,
auth AuthInterface, auth AuthInterface,
enableDebuggingHandlers bool, enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) { runtime kubecontainer.Runtime,
criHandler http.Handler) {
glog.Infof("Starting to listen on %s:%d", address, port) glog.Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime) handler := NewServer(host, resourceAnalyzer, auth, enableDebuggingHandlers, runtime, criHandler)
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler, Handler: &handler,
@ -137,7 +138,7 @@ func ListenAndServeKubeletServer(
// ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet. // ListenAndServeKubeletReadOnlyServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) { func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer stats.ResourceAnalyzer, address net.IP, port uint, runtime kubecontainer.Runtime) {
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port) glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
s := NewServer(host, resourceAnalyzer, nil, false, runtime) s := NewServer(host, resourceAnalyzer, nil, false, runtime, nil)
server := &http.Server{ server := &http.Server{
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
@ -191,7 +192,8 @@ func NewServer(
resourceAnalyzer stats.ResourceAnalyzer, resourceAnalyzer stats.ResourceAnalyzer,
auth AuthInterface, auth AuthInterface,
enableDebuggingHandlers bool, enableDebuggingHandlers bool,
runtime kubecontainer.Runtime) Server { runtime kubecontainer.Runtime,
criHandler http.Handler) Server {
server := Server{ server := Server{
host: host, host: host,
resourceAnalyzer: resourceAnalyzer, resourceAnalyzer: resourceAnalyzer,
@ -204,7 +206,7 @@ func NewServer(
} }
server.InstallDefaultHandlers() server.InstallDefaultHandlers()
if enableDebuggingHandlers { if enableDebuggingHandlers {
server.InstallDebuggingHandlers() server.InstallDebuggingHandlers(criHandler)
} }
return server return server
} }
@ -282,7 +284,7 @@ func (s *Server) InstallDefaultHandlers() {
const pprofBasePath = "/debug/pprof/" const pprofBasePath = "/debug/pprof/"
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers // InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers() { func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
var ws *restful.WebService var ws *restful.WebService
ws = new(restful.WebService) ws = new(restful.WebService)
@ -393,14 +395,10 @@ func (s *Server) InstallDebuggingHandlers() {
To(s.getRunningPods). To(s.getRunningPods).
Operation("getRunningPods")) Operation("getRunningPods"))
s.restfulCont.Add(ws) s.restfulCont.Add(ws)
}
type httpHandler struct { if criHandler != nil {
f func(w http.ResponseWriter, r *http.Request) s.restfulCont.Handle("/cri/", criHandler)
} }
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.f(w, r)
} }
// Checks if kubelet's sync loop that updates containers is working. // Checks if kubelet's sync loop that updates containers is working.
@ -701,8 +699,12 @@ func (s *Server) getPortForward(request *restful.Request, response *restful.Resp
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist")) response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
return return
} }
if len(params.podUID) > 0 && pod.UID != params.podUID {
response.WriteError(http.StatusNotFound, fmt.Errorf("pod not found"))
return
}
redirect, err := s.host.GetPortForward(params.podName, params.podNamespace, params.podUID) redirect, err := s.host.GetPortForward(pod.Name, pod.Namespace, pod.UID)
if err != nil { if err != nil {
response.WriteError(streaming.HTTPStatus(err), err) response.WriteError(streaming.HTTPStatus(err), err)
return return

View File

@ -53,9 +53,14 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream/spdy" "k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/term" "k8s.io/kubernetes/pkg/util/term"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
) )
const (
testUID = "9b01b80f-8fb4-11e4-95ab-4200af06647"
)
type fakeKubelet struct { type fakeKubelet struct {
podByNameFunc func(namespace, name string) (*api.Pod, bool) podByNameFunc func(namespace, name string) (*api.Pod, bool)
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error)
@ -196,6 +201,7 @@ type serverTestFramework struct {
fakeKubelet *fakeKubelet fakeKubelet *fakeKubelet
fakeAuth *fakeAuth fakeAuth *fakeAuth
testHTTPServer *httptest.Server testHTTPServer *httptest.Server
criHandler *utiltesting.FakeHandler
} }
func newServerTest() *serverTestFramework { func newServerTest() *serverTestFramework {
@ -209,6 +215,7 @@ func newServerTest() *serverTestFramework {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Namespace: namespace, Namespace: namespace,
Name: name, Name: name,
UID: testUID,
}, },
}, true }, true
}, },
@ -225,12 +232,16 @@ func newServerTest() *serverTestFramework {
return true, "", nil return true, "", nil
}, },
} }
fw.criHandler = &utiltesting.FakeHandler{
StatusCode: http.StatusOK,
}
server := NewServer( server := NewServer(
fw.fakeKubelet, fw.fakeKubelet,
stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &kubecontainertesting.FakeRuntime{}), stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &kubecontainertesting.FakeRuntime{}),
fw.fakeAuth, fw.fakeAuth,
true, true,
&kubecontainertesting.Mock{}) &kubecontainertesting.Mock{},
fw.criHandler)
fw.serverUnderTest = &server fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw return fw
@ -296,15 +307,14 @@ func TestContainerInfoWithUidNamespace(t *testing.T) {
expectedNamespace := "custom" expectedNamespace := "custom"
expectedPodID := getPodName(podID, expectedNamespace) expectedPodID := getPodName(podID, expectedNamespace)
expectedContainerName := "goodcontainer" expectedContainerName := "goodcontainer"
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) { fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
if podID != expectedPodID || string(uid) != expectedUid || containerName != expectedContainerName { if podID != expectedPodID || string(uid) != testUID || containerName != expectedContainerName {
return nil, fmt.Errorf("bad podID or uid or containerName: podID=%v; uid=%v; containerName=%v", podID, uid, containerName) return nil, fmt.Errorf("bad podID or uid or containerName: podID=%v; uid=%v; containerName=%v", podID, uid, containerName)
} }
return expectedInfo, nil return expectedInfo, nil
} }
resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName)) resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName))
if err != nil { if err != nil {
t.Fatalf("Got error GETing: %v", err) t.Fatalf("Got error GETing: %v", err)
} }
@ -325,11 +335,10 @@ func TestContainerNotFound(t *testing.T) {
podID := "somepod" podID := "somepod"
expectedNamespace := "custom" expectedNamespace := "custom"
expectedContainerName := "slowstartcontainer" expectedContainerName := "slowstartcontainer"
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) { fw.fakeKubelet.containerInfoFunc = func(podID string, uid types.UID, containerName string, req *cadvisorapi.ContainerInfoRequest) (*cadvisorapi.ContainerInfo, error) {
return nil, kubecontainer.ErrContainerNotFound return nil, kubecontainer.ErrContainerNotFound
} }
resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, expectedUid, expectedContainerName)) resp, err := http.Get(fw.testHTTPServer.URL + fmt.Sprintf("/stats/%v/%v/%v/%v", expectedNamespace, podID, testUID, expectedContainerName))
if err != nil { if err != nil {
t.Fatalf("Got error GETing: %v", err) t.Fatalf("Got error GETing: %v", err)
} }
@ -517,15 +526,14 @@ func TestServeRunInContainerWithUID(t *testing.T) {
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
expectedPodName := getPodName(podName, podNamespace) expectedPodName := getPodName(podName, podNamespace)
expectedUID := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720"
expectedContainerName := "baz" expectedContainerName := "baz"
expectedCommand := "ls -a" expectedCommand := "ls -a"
fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) { fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
if podFullName != expectedPodName { if podFullName != expectedPodName {
t.Errorf("expected %s, got %s", expectedPodName, podFullName) t.Errorf("expected %s, got %s", expectedPodName, podFullName)
} }
if string(uid) != expectedUID { if string(uid) != testUID {
t.Errorf("expected %s, got %s", expectedUID, uid) t.Errorf("expected %s, got %s", testUID, uid)
} }
if containerName != expectedContainerName { if containerName != expectedContainerName {
t.Errorf("expected %s, got %s", expectedContainerName, containerName) t.Errorf("expected %s, got %s", expectedContainerName, containerName)
@ -537,7 +545,7 @@ func TestServeRunInContainerWithUID(t *testing.T) {
return []byte(output), nil return []byte(output), nil
} }
resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil) resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
if err != nil { if err != nil {
t.Fatalf("Got error POSTing: %v", err) t.Fatalf("Got error POSTing: %v", err)
@ -645,7 +653,8 @@ func TestAuthFilters(t *testing.T) {
isSubpath(path, "/pods"), isSubpath(path, "/pods"),
isSubpath(path, "/portForward"), isSubpath(path, "/portForward"),
isSubpath(path, "/run"), isSubpath(path, "/run"),
isSubpath(path, "/runningpods"): isSubpath(path, "/runningpods"),
isSubpath(path, "/cri"):
return "proxy" return "proxy"
default: default:
@ -1182,7 +1191,6 @@ func testExecAttach(t *testing.T, verb string) {
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
expectedPodName := getPodName(podName, podNamespace) expectedPodName := getPodName(podName, podNamespace)
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
expectedContainerName := "baz" expectedContainerName := "baz"
expectedCommand := "ls -a" expectedCommand := "ls -a"
expectedStdin := "stdin" expectedStdin := "stdin"
@ -1200,8 +1208,8 @@ func testExecAttach(t *testing.T, verb string) {
if podFullName != expectedPodName { if podFullName != expectedPodName {
t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName) t.Fatalf("%d: podFullName: expected %s, got %s", i, expectedPodName, podFullName)
} }
if test.uid && string(uid) != expectedUid { if test.uid && string(uid) != testUID {
t.Fatalf("%d: uid: expected %v, got %v", i, expectedUid, uid) t.Fatalf("%d: uid: expected %v, got %v", i, testUID, uid)
} }
if containerName != expectedContainerName { if containerName != expectedContainerName {
t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName) t.Fatalf("%d: containerName: expected %s, got %s", i, expectedContainerName, containerName)
@ -1273,7 +1281,7 @@ func testExecAttach(t *testing.T, verb string) {
var url string var url string
if test.uid { if test.uid {
url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedUid + "/" + expectedContainerName + "?ignore=1" url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + testUID + "/" + expectedContainerName + "?ignore=1"
} else { } else {
url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1" url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1"
} }
@ -1491,7 +1499,6 @@ func TestServePortForward(t *testing.T) {
podNamespace := "other" podNamespace := "other"
podName := "foo" podName := "foo"
expectedPodName := getPodName(podName, podNamespace) expectedPodName := getPodName(podName, podNamespace)
expectedUid := "9b01b80f-8fb4-11e4-95ab-4200af06647"
for i, test := range tests { for i, test := range tests {
fw := newServerTest() fw := newServerTest()
@ -1516,7 +1523,7 @@ func TestServePortForward(t *testing.T) {
t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a) t.Fatalf("%d: pod name: expected '%v', got '%v'", i, e, a)
} }
if e, a := expectedUid, uid; test.uid && e != string(a) { if e, a := testUID, uid; test.uid && e != string(a) {
t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a) t.Fatalf("%d: uid: expected '%v', got '%v'", i, e, a)
} }
@ -1551,7 +1558,7 @@ func TestServePortForward(t *testing.T) {
var url string var url string
if test.uid { if test.uid {
url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, expectedUid) url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, testUID)
} else { } else {
url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName) url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
} }
@ -1629,3 +1636,19 @@ func TestServePortForward(t *testing.T) {
<-portForwardFuncDone <-portForwardFuncDone
} }
} }
func TestCRIHandler(t *testing.T) {
fw := newServerTest()
defer fw.testHTTPServer.Close()
const (
path = "/cri/exec/123456abcdef"
query = "cmd=echo+foo"
)
resp, err := http.Get(fw.testHTTPServer.URL + path + "?" + query)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "GET", fw.criHandler.RequestReceived.Method)
assert.Equal(t, path, fw.criHandler.RequestReceived.URL.Path)
assert.Equal(t, query, fw.criHandler.RequestReceived.URL.RawQuery)
}

View File

@ -18,6 +18,7 @@ go_library(
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library", "//pkg/kubelet/server/portforward:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library", "//pkg/kubelet/server/remotecommand:go_default_library",
@ -26,7 +27,6 @@ go_library(
"//vendor:github.com/emicklei/go-restful", "//vendor:github.com/emicklei/go-restful",
"//vendor:google.golang.org/grpc", "//vendor:google.golang.org/grpc",
"//vendor:google.golang.org/grpc/codes", "//vendor:google.golang.org/grpc/codes",
"//vendor:k8s.io/client-go/pkg/api",
], ],
) )

View File

@ -19,15 +19,15 @@ package streaming
import ( import (
"crypto/tls" "crypto/tls"
"errors" "errors"
"fmt"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"path"
"time" "time"
restful "github.com/emicklei/go-restful" restful "github.com/emicklei/go-restful"
"k8s.io/client-go/pkg/api" "k8s.io/kubernetes/pkg/api"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/server/portforward" "k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand" "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
@ -39,7 +39,7 @@ import (
type Server interface { type Server interface {
http.Handler http.Handler
// Get the serving URL for the requests. Server must be started before these are called. // Get the serving URL for the requests.
// Requests must not be nil. Responses may be nil iff an error is returned. // Requests must not be nil. Responses may be nil iff an error is returned.
GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) GetExec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
GetAttach(req *runtimeapi.AttachRequest, tty bool) (*runtimeapi.AttachResponse, error) GetAttach(req *runtimeapi.AttachRequest, tty bool) (*runtimeapi.AttachResponse, error)
@ -66,6 +66,9 @@ type Runtime interface {
type Config struct { type Config struct {
// The host:port address the server will listen on. // The host:port address the server will listen on.
Addr string Addr string
// The optional base URL for constructing streaming URLs. If empty, the baseURL will be
// constructed from the serve address.
BaseURL *url.URL
// How long to leave idle connections open for. // How long to leave idle connections open for.
StreamIdleTimeout time.Duration StreamIdleTimeout time.Duration
@ -96,6 +99,16 @@ func NewServer(config Config, runtime Runtime) (Server, error) {
runtime: &criAdapter{runtime}, runtime: &criAdapter{runtime},
} }
if s.config.BaseURL == nil {
s.config.BaseURL = &url.URL{
Scheme: "http",
Host: s.config.Addr,
}
if s.config.TLSConfig != nil {
s.config.BaseURL.Scheme = "https"
}
}
ws := &restful.WebService{} ws := &restful.WebService{}
endpoints := []struct { endpoints := []struct {
path string path string
@ -105,11 +118,13 @@ func NewServer(config Config, runtime Runtime) (Server, error) {
{"/attach/{containerID}", s.serveAttach}, {"/attach/{containerID}", s.serveAttach},
{"/portforward/{podSandboxID}", s.servePortForward}, {"/portforward/{podSandboxID}", s.servePortForward},
} }
// If serving relative to a base path, set that here.
pathPrefix := path.Dir(s.config.BaseURL.Path)
for _, e := range endpoints { for _, e := range endpoints {
for _, method := range []string{"GET", "POST"} { for _, method := range []string{"GET", "POST"} {
ws.Route(ws. ws.Route(ws.
Method(method). Method(method).
Path(e.path). Path(path.Join(pathPrefix, e.path)).
To(e.handler)) To(e.handler))
} }
} }
@ -204,13 +219,8 @@ const (
) )
func (s *server) buildURL(method, id string, opts streamOpts) string { func (s *server) buildURL(method, id string, opts streamOpts) string {
loc := url.URL{ loc := &url.URL{
Scheme: "http", Path: path.Join(method, id),
Host: s.config.Addr,
Path: fmt.Sprintf("/%s/%s", method, id),
}
if s.config.TLSConfig != nil {
loc.Scheme = "https"
} }
query := url.Values{} query := url.Values{}
@ -231,7 +241,7 @@ func (s *server) buildURL(method, id string, opts streamOpts) string {
} }
loc.RawQuery = query.Encode() loc.RawQuery = query.Encode()
return loc.String() return s.config.BaseURL.ResolveReference(loc).String()
} }
func (s *server) serveExec(req *restful.Request, resp *restful.Response) { func (s *server) serveExec(req *restful.Request, resp *restful.Response) {

View File

@ -68,6 +68,17 @@ func TestGetExec(t *testing.T) {
}, nil) }, nil)
assert.NoError(t, err) assert.NoError(t, err)
const pathPrefix = "cri/shim"
prefixServer, err := NewServer(Config{
Addr: testAddr,
BaseURL: &url.URL{
Scheme: "http",
Host: testAddr,
Path: "/" + pathPrefix + "/",
},
}, nil)
assert.NoError(t, err)
containerID := testContainerID containerID := testContainerID
for _, test := range testcases { for _, test := range testcases {
request := &runtimeapi.ExecRequest{ request := &runtimeapi.ExecRequest{
@ -87,6 +98,12 @@ func TestGetExec(t *testing.T) {
assert.NoError(t, err, "testcase=%+v", test) assert.NoError(t, err, "testcase=%+v", test)
expectedURL = "https://" + testAddr + "/exec/" + testContainerID + test.expectedQuery expectedURL = "https://" + testAddr + "/exec/" + testContainerID + test.expectedQuery
assert.Equal(t, expectedURL, resp.GetUrl(), "testcase=%+v", test) assert.Equal(t, expectedURL, resp.GetUrl(), "testcase=%+v", test)
// Path prefix
resp, err = prefixServer.GetExec(request)
assert.NoError(t, err, "testcase=%+v", test)
expectedURL = "http://" + testAddr + "/" + pathPrefix + "/exec/" + testContainerID + test.expectedQuery
assert.Equal(t, expectedURL, resp.GetUrl(), "testcase=%+v", test)
} }
} }