Use gcloud for gce_runner

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
This commit is contained in:
Davanum Srinivas 2024-01-26 11:52:18 -05:00
parent 7340ce932b
commit 76ea142051
No known key found for this signature in database
GPG Key ID: 80D83A796103BF59
2 changed files with 260 additions and 173 deletions

View File

@ -17,12 +17,11 @@ limitations under the License.
package gce
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"flag"
"fmt"
"net/http"
"os"
"path/filepath"
"regexp"
@ -33,9 +32,6 @@ import (
"k8s.io/kubernetes/test/e2e_node/remote"
"github.com/google/uuid"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/api/option"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"
@ -85,14 +81,12 @@ func init() {
}
const (
defaultGCEMachine = "e2-standard-2"
acceleratorTypeResourceFormat = "https://www.googleapis.com/compute/beta/projects/%s/zones/%s/acceleratorTypes/%s"
defaultGCEMachine = "e2-standard-2"
)
type GCERunner struct {
cfg remote.Config
gceComputeService *compute.Service
gceImages *internalGCEImageConfig
cfg remote.Config
gceImages *internalGCEImageConfig
}
func NewGCERunner(cfg remote.Config) remote.Runner {
@ -106,10 +100,10 @@ func (g *GCERunner) Validate() error {
if len(g.cfg.Hosts) == 0 && g.cfg.ImageConfigFile == "" && len(g.cfg.Images) == 0 {
klog.Fatalf("Must specify one of --image-config-file, --hosts, --images.")
}
var err error
g.gceComputeService, err = getComputeClient()
_, err := runGCPCommandWithZones("compute", "instances", "list")
if err != nil {
return fmt.Errorf("Unable to create gcloud compute service using defaults. Make sure you are authenticated. %w", err)
klog.Fatalf("While listing GCE instances: %v", err)
}
if g.gceImages, err = g.prepareGceImages(); err != nil {
@ -130,34 +124,6 @@ func (g *GCERunner) StartTests(suite remote.TestSuite, archivePath string, resul
return
}
func getComputeClient() (*compute.Service, error) {
const retries = 10
const backoff = time.Second * 6
// Setup the gce client for provisioning instances
// Getting credentials on gce jenkins is flaky, so try a couple times
var err error
var cs *compute.Service
for i := 0; i < retries; i++ {
if i > 0 {
time.Sleep(backoff)
}
var client *http.Client
client, err = google.DefaultClient(context.Background(), compute.ComputeScope)
if err != nil {
continue
}
cs, err = compute.NewService(context.Background(), option.WithHTTPClient(client))
if err != nil {
continue
}
return cs, nil
}
return nil, err
}
// Accelerator contains type and count about resource.
type Accelerator struct {
Type string `json:"type,omitempty"`
@ -178,7 +144,7 @@ type internalGCEImage struct {
kernelArguments []string
project string
resources Resources
metadata *compute.Metadata
metadata *gceMetadata
machine string
}
@ -220,31 +186,35 @@ type GCEImage struct {
// Returns an image name based on regex and given GCE project.
func (g *GCERunner) getGCEImage(imageRegex, imageFamily string, project string) (string, error) {
data, err := runGCPCommandNoProject("compute", "images", "list",
"--format=json", "--project="+project)
if err != nil {
return "", fmt.Errorf("failed to list images in project %q: %w", project, err)
}
var images []gceImage
err = json.Unmarshal(data, &images)
if err != nil {
return "", fmt.Errorf("failed to parse images: %w", err)
}
imageObjs := []imageObj{}
imageRe := regexp.MustCompile(imageRegex)
if err := g.gceComputeService.Images.List(project).Pages(context.Background(),
func(ilc *compute.ImageList) error {
for _, instance := range ilc.Items {
if imageRegex != "" && !imageRe.MatchString(instance.Name) {
continue
}
if imageFamily != "" && instance.Family != imageFamily {
continue
}
creationTime, err := time.Parse(time.RFC3339, instance.CreationTimestamp)
if err != nil {
return fmt.Errorf("failed to parse instance creation timestamp %q: %w", instance.CreationTimestamp, err)
}
io := imageObj{
creationTime: creationTime,
name: instance.Name,
}
imageObjs = append(imageObjs, io)
}
return nil
},
); err != nil {
return "", fmt.Errorf("failed to list images in project %q: %w", project, err)
for _, instance := range images {
if imageRegex != "" && !imageRe.MatchString(instance.Name) {
continue
}
if imageFamily != "" && instance.Family != imageFamily {
continue
}
creationTime, err := time.Parse(time.RFC3339, instance.CreationTimestamp)
if err != nil {
return "", fmt.Errorf("failed to parse instance creation timestamp %q: %w", instance.CreationTimestamp, err)
}
io := imageObj{
creationTime: creationTime,
name: instance.Name,
}
imageObjs = append(imageObjs, io)
}
// Pick the latest image after sorting.
@ -357,28 +327,27 @@ func (a byCreationTime) Len() int { return len(a) }
func (a byCreationTime) Less(i, j int) bool { return a[i].creationTime.After(a[j].creationTime) }
func (a byCreationTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (g *GCERunner) getImageMetadata(input string) *compute.Metadata {
func (g *GCERunner) getImageMetadata(input string) *gceMetadata {
if input == "" {
return nil
}
klog.V(3).Infof("parsing instance metadata: %q", input)
raw := g.parseInstanceMetadata(input)
klog.V(4).Infof("parsed instance metadata: %v", raw)
metadataItems := []*compute.MetadataItems{}
metadataItems := []gceMetadataItems{}
for k, v := range raw {
val := v
metadataItems = append(metadataItems, &compute.MetadataItems{
metadataItems = append(metadataItems, gceMetadataItems{
Key: k,
Value: &val,
Value: v,
})
}
ret := compute.Metadata{Items: metadataItems}
ret := gceMetadata{Items: metadataItems}
return &ret
}
func (g *GCERunner) deleteGCEInstance(host string) {
func (g *GCERunner) DeleteGCEInstance(host string) {
klog.Infof("Deleting instance %q", host)
_, err := g.gceComputeService.Instances.Delete(*project, *zone, host).Do()
_, err := runGCPCommandWithZone("compute", "instances", "delete", host)
if err != nil {
klog.Errorf("Error deleting instance %q: %v", host, err)
}
@ -407,7 +376,7 @@ func (g *GCERunner) parseInstanceMetadata(str string) map[string]string {
klog.Fatalf("Failed to read metadata file %q: %v", metaPath, err)
continue
}
metadata[kp[0]] = ignitionInjectGCEPublicKey(metaPath, string(v))
metadata[kp[0]] = ignitionInjectGCEPublicKey(string(v))
}
for k, v := range nodeEnvs {
metadata[k] = v
@ -421,7 +390,7 @@ func (g *GCERunner) parseInstanceMetadata(str string) map[string]string {
// This will only being done if the job has the
// IGNITION_INJECT_GCE_SSH_PUBLIC_KEY_FILE environment variable set, while it
// tried to replace the GCE_SSH_PUBLIC_KEY_FILE_CONTENT placeholder.
func ignitionInjectGCEPublicKey(path string, content string) string {
func ignitionInjectGCEPublicKey(content string) string {
if os.Getenv("IGNITION_INJECT_GCE_SSH_PUBLIC_KEY_FILE") == "" {
return content
}
@ -458,7 +427,7 @@ func (g *GCERunner) testGCEImage(suite remote.TestSuite, archivePath string, ima
host, err := g.createGCEInstance(imageConfig)
if g.cfg.DeleteInstances {
defer g.deleteGCEInstance(host)
defer g.DeleteGCEInstance(host)
}
if err != nil {
return &remote.TestResult{
@ -500,125 +469,101 @@ func (g *GCERunner) testGCEImage(suite remote.TestSuite, archivePath string, ima
// This is a temporary solution to collect serial node serial log. Only port 1 contains useful information.
// TODO(random-liu): Extract out and unify log collection logic with cluste e2e.
serialPortOutput, err := g.gceComputeService.Instances.GetSerialPortOutput(*project, *zone, host).Port(1).Do()
contents, err := g.getSerialOutput(host)
logFilename := "serial-1.log"
err = remote.WriteLog(host, logFilename, contents)
if err != nil {
klog.Errorf("Failed to collect serial Output from node %q: %v", host, err)
} else {
logFilename := "serial-1.log"
err := remote.WriteLog(host, logFilename, serialPortOutput.Contents)
if err != nil {
klog.Errorf("Failed to write serial Output from node %q to %q: %v", host, logFilename, err)
}
klog.Errorf("Failed to write serial Output from node %q to %q: %v", host, logFilename, err)
}
return &result
}
// Provision a gce instance using image
func (g *GCERunner) createGCEInstance(imageConfig *internalGCEImage) (string, error) {
p, err := g.gceComputeService.Projects.Get(*project).Do()
data, err := runGCPCommand("compute", "project-info", "describe", "--format=json", "--project="+*project)
if err != nil {
return "", fmt.Errorf("failed to get project info %q: %w", *project, err)
return "", fmt.Errorf("failed to get project info for %q: %w", project, err)
}
var p projectInfo
err = json.Unmarshal(data, &p)
if err != nil {
return "", fmt.Errorf("failed parse project info %q: %w", *project, err)
}
// Use default service account
serviceAccount := p.DefaultServiceAccount
klog.V(1).Infof("Creating instance %+v with service account %q", *imageConfig, serviceAccount)
name := g.imageToInstanceName(imageConfig)
i := &compute.Instance{
Name: name,
MachineType: g.machineType(imageConfig.machine),
NetworkInterfaces: []*compute.NetworkInterface{
{
AccessConfigs: []*compute.AccessConfig{
{
Type: "ONE_TO_ONE_NAT",
Name: "External NAT",
},
}},
},
Disks: []*compute.AttachedDisk{
{
AutoDelete: true,
Boot: true,
Type: "PERSISTENT",
InitializeParams: &compute.AttachedDiskInitializeParams{
SourceImage: g.sourceImage(imageConfig.image, imageConfig.project),
DiskSizeGb: 20,
},
},
},
ServiceAccounts: []*compute.ServiceAccount{
{
Email: serviceAccount,
Scopes: []string{
"https://www.googleapis.com/auth/cloud-platform",
},
},
},
diskArgs := []string{
"image-project=" + imageConfig.project,
"image=" + imageConfig.image,
"type=pd-standard",
"auto-delete=yes",
"boot=yes",
"size=20GB",
}
scheduling := compute.Scheduling{
Preemptible: *preemptibleInstances,
createArgs := []string{"compute", "instances", "create"}
createArgs = append(createArgs, name)
createArgs = append(createArgs, "--machine-type="+imageConfig.machine)
createArgs = append(createArgs, "--create-disk="+strings.Join(diskArgs, ","))
createArgs = append(createArgs, "--service-account="+serviceAccount)
if *preemptibleInstances {
createArgs = append(createArgs, "--preemptible")
}
for _, accelerator := range imageConfig.resources.Accelerators {
if i.GuestAccelerators == nil {
autoRestart := true
i.GuestAccelerators = []*compute.AcceleratorConfig{}
scheduling.OnHostMaintenance = "TERMINATE"
scheduling.AutomaticRestart = &autoRestart
if len(imageConfig.resources.Accelerators) > 0 {
createArgs = append(createArgs, "--maintenance-policy=TERMINATE")
createArgs = append(createArgs, "--restart-on-failure")
for _, accelerator := range imageConfig.resources.Accelerators {
createArgs = append(createArgs,
fmt.Sprintf("--accelerator=count=%d,type=%s", accelerator.Count, accelerator.Type))
}
}
if imageConfig.metadata != nil {
var itemArgs []string
var itemFileArgs []string
for _, item := range imageConfig.metadata.Items {
if strings.HasPrefix(item.Key, "user-") || strings.HasPrefix(item.Key, "startup-") ||
strings.HasPrefix(item.Key, "containerd-") || strings.HasPrefix(item.Key, "cni-") {
dataFile, err := os.CreateTemp("", "metadata")
if err != nil {
return "", fmt.Errorf("unable to create temp file %v", err)
}
defer os.Remove(dataFile.Name()) // clean up
if err = os.WriteFile(dataFile.Name(), []byte(item.Value), 0666); err != nil {
return "", fmt.Errorf("could not write contents of metadata item into file %v", err)
}
itemFileArgs = append(itemFileArgs, item.Key+"="+dataFile.Name())
} else {
itemArgs = append(itemArgs, item.Key+"="+item.Value)
}
}
if len(itemArgs) > 0 {
createArgs = append(createArgs, "--metadata="+strings.Join(itemArgs, ","))
}
if len(itemFileArgs) > 0 {
createArgs = append(createArgs, "--metadata-from-file="+strings.Join(itemFileArgs, ","))
}
aType := fmt.Sprintf(acceleratorTypeResourceFormat, *project, *zone, accelerator.Type)
ac := &compute.AcceleratorConfig{
AcceleratorCount: accelerator.Count,
AcceleratorType: aType,
}
i.GuestAccelerators = append(i.GuestAccelerators, ac)
}
i.Scheduling = &scheduling
i.Metadata = imageConfig.metadata
var insertionOperationName string
if _, err := g.gceComputeService.Instances.Get(*project, *zone, i.Name).Do(); err != nil {
op, err := g.gceComputeService.Instances.Insert(*project, *zone, i).Do()
if _, err := getGCEInstance(name); err != nil {
fmt.Printf("Running gcloud with parameters : %#v\n", createArgs)
_, err := runGCPCommandWithZone(createArgs...)
if err != nil {
ret := fmt.Sprintf("could not create instance %s: API error: %v", name, err)
if op != nil {
ret = fmt.Sprintf("%s: %v", ret, op.Error)
}
return "", fmt.Errorf(ret)
} else if op.Error != nil {
var errs []string
for _, insertErr := range op.Error.Errors {
errs = append(errs, fmt.Sprintf("%+v", insertErr))
}
return "", fmt.Errorf("could not create instance %s: %+v", name, errs)
fmt.Println(err)
return "", fmt.Errorf("failed to create instance in project %q: %w", project, err)
}
insertionOperationName = op.Name
}
instanceRunning := false
var instance *compute.Instance
var instance *gceInstance
for i := 0; i < 30 && !instanceRunning; i++ {
if i > 0 {
time.Sleep(time.Second * 20)
}
var insertionOperation *compute.Operation
insertionOperation, err = g.gceComputeService.ZoneOperations.Get(*project, *zone, insertionOperationName).Do()
if err != nil {
continue
}
if strings.ToUpper(insertionOperation.Status) != "DONE" {
err = fmt.Errorf("instance insert operation %s not in state DONE, was %s", name, insertionOperation.Status)
continue
}
if insertionOperation.Error != nil {
var errs []string
for _, insertErr := range insertionOperation.Error.Errors {
errs = append(errs, fmt.Sprintf("%+v", insertErr))
}
return name, fmt.Errorf("could not create instance %s: %+v", name, errs)
}
instance, err = g.gceComputeService.Instances.Get(*project, *zone, name).Do()
instance, err := getGCEInstance(name)
if err != nil {
continue
}
@ -677,12 +622,12 @@ func (g *GCERunner) createGCEInstance(imageConfig *internalGCEImage) (string, er
return name, err
}
func (g *GCERunner) isCloudInitUsed(metadata *compute.Metadata) bool {
func (g *GCERunner) isCloudInitUsed(metadata *gceMetadata) bool {
if metadata == nil {
return false
}
for _, item := range metadata.Items {
if item.Key == "user-data" && item.Value != nil && strings.HasPrefix(*item.Value, "#cloud-config") {
if item.Key == "user-data" && item.Value != "" && strings.HasPrefix(item.Value, "#cloud-config") {
return true
}
}
@ -703,7 +648,7 @@ func (g *GCERunner) imageToInstanceName(imageConfig *internalGCEImage) string {
}
func (g *GCERunner) registerGceHostIP(host string) error {
instance, err := g.gceComputeService.Instances.Get(*project, *zone, host).Do()
instance, err := getGCEInstance(host)
if err != nil {
return err
}
@ -716,7 +661,8 @@ func (g *GCERunner) registerGceHostIP(host string) error {
}
return nil
}
func (g *GCERunner) getExternalIP(instance *compute.Instance) string {
func (g *GCERunner) getExternalIP(instance *gceInstance) string {
for i := range instance.NetworkInterfaces {
ni := instance.NetworkInterfaces[i]
for j := range ni.AccessConfigs {
@ -728,7 +674,8 @@ func (g *GCERunner) getExternalIP(instance *compute.Instance) string {
}
return ""
}
func (g *GCERunner) updateKernelArguments(instance *compute.Instance, image string, kernelArgs []string) error {
func (g *GCERunner) updateKernelArguments(instance *gceInstance, image string, kernelArgs []string) error {
kernelArgsString := strings.Join(kernelArgs, " ")
var cmd []string
@ -778,7 +725,8 @@ func (g *GCERunner) machineType(machine string) string {
}
return fmt.Sprintf("zones/%s/machineTypes/%s", *zone, ret)
}
func (g *GCERunner) rebootInstance(instance *compute.Instance) error {
func (g *GCERunner) rebootInstance(instance *gceInstance) error {
// wait until the instance will not response to SSH
klog.Info("Reboot the node and wait for instance not to be available via SSH")
if waitErr := wait.PollImmediate(5*time.Second, 5*time.Minute, func() (bool, error) {

View File

@ -0,0 +1,139 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"encoding/json"
"fmt"
"os/exec"
)
type gceImage struct {
CreationTimestamp string `json:"creationTimestamp"`
Family string `json:"family"`
Id string `json:"id"`
Name string `json:"name"`
}
type gceMetadata struct {
Fingerprint string `json:"fingerprint"`
Kind string `json:"kind"`
Items []gceMetadataItems `json:"items,omitempty"`
}
type gceMetadataItems struct {
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
}
type gceAccessConfigs struct {
Type string `json:"type"`
Name string `json:"name"`
NatIP string `json:"natIP,omitempty"`
}
type gceNetworkInterfaces struct {
AccessConfigs []gceAccessConfigs `json:"accessConfigs"`
}
type gceInstance struct {
CreationTimestamp string `json:"creationTimestamp"`
Description string `json:"description"`
Fingerprint string `json:"fingerprint"`
Id string `json:"id"`
KeyRevocationActionType string `json:"keyRevocationActionType"`
Kind string `json:"kind"`
LabelFingerprint string `json:"labelFingerprint"`
LastStartTimestamp string `json:"lastStartTimestamp"`
MachineType string `json:"machineType"`
Metadata gceMetadata `json:"metadata"`
Name string `json:"name"`
NetworkInterfaces []gceNetworkInterfaces `json:"networkInterfaces"`
Status string `json:"status"`
}
type projectInfo struct {
CommonInstanceMetadata struct {
Fingerprint string `json:"fingerprint"`
Items []struct {
Key string `json:"key"`
Value string `json:"value"`
} `json:"items"`
Kind string `json:"kind"`
} `json:"commonInstanceMetadata"`
CreationTimestamp string `json:"creationTimestamp"`
DefaultNetworkTier string `json:"defaultNetworkTier"`
DefaultServiceAccount string `json:"defaultServiceAccount"`
Id string `json:"id"`
}
func runGCPCommandWithZone(args ...string) ([]byte, error) {
if zone != nil && len(*zone) > 0 {
args = append(args, "--zone="+*zone)
}
return runGCPCommand(args...)
}
func runGCPCommandWithZones(args ...string) ([]byte, error) {
if zone != nil && len(*zone) > 0 {
args = append(args, "--zones="+*zone+",")
}
return runGCPCommand(args...)
}
func runGCPCommand(args ...string) ([]byte, error) {
if project != nil && len(*project) > 0 {
args = append(args, "--project="+*project)
}
return runGCPCommandNoProject(args...)
}
func runGCPCommandNoProject(args ...string) ([]byte, error) {
bytes, err := exec.Command("gcloud", args...).Output()
if err != nil {
var message string
if ee, ok := err.(*exec.ExitError); ok {
message = fmt.Sprintf("%v\n%v", ee, string(ee.Stderr))
} else {
message = fmt.Sprintf("%v", err)
}
return nil, fmt.Errorf("Unable to run gcloud command\n %s \n %w", message, err)
}
return bytes, nil
}
func getGCEInstance(host string) (*gceInstance, error) {
data, err := runGCPCommandWithZone("compute", "instances", "describe", host, "--format=json")
if err != nil {
return nil, fmt.Errorf("failed to describe instance in project %q: %w", project, err)
}
var gceHost gceInstance
err = json.Unmarshal(data, &gceHost)
if err != nil {
return nil, fmt.Errorf("failed to parse instance: %w", err)
}
return &gceHost, nil
}
func (g *GCERunner) getSerialOutput(host string) (string, error) {
data, err := runGCPCommandWithZone("compute", "instances", "get-serial-port-output", "--port=1", host)
if err != nil {
return "", fmt.Errorf("failed to describe instance in project %q: %w", project, err)
}
return string(data), nil
}