From 8b511832ffb09cd469490debcc500c757c1d5965 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 29 Jul 2014 17:35:56 -0400 Subject: [PATCH] Split minion --- pkg/apiserver/apiserver.go | 131 +------------------------ pkg/apiserver/apiserver_test.go | 51 ---------- pkg/apiserver/minionproxy.go | 155 ++++++++++++++++++++++++++++++ pkg/apiserver/minionproxy_test.go | 74 ++++++++++++++ 4 files changed, 230 insertions(+), 181 deletions(-) create mode 100644 pkg/apiserver/minionproxy.go create mode 100644 pkg/apiserver/minionproxy_test.go diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 197c904f0e3..c036e274090 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -17,21 +17,15 @@ limitations under the License. package apiserver import ( - "bytes" "encoding/json" "fmt" "io/ioutil" - "net" "net/http" - "net/http/httputil" - "net/url" "path" "runtime/debug" "strings" "time" - "code.google.com/p/go.net/html" - "code.google.com/p/go.net/html/atom" "code.google.com/p/go.net/websocket" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" @@ -98,7 +92,7 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer { s.mux.HandleFunc(s.watchPrefix()+"/", s.handleWatch) - s.mux.HandleFunc("/proxy/minion/", s.handleMinionReq) + s.mux.HandleFunc("/proxy/minion/", s.handleProxyMinion) return s } @@ -116,129 +110,6 @@ func (server *APIServer) handleVersionReq(w http.ResponseWriter, req *http.Reque server.writeRawJSON(http.StatusOK, version.Get(), w) } -func (server *APIServer) handleMinionReq(w http.ResponseWriter, req *http.Request) { - minionPrefix := "/proxy/minion/" - if !strings.HasPrefix(req.URL.Path, minionPrefix) { - notFound(w, req) - return - } - - path := req.URL.Path[len(minionPrefix):] - rawQuery := req.URL.RawQuery - - // Expect path as: ${minion}/${query_to_minion} - // and query_to_minion can be any query that kubelet will accept. - // - // For example: - // To query stats of a minion or a pod or a container, - // path string can be ${minion}/stats// or - // ${minion}/podInfo?podID= - // - // To query logs on a minion, path string can be: - // ${minion}/logs/ - idx := strings.Index(path, "/") - minionHost := path[:idx] - _, port, _ := net.SplitHostPort(minionHost) - if port == "" { - // Couldn't retrieve port information - // TODO: Retrieve port info from a common object - minionHost += ":10250" - } - minionPath := path[idx:] - - minionURL := &url.URL{ - Scheme: "http", - Host: minionHost, - } - newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil) - if err != nil { - glog.Errorf("Failed to create request: %s", err) - } - - proxy := httputil.NewSingleHostReverseProxy(minionURL) - proxy.Transport = &minionTransport{} - proxy.ServeHTTP(w, newReq) -} - -type minionTransport struct{} - -func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) { - resp, err := http.DefaultTransport.RoundTrip(req) - - if err != nil && strings.Contains(err.Error(), "connection refused") { - message := fmt.Sprintf("Failed to connect to minion:%s", req.URL.Host) - resp = &http.Response{ - StatusCode: http.StatusServiceUnavailable, - Body: ioutil.NopCloser(strings.NewReader(message)), - } - return resp, nil - } - - if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") { - // Do nothing, simply pass through - return resp, err - } - - resp, err = t.ProcessResponse(req, resp) - return resp, err -} - -func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) { - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - // copying the response body did not work - return nil, err - } - - bodyNode := &html.Node{ - Type: html.ElementNode, - Data: "body", - DataAtom: atom.Body, - } - nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode) - if err != nil { - glog.Errorf("Failed to found node: %v", err) - return resp, err - } - - // Define the method to traverse the doc tree and update href node to - // point to correct minion - var updateHRef func(*html.Node) - updateHRef = func(n *html.Node) { - if n.Type == html.ElementNode && n.Data == "a" { - for i, attr := range n.Attr { - if attr.Key == "href" { - Url := &url.URL{ - Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val, - } - n.Attr[i].Val = Url.String() - break - } - } - } - for c := n.FirstChild; c != nil; c = c.NextSibling { - updateHRef(c) - } - } - - newContent := &bytes.Buffer{} - for _, n := range nodes { - updateHRef(n) - err = html.Render(newContent, n) - if err != nil { - glog.Errorf("Failed to render: %v", err) - } - } - - resp.Body = ioutil.NopCloser(newContent) - // Update header node with new content-length - // TODO: Remove any hash/signature headers here? - resp.Header.Del("Content-Length") - resp.ContentLength = int64(newContent.Len()) - - return resp, err -} - // HTTP Handler interface func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer func() { diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 57ccf0da9ba..65547c46b01 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -23,9 +23,7 @@ import ( "log" "net/http" "net/http/httptest" - "net/url" "reflect" - "strings" "sync" "testing" "time" @@ -535,52 +533,3 @@ func TestSyncCreateTimeout(t *testing.T) { t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response) } } - -func TestMinionTransport(t *testing.T) { - content := string(`
kubelet.loggoogle.log
`) - transport := &minionTransport{} - - // Test /logs/ - request := &http.Request{ - Method: "GET", - URL: &url.URL{ - Scheme: "http", - Host: "minion1:10250", - Path: "/logs/", - }, - } - response := &http.Response{ - Status: "200 OK", - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader(content)), - Close: true, - } - updated_resp, _ := transport.ProcessResponse(request, response) - body, _ := ioutil.ReadAll(updated_resp.Body) - expected := string(`
kubelet.loggoogle.log
`) - if !strings.Contains(string(body), expected) { - t.Errorf("Received wrong content: %s", string(body)) - } - - // Test subdir under /logs/ - request = &http.Request{ - Method: "GET", - URL: &url.URL{ - Scheme: "http", - Host: "minion1:8080", - Path: "/whatever/apt/", - }, - } - response = &http.Response{ - Status: "200 OK", - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader(content)), - Close: true, - } - updated_resp, _ = transport.ProcessResponse(request, response) - body, _ = ioutil.ReadAll(updated_resp.Body) - expected = string(`
kubelet.loggoogle.log
`) - if !strings.Contains(string(body), expected) { - t.Errorf("Received wrong content: %s", string(body)) - } -} diff --git a/pkg/apiserver/minionproxy.go b/pkg/apiserver/minionproxy.go new file mode 100644 index 00000000000..87e3fc4bbab --- /dev/null +++ b/pkg/apiserver/minionproxy.go @@ -0,0 +1,155 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/http/httputil" + "net/url" + "strings" + + "code.google.com/p/go.net/html" + "code.google.com/p/go.net/html/atom" + "github.com/golang/glog" +) + +func (server *APIServer) handleProxyMinion(w http.ResponseWriter, req *http.Request) { + minionPrefix := "/proxy/minion/" + if !strings.HasPrefix(req.URL.Path, minionPrefix) { + notFound(w, req) + return + } + + path := req.URL.Path[len(minionPrefix):] + rawQuery := req.URL.RawQuery + + // Expect path as: ${minion}/${query_to_minion} + // and query_to_minion can be any query that kubelet will accept. + // + // For example: + // To query stats of a minion or a pod or a container, + // path string can be ${minion}/stats// or + // ${minion}/podInfo?podID= + // + // To query logs on a minion, path string can be: + // ${minion}/logs/ + idx := strings.Index(path, "/") + minionHost := path[:idx] + _, port, _ := net.SplitHostPort(minionHost) + if port == "" { + // Couldn't retrieve port information + // TODO: Retrieve port info from a common object + minionHost += ":10250" + } + minionPath := path[idx:] + + minionURL := &url.URL{ + Scheme: "http", + Host: minionHost, + } + newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil) + if err != nil { + glog.Errorf("Failed to create request: %s", err) + } + + proxy := httputil.NewSingleHostReverseProxy(minionURL) + proxy.Transport = &minionTransport{} + proxy.ServeHTTP(w, newReq) +} + +type minionTransport struct{} + +func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := http.DefaultTransport.RoundTrip(req) + + if err != nil && strings.Contains(err.Error(), "connection refused") { + message := fmt.Sprintf("Failed to connect to minion:%s", req.URL.Host) + resp = &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Body: ioutil.NopCloser(strings.NewReader(message)), + } + return resp, nil + } + + if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") { + // Do nothing, simply pass through + return resp, err + } + + resp, err = t.ProcessResponse(req, resp) + return resp, err +} + +func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + // copying the response body did not work + return nil, err + } + + bodyNode := &html.Node{ + Type: html.ElementNode, + Data: "body", + DataAtom: atom.Body, + } + nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode) + if err != nil { + glog.Errorf("Failed to found node: %v", err) + return resp, err + } + + // Define the method to traverse the doc tree and update href node to + // point to correct minion + var updateHRef func(*html.Node) + updateHRef = func(n *html.Node) { + if n.Type == html.ElementNode && n.Data == "a" { + for i, attr := range n.Attr { + if attr.Key == "href" { + Url := &url.URL{ + Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val, + } + n.Attr[i].Val = Url.String() + break + } + } + } + for c := n.FirstChild; c != nil; c = c.NextSibling { + updateHRef(c) + } + } + + newContent := &bytes.Buffer{} + for _, n := range nodes { + updateHRef(n) + err = html.Render(newContent, n) + if err != nil { + glog.Errorf("Failed to render: %v", err) + } + } + + resp.Body = ioutil.NopCloser(newContent) + // Update header node with new content-length + // TODO: Remove any hash/signature headers here? + resp.Header.Del("Content-Length") + resp.ContentLength = int64(newContent.Len()) + + return resp, err +} diff --git a/pkg/apiserver/minionproxy_test.go b/pkg/apiserver/minionproxy_test.go new file mode 100644 index 00000000000..e7bed930f63 --- /dev/null +++ b/pkg/apiserver/minionproxy_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "io/ioutil" + "net/http" + "net/url" + "strings" + "testing" +) + +func TestMinionTransport(t *testing.T) { + content := string(`
kubelet.loggoogle.log
`) + transport := &minionTransport{} + + // Test /logs/ + request := &http.Request{ + Method: "GET", + URL: &url.URL{ + Scheme: "http", + Host: "minion1:10250", + Path: "/logs/", + }, + } + response := &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(content)), + Close: true, + } + updated_resp, _ := transport.ProcessResponse(request, response) + body, _ := ioutil.ReadAll(updated_resp.Body) + expected := string(`
kubelet.loggoogle.log
`) + if !strings.Contains(string(body), expected) { + t.Errorf("Received wrong content: %s", string(body)) + } + + // Test subdir under /logs/ + request = &http.Request{ + Method: "GET", + URL: &url.URL{ + Scheme: "http", + Host: "minion1:8080", + Path: "/whatever/apt/", + }, + } + response = &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader(content)), + Close: true, + } + updated_resp, _ = transport.ProcessResponse(request, response) + body, _ = ioutil.ReadAll(updated_resp.Body) + expected = string(`
kubelet.loggoogle.log
`) + if !strings.Contains(string(body), expected) { + t.Errorf("Received wrong content: %s", string(body)) + } +}