diff --git a/pkg/master/master.go b/pkg/master/master.go index 8d750dfbed6..8624443c5e7 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -387,11 +387,14 @@ func (m *Master) init(c *Config) { // TODO: Factor out the core API registration m.storage = map[string]rest.Storage{ - "pods": podStorage.Pod, - "pods/status": podStorage.Status, - "pods/log": podStorage.Log, - "pods/binding": podStorage.Binding, - "bindings": podStorage.Binding, + "pods": podStorage.Pod, + "pods/status": podStorage.Status, + "pods/log": podStorage.Log, + "pods/exec": podStorage.Exec, + "pods/portforward": podStorage.PortForward, + "pods/proxy": podStorage.Proxy, + "pods/binding": podStorage.Binding, + "bindings": podStorage.Binding, "replicationControllers": controllerStorage, "services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName), diff --git a/pkg/registry/generic/rest/proxy.go b/pkg/registry/generic/rest/proxy.go new file mode 100644 index 00000000000..fdee893c79f --- /dev/null +++ b/pkg/registry/generic/rest/proxy.go @@ -0,0 +1,197 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "net/http/httputil" + "net/url" + "strings" + "sync" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/proxy" + + "github.com/GoogleCloudPlatform/kubernetes/third_party/golang/netutil" + "github.com/golang/glog" +) + +// UpgradeAwareProxyHandler is a handler for proxy requests that may require an upgrade +type UpgradeAwareProxyHandler struct { + UpgradeRequired bool + Location *url.URL + Transport http.RoundTripper + FlushInterval time.Duration + err error +} + +const defaultFlushInterval = 200 * time.Millisecond + +// NewUpgradeAwareProxyHandler creates a new proxy handler with a default flush interval +func NewUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *UpgradeAwareProxyHandler { + return &UpgradeAwareProxyHandler{ + Location: location, + Transport: transport, + UpgradeRequired: upgradeRequired, + FlushInterval: defaultFlushInterval, + } +} + +// RequestError returns an error that occurred while handling request +func (h *UpgradeAwareProxyHandler) RequestError() error { + return h.err +} + +// ServeHTTP handles the proxy request +func (h *UpgradeAwareProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + h.err = nil + if len(h.Location.Scheme) == 0 { + h.Location.Scheme = "http" + } + if h.tryUpgrade(w, req) { + return + } + if h.UpgradeRequired { + h.err = errors.NewBadRequest("Upgrade request required") + return + } + + if h.Transport == nil { + h.Transport = h.defaultProxyTransport(req.URL) + } + + loc := *h.Location + loc.RawQuery = req.URL.RawQuery + newReq, err := http.NewRequest(req.Method, loc.String(), req.Body) + if err != nil { + h.err = err + return + } + newReq.Header = req.Header + + proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: h.Location.Scheme, Host: h.Location.Host}) + proxy.Transport = h.Transport + proxy.FlushInterval = h.FlushInterval + proxy.ServeHTTP(w, newReq) +} + +// tryUpgrade returns true if the request was handled. +func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.Request) bool { + if !httpstream.IsUpgradeRequest(req) { + return false + } + + backendConn, err := h.dialURL() + if err != nil { + h.err = err + return true + } + defer backendConn.Close() + + requestHijackedConn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + h.err = err + return true + } + defer requestHijackedConn.Close() + + newReq, err := http.NewRequest(req.Method, h.Location.String(), req.Body) + if err != nil { + h.err = err + return true + } + newReq.Header = req.Header + + if err = newReq.Write(backendConn); err != nil { + h.err = err + return true + } + + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + _, err := io.Copy(backendConn, requestHijackedConn) + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + glog.Errorf("Error proxying data from client to backend: %v", err) + } + wg.Done() + }() + + go func() { + _, err := io.Copy(requestHijackedConn, backendConn) + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + glog.Errorf("Error proxying data from backend to client: %v", err) + } + wg.Done() + }() + + wg.Wait() + return true +} + +func (h *UpgradeAwareProxyHandler) dialURL() (net.Conn, error) { + dialAddr := netutil.CanonicalAddr(h.Location) + + switch h.Location.Scheme { + case "http": + return net.Dial("tcp", dialAddr) + case "https": + // Get the tls config from the transport if we recognize it + var tlsConfig *tls.Config + if h.Transport != nil { + httpTransport, ok := h.Transport.(*http.Transport) + if ok { + tlsConfig = httpTransport.TLSClientConfig + } + } + + // Dial + tlsConn, err := tls.Dial("tcp", dialAddr, tlsConfig) + if err != nil { + return nil, err + } + + // Verify + host, _, _ := net.SplitHostPort(dialAddr) + if err := tlsConn.VerifyHostname(host); err != nil { + tlsConn.Close() + return nil, err + } + + return tlsConn, nil + default: + return nil, fmt.Errorf("Unknown scheme: %s", h.Location.Scheme) + } +} + +func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL) http.RoundTripper { + scheme := url.Scheme + host := url.Host + pathPrepend := strings.TrimRight(url.Path, h.Location.Path) + return &proxy.Transport{ + Scheme: scheme, + Host: host, + PathPrepend: pathPrepend, + } +} diff --git a/pkg/registry/generic/rest/proxy_test.go b/pkg/registry/generic/rest/proxy_test.go new file mode 100644 index 00000000000..615312032cc --- /dev/null +++ b/pkg/registry/generic/rest/proxy_test.go @@ -0,0 +1,242 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "golang.org/x/net/websocket" +) + +type SimpleBackendHandler struct { + requestURL url.URL + requestHeader http.Header + requestBody []byte + requestMethod string + responseBody string + responseHeader map[string]string + t *testing.T +} + +func (s *SimpleBackendHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.requestURL = *req.URL + s.requestHeader = req.Header + s.requestMethod = req.Method + var err error + s.requestBody, err = ioutil.ReadAll(req.Body) + if err != nil { + s.t.Errorf("Unexpected error: %v", err) + return + } + + for k, v := range s.responseHeader { + w.Header().Add(k, v) + } + w.Write([]byte(s.responseBody)) +} + +func validateParameters(t *testing.T, name string, actual url.Values, expected map[string]string) { + for k, v := range expected { + actualValue, ok := actual[k] + if !ok { + t.Errorf("%s: Expected parameter %s not received", name, k) + continue + } + if actualValue[0] != v { + t.Errorf("%s: Parameter %s values don't match. Actual: %#v, Expected: %s", + name, k, actualValue, v) + } + } +} + +func validateHeaders(t *testing.T, name string, actual http.Header, expected map[string]string) { + for k, v := range expected { + actualValue, ok := actual[k] + if !ok { + t.Errorf("%s: Expected header %s not received", name, k) + continue + } + if actualValue[0] != v { + t.Errorf("%s: Header %s values don't match. Actual: %s, Expected: %s", + name, k, actualValue, v) + } + } +} + +func TestServeHTTP(t *testing.T) { + tests := []struct { + name string + method string + requestPath string + requestBody string + requestParams map[string]string + requestHeader map[string]string + }{ + { + name: "root path, simple get", + method: "GET", + requestPath: "/", + }, + { + name: "simple path, get", + method: "GET", + requestPath: "/path/to/test", + }, + { + name: "request params", + method: "POST", + requestPath: "/some/path", + requestParams: map[string]string{"param1": "value/1", "param2": "value%2"}, + requestBody: "test request body", + }, + { + name: "request headers", + method: "PUT", + requestPath: "/some/path", + requestHeader: map[string]string{"Header1": "value1", "Header2": "value2"}, + }, + } + + for _, test := range tests { + func() { + backendResponse := "
Hello" + backendHandler := &SimpleBackendHandler{ + responseBody: backendResponse, + responseHeader: map[string]string{"Content-Type": "text/html"}, + } + backendServer := httptest.NewServer(backendHandler) + defer backendServer.Close() + + backendURL, _ := url.Parse(backendServer.URL) + backendURL.Path = test.requestPath + proxyHandler := &UpgradeAwareProxyHandler{ + Location: backendURL, + } + proxyServer := httptest.NewServer(proxyHandler) + defer proxyServer.Close() + proxyURL, _ := url.Parse(proxyServer.URL) + proxyURL.Path = test.requestPath + paramValues := url.Values{} + for k, v := range test.requestParams { + paramValues[k] = []string{v} + } + proxyURL.RawQuery = paramValues.Encode() + var requestBody io.Reader + if test.requestBody != "" { + requestBody = bytes.NewBufferString(test.requestBody) + } + req, err := http.NewRequest(test.method, proxyURL.String(), requestBody) + if test.requestHeader != nil { + header := http.Header{} + for k, v := range test.requestHeader { + header.Add(k, v) + } + req.Header = header + } + if err != nil { + t.Errorf("Error creating client request: %v", err) + } + client := &http.Client{} + res, err := client.Do(req) + if err != nil { + t.Errorf("Error from proxy request: %v", err) + } + + // Validate backend request + // Method + if backendHandler.requestMethod != test.method { + t.Errorf("Unexpected request method: %s. Expected: %s", + backendHandler.requestMethod, test.method) + } + + // Body + if string(backendHandler.requestBody) != test.requestBody { + t.Errorf("Unexpected request body: %s. Expected: %s", + string(backendHandler.requestBody), test.requestBody) + } + + // Path + if backendHandler.requestURL.Path != test.requestPath { + t.Errorf("Unexpected request path: %s", backendHandler.requestURL.Path) + } + // Parameters + validateParameters(t, test.name, backendHandler.requestURL.Query(), test.requestParams) + + // Headers + validateHeaders(t, test.name+" backend request", backendHandler.requestHeader, + test.requestHeader) + + // Validate proxy response + // Validate Body + responseBody, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("Unexpected error reading response body: %v", err) + } + if rb := string(responseBody); rb != backendResponse { + t.Errorf("Did not get expected response body: %s. Expected: %s", rb, backendResponse) + } + + // Error + err = proxyHandler.RequestError() + if err != nil { + t.Errorf("Unexpected proxy handler error: %v", err) + } + }() + } +} + +func TestProxyUpgrade(t *testing.T) { + backendServer := httptest.NewServer(websocket.Handler(func(ws *websocket.Conn) { + defer ws.Close() + body := make([]byte, 5) + ws.Read(body) + ws.Write([]byte("hello " + string(body))) + })) + defer backendServer.Close() + + serverURL, _ := url.Parse(backendServer.URL) + proxyHandler := &UpgradeAwareProxyHandler{ + Location: serverURL, + } + proxy := httptest.NewServer(proxyHandler) + defer proxy.Close() + + ws, err := websocket.Dial("ws://"+proxy.Listener.Addr().String()+"/some/path", "", "http://127.0.0.1/") + if err != nil { + t.Fatalf("websocket dial err: %s", err) + } + defer ws.Close() + + if _, err := ws.Write([]byte("world")); err != nil { + t.Fatalf("write err: %s", err) + } + + response := make([]byte, 20) + n, err := ws.Read(response) + if err != nil { + t.Fatalf("read err: %s", err) + } + if e, a := "hello world", string(response[0:n]); e != a { + t.Fatalf("expected '%#v', got '%#v'", e, a) + } +} diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 1647b5212cd..a3ab8e7edc1 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "net/url" + "path" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -39,10 +40,13 @@ import ( // PodStorage includes storage for pods and all sub resources type PodStorage struct { - Pod *REST - Binding *BindingREST - Status *StatusREST - Log *LogREST + Pod *REST + Binding *BindingREST + Status *StatusREST + Log *LogREST + Proxy *ProxyREST + Exec *ExecREST + PortForward *PortForwardREST } // REST implements a RESTStorage for pods against etcd @@ -85,10 +89,13 @@ func NewStorage(h tools.EtcdHelper, k client.ConnectionInfoGetter) PodStorage { statusStore.UpdateStrategy = pod.StatusStrategy return PodStorage{ - Pod: &REST{*store}, - Binding: &BindingREST{store: store}, - Status: &StatusREST{store: &statusStore}, - Log: &LogREST{store: store, kubeletConn: k}, + Pod: &REST{*store}, + Binding: &BindingREST{store: store}, + Status: &StatusREST{store: &statusStore}, + Log: &LogREST{store: store, kubeletConn: k}, + Proxy: &ProxyREST{store: store}, + Exec: &ExecREST{store: store, kubeletConn: k}, + PortForward: &PortForwardREST{store: store, kubeletConn: k}, } } @@ -193,6 +200,9 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object return r.store.Update(ctx, obj) } +// Implement GetterWithOptions +var _ = rest.GetterWithOptions(&LogREST{}) + // LogREST implements the log endpoint for a Pod type LogREST struct { store *etcdgeneric.Etcd @@ -230,3 +240,114 @@ func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtim func (r *LogREST) NewGetOptions() (runtime.Object, bool, string) { return &api.PodLogOptions{}, false, "" } + +// ProxyREST implements the proxy subresource for a Pod +type ProxyREST struct { + store *etcdgeneric.Etcd +} + +// Implement Connecter +var _ = rest.Connecter(&ProxyREST{}) + +var proxyMethods = []string{"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS"} + +// New returns an empty pod resource +func (r *ProxyREST) New() runtime.Object { + return &api.Pod{} +} + +// ConnectMethods returns the list of HTTP methods that can be proxied +func (r *ProxyREST) ConnectMethods() []string { + return proxyMethods +} + +// NewConnectOptions returns versioned resource that represents proxy parameters +func (r *ProxyREST) NewConnectOptions() (runtime.Object, bool, string) { + return &api.PodProxyOptions{}, true, "path" +} + +// Connect returns a handler for the pod proxy +func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (rest.ConnectHandler, error) { + proxyOpts, ok := opts.(*api.PodProxyOptions) + if !ok { + return nil, fmt.Errorf("Invalid options object: %#v", opts) + } + location, _, err := pod.ResourceLocation(r.store, ctx, id) + if err != nil { + return nil, err + } + location.Path = path.Join(location.Path, proxyOpts.Path) + return genericrest.NewUpgradeAwareProxyHandler(location, nil, false), nil +} + +var upgradeableMethods = []string{"GET"} + +// ExecREST implements the exec subresource for a Pod +type ExecREST struct { + store *etcdgeneric.Etcd + kubeletConn client.ConnectionInfoGetter +} + +// Implement Connecter +var _ = rest.Connecter(&ExecREST{}) + +// New creates a new Pod object +func (r *ExecREST) New() runtime.Object { + return &api.Pod{} +} + +// Connect returns a handler for the pod exec proxy +func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object) (rest.ConnectHandler, error) { + execOpts, ok := opts.(*api.PodExecOptions) + if !ok { + return nil, fmt.Errorf("Invalid options object: %#v", opts) + } + location, transport, err := pod.ExecLocation(r.store, r.kubeletConn, ctx, name, execOpts) + if err != nil { + return nil, err + } + return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil +} + +// NewConnectOptions returns the versioned object that represents exec parameters +func (r *ExecREST) NewConnectOptions() (runtime.Object, bool, string) { + return &api.PodExecOptions{}, false, "" +} + +// ConnectMethods returns the methods supported by exec +func (r *ExecREST) ConnectMethods() []string { + return upgradeableMethods +} + +// PortForwardREST implements the portforward subresource for a Pod +type PortForwardREST struct { + store *etcdgeneric.Etcd + kubeletConn client.ConnectionInfoGetter +} + +// Implement Connecter +var _ = rest.Connecter(&PortForwardREST{}) + +// New returns an empty pod object +func (r *PortForwardREST) New() runtime.Object { + return &api.Pod{} +} + +// NewConnectOptions returns nil since portforward doesn't take additional parameters +func (r *PortForwardREST) NewConnectOptions() (runtime.Object, bool, string) { + return nil, false, "" +} + +// ConnectMethods returns the methods supported by portforward +func (r *PortForwardREST) ConnectMethods() []string { + return upgradeableMethods +} + +// Connect returns a handler for the pod portforward proxy +func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Object) (rest.ConnectHandler, error) { + location, transport, err := pod.PortForwardLocation(r.store, r.kubeletConn, ctx, name) + if err != nil { + return nil, err + } + return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil +} diff --git a/pkg/registry/pod/rest.go b/pkg/registry/pod/rest.go index 6d70f66ce62..5f81e9c8436 100644 --- a/pkg/registry/pod/rest.go +++ b/pkg/registry/pod/rest.go @@ -202,7 +202,7 @@ func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ct if len(pod.Spec.Containers) == 1 { container = pod.Spec.Containers[0].Name } else { - return nil, nil, fmt.Errorf("a container name must be specified for pod %s", name) + return nil, nil, errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", name)) } } nodeHost := pod.Status.HostIP @@ -226,3 +226,78 @@ func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ct } return loc, nodeTransport, nil } + +// ExecLocation returns the exec URL for a pod container. If opts.Container is blank +// and only one container is present in the pod, that container is used. +func ExecLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodExecOptions) (*url.URL, http.RoundTripper, error) { + + pod, err := getPod(getter, ctx, name) + if err != nil { + return nil, nil, err + } + + // Try to figure out a container + container := opts.Container + if container == "" { + if len(pod.Spec.Containers) == 1 { + container = pod.Spec.Containers[0].Name + } else { + return nil, nil, errors.NewBadRequest(fmt.Sprintf("a container name must be specified for pod %s", name)) + } + } + nodeHost := pod.Status.HostIP + if len(nodeHost) == 0 { + // If pod has not been assigned a host, return an empty location + return nil, nil, fmt.Errorf("pod %s does not have a host assigned", name) + } + nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost) + if err != nil { + return nil, nil, err + } + params := url.Values{} + if opts.Stdin { + params.Add(api.ExecStdinParam, "1") + } + if opts.Stdout { + params.Add(api.ExecStdoutParam, "1") + } + if opts.Stderr { + params.Add(api.ExecStderrParam, "1") + } + if opts.TTY { + params.Add(api.ExecTTYParam, "1") + } + params.Add("command", opts.Command) + loc := &url.URL{ + Scheme: nodeScheme, + Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Path: fmt.Sprintf("/exec/%s/%s/%s", pod.Namespace, name, container), + RawQuery: params.Encode(), + } + return loc, nodeTransport, nil +} + +// PortForwardLocation returns a the port-forward URL for a pod. +func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string) (*url.URL, http.RoundTripper, error) { + + pod, err := getPod(getter, ctx, name) + if err != nil { + return nil, nil, err + } + + nodeHost := pod.Status.HostIP + if len(nodeHost) == 0 { + // If pod has not been assigned a host, return an empty location + return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name)) + } + nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost) + if err != nil { + return nil, nil, err + } + loc := &url.URL{ + Scheme: nodeScheme, + Host: fmt.Sprintf("%s:%d", nodeHost, nodePort), + Path: fmt.Sprintf("/portForward/%s/%s", pod.Namespace, name), + } + return loc, nodeTransport, nil +}