Refactor kubelet to use http.ServeMux

This commit is contained in:
derekwaynecarr 2014-08-20 13:24:51 -05:00
parent 6b05d71d74
commit fd8741edf2
4 changed files with 144 additions and 102 deletions

View File

@ -118,7 +118,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1) myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0) go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(func() { go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250) kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), "localhost", 10250)
}, 0) }, 0)
// Kubelet (machine) // Kubelet (machine)
@ -129,7 +129,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2) otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0) go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(func() { go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251) kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), "localhost", 10251)
}, 0) }, 0)
return apiServer.URL return apiServer.URL

View File

@ -162,7 +162,7 @@ func main() {
// start the kubelet server // start the kubelet server
if *enableServer { if *enableServer {
go util.Forever(func() { go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), http.DefaultServeMux, *address, *port) kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), *address, *port)
}, 0) }, 0)
} }

View File

@ -41,16 +41,13 @@ import (
type Server struct { type Server struct {
host HostInterface host HostInterface
updates chan<- interface{} updates chan<- interface{}
handler http.Handler mux *http.ServeMux
} }
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, delegate http.Handler, address string, port uint) { // ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address string, port uint) {
glog.Infof("Starting to listen on %s:%d", address, port) glog.Infof("Starting to listen on %s:%d", address, port)
handler := Server{ handler := NewServer(host, updates)
host: host,
updates: updates,
handler: delegate,
}
s := &http.Server{ s := &http.Server{
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)), Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
Handler: &handler, Handler: &handler,
@ -71,33 +68,45 @@ type HostInterface interface {
ServeLogs(w http.ResponseWriter, req *http.Request) ServeLogs(w http.ResponseWriter, req *http.Request)
} }
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests
func NewServer(host HostInterface, updates chan<- interface{}) Server {
server := Server{
host: host,
updates: updates,
mux: http.NewServeMux(),
}
server.InstallDefaultHandlers()
return server
}
// InstallDefaultHandlers registers the set of supported HTTP request patterns with the mux
func (s *Server) InstallDefaultHandlers() {
s.mux.HandleFunc("/healthz", s.handleHealth)
s.mux.HandleFunc("/container", s.handleContainer)
s.mux.HandleFunc("/containers", s.handleContainers)
s.mux.HandleFunc("/podInfo", s.handlePodInfo)
s.mux.HandleFunc("/stats/", s.handleStats)
s.mux.HandleFunc("/logs/", s.handleLogs)
s.mux.HandleFunc("/spec/", s.handleSpec)
}
// error serializes an error object into an HTTP response
func (s *Server) error(w http.ResponseWriter, err error) { func (s *Server) error(w http.ResponseWriter, err error) {
http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError)
} }
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { // handleHealth handles health checking requests against the Kubelet
defer httplog.MakeLogged(req, &w).StacktraceWhen( func (s *Server) handleHealth(w http.ResponseWriter, req *http.Request) {
httplog.StatusIsNot(
http.StatusOK,
http.StatusNotFound,
),
).Log()
u, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
s.error(w, err)
return
} }
// TODO: use an http.ServeMux instead of a switch.
switch { // handleContainer handles container requests against the Kubelet
case u.Path == "/container" || u.Path == "/containers": func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close() defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body) data, err := ioutil.ReadAll(req.Body)
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
return return
} }
if u.Path == "/container" {
// This is to provide backward compatibility. It only supports a single manifest // This is to provide backward compatibility. It only supports a single manifest
var pod Pod var pod Pod
err = yaml.Unmarshal(data, &pod.Manifest) err = yaml.Unmarshal(data, &pod.Manifest)
@ -108,7 +117,17 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
//TODO: sha1 of manifest? //TODO: sha1 of manifest?
pod.Name = "1" pod.Name = "1"
s.updates <- PodUpdate{[]Pod{pod}, SET} s.updates <- PodUpdate{[]Pod{pod}, SET}
} else if u.Path == "/containers" {
}
// handleContainers handles containers requests against the Kubelet
func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body)
if err != nil {
s.error(w, err)
return
}
var manifests []api.ContainerManifest var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests) err = yaml.Unmarshal(data, &manifests)
if err != nil { if err != nil {
@ -121,8 +140,16 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
pods[i].Manifest = manifests[i] pods[i].Manifest = manifests[i]
} }
s.updates <- PodUpdate{pods, SET} s.updates <- PodUpdate{pods, SET}
}
// handlePodInfo handles podInfo requests against the Kubelet
func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) {
u, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
s.error(w, err)
return
} }
case u.Path == "/podInfo":
podID := u.Query().Get("podID") podID := u.Query().Get("podID")
if len(podID) == 0 { if len(podID) == 0 {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
@ -148,9 +175,20 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Header().Add("Content-type", "application/json") w.Header().Add("Content-type", "application/json")
w.Write(data) w.Write(data)
case strings.HasPrefix(u.Path, "/stats"): }
// handleStats handles stats requests against the Kubelet
func (s *Server) handleStats(w http.ResponseWriter, req *http.Request) {
s.serveStats(w, req) s.serveStats(w, req)
case strings.HasPrefix(u.Path, "/spec"): }
// handleLogs handles logs requests against the Kubelet
func (s *Server) handleLogs(w http.ResponseWriter, req *http.Request) {
s.host.ServeLogs(w, req)
}
// handleSpec handles spec requests against the Kubelet
func (s *Server) handleSpec(w http.ResponseWriter, req *http.Request) {
info, err := s.host.GetMachineInfo() info, err := s.host.GetMachineInfo()
if err != nil { if err != nil {
s.error(w, err) s.error(w, err)
@ -163,15 +201,21 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
w.Header().Add("Content-type", "application/json") w.Header().Add("Content-type", "application/json")
w.Write(data) w.Write(data)
case strings.HasPrefix(u.Path, "/logs/"):
s.host.ServeLogs(w, req)
default:
if s.handler != nil {
s.handler.ServeHTTP(w, req)
}
}
} }
// ServeHTTP responds to HTTP requests on the Kubelet
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer httplog.MakeLogged(req, &w).StacktraceWhen(
httplog.StatusIsNot(
http.StatusOK,
http.StatusNotFound,
),
).Log()
s.mux.ServeHTTP(w, req)
}
// serveStats implements stats logic
func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// /stats/<podfullname>/<containerName> // /stats/<podfullname>/<containerName>
components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/") components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/")

View File

@ -76,10 +76,8 @@ func makeServerTest() *serverTestFramework {
} }
fw.updateReader = startReading(fw.updateChan) fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{} fw.fakeKubelet = &fakeKubelet{}
fw.serverUnderTest = &Server{ server := NewServer(fw.fakeKubelet, fw.updateChan)
host: fw.fakeKubelet, fw.serverUnderTest = &server
updates: fw.updateChan,
}
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest) fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw return fw
} }