mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-31 05:40:42 +00:00 
			
		
		
		
	Bumped gRPC version to 1.3.0
This commit is contained in:
		
							
								
								
									
										260
									
								
								vendor/google.golang.org/grpc/clientconn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										260
									
								
								vendor/google.golang.org/grpc/clientconn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							| @@ -36,8 +36,8 @@ package grpc | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math" | ||||
| 	"net" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| @@ -45,6 +45,8 @@ import ( | ||||
| 	"golang.org/x/net/trace" | ||||
| 	"google.golang.org/grpc/credentials" | ||||
| 	"google.golang.org/grpc/grpclog" | ||||
| 	"google.golang.org/grpc/keepalive" | ||||
| 	"google.golang.org/grpc/stats" | ||||
| 	"google.golang.org/grpc/transport" | ||||
| ) | ||||
|  | ||||
| @@ -54,6 +56,8 @@ var ( | ||||
| 	ErrClientConnClosing = errors.New("grpc: the client connection is closing") | ||||
| 	// ErrClientConnTimeout indicates that the ClientConn cannot establish the | ||||
| 	// underlying connections within the specified timeout. | ||||
| 	// DEPRECATED: Please use context.DeadlineExceeded instead. This error will be | ||||
| 	// removed in Q1 2017. | ||||
| 	ErrClientConnTimeout = errors.New("grpc: timed out when dialing") | ||||
|  | ||||
| 	// errNoTransportSecurity indicates that there is no transport security | ||||
| @@ -75,7 +79,6 @@ var ( | ||||
| 	errConnClosing = errors.New("grpc: the connection is closing") | ||||
| 	// errConnUnavailable indicates that the connection is unavailable. | ||||
| 	errConnUnavailable = errors.New("grpc: the connection is unavailable") | ||||
| 	errNoAddr          = errors.New("grpc: there is no address available to dial") | ||||
| 	// minimum time to give a connection to complete | ||||
| 	minConnectTimeout = 20 * time.Second | ||||
| ) | ||||
| @@ -83,22 +86,33 @@ var ( | ||||
| // dialOptions configure a Dial call. dialOptions are set by the DialOption | ||||
| // values passed to Dial. | ||||
| type dialOptions struct { | ||||
| 	unaryInt  UnaryClientInterceptor | ||||
| 	streamInt StreamClientInterceptor | ||||
| 	codec     Codec | ||||
| 	cp        Compressor | ||||
| 	dc        Decompressor | ||||
| 	bs        backoffStrategy | ||||
| 	balancer  Balancer | ||||
| 	block     bool | ||||
| 	insecure  bool | ||||
| 	timeout   time.Duration | ||||
| 	copts     transport.ConnectOptions | ||||
| 	unaryInt   UnaryClientInterceptor | ||||
| 	streamInt  StreamClientInterceptor | ||||
| 	codec      Codec | ||||
| 	cp         Compressor | ||||
| 	dc         Decompressor | ||||
| 	bs         backoffStrategy | ||||
| 	balancer   Balancer | ||||
| 	block      bool | ||||
| 	insecure   bool | ||||
| 	timeout    time.Duration | ||||
| 	scChan     <-chan ServiceConfig | ||||
| 	copts      transport.ConnectOptions | ||||
| 	maxMsgSize int | ||||
| } | ||||
|  | ||||
| const defaultClientMaxMsgSize = math.MaxInt32 | ||||
|  | ||||
| // DialOption configures how we set up the connection. | ||||
| type DialOption func(*dialOptions) | ||||
|  | ||||
| // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. | ||||
| func WithMaxMsgSize(s int) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| 		o.maxMsgSize = s | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. | ||||
| func WithCodec(c Codec) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| @@ -129,6 +143,13 @@ func WithBalancer(b Balancer) DialOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithServiceConfig returns a DialOption which has a channel to read the service configuration. | ||||
| func WithServiceConfig(c <-chan ServiceConfig) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| 		o.scChan = c | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithBackoffMaxDelay configures the dialer to use the provided maximum delay | ||||
| // when backing off after failed connection attempts. | ||||
| func WithBackoffMaxDelay(md time.Duration) DialOption { | ||||
| @@ -199,6 +220,8 @@ func WithTimeout(d time.Duration) DialOption { | ||||
| } | ||||
|  | ||||
| // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. | ||||
| // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's | ||||
| // Temporary() method to decide if it should try to reconnect to the network address. | ||||
| func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| 		o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { | ||||
| @@ -210,6 +233,25 @@ func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithStatsHandler returns a DialOption that specifies the stats handler | ||||
| // for all the RPCs and underlying network connections in this ClientConn. | ||||
| func WithStatsHandler(h stats.Handler) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| 		o.copts.StatsHandler = h | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors. | ||||
| // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network | ||||
| // address and won't try to reconnect. | ||||
| // The default value of FailOnNonTempDialError is false. | ||||
| // This is an EXPERIMENTAL API. | ||||
| func FailOnNonTempDialError(f bool) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| 		o.copts.FailOnNonTempDialError = f | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. | ||||
| func WithUserAgent(s string) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| @@ -217,6 +259,13 @@ func WithUserAgent(s string) DialOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. | ||||
| func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| 		o.copts.KeepaliveParams = kp | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. | ||||
| func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| @@ -231,6 +280,15 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithAuthority returns a DialOption that specifies the value to be used as | ||||
| // the :authority pseudo-header. This value only works with WithInsecure and | ||||
| // has no effect if TransportCredentials are present. | ||||
| func WithAuthority(a string) DialOption { | ||||
| 	return func(o *dialOptions) { | ||||
| 		o.copts.Authority = a | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Dial creates a client connection to the given target. | ||||
| func Dial(target string, opts ...DialOption) (*ClientConn, error) { | ||||
| 	return DialContext(context.Background(), target, opts...) | ||||
| @@ -247,6 +305,32 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | ||||
| 		conns:  make(map[Address]*addrConn), | ||||
| 	} | ||||
| 	cc.ctx, cc.cancel = context.WithCancel(context.Background()) | ||||
| 	cc.dopts.maxMsgSize = defaultClientMaxMsgSize | ||||
| 	for _, opt := range opts { | ||||
| 		opt(&cc.dopts) | ||||
| 	} | ||||
| 	cc.mkp = cc.dopts.copts.KeepaliveParams | ||||
|  | ||||
| 	if cc.dopts.copts.Dialer == nil { | ||||
| 		cc.dopts.copts.Dialer = newProxyDialer( | ||||
| 			func(ctx context.Context, addr string) (net.Conn, error) { | ||||
| 				return dialContext(ctx, "tcp", addr) | ||||
| 			}, | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	if cc.dopts.copts.UserAgent != "" { | ||||
| 		cc.dopts.copts.UserAgent += " " + grpcUA | ||||
| 	} else { | ||||
| 		cc.dopts.copts.UserAgent = grpcUA | ||||
| 	} | ||||
|  | ||||
| 	if cc.dopts.timeout > 0 { | ||||
| 		var cancel context.CancelFunc | ||||
| 		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) | ||||
| 		defer cancel() | ||||
| 	} | ||||
|  | ||||
| 	defer func() { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| @@ -259,10 +343,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	for _, opt := range opts { | ||||
| 		opt(&cc.dopts) | ||||
| 	if cc.dopts.scChan != nil { | ||||
| 		// Wait for the initial service config. | ||||
| 		select { | ||||
| 		case sc, ok := <-cc.dopts.scChan: | ||||
| 			if ok { | ||||
| 				cc.sc = sc | ||||
| 			} | ||||
| 		case <-ctx.Done(): | ||||
| 			return nil, ctx.Err() | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Set defaults. | ||||
| 	if cc.dopts.codec == nil { | ||||
| 		cc.dopts.codec = protoCodec{} | ||||
| @@ -273,21 +364,18 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | ||||
| 	creds := cc.dopts.copts.TransportCredentials | ||||
| 	if creds != nil && creds.Info().ServerName != "" { | ||||
| 		cc.authority = creds.Info().ServerName | ||||
| 	} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { | ||||
| 		cc.authority = cc.dopts.copts.Authority | ||||
| 	} else { | ||||
| 		colonPos := strings.LastIndex(target, ":") | ||||
| 		if colonPos == -1 { | ||||
| 			colonPos = len(target) | ||||
| 		} | ||||
| 		cc.authority = target[:colonPos] | ||||
| 		cc.authority = target | ||||
| 	} | ||||
| 	var ok bool | ||||
| 	waitC := make(chan error, 1) | ||||
| 	go func() { | ||||
| 		var addrs []Address | ||||
| 		if cc.dopts.balancer == nil { | ||||
| 			// Connect to target directly if balancer is nil. | ||||
| 			addrs = append(addrs, Address{Addr: target}) | ||||
| 		} else { | ||||
| 		defer close(waitC) | ||||
| 		if cc.dopts.balancer == nil && cc.sc.LB != nil { | ||||
| 			cc.dopts.balancer = cc.sc.LB | ||||
| 		} | ||||
| 		if cc.dopts.balancer != nil { | ||||
| 			var credsClone credentials.TransportCredentials | ||||
| 			if creds != nil { | ||||
| 				credsClone = creds.Clone() | ||||
| @@ -300,29 +388,23 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | ||||
| 				return | ||||
| 			} | ||||
| 			ch := cc.dopts.balancer.Notify() | ||||
| 			if ch == nil { | ||||
| 				// There is no name resolver installed. | ||||
| 				addrs = append(addrs, Address{Addr: target}) | ||||
| 			} else { | ||||
| 				addrs, ok = <-ch | ||||
| 				if !ok || len(addrs) == 0 { | ||||
| 					waitC <- errNoAddr | ||||
| 					return | ||||
| 			if ch != nil { | ||||
| 				if cc.dopts.block { | ||||
| 					doneChan := make(chan struct{}) | ||||
| 					go cc.lbWatcher(doneChan) | ||||
| 					<-doneChan | ||||
| 				} else { | ||||
| 					go cc.lbWatcher(nil) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		for _, a := range addrs { | ||||
| 			if err := cc.resetAddrConn(a, false, nil); err != nil { | ||||
| 				waitC <- err | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 		close(waitC) | ||||
| 		// No balancer, or no resolver within the balancer.  Connect directly. | ||||
| 		if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil { | ||||
| 			waitC <- err | ||||
| 			return | ||||
| 		} | ||||
| 	}() | ||||
| 	var timeoutCh <-chan time.Time | ||||
| 	if cc.dopts.timeout > 0 { | ||||
| 		timeoutCh = time.After(cc.dopts.timeout) | ||||
| 	} | ||||
| 	select { | ||||
| 	case <-ctx.Done(): | ||||
| 		return nil, ctx.Err() | ||||
| @@ -330,14 +412,12 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	case <-timeoutCh: | ||||
| 		return nil, ErrClientConnTimeout | ||||
| 	} | ||||
| 	// If balancer is nil or balancer.Notify() is nil, ok will be false here. | ||||
| 	// The lbWatcher goroutine will not be created. | ||||
| 	if ok { | ||||
| 		go cc.lbWatcher() | ||||
|  | ||||
| 	if cc.dopts.scChan != nil { | ||||
| 		go cc.scWatcher() | ||||
| 	} | ||||
|  | ||||
| 	return cc, nil | ||||
| } | ||||
|  | ||||
| @@ -384,10 +464,16 @@ type ClientConn struct { | ||||
| 	dopts     dialOptions | ||||
|  | ||||
| 	mu    sync.RWMutex | ||||
| 	sc    ServiceConfig | ||||
| 	conns map[Address]*addrConn | ||||
| 	// Keepalive parameter can be udated if a GoAway is received. | ||||
| 	mkp keepalive.ClientParameters | ||||
| } | ||||
|  | ||||
| func (cc *ClientConn) lbWatcher() { | ||||
| // lbWatcher watches the Notify channel of the balancer in cc and manages | ||||
| // connections accordingly.  If doneChan is not nil, it is closed after the | ||||
| // first successfull connection is made. | ||||
| func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { | ||||
| 	for addrs := range cc.dopts.balancer.Notify() { | ||||
| 		var ( | ||||
| 			add []Address   // Addresses need to setup connections. | ||||
| @@ -414,7 +500,15 @@ func (cc *ClientConn) lbWatcher() { | ||||
| 		} | ||||
| 		cc.mu.Unlock() | ||||
| 		for _, a := range add { | ||||
| 			cc.resetAddrConn(a, true, nil) | ||||
| 			if doneChan != nil { | ||||
| 				err := cc.resetAddrConn(a, true, nil) | ||||
| 				if err == nil { | ||||
| 					close(doneChan) | ||||
| 					doneChan = nil | ||||
| 				} | ||||
| 			} else { | ||||
| 				cc.resetAddrConn(a, false, nil) | ||||
| 			} | ||||
| 		} | ||||
| 		for _, c := range del { | ||||
| 			c.tearDown(errConnDrain) | ||||
| @@ -422,15 +516,36 @@ func (cc *ClientConn) lbWatcher() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (cc *ClientConn) scWatcher() { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case sc, ok := <-cc.dopts.scChan: | ||||
| 			if !ok { | ||||
| 				return | ||||
| 			} | ||||
| 			cc.mu.Lock() | ||||
| 			// TODO: load balance policy runtime change is ignored. | ||||
| 			// We may revist this decision in the future. | ||||
| 			cc.sc = sc | ||||
| 			cc.mu.Unlock() | ||||
| 		case <-cc.ctx.Done(): | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // resetAddrConn creates an addrConn for addr and adds it to cc.conns. | ||||
| // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. | ||||
| // If tearDownErr is nil, errConnDrain will be used instead. | ||||
| func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error { | ||||
| func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { | ||||
| 	ac := &addrConn{ | ||||
| 		cc:    cc, | ||||
| 		addr:  addr, | ||||
| 		dopts: cc.dopts, | ||||
| 	} | ||||
| 	cc.mu.RLock() | ||||
| 	ac.dopts.copts.KeepaliveParams = cc.mkp | ||||
| 	cc.mu.RUnlock() | ||||
| 	ac.ctx, ac.cancel = context.WithCancel(cc.ctx) | ||||
| 	ac.stateCV = sync.NewCond(&ac.mu) | ||||
| 	if EnableTracing { | ||||
| @@ -475,8 +590,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err | ||||
| 			stale.tearDown(tearDownErr) | ||||
| 		} | ||||
| 	} | ||||
| 	// skipWait may overwrite the decision in ac.dopts.block. | ||||
| 	if ac.dopts.block && !skipWait { | ||||
| 	if block { | ||||
| 		if err := ac.resetTransport(false); err != nil { | ||||
| 			if err != errConnClosing { | ||||
| 				// Tear down ac and delete it from cc.conns. | ||||
| @@ -509,6 +623,14 @@ func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr err | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // TODO: Avoid the locking here. | ||||
| func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) { | ||||
| 	cc.mu.RLock() | ||||
| 	defer cc.mu.RUnlock() | ||||
| 	m, ok = cc.sc.Methods[method] | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { | ||||
| 	var ( | ||||
| 		ac  *addrConn | ||||
| @@ -547,6 +669,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) | ||||
| 	} | ||||
| 	if !ok { | ||||
| 		if put != nil { | ||||
| 			updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) | ||||
| 			put() | ||||
| 		} | ||||
| 		return nil, nil, errConnClosing | ||||
| @@ -554,6 +677,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) | ||||
| 	t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait) | ||||
| 	if err != nil { | ||||
| 		if put != nil { | ||||
| 			updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) | ||||
| 			put() | ||||
| 		} | ||||
| 		return nil, nil, err | ||||
| @@ -605,6 +729,20 @@ type addrConn struct { | ||||
| 	tearDownErr error | ||||
| } | ||||
|  | ||||
| // adjustParams updates parameters used to create transports upon | ||||
| // receiving a GoAway. | ||||
| func (ac *addrConn) adjustParams(r transport.GoAwayReason) { | ||||
| 	switch r { | ||||
| 	case transport.TooManyPings: | ||||
| 		v := 2 * ac.dopts.copts.KeepaliveParams.Time | ||||
| 		ac.cc.mu.Lock() | ||||
| 		if v > ac.cc.mkp.Time { | ||||
| 			ac.cc.mkp.Time = v | ||||
| 		} | ||||
| 		ac.cc.mu.Unlock() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // printf records an event in ac's event log, unless ac has been closed. | ||||
| // REQUIRES ac.mu is held. | ||||
| func (ac *addrConn) printf(format string, a ...interface{}) { | ||||
| @@ -689,6 +827,8 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { | ||||
| 			Metadata: ac.addr.Metadata, | ||||
| 		} | ||||
| 		newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts) | ||||
| 		// Don't call cancel in success path due to a race in Go 1.6: | ||||
| 		// https://github.com/golang/go/issues/15078. | ||||
| 		if err != nil { | ||||
| 			cancel() | ||||
|  | ||||
| @@ -759,6 +899,7 @@ func (ac *addrConn) transportMonitor() { | ||||
| 			} | ||||
| 			return | ||||
| 		case <-t.GoAway(): | ||||
| 			ac.adjustParams(t.GetGoAwayReason()) | ||||
| 			// If GoAway happens without any network I/O error, ac is closed without shutting down the | ||||
| 			// underlying transport (the transport will be closed when all the pending RPCs finished or | ||||
| 			// failed.). | ||||
| @@ -767,9 +908,9 @@ func (ac *addrConn) transportMonitor() { | ||||
| 			// In both cases, a new ac is created. | ||||
| 			select { | ||||
| 			case <-t.Error(): | ||||
| 				ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) | ||||
| 				ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) | ||||
| 			default: | ||||
| 				ac.cc.resetAddrConn(ac.addr, true, errConnDrain) | ||||
| 				ac.cc.resetAddrConn(ac.addr, false, errConnDrain) | ||||
| 			} | ||||
| 			return | ||||
| 		case <-t.Error(): | ||||
| @@ -778,7 +919,8 @@ func (ac *addrConn) transportMonitor() { | ||||
| 				t.Close() | ||||
| 				return | ||||
| 			case <-t.GoAway(): | ||||
| 				ac.cc.resetAddrConn(ac.addr, true, errNetworkIO) | ||||
| 				ac.adjustParams(t.GetGoAwayReason()) | ||||
| 				ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) | ||||
| 				return | ||||
| 			default: | ||||
| 			} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user