mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
e2e: live streaming of pod events and stdout for CSI volume tests
Debugging the CSI driver tests depends a lot on the output of the CSI sidecar containers and the CSI driver, but that information was not captured automatically and thus unavailable after a test run. This is particularly bad when running in a remote CI system, but also manually watching the cluster during a test was cumbersome. Now pod events and log messages get copied to the test's output at the time that they happen (when running without report directory) or get written to individual log files (when running with report directory in the CI).
This commit is contained in:
parent
1f0f4cd7eb
commit
1449353067
@ -162,6 +162,7 @@ filegroup(
|
||||
"//test/e2e/framework/ginkgowrapper:all-srcs",
|
||||
"//test/e2e/framework/ingress:all-srcs",
|
||||
"//test/e2e/framework/metrics:all-srcs",
|
||||
"//test/e2e/framework/podlogs:all-srcs",
|
||||
"//test/e2e/framework/providers/aws:all-srcs",
|
||||
"//test/e2e/framework/providers/azure:all-srcs",
|
||||
"//test/e2e/framework/providers/gce:all-srcs",
|
||||
|
28
test/e2e/framework/podlogs/BUILD
Normal file
28
test/e2e/framework/podlogs/BUILD
Normal file
@ -0,0 +1,28 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["podlogs.go"],
|
||||
importpath = "k8s.io/kubernetes/test/e2e/framework/podlogs",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/github.com/pkg/errors:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
263
test/e2e/framework/podlogs/podlogs.go
Normal file
263
test/e2e/framework/podlogs/podlogs.go
Normal file
@ -0,0 +1,263 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package podlogs enables live capturing of all events and log
|
||||
// messages for some or all pods in a namespace as they get generated.
|
||||
// This helps debugging both a running test (what is currently going
|
||||
// on?) and the output of a CI run (events appear in chronological
|
||||
// order and output that normally isn't available like the command
|
||||
// stdout messages are available).
|
||||
package podlogs
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// LogsForPod starts reading the logs for a certain pod. If the pod has more than one
|
||||
// container, opts.Container must be set. Reading stops when the context is done.
|
||||
// The stream includes formatted error messages and ends with
|
||||
// rpc error: code = Unknown desc = Error: No such container: 41a...
|
||||
// when the pod gets deleted while streaming.
|
||||
func LogsForPod(ctx context.Context, cs clientset.Interface, ns, pod string, opts *v1.PodLogOptions) (io.ReadCloser, error) {
|
||||
req := cs.Core().Pods(ns).GetLogs(pod, opts)
|
||||
return req.Context(ctx).Stream()
|
||||
}
|
||||
|
||||
// LogOutput determines where output from CopyAllLogs goes.
|
||||
type LogOutput struct {
|
||||
// If not nil, errors will be logged here.
|
||||
StatusWriter io.Writer
|
||||
|
||||
// If not nil, all output goes to this writer with "<pod>/<container>:" as prefix.
|
||||
LogWriter io.Writer
|
||||
|
||||
// Base directory for one log file per container.
|
||||
// The full path of each log file will be <log path prefix><pod>-<container>.log.
|
||||
LogPathPrefix string
|
||||
}
|
||||
|
||||
// Matches harmless errors from pkg/kubelet/kubelet_pods.go.
|
||||
var expectedErrors = regexp.MustCompile(`container .* in pod .* is (terminated|waiting to start|not available)|the server could not find the requested resource`)
|
||||
|
||||
// CopyAllLogs follows the logs of all containers in all pods,
|
||||
// including those that get created in the future, and writes each log
|
||||
// line as configured in the output options. It does that until the
|
||||
// context is done or until an error occurs.
|
||||
//
|
||||
// Beware that there is currently no way to force log collection
|
||||
// before removing pods, which means that there is a known race
|
||||
// between "stop pod" and "collecting log entries". The alternative
|
||||
// would be a blocking function with collects logs from all currently
|
||||
// running pods, but that then would have the disadvantage that
|
||||
// already deleted pods aren't covered.
|
||||
func CopyAllLogs(ctx context.Context, cs clientset.Interface, ns string, to LogOutput) error {
|
||||
watcher, err := cs.Core().Pods(ns).Watch(meta.ListOptions{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot create Pod event watcher")
|
||||
}
|
||||
|
||||
go func() {
|
||||
var m sync.Mutex
|
||||
logging := map[string]bool{}
|
||||
check := func() {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
pods, err := cs.Core().Pods(ns).List(meta.ListOptions{})
|
||||
if err != nil {
|
||||
if to.StatusWriter != nil {
|
||||
fmt.Fprintf(to.StatusWriter, "ERROR: get pod list in %s: %s\n", ns, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
for _, c := range pod.Spec.Containers {
|
||||
name := pod.ObjectMeta.Name + "/" + c.Name
|
||||
if logging[name] {
|
||||
continue
|
||||
}
|
||||
readCloser, err := LogsForPod(ctx, cs, ns, pod.ObjectMeta.Name,
|
||||
&v1.PodLogOptions{
|
||||
Container: c.Name,
|
||||
Follow: true,
|
||||
})
|
||||
if err != nil {
|
||||
// We do get "normal" errors here, like trying to read too early.
|
||||
// We can ignore those.
|
||||
if to.StatusWriter != nil &&
|
||||
expectedErrors.FindStringIndex(err.Error()) == nil {
|
||||
fmt.Fprintf(to.StatusWriter, "WARNING: pod log: %s: %s\n", name, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Determine where we write. If this fails, we intentionally return without clearing
|
||||
// the logging[name] flag, which prevents trying over and over again to
|
||||
// create the output file.
|
||||
var out io.Writer
|
||||
var closer io.Closer
|
||||
var prefix string
|
||||
if to.LogWriter != nil {
|
||||
out = to.LogWriter
|
||||
prefix = name + ": "
|
||||
} else {
|
||||
var err error
|
||||
filename := to.LogPathPrefix + pod.ObjectMeta.Name + "-" + c.Name + ".log"
|
||||
err = os.MkdirAll(path.Dir(filename), 0755)
|
||||
if err != nil {
|
||||
if to.StatusWriter != nil {
|
||||
fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create directory for %s: %s\n", filename, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
// The test suite might run the same test multiple times,
|
||||
// so we have to append here.
|
||||
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
if to.StatusWriter != nil {
|
||||
fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create file %s: %s\n", filename, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
closer = file
|
||||
out = file
|
||||
}
|
||||
go func() {
|
||||
if closer != nil {
|
||||
defer closer.Close()
|
||||
}
|
||||
defer func() {
|
||||
m.Lock()
|
||||
logging[name] = false
|
||||
m.Unlock()
|
||||
readCloser.Close()
|
||||
}()
|
||||
scanner := bufio.NewScanner(readCloser)
|
||||
first := true
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
// Filter out the expected "end of stream" error message,
|
||||
// it would just confuse developers who don't know about it.
|
||||
// Same for attempts to read logs from a container that
|
||||
// isn't ready (yet?!).
|
||||
if !strings.HasPrefix(line, "rpc error: code = Unknown desc = Error: No such container:") &&
|
||||
!strings.HasPrefix(line, "Unable to retrieve container logs for ") {
|
||||
if first {
|
||||
if to.LogWriter == nil {
|
||||
// Because the same log might be written to multiple times
|
||||
// in different test instances, log an extra line to separate them.
|
||||
// Also provides some useful extra information.
|
||||
fmt.Fprintf(out, "==== start of log for container %s ====\n", name)
|
||||
}
|
||||
first = false
|
||||
}
|
||||
fmt.Fprintf(out, "%s%s\n", prefix, scanner.Text())
|
||||
}
|
||||
}
|
||||
}()
|
||||
logging[name] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Watch events to see whether we can start logging
|
||||
// and log interesting ones.
|
||||
check()
|
||||
for {
|
||||
select {
|
||||
case <-watcher.ResultChan():
|
||||
check()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WatchPods prints pod status events for a certain namespace or all namespaces
|
||||
// when namespace name is empty.
|
||||
func WatchPods(ctx context.Context, cs clientset.Interface, ns string, to io.Writer) error {
|
||||
watcher, err := cs.Core().Pods(ns).Watch(meta.ListOptions{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot create Pod event watcher")
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer watcher.Stop()
|
||||
for {
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
if e.Object == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
pod, ok := e.Object.(*v1.Pod)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
buffer := new(bytes.Buffer)
|
||||
fmt.Fprintf(buffer,
|
||||
"pod event: %s: %s/%s %s: %s %s\n",
|
||||
e.Type,
|
||||
pod.Namespace,
|
||||
pod.Name,
|
||||
pod.Status.Phase,
|
||||
pod.Status.Reason,
|
||||
pod.Status.Conditions,
|
||||
)
|
||||
for _, cst := range pod.Status.ContainerStatuses {
|
||||
fmt.Fprintf(buffer, " %s: ", cst.Name)
|
||||
if cst.State.Waiting != nil {
|
||||
fmt.Fprintf(buffer, "WAITING: %s - %s",
|
||||
cst.State.Waiting.Reason,
|
||||
cst.State.Waiting.Message,
|
||||
)
|
||||
} else if cst.State.Running != nil {
|
||||
fmt.Fprintf(buffer, "RUNNING")
|
||||
} else if cst.State.Waiting != nil {
|
||||
fmt.Fprintf(buffer, "TERMINATED: %s - %s",
|
||||
cst.State.Waiting.Reason,
|
||||
cst.State.Waiting.Message,
|
||||
)
|
||||
}
|
||||
fmt.Fprintf(buffer, "\n")
|
||||
}
|
||||
to.Write(buffer.Bytes())
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
@ -70,6 +70,7 @@ go_library(
|
||||
"//staging/src/k8s.io/csi-api/pkg/crd:go_default_library",
|
||||
"//test/e2e/framework:go_default_library",
|
||||
"//test/e2e/framework/metrics:go_default_library",
|
||||
"//test/e2e/framework/podlogs:go_default_library",
|
||||
"//test/e2e/framework/providers/gce:go_default_library",
|
||||
"//test/e2e/framework/testfiles:go_default_library",
|
||||
"//test/e2e/storage/drivers:go_default_library",
|
||||
|
@ -17,8 +17,10 @@ limitations under the License.
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@ -30,6 +32,7 @@ import (
|
||||
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
|
||||
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
"k8s.io/kubernetes/test/e2e/framework/podlogs"
|
||||
"k8s.io/kubernetes/test/e2e/storage/testsuites"
|
||||
"k8s.io/kubernetes/test/e2e/storage/utils"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
@ -57,6 +60,7 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
|
||||
f := framework.NewDefaultFramework("csi-volumes")
|
||||
|
||||
var (
|
||||
cancel context.CancelFunc
|
||||
cs clientset.Interface
|
||||
crdclient apiextensionsclient.Interface
|
||||
csics csiclient.Interface
|
||||
@ -66,11 +70,40 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
ctx, c := context.WithCancel(context.Background())
|
||||
cancel = c
|
||||
|
||||
cs = f.ClientSet
|
||||
crdclient = f.APIExtensionsClientSet
|
||||
csics = f.CSIClientSet
|
||||
ns = f.Namespace
|
||||
|
||||
// Debugging of the following tests heavily depends on the log output
|
||||
// of the different containers. Therefore include all of that in log
|
||||
// files (when using --report-dir, as in the CI) or the output stream
|
||||
// (otherwise).
|
||||
to := podlogs.LogOutput{
|
||||
StatusWriter: GinkgoWriter,
|
||||
}
|
||||
if framework.TestContext.ReportDir == "" {
|
||||
to.LogWriter = GinkgoWriter
|
||||
} else {
|
||||
test := CurrentGinkgoTestDescription()
|
||||
reg := regexp.MustCompile("[^a-zA-Z0-9_-]+")
|
||||
// We end the prefix with a slash to ensure that all logs
|
||||
// end up in a directory named after the current test.
|
||||
to.LogPathPrefix = framework.TestContext.ReportDir + "/" +
|
||||
reg.ReplaceAllString(test.FullTestText, "_") + "/"
|
||||
}
|
||||
podlogs.CopyAllLogs(ctx, cs, ns.Name, to)
|
||||
|
||||
// pod events are something that the framework already collects itself
|
||||
// after a failed test. Logging them live is only useful for interactive
|
||||
// debugging, not when we collect reports.
|
||||
if framework.TestContext.ReportDir == "" {
|
||||
podlogs.WatchPods(ctx, cs, ns.Name, GinkgoWriter)
|
||||
}
|
||||
|
||||
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
|
||||
node = nodes.Items[rand.Intn(len(nodes.Items))]
|
||||
config = framework.VolumeTestConfig{
|
||||
@ -84,6 +117,10 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
|
||||
createCSICRDs(crdclient)
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
cancel()
|
||||
})
|
||||
|
||||
for driverName, initCSIDriver := range csiTestDrivers {
|
||||
curDriverName := driverName
|
||||
curInitCSIDriver := initCSIDriver
|
||||
|
Loading…
Reference in New Issue
Block a user