mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-11 06:02:18 +00:00
Add streaming command execution & port forwarding
Add streaming command execution & port forwarding via HTTP connection upgrades (currently using SPDY).
This commit is contained in:
@@ -27,6 +27,7 @@ import (
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
@@ -34,6 +35,8 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
|
||||
"github.com/golang/glog"
|
||||
"github.com/google/cadvisor/info"
|
||||
)
|
||||
@@ -69,8 +72,11 @@ type HostInterface interface {
|
||||
GetPodByName(namespace, name string) (*api.BoundPod, bool)
|
||||
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
|
||||
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
|
||||
ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error
|
||||
GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
|
||||
ServeLogs(w http.ResponseWriter, req *http.Request)
|
||||
PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error
|
||||
StreamingConnectionIdleTimeout() time.Duration
|
||||
}
|
||||
|
||||
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
|
||||
@@ -99,6 +105,8 @@ func (s *Server) InstallDefaultHandlers() {
|
||||
// InstallDeguggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
|
||||
func (s *Server) InstallDebuggingHandlers() {
|
||||
s.mux.HandleFunc("/run/", s.handleRun)
|
||||
s.mux.HandleFunc("/exec/", s.handleExec)
|
||||
s.mux.HandleFunc("/portForward/", s.handlePortForward)
|
||||
|
||||
s.mux.HandleFunc("/logs/", s.handleLogs)
|
||||
s.mux.HandleFunc("/containerLogs/", s.handleContainerLogs)
|
||||
@@ -301,6 +309,28 @@ func (s *Server) handleSpec(w http.ResponseWriter, req *http.Request) {
|
||||
w.Write(data)
|
||||
}
|
||||
|
||||
func parseContainerCoordinates(path string) (namespace, pod string, uid types.UID, container string, err error) {
|
||||
parts := strings.Split(path, "/")
|
||||
|
||||
if len(parts) == 5 {
|
||||
namespace = parts[2]
|
||||
pod = parts[3]
|
||||
container = parts[4]
|
||||
return
|
||||
}
|
||||
|
||||
if len(parts) == 6 {
|
||||
namespace = parts[2]
|
||||
pod = parts[3]
|
||||
uid = types.UID(parts[4])
|
||||
container = parts[5]
|
||||
return
|
||||
}
|
||||
|
||||
err = fmt.Errorf("Unexpected path %s. Expected /.../.../<namespace>/<pod>/<container> or /.../.../<namespace>/<pod>/<uid>/<container>", path)
|
||||
return
|
||||
}
|
||||
|
||||
// handleRun handles requests to run a command inside a container.
|
||||
func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
|
||||
u, err := url.ParseRequestURI(req.RequestURI)
|
||||
@@ -308,20 +338,9 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
|
||||
s.error(w, err)
|
||||
return
|
||||
}
|
||||
parts := strings.Split(u.Path, "/")
|
||||
var podNamespace, podID, container string
|
||||
var uid types.UID
|
||||
if len(parts) == 5 {
|
||||
podNamespace = parts[2]
|
||||
podID = parts[3]
|
||||
container = parts[4]
|
||||
} else if len(parts) == 6 {
|
||||
podNamespace = parts[2]
|
||||
podID = parts[3]
|
||||
uid = types.UID(parts[4])
|
||||
container = parts[5]
|
||||
} else {
|
||||
http.Error(w, "Unexpected path for command running", http.StatusBadRequest)
|
||||
podNamespace, podID, uid, container, err := parseContainerCoordinates(u.Path)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||
@@ -339,6 +358,227 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
|
||||
w.Write(data)
|
||||
}
|
||||
|
||||
// handleExec handles requests to run a command inside a container.
|
||||
func (s *Server) handleExec(w http.ResponseWriter, req *http.Request) {
|
||||
u, err := url.ParseRequestURI(req.RequestURI)
|
||||
if err != nil {
|
||||
s.error(w, err)
|
||||
return
|
||||
}
|
||||
podNamespace, podID, uid, container, err := parseContainerCoordinates(u.Path)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||
if !ok {
|
||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
req.ParseForm()
|
||||
// start at 1 for error stream
|
||||
expectedStreams := 1
|
||||
if req.FormValue(api.ExecStdinParam) == "1" {
|
||||
expectedStreams++
|
||||
}
|
||||
if req.FormValue(api.ExecStdoutParam) == "1" {
|
||||
expectedStreams++
|
||||
}
|
||||
tty := req.FormValue(api.ExecTTYParam) == "1"
|
||||
if !tty && req.FormValue(api.ExecStderrParam) == "1" {
|
||||
expectedStreams++
|
||||
}
|
||||
|
||||
if expectedStreams == 1 {
|
||||
http.Error(w, "You must specify at least 1 of stdin, stdout, stderr", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
streamCh := make(chan httpstream.Stream)
|
||||
|
||||
upgrader := spdy.NewResponseUpgrader()
|
||||
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream) error {
|
||||
streamCh <- stream
|
||||
return nil
|
||||
})
|
||||
// from this point on, we can no longer call methods on w
|
||||
if conn == nil {
|
||||
// The upgrader is responsible for notifying the client of any errors that
|
||||
// occurred during upgrading. All we can do is return here at this point
|
||||
// if we weren't successful in upgrading.
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout())
|
||||
|
||||
// TODO find a good default timeout value
|
||||
// TODO make it configurable?
|
||||
expired := time.NewTimer(2 * time.Second)
|
||||
|
||||
var errorStream, stdinStream, stdoutStream, stderrStream httpstream.Stream
|
||||
receivedStreams := 0
|
||||
WaitForStreams:
|
||||
for {
|
||||
select {
|
||||
case stream := <-streamCh:
|
||||
streamType := stream.Headers().Get(api.StreamType)
|
||||
switch streamType {
|
||||
case api.StreamTypeError:
|
||||
errorStream = stream
|
||||
defer errorStream.Reset()
|
||||
receivedStreams++
|
||||
case api.StreamTypeStdin:
|
||||
stdinStream = stream
|
||||
receivedStreams++
|
||||
case api.StreamTypeStdout:
|
||||
stdoutStream = stream
|
||||
receivedStreams++
|
||||
case api.StreamTypeStderr:
|
||||
stderrStream = stream
|
||||
receivedStreams++
|
||||
default:
|
||||
glog.Errorf("Unexpected stream type: '%s'", streamType)
|
||||
}
|
||||
if receivedStreams == expectedStreams {
|
||||
break WaitForStreams
|
||||
}
|
||||
case <-expired.C:
|
||||
// TODO find a way to return the error to the user. Maybe use a separate
|
||||
// stream to report errors?
|
||||
glog.Error("Timed out waiting for client to create streams")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if stdinStream != nil {
|
||||
// close our half of the input stream, since we won't be writing to it
|
||||
stdinStream.Close()
|
||||
}
|
||||
|
||||
err = s.host.ExecInContainer(GetPodFullName(pod), uid, container, u.Query()[api.ExecCommandParamm], stdinStream, stdoutStream, stderrStream, tty)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("Error executing command in container: %v", err)
|
||||
glog.Error(msg)
|
||||
errorStream.Write([]byte(msg))
|
||||
}
|
||||
}
|
||||
|
||||
func parsePodCoordinates(path string) (namespace, pod string, uid types.UID, err error) {
|
||||
parts := strings.Split(path, "/")
|
||||
|
||||
if len(parts) == 4 {
|
||||
namespace = parts[2]
|
||||
pod = parts[3]
|
||||
return
|
||||
}
|
||||
|
||||
if len(parts) == 5 {
|
||||
namespace = parts[2]
|
||||
pod = parts[3]
|
||||
uid = types.UID(parts[4])
|
||||
return
|
||||
}
|
||||
|
||||
err = fmt.Errorf("Unexpected path %s. Expected /.../.../<namespace>/<pod> or /.../.../<namespace>/<pod>/<uid>", path)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) handlePortForward(w http.ResponseWriter, req *http.Request) {
|
||||
u, err := url.ParseRequestURI(req.RequestURI)
|
||||
if err != nil {
|
||||
s.error(w, err)
|
||||
return
|
||||
}
|
||||
podNamespace, podID, uid, err := parsePodCoordinates(u.Path)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
pod, ok := s.host.GetPodByName(podNamespace, podID)
|
||||
if !ok {
|
||||
http.Error(w, "Pod does not exist", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
streamChan := make(chan httpstream.Stream, 1)
|
||||
upgrader := spdy.NewResponseUpgrader()
|
||||
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream) error {
|
||||
portString := stream.Headers().Get(api.PortHeader)
|
||||
port, err := strconv.ParseUint(portString, 10, 16)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to parse '%s' as a port: %v", portString, err)
|
||||
}
|
||||
if port < 1 {
|
||||
return fmt.Errorf("Port '%d' must be greater than 0", port)
|
||||
}
|
||||
streamChan <- stream
|
||||
return nil
|
||||
})
|
||||
if conn == nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout())
|
||||
|
||||
var dataStreamLock sync.Mutex
|
||||
dataStreamChans := make(map[string]chan httpstream.Stream)
|
||||
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-conn.CloseChan():
|
||||
break Loop
|
||||
case stream := <-streamChan:
|
||||
streamType := stream.Headers().Get(api.StreamType)
|
||||
port := stream.Headers().Get(api.PortHeader)
|
||||
dataStreamLock.Lock()
|
||||
switch streamType {
|
||||
case "error":
|
||||
ch := make(chan httpstream.Stream)
|
||||
dataStreamChans[port] = ch
|
||||
go waitForPortForwardDataStreamAndRun(GetPodFullName(pod), uid, stream, ch, s.host)
|
||||
case "data":
|
||||
ch, ok := dataStreamChans[port]
|
||||
if ok {
|
||||
ch <- stream
|
||||
delete(dataStreamChans, port)
|
||||
} else {
|
||||
glog.Errorf("Unable to locate data stream channel for port %s", port)
|
||||
}
|
||||
default:
|
||||
glog.Errorf("streamType header must be 'error' or 'data', got: '%s'", streamType)
|
||||
stream.Reset()
|
||||
}
|
||||
dataStreamLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitForPortForwardDataStreamAndRun(pod string, uid types.UID, errorStream httpstream.Stream, dataStreamChan chan httpstream.Stream, host HostInterface) {
|
||||
defer errorStream.Reset()
|
||||
|
||||
var dataStream httpstream.Stream
|
||||
|
||||
select {
|
||||
case dataStream = <-dataStreamChan:
|
||||
case <-time.After(1 * time.Second):
|
||||
errorStream.Write([]byte("Timed out waiting for data stream"))
|
||||
//TODO delete from dataStreamChans[port]
|
||||
return
|
||||
}
|
||||
|
||||
portString := dataStream.Headers().Get(api.PortHeader)
|
||||
port, _ := strconv.ParseUint(portString, 10, 16)
|
||||
err := host.PortForward(pod, uid, uint16(port), dataStream)
|
||||
if err != nil {
|
||||
msg := fmt.Errorf("Error forwarding port %d to pod %s, uid %v: %v", port, pod, uid, err)
|
||||
glog.Error(msg)
|
||||
errorStream.Write([]byte(msg.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
// ServeHTTP responds to HTTP requests on the Kubelet.
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
defer httplog.NewLogged(req, &w).StacktraceWhen(
|
||||
@@ -347,6 +587,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
http.StatusMovedPermanently,
|
||||
http.StatusTemporaryRedirect,
|
||||
http.StatusNotFound,
|
||||
http.StatusSwitchingProtocols,
|
||||
),
|
||||
).Log()
|
||||
s.mux.ServeHTTP(w, req)
|
||||
|
Reference in New Issue
Block a user