diff --git a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go index 97a21876fe5..13353988faa 100644 --- a/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go +++ b/staging/src/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go @@ -17,9 +17,13 @@ limitations under the License. package spdy import ( + "bufio" "fmt" + "io" + "net" "net/http" "strings" + "sync/atomic" "k8s.io/apimachinery/pkg/util/httpstream" "k8s.io/apimachinery/pkg/util/runtime" @@ -32,6 +36,30 @@ const HeaderSpdy31 = "SPDY/3.1" type responseUpgrader struct { } +// connWrapper is used to wrap a hijacked connection and its bufio.Reader. All +// calls will be handled directly by the underlying net.Conn with the exception +// of Read and Close calls, which will consider data in the bufio.Reader. This +// ensures that data already inside the used bufio.Reader instance is also +// read. +type connWrapper struct { + net.Conn + closed int32 + bufReader *bufio.Reader +} + +func (w *connWrapper) Read(b []byte) (n int, err error) { + if atomic.LoadInt32(&w.closed) == 1 { + return 0, io.EOF + } + return w.bufReader.Read(b) +} + +func (w *connWrapper) Close() error { + err := w.Conn.Close() + atomic.StoreInt32(&w.closed, 1) + return err +} + // NewResponseUpgrader returns a new httpstream.ResponseUpgrader that is // capable of upgrading HTTP responses using SPDY/3.1 via the // spdystream package. @@ -62,13 +90,14 @@ func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Reque w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31) w.WriteHeader(http.StatusSwitchingProtocols) - conn, _, err := hijacker.Hijack() + conn, bufrw, err := hijacker.Hijack() if err != nil { runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err)) return nil } - spdyConn, err := NewServerConnection(conn, newStreamHandler) + connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader} + spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler) if err != nil { runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err)) return nil