From 23f71f0eba3aa3a1617c829b637594ff44a0d163 Mon Sep 17 00:00:00 2001 From: Todd Neal Date: Fri, 24 Feb 2023 10:37:32 -0600 Subject: [PATCH] refactor remote test running --- build/root/Makefile | 1 + test/e2e_node/remote/gce_runner.go | 791 ++++++++++++++++++++ test/e2e_node/remote/remote.go | 61 +- test/e2e_node/remote/runner.go | 45 ++ test/e2e_node/remote/ssh.go | 54 +- test/e2e_node/remote/ssh_runner.go | 75 ++ test/e2e_node/runner/remote/run_remote.go | 840 ++-------------------- 7 files changed, 1059 insertions(+), 808 deletions(-) create mode 100644 test/e2e_node/remote/gce_runner.go create mode 100644 test/e2e_node/remote/runner.go create mode 100644 test/e2e_node/remote/ssh_runner.go diff --git a/build/root/Makefile b/build/root/Makefile index 312c43b3e23..fd54d4445a8 100644 --- a/build/root/Makefile +++ b/build/root/Makefile @@ -254,6 +254,7 @@ define TEST_E2E_NODE_HELP_INFO # GUBERNATOR: For REMOTE=true only. Produce link to Gubernator to view logs. # Defaults to false. # TEST_SUITE: For REMOTE=true only. Test suite to use. Defaults to "default". +# SSH_USER: For REMOTE=true only SSH username to use. # SSH_KEY: For REMOTE=true only. Path to SSH key to use. # SSH_OPTIONS: For REMOTE=true only. SSH options to use. # RUNTIME_CONFIG: The runtime configuration for the API server on the node e2e tests. diff --git a/test/e2e_node/remote/gce_runner.go b/test/e2e_node/remote/gce_runner.go new file mode 100644 index 00000000000..99911e2fb61 --- /dev/null +++ b/test/e2e_node/remote/gce_runner.go @@ -0,0 +1,791 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package remote + +import ( + "context" + "encoding/base64" + "errors" + "flag" + "fmt" + "net/http" + "os" + "path/filepath" + "regexp" + "sort" + "strings" + "time" + + "github.com/google/uuid" + "golang.org/x/oauth2/google" + "google.golang.org/api/compute/v1" + "google.golang.org/api/option" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + "sigs.k8s.io/yaml" +) + +var _ Runner = (*GCERunner)(nil) + +// envs is the type used to collect all node envs. The key is the env name, +// and the value is the env value +type envs map[string]string + +// String function of flag.Value +func (e *envs) String() string { + return fmt.Sprint(*e) +} + +// Set function of flag.Value +func (e *envs) Set(value string) error { + kv := strings.SplitN(value, "=", 2) + if len(kv) != 2 { + return fmt.Errorf("invalid env string %s", value) + } + emap := *e + emap[kv[0]] = kv[1] + return nil +} + +// nodeEnvs is the node envs from the flag `node-env`. 
+var nodeEnvs = make(envs)
+
+var project = flag.String("project", "", "gce project the hosts live in (gce)")
+var zone = flag.String("zone", "", "gce zone that the hosts live in (gce)")
+var instanceMetadata = flag.String("instance-metadata", "", "key/value metadata for instances separated by '=' or '<', 'k=v' means the key is 'k' and the value is 'v'; 'k<p' means the key is 'k' and the value is the content of the local file 'p'")
+var imageProject = flag.String("image-project", "", "gce project the hosts live in")
+var preemptibleInstances = flag.Bool("preemptible-instances", false, "If true, gce instances will be configured to be preemptible")
+
+const (
+	defaultGCEMachine             = "n1-standard-1"
+	acceleratorTypeResourceFormat = "https://www.googleapis.com/compute/beta/projects/%s/zones/%s/acceleratorTypes/%s"
+)
+
+type GCERunner struct {
+	cfg               Config
+	gceComputeService *compute.Service
+}
+
+func NewGCERunner(cfg Config) *GCERunner {
+	if cfg.InstanceNamePrefix == "" {
+		cfg.InstanceNamePrefix = "tmp-node-e2e-" + uuid.New().String()[:8]
+	}
+	return &GCERunner{cfg: cfg}
+}
+
+func (g *GCERunner) Validate() error {
+	if len(g.cfg.Hosts) == 0 && g.cfg.ImageConfigFile == "" && len(g.cfg.Images) == 0 {
+		klog.Fatalf("Must specify one of --image-config-file, --hosts, --images.")
+	}
+	var err error
+	g.gceComputeService, err = getComputeClient()
+	if err != nil {
+		return fmt.Errorf("unable to create gcloud compute service using defaults. Make sure you are authenticated: %w", err)
+	}
+	return nil
+}
+
+func (g *GCERunner) StartTests(suite TestSuite, archivePath string, results chan *TestResult) (numTests int) {
+	gceImages, err := g.prepareGceImages()
+	if err != nil {
+		klog.Fatalf("While preparing GCE images: %v", err)
+	}
+	numTests = len(gceImages.images)
+	for shortName := range gceImages.images {
+		imageConfig := gceImages.images[shortName]
+		fmt.Printf("Initializing e2e tests using image %s / %s.\n", shortName, imageConfig.image)
+		go func(image *internalGCEImage, junitFileName string) {
+			results <- g.testGCEImage(suite, archivePath, image, junitFileName)
+		}(&imageConfig, shortName)
+	}
+	return
+}
+
+func getComputeClient() (*compute.Service, error) {
+	const retries = 10
+	const backoff = time.Second * 6
+
+	// Setup the gce client for provisioning instances
+	// Getting credentials on gce jenkins is flaky, so try a couple times
+	var err error
+	var cs *compute.Service
+	for i := 0; i < retries; i++ {
+		if i > 0 {
+			time.Sleep(backoff)
+		}
+
+		var client *http.Client
+		client, err = google.DefaultClient(context.Background(), compute.ComputeScope)
+		if err != nil {
+			continue
+		}
+
+		cs, err = compute.NewService(context.Background(), option.WithHTTPClient(client))
+		if err != nil {
+			continue
+		}
+		return cs, nil
+	}
+	return nil, err
+}
+
+// Accelerator contains type and count about resource.
+type Accelerator struct {
+	Type  string `json:"type,omitempty"`
+	Count int64  `json:"count,omitempty"`
+}
+
+// Resources contains accelerators array.
+type Resources struct {
+	Accelerators []Accelerator `json:"accelerators,omitempty"`
+}
+
+// internalGCEImage is an internal GCE image representation for E2E node.
+type internalGCEImage struct {
+	image string
+	// imageDesc is the description of the image. If empty, the value in the
+	// 'image' will be used.
+	imageDesc       string
+	kernelArguments []string
+	project         string
+	resources       Resources
+	metadata        *compute.Metadata
+	machine         string
+}
+
+type internalGCEImageConfig struct {
+	images map[string]internalGCEImage
+}
+
+// GCEImageConfig specifies what images should be run and how for these tests.
+// It can be created via the `--images` and `--image-project` flags, or by
+// specifying the `--image-config-file` flag, pointing to a json or yaml file
+// of the form:
+//
+//	images:
+//	  short-name:
+//	    image: gce-image-name
+//	    project: gce-image-project
+//	    machine: for benchmark only, the machine type (GCE instance) to run test
+//	    tests: for benchmark only, a list of ginkgo focus strings to match tests
+//
+// TODO(coufon): replace 'image' with 'node' in configurations
+// and we plan to support testing custom machines other than GCE by specifying Host
+type GCEImageConfig struct {
+	Images map[string]GCEImage `json:"images"`
+}
+
+// GCEImage contains some information about GCE Image.
+type GCEImage struct {
+	Image      string `json:"image,omitempty"`
+	ImageRegex string `json:"image_regex,omitempty"`
+	// ImageFamily is the image family to use. The latest image from the image family will be used, e.g. cos-81-lts.
+	ImageFamily     string    `json:"image_family,omitempty"`
+	ImageDesc       string    `json:"image_description,omitempty"`
+	KernelArguments []string  `json:"kernel_arguments,omitempty"`
+	Project         string    `json:"project"`
+	Metadata        string    `json:"metadata"`
+	Machine         string    `json:"machine,omitempty"`
+	Resources       Resources `json:"resources,omitempty"`
+}
+
+// Returns an image name based on regex and given GCE project.
+func (g *GCERunner) getGCEImage(imageRegex, imageFamily string, project string) (string, error) {
+	imageObjs := []imageObj{}
+	imageRe := regexp.MustCompile(imageRegex)
+	if err := g.gceComputeService.Images.List(project).Pages(context.Background(),
+		func(ilc *compute.ImageList) error {
+			for _, instance := range ilc.Items {
+				if imageRegex != "" && !imageRe.MatchString(instance.Name) {
+					continue
+				}
+				if imageFamily != "" && instance.Family != imageFamily {
+					continue
+				}
+				creationTime, err := time.Parse(time.RFC3339, instance.CreationTimestamp)
+				if err != nil {
+					return fmt.Errorf("failed to parse instance creation timestamp %q: %w", instance.CreationTimestamp, err)
+				}
+				io := imageObj{
+					creationTime: creationTime,
+					name:         instance.Name,
+				}
+				imageObjs = append(imageObjs, io)
+			}
+			return nil
+		},
+	); err != nil {
+		return "", fmt.Errorf("failed to list images in project %q: %w", project, err)
+	}
+
+	// Pick the latest image after sorting.
+	sort.Sort(byCreationTime(imageObjs))
+	if len(imageObjs) > 0 {
+		klog.V(4).Infof("found images %+v based on regex %q and family %q in project %q", imageObjs, imageRegex, imageFamily, project)
+		return imageObjs[0].name, nil
+	}
+	return "", fmt.Errorf("found zero images based on regex %q and family %q in project %q", imageRegex, imageFamily, project)
+}
+
+func (g *GCERunner) prepareGceImages() (*internalGCEImageConfig, error) {
+	gceImages := &internalGCEImageConfig{
+		images: make(map[string]internalGCEImage),
+	}
+
+	// Parse images from given config file and convert them to internalGCEImage.
+	if g.cfg.ImageConfigFile != "" {
+		configPath := g.cfg.ImageConfigFile
+		if g.cfg.ImageConfigDir != "" {
+			configPath = filepath.Join(g.cfg.ImageConfigDir, g.cfg.ImageConfigFile)
+		}
+
+		imageConfigData, err := os.ReadFile(configPath)
+		if err != nil {
+			return nil, fmt.Errorf("could not read image config file provided: %w", err)
+		}
+		// Unmarshal the given image config file. All images for this test run will be organized into a map.
+		// shortName->GCEImage, e.g. cos-stable->cos-stable-81-12871-103-0.
+		externalImageConfig := GCEImageConfig{Images: make(map[string]GCEImage)}
+		err = yaml.Unmarshal(imageConfigData, &externalImageConfig)
+		if err != nil {
+			return nil, fmt.Errorf("could not parse image config file: %w", err)
+		}
+
+		for shortName, imageConfig := range externalImageConfig.Images {
+			var image string
+			if (imageConfig.ImageRegex != "" || imageConfig.ImageFamily != "") && imageConfig.Image == "" {
+				image, err = g.getGCEImage(imageConfig.ImageRegex, imageConfig.ImageFamily, imageConfig.Project)
+				if err != nil {
+					return nil, fmt.Errorf("could not retrieve an image based on image regex %q and family %q: %w",
+						imageConfig.ImageRegex, imageConfig.ImageFamily, err)
+				}
+			} else {
+				image = imageConfig.Image
+			}
+			// Convert the given image into an internalGCEImage.
+ metadata := imageConfig.Metadata + if len(strings.TrimSpace(*instanceMetadata)) > 0 { + metadata += "," + *instanceMetadata + } + gceImage := internalGCEImage{ + image: image, + imageDesc: imageConfig.ImageDesc, + project: imageConfig.Project, + metadata: g.getImageMetadata(metadata), + kernelArguments: imageConfig.KernelArguments, + machine: imageConfig.Machine, + resources: imageConfig.Resources, + } + if gceImage.imageDesc == "" { + gceImage.imageDesc = gceImage.image + } + gceImages.images[shortName] = gceImage + } + } + + // Allow users to specify additional images via cli flags for local testing + // convenience; merge in with config file + if len(g.cfg.Images) > 0 { + if *imageProject == "" { + klog.Fatal("Must specify --image-project if you specify --images") + } + for _, image := range g.cfg.Images { + gceImage := internalGCEImage{ + image: image, + project: *imageProject, + metadata: g.getImageMetadata(*instanceMetadata), + } + gceImages.images[image] = gceImage + } + } + + if len(gceImages.images) != 0 && *zone == "" { + return nil, errors.New("must specify --zone flag") + } + // Make sure GCP project is set. Without a project, images can't be retrieved.. + for shortName, imageConfig := range gceImages.images { + if imageConfig.project == "" { + return nil, fmt.Errorf("invalid config for %v; must specify a project", shortName) + } + } + if len(gceImages.images) != 0 { + if *project == "" { + return nil, errors.New("must specify --project flag to launch images into") + } + } + + return gceImages, nil +} + +type imageObj struct { + creationTime time.Time + name string +} + +type byCreationTime []imageObj + +func (a byCreationTime) Len() int { return len(a) } +func (a byCreationTime) Less(i, j int) bool { return a[i].creationTime.After(a[j].creationTime) } +func (a byCreationTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +func (g *GCERunner) getImageMetadata(input string) *compute.Metadata { + if input == "" { + return nil + } + klog.V(3).Infof("parsing instance metadata: %q", input) + raw := g.parseInstanceMetadata(input) + klog.V(4).Infof("parsed instance metadata: %v", raw) + metadataItems := []*compute.MetadataItems{} + for k, v := range raw { + val := v + metadataItems = append(metadataItems, &compute.MetadataItems{ + Key: k, + Value: &val, + }) + } + ret := compute.Metadata{Items: metadataItems} + return &ret +} + +func (g *GCERunner) deleteGCEInstance(host string) { + klog.Infof("Deleting instance %q", host) + _, err := g.gceComputeService.Instances.Delete(*project, *zone, host).Do() + if err != nil { + klog.Errorf("Error deleting instance %q: %v", host, err) + } +} + +func (g *GCERunner) parseInstanceMetadata(str string) map[string]string { + metadata := make(map[string]string) + ss := strings.Split(str, ",") + for _, s := range ss { + kv := strings.Split(s, "=") + if len(kv) == 2 { + metadata[kv[0]] = kv[1] + continue + } + kp := strings.Split(s, "<") + if len(kp) != 2 { + klog.Fatalf("Invalid instance metadata: %q", s) + continue + } + metaPath := kp[1] + if g.cfg.ImageConfigDir != "" { + metaPath = filepath.Join(g.cfg.ImageConfigDir, metaPath) + } + v, err := os.ReadFile(metaPath) + if err != nil { + klog.Fatalf("Failed to read metadata file %q: %v", metaPath, err) + continue + } + metadata[kp[0]] = ignitionInjectGCEPublicKey(metaPath, string(v)) + } + for k, v := range nodeEnvs { + metadata[k] = v + } + return metadata +} + +// ignitionInjectGCEPublicKey tries to inject the GCE SSH public key into the +// provided ignition file path. 
+//
+// This will only be done if the job has the
+// IGNITION_INJECT_GCE_SSH_PUBLIC_KEY_FILE environment variable set, in which
+// case it tries to replace the GCE_SSH_PUBLIC_KEY_FILE_CONTENT placeholder.
+func ignitionInjectGCEPublicKey(path string, content string) string {
+	if os.Getenv("IGNITION_INJECT_GCE_SSH_PUBLIC_KEY_FILE") == "" {
+		return content
+	}
+
+	klog.Infof("Injecting SSH public key into ignition")
+
+	const publicKeyEnv = "GCE_SSH_PUBLIC_KEY_FILE"
+	sshPublicKeyFile := os.Getenv(publicKeyEnv)
+	if sshPublicKeyFile == "" {
+		klog.Errorf("Environment variable %s is not set", publicKeyEnv)
+		os.Exit(1)
+	}
+
+	sshPublicKey, err := os.ReadFile(sshPublicKeyFile)
+	if err != nil {
+		klog.ErrorS(err, "unable to read SSH public key file")
+		os.Exit(1)
+	}
+
+	const sshPublicKeyFileContentMarker = "GCE_SSH_PUBLIC_KEY_FILE_CONTENT"
+	key := base64.StdEncoding.EncodeToString(sshPublicKey)
+	base64Marker := base64.StdEncoding.EncodeToString([]byte(sshPublicKeyFileContentMarker))
+	replacer := strings.NewReplacer(
+		sshPublicKeyFileContentMarker, key,
+		base64Marker, key,
+	)
+	return replacer.Replace(content)
+}
+
+// Provision a gce instance using image and run the tests in archive against the instance.
+// Delete the instance afterward.
+func (g *GCERunner) testGCEImage(suite TestSuite, archivePath string, imageConfig *internalGCEImage, junitFileName string) *TestResult {
+	ginkgoFlagsStr := g.cfg.GinkgoFlags
+
+	host, err := g.createGCEInstance(imageConfig)
+	if g.cfg.DeleteInstances {
+		defer g.deleteGCEInstance(host)
+	}
+	if err != nil {
+		return &TestResult{
+			Err: fmt.Errorf("unable to create gce instance with running docker daemon for image %s. %v", imageConfig.image, err),
+		}
+	}
+
+	// Only delete the files if we are keeping the instance and want it cleaned up.
+	// If we are going to delete the instance, don't bother with cleaning up the files
+	deleteFiles := !g.cfg.DeleteInstances && g.cfg.Cleanup
+
+	if err = g.registerGceHostIP(host); err != nil {
+		return &TestResult{
+			Err:    err,
+			Host:   host,
+			ExitOK: false,
+		}
+	}
+
+	output, exitOk, err := RunRemote(RunRemoteConfig{
+		suite:          suite,
+		archive:        archivePath,
+		host:           host,
+		cleanup:        deleteFiles,
+		imageDesc:      imageConfig.imageDesc,
+		junitFileName:  junitFileName,
+		testArgs:       g.cfg.TestArgs,
+		ginkgoArgs:     ginkgoFlagsStr,
+		systemSpecName: g.cfg.SystemSpecName,
+		extraEnvs:      g.cfg.ExtraEnvs,
+		runtimeConfig:  g.cfg.RuntimeConfig,
+	})
+	result := TestResult{
+		Output: output,
+		Err:    err,
+		Host:   host,
+		ExitOK: exitOk,
+	}
+
+	// This is a temporary solution to collect the node serial log. Only port 1 contains useful information.
+	// TODO(random-liu): Extract out and unify log collection logic with cluster e2e.
+ serialPortOutput, err := g.gceComputeService.Instances.GetSerialPortOutput(*project, *zone, host).Port(1).Do() + if err != nil { + klog.Errorf("Failed to collect serial Output from node %q: %v", host, err) + } else { + logFilename := "serial-1.log" + err := WriteLog(host, logFilename, serialPortOutput.Contents) + if err != nil { + klog.Errorf("Failed to write serial Output from node %q to %q: %v", host, logFilename, err) + } + } + return &result +} + +// Provision a gce instance using image +func (g *GCERunner) createGCEInstance(imageConfig *internalGCEImage) (string, error) { + p, err := g.gceComputeService.Projects.Get(*project).Do() + if err != nil { + return "", fmt.Errorf("failed to get project info %q: %w", *project, err) + } + // Use default service account + serviceAccount := p.DefaultServiceAccount + klog.V(1).Infof("Creating instance %+v with service account %q", *imageConfig, serviceAccount) + name := g.imageToInstanceName(imageConfig) + i := &compute.Instance{ + Name: name, + MachineType: g.machineType(imageConfig.machine), + NetworkInterfaces: []*compute.NetworkInterface{ + { + AccessConfigs: []*compute.AccessConfig{ + { + Type: "ONE_TO_ONE_NAT", + Name: "External NAT", + }, + }}, + }, + Disks: []*compute.AttachedDisk{ + { + AutoDelete: true, + Boot: true, + Type: "PERSISTENT", + InitializeParams: &compute.AttachedDiskInitializeParams{ + SourceImage: g.sourceImage(imageConfig.image, imageConfig.project), + DiskSizeGb: 20, + }, + }, + }, + ServiceAccounts: []*compute.ServiceAccount{ + { + Email: serviceAccount, + Scopes: []string{ + "https://www.googleapis.com/auth/cloud-platform", + }, + }, + }, + } + + scheduling := compute.Scheduling{ + Preemptible: *preemptibleInstances, + } + for _, accelerator := range imageConfig.resources.Accelerators { + if i.GuestAccelerators == nil { + autoRestart := true + i.GuestAccelerators = []*compute.AcceleratorConfig{} + scheduling.OnHostMaintenance = "TERMINATE" + scheduling.AutomaticRestart = &autoRestart + } + aType := fmt.Sprintf(acceleratorTypeResourceFormat, *project, *zone, accelerator.Type) + ac := &compute.AcceleratorConfig{ + AcceleratorCount: accelerator.Count, + AcceleratorType: aType, + } + i.GuestAccelerators = append(i.GuestAccelerators, ac) + } + i.Scheduling = &scheduling + i.Metadata = imageConfig.metadata + var insertionOperationName string + if _, err := g.gceComputeService.Instances.Get(*project, *zone, i.Name).Do(); err != nil { + op, err := g.gceComputeService.Instances.Insert(*project, *zone, i).Do() + + if err != nil { + ret := fmt.Sprintf("could not create instance %s: API error: %v", name, err) + if op != nil { + ret = fmt.Sprintf("%s: %v", ret, op.Error) + } + return "", fmt.Errorf(ret) + } else if op.Error != nil { + var errs []string + for _, insertErr := range op.Error.Errors { + errs = append(errs, fmt.Sprintf("%+v", insertErr)) + } + return "", fmt.Errorf("could not create instance %s: %+v", name, errs) + + } + insertionOperationName = op.Name + } + instanceRunning := false + var instance *compute.Instance + for i := 0; i < 30 && !instanceRunning; i++ { + if i > 0 { + time.Sleep(time.Second * 20) + } + var insertionOperation *compute.Operation + insertionOperation, err = g.gceComputeService.ZoneOperations.Get(*project, *zone, insertionOperationName).Do() + if err != nil { + continue + } + if strings.ToUpper(insertionOperation.Status) != "DONE" { + err = fmt.Errorf("instance insert operation %s not in state DONE, was %s", name, insertionOperation.Status) + continue + } + if insertionOperation.Error != nil { + 
var errs []string + for _, insertErr := range insertionOperation.Error.Errors { + errs = append(errs, fmt.Sprintf("%+v", insertErr)) + } + return name, fmt.Errorf("could not create instance %s: %+v", name, errs) + } + + instance, err = g.gceComputeService.Instances.Get(*project, *zone, name).Do() + if err != nil { + continue + } + if strings.ToUpper(instance.Status) != "RUNNING" { + err = fmt.Errorf("instance %s not in state RUNNING, was %s", name, instance.Status) + continue + } + externalIP := g.getExternalIP(instance) + if len(externalIP) > 0 { + AddHostnameIP(name, externalIP) + } + + var output string + output, err = SSH(name, "sh", "-c", + "'systemctl list-units --type=service --state=running | grep -e containerd -e crio'") + if err != nil { + err = fmt.Errorf("instance %s not running containerd/crio daemon - Command failed: %s", name, output) + continue + } + if !strings.Contains(output, "containerd.service") && + !strings.Contains(output, "crio.service") { + err = fmt.Errorf("instance %s not running containerd/crio daemon: %s", name, output) + continue + } + instanceRunning = true + } + // If instance didn't reach running state in time, return with error now. + if err != nil { + return name, err + } + // Instance reached running state in time, make sure that cloud-init is complete + if g.isCloudInitUsed(imageConfig.metadata) { + cloudInitFinished := false + for i := 0; i < 60 && !cloudInitFinished; i++ { + if i > 0 { + time.Sleep(time.Second * 20) + } + var finished string + finished, err = SSH(name, "ls", "/var/lib/cloud/instance/boot-finished") + if err != nil { + err = fmt.Errorf("instance %s has not finished cloud-init script: %s", name, finished) + continue + } + cloudInitFinished = true + } + } + + // apply additional kernel arguments to the instance + if len(imageConfig.kernelArguments) > 0 { + klog.Info("Update kernel arguments") + if err := g.updateKernelArguments(instance, imageConfig.image, imageConfig.kernelArguments); err != nil { + return name, err + } + } + + return name, err +} + +func (g *GCERunner) isCloudInitUsed(metadata *compute.Metadata) bool { + if metadata == nil { + return false + } + for _, item := range metadata.Items { + if item.Key == "user-data" && item.Value != nil && strings.HasPrefix(*item.Value, "#cloud-config") { + return true + } + } + return false +} + +func (g *GCERunner) sourceImage(image, imageProject string) string { + return fmt.Sprintf("projects/%s/global/images/%s", imageProject, image) +} + +func (g *GCERunner) imageToInstanceName(imageConfig *internalGCEImage) string { + if imageConfig.machine == "" { + return g.cfg.InstanceNamePrefix + "-" + imageConfig.image + } + // For benchmark test, node name has the format 'machine-image-uuid' to run + // different machine types with the same image in parallel + return imageConfig.machine + "-" + imageConfig.image + "-" + uuid.New().String()[:8] +} + +func (g *GCERunner) registerGceHostIP(host string) error { + instance, err := g.gceComputeService.Instances.Get(*project, *zone, host).Do() + if err != nil { + return err + } + if strings.ToUpper(instance.Status) != "RUNNING" { + return fmt.Errorf("instance %s not in state RUNNING, was %s", host, instance.Status) + } + externalIP := g.getExternalIP(instance) + if len(externalIP) > 0 { + AddHostnameIP(host, externalIP) + } + return nil +} +func (g *GCERunner) getExternalIP(instance *compute.Instance) string { + for i := range instance.NetworkInterfaces { + ni := instance.NetworkInterfaces[i] + for j := range ni.AccessConfigs { + ac := 
ni.AccessConfigs[j]
+			if len(ac.NatIP) > 0 {
+				return ac.NatIP
+			}
+		}
+	}
+	return ""
+}
+func (g *GCERunner) updateKernelArguments(instance *compute.Instance, image string, kernelArgs []string) error {
+	kernelArgsString := strings.Join(kernelArgs, " ")
+
+	var cmd []string
+	if strings.Contains(image, "cos") {
+		cmd = []string{
+			"dir=$(mktemp -d)",
+			"mount /dev/sda12 ${dir}",
+			fmt.Sprintf("sed -i -e \"s|cros_efi|cros_efi %s|g\" ${dir}/efi/boot/grub.cfg", kernelArgsString),
+			"umount ${dir}",
+			"rmdir ${dir}",
+		}
+	}
+
+	if strings.Contains(image, "ubuntu") {
+		cmd = []string{
+			fmt.Sprintf("echo \"GRUB_CMDLINE_LINUX_DEFAULT=%s ${GRUB_CMDLINE_LINUX_DEFAULT}\" > /etc/default/grub.d/99-additional-arguments.cfg", kernelArgsString),
+			"/usr/sbin/update-grub",
+		}
+	}
+
+	if len(cmd) == 0 {
+		klog.Warningf("The image %s does not support adding additional kernel arguments", image)
+		return nil
+	}
+
+	out, err := SSH(instance.Name, "sh", "-c", fmt.Sprintf("'%s'", strings.Join(cmd, "&&")))
+	if err != nil {
+		klog.Errorf("failed to run command %s: out: %s, err: %v", cmd, out, err)
+		return err
+	}
+
+	if err := g.rebootInstance(instance); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (g *GCERunner) machineType(machine string) string {
+	if machine == "" {
+		machine = defaultGCEMachine
+	}
+	return fmt.Sprintf("zones/%s/machineTypes/%s", *zone, machine)
+}
+func (g *GCERunner) rebootInstance(instance *compute.Instance) error {
+	// wait until the instance stops responding to SSH
+	klog.Info("Reboot the node and wait for instance not to be available via SSH")
+	if waitErr := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {
+		if _, err := SSH(instance.Name, "reboot"); err != nil {
+			return true, nil
+		}
+
+		return false, nil
+	}); waitErr != nil {
+		return fmt.Errorf("the instance %s still responds to SSH: %v", instance.Name, waitErr)
+	}
+
+	// wait until the instance responds to SSH again
+	klog.Info("Wait for instance to be available via SSH")
+	if waitErr := wait.PollImmediate(30*time.Second, 5*time.Minute, func() (bool, error) {
+		if _, err := SSH(instance.Name, "sh", "-c", "date"); err != nil {
+			return false, nil
+		}
+		return true, nil
+	}); waitErr != nil {
+		return fmt.Errorf("the instance %s does not respond to SSH: %v", instance.Name, waitErr)
+	}
+
+	return nil
+}
diff --git a/test/e2e_node/remote/remote.go b/test/e2e_node/remote/remote.go
index 8d38b3d8c5d..42d000fa24e 100644
--- a/test/e2e_node/remote/remote.go
+++ b/test/e2e_node/remote/remote.go
@@ -100,29 +100,37 @@ func CreateTestArchive(suite TestSuite, systemSpecName, kubeletConfigFile string
 	return filepath.Join(dir, archiveName), nil
 }
 
-// RunRemote returns the command output, whether the exit was ok, and any errors
-func RunRemote(suite TestSuite, archive string, host string, cleanup bool, imageDesc, junitFileName, testArgs, ginkgoArgs, systemSpecName, extraEnvs, runtimeConfig string) (string, bool, error) {
+// RunRemoteConfig collects the parameters of RunRemote, which returns the command output, whether the exit was ok, and any errors
+type RunRemoteConfig struct {
+	suite   TestSuite
+	archive string
+	host    string
+	cleanup bool
+	imageDesc, junitFileName, testArgs, ginkgoArgs, systemSpecName, extraEnvs, runtimeConfig string
+}
+
+func RunRemote(cfg RunRemoteConfig) (string, bool, error) {
 	// Create the temp staging directory
-	klog.V(2).Infof("Staging test binaries on %q", host)
+	klog.V(2).Infof("Staging test binaries on %q", cfg.host)
 	workspace := newWorkspaceDir()
 	// Do not sudo here, so that we can use scp to copy test
archive to the directory.
-	if output, err := SSHNoSudo(host, "mkdir", workspace); err != nil {
+	if output, err := SSHNoSudo(cfg.host, "mkdir", workspace); err != nil {
 		// Exit failure with the error
-		return "", false, fmt.Errorf("failed to create workspace directory %q on host %q: %v output: %q", workspace, host, err, output)
+		return "", false, fmt.Errorf("failed to create workspace directory %q on host %q: %v output: %q", workspace, cfg.host, err, output)
 	}
-	if cleanup {
+	if cfg.cleanup {
 		defer func() {
-			output, err := SSH(host, "rm", "-rf", workspace)
+			output, err := SSH(cfg.host, "rm", "-rf", workspace)
 			if err != nil {
-				klog.Errorf("failed to cleanup workspace %q on host %q: %v. Output:\n%s", workspace, host, err, output)
+				klog.Errorf("failed to cleanup workspace %q on host %q: %v. Output:\n%s", workspace, cfg.host, err, output)
 			}
 		}()
 	}
 
 	// Copy the archive to the staging directory
-	if output, err := runSSHCommand("scp", archive, fmt.Sprintf("%s:%s/", GetHostnameOrIP(host), workspace)); err != nil {
+	if output, err := runSSHCommand(cfg.host, "scp", cfg.archive, fmt.Sprintf("%s:%s/", GetHostnameOrIP(cfg.host), workspace)); err != nil {
 		// Exit failure with the error
-		return "", false, fmt.Errorf("failed to copy test archive: %v, output: %q", err, output)
+		return "", false, fmt.Errorf("failed to copy test archive: %v, output: %q", err, output)
 	}
 
 	// Extract the archive
@@ -130,33 +138,34 @@ func RunRemote(suite TestSuite, archive string, host string, cleanup bool, image
 		fmt.Sprintf("cd %s", workspace),
 		fmt.Sprintf("tar -xzvf ./%s", archiveName),
 	)
-	klog.V(2).Infof("Extracting tar on %q", host)
+	klog.V(2).Infof("Extracting tar on %q", cfg.host)
 	// Do not use sudo here, because `sudo tar -x` will recover the file ownership inside the tar ball, but
 	// we want the extracted files to be owned by the current user.
-	if output, err := SSHNoSudo(host, "sh", "-c", cmd); err != nil {
+	if output, err := SSHNoSudo(cfg.host, "sh", "-c", cmd); err != nil {
 		// Exit failure with the error
-		return "", false, fmt.Errorf("failed to extract test archive: %v, output: %q", err, output)
+		return "", false, fmt.Errorf("failed to extract test archive: %v, output: %q", err, output)
 	}
 
 	// Create the test result directory.
 	resultDir := filepath.Join(workspace, "results")
-	if output, err := SSHNoSudo(host, "mkdir", resultDir); err != nil {
+	if output, err := SSHNoSudo(cfg.host, "mkdir", resultDir); err != nil {
 		// Exit failure with the error
-		return "", false, fmt.Errorf("failed to create test result directory %q on host %q: %v output: %q", resultDir, host, err, output)
+		return "", false, fmt.Errorf("failed to create test result directory %q on host %q: %v output: %q", resultDir, cfg.host, err, output)
 	}
-	klog.V(2).Infof("Running test on %q", host)
-	output, err := suite.RunTest(host, workspace, resultDir, imageDesc, junitFileName, testArgs, ginkgoArgs, systemSpecName, extraEnvs, runtimeConfig, *testTimeout)
+	klog.V(2).Infof("Running test on %q", cfg.host)
+	output, err := cfg.suite.RunTest(cfg.host, workspace, resultDir, cfg.imageDesc, cfg.junitFileName, cfg.testArgs,
+		cfg.ginkgoArgs, cfg.systemSpecName, cfg.extraEnvs, cfg.runtimeConfig, *testTimeout)
 
-	aggErrs := []error{}
-	// Do not log the output here, let the caller deal with the test output.
+	var aggErrs []error
+	// Do not log the output here, let the caller deal with the test output.
if err != nil { aggErrs = append(aggErrs, err) - collectSystemLog(host) + collectSystemLog(cfg.host) } - klog.V(2).Infof("Copying test artifacts from %q", host) - scpErr := getTestArtifacts(host, workspace) + klog.V(2).Infof("Copying test artifacts from %q", cfg.host) + scpErr := getTestArtifacts(cfg.host, workspace) if scpErr != nil { aggErrs = append(aggErrs, scpErr) } @@ -199,18 +208,18 @@ func getTestArtifacts(host, testDir string) error { return fmt.Errorf("failed to create log directory %q: %w", logPath, err) } // Copy logs to artifacts/hostname - if _, err := runSSHCommand("scp", "-r", fmt.Sprintf("%s:%s/results/*.log", GetHostnameOrIP(host), testDir), logPath); err != nil { + if _, err := runSSHCommand(host, "scp", "-r", fmt.Sprintf("%s:%s/results/*.log", GetHostnameOrIP(host), testDir), logPath); err != nil { return err } // Copy json files (if any) to artifacts. if _, err := SSH(host, "ls", fmt.Sprintf("%s/results/*.json", testDir)); err == nil { - if _, err = runSSHCommand("scp", "-r", fmt.Sprintf("%s:%s/results/*.json", GetHostnameOrIP(host), testDir), *resultsDir); err != nil { + if _, err = runSSHCommand(host, "scp", "-r", fmt.Sprintf("%s:%s/results/*.json", GetHostnameOrIP(host), testDir), *resultsDir); err != nil { return err } } if _, err := SSH(host, "ls", fmt.Sprintf("%s/results/junit*", testDir)); err == nil { // Copy junit (if any) to the top of artifacts - if _, err = runSSHCommand("scp", fmt.Sprintf("%s:%s/results/junit*", GetHostnameOrIP(host), testDir), *resultsDir); err != nil { + if _, err = runSSHCommand(host, "scp", fmt.Sprintf("%s:%s/results/junit*", GetHostnameOrIP(host), testDir), *resultsDir); err != nil { return err } } @@ -236,7 +245,7 @@ func collectSystemLog(host string) { // it could've be been removed if the node was rebooted. if output, err := SSH(host, "sh", "-c", fmt.Sprintf("'journalctl --system --all > %s'", logPath)); err == nil { klog.V(2).Infof("Got the system logs from journald; copying it back...") - if output, err := runSSHCommand("scp", fmt.Sprintf("%s:%s", GetHostnameOrIP(host), logPath), destPath); err != nil { + if output, err := runSSHCommand(host, "scp", fmt.Sprintf("%s:%s", GetHostnameOrIP(host), logPath), destPath); err != nil { klog.V(2).Infof("Failed to copy the log: err: %v, output: %q", err, output) } } else { diff --git a/test/e2e_node/remote/runner.go b/test/e2e_node/remote/runner.go new file mode 100644 index 00000000000..0cd0f3714b3 --- /dev/null +++ b/test/e2e_node/remote/runner.go @@ -0,0 +1,45 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package remote + +type Runner interface { + Validate() error + StartTests(suite TestSuite, archivePath string, results chan *TestResult) (numTests int) +} + +type Config struct { + InstanceNamePrefix string + ImageConfigFile string + Images []string + ImageConfigDir string + GinkgoFlags string + DeleteInstances bool + Cleanup bool + TestArgs string + ExtraEnvs string + RuntimeConfig string + SystemSpecName string + Hosts []string +} + +// TestResult contains some information about the test results. +type TestResult struct { + Output string + Err error + Host string + ExitOK bool +} diff --git a/test/e2e_node/remote/ssh.go b/test/e2e_node/remote/ssh.go index f62005cc97c..166d69b7044 100644 --- a/test/e2e_node/remote/ssh.go +++ b/test/e2e_node/remote/ssh.go @@ -35,6 +35,7 @@ var sshUser = flag.String("ssh-user", "", "Use predefined user for ssh.") var sshOptionsMap map[string]string var sshDefaultKeyMap map[string]string +var sshDefaultUserMap map[string]string func init() { usr, err := user.Current() @@ -43,6 +44,7 @@ func init() { } sshOptionsMap = map[string]string{ "gce": "-o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes -o CheckHostIP=no -o StrictHostKeyChecking=no -o ServerAliveInterval=30 -o LogLevel=ERROR", + "aws": "-o UserKnownHostsFile=/dev/null -o IdentitiesOnly=yes -o CheckHostIP=no -o StrictHostKeyChecking=no -o ServerAliveInterval=30 -o LogLevel=ERROR", } defaultGceKey := os.Getenv("GCE_SSH_PRIVATE_KEY_FILE") if defaultGceKey == "" { @@ -51,6 +53,9 @@ func init() { sshDefaultKeyMap = map[string]string{ "gce": defaultGceKey, } + sshDefaultUserMap = map[string]string{ + "aws": "ec2-user", + } } var hostnameIPOverrides = struct { @@ -65,6 +70,30 @@ func AddHostnameIP(hostname, ip string) { hostnameIPOverrides.m[hostname] = ip } +var sshKeyOverrides = struct { + sync.RWMutex + m map[string]string +}{m: make(map[string]string)} + +// AddSSHKey adds a pair into the sshKeyOverrides map +func AddSSHKey(hostname, keyFilePath string) { + sshKeyOverrides.Lock() + defer sshKeyOverrides.Unlock() + sshKeyOverrides.m[hostname] = keyFilePath +} + +// GetSSHUser returns the ssh-user CLI flag, the KUBE_SSH_USER environment variable, or the default ssh user +// for the ssh environment in that order +func GetSSHUser() string { + if *sshUser == "" { + *sshUser = os.Getenv("KUBE_SSH_USER") + } + if *sshUser == "" { + *sshUser = sshDefaultUserMap[*sshEnv] + } + return *sshUser +} + // GetHostnameOrIP converts hostname into ip and apply user if necessary. func GetHostnameOrIP(hostname string) string { hostnameIPOverrides.RLock() @@ -74,12 +103,9 @@ func GetHostnameOrIP(hostname string) string { host = ip } - if *sshUser == "" { - *sshUser = os.Getenv("KUBE_SSH_USER") - } - - if *sshUser != "" { - host = fmt.Sprintf("%s@%s", *sshUser, host) + sshUser := GetSSHUser() + if sshUser != "" { + host = fmt.Sprintf("%s@%s", sshUser, host) } return host } @@ -92,18 +118,18 @@ func getSSHCommand(sep string, args ...string) string { // SSH executes ssh command with runSSHCommand as root. The `sudo` makes sure that all commands // are executed by root, so that there won't be permission mismatch between different commands. func SSH(host string, cmd ...string) (string, error) { - return runSSHCommand("ssh", append([]string{GetHostnameOrIP(host), "--", "sudo"}, cmd...)...) + return runSSHCommand(host, "ssh", append([]string{GetHostnameOrIP(host), "--", "sudo"}, cmd...)...) } // SSHNoSudo executes ssh command with runSSHCommand as normal user. 
Sometimes we need this, // for example creating a directory that we'll copy files there with scp. func SSHNoSudo(host string, cmd ...string) (string, error) { - return runSSHCommand("ssh", append([]string{GetHostnameOrIP(host), "--"}, cmd...)...) + return runSSHCommand(host, "ssh", append([]string{GetHostnameOrIP(host), "--"}, cmd...)...) } // runSSHCommand executes the ssh or scp command, adding the flag provided --ssh-options -func runSSHCommand(cmd string, args ...string) (string, error) { - if key, err := getPrivateSSHKey(); len(key) != 0 { +func runSSHCommand(host, cmd string, args ...string) (string, error) { + if key, err := getPrivateSSHKey(host); len(key) != 0 { if err != nil { klog.Errorf("private SSH key (%s) not found. Check if the SSH key is configured properly:, err: %v", key, err) return "", fmt.Errorf("private SSH key (%s) does not exist", key) @@ -127,7 +153,7 @@ func runSSHCommand(cmd string, args ...string) (string, error) { } // getPrivateSSHKey returns the path to ssh private key -func getPrivateSSHKey() (string, error) { +func getPrivateSSHKey(host string) (string, error) { if *sshKey != "" { if _, err := os.Stat(*sshKey); err != nil { return *sshKey, err @@ -136,6 +162,12 @@ func getPrivateSSHKey() (string, error) { return *sshKey, nil } + sshKeyOverrides.Lock() + defer sshKeyOverrides.Unlock() + if key, ok := sshKeyOverrides.m[host]; ok { + return key, nil + } + if key, found := sshDefaultKeyMap[*sshEnv]; found { if _, err := os.Stat(key); err != nil { return key, err diff --git a/test/e2e_node/remote/ssh_runner.go b/test/e2e_node/remote/ssh_runner.go new file mode 100644 index 00000000000..3ec947817a4 --- /dev/null +++ b/test/e2e_node/remote/ssh_runner.go @@ -0,0 +1,75 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package remote + +import ( + "fmt" +) + +var _ Runner = (*SSHRunner)(nil) + +type SSHRunner struct { + cfg Config +} + +func (s *SSHRunner) StartTests(suite TestSuite, archivePath string, results chan *TestResult) (numTests int) { + for _, host := range s.cfg.Hosts { + fmt.Printf("Initializing e2e tests using host %s.\n", host) + numTests++ + go func(host string, junitFileName string) { + output, exitOk, err := RunRemote(RunRemoteConfig{ + suite: suite, + archive: archivePath, + host: host, + cleanup: s.cfg.Cleanup, + imageDesc: "", + junitFileName: junitFileName, + testArgs: s.cfg.TestArgs, + ginkgoArgs: s.cfg.GinkgoFlags, + systemSpecName: s.cfg.SystemSpecName, + extraEnvs: s.cfg.ExtraEnvs, + runtimeConfig: s.cfg.RuntimeConfig, + }) + results <- &TestResult{ + Output: output, + Err: err, + Host: host, + ExitOK: exitOk, + } + }(host, host) + } + return +} + +func NewSSHRunner(cfg Config) Runner { + return &SSHRunner{ + cfg: cfg, + } +} + +func (s *SSHRunner) Validate() error { + if len(s.cfg.Hosts) == 0 { + return fmt.Errorf("must specify --hosts when running ssh") + } + if s.cfg.ImageConfigFile != "" { + return fmt.Errorf("must not specify --image-config-file when running ssh") + } + if len(s.cfg.Images) > 0 { + return fmt.Errorf("must not specify --images when running ssh") + } + return nil +} diff --git a/test/e2e_node/runner/remote/run_remote.go b/test/e2e_node/runner/remote/run_remote.go index 58364b13fe0..9b4edb65714 100644 --- a/test/e2e_node/runner/remote/run_remote.go +++ b/test/e2e_node/runner/remote/run_remote.go @@ -21,93 +21,42 @@ limitations under the License. package main import ( - "context" - "encoding/base64" "flag" "fmt" + "log" "math/rand" - "net/http" "os" "os/exec" "os/signal" - "path/filepath" - "regexp" - "sort" "strings" "sync" "time" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/test/e2e_node/remote" "k8s.io/kubernetes/test/e2e_node/system" - "github.com/google/uuid" - "golang.org/x/oauth2/google" - compute "google.golang.org/api/compute/v0.beta" - "google.golang.org/api/option" "k8s.io/klog/v2" - "sigs.k8s.io/yaml" ) var mode = flag.String("mode", "gce", "Mode to operate in. One of gce|ssh. Defaults to gce") var testArgs = flag.String("test_args", "", "Space-separated list of arguments to pass to Ginkgo test runner.") var testSuite = flag.String("test-suite", "default", "Test suite the runner initializes with. 
Currently support default|cadvisor|conformance") var instanceNamePrefix = flag.String("instance-name-prefix", "", "prefix for instance names") -var zone = flag.String("zone", "", "gce zone the hosts live in") -var project = flag.String("project", "", "gce project the hosts live in") var imageConfigFile = flag.String("image-config-file", "", "yaml file describing images to run") -var imageConfigDir = flag.String("image-config-dir", "", "(optional)path to image config files") -var imageProject = flag.String("image-project", "", "gce project the hosts live in") +var imageConfigDir = flag.String("image-config-dir", "", "(optional) path to image config files") var images = flag.String("images", "", "images to test") -var preemptibleInstances = flag.Bool("preemptible-instances", false, "If true, gce instances will be configured to be preemptible") var hosts = flag.String("hosts", "", "hosts to test") var cleanup = flag.Bool("cleanup", true, "If true remove files from remote hosts and delete temporary instances") var deleteInstances = flag.Bool("delete-instances", true, "If true, delete any instances created") var buildOnly = flag.Bool("build-only", false, "If true, build e2e_node_test.tar.gz and exit.") -var instanceMetadata = flag.String("instance-metadata", "", "key/value metadata for instances separated by '=' or '<', 'k=v' means the key is 'k' and the value is 'v'; 'k 0 { + running += sshRunner.StartTests(suite, path, results) } // Wait for all tests to complete and emit the results @@ -285,20 +168,20 @@ func main() { exitOk := true for i := 0; i < running; i++ { tr := <-results - host := tr.host + host := tr.Host fmt.Println() // Print an empty line fmt.Printf("%s>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>%s\n", blue, noColour) fmt.Printf("%s> START TEST >%s\n", blue, noColour) fmt.Printf("%s>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>%s\n", blue, noColour) fmt.Printf("Start Test Suite on Host %s\n", host) - fmt.Printf("%s\n", tr.output) - if tr.err != nil { + fmt.Printf("%s\n", tr.Output) + if tr.Err != nil { errCount++ - fmt.Printf("Failure Finished Test Suite on Host %s. Refer to artifacts directory for ginkgo log for this host.\n%v\n", host, tr.err) + fmt.Printf("Failure Finished Test Suite on Host %s. Refer to artifacts directory for ginkgo log for this host.\n%v\n", host, tr.Err) } else { fmt.Printf("Success Finished Test Suite on Host %s. Refer to artifacts directory for ginkgo log for this host.\n", host) } - exitOk = exitOk && tr.exitOk + exitOk = exitOk && tr.ExitOK fmt.Printf("%s<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<%s\n", blue, noColour) fmt.Printf("%s< FINISH TEST <%s\n", blue, noColour) fmt.Printf("%s<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<%s\n", blue, noColour) @@ -314,95 +197,11 @@ func main() { callGubernator(*gubernator) } -func prepareGceImages() (*internalImageConfig, error) { - gceImages := &internalImageConfig{ - images: make(map[string]internalGCEImage), +func splitCommaList(s string) []string { + if len(s) == 0 { + return nil } - - // Parse images from given config file and convert them to internalGCEImage. 
- if *imageConfigFile != "" { - configPath := *imageConfigFile - if *imageConfigDir != "" { - configPath = filepath.Join(*imageConfigDir, *imageConfigFile) - } - - imageConfigData, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("Could not read image config file provided: %w", err) - } - // Unmarshal the given image config file. All images for this test run will be organized into a map. - // shortName->GCEImage, e.g cos-stable->cos-stable-81-12871-103-0. - externalImageConfig := ImageConfig{Images: make(map[string]GCEImage)} - err = yaml.Unmarshal(imageConfigData, &externalImageConfig) - if err != nil { - return nil, fmt.Errorf("Could not parse image config file: %w", err) - } - - for shortName, imageConfig := range externalImageConfig.Images { - var image string - if (imageConfig.ImageRegex != "" || imageConfig.ImageFamily != "") && imageConfig.Image == "" { - image, err = getGCEImage(imageConfig.ImageRegex, imageConfig.ImageFamily, imageConfig.Project) - if err != nil { - return nil, fmt.Errorf("Could not retrieve a image based on image regex %q and family %q: %v", - imageConfig.ImageRegex, imageConfig.ImageFamily, err) - } - } else { - image = imageConfig.Image - } - // Convert the given image into an internalGCEImage. - metadata := imageConfig.Metadata - if len(strings.TrimSpace(*instanceMetadata)) > 0 { - metadata += "," + *instanceMetadata - } - gceImage := internalGCEImage{ - image: image, - imageDesc: imageConfig.ImageDesc, - project: imageConfig.Project, - metadata: getImageMetadata(metadata), - kernelArguments: imageConfig.KernelArguments, - machine: imageConfig.Machine, - resources: imageConfig.Resources, - } - if gceImage.imageDesc == "" { - gceImage.imageDesc = gceImage.image - } - gceImages.images[shortName] = gceImage - } - } - - // Allow users to specify additional images via cli flags for local testing - // convenience; merge in with config file - if *images != "" { - if *imageProject == "" { - klog.Fatal("Must specify --image-project if you specify --images") - } - cliImages := strings.Split(*images, ",") - for _, image := range cliImages { - gceImage := internalGCEImage{ - image: image, - project: *imageProject, - metadata: getImageMetadata(*instanceMetadata), - } - gceImages.images[image] = gceImage - } - } - - if len(gceImages.images) != 0 && *zone == "" { - klog.Fatal("Must specify --zone flag") - } - // Make sure GCP project is set. Without a project, images can't be retrieved.. 
- for shortName, imageConfig := range gceImages.images { - if imageConfig.project == "" { - klog.Fatalf("Invalid config for %v; must specify a project", shortName) - } - } - if len(gceImages.images) != 0 { - if *project == "" { - klog.Fatal("Must specify --project flag to launch images into") - } - } - - return gceImages, nil + return strings.Split(s, ",") } func callGubernator(gubernator bool) { @@ -420,536 +219,35 @@ func callGubernator(gubernator bool) { return } -func (a *Archive) getArchive() (string, error) { +func (a *Archive) getArchive(suite remote.TestSuite) (string, error) { a.Do(func() { a.path, a.err = remote.CreateTestArchive(suite, *systemSpecName, *kubeletConfigFile) }) return a.path, a.err } func (a *Archive) deleteArchive() { - path, err := a.getArchive() + path, err := a.getArchive(nil) if err != nil { return } os.Remove(path) } -func getImageMetadata(input string) *compute.Metadata { - if input == "" { +// parseHostsList splits a host list of the form a=1.2.3.4,b=5.6.7.8 into the list of hosts [a,b] while registering the +// given addresses +func parseHostsList(hostList string) []string { + if len(hostList) == 0 { return nil } - klog.V(3).Infof("parsing instance metadata: %q", input) - raw := parseInstanceMetadata(input) - klog.V(4).Infof("parsed instance metadata: %v", raw) - metadataItems := []*compute.MetadataItems{} - for k, v := range raw { - val := v - metadataItems = append(metadataItems, &compute.MetadataItems{ - Key: k, - Value: &val, - }) + hosts := strings.Split(hostList, ",") + var hostsOnly []string + for _, host := range hosts { + segs := strings.Split(host, "=") + if len(segs) == 2 { + remote.AddHostnameIP(segs[0], segs[1]) + } else if len(segs) > 2 { + klog.Fatalf("invalid format of host %q", hostList) + } + hostsOnly = append(hostsOnly, segs[0]) } - ret := compute.Metadata{Items: metadataItems} - return &ret -} - -func registerGceHostIP(host string) error { - instance, err := computeService.Instances.Get(*project, *zone, host).Do() - if err != nil { - return err - } - if strings.ToUpper(instance.Status) != "RUNNING" { - return fmt.Errorf("instance %s not in state RUNNING, was %s", host, instance.Status) - } - externalIP := getExternalIP(instance) - if len(externalIP) > 0 { - remote.AddHostnameIP(host, externalIP) - } - return nil -} - -// Run tests in archive against host -func testHost(host string, deleteFiles bool, imageDesc, junitFileName, ginkgoFlagsStr string) *TestResult { - path, err := arc.getArchive() - if err != nil { - // Don't log fatal because we need to do any needed cleanup contained in "defer" statements - return &TestResult{ - err: fmt.Errorf("unable to create test archive: %w", err), - } - } - - output, exitOk, err := remote.RunRemote(suite, path, host, deleteFiles, imageDesc, junitFileName, *testArgs, ginkgoFlagsStr, *systemSpecName, *extraEnvs, *runtimeConfig) - return &TestResult{ - output: output, - err: err, - host: host, - exitOk: exitOk, - } -} - -type imageObj struct { - creationTime time.Time - name string -} - -type byCreationTime []imageObj - -func (a byCreationTime) Len() int { return len(a) } -func (a byCreationTime) Less(i, j int) bool { return a[i].creationTime.After(a[j].creationTime) } -func (a byCreationTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -// Returns an image name based on regex and given GCE project. 
-func getGCEImage(imageRegex, imageFamily string, project string) (string, error) { - imageObjs := []imageObj{} - imageRe := regexp.MustCompile(imageRegex) - if err := computeService.Images.List(project).Pages(context.Background(), - func(ilc *compute.ImageList) error { - for _, instance := range ilc.Items { - if imageRegex != "" && !imageRe.MatchString(instance.Name) { - continue - } - if imageFamily != "" && instance.Family != imageFamily { - continue - } - creationTime, err := time.Parse(time.RFC3339, instance.CreationTimestamp) - if err != nil { - return fmt.Errorf("failed to parse instance creation timestamp %q: %w", instance.CreationTimestamp, err) - } - io := imageObj{ - creationTime: creationTime, - name: instance.Name, - } - imageObjs = append(imageObjs, io) - } - return nil - }, - ); err != nil { - return "", fmt.Errorf("failed to list images in project %q: %w", project, err) - } - - // Pick the latest image after sorting. - sort.Sort(byCreationTime(imageObjs)) - if len(imageObjs) > 0 { - klog.V(4).Infof("found images %+v based on regex %q and family %q in project %q", imageObjs, imageRegex, imageFamily, project) - return imageObjs[0].name, nil - } - return "", fmt.Errorf("found zero images based on regex %q and family %q in project %q", imageRegex, imageFamily, project) -} - -// Provision a gce instance using image and run the tests in archive against the instance. -// Delete the instance afterward. -func testImage(imageConfig *internalGCEImage, junitFileName string, ginkgoFlagsStr string) *TestResult { - host, err := createInstance(imageConfig) - if *deleteInstances { - defer deleteInstance(host) - } - if err != nil { - return &TestResult{ - err: fmt.Errorf("unable to create gce instance with running docker daemon for image %s. %v", imageConfig.image, err), - } - } - - // Only delete the files if we are keeping the instance and want it cleaned up. - // If we are going to delete the instance, don't bother with cleaning up the files - deleteFiles := !*deleteInstances && *cleanup - - if err = registerGceHostIP(host); err != nil { - return &TestResult{ - err: err, - host: host, - exitOk: false, - } - } - - result := testHost(host, deleteFiles, imageConfig.imageDesc, junitFileName, ginkgoFlagsStr) - // This is a temporary solution to collect serial node serial log. Only port 1 contains useful information. - // TODO(random-liu): Extract out and unify log collection logic with cluste e2e. 
- serialPortOutput, err := computeService.Instances.GetSerialPortOutput(*project, *zone, host).Port(1).Do() - if err != nil { - klog.Errorf("Failed to collect serial output from node %q: %v", host, err) - } else { - logFilename := "serial-1.log" - err := remote.WriteLog(host, logFilename, serialPortOutput.Contents) - if err != nil { - klog.Errorf("Failed to write serial output from node %q to %q: %v", host, logFilename, err) - } - } - return result -} - -// Provision a gce instance using image -func createInstance(imageConfig *internalGCEImage) (string, error) { - p, err := computeService.Projects.Get(*project).Do() - if err != nil { - return "", fmt.Errorf("failed to get project info %q: %w", *project, err) - } - // Use default service account - serviceAccount := p.DefaultServiceAccount - klog.V(1).Infof("Creating instance %+v with service account %q", *imageConfig, serviceAccount) - name := imageToInstanceName(imageConfig) - i := &compute.Instance{ - Name: name, - MachineType: machineType(imageConfig.machine), - NetworkInterfaces: []*compute.NetworkInterface{ - { - AccessConfigs: []*compute.AccessConfig{ - { - Type: "ONE_TO_ONE_NAT", - Name: "External NAT", - }, - }}, - }, - Disks: []*compute.AttachedDisk{ - { - AutoDelete: true, - Boot: true, - Type: "PERSISTENT", - InitializeParams: &compute.AttachedDiskInitializeParams{ - SourceImage: sourceImage(imageConfig.image, imageConfig.project), - DiskSizeGb: 20, - }, - }, - }, - ServiceAccounts: []*compute.ServiceAccount{ - { - Email: serviceAccount, - Scopes: []string{ - "https://www.googleapis.com/auth/cloud-platform", - }, - }, - }, - } - - scheduling := compute.Scheduling{ - Preemptible: *preemptibleInstances, - } - for _, accelerator := range imageConfig.resources.Accelerators { - if i.GuestAccelerators == nil { - autoRestart := true - i.GuestAccelerators = []*compute.AcceleratorConfig{} - scheduling.OnHostMaintenance = "TERMINATE" - scheduling.AutomaticRestart = &autoRestart - } - aType := fmt.Sprintf(acceleratorTypeResourceFormat, *project, *zone, accelerator.Type) - ac := &compute.AcceleratorConfig{ - AcceleratorCount: accelerator.Count, - AcceleratorType: aType, - } - i.GuestAccelerators = append(i.GuestAccelerators, ac) - } - i.Scheduling = &scheduling - i.Metadata = imageConfig.metadata - var insertionOperationName string - if _, err := computeService.Instances.Get(*project, *zone, i.Name).Do(); err != nil { - op, err := computeService.Instances.Insert(*project, *zone, i).Do() - - if err != nil { - ret := fmt.Sprintf("could not create instance %s: API error: %v", name, err) - if op != nil { - ret = fmt.Sprintf("%s: %v", ret, op.Error) - } - return "", fmt.Errorf(ret) - } else if op.Error != nil { - var errs []string - for _, insertErr := range op.Error.Errors { - errs = append(errs, fmt.Sprintf("%+v", insertErr)) - } - return "", fmt.Errorf("could not create instance %s: %+v", name, errs) - - } - insertionOperationName = op.Name - } - instanceRunning := false - var instance *compute.Instance - for i := 0; i < 30 && !instanceRunning; i++ { - if i > 0 { - time.Sleep(time.Second * 20) - } - var insertionOperation *compute.Operation - insertionOperation, err = computeService.ZoneOperations.Get(*project, *zone, insertionOperationName).Do() - if err != nil { - continue - } - if strings.ToUpper(insertionOperation.Status) != "DONE" { - err = fmt.Errorf("instance insert operation %s not in state DONE, was %s", name, insertionOperation.Status) - continue - } - if insertionOperation.Error != nil { - var errs []string - for _, insertErr := 
range insertionOperation.Error.Errors { - errs = append(errs, fmt.Sprintf("%+v", insertErr)) - } - return name, fmt.Errorf("could not create instance %s: %+v", name, errs) - } - - instance, err = computeService.Instances.Get(*project, *zone, name).Do() - if err != nil { - continue - } - if strings.ToUpper(instance.Status) != "RUNNING" { - err = fmt.Errorf("instance %s not in state RUNNING, was %s", name, instance.Status) - continue - } - externalIP := getExternalIP(instance) - if len(externalIP) > 0 { - remote.AddHostnameIP(name, externalIP) - } - - var output string - output, err = remote.SSH(name, "sh", "-c", - "'systemctl list-units --type=service --state=running | grep -e containerd -e crio'") - if err != nil { - err = fmt.Errorf("instance %s not running containerd/crio daemon - Command failed: %s", name, output) - continue - } - if !strings.Contains(output, "containerd.service") && - !strings.Contains(output, "crio.service") { - err = fmt.Errorf("instance %s not running containerd/crio daemon: %s", name, output) - continue - } - instanceRunning = true - } - // If instance didn't reach running state in time, return with error now. - if err != nil { - return name, err - } - // Instance reached running state in time, make sure that cloud-init is complete - if isCloudInitUsed(imageConfig.metadata) { - cloudInitFinished := false - for i := 0; i < 60 && !cloudInitFinished; i++ { - if i > 0 { - time.Sleep(time.Second * 20) - } - var finished string - finished, err = remote.SSH(name, "ls", "/var/lib/cloud/instance/boot-finished") - if err != nil { - err = fmt.Errorf("instance %s has not finished cloud-init script: %s", name, finished) - continue - } - cloudInitFinished = true - } - } - - // apply additional kernel arguments to the instance - if len(imageConfig.kernelArguments) > 0 { - klog.Info("Update kernel arguments") - if err := updateKernelArguments(instance, imageConfig.image, imageConfig.kernelArguments); err != nil { - return name, err - } - } - - return name, err -} - -func updateKernelArguments(instance *compute.Instance, image string, kernelArgs []string) error { - kernelArgsString := strings.Join(kernelArgs, " ") - - var cmd []string - if strings.Contains(image, "cos") { - cmd = []string{ - "dir=$(mktemp -d)", - "mount /dev/sda12 ${dir}", - fmt.Sprintf("sed -i -e \"s|cros_efi|cros_efi %s|g\" ${dir}/efi/boot/grub.cfg", kernelArgsString), - "umount ${dir}", - "rmdir ${dir}", - } - } - - if strings.Contains(image, "ubuntu") { - cmd = []string{ - fmt.Sprintf("echo \"GRUB_CMDLINE_LINUX_DEFAULT=%s ${GRUB_CMDLINE_LINUX_DEFAULT}\" > /etc/default/grub.d/99-additional-arguments.cfg", kernelArgsString), - "/usr/sbin/update-grub", - } - } - - if len(cmd) == 0 { - klog.Warningf("The image %s does not support adding an additional kernel arguments", image) - return nil - } - - out, err := remote.SSH(instance.Name, "sh", "-c", fmt.Sprintf("'%s'", strings.Join(cmd, "&&"))) - if err != nil { - klog.Errorf("failed to run command %s: out: %s, err: %v", cmd, out, err) - return err - } - - if err := rebootInstance(instance); err != nil { - return err - } - - return nil -} - -func rebootInstance(instance *compute.Instance) error { - // wait until the instance will not response to SSH - klog.Info("Reboot the node and wait for instance not to be available via SSH") - if waitErr := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) { - if _, err := remote.SSH(instance.Name, "reboot"); err != nil { - return true, nil - } - - return false, nil - }); waitErr != nil { - return 
fmt.Errorf("the instance %s still response to SSH: %v", instance.Name, waitErr) - } - - // wait until the instance will response again to SSH - klog.Info("Wait for instance to be available via SSH") - if waitErr := wait.PollImmediate(30*time.Second, 5*time.Minute, func() (bool, error) { - if _, err := remote.SSH(instance.Name, "sh", "-c", "date"); err != nil { - return false, nil - } - return true, nil - }); waitErr != nil { - return fmt.Errorf("the instance %s does not response to SSH: %v", instance.Name, waitErr) - } - - return nil -} - -func isCloudInitUsed(metadata *compute.Metadata) bool { - if metadata == nil { - return false - } - for _, item := range metadata.Items { - if item.Key == "user-data" && item.Value != nil && strings.HasPrefix(*item.Value, "#cloud-config") { - return true - } - } - return false -} - -func getExternalIP(instance *compute.Instance) string { - for i := range instance.NetworkInterfaces { - ni := instance.NetworkInterfaces[i] - for j := range ni.AccessConfigs { - ac := ni.AccessConfigs[j] - if len(ac.NatIP) > 0 { - return ac.NatIP - } - } - } - return "" -} - -func getComputeClient() (*compute.Service, error) { - const retries = 10 - const backoff = time.Second * 6 - - // Setup the gce client for provisioning instances - // Getting credentials on gce jenkins is flaky, so try a couple times - var err error - var cs *compute.Service - for i := 0; i < retries; i++ { - if i > 0 { - time.Sleep(backoff) - } - - var client *http.Client - client, err = google.DefaultClient(context.Background(), compute.ComputeScope) - if err != nil { - continue - } - - cs, err = compute.NewService(context.Background(), option.WithHTTPClient(client)) - if err != nil { - continue - } - return cs, nil - } - return nil, err -} - -func deleteInstance(host string) { - klog.Infof("Deleting instance %q", host) - _, err := computeService.Instances.Delete(*project, *zone, host).Do() - if err != nil { - klog.Errorf("Error deleting instance %q: %v", host, err) - } -} - -func parseInstanceMetadata(str string) map[string]string { - metadata := make(map[string]string) - ss := strings.Split(str, ",") - for _, s := range ss { - kv := strings.Split(s, "=") - if len(kv) == 2 { - metadata[kv[0]] = kv[1] - continue - } - kp := strings.Split(s, "<") - if len(kp) != 2 { - klog.Fatalf("Invalid instance metadata: %q", s) - continue - } - metaPath := kp[1] - if *imageConfigDir != "" { - metaPath = filepath.Join(*imageConfigDir, metaPath) - } - v, err := os.ReadFile(metaPath) - if err != nil { - klog.Fatalf("Failed to read metadata file %q: %v", metaPath, err) - continue - } - metadata[kp[0]] = ignitionInjectGCEPublicKey(metaPath, string(v)) - } - for k, v := range nodeEnvs { - metadata[k] = v - } - return metadata -} - -// ignitionInjectGCEPublicKey tries to inject the GCE SSH public key into the -// provided ignition file path. -// -// This will only being done if the job has the -// IGNITION_INJECT_GCE_SSH_PUBLIC_KEY_FILE environment variable set, while it -// tried to replace the GCE_SSH_PUBLIC_KEY_FILE_CONTENT placeholder. 
-func ignitionInjectGCEPublicKey(path string, content string) string { - if os.Getenv("IGNITION_INJECT_GCE_SSH_PUBLIC_KEY_FILE") == "" { - return content - } - - klog.Infof("Injecting SSH public key into ignition") - - const publicKeyEnv = "GCE_SSH_PUBLIC_KEY_FILE" - sshPublicKeyFile := os.Getenv(publicKeyEnv) - if sshPublicKeyFile == "" { - klog.Errorf("Environment variable %s is not set", publicKeyEnv) - os.Exit(1) - } - - sshPublicKey, err := os.ReadFile(sshPublicKeyFile) - if err != nil { - klog.ErrorS(err, "unable to read SSH public key file") - os.Exit(1) - } - - const sshPublicKeyFileContentMarker = "GCE_SSH_PUBLIC_KEY_FILE_CONTENT" - key := base64.StdEncoding.EncodeToString(sshPublicKey) - base64Marker := base64.StdEncoding.EncodeToString([]byte(sshPublicKeyFileContentMarker)) - replacer := strings.NewReplacer( - sshPublicKeyFileContentMarker, key, - base64Marker, key, - ) - return replacer.Replace(content) -} - -func imageToInstanceName(imageConfig *internalGCEImage) string { - if imageConfig.machine == "" { - return *instanceNamePrefix + "-" + imageConfig.image - } - // For benchmark test, node name has the format 'machine-image-uuid' to run - // different machine types with the same image in parallel - return imageConfig.machine + "-" + imageConfig.image + "-" + uuid.New().String()[:8] -} - -func sourceImage(image, imageProject string) string { - return fmt.Sprintf("projects/%s/global/images/%s", imageProject, image) -} - -func machineType(machine string) string { - if machine == "" { - machine = defaultMachine - } - return fmt.Sprintf("zones/%s/machineTypes/%s", *zone, machine) + return hostsOnly }
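Reviewer aside: the sketch below shows how a caller is expected to drive the new Runner abstraction introduced by this patch. It is a minimal, hypothetical example and not part of the change itself; the driveRunner helper, the placeholder host, and the elided TestSuite construction are invented for illustration, while Runner, Config, TestResult, NewSSHRunner, and the channel contract of StartTests come from the code above.

package main

import (
	"fmt"

	"k8s.io/kubernetes/test/e2e_node/remote"
)

// driveRunner is a hypothetical helper mirroring what run_remote.go does:
// Validate checks the configuration up front, StartTests launches one
// goroutine per host (SSH) or per image (GCE) and returns how many were
// started, and every test sends exactly one TestResult on the channel.
func driveRunner(runner remote.Runner, suite remote.TestSuite, archivePath string) bool {
	if err := runner.Validate(); err != nil {
		fmt.Printf("invalid runner config: %v\n", err)
		return false
	}
	results := make(chan *remote.TestResult)
	running := runner.StartTests(suite, archivePath, results)
	exitOk := true
	for i := 0; i < running; i++ {
		tr := <-results
		if tr.Err != nil {
			fmt.Printf("tests failed on host %s: %v\n", tr.Host, tr.Err)
		}
		exitOk = exitOk && tr.ExitOK
	}
	return exitOk
}

func main() {
	// A real caller builds the suite from its --test-suite flag; that
	// construction is omitted here, so suite is left nil and this main is
	// illustrative rather than something you would ship.
	var suite remote.TestSuite
	cfg := remote.Config{Hosts: []string{"203.0.113.10"}} // placeholder host
	driveRunner(remote.NewSSHRunner(cfg), suite, "/tmp/e2e_node_test.tar.gz")
}

Because both runners satisfy the same interface, main() in run_remote.go no longer needs to care whether tests land on GCE instances it provisioned itself or on raw SSH hosts supplied via --hosts.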
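A second short sketch covers how the ssh helpers touched by this patch compose: AddHostnameIP, the new AddSSHKey, GetSSHUser, and GetHostnameOrIP let any runner register per-host addresses and keys. The host name and key path below are placeholders, not values from the change.

package main

import (
	"fmt"

	"k8s.io/kubernetes/test/e2e_node/remote"
)

func main() {
	// Register the reachable address for a logical host name, as the GCE
	// runner does with an instance's external NAT IP.
	remote.AddHostnameIP("my-node", "203.0.113.10")
	// Register a per-host private key; runSSHCommand consults this map
	// before falling back to --ssh-key or the per-environment default.
	remote.AddSSHKey("my-node", "/home/me/.ssh/id_ed25519")

	// GetHostnameOrIP resolves the override and prepends the user resolved
	// by GetSSHUser: the --ssh-user flag first, then $KUBE_SSH_USER, then
	// the per-environment default ("ec2-user" when --ssh-env=aws).
	fmt.Println(remote.GetHostnameOrIP("my-node"))
}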