Merge pull request #1492 from dave-tucker/gcp

Add gcp backend for moby run
This commit is contained in:
Justin Cormack 2017-04-05 15:17:01 +01:00 committed by GitHub
commit 1602277ba7
5 changed files with 489 additions and 62 deletions

View File

@ -36,6 +36,7 @@ type Moby struct {
Project string
Bucket string
Family string
Keys string
Public bool
Replace bool
}

View File

@ -1,100 +1,443 @@
package main
import (
"errors"
"crypto/rand"
"crypto/rsa"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/exec"
"time"
"cloud.google.com/go/storage"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/term"
"golang.org/x/crypto/ssh"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
"google.golang.org/api/storage/v1"
)
func uploadGS(filename, project, bucket string, public bool) error {
if project != "" {
err := os.Setenv("GOOGLE_CLOUD_PROJECT", project)
const pollingInterval = 500 * time.Millisecond
const timeout = 300
// GCPClient contains state required for communication with GCP
type GCPClient struct {
client *http.Client
compute *compute.Service
storage *storage.Service
projectName string
fileName string
privKey *rsa.PrivateKey
}
// NewGCPClient creates a new GCP client
func NewGCPClient(keys, projectName string) (*GCPClient, error) {
log.Debugf("Connecting to GCP")
ctx := context.Background()
var client *GCPClient
if keys != "" {
log.Debugf("Using Keys %s", keys)
f, err := os.Open(keys)
if err != nil {
return err
return nil, err
}
jsonKey, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
config, err := google.JWTConfigFromJSON(jsonKey,
storage.DevstorageReadWriteScope,
compute.ComputeScope,
)
if err != nil {
return nil, err
}
client = &GCPClient{
client: config.Client(ctx),
projectName: projectName,
}
} else {
log.Debugf("Using Application Default crednetials")
gc, err := google.DefaultClient(
ctx,
storage.DevstorageReadWriteScope,
compute.ComputeScope,
)
if err != nil {
return nil, err
}
client = &GCPClient{
client: gc,
projectName: projectName,
}
}
if os.Getenv("GOOGLE_CLOUD_PROJECT") == "" {
return errors.New("GOOGLE_CLOUD_PROJECT environment variable must be set or project specified in config")
}
ctx := context.Background()
client, err := storage.NewClient(ctx)
var err error
client.compute, err = compute.New(client.client)
if err != nil {
return err
return nil, err
}
client.storage, err = storage.New(client.client)
if err != nil {
return nil, err
}
log.Debugf("Generating SSH Keypair")
client.privKey, err = rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return nil, err
}
return client, nil
}
// UploadFile uploads a file to Google Storage
func (g GCPClient) UploadFile(filename, bucketName string, public bool) error {
log.Infof("Uploading file %s to Google Storage", filename)
f, err := os.Open(filename)
if err != nil {
return err
}
defer f.Close()
obj := client.Bucket(bucket).Object(filename)
wc := obj.NewWriter(ctx)
_, err = io.Copy(wc, f)
if err != nil {
return err
}
err = wc.Close()
if err != nil {
return err
}
objectCall := g.storage.Objects.Insert(bucketName, &storage.Object{Name: filename}).Media(f)
if public {
err = obj.ACL().Set(ctx, storage.AllUsers, storage.RoleReader)
if err != nil {
return err
}
objectCall.PredefinedAcl("publicRead")
}
fmt.Println("gs://" + bucket + "/" + filename)
_, err = objectCall.Do()
if err != nil {
return err
}
log.Infof("Upload Complete!")
fmt.Println("gs://" + bucketName + "/" + filename)
return nil
}
func imageGS(filename, project, storage, family string, replace bool) error {
if project != "" {
err := os.Setenv("GOOGLE_CLOUD_PROJECT", project)
if err != nil {
return err
}
}
if os.Getenv("GOOGLE_CLOUD_PROJECT") == "" {
return errors.New("GOOGLE_CLOUD_PROJECT environment variable must be set or project specified in config")
}
// TODO do not shell out to gcloud tool, use the API
gcloud, err := exec.LookPath("gcloud")
if err != nil {
return errors.New("Please install the gcloud binary")
}
// CreateImage creates a GCE image using the a source from Google Storage
func (g GCPClient) CreateImage(filename, storageURL, family string, replace bool) error {
if replace {
args := []string{"compute", "images", "delete", filename}
cmd := exec.Command(gcloud, args...)
// ignore failures; it may not exist
_ = cmd.Run()
if err := g.DeleteImage(filename); err != nil {
return err
}
}
log.Infof("Creating image: %s", filename)
imgObj := &compute.Image{
RawDisk: &compute.ImageRawDisk{
Source: storageURL,
},
Name: filename,
}
args := []string{"compute", "images", "create", "--source-uri", storage}
if family != "" {
args = append(args, "--family", family)
imgObj.Family = family
}
args = append(args, filename)
cmd := exec.Command(gcloud, args...)
out, err := cmd.CombinedOutput()
op, err := g.compute.Images.Insert(g.projectName, imgObj).Do()
if err != nil {
return fmt.Errorf("Image creation failed: %v - %s", err, string(out))
return err
}
fmt.Println(filename)
if err := g.pollOperationStatus(op.Name); err != nil {
return err
}
log.Infof("Image %s created", filename)
return nil
}
// DeleteImage deletes and image
func (g GCPClient) DeleteImage(filename string) error {
var notFound bool
op, err := g.compute.Images.Delete(g.projectName, filename).Do()
if err != nil {
if err.(*googleapi.Error).Code != 404 {
return err
}
notFound = true
}
if !notFound {
log.Infof("Deleting existing image...")
if err := g.pollOperationStatus(op.Name); err != nil {
return err
}
log.Infof("Image %s deleted", filename)
}
return nil
}
// CreateInstance creates and starts an instance on GCE
func (g GCPClient) CreateInstance(image, zone, machineType string, replace bool) error {
if replace {
if err := g.DeleteInstance(image, zone, true); err != nil {
return err
}
}
log.Infof("Creating instance %s", image)
enabled := new(string)
*enabled = "1"
k, err := ssh.NewPublicKey(g.privKey.Public())
if err != nil {
return err
}
sshKey := new(string)
*sshKey = fmt.Sprintf("moby:%s moby", string(ssh.MarshalAuthorizedKey(k)))
instanceObj := &compute.Instance{
MachineType: fmt.Sprintf("zones/%s/machineTypes/%s", zone, machineType),
Name: image,
Disks: []*compute.AttachedDisk{
{
AutoDelete: true,
Boot: true,
InitializeParams: &compute.AttachedDiskInitializeParams{
SourceImage: fmt.Sprintf("global/images/%s", image),
},
},
},
NetworkInterfaces: []*compute.NetworkInterface{
{
Network: "global/networks/default",
},
},
Metadata: &compute.Metadata{
Items: []*compute.MetadataItems{
{
Key: "serial-port-enable",
Value: enabled,
},
{
Key: "ssh-keys",
Value: sshKey,
},
},
},
}
// Don't wait for operation to complete!
// A headstart is needed as by the time we've polled for this event to be
// completed, the instance may have already terminated
_, err = g.compute.Instances.Insert(g.projectName, zone, instanceObj).Do()
if err != nil {
return err
}
log.Infof("Instance created")
return nil
}
// DeleteInstance removes an instance
func (g GCPClient) DeleteInstance(instance, zone string, wait bool) error {
var notFound bool
op, err := g.compute.Instances.Delete(g.projectName, zone, instance).Do()
if err != nil {
if err.(*googleapi.Error).Code != 404 {
return err
}
notFound = true
}
if !notFound && wait {
log.Infof("Deleting existing instance...")
if err := g.pollZoneOperationStatus(op.Name, zone); err != nil {
return err
}
log.Infof("Instance %s deleted", instance)
}
return nil
}
// GetInstanceSerialOutput streams the serial output of an instance
func (g GCPClient) GetInstanceSerialOutput(instance, zone string) error {
log.Infof("Getting serial port output for instance %s", instance)
var next int64
for {
res, err := g.compute.Instances.GetSerialPortOutput(g.projectName, zone, instance).Start(next).Do()
if err != nil {
if err.(*googleapi.Error).Code == 400 {
// Instance may not be ready yet...
time.Sleep(pollingInterval)
continue
}
if err.(*googleapi.Error).Code == 503 {
// Timeout received when the instance has terminated
break
}
return err
}
fmt.Printf(res.Contents)
next = res.Next
// When the instance has been stopped, Start and Next will both be 0
if res.Start > 0 && next == 0 {
break
}
}
return nil
}
// ConnectToInstanceSerialPort uses SSH to connect to the serial port of the instance
func (g GCPClient) ConnectToInstanceSerialPort(instance, zone string) error {
log.Infof("Connecting to serial port of instance %s", instance)
gPubKeyURL := "https://cloud-certs.storage.googleapis.com/google-cloud-serialport-host-key.pub"
resp, err := http.Get(gPubKeyURL)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
gPubKey, _, _, _, err := ssh.ParseAuthorizedKey(body)
if err != nil {
return err
}
signer, err := ssh.NewSignerFromKey(g.privKey)
if err != nil {
return err
}
config := &ssh.ClientConfig{
User: fmt.Sprintf("%s.%s.%s.moby", g.projectName, zone, instance),
Auth: []ssh.AuthMethod{
ssh.PublicKeys(signer),
},
HostKeyCallback: ssh.FixedHostKey(gPubKey),
Timeout: 5 * time.Second,
}
var conn *ssh.Client
// Retry connection as VM may not be ready yet
for i := 0; i < timeout; i++ {
conn, err = ssh.Dial("tcp", "ssh-serialport.googleapis.com:9600", config)
if err != nil {
time.Sleep(pollingInterval)
continue
}
break
}
if conn == nil {
return fmt.Errorf(err.Error())
}
defer conn.Close()
session, err := conn.NewSession()
if err != nil {
return err
}
defer session.Close()
stdin, err := session.StdinPipe()
if err != nil {
return fmt.Errorf("Unable to setup stdin for session: %v", err)
}
go io.Copy(stdin, os.Stdin)
stdout, err := session.StdoutPipe()
if err != nil {
return fmt.Errorf("Unable to setup stdout for session: %v", err)
}
go io.Copy(os.Stdout, stdout)
stderr, err := session.StderrPipe()
if err != nil {
return fmt.Errorf("Unable to setup stderr for session: %v", err)
}
go io.Copy(os.Stderr, stderr)
/*
c := make(chan os.Signal, 1)
exit := make(chan bool, 1)
signal.Notify(c)
go func(exit <-chan bool, c <-chan os.Signal) {
select {
case <-exit:
return
case s := <-c:
switch s {
// CTRL+C
case os.Interrupt:
session.Signal(ssh.SIGINT)
// CTRL+\
case os.Kill:
session.Signal(ssh.SIGQUIT)
default:
log.Debugf("Received signal %s but not forwarding to ssh", s)
}
}
}(exit, c)
*/
var termWidth, termHeight int
fd := os.Stdin.Fd()
if term.IsTerminal(fd) {
oldState, err := term.MakeRaw(fd)
if err != nil {
return err
}
defer term.RestoreTerminal(fd, oldState)
winsize, err := term.GetWinsize(fd)
if err != nil {
termWidth = 80
termHeight = 24
} else {
termWidth = int(winsize.Width)
termHeight = int(winsize.Height)
}
}
session.RequestPty("xterm", termHeight, termWidth, ssh.TerminalModes{
ssh.ECHO: 1,
})
session.Shell()
err = session.Wait()
//exit <- true
if err != nil {
return err
}
return nil
}
func (g *GCPClient) pollOperationStatus(operationName string) error {
for i := 0; i < timeout; i++ {
operation, err := g.compute.GlobalOperations.Get(g.projectName, operationName).Do()
if err != nil {
return fmt.Errorf("error fetching operation status: %v", err)
}
if operation.Error != nil {
return fmt.Errorf("error running operation: %v", operation.Error)
}
if operation.Status == "DONE" {
return nil
}
time.Sleep(pollingInterval)
}
return fmt.Errorf("timeout waiting for operation to finish")
}
func (g *GCPClient) pollZoneOperationStatus(operationName, zone string) error {
for i := 0; i < timeout; i++ {
operation, err := g.compute.ZoneOperations.Get(g.projectName, zone, operationName).Do()
if err != nil {
return fmt.Errorf("error fetching operation status: %v", err)
}
if operation.Error != nil {
return fmt.Errorf("error running operation: %v", operation.Error)
}
if operation.Status == "DONE" {
return nil
}
time.Sleep(pollingInterval)
}
return fmt.Errorf("timeout waiting for operation to finish")
}

View File

@ -51,7 +51,11 @@ func outputs(m *Moby, base string, bzimage []byte, initrd []byte) error {
if o.Bucket == "" {
return fmt.Errorf("No bucket specified for GCE output")
}
err = uploadGS(base+".img.tar.gz", o.Project, o.Bucket, o.Public)
gClient, err := NewGCPClient(o.Keys, o.Project)
if err != nil {
return fmt.Errorf("Unable to connect to GCP")
}
err = gClient.UploadFile(base+".img.tar.gz", o.Bucket, o.Public)
if err != nil {
return fmt.Errorf("Error copying to Google Storage: %v", err)
}
@ -63,11 +67,15 @@ func outputs(m *Moby, base string, bzimage []byte, initrd []byte) error {
if o.Bucket == "" {
return fmt.Errorf("No bucket specified for GCE output")
}
err = uploadGS(base+".img.tar.gz", o.Project, o.Bucket, o.Public)
gClient, err := NewGCPClient(o.Keys, o.Project)
if err != nil {
return fmt.Errorf("Unable to connect to GCP")
}
err = gClient.UploadFile(base+".img.tar.gz", o.Bucket, o.Public)
if err != nil {
return fmt.Errorf("Error copying to Google Storage: %v", err)
}
err = imageGS(base, o.Project, "https://storage.googleapis.com/"+o.Bucket+"/"+base+".img.tar.gz", o.Family, o.Replace)
err = gClient.CreateImage(base, "https://storage.googleapis.com/"+o.Bucket+"/"+base+".img.tar.gz", o.Family, o.Replace)
if err != nil {
return fmt.Errorf("Error creating Google Compute Image: %v", err)
}

View File

@ -37,6 +37,8 @@ func run(args []string) {
runHyperKit(args[1:])
case "vmware":
runVMware(args[1:])
case "gcp":
runGcp(args[1:])
default:
switch runtime.GOOS {
case "darwin":

73
src/cmd/moby/run_gcp.go Normal file
View File

@ -0,0 +1,73 @@
package main
import (
"flag"
"fmt"
"os"
"strings"
log "github.com/Sirupsen/logrus"
)
// Process the run arguments and execute run
func runGcp(args []string) {
gcpCmd := flag.NewFlagSet("gcp", flag.ExitOnError)
gcpCmd.Usage = func() {
fmt.Printf("USAGE: %s run gcp [options] [name]\n\n", os.Args[0])
fmt.Printf("'name' specifies either the name of an already uploaded\n")
fmt.Printf("GCE image or the full path to a image file which will be\n")
fmt.Printf("uploaded before it is run.\n\n")
fmt.Printf("Options:\n\n")
gcpCmd.PrintDefaults()
}
zone := gcpCmd.String("zone", "europe-west1-d", "GCP Zone")
machine := gcpCmd.String("machine", "g1-small", "GCE Machine Type")
keys := gcpCmd.String("keys", "", "Path to Service Account JSON key file")
project := gcpCmd.String("project", "", "GCP Project Name")
bucket := gcpCmd.String("bucket", "", "GS Bucket to upload to. *Required* when 'prefix' is a filename")
public := gcpCmd.Bool("public", false, "Select if file on GS should be public. *Optional* when 'prefix' is a filename")
family := gcpCmd.String("family", "", "GCE Image Family. A group of images where the family name points to the most recent image. *Optional* when 'prefix' is a filename")
gcpCmd.Parse(args)
remArgs := gcpCmd.Args()
if len(remArgs) == 0 {
fmt.Printf("Please specify the prefix to the image to boot\n")
gcpCmd.Usage()
os.Exit(1)
}
prefix := remArgs[0]
client, err := NewGCPClient(*keys, *project)
if err != nil {
log.Fatalf("Unable to connect to GCP")
}
suffix := ".img.tar.gz"
if strings.HasSuffix(prefix, suffix) {
filename := prefix
prefix = prefix[:len(prefix)-len(suffix)]
if *bucket == "" {
log.Fatalf("No bucket specified. Please provide one using the -bucket flag")
}
err = client.UploadFile(filename, *bucket, *public)
if err != nil {
log.Fatalf("Error copying to Google Storage: %v", err)
}
err = client.CreateImage(prefix, "https://storage.googleapis.com/"+*bucket+"/"+prefix+".img.tar.gz", *family, true)
if err != nil {
log.Fatalf("Error creating Google Compute Image: %v", err)
}
}
if err = client.CreateInstance(prefix, *zone, *machine, true); err != nil {
log.Fatal(err)
}
if err = client.ConnectToInstanceSerialPort(prefix, *zone); err != nil {
log.Fatal(err)
}
if err = client.DeleteInstance(prefix, *zone, true); err != nil {
log.Fatal(err)
}
}