From e2b645ec15d77aae363eddfea6dfb391d0511364 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 27 Aug 2014 16:10:44 -0700 Subject: [PATCH] Add a generic proxier To proxy traffic to anything that implements ResourceLocation. Currently, this is only services. This is easily extensible to minions (would supercede existing mechanism) and pods. --- pkg/apiserver/apiserver.go | 2 + pkg/apiserver/apiserver_test.go | 5 +- pkg/apiserver/minionproxy.go | 1 + pkg/apiserver/proxy.go | 188 +++++++++++++++++++++++++++ pkg/apiserver/proxy_test.go | 137 +++++++++++++++++++ pkg/apiserver/redirect_test.go | 1 + pkg/registry/service/storage.go | 2 +- pkg/registry/service/storage_test.go | 2 +- 8 files changed, 334 insertions(+), 4 deletions(-) create mode 100644 pkg/apiserver/proxy.go create mode 100644 pkg/apiserver/proxy_test.go diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index d08f5dbe006..1e56cfd810f 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -99,8 +99,10 @@ func (g *APIGroup) InstallREST(mux mux, paths ...string) { for _, prefix := range paths { prefix = strings.TrimRight(prefix, "/") + proxyHandler := &ProxyHandler{prefix + "/proxy/", g.handler.storage, g.handler.codec} mux.Handle(prefix+"/", http.StripPrefix(prefix, restHandler)) mux.Handle(prefix+"/watch/", http.StripPrefix(prefix+"/watch/", watchHandler)) + mux.Handle(prefix+"/proxy/", http.StripPrefix(prefix+"/proxy/", proxyHandler)) mux.Handle(prefix+"/redirect/", http.StripPrefix(prefix+"/redirect/", redirectHandler)) mux.Handle(prefix+"/operations", http.StripPrefix(prefix+"/operations", opHandler)) mux.Handle(prefix+"/operations/", http.StripPrefix(prefix+"/operations/", opHandler)) diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 75bbbab51ab..8e461437d0f 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -73,8 +73,9 @@ type SimpleRESTStorage struct { requestedFieldSelector labels.Selector requestedResourceVersion uint64 - // The location + // The id requested, and location to return for ResourceLocation requestedResourceLocationID string + resourceLocation string // If non-nil, called inside the WorkFunc when answering update, delete, create. // obj receives the original input to the update, delete, or create call. @@ -153,7 +154,7 @@ func (storage *SimpleRESTStorage) ResourceLocation(id string) (string, error) { if err := storage.errors["resourceLocation"]; err != nil { return "", err } - return id, nil + return storage.resourceLocation, nil } func extractBody(response *http.Response, object interface{}) (string, error) { diff --git a/pkg/apiserver/minionproxy.go b/pkg/apiserver/minionproxy.go index b85e0ccc8ef..23ac515a367 100644 --- a/pkg/apiserver/minionproxy.go +++ b/pkg/apiserver/minionproxy.go @@ -31,6 +31,7 @@ import ( "github.com/golang/glog" ) +// TODO: replace with proxy handler on minions func handleProxyMinion(w http.ResponseWriter, req *http.Request) { path := strings.TrimLeft(req.URL.Path, "/") rawQuery := req.URL.RawQuery diff --git a/pkg/apiserver/proxy.go b/pkg/apiserver/proxy.go new file mode 100644 index 00000000000..2152c15667c --- /dev/null +++ b/pkg/apiserver/proxy.go @@ -0,0 +1,188 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" + "net/http/httputil" + "net/url" + "path" + "strings" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" + + "code.google.com/p/go.net/html" + "code.google.com/p/go.net/html/atom" + "github.com/golang/glog" +) + +type ProxyHandler struct { + prefix string + storage map[string]RESTStorage + codec Codec +} + +func (r *ProxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + parts := strings.SplitN(req.URL.Path, "/", 3) + if len(parts) < 2 { + notFound(w, req) + return + } + resourceName := parts[0] + id := parts[1] + rest := "" + if len(parts) == 3 { + rest = parts[2] + } + storage, ok := r.storage[resourceName] + if !ok { + httplog.LogOf(w).Addf("'%v' has no storage object", resourceName) + notFound(w, req) + return + } + + redirector, ok := storage.(Redirector) + if !ok { + httplog.LogOf(w).Addf("'%v' is not a redirector", resourceName) + notFound(w, req) + return + } + + location, err := redirector.ResourceLocation(id) + if err != nil { + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + return + } + + destURL, err := url.Parse(location) + if err != nil { + status := errToAPIStatus(err) + writeJSON(status.Code, r.codec, status, w) + return + } + destURL.Path = rest + destURL.RawQuery = req.URL.RawQuery + newReq, err := http.NewRequest(req.Method, destURL.String(), req.Body) + if err != nil { + glog.Errorf("Failed to create request: %s", err) + } + newReq.Header = req.Header + + proxy := httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "http", Host: destURL.Host}) + proxy.Transport = &proxyTransport{ + proxyScheme: req.URL.Scheme, + proxyHost: req.URL.Host, + proxyPathPrepend: path.Join(r.prefix, resourceName, id), + } + proxy.ServeHTTP(w, newReq) +} + +type proxyTransport struct { + proxyScheme string + proxyHost string + proxyPathPrepend string +} + +func (t *proxyTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := http.DefaultTransport.RoundTrip(req) + + if err != nil { + message := err.Error() + "\n" + req.URL.String() + if strings.Contains(err.Error(), "connection refused") { + message = fmt.Sprintf("Failed to connect to %s (%s)", req.URL.Host, err) + } + resp = &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: ioutil.NopCloser(strings.NewReader(message)), + } + return resp, nil + } + + if resp.Header.Get("Content-Type") != "text/html" { + // Do nothing, simply pass through + return resp, err + } + + resp, err = t.fixLinks(req, resp) + return resp, err +} + +// fixLinks modifies links in an HTML file such that they will be redirected through the proxy if needed. +func (t *proxyTransport) fixLinks(req *http.Request, resp *http.Response) (*http.Response, error) { + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + // copying the response body did not work + return nil, err + } + + bodyNode := &html.Node{ + Type: html.ElementNode, + Data: "body", + DataAtom: atom.Body, + } + nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode) + if err != nil { + glog.Errorf("Failed to found node: %v", err) + return resp, err + } + + // Define the method to traverse the doc tree and update href node to + // point to correct minion + var updateHRef func(*html.Node) + updateHRef = func(n *html.Node) { + if n.Type == html.ElementNode && n.Data == "a" { + for i, attr := range n.Attr { + if attr.Key == "href" { + url, err := url.Parse(attr.Val) + if err != nil { + continue + } + url.Scheme = t.proxyScheme + url.Host = t.proxyHost + url.Path = path.Join(t.proxyPathPrepend, path.Dir(req.URL.Path), url.Path, "/") + n.Attr[i].Val = url.String() + break + } + } + } + for c := n.FirstChild; c != nil; c = c.NextSibling { + updateHRef(c) + } + } + + newContent := &bytes.Buffer{} + for _, n := range nodes { + updateHRef(n) + err = html.Render(newContent, n) + if err != nil { + glog.Errorf("Failed to render: %v", err) + } + } + + resp.Body = ioutil.NopCloser(newContent) + // Update header node with new content-length + // TODO: Remove any hash/signature headers here? + resp.Header.Del("Content-Length") + resp.ContentLength = int64(newContent.Len()) + + return resp, err +} diff --git a/pkg/apiserver/proxy_test.go b/pkg/apiserver/proxy_test.go new file mode 100644 index 00000000000..aea841344e1 --- /dev/null +++ b/pkg/apiserver/proxy_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" +) + +func TestProxyTransport_fixLinks(t *testing.T) { + content := string(`
kubelet.loggoogle.log
`) + transport := &proxyTransport{ + proxyScheme: "http", + proxyHost: "foo.com", + proxyPathPrepend: "/proxy/minion/minion1:10250/", + } + + // Test /logs/ + request := &http.Request{ + Method: "GET", + URL: &url.URL{ + Path: "/logs/", + }, + } + response := &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(content)), + Close: true, + } + updated_resp, _ := transport.fixLinks(request, response) + body, _ := ioutil.ReadAll(updated_resp.Body) + expected := string(`
kubelet.loggoogle.log
`) + if !strings.Contains(string(body), expected) { + t.Errorf("Received wrong content: %s", string(body)) + } + + // Test subdir under /logs/ + request = &http.Request{ + Method: "GET", + URL: &url.URL{ + Path: "/whatever/apt/somelog.log", + }, + } + transport.proxyScheme = "https" + transport.proxyPathPrepend = "/proxy/minion/minion1:8080/" + response = &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(content)), + Close: true, + } + updated_resp, _ = transport.fixLinks(request, response) + body, _ = ioutil.ReadAll(updated_resp.Body) + expected = string(`
kubelet.loggoogle.log
`) + if !strings.Contains(string(body), expected) { + t.Errorf("Received wrong content: %s", string(body)) + } +} + +func TestProxy(t *testing.T) { + table := []struct { + method string + path string + reqBody string + respBody string + }{ + {"GET", "/some/dir", "", "answer"}, + {"POST", "/some/other/dir", "question", "answer"}, + {"PUT", "/some/dir/id", "different question", "answer"}, + {"DELETE", "/some/dir/id", "", "ok"}, + } + + for _, item := range table { + proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + gotBody, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Errorf("%v - unexpected error %v", item.method, err) + } + if e, a := item.reqBody, string(gotBody); e != a { + t.Errorf("%v - expected %v, got %v", item.method, e, a) + } + fmt.Fprint(w, item.respBody) + })) + + simpleStorage := &SimpleRESTStorage{ + errors: map[string]error{}, + resourceLocation: proxyServer.URL, + } + handler := Handle(map[string]RESTStorage{ + "foo": simpleStorage, + }, codec, "/prefix/version") + server := httptest.NewServer(handler) + + req, err := http.NewRequest( + item.method, + server.URL+"/prefix/version/proxy/foo/id"+item.path, + strings.NewReader(item.reqBody), + ) + if err != nil { + t.Errorf("%v - unexpected error %v", item.method, err) + continue + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Errorf("%v - unexpected error %v", item.method, err) + continue + } + gotResp, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("%v - unexpected error %v", item.method, err) + } + resp.Body.Close() + if e, a := item.respBody, string(gotResp); e != a { + t.Errorf("%v - expected %v, got %v", item.method, e, a) + } + } +} diff --git a/pkg/apiserver/redirect_test.go b/pkg/apiserver/redirect_test.go index aa780e6e60c..6425f920d8c 100644 --- a/pkg/apiserver/redirect_test.go +++ b/pkg/apiserver/redirect_test.go @@ -51,6 +51,7 @@ func TestRedirect(t *testing.T) { for _, item := range table { simpleStorage.errors["resourceLocation"] = item.err + simpleStorage.resourceLocation = item.id resp, err := client.Get(server.URL + "/prefix/version/redirect/foo/" + item.id) if resp == nil { t.Fatalf("Unexpected nil resp") diff --git a/pkg/registry/service/storage.go b/pkg/registry/service/storage.go index 44da947b49c..6ea612b9b3b 100644 --- a/pkg/registry/service/storage.go +++ b/pkg/registry/service/storage.go @@ -179,7 +179,7 @@ func (rs *RegistryStorage) ResourceLocation(id string) (string, error) { if len(e.Endpoints) == 0 { return "", fmt.Errorf("no endpoints available for %v", id) } - return e.Endpoints[rand.Intn(len(e.Endpoints))], nil + return "http://" + e.Endpoints[rand.Intn(len(e.Endpoints))], nil } func (rs *RegistryStorage) deleteExternalLoadBalancer(service *api.Service) error { diff --git a/pkg/registry/service/storage_test.go b/pkg/registry/service/storage_test.go index bddfc782cda..49e27978c0c 100644 --- a/pkg/registry/service/storage_test.go +++ b/pkg/registry/service/storage_test.go @@ -291,7 +291,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - if e, a := "foo:80", location; e != a { + if e, a := "http://foo:80", location; e != a { t.Errorf("Expected %v, but got %v", e, a) } if e, a := "foo", registry.GottenID; e != a {