Merge pull request #475 from lavalamp/httpLogger

Move logger to own package; clean up apiserver to use serve mux.
This commit is contained in:
Tim Hockin 2014-07-15 22:12:09 -07:00
commit a28e900d46
6 changed files with 133 additions and 44 deletions

View File

@ -20,12 +20,14 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "path"
"runtime/debug" "runtime/debug"
"strings" "strings"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -83,25 +85,37 @@ func MakeAsync(fn WorkFunc) <-chan interface{} {
// //
// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing. // TODO: consider migrating this to go-restful which is a more full-featured version of the same thing.
type APIServer struct { type APIServer struct {
prefix string prefix string
storage map[string]RESTStorage storage map[string]RESTStorage
ops *Operations ops *Operations
logserver http.Handler mux *http.ServeMux
} }
// New creates a new APIServer object. // New creates a new APIServer object.
// 'storage' contains a map of handlers. // 'storage' contains a map of handlers.
// 'prefix' is the hosting path prefix. // 'prefix' is the hosting path prefix.
func New(storage map[string]RESTStorage, prefix string) *APIServer { func New(storage map[string]RESTStorage, prefix string) *APIServer {
return &APIServer{ s := &APIServer{
storage: storage, storage: storage,
prefix: prefix, prefix: strings.TrimRight(prefix, "/"),
ops: NewOperations(), ops: NewOperations(),
logserver: http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))), mux: http.NewServeMux(),
} }
s.mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))))
s.mux.HandleFunc(s.prefix+"/", s.ServeREST)
healthz.InstallHandler(s.mux)
s.mux.HandleFunc("/index.html", s.handleIndex)
// Handle both operations and operations/* with the same handler
opPrefix := path.Join(s.prefix, "operations")
s.mux.HandleFunc(opPrefix, s.handleOperationRequest)
s.mux.HandleFunc(opPrefix+"/", s.handleOperationRequest)
return s
} }
func (server *APIServer) handleIndex(w http.ResponseWriter) { func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
// TODO: serve this out of a file? // TODO: serve this out of a file?
data := "<html><body>Welcome to Kubernetes</body></html>" data := "<html><body>Welcome to Kubernetes</body></html>"
@ -117,47 +131,37 @@ func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack()) glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack())
} }
}() }()
defer MakeLogged(req, &w).StacktraceWhen( defer httplog.MakeLogged(req, &w).StacktraceWhen(
StatusIsNot( httplog.StatusIsNot(
http.StatusOK, http.StatusOK,
http.StatusAccepted, http.StatusAccepted,
http.StatusConflict, http.StatusConflict,
), ),
).Log() ).Log()
url, err := url.ParseRequestURI(req.RequestURI)
if err != nil { // Dispatch via our mux.
server.error(err, w) server.mux.ServeHTTP(w, req)
return }
}
if url.Path == "/index.html" || url.Path == "/" || url.Path == "" { // ServeREST handles requests to all our RESTStorage objects.
server.handleIndex(w) func (server *APIServer) ServeREST(w http.ResponseWriter, req *http.Request) {
return if !strings.HasPrefix(req.URL.Path, server.prefix) {
}
if strings.HasPrefix(url.Path, "/logs/") {
server.logserver.ServeHTTP(w, req)
return
}
if !strings.HasPrefix(url.Path, server.prefix) {
server.notFound(req, w) server.notFound(req, w)
return return
} }
requestParts := strings.Split(url.Path[len(server.prefix):], "/")[1:] requestParts := strings.Split(req.URL.Path[len(server.prefix):], "/")[1:]
if len(requestParts) < 1 { if len(requestParts) < 1 {
server.notFound(req, w) server.notFound(req, w)
return return
} }
if requestParts[0] == "operations" {
server.handleOperationRequest(requestParts[1:], w, req)
return
}
storage := server.storage[requestParts[0]] storage := server.storage[requestParts[0]]
if storage == nil { if storage == nil {
LogOf(w).Addf("'%v' has no storage object", requestParts[0]) httplog.LogOf(w).Addf("'%v' has no storage object", requestParts[0])
server.notFound(req, w) server.notFound(req, w)
return return
} }
server.handleREST(requestParts, url, req, w, storage) server.handleREST(requestParts, req, w, storage)
} }
func (server *APIServer) notFound(req *http.Request, w http.ResponseWriter) { func (server *APIServer) notFound(req *http.Request, w http.ResponseWriter) {
@ -197,7 +201,7 @@ func (server *APIServer) finishReq(out <-chan interface{}, sync bool, timeout ti
status := http.StatusOK status := http.StatusOK
switch stat := obj.(type) { switch stat := obj.(type) {
case api.Status: case api.Status:
LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.") httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
if stat.Code != 0 { if stat.Code != 0 {
status = stat.Code status = stat.Code
} }
@ -236,14 +240,14 @@ func parseTimeout(str string) time.Duration {
// sync=[false|true] Synchronous request (only applies to create, update, delete operations) // sync=[false|true] Synchronous request (only applies to create, update, delete operations)
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true // timeout=<duration> Timeout for synchronous requests, only applies if sync=true
// labels=<label-selector> Used for filtering list operations // labels=<label-selector> Used for filtering list operations
func (server *APIServer) handleREST(parts []string, requestURL *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) { func (server *APIServer) handleREST(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
sync := requestURL.Query().Get("sync") == "true" sync := req.URL.Query().Get("sync") == "true"
timeout := parseTimeout(requestURL.Query().Get("timeout")) timeout := parseTimeout(req.URL.Query().Get("timeout"))
switch req.Method { switch req.Method {
case "GET": case "GET":
switch len(parts) { switch len(parts) {
case 1: case 1:
selector, err := labels.ParseSelector(requestURL.Query().Get("labels")) selector, err := labels.ParseSelector(req.URL.Query().Get("labels"))
if err != nil { if err != nil {
server.error(err, w) server.error(err, w)
return return
@ -325,7 +329,18 @@ func (server *APIServer) handleREST(parts []string, requestURL *url.URL, req *ht
} }
} }
func (server *APIServer) handleOperationRequest(parts []string, w http.ResponseWriter, req *http.Request) { func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http.Request) {
opPrefix := path.Join(server.prefix, "operations")
if !strings.HasPrefix(req.URL.Path, opPrefix) {
server.notFound(req, w)
return
}
trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/")
parts := strings.Split(trimmed, "/")
if len(parts) > 1 {
server.notFound(req, w)
return
}
if req.Method != "GET" { if req.Method != "GET" {
server.notFound(req, w) server.notFound(req, w)
} }

View File

@ -409,3 +409,37 @@ func TestSyncCreateTimeout(t *testing.T) {
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response) t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response)
} }
} }
func TestOpGet(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
client := http.Client{}
simple := Simple{Name: "foo"}
data, _ := api.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
expectNoError(t, err)
response, err := client.Do(request)
expectNoError(t, err)
if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response)
}
var itemOut api.Status
body, err := extractBody(response, &itemOut)
expectNoError(t, err)
if itemOut.Status != api.StatusWorking || itemOut.Details == "" {
t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body))
}
req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details, nil)
expectNoError(t, err)
_, err = client.Do(req2)
expectNoError(t, err)
if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response)
}
}

