diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD
index 67e9f4d7567..d24c425b966 100644
--- a/test/e2e/framework/BUILD
+++ b/test/e2e/framework/BUILD
@@ -162,6 +162,7 @@ filegroup(
         "//test/e2e/framework/ginkgowrapper:all-srcs",
         "//test/e2e/framework/ingress:all-srcs",
         "//test/e2e/framework/metrics:all-srcs",
+        "//test/e2e/framework/podlogs:all-srcs",
         "//test/e2e/framework/providers/aws:all-srcs",
         "//test/e2e/framework/providers/azure:all-srcs",
         "//test/e2e/framework/providers/gce:all-srcs",
diff --git a/test/e2e/framework/podlogs/BUILD b/test/e2e/framework/podlogs/BUILD
new file mode 100644
index 00000000000..404f6b7a98d
--- /dev/null
+++ b/test/e2e/framework/podlogs/BUILD
@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["podlogs.go"],
+    importpath = "k8s.io/kubernetes/test/e2e/framework/podlogs",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//staging/src/k8s.io/api/core/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
+        "//vendor/github.com/pkg/errors:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
diff --git a/test/e2e/framework/podlogs/podlogs.go b/test/e2e/framework/podlogs/podlogs.go
new file mode 100644
index 00000000000..77fae293af0
--- /dev/null
+++ b/test/e2e/framework/podlogs/podlogs.go
@@ -0,0 +1,263 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package podlogs enables live capturing of all events and log
+// messages for some or all pods in a namespace as they get generated.
+// This helps debugging both a running test (what is currently going
+// on?) and the output of a CI run (events appear in chronological
+// order, and output that normally isn't available, like command
+// stdout messages, gets captured).
+package podlogs
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"regexp"
+	"strings"
+	"sync"
+
+	"github.com/pkg/errors"
+	"k8s.io/api/core/v1"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+	clientset "k8s.io/client-go/kubernetes"
+)
+
+// LogsForPod starts reading the logs for a certain pod. If the pod has more than one
+// container, opts.Container must be set. Reading stops when the context is done.
+// The stream includes formatted error messages and ends with
+//    rpc error: code = Unknown desc = Error: No such container: 41a...
+// when the pod gets deleted while streaming.
+func LogsForPod(ctx context.Context, cs clientset.Interface, ns, pod string, opts *v1.PodLogOptions) (io.ReadCloser, error) {
+	req := cs.Core().Pods(ns).GetLogs(pod, opts)
+	return req.Context(ctx).Stream()
+}
+
+// LogOutput determines where output from CopyAllLogs goes.
+type LogOutput struct {
+	// If not nil, errors will be logged here.
+	StatusWriter io.Writer
+
+	// If not nil, all output goes to this writer with "<pod>/<container>:" as prefix.
+	LogWriter io.Writer
+
+	// Base directory for one log file per container.
+	// The full path of each log file will be <log path prefix><pod>-<container>.log.
+	LogPathPrefix string
+}
+
+// Matches harmless errors from pkg/kubelet/kubelet_pods.go.
+var expectedErrors = regexp.MustCompile(`container .* in pod .* is (terminated|waiting to start|not available)|the server could not find the requested resource`)
+
+// CopyAllLogs follows the logs of all containers in all pods,
+// including those that get created in the future, and writes each log
+// line as configured in the output options. It does that until the
+// context is done or until an error occurs.
+//
+// Beware that there is currently no way to force log collection
+// before removing pods, which means that there is a known race
+// between "stop pod" and "collecting log entries". The alternative
+// would be a blocking function which collects logs from all currently
+// running pods, but that would have the disadvantage that
+// already deleted pods aren't covered.
+func CopyAllLogs(ctx context.Context, cs clientset.Interface, ns string, to LogOutput) error {
+	watcher, err := cs.Core().Pods(ns).Watch(meta.ListOptions{})
+	if err != nil {
+		return errors.Wrap(err, "cannot create Pod event watcher")
+	}
+
+	go func() {
+		var m sync.Mutex
+		logging := map[string]bool{}
+		check := func() {
+			m.Lock()
+			defer m.Unlock()
+
+			pods, err := cs.Core().Pods(ns).List(meta.ListOptions{})
+			if err != nil {
+				if to.StatusWriter != nil {
+					fmt.Fprintf(to.StatusWriter, "ERROR: get pod list in %s: %s\n", ns, err)
+				}
+				return
+			}
+
+			for _, pod := range pods.Items {
+				for _, c := range pod.Spec.Containers {
+					name := pod.ObjectMeta.Name + "/" + c.Name
+					if logging[name] {
+						continue
+					}
+					readCloser, err := LogsForPod(ctx, cs, ns, pod.ObjectMeta.Name,
+						&v1.PodLogOptions{
+							Container: c.Name,
+							Follow:    true,
+						})
+					if err != nil {
+						// We do get "normal" errors here, like trying to read too early.
+						// We can ignore those.
+						if to.StatusWriter != nil &&
+							expectedErrors.FindStringIndex(err.Error()) == nil {
+							fmt.Fprintf(to.StatusWriter, "WARNING: pod log: %s: %s\n", name, err)
+						}
+						continue
+					}
+
+					// Determine where we write. If this fails, we intentionally return without clearing
+					// the logging[name] flag, which prevents trying over and over again to
+					// create the output file.
+					var out io.Writer
+					var closer io.Closer
+					var prefix string
+					if to.LogWriter != nil {
+						out = to.LogWriter
+						prefix = name + ": "
+					} else {
+						var err error
+						filename := to.LogPathPrefix + pod.ObjectMeta.Name + "-" + c.Name + ".log"
+						err = os.MkdirAll(path.Dir(filename), 0755)
+						if err != nil {
+							if to.StatusWriter != nil {
+								fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create directory for %s: %s\n", filename, err)
+							}
+							return
+						}
+						// The test suite might run the same test multiple times,
+						// so we have to append here.
+						file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
+						if err != nil {
+							if to.StatusWriter != nil {
+								fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create file %s: %s\n", filename, err)
+							}
+							return
+						}
+						closer = file
+						out = file
+					}
+					go func() {
+						if closer != nil {
+							defer closer.Close()
+						}
+						defer func() {
+							m.Lock()
+							logging[name] = false
+							m.Unlock()
+							readCloser.Close()
+						}()
+						scanner := bufio.NewScanner(readCloser)
+						first := true
+						for scanner.Scan() {
+							line := scanner.Text()
+							// Filter out the expected "end of stream" error message,
+							// it would just confuse developers who don't know about it.
+							// Same for attempts to read logs from a container that
+							// isn't ready (yet?!).
+							if !strings.HasPrefix(line, "rpc error: code = Unknown desc = Error: No such container:") &&
+								!strings.HasPrefix(line, "Unable to retrieve container logs for ") {
+								if first {
+									if to.LogWriter == nil {
+										// Because the same log might be written to multiple times
+										// in different test instances, log an extra line to separate them.
+										// Also provides some useful extra information.
+										fmt.Fprintf(out, "==== start of log for container %s ====\n", name)
+									}
+									first = false
+								}
+								fmt.Fprintf(out, "%s%s\n", prefix, line)
+							}
+						}
+					}()
+					logging[name] = true
+				}
+			}
+		}
+
+		// Watch events to see whether we can start logging
+		// and log interesting ones.
+		check()
+		for {
+			select {
+			case <-watcher.ResultChan():
+				check()
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+// WatchPods prints pod status events for a certain namespace or for all
+// namespaces when the namespace name is empty.
+func WatchPods(ctx context.Context, cs clientset.Interface, ns string, to io.Writer) error {
+	watcher, err := cs.Core().Pods(ns).Watch(meta.ListOptions{})
+	if err != nil {
+		return errors.Wrap(err, "cannot create Pod event watcher")
+	}
+
+	go func() {
+		defer watcher.Stop()
+		for {
+			select {
+			case e := <-watcher.ResultChan():
+				if e.Object == nil {
+					continue
+				}
+
+				pod, ok := e.Object.(*v1.Pod)
+				if !ok {
+					continue
+				}
+				buffer := new(bytes.Buffer)
+				fmt.Fprintf(buffer,
+					"pod event: %s: %s/%s %s: %s %s\n",
+					e.Type,
+					pod.Namespace,
+					pod.Name,
+					pod.Status.Phase,
+					pod.Status.Reason,
+					pod.Status.Conditions,
+				)
+				for _, cst := range pod.Status.ContainerStatuses {
+					fmt.Fprintf(buffer, " %s: ", cst.Name)
+					if cst.State.Waiting != nil {
+						fmt.Fprintf(buffer, "WAITING: %s - %s",
+							cst.State.Waiting.Reason,
+							cst.State.Waiting.Message,
+						)
+					} else if cst.State.Running != nil {
+						fmt.Fprintf(buffer, "RUNNING")
+					} else if cst.State.Terminated != nil {
+						fmt.Fprintf(buffer, "TERMINATED: %s - %s",
+							cst.State.Terminated.Reason,
+							cst.State.Terminated.Message,
+						)
+					}
+					fmt.Fprintf(buffer, "\n")
+				}
+				to.Write(buffer.Bytes())
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return nil
+}
diff --git a/test/e2e/storage/BUILD b/test/e2e/storage/BUILD
index cef62cdd351..d85d24cc147 100644
--- a/test/e2e/storage/BUILD
+++ b/test/e2e/storage/BUILD
@@ -70,6 +70,7 @@ go_library(
         "//staging/src/k8s.io/csi-api/pkg/crd:go_default_library",
         "//test/e2e/framework:go_default_library",
         "//test/e2e/framework/metrics:go_default_library",
+        "//test/e2e/framework/podlogs:go_default_library",
         "//test/e2e/framework/providers/gce:go_default_library",
        "//test/e2e/framework/testfiles:go_default_library",
         "//test/e2e/storage/drivers:go_default_library",
diff --git a/test/e2e/storage/csi_volumes.go b/test/e2e/storage/csi_volumes.go
index d8cbc6dd2e7..359323b469f 100644
--- a/test/e2e/storage/csi_volumes.go
+++ b/test/e2e/storage/csi_volumes.go
@@ -17,8 +17,10 @@ limitations under the License.
 package storage
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
+	"regexp"
 	"time"
 
 	"k8s.io/api/core/v1"
@@ -30,6 +32,7 @@ import (
 	csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
 	csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
 	"k8s.io/kubernetes/test/e2e/framework"
+	"k8s.io/kubernetes/test/e2e/framework/podlogs"
 	"k8s.io/kubernetes/test/e2e/storage/testsuites"
 	"k8s.io/kubernetes/test/e2e/storage/utils"
 	imageutils "k8s.io/kubernetes/test/utils/image"
@@ -57,6 +60,7 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
 	f := framework.NewDefaultFramework("csi-volumes")
 
 	var (
+		cancel    context.CancelFunc
 		cs        clientset.Interface
 		crdclient apiextensionsclient.Interface
 		csics     csiclient.Interface
@@ -66,11 +70,40 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
 	)
 
 	BeforeEach(func() {
+		ctx, c := context.WithCancel(context.Background())
+		cancel = c
+
 		cs = f.ClientSet
 		crdclient = f.APIExtensionsClientSet
 		csics = f.CSIClientSet
 		ns = f.Namespace
+
+		// Debugging of the following tests heavily depends on the log output
+		// of the different containers. Therefore include all of that in log
+		// files (when using --report-dir, as in the CI) or the output stream
+		// (otherwise).
+		to := podlogs.LogOutput{
+			StatusWriter: GinkgoWriter,
+		}
+		if framework.TestContext.ReportDir == "" {
+			to.LogWriter = GinkgoWriter
+		} else {
+			test := CurrentGinkgoTestDescription()
+			reg := regexp.MustCompile("[^a-zA-Z0-9_-]+")
+			// We end the prefix with a slash to ensure that all logs
+			// end up in a directory named after the current test.
+			to.LogPathPrefix = framework.TestContext.ReportDir + "/" +
+				reg.ReplaceAllString(test.FullTestText, "_") + "/"
+		}
+		podlogs.CopyAllLogs(ctx, cs, ns.Name, to)
+
+		// Pod events are something that the framework already collects itself
+		// after a failed test. Logging them live is only useful for interactive
+		// debugging, not when we collect reports.
+		if framework.TestContext.ReportDir == "" {
+			podlogs.WatchPods(ctx, cs, ns.Name, GinkgoWriter)
+		}
 
 		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
 		node = nodes.Items[rand.Intn(len(nodes.Items))]
 		config = framework.VolumeTestConfig{
@@ -84,6 +117,10 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
 		createCSICRDs(crdclient)
 	})
 
+	AfterEach(func() {
+		cancel()
+	})
+
 	for driverName, initCSIDriver := range csiTestDrivers {
 		curDriverName := driverName
 		curInitCSIDriver := initCSIDriver
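
As a usage reference (not part of the patch), here is a minimal sketch of how another e2e suite could wire up the new package, mirroring the csi_volumes.go changes above. The helper name capturePodLogs and the use of os.Stdout instead of GinkgoWriter or a --report-dir path are illustrative assumptions only.

package example

import (
	"context"
	"os"

	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework/podlogs"
)

// capturePodLogs is a hypothetical helper: it starts streaming all container
// logs and pod status events of the given namespace to stdout and returns a
// cancel function that the caller would invoke (e.g. in AfterEach) to stop
// the capture.
func capturePodLogs(cs clientset.Interface, namespace string) (context.CancelFunc, error) {
	ctx, cancel := context.WithCancel(context.Background())

	// Send everything to stdout; a real test would pick GinkgoWriter or,
	// when --report-dir is set, a per-test directory via LogPathPrefix.
	to := podlogs.LogOutput{
		StatusWriter: os.Stdout,
		LogWriter:    os.Stdout,
	}
	if err := podlogs.CopyAllLogs(ctx, cs, namespace, to); err != nil {
		cancel()
		return nil, err
	}

	// Additionally print pod status events while the capture is running.
	if err := podlogs.WatchPods(ctx, cs, namespace, os.Stdout); err != nil {
		cancel()
		return nil, err
	}
	return cancel, nil
}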