Merge pull request #23501 from a-robinson/md

Update our GCP metadata dependency to include a recent flakiness improvement
This commit is contained in:
Alex Robinson 2016-03-30 09:51:52 -07:00
commit cd0caf5312
11 changed files with 537 additions and 144 deletions

4
Godeps/Godeps.json generated
View File

@ -1059,11 +1059,11 @@
},
{
"ImportPath": "google.golang.org/cloud/compute/metadata",
"Rev": "2e43671e4ad874a7bca65746ff3edb38e6e93762"
"Rev": "eb47ba841d53d93506cfbfbc03927daf9cc48f88"
},
{
"ImportPath": "google.golang.org/cloud/internal",
"Rev": "2e43671e4ad874a7bca65746ff3edb38e6e93762"
"Rev": "eb47ba841d53d93506cfbfbc03927daf9cc48f88"
},
{
"ImportPath": "google.golang.org/grpc",

View File

@ -25,13 +25,21 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"google.golang.org/cloud/internal"
)
// metadataIP is the documented metadata server IP address.
const metadataIP = "169.254.169.254"
type cachedValue struct {
k string
trim bool
@ -45,33 +53,29 @@ var (
instID = &cachedValue{k: "instance/id", trim: true}
)
var metaClient = &http.Client{
var (
metaClient = &http.Client{
Transport: &internal.Transport{
Base: &http.Transport{
Dial: dialer().Dial,
ResponseHeaderTimeout: 750 * time.Millisecond,
Dial: (&net.Dialer{
Timeout: 2 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
ResponseHeaderTimeout: 2 * time.Second,
},
},
}
// go13Dialer is nil until we're using Go 1.3+.
// This is a workaround for https://github.com/golang/oauth2/issues/70, where
// net.Dialer.KeepAlive is unavailable on Go 1.2 (which App Engine as of
// Jan 2015 still runs).
//
// TODO(bradfitz,jbd,adg,dsymonds): remove this once App Engine supports Go
// 1.3+ and go-app-builder also supports 1.3+, or when Go 1.2 is no longer an
// option on App Engine.
var go13Dialer func() *net.Dialer
func dialer() *net.Dialer {
if fn := go13Dialer; fn != nil {
return fn()
}
return &net.Dialer{
Timeout: 750 * time.Millisecond,
subscribeClient = &http.Client{
Transport: &internal.Transport{
Base: &http.Transport{
Dial: (&net.Dialer{
Timeout: 2 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
},
},
}
}
)
// NotDefinedError is returned when requested metadata is not defined.
//
@ -86,35 +90,54 @@ func (suffix NotDefinedError) Error() string {
}
// Get returns a value from the metadata service.
// The suffix is appended to "http://metadata/computeMetadata/v1/".
// The suffix is appended to "http://${GCE_METADATA_HOST}/computeMetadata/v1/".
//
// If the GCE_METADATA_HOST environment variable is not defined, a default of
// 169.254.169.254 will be used instead.
//
// If the requested metadata is not defined, the returned error will
// be of type NotDefinedError.
func Get(suffix string) (string, error) {
val, _, err := getETag(metaClient, suffix)
return val, err
}
// getETag returns a value from the metadata service as well as the associated
// ETag using the provided client. This func is otherwise equivalent to Get.
func getETag(client *http.Client, suffix string) (value, etag string, err error) {
// Using a fixed IP makes it very difficult to spoof the metadata service in
// a container, which is an important use-case for local testing of cloud
// deployments. To enable spoofing of the metadata service, the environment
// variable GCE_METADATA_HOST is first inspected to decide where metadata
// requests shall go.
host := os.Getenv("GCE_METADATA_HOST")
if host == "" {
// Using 169.254.169.254 instead of "metadata" here because Go
// binaries built with the "netgo" tag and without cgo won't
// know the search suffix for "metadata" is
// ".google.internal", and this IP address is documented as
// being stable anyway.
url := "http://169.254.169.254/computeMetadata/v1/" + suffix
host = metadataIP
}
url := "http://" + host + "/computeMetadata/v1/" + suffix
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Metadata-Flavor", "Google")
res, err := metaClient.Do(req)
res, err := client.Do(req)
if err != nil {
return "", err
return "", "", err
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
return "", NotDefinedError(suffix)
return "", "", NotDefinedError(suffix)
}
if res.StatusCode != 200 {
return "", fmt.Errorf("status code %d trying to fetch %s", res.StatusCode, url)
return "", "", fmt.Errorf("status code %d trying to fetch %s", res.StatusCode, url)
}
all, err := ioutil.ReadAll(res.Body)
if err != nil {
return "", err
return "", "", err
}
return string(all), nil
return string(all), res.Header.Get("Etag"), nil
}
func getTrimmed(suffix string) (s string, err error) {
@ -154,17 +177,85 @@ func OnGCE() bool {
return onGCE.v
}
onGCE.set = true
// We use the DNS name of the metadata service here instead of the IP address
// because we expect that to fail faster in the not-on-GCE case.
res, err := metaClient.Get("http://metadata.google.internal")
if err != nil {
return false
}
onGCE.v = res.Header.Get("Metadata-Flavor") == "Google"
onGCE.v = testOnGCE()
return onGCE.v
}
func testOnGCE() bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resc := make(chan bool, 2)
// Try two strategies in parallel.
// See https://github.com/GoogleCloudPlatform/gcloud-golang/issues/194
go func() {
res, err := ctxhttp.Get(ctx, metaClient, "http://"+metadataIP)
if err != nil {
resc <- false
return
}
defer res.Body.Close()
resc <- res.Header.Get("Metadata-Flavor") == "Google"
}()
go func() {
addrs, err := net.LookupHost("metadata.google.internal")
if err != nil || len(addrs) == 0 {
resc <- false
return
}
resc <- strsContains(addrs, metadataIP)
}()
return <-resc
}
// Subscribe subscribes to a value from the metadata service.
// The suffix is appended to "http://${GCE_METADATA_HOST}/computeMetadata/v1/".
// The suffix may contain query parameters.
//
// Subscribe calls fn with the latest metadata value indicated by the provided
// suffix. If the metadata value is deleted, fn is called with the empty string
// and ok false. Subscribe blocks until fn returns a non-nil error or the value
// is deleted. Subscribe returns the error value returned from the last call to
// fn, which may be nil when ok == false.
func Subscribe(suffix string, fn func(v string, ok bool) error) error {
const failedSubscribeSleep = time.Second * 5
// First check to see if the metadata value exists at all.
val, lastETag, err := getETag(subscribeClient, suffix)
if err != nil {
return err
}
if err := fn(val, true); err != nil {
return err
}
ok := true
if strings.ContainsRune(suffix, '?') {
suffix += "&wait_for_change=true&last_etag="
} else {
suffix += "?wait_for_change=true&last_etag="
}
for {
val, etag, err := getETag(subscribeClient, suffix+url.QueryEscape(lastETag))
if err != nil {
if _, deleted := err.(NotDefinedError); !deleted {
time.Sleep(failedSubscribeSleep)
continue // Retry on other errors.
}
ok = false
}
lastETag = etag
if err := fn(val, ok); err != nil || !ok {
return err
}
}
}
// ProjectID returns the current instance's project ID string.
func ProjectID() (string, error) { return projID.get() }
@ -181,14 +272,10 @@ func ExternalIP() (string, error) {
return getTrimmed("instance/network-interfaces/0/access-configs/0/external-ip")
}
// Hostname returns the instance's hostname. This will probably be of
// the form "INSTANCENAME.c.PROJECT.internal" but that isn't
// guaranteed.
//
// TODO: what is this defined to be? Docs say "The host name of the
// instance."
// Hostname returns the instance's hostname. This will be of the form
// "<instanceID>.c.<projID>.internal".
func Hostname() (string, error) {
return getTrimmed("network-interfaces/0/ip")
return getTrimmed("instance/hostname")
}
// InstanceTags returns the list of user-defined instance tags,
@ -210,6 +297,25 @@ func InstanceID() (string, error) {
return instID.get()
}
// InstanceName returns the current VM's instance ID string.
func InstanceName() (string, error) {
host, err := Hostname()
if err != nil {
return "", err
}
return strings.Split(host, ".")[0], nil
}
// Zone returns the current VM's zone, such as "us-central1-b".
func Zone() (string, error) {
zone, err := getTrimmed("instance/zone")
// zone is of the form "projects/<projNum>/zones/<zoneName>".
if err != nil {
return "", err
}
return zone[strings.LastIndex(zone, "/")+1:], nil
}
// InstanceAttributes returns the list of user-defined attributes,
// assigned when initially creating a GCE VM instance. The value of an
// attribute can be obtained with InstanceAttributeValue.
@ -265,3 +371,12 @@ func Scopes(serviceAccount string) ([]string, error) {
}
return lines("instance/service-accounts/" + serviceAccount + "/scopes")
}
func strsContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {
return true
}
}
return false
}

View File

@ -77,8 +77,8 @@ func (c *cloudContext) service(name string, fill func(*http.Client) interface{})
// Google Cloud client's user-agent to the original
// request's user-agent header.
type Transport struct {
// Base represents the actual http.RoundTripper
// the requests will be delegated to.
// Base is the actual http.RoundTripper
// requests will use. It must not be nil.
Base http.RoundTripper
}
@ -90,7 +90,7 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
if ua == "" {
ua = userAgent
} else {
ua = fmt.Sprintf("%s;%s", ua, userAgent)
ua = fmt.Sprintf("%s %s", ua, userAgent)
}
req.Header.Set("User-Agent", ua)
return t.Base.RoundTrip(req)

View File

@ -3,7 +3,7 @@
// DO NOT EDIT!
/*
Package pb is a generated protocol buffer package.
Package datastore is a generated protocol buffer package.
It is generated from these files:
datastore_v1.proto
@ -42,7 +42,7 @@ It has these top-level messages:
AllocateIdsRequest
AllocateIdsResponse
*/
package pb
package datastore
import proto "github.com/golang/protobuf/proto"
import math "math"
@ -897,7 +897,7 @@ type PropertyExpression struct {
// Can only be used when grouping by at least one property. Must
// then be set on all properties in the projection that are not
// being grouped by.
AggregationFunction *PropertyExpression_AggregationFunction `protobuf:"varint,2,opt,name=aggregation_function,enum=pb.PropertyExpression_AggregationFunction" json:"aggregation_function,omitempty"`
AggregationFunction *PropertyExpression_AggregationFunction `protobuf:"varint,2,opt,name=aggregation_function,enum=datastore.PropertyExpression_AggregationFunction" json:"aggregation_function,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -924,7 +924,7 @@ type PropertyOrder struct {
// The property to order by.
Property *PropertyReference `protobuf:"bytes,1,req,name=property" json:"property,omitempty"`
// The direction to order by.
Direction *PropertyOrder_Direction `protobuf:"varint,2,opt,name=direction,enum=pb.PropertyOrder_Direction,def=1" json:"direction,omitempty"`
Direction *PropertyOrder_Direction `protobuf:"varint,2,opt,name=direction,enum=datastore.PropertyOrder_Direction,def=1" json:"direction,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -978,7 +978,7 @@ func (m *Filter) GetPropertyFilter() *PropertyFilter {
// A filter that merges the multiple other filters using the given operation.
type CompositeFilter struct {
// The operator for combining multiple filters.
Operator *CompositeFilter_Operator `protobuf:"varint,1,req,name=operator,enum=pb.CompositeFilter_Operator" json:"operator,omitempty"`
Operator *CompositeFilter_Operator `protobuf:"varint,1,req,name=operator,enum=datastore.CompositeFilter_Operator" json:"operator,omitempty"`
// The list of filters to combine.
// Must contain at least one filter.
Filter []*Filter `protobuf:"bytes,2,rep,name=filter" json:"filter,omitempty"`
@ -1008,7 +1008,7 @@ type PropertyFilter struct {
// The property to filter by.
Property *PropertyReference `protobuf:"bytes,1,req,name=property" json:"property,omitempty"`
// The operator to filter by.
Operator *PropertyFilter_Operator `protobuf:"varint,2,req,name=operator,enum=pb.PropertyFilter_Operator" json:"operator,omitempty"`
Operator *PropertyFilter_Operator `protobuf:"varint,2,req,name=operator,enum=datastore.PropertyFilter_Operator" json:"operator,omitempty"`
// The value to compare the property to.
Value *Value `protobuf:"bytes,3,req,name=value" json:"value,omitempty"`
XXX_unrecognized []byte `json:"-"`
@ -1134,14 +1134,14 @@ func (m *GqlQueryArg) GetCursor() []byte {
// A batch of results produced by a query.
type QueryResultBatch struct {
// The result type for every entity in entityResults.
EntityResultType *EntityResult_ResultType `protobuf:"varint,1,req,name=entity_result_type,enum=pb.EntityResult_ResultType" json:"entity_result_type,omitempty"`
EntityResultType *EntityResult_ResultType `protobuf:"varint,1,req,name=entity_result_type,enum=datastore.EntityResult_ResultType" json:"entity_result_type,omitempty"`
// The results for this batch.
EntityResult []*EntityResult `protobuf:"bytes,2,rep,name=entity_result" json:"entity_result,omitempty"`
// A cursor that points to the position after the last result in the batch.
// May be absent.
EndCursor []byte `protobuf:"bytes,4,opt,name=end_cursor" json:"end_cursor,omitempty"`
// The state of the query after the current batch.
MoreResults *QueryResultBatch_MoreResultsType `protobuf:"varint,5,req,name=more_results,enum=pb.QueryResultBatch_MoreResultsType" json:"more_results,omitempty"`
MoreResults *QueryResultBatch_MoreResultsType `protobuf:"varint,5,req,name=more_results,enum=datastore.QueryResultBatch_MoreResultsType" json:"more_results,omitempty"`
// The number of results skipped because of <code>Query.offset</code>.
SkippedResults *int32 `protobuf:"varint,6,opt,name=skipped_results" json:"skipped_results,omitempty"`
XXX_unrecognized []byte `json:"-"`
@ -1300,7 +1300,7 @@ type ReadOptions struct {
// Cannot be set when transaction is set.
// Lookup and ancestor queries default to STRONG, global queries default to
// EVENTUAL and cannot be set to STRONG.
ReadConsistency *ReadOptions_ReadConsistency `protobuf:"varint,1,opt,name=read_consistency,enum=pb.ReadOptions_ReadConsistency,def=0" json:"read_consistency,omitempty"`
ReadConsistency *ReadOptions_ReadConsistency `protobuf:"varint,1,opt,name=read_consistency,enum=datastore.ReadOptions_ReadConsistency,def=0" json:"read_consistency,omitempty"`
// The transaction to use. Optional.
Transaction []byte `protobuf:"bytes,2,opt,name=transaction" json:"transaction,omitempty"`
XXX_unrecognized []byte `json:"-"`
@ -1462,7 +1462,7 @@ func (m *RunQueryResponse) GetBatch() *QueryResultBatch {
// The request for BeginTransaction.
type BeginTransactionRequest struct {
// The transaction isolation level.
IsolationLevel *BeginTransactionRequest_IsolationLevel `protobuf:"varint,1,opt,name=isolation_level,enum=pb.BeginTransactionRequest_IsolationLevel,def=0" json:"isolation_level,omitempty"`
IsolationLevel *BeginTransactionRequest_IsolationLevel `protobuf:"varint,1,opt,name=isolation_level,enum=datastore.BeginTransactionRequest_IsolationLevel,def=0" json:"isolation_level,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1533,7 +1533,7 @@ type CommitRequest struct {
// The mutation to perform. Optional.
Mutation *Mutation `protobuf:"bytes,2,opt,name=mutation" json:"mutation,omitempty"`
// The type of commit to perform. Either TRANSACTIONAL or NON_TRANSACTIONAL.
Mode *CommitRequest_Mode `protobuf:"varint,5,opt,name=mode,enum=pb.CommitRequest_Mode,def=1" json:"mode,omitempty"`
Mode *CommitRequest_Mode `protobuf:"varint,5,opt,name=mode,enum=datastore.CommitRequest_Mode,def=1" json:"mode,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -1621,13 +1621,13 @@ func (m *AllocateIdsResponse) GetKey() []*Key {
}
func init() {
proto.RegisterEnum("pb.EntityResult_ResultType", EntityResult_ResultType_name, EntityResult_ResultType_value)
proto.RegisterEnum("pb.PropertyExpression_AggregationFunction", PropertyExpression_AggregationFunction_name, PropertyExpression_AggregationFunction_value)
proto.RegisterEnum("pb.PropertyOrder_Direction", PropertyOrder_Direction_name, PropertyOrder_Direction_value)
proto.RegisterEnum("pb.CompositeFilter_Operator", CompositeFilter_Operator_name, CompositeFilter_Operator_value)
proto.RegisterEnum("pb.PropertyFilter_Operator", PropertyFilter_Operator_name, PropertyFilter_Operator_value)
proto.RegisterEnum("pb.QueryResultBatch_MoreResultsType", QueryResultBatch_MoreResultsType_name, QueryResultBatch_MoreResultsType_value)
proto.RegisterEnum("pb.ReadOptions_ReadConsistency", ReadOptions_ReadConsistency_name, ReadOptions_ReadConsistency_value)
proto.RegisterEnum("pb.BeginTransactionRequest_IsolationLevel", BeginTransactionRequest_IsolationLevel_name, BeginTransactionRequest_IsolationLevel_value)
proto.RegisterEnum("pb.CommitRequest_Mode", CommitRequest_Mode_name, CommitRequest_Mode_value)
proto.RegisterEnum("datastore.EntityResult_ResultType", EntityResult_ResultType_name, EntityResult_ResultType_value)
proto.RegisterEnum("datastore.PropertyExpression_AggregationFunction", PropertyExpression_AggregationFunction_name, PropertyExpression_AggregationFunction_value)
proto.RegisterEnum("datastore.PropertyOrder_Direction", PropertyOrder_Direction_name, PropertyOrder_Direction_value)
proto.RegisterEnum("datastore.CompositeFilter_Operator", CompositeFilter_Operator_name, CompositeFilter_Operator_value)
proto.RegisterEnum("datastore.PropertyFilter_Operator", PropertyFilter_Operator_name, PropertyFilter_Operator_value)
proto.RegisterEnum("datastore.QueryResultBatch_MoreResultsType", QueryResultBatch_MoreResultsType_name, QueryResultBatch_MoreResultsType_value)
proto.RegisterEnum("datastore.ReadOptions_ReadConsistency", ReadOptions_ReadConsistency_name, ReadOptions_ReadConsistency_value)
proto.RegisterEnum("datastore.BeginTransactionRequest_IsolationLevel", BeginTransactionRequest_IsolationLevel_name, BeginTransactionRequest_IsolationLevel_value)
proto.RegisterEnum("datastore.CommitRequest_Mode", CommitRequest_Mode_name, CommitRequest_Mode_value)
}

View File

@ -1,10 +1,22 @@
// Copyright 2013 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// The datastore v1 service proto definitions
syntax = "proto2";
package pb;
package datastore;
option java_package = "com.google.api.services.datastore";

View File

@ -0,0 +1,25 @@
// Package opts holds the DialOpts struct, configurable by
// cloud.ClientOptions to set up transports for cloud packages.
//
// This is a separate page to prevent cycles between the core
// cloud packages.
package opts
import (
"net/http"
"golang.org/x/oauth2"
"google.golang.org/grpc"
)
type DialOpt struct {
Endpoint string
Scopes []string
UserAgent string
TokenSource oauth2.TokenSource
HTTPClient *http.Client
GRPCClient *grpc.ClientConn
GRPCDialOpts []grpc.DialOption
}

View File

@ -18,13 +18,11 @@ package testutil
import (
"io/ioutil"
"log"
"net/http"
"os"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
)
const (
@ -32,10 +30,23 @@ const (
envPrivateKey = "GCLOUD_TESTS_GOLANG_KEY"
)
func Context(scopes ...string) context.Context {
key, projID := os.Getenv(envPrivateKey), os.Getenv(envProjID)
if key == "" || projID == "" {
log.Fatal("GCLOUD_TESTS_GOLANG_KEY and GCLOUD_TESTS_GOLANG_PROJECT_ID must be set. See CONTRIBUTING.md for details.")
// ProjID returns the project ID to use in integration tests, or the empty
// string if none is configured.
func ProjID() string {
projID := os.Getenv(envProjID)
if projID == "" {
return ""
}
return projID
}
// TokenSource returns the OAuth2 token source to use in integration tests,
// or nil if none is configured. TokenSource will log.Fatal if the token
// source is specified but missing or invalid.
func TokenSource(ctx context.Context, scopes ...string) oauth2.TokenSource {
key := os.Getenv(envPrivateKey)
if key == "" {
return nil
}
jsonKey, err := ioutil.ReadFile(key)
if err != nil {
@ -43,15 +54,7 @@ func Context(scopes ...string) context.Context {
}
conf, err := google.JWTConfigFromJSON(jsonKey, scopes...)
if err != nil {
log.Fatal(err)
log.Fatalf("google.JWTConfigFromJSON: %v", err)
}
return cloud.NewContext(projID, conf.Client(oauth2.NoContext))
}
func NoAuthContext() context.Context {
projID := os.Getenv(envProjID)
if projID == "" {
log.Fatal("GCLOUD_TESTS_GOLANG_PROJECT_ID must be set. See CONTRIBUTING.md for details.")
}
return cloud.NewContext(projID, &http.Client{Transport: http.DefaultTransport})
return conf.TokenSource(ctx)
}

View File

@ -0,0 +1,29 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build go1.5
package transport
import "net/http"
// makeReqCancel returns a closure that cancels the given http.Request
// when called.
func makeReqCancel(req *http.Request) func(http.RoundTripper) {
c := make(chan struct{})
req.Cancel = c
return func(http.RoundTripper) {
close(c)
}
}

View File

@ -12,26 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// +build go1.3
// +build !go1.5
package metadata
package transport
import (
"net"
"time"
)
import "net/http"
// This is a workaround for https://github.com/golang/oauth2/issues/70, where
// net.Dialer.KeepAlive is unavailable on Go 1.2 (which App Engine as of
// Jan 2015 still runs).
//
// TODO(bradfitz,jbd,adg): remove this once App Engine supports Go
// 1.3+.
func init() {
go13Dialer = func() *net.Dialer {
return &net.Dialer{
Timeout: 750 * time.Millisecond,
KeepAlive: 30 * time.Second,
// makeReqCancel returns a closure that cancels the given http.Request
// when called.
func makeReqCancel(req *http.Request) func(http.RoundTripper) {
// Go 1.4 and prior do not have a reliable way of cancelling a request.
// Transport.CancelRequest will only work if the request is already in-flight.
return func(r http.RoundTripper) {
if t, ok := r.(*http.Transport); ok {
t.CancelRequest(req)
}
}
}

View File

@ -0,0 +1,135 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transport
import (
"errors"
"fmt"
"net/http"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
"google.golang.org/cloud/internal/opts"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/oauth"
)
// ErrHTTP is returned when on a non-200 HTTP response.
type ErrHTTP struct {
StatusCode int
Body []byte
err error
}
func (e *ErrHTTP) Error() string {
if e.err == nil {
return fmt.Sprintf("error during call, http status code: %v %s", e.StatusCode, e.Body)
}
return e.err.Error()
}
// NewHTTPClient returns an HTTP client for use communicating with a Google cloud
// service, configured with the given ClientOptions. It also returns the endpoint
// for the service as specified in the options.
func NewHTTPClient(ctx context.Context, opt ...cloud.ClientOption) (*http.Client, string, error) {
var o opts.DialOpt
for _, opt := range opt {
opt.Resolve(&o)
}
if o.GRPCClient != nil {
return nil, "", errors.New("unsupported GRPC base transport specified")
}
// TODO(djd): Wrap all http.Clients with appropriate internal version to add
// UserAgent header and prepend correct endpoint.
if o.HTTPClient != nil {
return o.HTTPClient, o.Endpoint, nil
}
if o.TokenSource == nil {
var err error
o.TokenSource, err = google.DefaultTokenSource(ctx, o.Scopes...)
if err != nil {
return nil, "", fmt.Errorf("google.DefaultTokenSource: %v", err)
}
}
return oauth2.NewClient(ctx, o.TokenSource), o.Endpoint, nil
}
// NewProtoClient returns a ProtoClient for communicating with a Google cloud service,
// configured with the given ClientOptions.
func NewProtoClient(ctx context.Context, opt ...cloud.ClientOption) (*ProtoClient, error) {
var o opts.DialOpt
for _, opt := range opt {
opt.Resolve(&o)
}
if o.GRPCClient != nil {
return nil, errors.New("unsupported GRPC base transport specified")
}
var client *http.Client
switch {
case o.HTTPClient != nil:
if o.TokenSource != nil {
return nil, errors.New("at most one of WithTokenSource or WithBaseHTTP may be provided")
}
client = o.HTTPClient
case o.TokenSource != nil:
client = oauth2.NewClient(ctx, o.TokenSource)
default:
var err error
client, err = google.DefaultClient(ctx, o.Scopes...)
if err != nil {
return nil, err
}
}
return &ProtoClient{
client: client,
endpoint: o.Endpoint,
userAgent: o.UserAgent,
}, nil
}
// DialGRPC returns a GRPC connection for use communicating with a Google cloud
// service, configured with the given ClientOptions.
func DialGRPC(ctx context.Context, opt ...cloud.ClientOption) (*grpc.ClientConn, error) {
var o opts.DialOpt
for _, opt := range opt {
opt.Resolve(&o)
}
if o.HTTPClient != nil {
return nil, errors.New("unsupported HTTP base transport specified")
}
if o.GRPCClient != nil {
return o.GRPCClient, nil
}
if o.TokenSource == nil {
var err error
o.TokenSource, err = google.DefaultTokenSource(ctx, o.Scopes...)
if err != nil {
return nil, fmt.Errorf("google.DefaultTokenSource: %v", err)
}
}
grpcOpts := []grpc.DialOption{
grpc.WithPerRPCCredentials(oauth.TokenSource{o.TokenSource}),
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
}
grpcOpts = append(grpcOpts, o.GRPCDialOpts...)
if o.UserAgent != "" {
grpcOpts = append(grpcOpts, grpc.WithUserAgent(o.UserAgent))
}
return grpc.Dial(o.Endpoint, grpcOpts...)
}

View File

@ -0,0 +1,80 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transport
import (
"bytes"
"io/ioutil"
"net/http"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
)
type ProtoClient struct {
client *http.Client
endpoint string
userAgent string
}
func (c *ProtoClient) Call(ctx context.Context, method string, req, resp proto.Message) error {
payload, err := proto.Marshal(req)
if err != nil {
return err
}
httpReq, err := http.NewRequest("POST", c.endpoint+method, bytes.NewReader(payload))
if err != nil {
return err
}
httpReq.Header.Set("Content-Type", "application/x-protobuf")
if ua := c.userAgent; ua != "" {
httpReq.Header.Set("User-Agent", ua)
}
errc := make(chan error, 1)
cancel := makeReqCancel(httpReq)
go func() {
r, err := c.client.Do(httpReq)
if err != nil {
errc <- err
return
}
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if r.StatusCode != http.StatusOK {
err = &ErrHTTP{
StatusCode: r.StatusCode,
Body: body,
err: err,
}
}
if err != nil {
errc <- err
return
}
errc <- proto.Unmarshal(body, resp)
}()
select {
case <-ctx.Done():
cancel(c.client.Transport) // Cancel the HTTP request.
return ctx.Err()
case err := <-errc:
return err
}
}