View File

@ -29,3 +29,7 @@ func handleHealthz(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write([]byte("ok")) w.Write([]byte("ok"))
} }
func InstallHandler(mux *http.ServeMux) {
mux.HandleFunc("/healthz", handleHealthz)
}

19
pkg/httplog/doc.go Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package httplog contains a helper object and functions to maintain a log
// along with an http response.
package httplog

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package apiserver package httplog
import ( import (
"fmt" "fmt"
@ -25,6 +25,18 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// Handler wraps all HTTP calls to delegate with nice logging.
// delegate may use LogOf(w).Addf(...) to write additional info to
// the per-request log message.
//
// Intended to wrap calls to your ServeMux.
func Handler(delegate http.Handler, pred StacktracePred) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer MakeLogged(req, &w).StacktraceWhen(pred).Log()
delegate.ServeHTTP(w, req)
})
}
// StacktracePred returns true if a stacktrace should be logged for this status // StacktracePred returns true if a stacktrace should be logged for this status
type StacktracePred func(httpStatus int) (logStacktrace bool) type StacktracePred func(httpStatus int) (logStacktrace bool)
@ -44,7 +56,7 @@ type respLogger struct {
// DefaultStacktracePred is the default implementation of StacktracePred. // DefaultStacktracePred is the default implementation of StacktracePred.
func DefaultStacktracePred(status int) bool { func DefaultStacktracePred(status int) bool {
return status != http.StatusOK && status != http.StatusAccepted return status < http.StatusOK || status >= http.StatusBadRequest
} }
// MakeLogged turns a normal response writer into a logged response writer. // MakeLogged turns a normal response writer into a logged response writer.
@ -60,6 +72,10 @@ func DefaultStacktracePred(status int) bool {
// //
// Use LogOf(w).Addf(...) to log something along with the response result. // Use LogOf(w).Addf(...) to log something along with the response result.
func MakeLogged(req *http.Request, w *http.ResponseWriter) *respLogger { func MakeLogged(req *http.Request, w *http.ResponseWriter) *respLogger {
if _, ok := (*w).(*respLogger); ok {
// Don't double-wrap!
panic("multiple MakeLogged calls!")
}
rl := &respLogger{ rl := &respLogger{
startTime: time.Now(), startTime: time.Now(),
req: req, req: req,

View File

@ -28,8 +28,8 @@ import (
"strings" "strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/google/cadvisor/info" "github.com/google/cadvisor/info"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"gopkg.in/v1/yaml" "gopkg.in/v1/yaml"
) )
@ -53,13 +53,14 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) {
} }
func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer apiserver.MakeLogged(req, &w).Log() defer httplog.MakeLogged(req, &w).Log()
u, err := url.ParseRequestURI(req.RequestURI) u, err := url.ParseRequestURI(req.RequestURI)
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
return return
} }
// TODO: use an http.ServeMux instead of a switch.
switch { switch {
case u.Path == "/container" || u.Path == "/containers": case u.Path == "/container" || u.Path == "/containers":
defer req.Body.Close() defer req.Body.Close()