1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-26 07:25:09 +00:00

Update to k8s v1.20

This commit is contained in:
Darren Shepherd
2020-12-08 16:32:23 -07:00
parent e70f5ca388
commit 68f1d2e966
937 changed files with 126796 additions and 35141 deletions

View File

@@ -107,6 +107,7 @@ func (p *clientConnPool) getClientConn(req *http.Request, addr string, dialOnMis
// dialCall is an in-flight Transport dial call to a host.
type dialCall struct {
_ incomparable
p *clientConnPool
done chan struct{} // closed when done
res *ClientConn // valid after done is closed
@@ -180,6 +181,7 @@ func (p *clientConnPool) addConnIfNeeded(key string, t *Transport, c *tls.Conn)
}
type addConnCall struct {
_ incomparable
p *clientConnPool
done chan struct{} // closed when done
err error
@@ -200,12 +202,6 @@ func (c *addConnCall) run(t *Transport, key string, tc *tls.Conn) {
close(c.done)
}
func (p *clientConnPool) addConn(key string, cc *ClientConn) {
p.mu.Lock()
p.addConnLocked(key, cc)
p.mu.Unlock()
}
// p.mu must be held
func (p *clientConnPool) addConnLocked(key string, cc *ClientConn) {
for _, v := range p.conns[key] {

View File

@@ -8,6 +8,8 @@ package http2
// flow is the flow control window's size.
type flow struct {
_ incomparable
// n is the number of DATA bytes we're allowed to send.
// A flow is kept both on a conn and a per-stream.
n int32

View File

@@ -105,7 +105,14 @@ func huffmanDecode(buf *bytes.Buffer, maxLen int, v []byte) error {
return nil
}
// incomparable is a zero-width, non-comparable type. Adding it to a struct
// makes that struct also non-comparable, and generally doesn't add
// any size (as long as it's first).
type incomparable [0]func()
type node struct {
_ incomparable
// children is non-nil for internal nodes
children *[256]*node

View File

@@ -19,7 +19,6 @@ package http2 // import "golang.org/x/net/http2"
import (
"bufio"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
@@ -173,11 +172,6 @@ func (s SettingID) String() string {
return fmt.Sprintf("UNKNOWN_SETTING_%d", uint16(s))
}
var (
errInvalidHeaderFieldName = errors.New("http2: invalid header field name")
errInvalidHeaderFieldValue = errors.New("http2: invalid header field value")
)
// validWireHeaderFieldName reports whether v is a valid header field
// name (key). See httpguts.ValidHeaderName for the base rules.
//
@@ -247,6 +241,7 @@ func (cw closeWaiter) Wait() {
// Its buffered writer is lazily allocated as needed, to minimize
// idle memory usage with many connections.
type bufferedWriter struct {
_ incomparable
w io.Writer // immutable
bw *bufio.Writer // non-nil when data is buffered
}
@@ -319,6 +314,7 @@ func bodyAllowedForStatus(status int) bool {
}
type httpError struct {
_ incomparable
msg string
timeout bool
}
@@ -382,3 +378,8 @@ func (s *sorter) SortStrings(ss []string) {
func validPseudoPath(v string) bool {
return (len(v) > 0 && v[0] == '/') || v == "*"
}
// incomparable is a zero-width, non-comparable type. Adding it to a struct
// makes that struct also non-comparable, and generally doesn't add
// any size (as long as it's first).
type incomparable [0]func()

View File

@@ -581,13 +581,10 @@ type stream struct {
cancelCtx func()
// owned by serverConn's serve loop:
bodyBytes int64 // body bytes seen so far
declBodyBytes int64 // or -1 if undeclared
flow flow // limits writing from Handler to client
inflow flow // what the client is allowed to POST/etc to us
parent *stream // or nil
numTrailerValues int64
weight uint8
bodyBytes int64 // body bytes seen so far
declBodyBytes int64 // or -1 if undeclared
flow flow // limits writing from Handler to client
inflow flow // what the client is allowed to POST/etc to us
state streamState
resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
@@ -764,6 +761,7 @@ func (sc *serverConn) readFrames() {
// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
type frameWriteResult struct {
_ incomparable
wr FrameWriteRequest // what was written (or attempted)
err error // result of the writeFrame call
}
@@ -774,7 +772,7 @@ type frameWriteResult struct {
// serverConn.
func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) {
err := wr.write.writeFrame(sc)
sc.wroteFrameCh <- frameWriteResult{wr, err}
sc.wroteFrameCh <- frameWriteResult{wr: wr, err: err}
}
func (sc *serverConn) closeAllStreamsOnConnClose() {
@@ -1164,7 +1162,7 @@ func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) {
if wr.write.staysWithinBuffer(sc.bw.Available()) {
sc.writingFrameAsync = false
err := wr.write.writeFrame(sc)
sc.wroteFrame(frameWriteResult{wr, err})
sc.wroteFrame(frameWriteResult{wr: wr, err: err})
} else {
sc.writingFrameAsync = true
go sc.writeFrameAsync(wr)
@@ -1696,6 +1694,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
if len(data) > 0 {
wrote, err := st.body.Write(data)
if err != nil {
sc.sendWindowUpdate(nil, int(f.Length)-wrote)
return streamError(id, ErrCodeStreamClosed)
}
if wrote != len(data) {
@@ -2022,7 +2021,11 @@ func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*res
}
if bodyOpen {
if vv, ok := rp.header["Content-Length"]; ok {
req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
if cl, err := strconv.ParseUint(vv[0], 10, 63); err == nil {
req.ContentLength = int64(cl)
} else {
req.ContentLength = 0
}
} else {
req.ContentLength = -1
}
@@ -2060,7 +2063,7 @@ func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*r
var trailer http.Header
for _, v := range rp.header["Trailer"] {
for _, key := range strings.Split(v, ",") {
key = http.CanonicalHeaderKey(strings.TrimSpace(key))
key = http.CanonicalHeaderKey(textproto.TrimString(key))
switch key {
case "Transfer-Encoding", "Trailer", "Content-Length":
// Bogus. (copy of http1 rules)
@@ -2278,6 +2281,7 @@ func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
// requestBody is the Handler's Request.Body type.
// Read and Close may be called concurrently.
type requestBody struct {
_ incomparable
stream *stream
conn *serverConn
closed bool // for use by Close only
@@ -2404,9 +2408,8 @@ func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
var ctype, clen string
if clen = rws.snapHeader.Get("Content-Length"); clen != "" {
rws.snapHeader.Del("Content-Length")
clen64, err := strconv.ParseInt(clen, 10, 64)
if err == nil && clen64 >= 0 {
rws.sentContentLen = clen64
if cl, err := strconv.ParseUint(clen, 10, 63); err == nil {
rws.sentContentLen = int64(cl)
} else {
clen = ""
}

View File

@@ -93,7 +93,7 @@ type Transport struct {
// send in the initial settings frame. It is how many bytes
// of response headers are allowed. Unlike the http2 spec, zero here
// means to use a default limit (currently 10MB). If you actually
// want to advertise an ulimited value to the peer, Transport
// want to advertise an unlimited value to the peer, Transport
// interprets the highest possible value here (0xffffffff or 1<<32-1)
// to mean no limit.
MaxHeaderListSize uint32
@@ -108,6 +108,19 @@ type Transport struct {
// waiting for their turn.
StrictMaxConcurrentStreams bool
// ReadIdleTimeout is the timeout after which a health check using ping
// frame will be carried out if no frame is received on the connection.
// Note that a ping response will is considered a received frame, so if
// there is no other traffic on the connection, the health check will
// be performed every ReadIdleTimeout interval.
// If zero, no health check is performed.
ReadIdleTimeout time.Duration
// PingTimeout is the timeout after which the connection will be closed
// if a response to Ping is not received.
// Defaults to 15s.
PingTimeout time.Duration
// t1, if non-nil, is the standard library Transport using
// this transport. Its settings are used (but not its
// RoundTrip method, etc).
@@ -131,14 +144,31 @@ func (t *Transport) disableCompression() bool {
return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
}
func (t *Transport) pingTimeout() time.Duration {
if t.PingTimeout == 0 {
return 15 * time.Second
}
return t.PingTimeout
}
// ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns an error if t1 has already been HTTP/2-enabled.
//
// Use ConfigureTransports instead to configure the HTTP/2 Transport.
func ConfigureTransport(t1 *http.Transport) error {
_, err := configureTransport(t1)
_, err := ConfigureTransports(t1)
return err
}
func configureTransport(t1 *http.Transport) (*Transport, error) {
// ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2.
// It returns a new HTTP/2 Transport for further configuration.
// It returns an error if t1 has already been HTTP/2-enabled.
func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
return configureTransports(t1)
}
func configureTransports(t1 *http.Transport) (*Transport, error) {
connPool := new(clientConnPool)
t2 := &Transport{
ConnPool: noDialClientConnPool{connPool},
@@ -227,6 +257,7 @@ type ClientConn struct {
br *bufio.Reader
fr *Framer
lastActive time.Time
lastIdle time.Time // time last idle
// Settings from peer: (also guarded by mu)
maxFrameSize uint32
maxConcurrentStreams uint32
@@ -667,6 +698,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
cc.inflow.add(transportDefaultConnFlow + initialWindowSize)
cc.bw.Flush()
if cc.werr != nil {
cc.Close()
return nil, cc.werr
}
@@ -674,6 +706,20 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
return cc, nil
}
func (cc *ClientConn) healthCheck() {
pingTimeout := cc.t.pingTimeout()
// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
// trigger the healthCheck again if there is no frame received.
ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
defer cancel()
err := cc.Ping(ctx)
if err != nil {
cc.closeForLostPing()
cc.t.connPool().MarkDead(cc)
return
}
}
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -736,7 +782,8 @@ func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
}
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32
int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
!cc.tooIdleLocked()
st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest
return
}
@@ -746,6 +793,16 @@ func (cc *ClientConn) canTakeNewRequestLocked() bool {
return st.canTakeNewRequest
}
// tooIdleLocked reports whether this connection has been been sitting idle
// for too much wall time.
func (cc *ClientConn) tooIdleLocked() bool {
// The Round(0) strips the monontonic clock reading so the
// times are compared based on their wall time. We don't want
// to reuse a connection that's been sitting idle during
// VM/laptop suspend if monotonic time was also frozen.
return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
}
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
@@ -834,14 +891,12 @@ func (cc *ClientConn) sendGoAway() error {
return nil
}
// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
// closes the client connection immediately. In-flight requests are interrupted.
// err is sent to streams.
func (cc *ClientConn) closeForError(err error) error {
cc.mu.Lock()
defer cc.cond.Broadcast()
defer cc.mu.Unlock()
err := errors.New("http2: client connection force closed via ClientConn.Close")
for id, cs := range cc.streams {
select {
case cs.resc <- resAndError{err: err}:
@@ -854,6 +909,20 @@ func (cc *ClientConn) Close() error {
return cc.tconn.Close()
}
// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
err := errors.New("http2: client connection force closed via ClientConn.Close")
return cc.closeForError(err)
}
// closes the client connection immediately. In-flight requests are interrupted.
func (cc *ClientConn) closeForLostPing() error {
err := errors.New("http2: client connection lost")
return cc.closeForError(err)
}
const maxAllocFrameSize = 512 << 10
// frameBuffer returns a scratch buffer suitable for writing DATA frames.
@@ -904,7 +973,7 @@ func commaSeparatedTrailers(req *http.Request) (string, error) {
k = http.CanonicalHeaderKey(k)
switch k {
case "Transfer-Encoding", "Trailer", "Content-Length":
return "", &badStringError{"invalid Trailer key", k}
return "", fmt.Errorf("invalid Trailer key %q", k)
}
keys = append(keys, k)
}
@@ -1021,6 +1090,15 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
bodyWriter := cc.t.getBodyWriterState(cs, body)
cs.on100 = bodyWriter.on100
defer func() {
cc.wmu.Lock()
werr := cc.werr
cc.wmu.Unlock()
if werr != nil {
cc.Close()
}
}()
cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
@@ -1070,6 +1148,9 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
// we can keep it.
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWrite)
if hasBody && !bodyWritten {
<-bodyWriter.resc
}
}
if re.err != nil {
cc.forgetStreamID(cs.ID)
@@ -1090,6 +1171,7 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
<-bodyWriter.resc
}
cc.forgetStreamID(cs.ID)
return nil, cs.getStartedWrite(), errTimeout
@@ -1099,6 +1181,7 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
<-bodyWriter.resc
}
cc.forgetStreamID(cs.ID)
return nil, cs.getStartedWrite(), ctx.Err()
@@ -1108,6 +1191,7 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
} else {
bodyWriter.cancel()
cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel)
<-bodyWriter.resc
}
cc.forgetStreamID(cs.ID)
return nil, cs.getStartedWrite(), errRequestCanceled
@@ -1117,6 +1201,7 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
// forgetStreamID.
return nil, cs.getStartedWrite(), cs.resetErr
case err := <-bodyWriter.resc:
bodyWritten = true
// Prefer the read loop's response, if available. Issue 16102.
select {
case re := <-readLoopResCh:
@@ -1127,7 +1212,6 @@ func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAf
cc.forgetStreamID(cs.ID)
return nil, cs.getStartedWrite(), err
}
bodyWritten = true
if d := cc.responseHeaderTimeout(); d != 0 {
timer := time.NewTimer(d)
defer timer.Stop()
@@ -1150,6 +1234,7 @@ func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error {
}
return errClientConnUnusable
}
cc.lastIdle = time.Time{}
if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
if waitingForConn != nil {
close(waitingForConn)
@@ -1381,13 +1466,6 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
}
}
type badStringError struct {
what string
str string
}
func (e *badStringError) Error() string { return fmt.Sprintf("%s %q", e.what, e.str) }
// requires cc.mu be held.
func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
cc.hbuf.Reset()
@@ -1603,6 +1681,7 @@ func (cc *ClientConn) writeHeader(name, value string) {
}
type resAndError struct {
_ incomparable
res *http.Response
err error
}
@@ -1638,6 +1717,7 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
delete(cc.streams, id)
if len(cc.streams) == 0 && cc.idleTimer != nil {
cc.idleTimer.Reset(cc.idleTimeout)
cc.lastIdle = time.Now()
}
close(cs.done)
// Wake up checkResetOrDone via clientStream.awaitFlowControl and
@@ -1649,6 +1729,7 @@ func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream {
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type clientConnReadLoop struct {
_ incomparable
cc *ClientConn
closeWhenIdle bool
}
@@ -1728,8 +1809,17 @@ func (rl *clientConnReadLoop) run() error {
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
readIdleTimeout := cc.t.ReadIdleTimeout
var t *time.Timer
if readIdleTimeout != 0 {
t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
defer t.Stop()
}
for {
f, err := cc.fr.ReadFrame()
if t != nil {
t.Reset(readIdleTimeout)
}
if err != nil {
cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
}
@@ -1878,7 +1968,9 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
}
header := make(http.Header)
regularFields := f.RegularFields()
strs := make([]string, len(regularFields))
header := make(http.Header, len(regularFields))
res := &http.Response{
Proto: "HTTP/2.0",
ProtoMajor: 2,
@@ -1886,7 +1978,7 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
StatusCode: statusCode,
Status: status + " " + http.StatusText(statusCode),
}
for _, hf := range f.RegularFields() {
for _, hf := range regularFields {
key := http.CanonicalHeaderKey(hf.Name)
if key == "Trailer" {
t := res.Trailer
@@ -1898,7 +1990,18 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
t[http.CanonicalHeaderKey(v)] = nil
})
} else {
header[key] = append(header[key], hf.Value)
vv := header[key]
if vv == nil && len(strs) > 0 {
// More than likely this will be a single-element key.
// Most headers aren't multi-valued.
// Set the capacity on strs[0] to 1, so any future append
// won't extend the slice into the other strings.
vv, strs = strs[:1:1], strs[1:]
vv[0] = hf.Value
header[key] = vv
} else {
header[key] = append(vv, hf.Value)
}
}
}
@@ -1928,8 +2031,8 @@ func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFra
if !streamEnded || isHead {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
res.ContentLength = clen64
if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
res.ContentLength = int64(cl)
} else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
@@ -2184,8 +2287,6 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
return nil
}
var errInvalidTrailers = errors.New("http2: invalid trailers")
func (rl *clientConnReadLoop) endStream(cs *clientStream) {
// TODO: check that any declared content-length matches, like
// server.go's (*stream).endStream method.
@@ -2416,7 +2517,6 @@ func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error)
var (
errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
)
func (cc *ClientConn) logf(format string, args ...interface{}) {
@@ -2450,11 +2550,13 @@ func strSliceContains(ss []string, s string) bool {
type erringRoundTripper struct{ err error }
func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
// gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read
type gzipReader struct {
_ incomparable
body io.ReadCloser // underlying Response.Body
zr *gzip.Reader // lazily-initialized gzip reader
zerr error // sticky error