diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index b42e2ce9a36..1413681cb6f 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -25,6 +25,7 @@ import ( "sort" "strconv" "strings" + "io" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/fsouza/go-dockerclient" @@ -46,6 +47,7 @@ type DockerInterface interface { StartContainer(id string, hostConfig *docker.HostConfig) error StopContainer(id string, timeout uint) error PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error + Logs(opts docker.LogsOptions) error } // DockerID is an ID of docker container. It is a type to make it clear when we're working with docker container Ids @@ -202,6 +204,26 @@ func GetRecentDockerContainersWithNameAndUUID(client DockerInterface, podFullNam return result, nil } +// GetKubeletDockerContainerLogs returns logs of specific container +func GetKubeletDockerContainerLogs(client DockerInterface, containerID, tail string, follow bool, writer io.Writer) (err error) { + opts := docker.LogsOptions{ + Container: containerID, + Stdout: true, + Stderr: true, + OutputStream: writer, + ErrorStream: writer, + Timestamps: true, + RawTerminal: true, + } + + if opts.Follow = follow; follow == false { + opts.Tail = tail + } + + err = client.Logs(opts) + return +} + // ErrNoContainersInPod is returned when there are no containers for a given pod var ErrNoContainersInPod = errors.New("no containers exist for this pod") diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index c5d36a50570..2c222001c9b 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -111,6 +111,15 @@ func (f *FakeDockerClient) StopContainer(id string, timeout uint) error { return f.Err } +// Logs is a test-spy implementation of DockerInterface.Logs. +// It adds an entry "logs" to the internal method call record. +func (f *FakeDockerClient) Logs(opts docker.LogsOptions) error { + f.Lock() + defer f.Unlock() + f.called = append(f.called, "logs") + return f.Err +} + // PullImage is a test-spy implementation of DockerInterface.StopContainer. // It adds an entry "pull" to the internal method call record. func (f *FakeDockerClient) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error { diff --git a/pkg/kubelet/handlers.go b/pkg/kubelet/handlers.go index bf207c07f90..2936b963176 100644 --- a/pkg/kubelet/handlers.go +++ b/pkg/kubelet/handlers.go @@ -20,6 +20,8 @@ import ( "fmt" "net" "strconv" + "net/http" + "io" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -95,3 +97,21 @@ func (h *httpActionHandler) Run(podFullName, uuid string, container *api.Contain _, err := h.client.Get(url) return err } + +// flusherWriter provides wrapper for responseWriter with HTTP streaming capabilities +type FlushWriter struct { + flusher http.Flusher + writer io.Writer +} + +// Write is a flushWriter implementation of the io.Writer that sends any buffered data to the client. +func (fw *FlushWriter) Write(p []byte) (n int, err error) { + n, err = fw.writer.Write(p) + if err != nil { + return n, err + } + if fw.flusher != nil { + fw.flusher.Flush() + } + return +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ee0edd6c2ec..d6e342454e5 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -26,6 +26,7 @@ import ( "strings" "sync" "time" + "io" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" @@ -751,6 +752,11 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai return cinfo, nil } +// GetKubeletContainerLogs returns logs from the container +func (kl *Kubelet) GetKubeletContainerLogs(containerID, tail string, follow bool, writer io.Writer) error { + return dockertools.GetKubeletDockerContainerLogs(kl.dockerClient, containerID, tail , follow, writer) +} + // GetPodInfo returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) { return dockertools.GetDockerPodInfo(kl.dockerClient, podFullName, uuid) diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 14ea087515a..c910f0cb86d 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -68,6 +68,7 @@ type HostInterface interface { GetMachineInfo() (*info.MachineInfo, error) GetPodInfo(name, uuid string) (api.PodInfo, error) RunInContainer(name, uuid, container string, cmd []string) ([]byte, error) + GetKubeletContainerLogs(containerID, tail string, follow bool, writer io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) } @@ -92,6 +93,7 @@ func (s *Server) InstallDefaultHandlers() { s.mux.HandleFunc("/logs/", s.handleLogs) s.mux.HandleFunc("/spec/", s.handleSpec) s.mux.HandleFunc("/run/", s.handleRun) + s.mux.HandleFunc("/containerLogs", s.handleContainerLogs) } // error serializes an error object into an HTTP response. @@ -143,7 +145,45 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) { } -// handlePodInfo handles podInfo requests against the Kubelet. +// handleContainerLogs handles containerLogs request againts the Kubelet +func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { + defer req.Body.Close() + u, err := url.ParseRequestURI(req.RequestURI) + if err != nil { + s.error(w, err) + return + } + uriValues := u.Query() + + containerID := uriValues.Get("containerID") + follow := uriValues.Get("follow") == "1" + tail := uriValues.Get("tail") + + if len(containerID) == 0 { + w.WriteHeader(http.StatusBadRequest) + http.Error(w, "Missing 'containerID=' query entry.", http.StatusBadRequest) + return + } + logWriter := httplog.LogOf(req, w) + w = httplog.Unlogged(w) + fw := FlushWriter{writer: w} + if flusher, ok := w.(http.Flusher); ok { + fw.flusher = flusher + } else { + logWriter.Addf("unable to get Flusher") + http.NotFound(w, req) + return + } + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + err = s.host.GetKubeletContainerLogs(containerID, tail, follow, &fw) + if err != nil { + s.error(w, err) + return + } +} + +// handlePodInfo handles podInfo requests against the Kubelet func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) { u, err := url.ParseRequestURI(req.RequestURI) if err != nil { diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index a47fb2d1770..baec2523ecd 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -27,6 +27,7 @@ import ( "reflect" "strings" "testing" + "io" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -35,12 +36,17 @@ import ( ) type fakeKubelet struct { - infoFunc func(name string) (api.PodInfo, error) - containerInfoFunc func(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) - rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error) - machineInfoFunc func() (*info.MachineInfo, error) - logFunc func(w http.ResponseWriter, req *http.Request) - runFunc func(podFullName, uuid, containerName string, cmd []string) ([]byte, error) + infoFunc func(name string) (api.PodInfo, error) + containerInfoFunc func(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) + rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error) + machineInfoFunc func() (*info.MachineInfo, error) + logFunc func(w http.ResponseWriter, req *http.Request) + runFunc func(podFullName, uuid, containerName string, cmd []string) ([]byte, error) + containerLogsFunc func(containerID, tail string, follow bool, writer io.Writer) error +} + +func (fk *fakeKubelet) GetKubeletContainerLogs(containerID, tail string, follow bool, writer io.Writer) error { + return fk.containerLogsFunc(containerID, tail, follow, writer) } func (fk *fakeKubelet) GetPodInfo(name, uuid string) (api.PodInfo, error) {