Merge pull request #38576 from fejta/log

Automatic merge from submit-queue

Interrupt/Terminate all child processes on timeout. Auto-gen stepName

Signal all children of the finishRunning command. See http://stackoverflow.com/questions/22470193/why-wont-go-kill-a-child-process-correctly

Most commands started by e2e.go are shell scripts like `hack/ginkgo-e2e.sh`, `hack/e2e-internal/e2e-down.sh`, etc., which quickly start a subprocess such as the ginkgo binary. We spend ~1s in ginkgo-e2e.sh and hours inside the ginkgo binary. Therefore, when we want to time out, it is important that we signal the child processes as well, which we can accomplish by starting the command in a new process group and signaling `-pid` instead of `pid`.

Ref https://github.com/kubernetes/test-infra/issues/1316 https://github.com/kubernetes/test-infra/issues/1250
Test results: https://github.com/kubernetes/kubernetes/pull/37868
This commit is contained in:
Kubernetes Submit Queue
2016-12-12 14:16:34 -08:00
committed by GitHub

View File

@@ -32,6 +32,7 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"syscall"
"text/template" "text/template"
"time" "time"
) )
@@ -250,7 +251,7 @@ func run(deploy deployer) error {
os.Setenv("KUBE_RUNTIME_CONFIG", "batch/v2alpha1=true") os.Setenv("KUBE_RUNTIME_CONFIG", "batch/v2alpha1=true")
if *up { if *up {
if err := xmlWrap("TearDown", deploy.Down); err != nil { if err := xmlWrap("TearDown Previous", deploy.Down); err != nil {
return fmt.Errorf("error tearing down previous cluster: %s", err) return fmt.Errorf("error tearing down previous cluster: %s", err)
} }
} }
@@ -292,18 +293,7 @@ func run(deploy deployer) error {
return fmt.Errorf("starting e2e cluster: %s", err) return fmt.Errorf("starting e2e cluster: %s", err)
} }
if *dump != "" { if *dump != "" {
cmd := exec.Command("./cluster/kubectl.sh", "--match-server-version=false", "get", "nodes", "-oyaml") errs = appendError(errs, xmlWrap("list nodes", listNodes))
b, err := cmd.CombinedOutput()
if *verbose {
log.Printf("kubectl get nodes:\n%s", string(b))
}
if err == nil {
if err := ioutil.WriteFile(filepath.Join(*dump, "nodes.yaml"), b, 0644); err != nil {
errs = appendError(errs, fmt.Errorf("error writing nodes.yaml: %v", err))
}
} else {
errs = appendError(errs, fmt.Errorf("error running get nodes: %v", err))
}
} }
} }
@@ -323,22 +313,21 @@ func run(deploy deployer) error {
if *test { if *test {
errs = appendError(errs, xmlWrap("get kubeconfig", deploy.SetupKubecfg)) errs = appendError(errs, xmlWrap("get kubeconfig", deploy.SetupKubecfg))
errs = appendError(errs, xmlWrap("kubectl version", func() error { errs = appendError(errs, xmlWrap("kubectl version", func() error {
return finishRunning("kubectl version", exec.Command("./cluster/kubectl.sh", "version", "--match-server-version=false")) return finishRunning(exec.Command("./cluster/kubectl.sh", "version", "--match-server-version=false"))
})) }))
// Individual tests will create their own JUnit, so don't xmlWrap.
if *skewTests { if *skewTests {
errs = appendError(errs, SkewTest()) errs = appendError(errs, xmlWrap("SkewTest", SkewTest))
} else { } else {
if err := xmlWrap("IsUp", deploy.IsUp); err != nil { if err := xmlWrap("IsUp", deploy.IsUp); err != nil {
errs = appendError(errs, err) errs = appendError(errs, err)
} else { } else {
errs = appendError(errs, Test()) errs = appendError(errs, xmlWrap("Test", Test))
} }
} }
} }
if *kubemark { if *kubemark {
errs = appendError(errs, KubemarkTest()) errs = appendError(errs, xmlWrap("Kubemark", KubemarkTest))
} }
if len(errs) > 0 && *dump != "" { if len(errs) > 0 && *dump != "" {
@@ -379,6 +368,18 @@ func run(deploy deployer) error {
return nil return nil
} }
// listNodes shells out to kubectl to fetch all cluster nodes as YAML and
// saves the output to <*dump>/nodes.yaml for later debugging.
// It returns the kubectl error, if any; otherwise the WriteFile error.
// NOTE(review): assumes *dump is non-empty — the caller gates on `*dump != ""`.
func listNodes() error {
	// --match-server-version=false lets this work even on a version-skewed cluster.
	cmd := exec.Command("./cluster/kubectl.sh", "--match-server-version=false", "get", "nodes", "-oyaml")
	// CombinedOutput captures stderr too, so failures are visible in the dump/log.
	b, err := cmd.CombinedOutput()
	if *verbose {
		log.Printf("kubectl get nodes:\n%s", string(b))
	}
	if err != nil {
		return err
	}
	// 0644: world-readable artifact for CI log collection.
	return ioutil.WriteFile(filepath.Join(*dump, "nodes.yaml"), b, 0644)
}
func DiffResources(before, clusterUp, clusterDown, after []byte, location string) error { func DiffResources(before, clusterUp, clusterDown, after []byte, location string) error {
if location == "" { if location == "" {
var err error var err error
@@ -460,7 +461,7 @@ func Build() error {
// it's OK to download the docker image. // it's OK to download the docker image.
cmd := exec.Command("make", "quick-release") cmd := exec.Command("make", "quick-release")
cmd.Stdin = os.Stdin cmd.Stdin = os.Stdin
if err := finishRunning("build-release", cmd); err != nil { if err := finishRunning(cmd); err != nil {
return fmt.Errorf("error building kubernetes: %v", err) return fmt.Errorf("error building kubernetes: %v", err)
} }
return nil return nil
@@ -489,11 +490,11 @@ func getDeployer() (deployer, error) {
type bash struct{} type bash struct{}
func (b bash) Up() error { func (b bash) Up() error {
return finishRunning("up", exec.Command("./hack/e2e-internal/e2e-up.sh")) return finishRunning(exec.Command("./hack/e2e-internal/e2e-up.sh"))
} }
func (b bash) IsUp() error { func (b bash) IsUp() error {
return finishRunning("get status", exec.Command("./hack/e2e-internal/e2e-status.sh")) return finishRunning(exec.Command("./hack/e2e-internal/e2e-status.sh"))
} }
func (b bash) SetupKubecfg() error { func (b bash) SetupKubecfg() error {
@@ -501,7 +502,7 @@ func (b bash) SetupKubecfg() error {
} }
func (b bash) Down() error { func (b bash) Down() error {
return finishRunning("teardown", exec.Command("./hack/e2e-internal/e2e-down.sh")) return finishRunning(exec.Command("./hack/e2e-internal/e2e-down.sh"))
} }
type kops struct { type kops struct {
@@ -588,10 +589,10 @@ func (k kops) Up() error {
if k.kubeVersion != "" { if k.kubeVersion != "" {
createArgs = append(createArgs, "--kubernetes-version", k.kubeVersion) createArgs = append(createArgs, "--kubernetes-version", k.kubeVersion)
} }
if err := finishRunning("kops config", exec.Command(k.path, createArgs...)); err != nil { if err := finishRunning(exec.Command(k.path, createArgs...)); err != nil {
return fmt.Errorf("kops configuration failed: %v", err) return fmt.Errorf("kops configuration failed: %v", err)
} }
if err := finishRunning("kops update", exec.Command(k.path, "update", "cluster", k.cluster, "--yes")); err != nil { if err := finishRunning(exec.Command(k.path, "update", "cluster", k.cluster, "--yes")); err != nil {
return fmt.Errorf("kops bringup failed: %v", err) return fmt.Errorf("kops bringup failed: %v", err)
} }
// TODO(zmerlynn): More cluster validation. This should perhaps be // TODO(zmerlynn): More cluster validation. This should perhaps be
@@ -613,7 +614,7 @@ func (k kops) SetupKubecfg() error {
// Assume that if we already have it, it's good. // Assume that if we already have it, it's good.
return nil return nil
} }
if err := finishRunning("kops export", exec.Command(k.path, "export", "kubecfg", k.cluster)); err != nil { if err := finishRunning(exec.Command(k.path, "export", "kubecfg", k.cluster)); err != nil {
return fmt.Errorf("Failure exporting kops kubecfg: %v", err) return fmt.Errorf("Failure exporting kops kubecfg: %v", err)
} }
return nil return nil
@@ -623,12 +624,12 @@ func (k kops) Down() error {
// We do a "kops get" first so the exit status of "kops delete" is // We do a "kops get" first so the exit status of "kops delete" is
// more sensical in the case of a non-existant cluster. ("kops // more sensical in the case of a non-existant cluster. ("kops
// delete" will exit with status 1 on a non-existant cluster) // delete" will exit with status 1 on a non-existant cluster)
err := finishRunning("kops get", exec.Command(k.path, "get", "clusters", k.cluster)) err := finishRunning(exec.Command(k.path, "get", "clusters", k.cluster))
if err != nil { if err != nil {
// This is expected if the cluster doesn't exist. // This is expected if the cluster doesn't exist.
return nil return nil
} }
return finishRunning("kops delete", exec.Command(k.path, "delete", "cluster", k.cluster, "--yes")) return finishRunning(exec.Command(k.path, "delete", "cluster", k.cluster, "--yes"))
} }
type kubernetesAnywhere struct { type kubernetesAnywhere struct {
@@ -706,7 +707,7 @@ func (k kubernetesAnywhere) writeConfig() error {
func (k kubernetesAnywhere) Up() error { func (k kubernetesAnywhere) Up() error {
cmd := exec.Command("make", "-C", k.path, "WAIT_FOR_KUBECONFIG=y", "deploy-cluster") cmd := exec.Command("make", "-C", k.path, "WAIT_FOR_KUBECONFIG=y", "deploy-cluster")
if err := finishRunning("deploy-cluster", cmd); err != nil { if err := finishRunning(cmd); err != nil {
return err return err
} }
@@ -732,12 +733,12 @@ func (k kubernetesAnywhere) SetupKubecfg() error {
} }
func (k kubernetesAnywhere) Down() error { func (k kubernetesAnywhere) Down() error {
err := finishRunning("get kubeconfig-path", exec.Command("make", "-C", k.path, "kubeconfig-path")) err := finishRunning(exec.Command("make", "-C", k.path, "kubeconfig-path"))
if err != nil { if err != nil {
// This is expected if the cluster doesn't exist. // This is expected if the cluster doesn't exist.
return nil return nil
} }
return finishRunning("destroy-cluster", exec.Command("make", "-C", k.path, "FORCE_DESTROY=y", "destroy-cluster")) return finishRunning(exec.Command("make", "-C", k.path, "FORCE_DESTROY=y", "destroy-cluster"))
} }
func clusterSize(deploy deployer) (int, error) { func clusterSize(deploy deployer) (int, error) {
@@ -812,12 +813,12 @@ func waitForNodes(d deployer, nodes int, timeout time.Duration) error {
func DumpClusterLogs(location string) error { func DumpClusterLogs(location string) error {
log.Printf("Dumping cluster logs to: %v", location) log.Printf("Dumping cluster logs to: %v", location)
return finishRunning("dump cluster logs", exec.Command("./cluster/log-dump.sh", location)) return finishRunning(exec.Command("./cluster/log-dump.sh", location))
} }
func KubemarkTest() error { func KubemarkTest() error {
// Stop previous run // Stop previous run
err := finishRunning("Stop kubemark", exec.Command("./test/kubemark/stop-kubemark.sh")) err := finishRunning(exec.Command("./test/kubemark/stop-kubemark.sh"))
if err != nil { if err != nil {
return err return err
} }
@@ -828,7 +829,7 @@ func KubemarkTest() error {
// stop the leaking resources for now, we want to be on the safe side // stop the leaking resources for now, we want to be on the safe side
// and call it explictly in defer if the other one is not called. // and call it explictly in defer if the other one is not called.
defer xmlWrap("Deferred Stop kubemark", func() error { defer xmlWrap("Deferred Stop kubemark", func() error {
return finishRunning("Stop kubemark", exec.Command("./test/kubemark/stop-kubemark.sh")) return finishRunning(exec.Command("./test/kubemark/stop-kubemark.sh"))
}) })
// Start new run // Start new run
@@ -844,7 +845,7 @@ func KubemarkTest() error {
os.Setenv("NUM_NODES", os.Getenv("KUBEMARK_NUM_NODES")) os.Setenv("NUM_NODES", os.Getenv("KUBEMARK_NUM_NODES"))
os.Setenv("MASTER_SIZE", os.Getenv("KUBEMARK_MASTER_SIZE")) os.Setenv("MASTER_SIZE", os.Getenv("KUBEMARK_MASTER_SIZE"))
err = xmlWrap("Start kubemark", func() error { err = xmlWrap("Start kubemark", func() error {
return finishRunning("Start kubemark", exec.Command("./test/kubemark/start-kubemark.sh")) return finishRunning(exec.Command("./test/kubemark/start-kubemark.sh"))
}) })
if err != nil { if err != nil {
return err return err
@@ -857,13 +858,13 @@ func KubemarkTest() error {
} }
test_args := os.Getenv("KUBEMARK_TEST_ARGS") test_args := os.Getenv("KUBEMARK_TEST_ARGS")
err = finishRunning("Run kubemark tests", exec.Command("./test/kubemark/run-e2e-tests.sh", "--ginkgo.focus="+focus, test_args)) err = finishRunning(exec.Command("./test/kubemark/run-e2e-tests.sh", "--ginkgo.focus="+focus, test_args))
if err != nil { if err != nil {
return err return err
} }
err = xmlWrap("Stop kubemark", func() error { err = xmlWrap("Stop kubemark", func() error {
return finishRunning("Stop kubemark", exec.Command("./test/kubemark/stop-kubemark.sh")) return finishRunning(exec.Command("./test/kubemark/stop-kubemark.sh"))
}) })
if err != nil { if err != nil {
return err return err
@@ -896,13 +897,12 @@ func UpgradeTest(args string) error {
defer os.Unsetenv("E2E_REPORT_PREFIX") defer os.Unsetenv("E2E_REPORT_PREFIX")
} }
os.Setenv("E2E_REPORT_PREFIX", "upgrade") os.Setenv("E2E_REPORT_PREFIX", "upgrade")
return finishRunning("Upgrade Ginkgo tests", return finishRunning(exec.Command(
exec.Command( "go", "run", "./hack/e2e.go",
"go", "run", "./hack/e2e.go", "--test",
"--test", "--test_args="+args,
"--test_args="+args, fmt.Sprintf("--v=%t", *verbose),
fmt.Sprintf("--v=%t", *verbose), fmt.Sprintf("--check_version_skew=%t", *checkVersionSkew)))
fmt.Sprintf("--check_version_skew=%t", *checkVersionSkew)))
} }
func SkewTest() error { func SkewTest() error {
@@ -911,28 +911,28 @@ func SkewTest() error {
return err return err
} }
defer os.Chdir(old) defer os.Chdir(old)
return finishRunning("Skewed Ginkgo tests", return finishRunning(exec.Command(
exec.Command( "go", "run", "./hack/e2e.go",
"go", "run", "./hack/e2e.go", "--test",
"--test", "--test_args="+*testArgs,
"--test_args="+*testArgs, fmt.Sprintf("--v=%t", *verbose),
fmt.Sprintf("--v=%t", *verbose), fmt.Sprintf("--check_version_skew=%t", *checkVersionSkew)))
fmt.Sprintf("--check_version_skew=%t", *checkVersionSkew)))
} }
func Test() error { func Test() error {
// TODO(fejta): add a --federated or something similar // TODO(fejta): add a --federated or something similar
if os.Getenv("FEDERATION") != "true" { if os.Getenv("FEDERATION") != "true" {
return finishRunning("Ginkgo tests", exec.Command("./hack/ginkgo-e2e.sh", strings.Fields(*testArgs)...)) return finishRunning(exec.Command("./hack/ginkgo-e2e.sh", strings.Fields(*testArgs)...))
} }
if *testArgs == "" { if *testArgs == "" {
*testArgs = "--ginkgo.focus=\\[Feature:Federation\\]" *testArgs = "--ginkgo.focus=\\[Feature:Federation\\]"
} }
return finishRunning("Federated Ginkgo tests", exec.Command("./hack/federated-ginkgo-e2e.sh", strings.Fields(*testArgs)...)) return finishRunning(exec.Command("./hack/federated-ginkgo-e2e.sh", strings.Fields(*testArgs)...))
} }
func finishRunning(stepName string, cmd *exec.Cmd) error { func finishRunning(cmd *exec.Cmd) error {
stepName := strings.Join(cmd.Args, " ")
if *verbose { if *verbose {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
@@ -942,6 +942,7 @@ func finishRunning(stepName string, cmd *exec.Cmd) error {
log.Printf("Step '%s' finished in %s", stepName, time.Since(start)) log.Printf("Step '%s' finished in %s", stepName, time.Since(start))
}(time.Now()) }(time.Now())
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting %v: %v", stepName, err) return fmt.Errorf("error starting %v: %v", stepName, err)
} }
@@ -956,12 +957,17 @@ func finishRunning(stepName string, cmd *exec.Cmd) error {
select { select {
case <-terminate.C: case <-terminate.C:
terminate.Reset(time.Duration(0)) // Kill subsequent processes immediately. terminate.Reset(time.Duration(0)) // Kill subsequent processes immediately.
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
cmd.Process.Kill() cmd.Process.Kill()
return fmt.Errorf("Terminate testing after 15m after %s timeout during %s", *timeout, stepName) return fmt.Errorf("Terminate testing after 15m after %s timeout during %s", *timeout, stepName)
case <-interrupt.C: case <-interrupt.C:
log.Printf("Interrupt testing after %s timeout. Will terminate in another 15m", *timeout) log.Printf("Interrupt testing after %s timeout. Will terminate in another 15m", *timeout)
terminate.Reset(15 * time.Minute) terminate.Reset(15 * time.Minute)
cmd.Process.Signal(os.Interrupt) if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGINT); err != nil {
log.Printf("Failed to interrupt %v. Will terminate immediately: %v", stepName, err)
syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM)
cmd.Process.Kill()
}
case err := <-finished: case err := <-finished:
return err return err
} }