mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-19 16:49:35 +00:00
Split minion
This commit is contained in:
parent
a46f6313bc
commit
8b511832ff
@ -17,21 +17,15 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"path"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go.net/html"
|
||||
"code.google.com/p/go.net/html/atom"
|
||||
"code.google.com/p/go.net/websocket"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
|
||||
@ -98,7 +92,7 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer {
|
||||
|
||||
s.mux.HandleFunc(s.watchPrefix()+"/", s.handleWatch)
|
||||
|
||||
s.mux.HandleFunc("/proxy/minion/", s.handleMinionReq)
|
||||
s.mux.HandleFunc("/proxy/minion/", s.handleProxyMinion)
|
||||
|
||||
return s
|
||||
}
|
||||
@ -116,129 +110,6 @@ func (server *APIServer) handleVersionReq(w http.ResponseWriter, req *http.Reque
|
||||
server.writeRawJSON(http.StatusOK, version.Get(), w)
|
||||
}
|
||||
|
||||
func (server *APIServer) handleMinionReq(w http.ResponseWriter, req *http.Request) {
|
||||
minionPrefix := "/proxy/minion/"
|
||||
if !strings.HasPrefix(req.URL.Path, minionPrefix) {
|
||||
notFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
path := req.URL.Path[len(minionPrefix):]
|
||||
rawQuery := req.URL.RawQuery
|
||||
|
||||
// Expect path as: ${minion}/${query_to_minion}
|
||||
// and query_to_minion can be any query that kubelet will accept.
|
||||
//
|
||||
// For example:
|
||||
// To query stats of a minion or a pod or a container,
|
||||
// path string can be ${minion}/stats/<podid>/<containerName> or
|
||||
// ${minion}/podInfo?podID=<podid>
|
||||
//
|
||||
// To query logs on a minion, path string can be:
|
||||
// ${minion}/logs/
|
||||
idx := strings.Index(path, "/")
|
||||
minionHost := path[:idx]
|
||||
_, port, _ := net.SplitHostPort(minionHost)
|
||||
if port == "" {
|
||||
// Couldn't retrieve port information
|
||||
// TODO: Retrieve port info from a common object
|
||||
minionHost += ":10250"
|
||||
}
|
||||
minionPath := path[idx:]
|
||||
|
||||
minionURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: minionHost,
|
||||
}
|
||||
newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create request: %s", err)
|
||||
}
|
||||
|
||||
proxy := httputil.NewSingleHostReverseProxy(minionURL)
|
||||
proxy.Transport = &minionTransport{}
|
||||
proxy.ServeHTTP(w, newReq)
|
||||
}
|
||||
|
||||
type minionTransport struct{}
|
||||
|
||||
func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
resp, err := http.DefaultTransport.RoundTrip(req)
|
||||
|
||||
if err != nil && strings.Contains(err.Error(), "connection refused") {
|
||||
message := fmt.Sprintf("Failed to connect to minion:%s", req.URL.Host)
|
||||
resp = &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Body: ioutil.NopCloser(strings.NewReader(message)),
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") {
|
||||
// Do nothing, simply pass through
|
||||
return resp, err
|
||||
}
|
||||
|
||||
resp, err = t.ProcessResponse(req, resp)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
// copying the response body did not work
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bodyNode := &html.Node{
|
||||
Type: html.ElementNode,
|
||||
Data: "body",
|
||||
DataAtom: atom.Body,
|
||||
}
|
||||
nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to found <body> node: %v", err)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Define the method to traverse the doc tree and update href node to
|
||||
// point to correct minion
|
||||
var updateHRef func(*html.Node)
|
||||
updateHRef = func(n *html.Node) {
|
||||
if n.Type == html.ElementNode && n.Data == "a" {
|
||||
for i, attr := range n.Attr {
|
||||
if attr.Key == "href" {
|
||||
Url := &url.URL{
|
||||
Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val,
|
||||
}
|
||||
n.Attr[i].Val = Url.String()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for c := n.FirstChild; c != nil; c = c.NextSibling {
|
||||
updateHRef(c)
|
||||
}
|
||||
}
|
||||
|
||||
newContent := &bytes.Buffer{}
|
||||
for _, n := range nodes {
|
||||
updateHRef(n)
|
||||
err = html.Render(newContent, n)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to render: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
resp.Body = ioutil.NopCloser(newContent)
|
||||
// Update header node with new content-length
|
||||
// TODO: Remove any hash/signature headers here?
|
||||
resp.Header.Del("Content-Length")
|
||||
resp.ContentLength = int64(newContent.Len())
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// HTTP Handler interface
|
||||
func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
defer func() {
|
||||
|
@ -23,9 +23,7 @@ import (
|
||||
"log"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -535,52 +533,3 @@ func TestSyncCreateTimeout(t *testing.T) {
|
||||
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinionTransport(t *testing.T) {
|
||||
content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
|
||||
transport := &minionTransport{}
|
||||
|
||||
// Test /logs/
|
||||
request := &http.Request{
|
||||
Method: "GET",
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "minion1:10250",
|
||||
Path: "/logs/",
|
||||
},
|
||||
}
|
||||
response := &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: ioutil.NopCloser(strings.NewReader(content)),
|
||||
Close: true,
|
||||
}
|
||||
updated_resp, _ := transport.ProcessResponse(request, response)
|
||||
body, _ := ioutil.ReadAll(updated_resp.Body)
|
||||
expected := string(`<pre><a href="/proxy/minion/minion1:10250/logs/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:10250/logs/google.log">google.log</a></pre>`)
|
||||
if !strings.Contains(string(body), expected) {
|
||||
t.Errorf("Received wrong content: %s", string(body))
|
||||
}
|
||||
|
||||
// Test subdir under /logs/
|
||||
request = &http.Request{
|
||||
Method: "GET",
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "minion1:8080",
|
||||
Path: "/whatever/apt/",
|
||||
},
|
||||
}
|
||||
response = &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: ioutil.NopCloser(strings.NewReader(content)),
|
||||
Close: true,
|
||||
}
|
||||
updated_resp, _ = transport.ProcessResponse(request, response)
|
||||
body, _ = ioutil.ReadAll(updated_resp.Body)
|
||||
expected = string(`<pre><a href="/proxy/minion/minion1:8080/whatever/apt/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:8080/whatever/apt/google.log">google.log</a></pre>`)
|
||||
if !strings.Contains(string(body), expected) {
|
||||
t.Errorf("Received wrong content: %s", string(body))
|
||||
}
|
||||
}
|
||||
|
155
pkg/apiserver/minionproxy.go
Normal file
155
pkg/apiserver/minionproxy.go
Normal file
@ -0,0 +1,155 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"code.google.com/p/go.net/html"
|
||||
"code.google.com/p/go.net/html/atom"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
func (server *APIServer) handleProxyMinion(w http.ResponseWriter, req *http.Request) {
|
||||
minionPrefix := "/proxy/minion/"
|
||||
if !strings.HasPrefix(req.URL.Path, minionPrefix) {
|
||||
notFound(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
path := req.URL.Path[len(minionPrefix):]
|
||||
rawQuery := req.URL.RawQuery
|
||||
|
||||
// Expect path as: ${minion}/${query_to_minion}
|
||||
// and query_to_minion can be any query that kubelet will accept.
|
||||
//
|
||||
// For example:
|
||||
// To query stats of a minion or a pod or a container,
|
||||
// path string can be ${minion}/stats/<podid>/<containerName> or
|
||||
// ${minion}/podInfo?podID=<podid>
|
||||
//
|
||||
// To query logs on a minion, path string can be:
|
||||
// ${minion}/logs/
|
||||
idx := strings.Index(path, "/")
|
||||
minionHost := path[:idx]
|
||||
_, port, _ := net.SplitHostPort(minionHost)
|
||||
if port == "" {
|
||||
// Couldn't retrieve port information
|
||||
// TODO: Retrieve port info from a common object
|
||||
minionHost += ":10250"
|
||||
}
|
||||
minionPath := path[idx:]
|
||||
|
||||
minionURL := &url.URL{
|
||||
Scheme: "http",
|
||||
Host: minionHost,
|
||||
}
|
||||
newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create request: %s", err)
|
||||
}
|
||||
|
||||
proxy := httputil.NewSingleHostReverseProxy(minionURL)
|
||||
proxy.Transport = &minionTransport{}
|
||||
proxy.ServeHTTP(w, newReq)
|
||||
}
|
||||
|
||||
type minionTransport struct{}
|
||||
|
||||
func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
resp, err := http.DefaultTransport.RoundTrip(req)
|
||||
|
||||
if err != nil && strings.Contains(err.Error(), "connection refused") {
|
||||
message := fmt.Sprintf("Failed to connect to minion:%s", req.URL.Host)
|
||||
resp = &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Body: ioutil.NopCloser(strings.NewReader(message)),
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") {
|
||||
// Do nothing, simply pass through
|
||||
return resp, err
|
||||
}
|
||||
|
||||
resp, err = t.ProcessResponse(req, resp)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
// copying the response body did not work
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bodyNode := &html.Node{
|
||||
Type: html.ElementNode,
|
||||
Data: "body",
|
||||
DataAtom: atom.Body,
|
||||
}
|
||||
nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to found <body> node: %v", err)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Define the method to traverse the doc tree and update href node to
|
||||
// point to correct minion
|
||||
var updateHRef func(*html.Node)
|
||||
updateHRef = func(n *html.Node) {
|
||||
if n.Type == html.ElementNode && n.Data == "a" {
|
||||
for i, attr := range n.Attr {
|
||||
if attr.Key == "href" {
|
||||
Url := &url.URL{
|
||||
Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val,
|
||||
}
|
||||
n.Attr[i].Val = Url.String()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for c := n.FirstChild; c != nil; c = c.NextSibling {
|
||||
updateHRef(c)
|
||||
}
|
||||
}
|
||||
|
||||
newContent := &bytes.Buffer{}
|
||||
for _, n := range nodes {
|
||||
updateHRef(n)
|
||||
err = html.Render(newContent, n)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to render: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
resp.Body = ioutil.NopCloser(newContent)
|
||||
// Update header node with new content-length
|
||||
// TODO: Remove any hash/signature headers here?
|
||||
resp.Header.Del("Content-Length")
|
||||
resp.ContentLength = int64(newContent.Len())
|
||||
|
||||
return resp, err
|
||||
}
|
74
pkg/apiserver/minionproxy_test.go
Normal file
74
pkg/apiserver/minionproxy_test.go
Normal file
@ -0,0 +1,74 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMinionTransport(t *testing.T) {
|
||||
content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
|
||||
transport := &minionTransport{}
|
||||
|
||||
// Test /logs/
|
||||
request := &http.Request{
|
||||
Method: "GET",
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "minion1:10250",
|
||||
Path: "/logs/",
|
||||
},
|
||||
}
|
||||
response := &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: ioutil.NopCloser(strings.NewReader(content)),
|
||||
Close: true,
|
||||
}
|
||||
updated_resp, _ := transport.ProcessResponse(request, response)
|
||||
body, _ := ioutil.ReadAll(updated_resp.Body)
|
||||
expected := string(`<pre><a href="/proxy/minion/minion1:10250/logs/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:10250/logs/google.log">google.log</a></pre>`)
|
||||
if !strings.Contains(string(body), expected) {
|
||||
t.Errorf("Received wrong content: %s", string(body))
|
||||
}
|
||||
|
||||
// Test subdir under /logs/
|
||||
request = &http.Request{
|
||||
Method: "GET",
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: "minion1:8080",
|
||||
Path: "/whatever/apt/",
|
||||
},
|
||||
}
|
||||
response = &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: http.StatusOK,
|
||||
Body: ioutil.NopCloser(strings.NewReader(content)),
|
||||
Close: true,
|
||||
}
|
||||
updated_resp, _ = transport.ProcessResponse(request, response)
|
||||
body, _ = ioutil.ReadAll(updated_resp.Body)
|
||||
expected = string(`<pre><a href="/proxy/minion/minion1:8080/whatever/apt/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:8080/whatever/apt/google.log">google.log</a></pre>`)
|
||||
if !strings.Contains(string(body), expected) {
|
||||
t.Errorf("Received wrong content: %s", string(body))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user