mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #109651 from ash2k/ash2k/spdy-cleanup
Fix issues in SPDY RoundTripper
This commit is contained in:
commit
dfee09a27d
@ -18,13 +18,11 @@ package spdy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -275,29 +273,20 @@ func (s *SpdyRoundTripper) tlsConn(ctx context.Context, rwc net.Conn, targetHost
|
|||||||
// dialWithoutProxy dials the host specified by url, using TLS if appropriate.
|
// dialWithoutProxy dials the host specified by url, using TLS if appropriate.
|
||||||
func (s *SpdyRoundTripper) dialWithoutProxy(ctx context.Context, url *url.URL) (net.Conn, error) {
|
func (s *SpdyRoundTripper) dialWithoutProxy(ctx context.Context, url *url.URL) (net.Conn, error) {
|
||||||
dialAddr := netutil.CanonicalAddr(url)
|
dialAddr := netutil.CanonicalAddr(url)
|
||||||
|
dialer := s.Dialer
|
||||||
|
if dialer == nil {
|
||||||
|
dialer = &net.Dialer{}
|
||||||
|
}
|
||||||
|
|
||||||
if url.Scheme == "http" {
|
if url.Scheme == "http" {
|
||||||
if s.Dialer == nil {
|
return dialer.DialContext(ctx, "tcp", dialAddr)
|
||||||
var d net.Dialer
|
|
||||||
return d.DialContext(ctx, "tcp", dialAddr)
|
|
||||||
} else {
|
|
||||||
return s.Dialer.DialContext(ctx, "tcp", dialAddr)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO validate the TLSClientConfig is set up?
|
tlsDialer := tls.Dialer{
|
||||||
var conn *tls.Conn
|
NetDialer: dialer,
|
||||||
var err error
|
Config: s.tlsConfig,
|
||||||
if s.Dialer == nil {
|
|
||||||
conn, err = tls.Dial("tcp", dialAddr, s.tlsConfig)
|
|
||||||
} else {
|
|
||||||
conn, err = tls.DialWithDialer(s.Dialer, "tcp", dialAddr, s.tlsConfig)
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
return tlsDialer.DialContext(ctx, "tcp", dialAddr)
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return conn, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// proxyAuth returns, for a given proxy URL, the value to be used for the Proxy-Authorization header
|
// proxyAuth returns, for a given proxy URL, the value to be used for the Proxy-Authorization header
|
||||||
@ -314,35 +303,20 @@ func (s *SpdyRoundTripper) proxyAuth(proxyURL *url.URL) string {
|
|||||||
// clients may call SpdyRoundTripper.Connection() to retrieve the upgraded
|
// clients may call SpdyRoundTripper.Connection() to retrieve the upgraded
|
||||||
// connection.
|
// connection.
|
||||||
func (s *SpdyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
func (s *SpdyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||||
header := utilnet.CloneHeader(req.Header)
|
req = utilnet.CloneRequest(req)
|
||||||
header.Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
|
req.Header.Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
|
||||||
header.Add(httpstream.HeaderUpgrade, HeaderSpdy31)
|
req.Header.Add(httpstream.HeaderUpgrade, HeaderSpdy31)
|
||||||
|
|
||||||
var (
|
conn, err := s.Dial(req)
|
||||||
conn net.Conn
|
|
||||||
rawResponse []byte
|
|
||||||
err error
|
|
||||||
)
|
|
||||||
|
|
||||||
clone := utilnet.CloneRequest(req)
|
|
||||||
clone.Header = header
|
|
||||||
conn, err = s.Dial(clone)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
responseReader := bufio.NewReader(
|
responseReader := bufio.NewReader(conn)
|
||||||
io.MultiReader(
|
|
||||||
bytes.NewBuffer(rawResponse),
|
|
||||||
conn,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
resp, err := http.ReadResponse(responseReader, nil)
|
resp, err := http.ReadResponse(responseReader, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if conn != nil {
|
conn.Close()
|
||||||
conn.Close()
|
|
||||||
}
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,6 +31,8 @@ import (
|
|||||||
|
|
||||||
"github.com/armon/go-socks5"
|
"github.com/armon/go-socks5"
|
||||||
"github.com/elazarl/goproxy"
|
"github.com/elazarl/goproxy"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||||
)
|
)
|
||||||
@ -682,6 +684,21 @@ func TestRoundTripSocks5AndNewConnection(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRoundTripPassesContextToDialer(t *testing.T) {
|
||||||
|
urls := []string{"http://127.0.0.1:1233/", "https://127.0.0.1:1233/"}
|
||||||
|
for _, u := range urls {
|
||||||
|
t.Run(u, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel()
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
spdyTransport := NewRoundTripper(&tls.Config{})
|
||||||
|
_, err = spdyTransport.Dial(req)
|
||||||
|
assert.EqualError(t, err, "dial tcp 127.0.0.1:1233: operation was canceled")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// exampleCert was generated from crypto/tls/generate_cert.go with the following command:
|
// exampleCert was generated from crypto/tls/generate_cert.go with the following command:
|
||||||
// go run generate_cert.go --rsa-bits 2048 --host example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
|
// go run generate_cert.go --rsa-bits 2048 --host example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h
|
||||||
var exampleCert = []byte(`-----BEGIN CERTIFICATE-----
|
var exampleCert = []byte(`-----BEGIN CERTIFICATE-----
|
||||||
|
Loading…
Reference in New Issue
Block a user