From 390b158db968fc18a3878ffeb81013a2a8463bc3 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Thu, 9 Aug 2018 17:12:26 -0700 Subject: [PATCH] kubelet: plumb context for log requests This allows kubelets to stop the necessary work when the context has been canceled (e.g., connection closed), and not leaking a goroutine and inotify watcher waiting indefinitely. --- pkg/kubelet/container/runtime.go | 3 ++- pkg/kubelet/container/testing/fake_runtime.go | 3 ++- pkg/kubelet/container/testing/runtime_mock.go | 3 ++- pkg/kubelet/dockershim/docker_legacy_service.go | 7 ++++--- pkg/kubelet/kubelet_pods.go | 7 ++++--- pkg/kubelet/kuberuntime/kuberuntime_container.go | 7 ++++--- pkg/kubelet/kuberuntime/kuberuntime_logs.go | 5 +++-- pkg/kubelet/kuberuntime/logs/logs.go | 9 ++++++--- pkg/kubelet/server/server.go | 6 ++++-- pkg/kubelet/server/server_test.go | 9 +++++---- 10 files changed, 36 insertions(+), 23 deletions(-) diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 70b72024c9c..4859342abcb 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -17,6 +17,7 @@ limitations under the License. package container import ( + "context" "fmt" "io" "net/url" @@ -113,7 +114,7 @@ type Runtime interface { // default, it returns a snapshot of the container log. Set 'follow' to true to // stream the log. Set 'follow' to false and specify the number of lines (e.g. // "100" or "all") to tail the log. - GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) + GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) // Delete a container. If the container is still running, an error is returned. DeleteContainer(containerID ContainerID) error // ImageService provides methods to image-related methods. diff --git a/pkg/kubelet/container/testing/fake_runtime.go b/pkg/kubelet/container/testing/fake_runtime.go index 707ee1ac456..d4db03724d4 100644 --- a/pkg/kubelet/container/testing/fake_runtime.go +++ b/pkg/kubelet/container/testing/fake_runtime.go @@ -17,6 +17,7 @@ limitations under the License. package testing import ( + "context" "fmt" "io" "net/url" @@ -289,7 +290,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS return &status, f.Err } -func (f *FakeRuntime) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { +func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { f.Lock() defer f.Unlock() diff --git a/pkg/kubelet/container/testing/runtime_mock.go b/pkg/kubelet/container/testing/runtime_mock.go index 7adbccae3ca..43db88521cd 100644 --- a/pkg/kubelet/container/testing/runtime_mock.go +++ b/pkg/kubelet/container/testing/runtime_mock.go @@ -17,6 +17,7 @@ limitations under the License. package testing import ( + "context" "io" "time" @@ -100,7 +101,7 @@ func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout, return args.Error(0) } -func (r *Mock) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { +func (r *Mock) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { args := r.Called(pod, containerID, logOptions, stdout, stderr) return args.Error(0) } diff --git a/pkg/kubelet/dockershim/docker_legacy_service.go b/pkg/kubelet/dockershim/docker_legacy_service.go index df756a5f44f..78616c21f24 100644 --- a/pkg/kubelet/dockershim/docker_legacy_service.go +++ b/pkg/kubelet/dockershim/docker_legacy_service.go @@ -17,6 +17,7 @@ limitations under the License. package dockershim import ( + "context" "fmt" "io" "strconv" @@ -39,7 +40,7 @@ import ( // more functions. type DockerLegacyService interface { // GetContainerLogs gets logs for a specific container. - GetContainerLogs(*v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error + GetContainerLogs(context.Context, *v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error // IsCRISupportedLogDriver checks whether the logging driver used by docker is // supported by native CRI integration. @@ -50,7 +51,7 @@ type DockerLegacyService interface { } // GetContainerLogs get container logs directly from docker daemon. -func (d *dockerService) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { +func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { container, err := d.client.InspectContainer(containerID.ID) if err != nil { return err @@ -97,7 +98,7 @@ func (d *dockerService) GetContainerLogTail(uid kubetypes.UID, name, namespace s Namespace: namespace, }, } - err := d.GetContainerLogs(pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf) + err := d.GetContainerLogs(context.Background(), pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf) if err != nil { return "", err } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 9265f624141..1fed05dfb52 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -18,6 +18,7 @@ package kubelet import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -1159,7 +1160,7 @@ func (kl *Kubelet) validateContainerLogStatus(podName string, podStatus *v1.PodS // GetKubeletContainerLogs returns logs from the container // TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt // or all of them. -func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { +func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { // Pod workers periodically write status to statusManager. If status is not // cached there, something is wrong (or kubelet just restarted and hasn't // caught up yet). Just assume the pod is not ready yet. @@ -1205,9 +1206,9 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, lo // dockerLegacyService should only be non-nil when we actually need it, so // inject it into the runtimeService. // TODO(random-liu): Remove this hack after deprecating unsupported log driver. - return kl.dockerLegacyService.GetContainerLogs(pod, containerID, logOptions, stdout, stderr) + return kl.dockerLegacyService.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr) } - return kl.containerRuntime.GetContainerLogs(pod, containerID, logOptions, stdout, stderr) + return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr) } // getPhase returns the phase of a pod given its container info. diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 4f2444dd82a..368dba73ced 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -17,6 +17,7 @@ limitations under the License. package kuberuntime import ( + "context" "errors" "fmt" "io" @@ -366,7 +367,7 @@ func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessag func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string { value := int64(kubecontainer.MaxContainerTerminationMessageLogLines) buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength) - if err := m.ReadLogs(path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { + if err := m.ReadLogs(context.Background(), path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil { return fmt.Sprintf("Error on reading termination message from logs: %v", err) } return buf.String() @@ -730,13 +731,13 @@ func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus) } // GetContainerLogs returns logs of a specific container. -func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { +func (m *kubeGenericRuntimeManager) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) { status, err := m.runtimeService.ContainerStatus(containerID.ID) if err != nil { glog.V(4).Infof("failed to get container status for %v: %v", containerID.String(), err) return fmt.Errorf("Unable to retrieve container logs for %v", containerID.String()) } - return m.ReadLogs(status.GetLogPath(), containerID.ID, logOptions, stdout, stderr) + return m.ReadLogs(ctx, status.GetLogPath(), containerID.ID, logOptions, stdout, stderr) } // GetExec gets the endpoint the runtime will serve the exec request from. diff --git a/pkg/kubelet/kuberuntime/kuberuntime_logs.go b/pkg/kubelet/kuberuntime/kuberuntime_logs.go index ee8dc4c81fe..b4bc61341b7 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_logs.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_logs.go @@ -17,6 +17,7 @@ limitations under the License. package kuberuntime import ( + "context" "io" "time" @@ -27,9 +28,9 @@ import ( // ReadLogs read the container log and redirect into stdout and stderr. // Note that containerID is only needed when following the log, or else // just pass in empty string "". -func (m *kubeGenericRuntimeManager) ReadLogs(path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error { +func (m *kubeGenericRuntimeManager) ReadLogs(ctx context.Context, path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error { // Convert v1.PodLogOptions into internal log options. opts := logs.NewLogOptions(apiOpts, time.Now()) - return logs.ReadLogs(path, containerID, opts, m.runtimeService, stdout, stderr) + return logs.ReadLogs(ctx, path, containerID, opts, m.runtimeService, stdout, stderr) } diff --git a/pkg/kubelet/kuberuntime/logs/logs.go b/pkg/kubelet/kuberuntime/logs/logs.go index b948c883b40..c887e278086 100644 --- a/pkg/kubelet/kuberuntime/logs/logs.go +++ b/pkg/kubelet/kuberuntime/logs/logs.go @@ -19,6 +19,7 @@ package logs import ( "bufio" "bytes" + "context" "encoding/json" "errors" "fmt" @@ -266,7 +267,7 @@ func (w *logWriter) write(msg *logMessage) error { // ReadLogs read the container log and redirect into stdout and stderr. // Note that containerID is only needed when following the log, or else // just pass in empty string "". -func ReadLogs(path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error { +func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error { f, err := os.Open(path) if err != nil { return fmt.Errorf("failed to open log file %q: %v", path, err) @@ -317,7 +318,7 @@ func ReadLogs(path, containerID string, opts *LogOptions, runtimeService interna } } // Wait until the next log change. - if found, err := waitLogs(containerID, watcher, runtimeService); !found { + if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found { return err } continue @@ -371,7 +372,7 @@ func isContainerRunning(id string, r internalapi.RuntimeService) (bool, error) { // waitLogs wait for the next log write. It returns a boolean and an error. The boolean // indicates whether a new log is found; the error is error happens during waiting new logs. -func waitLogs(id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) { +func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) { // no need to wait if the pod is not running if running, err := isContainerRunning(id, runtimeService); !running { return false, err @@ -379,6 +380,8 @@ func waitLogs(id string, w *fsnotify.Watcher, runtimeService internalapi.Runtime errRetry := 5 for { select { + case <-ctx.Done(): + return false, fmt.Errorf("context cancelled") case e := <-w.Events: switch e.Op { case fsnotify.Write: diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index feaca8531a6..2a945b8ace0 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -17,6 +17,7 @@ limitations under the License. package server import ( + "context" "crypto/tls" "fmt" "io" @@ -172,7 +173,7 @@ type HostInterface interface { GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) GetRunningPods() ([]*v1.Pod, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) - GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error + GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error ServeLogs(w http.ResponseWriter, req *http.Request) ResyncInterval() time.Duration GetHostname() string @@ -457,6 +458,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re podNamespace := request.PathParameter("podNamespace") podID := request.PathParameter("podID") containerName := request.PathParameter("containerName") + ctx := request.Request.Context() if len(podID) == 0 { // TODO: Why return JSON when the rest return plaintext errors? @@ -528,7 +530,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re } fw := flushwriter.Wrap(response.ResponseWriter) response.Header().Set("Transfer-Encoding", "chunked") - if err := s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil { + if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil { response.WriteError(http.StatusBadRequest, err) return } diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index e84bec4d649..e3c54a07bce 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -18,6 +18,7 @@ package server import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -80,7 +81,7 @@ type fakeKubelet struct { getAttachCheck func(string, types.UID, string, remotecommandserver.Options) getPortForwardCheck func(string, string, types.UID, portforward.V4Options) - containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error + containerLogsFunc func(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error hostnameFunc func() string resyncInterval time.Duration loopEntryTime time.Time @@ -128,8 +129,8 @@ func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { fk.logFunc(w, req) } -func (fk *fakeKubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { - return fk.containerLogsFunc(podFullName, containerName, logOptions, stdout, stderr) +func (fk *fakeKubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { + return fk.containerLogsFunc(ctx, podFullName, containerName, logOptions, stdout, stderr) } func (fk *fakeKubelet) GetHostname() string { @@ -983,7 +984,7 @@ func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) } func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *v1.PodLogOptions, output string) { - fw.fakeKubelet.containerLogsFunc = func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { + fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { if podFullName != expectedPodName { t.Errorf("expected %s, got %s", expectedPodName, podFullName) }