mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
kubelet: Switch to restful for the REST api endpoints
This commit is contained in:
parent
4f856b595d
commit
7862ed3656
@ -25,13 +25,13 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/pprof"
|
"net/http/pprof"
|
||||||
"net/url"
|
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
restful "github.com/emicklei/go-restful"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
cadvisorApi "github.com/google/cadvisor/info/v1"
|
cadvisorApi "github.com/google/cadvisor/info/v1"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
@ -49,7 +49,7 @@ import (
|
|||||||
// Server is a http.Handler which exposes kubelet functionality over HTTP.
|
// Server is a http.Handler which exposes kubelet functionality over HTTP.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
host HostInterface
|
host HostInterface
|
||||||
mux *http.ServeMux
|
restfulCont *restful.Container
|
||||||
}
|
}
|
||||||
|
|
||||||
type TLSOptions struct {
|
type TLSOptions struct {
|
||||||
@ -79,7 +79,7 @@ func ListenAndServeKubeletServer(host HostInterface, address net.IP, port uint,
|
|||||||
func ListenAndServeKubeletReadOnlyServer(host HostInterface, address net.IP, port uint) {
|
func ListenAndServeKubeletReadOnlyServer(host HostInterface, address net.IP, port uint) {
|
||||||
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
|
glog.V(1).Infof("Starting to listen read-only on %s:%d", address, port)
|
||||||
s := NewServer(host, false)
|
s := NewServer(host, false)
|
||||||
s.mux.Handle("/metrics", prometheus.Handler())
|
s.restfulCont.Handle("/metrics", prometheus.Handler())
|
||||||
|
|
||||||
server := &http.Server{
|
server := &http.Server{
|
||||||
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
|
||||||
@ -115,7 +115,7 @@ type HostInterface interface {
|
|||||||
func NewServer(host HostInterface, enableDebuggingHandlers bool) Server {
|
func NewServer(host HostInterface, enableDebuggingHandlers bool) Server {
|
||||||
server := Server{
|
server := Server{
|
||||||
host: host,
|
host: host,
|
||||||
mux: http.NewServeMux(),
|
restfulCont: restful.NewContainer(),
|
||||||
}
|
}
|
||||||
server.InstallDefaultHandlers()
|
server.InstallDefaultHandlers()
|
||||||
if enableDebuggingHandlers {
|
if enableDebuggingHandlers {
|
||||||
@ -124,35 +124,144 @@ func NewServer(host HostInterface, enableDebuggingHandlers bool) Server {
|
|||||||
return server
|
return server
|
||||||
}
|
}
|
||||||
|
|
||||||
// InstallDefaultHandlers registers the default set of supported HTTP request patterns with the mux.
|
// InstallDefaultHandlers registers the default set of supported HTTP request
|
||||||
|
// patterns with the restful Container.
|
||||||
func (s *Server) InstallDefaultHandlers() {
|
func (s *Server) InstallDefaultHandlers() {
|
||||||
healthz.InstallHandler(s.mux,
|
healthz.InstallHandler(s.restfulCont,
|
||||||
healthz.PingHealthz,
|
healthz.PingHealthz,
|
||||||
healthz.NamedCheck("docker", s.dockerHealthCheck),
|
healthz.NamedCheck("docker", s.dockerHealthCheck),
|
||||||
healthz.NamedCheck("hostname", s.hostnameHealthCheck),
|
healthz.NamedCheck("hostname", s.hostnameHealthCheck),
|
||||||
healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
|
healthz.NamedCheck("syncloop", s.syncLoopHealthCheck),
|
||||||
)
|
)
|
||||||
s.mux.HandleFunc("/pods", s.handlePods)
|
var ws *restful.WebService
|
||||||
s.mux.HandleFunc("/stats/", s.handleStats)
|
ws = new(restful.WebService)
|
||||||
s.mux.HandleFunc("/spec/", s.handleSpec)
|
ws.
|
||||||
|
Path("/pods").
|
||||||
|
Produces(restful.MIME_JSON)
|
||||||
|
ws.Route(ws.GET("").
|
||||||
|
To(s.getPods).
|
||||||
|
Operation("getPods"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
|
s.restfulCont.Handle("/stats/", &httpHandler{f: s.handleStats})
|
||||||
|
|
||||||
|
ws = new(restful.WebService)
|
||||||
|
ws.
|
||||||
|
Path("/spec/").
|
||||||
|
Produces(restful.MIME_JSON)
|
||||||
|
ws.Route(ws.GET("").
|
||||||
|
To(s.getSpec).
|
||||||
|
Operation("getSpec").
|
||||||
|
Writes(cadvisorApi.MachineInfo{}))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const pprofBasePath = "/debug/pprof/"
|
||||||
|
|
||||||
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
|
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
|
||||||
func (s *Server) InstallDebuggingHandlers() {
|
func (s *Server) InstallDebuggingHandlers() {
|
||||||
s.mux.HandleFunc("/run/", s.handleRun)
|
var ws *restful.WebService
|
||||||
s.mux.HandleFunc("/exec/", s.handleExec)
|
|
||||||
s.mux.HandleFunc("/attach/", s.handleAttach)
|
ws = new(restful.WebService)
|
||||||
s.mux.HandleFunc("/portForward/", s.handlePortForward)
|
ws.
|
||||||
|
Path("/run")
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
|
||||||
|
To(s.getRun).
|
||||||
|
Operation("getRun"))
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
|
||||||
|
To(s.getRun).
|
||||||
|
Operation("getRun"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
|
ws = new(restful.WebService)
|
||||||
|
ws.
|
||||||
|
Path("/exec")
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
|
||||||
|
To(s.getExec).
|
||||||
|
Operation("getExec"))
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
|
||||||
|
To(s.getExec).
|
||||||
|
Operation("getExec"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
|
ws = new(restful.WebService)
|
||||||
|
ws.
|
||||||
|
Path("/attach")
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
|
||||||
|
To(s.getAttach).
|
||||||
|
Operation("getAttach"))
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
|
||||||
|
To(s.getAttach).
|
||||||
|
Operation("getAttach"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
|
ws = new(restful.WebService)
|
||||||
|
ws.
|
||||||
|
Path("/portForward")
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}").
|
||||||
|
To(s.getPortForward).
|
||||||
|
Operation("getPortForward"))
|
||||||
|
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}").
|
||||||
|
To(s.getPortForward).
|
||||||
|
Operation("getPortForward"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
|
ws = new(restful.WebService)
|
||||||
|
ws.
|
||||||
|
Path("/logs/")
|
||||||
|
ws.Route(ws.GET("").
|
||||||
|
To(s.getLogs).
|
||||||
|
Operation("getLogs"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
|
ws = new(restful.WebService)
|
||||||
|
ws.
|
||||||
|
Path("/containerLogs")
|
||||||
|
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
|
||||||
|
To(s.getContainerLogs).
|
||||||
|
Operation("getContainerLogs"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
|
s.restfulCont.Handle("/metrics", prometheus.Handler())
|
||||||
|
|
||||||
|
handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {
|
||||||
|
name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)
|
||||||
|
switch name {
|
||||||
|
case "profile":
|
||||||
|
pprof.Profile(resp, req.Request)
|
||||||
|
case "symbol":
|
||||||
|
pprof.Symbol(resp, req.Request)
|
||||||
|
case "cmdline":
|
||||||
|
pprof.Cmdline(resp, req.Request)
|
||||||
|
default:
|
||||||
|
pprof.Index(resp, req.Request)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup pporf handlers.
|
||||||
|
ws = new(restful.WebService).Path(pprofBasePath)
|
||||||
|
ws.Route(ws.GET("/{subpath:*}").To(func(req *restful.Request, resp *restful.Response) {
|
||||||
|
handlePprofEndpoint(req, resp)
|
||||||
|
})).Doc("pprof endpoint")
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
|
||||||
s.mux.HandleFunc("/logs/", s.handleLogs)
|
|
||||||
s.mux.HandleFunc("/containerLogs/", s.handleContainerLogs)
|
|
||||||
s.mux.Handle("/metrics", prometheus.Handler())
|
|
||||||
// The /runningpods endpoint is used for testing only.
|
// The /runningpods endpoint is used for testing only.
|
||||||
s.mux.HandleFunc("/runningpods", s.handleRunningPods)
|
ws = new(restful.WebService)
|
||||||
|
ws.
|
||||||
|
Path("/runningpods/").
|
||||||
|
Produces(restful.MIME_JSON)
|
||||||
|
ws.Route(ws.GET("").
|
||||||
|
To(s.getRunningPods).
|
||||||
|
Operation("getRunningPods"))
|
||||||
|
s.restfulCont.Add(ws)
|
||||||
|
}
|
||||||
|
|
||||||
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
|
type httpHandler struct {
|
||||||
s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
f func(w http.ResponseWriter, r *http.Request)
|
||||||
s.mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
}
|
||||||
|
|
||||||
|
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
h.f(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
// error serializes an error object into an HTTP response.
|
// error serializes an error object into an HTTP response.
|
||||||
@ -211,48 +320,35 @@ func (s *Server) syncLoopHealthCheck(req *http.Request) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleContainerLogs handles containerLogs request against the Kubelet
|
// getContainerLogs handles containerLogs request against the Kubelet
|
||||||
func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getContainerLogs(request *restful.Request, response *restful.Response) {
|
||||||
defer req.Body.Close()
|
podNamespace := request.PathParameter("podNamespace")
|
||||||
u, err := url.ParseRequestURI(req.RequestURI)
|
podID := request.PathParameter("podID")
|
||||||
if err != nil {
|
containerName := request.PathParameter("containerName")
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
parts := strings.Split(u.Path, "/")
|
|
||||||
|
|
||||||
// req URI: /containerLogs/<podNamespace>/<podID>/<containerName>
|
|
||||||
var podNamespace, podID, containerName string
|
|
||||||
if len(parts) == 5 {
|
|
||||||
podNamespace = parts[2]
|
|
||||||
podID = parts[3]
|
|
||||||
containerName = parts[4]
|
|
||||||
} else {
|
|
||||||
http.Error(w, "Unexpected path for command running", http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(podID) == 0 {
|
if len(podID) == 0 {
|
||||||
http.Error(w, `{"message": "Missing podID."}`, http.StatusBadRequest)
|
// TODO: Why return JSON when the rest return plaintext errors?
|
||||||
|
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podID."}`))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(containerName) == 0 {
|
if len(containerName) == 0 {
|
||||||
http.Error(w, `{"message": "Missing container name."}`, http.StatusBadRequest)
|
// TODO: Why return JSON when the rest return plaintext errors?
|
||||||
|
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing container name."}`))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(podNamespace) == 0 {
|
if len(podNamespace) == 0 {
|
||||||
http.Error(w, `{"message": "Missing podNamespace."}`, http.StatusBadRequest)
|
// TODO: Why return JSON when the rest return plaintext errors?
|
||||||
|
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Missing podNamespace."}`))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
uriValues := u.Query()
|
follow, _ := strconv.ParseBool(request.QueryParameter("follow"))
|
||||||
follow, _ := strconv.ParseBool(uriValues.Get("follow"))
|
previous, _ := strconv.ParseBool(request.QueryParameter("previous"))
|
||||||
previous, _ := strconv.ParseBool(uriValues.Get("previous"))
|
tail := request.QueryParameter("tail")
|
||||||
tail := uriValues.Get("tail")
|
|
||||||
|
|
||||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||||
if !ok {
|
if !ok {
|
||||||
http.Error(w, fmt.Sprintf("Pod %q does not exist", podID), http.StatusNotFound)
|
response.WriteError(http.StatusNotFound, fmt.Errorf("Pod %q does not exist", podID))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Check if containerName is valid.
|
// Check if containerName is valid.
|
||||||
@ -263,20 +359,20 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !containerExists {
|
if !containerExists {
|
||||||
http.Error(w, fmt.Sprintf("Container %q not found in Pod %q", containerName, podID), http.StatusNotFound)
|
response.WriteError(http.StatusNotFound, fmt.Errorf("Container %q not found in Pod %q", containerName, podID))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := w.(http.Flusher); !ok {
|
if _, ok := response.ResponseWriter.(http.Flusher); !ok {
|
||||||
s.error(w, fmt.Errorf("unable to convert %v into http.Flusher", w))
|
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher", response))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fw := flushwriter.Wrap(w)
|
fw := flushwriter.Wrap(response)
|
||||||
w.Header().Set("Transfer-Encoding", "chunked")
|
response.Header().Set("Transfer-Encoding", "chunked")
|
||||||
w.WriteHeader(http.StatusOK)
|
response.WriteHeader(http.StatusOK)
|
||||||
err = s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, previous, fw, fw)
|
err := s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, tail, follow, previous, fw, fw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.error(w, err)
|
response.WriteError(http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -291,34 +387,32 @@ func encodePods(pods []*api.Pod) (data []byte, err error) {
|
|||||||
return latest.Codec.Encode(podList)
|
return latest.Codec.Encode(podList)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handlePods returns a list of pods bound to the Kubelet and their spec.
|
// getPods returns a list of pods bound to the Kubelet and their spec.
|
||||||
func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getPods(request *restful.Request, response *restful.Response) {
|
||||||
pods := s.host.GetPods()
|
pods := s.host.GetPods()
|
||||||
data, err := encodePods(pods)
|
data, err := encodePods(pods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.error(w, err)
|
response.WriteError(http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Add("Content-type", "application/json")
|
response.Write(data)
|
||||||
w.Write(data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleRunningPods returns a list of pods running on Kubelet. The list is
|
// getRunningPods returns a list of pods running on Kubelet. The list is
|
||||||
// provided by the container runtime, and is different from the list returned
|
// provided by the container runtime, and is different from the list returned
|
||||||
// by handlePods, which is a set of desired pods to run.
|
// by getPods, which is a set of desired pods to run.
|
||||||
func (s *Server) handleRunningPods(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getRunningPods(request *restful.Request, response *restful.Response) {
|
||||||
pods, err := s.host.GetRunningPods()
|
pods, err := s.host.GetRunningPods()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.error(w, err)
|
response.WriteError(http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
data, err := encodePods(pods)
|
data, err := encodePods(pods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.error(w, err)
|
response.WriteError(http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Add("Content-type", "application/json")
|
response.Write(data)
|
||||||
w.Write(data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleStats handles stats requests against the Kubelet.
|
// handleStats handles stats requests against the Kubelet.
|
||||||
@ -326,69 +420,42 @@ func (s *Server) handleStats(w http.ResponseWriter, req *http.Request) {
|
|||||||
s.serveStats(w, req)
|
s.serveStats(w, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleLogs handles logs requests against the Kubelet.
|
// getLogs handles logs requests against the Kubelet.
|
||||||
func (s *Server) handleLogs(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getLogs(request *restful.Request, response *restful.Response) {
|
||||||
s.host.ServeLogs(w, req)
|
s.host.ServeLogs(response, request.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleSpec handles spec requests against the Kubelet.
|
// getSpec handles spec requests against the Kubelet.
|
||||||
func (s *Server) handleSpec(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getSpec(request *restful.Request, response *restful.Response) {
|
||||||
info, err := s.host.GetCachedMachineInfo()
|
info, err := s.host.GetCachedMachineInfo()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.error(w, err)
|
response.WriteError(http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
data, err := json.Marshal(info)
|
response.WriteEntity(info)
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
w.Header().Add("Content-type", "application/json")
|
|
||||||
w.Write(data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseContainerCoordinates(path string) (namespace, pod string, uid types.UID, container string, err error) {
|
func getContainerCoordinates(request *restful.Request) (namespace, pod string, uid types.UID, container string) {
|
||||||
parts := strings.Split(path, "/")
|
namespace = request.PathParameter("podNamespace")
|
||||||
|
pod = request.PathParameter("podID")
|
||||||
if len(parts) == 5 {
|
if uidStr := request.PathParameter("uid"); uidStr != "" {
|
||||||
namespace = parts[2]
|
uid = types.UID(uidStr)
|
||||||
pod = parts[3]
|
|
||||||
container = parts[4]
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
container = request.PathParameter("containerName")
|
||||||
if len(parts) == 6 {
|
|
||||||
namespace = parts[2]
|
|
||||||
pod = parts[3]
|
|
||||||
uid = types.UID(parts[4])
|
|
||||||
container = parts[5]
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = fmt.Errorf("Unexpected path %s. Expected /.../.../<namespace>/<pod>/<container> or /.../.../<namespace>/<pod>/<uid>/<container>", path)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
const streamCreationTimeout = 30 * time.Second
|
const streamCreationTimeout = 30 * time.Second
|
||||||
|
|
||||||
func (s *Server) handleAttach(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
|
||||||
u, err := url.ParseRequestURI(req.RequestURI)
|
podNamespace, podID, uid, container := getContainerCoordinates(request)
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
podNamespace, podID, uid, container, err := parseContainerCoordinates(u.Path)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||||
if !ok {
|
if !ok {
|
||||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(w, req)
|
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(request, response)
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
}
|
}
|
||||||
@ -397,7 +464,7 @@ func (s *Server) handleAttach(w http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.host.AttachContainer(kubecontainer.GetPodFullName(pod), uid, container, stdinStream, stdoutStream, stderrStream, tty)
|
err := s.host.AttachContainer(kubecontainer.GetPodFullName(pod), uid, container, stdinStream, stdoutStream, stderrStream, tty)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("Error executing command in container: %v", err)
|
msg := fmt.Sprintf("Error executing command in container: %v", err)
|
||||||
glog.Error(msg)
|
glog.Error(msg)
|
||||||
@ -405,58 +472,41 @@ func (s *Server) handleAttach(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleRun handles requests to run a command inside a container.
|
// getRun handles requests to run a command inside a container.
|
||||||
func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getRun(request *restful.Request, response *restful.Response) {
|
||||||
u, err := url.ParseRequestURI(req.RequestURI)
|
podNamespace, podID, uid, container := getContainerCoordinates(request)
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
podNamespace, podID, uid, container, err := parseContainerCoordinates(u.Path)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||||
if !ok {
|
if !ok {
|
||||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
command := strings.Split(u.Query().Get("cmd"), " ")
|
command := strings.Split(request.QueryParameter("cmd"), " ")
|
||||||
data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), uid, container, command)
|
data, err := s.host.RunInContainer(kubecontainer.GetPodFullName(pod), uid, container, command)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.error(w, err)
|
response.WriteError(http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Add("Content-type", "text/plain")
|
response.Write(data)
|
||||||
w.Write(data)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleExec handles requests to run a command inside a container.
|
// getExec handles requests to run a command inside a container.
|
||||||
func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
|
||||||
u, err := url.ParseRequestURI(req.RequestURI)
|
podNamespace, podID, uid, container := getContainerCoordinates(request)
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
podNamespace, podID, uid, container, err := parseContainerCoordinates(u.Path)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||||
if !ok {
|
if !ok {
|
||||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(w, req)
|
stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, ok := s.createStreams(request, response)
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
|
// error is handled in the createStreams function
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty)
|
cmd := request.Request.URL.Query()[api.ExecCommandParamm]
|
||||||
|
err := s.host.ExecInContainer(kubecontainer.GetPodFullName(pod), uid, container, cmd, stdinStream, stdoutStream, stderrStream, tty)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
msg := fmt.Sprintf("Error executing command in container: %v", err)
|
msg := fmt.Sprintf("Error executing command in container: %v", err)
|
||||||
glog.Error(msg)
|
glog.Error(msg)
|
||||||
@ -464,34 +514,33 @@ func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) createStreams(w http.ResponseWriter, req *http.Request) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, httpstream.Connection, bool, bool) {
|
func (s *Server) createStreams(request *restful.Request, response *restful.Response) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, httpstream.Connection, bool, bool) {
|
||||||
req.ParseForm()
|
|
||||||
// start at 1 for error stream
|
// start at 1 for error stream
|
||||||
expectedStreams := 1
|
expectedStreams := 1
|
||||||
if req.FormValue(api.ExecStdinParam) == "1" {
|
if request.QueryParameter(api.ExecStdinParam) == "1" {
|
||||||
expectedStreams++
|
expectedStreams++
|
||||||
}
|
}
|
||||||
if req.FormValue(api.ExecStdoutParam) == "1" {
|
if request.QueryParameter(api.ExecStdoutParam) == "1" {
|
||||||
expectedStreams++
|
expectedStreams++
|
||||||
}
|
}
|
||||||
tty := req.FormValue(api.ExecTTYParam) == "1"
|
tty := request.QueryParameter(api.ExecTTYParam) == "1"
|
||||||
if !tty && req.FormValue(api.ExecStderrParam) == "1" {
|
if !tty && request.QueryParameter(api.ExecStderrParam) == "1" {
|
||||||
expectedStreams++
|
expectedStreams++
|
||||||
}
|
}
|
||||||
|
|
||||||
if expectedStreams == 1 {
|
if expectedStreams == 1 {
|
||||||
http.Error(w, "You must specify at least 1 of stdin, stdout, stderr", http.StatusBadRequest)
|
response.WriteError(http.StatusBadRequest, fmt.Errorf("you must specify at least 1 of stdin, stdout, stderr"))
|
||||||
return nil, nil, nil, nil, nil, false, false
|
return nil, nil, nil, nil, nil, false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
streamCh := make(chan httpstream.Stream)
|
streamCh := make(chan httpstream.Stream)
|
||||||
|
|
||||||
upgrader := spdy.NewResponseUpgrader()
|
upgrader := spdy.NewResponseUpgrader()
|
||||||
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream) error {
|
conn := upgrader.UpgradeResponse(response.ResponseWriter, request.Request, func(stream httpstream.Stream) error {
|
||||||
streamCh <- stream
|
streamCh <- stream
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
// from this point on, we can no longer call methods on w
|
// from this point on, we can no longer call methods on response
|
||||||
if conn == nil {
|
if conn == nil {
|
||||||
// The upgrader is responsible for notifying the client of any errors that
|
// The upgrader is responsible for notifying the client of any errors that
|
||||||
// occurred during upgrading. All we can do is return here at this point
|
// occurred during upgrading. All we can do is return here at this point
|
||||||
@ -547,46 +596,26 @@ WaitForStreams:
|
|||||||
return stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, true
|
return stdinStream, stdoutStream, stderrStream, errorStream, conn, tty, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func parsePodCoordinates(path string) (namespace, pod string, uid types.UID, err error) {
|
func getPodCoordinates(request *restful.Request) (namespace, pod string, uid types.UID) {
|
||||||
parts := strings.Split(path, "/")
|
namespace = request.PathParameter("podNamespace")
|
||||||
|
pod = request.PathParameter("podID")
|
||||||
if len(parts) == 4 {
|
if uidStr := request.PathParameter("uid"); uidStr != "" {
|
||||||
namespace = parts[2]
|
uid = types.UID(uidStr)
|
||||||
pod = parts[3]
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(parts) == 5 {
|
|
||||||
namespace = parts[2]
|
|
||||||
pod = parts[3]
|
|
||||||
uid = types.UID(parts[4])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = fmt.Errorf("Unexpected path %s. Expected /.../.../<namespace>/<pod> or /.../.../<namespace>/<pod>/<uid>", path)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handlePortForward(w http.ResponseWriter, req *http.Request) {
|
func (s *Server) getPortForward(request *restful.Request, response *restful.Response) {
|
||||||
u, err := url.ParseRequestURI(req.RequestURI)
|
podNamespace, podID, uid := getPodCoordinates(request)
|
||||||
if err != nil {
|
|
||||||
s.error(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
podNamespace, podID, uid, err := parsePodCoordinates(u.Path)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||||
if !ok {
|
if !ok {
|
||||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
response.WriteError(http.StatusNotFound, fmt.Errorf("pod does not exist"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
streamChan := make(chan httpstream.Stream, 1)
|
streamChan := make(chan httpstream.Stream, 1)
|
||||||
upgrader := spdy.NewResponseUpgrader()
|
upgrader := spdy.NewResponseUpgrader()
|
||||||
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream) error {
|
conn := upgrader.UpgradeResponse(response.ResponseWriter, request.Request, func(stream httpstream.Stream) error {
|
||||||
portString := stream.Headers().Get(api.PortHeader)
|
portString := stream.Headers().Get(api.PortHeader)
|
||||||
port, err := strconv.ParseUint(portString, 10, 16)
|
port, err := strconv.ParseUint(portString, 10, 16)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -672,7 +701,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
http.StatusSwitchingProtocols,
|
http.StatusSwitchingProtocols,
|
||||||
),
|
),
|
||||||
).Log()
|
).Log()
|
||||||
s.mux.ServeHTTP(w, req)
|
s.restfulCont.ServeHTTP(w, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
type StatsRequest struct {
|
type StatsRequest struct {
|
||||||
@ -777,5 +806,4 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
|
|||||||
}
|
}
|
||||||
w.Header().Add("Content-type", "application/json")
|
w.Header().Add("Content-type", "application/json")
|
||||||
w.Write(data)
|
w.Write(data)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
@ -401,10 +401,10 @@ func TestServeRunInContainer(t *testing.T) {
|
|||||||
return []byte(output), nil
|
return []byte(output), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := http.Get(fw.testHTTPServer.URL + "/run/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?cmd=ls%20-a")
|
resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Got error GETing: %v", err)
|
t.Fatalf("Got error POSTing: %v", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
@ -445,10 +445,10 @@ func TestServeRunInContainerWithUID(t *testing.T) {
|
|||||||
return []byte(output), nil
|
return []byte(output), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := http.Get(fw.testHTTPServer.URL + "/run/" + podNamespace + "/" + podName + "/" + expectedUID + "/" + expectedContainerName + "?cmd=ls%20-a")
|
resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Got error GETing: %v", err)
|
t.Fatalf("Got error POSTing: %v", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
@ -681,9 +681,9 @@ func TestServeExecInContainerIdleTimeout(t *testing.T) {
|
|||||||
upgradeRoundTripper := spdy.NewSpdyRoundTripper(nil)
|
upgradeRoundTripper := spdy.NewSpdyRoundTripper(nil)
|
||||||
c := &http.Client{Transport: upgradeRoundTripper}
|
c := &http.Client{Transport: upgradeRoundTripper}
|
||||||
|
|
||||||
resp, err := c.Get(url)
|
resp, err := c.Post(url, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Got error GETing: %v", err)
|
t.Fatalf("Got error POSTing: %v", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
@ -839,9 +839,9 @@ func TestServeExecInContainer(t *testing.T) {
|
|||||||
c = &http.Client{Transport: upgradeRoundTripper}
|
c = &http.Client{Transport: upgradeRoundTripper}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err = c.Get(url)
|
resp, err = c.Post(url, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%d: Got error GETing: %v", i, err)
|
t.Fatalf("%d: Got error POSTing: %v", i, err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
@ -1070,9 +1070,9 @@ func TestServeAttachContainer(t *testing.T) {
|
|||||||
c = &http.Client{Transport: upgradeRoundTripper}
|
c = &http.Client{Transport: upgradeRoundTripper}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err = c.Get(url)
|
resp, err = c.Post(url, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%d: Got error GETing: %v", i, err)
|
t.Fatalf("%d: Got error POSTing: %v", i, err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
@ -1182,9 +1182,9 @@ func TestServePortForwardIdleTimeout(t *testing.T) {
|
|||||||
upgradeRoundTripper := spdy.NewRoundTripper(nil)
|
upgradeRoundTripper := spdy.NewRoundTripper(nil)
|
||||||
c := &http.Client{Transport: upgradeRoundTripper}
|
c := &http.Client{Transport: upgradeRoundTripper}
|
||||||
|
|
||||||
resp, err := c.Get(url)
|
resp, err := c.Post(url, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Got error GETing: %v", err)
|
t.Fatalf("Got error POSTing: %v", err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
@ -1284,9 +1284,9 @@ func TestServePortForward(t *testing.T) {
|
|||||||
upgradeRoundTripper := spdy.NewRoundTripper(nil)
|
upgradeRoundTripper := spdy.NewRoundTripper(nil)
|
||||||
c := &http.Client{Transport: upgradeRoundTripper}
|
c := &http.Client{Transport: upgradeRoundTripper}
|
||||||
|
|
||||||
resp, err := c.Get(url)
|
resp, err := c.Post(url, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("%d: Got error GETing: %v", i, err)
|
t.Fatalf("%d: Got error POSTing: %v", i, err)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user