Merge pull request #137298 from dims/dsri/cri-streaming-option-a-hardcut

cri streaming option a hardcut - add new staging repositories `streaming` and `cri-streaming`

Kubernetes-commit: 2bd6c7fe3cb8663804dc6e7672ff01aeebc97274
This commit is contained in:
Kubernetes Publisher
2026-03-13 17:23:36 +05:30
16 changed files with 567 additions and 35 deletions

12
go.mod
View File

@@ -23,10 +23,11 @@ require (
golang.org/x/time v0.14.0
google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af
gopkg.in/evanphx/json-patch.v4 v4.13.0
k8s.io/api v0.0.0-20260312204559-404992a372c9
k8s.io/apimachinery v0.0.0-20260312005947-65082b639303
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
k8s.io/klog/v2 v2.140.0
k8s.io/kube-openapi v0.0.0-20260304202019-5b3e3fdb0acf
k8s.io/streaming v0.0.0
k8s.io/utils v0.0.0-20260210185600-b8788abfbbc2
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730
sigs.k8s.io/randfill v1.0.0
@@ -50,7 +51,6 @@ require (
github.com/moby/spdystream v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
@@ -62,3 +62,9 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace (
k8s.io/api => ../api
k8s.io/apimachinery => ../apimachinery
k8s.io/streaming => ../streaming
)

16
go.sum
View File

@@ -1,7 +1,10 @@
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -23,6 +26,7 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
@@ -36,6 +40,7 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
github.com/ianlancetaylor/demangle v0.0.0-20250417193237-f615e6bd150b/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -59,7 +64,6 @@ github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFd
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.28.1 h1:S4hj+HbZp40fNKuLUQOYLDgZLwNUVn19N3Atb98NCyI=
github.com/onsi/ginkgo/v2 v2.28.1/go.mod h1:CLtbVInNckU3/+gC8LzkGUb9oF+e8W8TdUsxPwvdOgE=
@@ -87,12 +91,14 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c=
golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
@@ -103,6 +109,7 @@ golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2/go.mod h1:b7fPSJ0pKZ3ccUh8gnTONJxhn3c/PS6tyzQvyqw4iA8=
golang.org/x/term v0.39.0 h1:RclSuaJf32jOqZz74CkPA9qFuVTX7vhLlpfj/IGWlqY=
golang.org/x/term v0.39.0/go.mod h1:yxzUCTP/U+FzoxfdKmLaA0RV1WgE0VY7hXBwKtY/4ww=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
@@ -111,6 +118,8 @@ golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc=
golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg=
golang.org/x/tools/go/expect v0.1.0-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8=
google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af h1:+5/Sw3GsDNlEmu7TfklWKPdQ0Ykja5VEmq2i817+jbI=
google.golang.org/protobuf v1.36.12-0.20260120151049-f2248ac996af/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -123,10 +132,7 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20260312204559-404992a372c9 h1:6EZRW6vggBhzAJwj5zp/u31P24GUl6WZgHCVU9x/1do=
k8s.io/api v0.0.0-20260312204559-404992a372c9/go.mod h1:G+40X+GgAMb4v2xDjHTqy1+i58sPinEjRD2x1YQmvuY=
k8s.io/apimachinery v0.0.0-20260312005947-65082b639303 h1:cvHmmm0qJ6l+OaXpiCi7wfcb93BYBCsMh7p0hm7w1yI=
k8s.io/apimachinery v0.0.0-20260312005947-65082b639303/go.mod h1:QbRChfKnvmh4Le8pGvybBym+p2oKR8lr/oN0Qi++0yw=
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU=
k8s.io/klog/v2 v2.140.0 h1:Tf+J3AH7xnUzZyVVXhTgGhEKnFqye14aadWv7bzXdzc=
k8s.io/klog/v2 v2.140.0/go.mod h1:o+/RWfJ6PwpnFn7OyAG3QnO47BFsymfEfrz6XyYSSp0=
k8s.io/kube-openapi v0.0.0-20260304202019-5b3e3fdb0acf h1:btPscg4cMql0XdYK2jLsJcNEKmACJz8l+U7geC06FiM=

View File

@@ -19,9 +19,11 @@ package portforward
import (
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/klog/v2"
streamhttp "k8s.io/streaming/pkg/httpstream"
)
var _ httpstream.Dialer = &FallbackDialer{}
var _ streamhttp.Dialer = &StreamingFallbackDialer{}
// FallbackDialer encapsulates a primary and secondary dialer, including
// the boolean function to determine if the primary dialer failed. Implements
@@ -42,6 +44,24 @@ func NewFallbackDialer(primary, secondary httpstream.Dialer, shouldFallback func
}
}
// StreamingFallbackDialer encapsulates a primary and secondary streaming dialer
// with fallback behavior.
type StreamingFallbackDialer struct {
primary streamhttp.Dialer
secondary streamhttp.Dialer
shouldFallback func(error) bool
}
// NewFallbackDialerForStreaming creates a fallback dialer for in-tree callers
// that use k8s.io/streaming/pkg/httpstream types.
func NewFallbackDialerForStreaming(primary, secondary streamhttp.Dialer, shouldFallback func(error) bool) streamhttp.Dialer {
return &StreamingFallbackDialer{
primary: primary,
secondary: secondary,
shouldFallback: shouldFallback,
}
}
// Dial is the single function necessary to implement the "httpstream.Dialer" interface.
// It takes the protocol version strings to request, returning an the upgraded
// httstream.Connection and the negotiated protocol version accepted. If the initial
@@ -55,3 +75,14 @@ func (f *FallbackDialer) Dial(protocols ...string) (httpstream.Connection, strin
}
return conn, version, err
}
// Dial is the single function necessary to implement the
// "k8s.io/streaming/pkg/httpstream.Dialer" interface.
func (f *StreamingFallbackDialer) Dial(protocols ...string) (streamhttp.Connection, string, error) {
conn, version, err := f.primary.Dial(protocols...)
if err != nil && f.shouldFallback(err) {
klog.V(4).Infof("fallback to secondary dialer from primary dialer err: %v", err)
return f.secondary.Dial(protocols...)
}
return conn, version, err
}

View File

@@ -26,10 +26,13 @@ import (
"strconv"
"strings"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
streamhttp "k8s.io/streaming/pkg/httpstream"
netutils "k8s.io/utils/net"
)
@@ -164,6 +167,12 @@ func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, rea
return NewOnAddresses(dialer, []string{"localhost"}, ports, stopChan, readyChan, out, errOut)
}
// NewForStreaming creates a new PortForwarder with localhost listen addresses
// for in-tree callers that use k8s.io/streaming/pkg/httpstream types.
func NewForStreaming(dialer streamhttp.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
return NewOnAddressesForStreaming(dialer, []string{"localhost"}, ports, stopChan, readyChan, out, errOut)
}
// NewOnAddresses creates a new PortForwarder with custom listen addresses.
func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(addresses) == 0 {
@@ -191,6 +200,95 @@ func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string
}, nil
}
// NewOnAddressesForStreaming creates a new PortForwarder with custom listen
// addresses for in-tree callers that use k8s.io/streaming/pkg/httpstream types.
func NewOnAddressesForStreaming(dialer streamhttp.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
return NewOnAddresses(&compatDialerAdapter{delegate: dialer}, addresses, ports, stopChan, readyChan, out, errOut)
}
type compatDialerAdapter struct {
delegate streamhttp.Dialer
}
func (d *compatDialerAdapter) Dial(protocols ...string) (httpstream.Connection, string, error) {
conn, protocol, err := d.delegate.Dial(protocols...)
if err != nil {
return nil, "", err
}
return &compatConnectionAdapter{delegate: conn}, protocol, nil
}
type compatConnectionAdapter struct {
delegate streamhttp.Connection
}
func (c *compatConnectionAdapter) CreateStream(headers http.Header) (httpstream.Stream, error) {
stream, err := c.delegate.CreateStream(headers)
if err != nil {
return nil, err
}
return &compatStreamAdapter{delegate: stream}, nil
}
func (c *compatConnectionAdapter) Close() error {
return c.delegate.Close()
}
func (c *compatConnectionAdapter) CloseChan() <-chan bool {
return c.delegate.CloseChan()
}
func (c *compatConnectionAdapter) SetIdleTimeout(timeout time.Duration) {
c.delegate.SetIdleTimeout(timeout)
}
func (c *compatConnectionAdapter) RemoveStreams(streams ...httpstream.Stream) {
streamingStreams := make([]streamhttp.Stream, 0, len(streams))
for _, stream := range streams {
if stream == nil {
continue
}
if s, ok := stream.(*compatStreamAdapter); ok {
streamingStreams = append(streamingStreams, s.delegate)
continue
}
if s, ok := stream.(streamhttp.Stream); ok {
streamingStreams = append(streamingStreams, s)
continue
}
klog.V(5).Infof("dropping unadaptable stream %T in portforward RemoveStreams", stream)
}
c.delegate.RemoveStreams(streamingStreams...)
}
type compatStreamAdapter struct {
delegate streamhttp.Stream
}
func (s *compatStreamAdapter) Read(p []byte) (int, error) {
return s.delegate.Read(p)
}
func (s *compatStreamAdapter) Write(p []byte) (int, error) {
return s.delegate.Write(p)
}
func (s *compatStreamAdapter) Close() error {
return s.delegate.Close()
}
func (s *compatStreamAdapter) Reset() error {
return s.delegate.Reset()
}
func (s *compatStreamAdapter) Headers() http.Header {
return s.delegate.Headers()
}
func (s *compatStreamAdapter) Identifier() uint32 {
return s.delegate.Identifier()
}
// ForwardPorts formats and executes a port forwarding request. The connection will remain
// open until stopChan is closed.
func (pf *PortForwarder) ForwardPorts() error {

View File

@@ -24,11 +24,13 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
httpstreamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
constants "k8s.io/apimachinery/pkg/util/portforward"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport/websocket"
"k8s.io/klog/v2"
streamhttp "k8s.io/streaming/pkg/httpstream"
streamspdy "k8s.io/streaming/pkg/httpstream/spdy"
)
const PingPeriod = 10 * time.Second
@@ -40,6 +42,14 @@ type tunnelingDialer struct {
holder websocket.ConnectionHolder
}
// streamingTunnelingDialer implements "k8s.io/streaming/pkg/httpstream.Dialer"
// for in-tree callers.
type streamingTunnelingDialer struct {
url *url.URL
transport http.RoundTripper
holder websocket.ConnectionHolder
}
// NewTunnelingDialer creates and returns the tunnelingDialer structure which implemements the "httpstream.Dialer"
// interface. The dialer can upgrade a websocket request, creating a websocket connection. This function
// returns an error if one occurs.
@@ -55,25 +65,36 @@ func NewSPDYOverWebsocketDialer(url *url.URL, config *restclient.Config) (httpst
}, nil
}
// Dial upgrades to a tunneling streaming connection, returning a SPDY connection
// containing a WebSockets connection (which implements "net.Conn"). Also
// returns the protocol negotiated, or an error.
func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
// NewSPDYOverWebsocketDialerForStreaming creates a SPDY-over-websocket dialer
// for in-tree callers that use k8s.io/streaming/pkg/httpstream types.
func NewSPDYOverWebsocketDialerForStreaming(url *url.URL, config *restclient.Config) (streamhttp.Dialer, error) {
transport, holder, err := websocket.RoundTripperFor(config)
if err != nil {
return nil, err
}
return &streamingTunnelingDialer{
url: url,
transport: transport,
holder: holder,
}, nil
}
func negotiateSPDYOverWebsocket(url *url.URL, transport http.RoundTripper, holder websocket.ConnectionHolder, protocols ...string) (*TunnelingConnection, string, error) {
// There is no passed context, so skip the context when creating request for now.
// Websockets requires "GET" method: RFC 6455 Sec. 4.1 (page 17).
req, err := http.NewRequest("GET", d.url.String(), nil)
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, "", err
}
// Add the spdy tunneling prefix to the requested protocols. The tunneling
// handler will know how to negotiate these protocols.
tunnelingProtocols := []string{}
tunnelingProtocols := make([]string, 0, len(protocols))
for _, protocol := range protocols {
tunnelingProtocol := constants.WebsocketsSPDYTunnelingPrefix + protocol
tunnelingProtocols = append(tunnelingProtocols, tunnelingProtocol)
}
klog.V(4).Infoln("Before WebSocket Upgrade Connection...")
conn, err := websocket.Negotiate(d.transport, d.holder, req, tunnelingProtocols...)
conn, err := websocket.Negotiate(transport, holder, req, tunnelingProtocols...)
if err != nil {
return nil, "", err
}
@@ -84,10 +105,32 @@ func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, stri
protocol = strings.TrimPrefix(protocol, constants.WebsocketsSPDYTunnelingPrefix)
klog.V(4).Infof("negotiated protocol: %s", protocol)
// Wrap the websocket connection which implements "net.Conn".
tConn := NewTunnelingConnection("client", conn)
return NewTunnelingConnection("client", conn), protocol, nil
}
// Dial upgrades to a tunneling streaming connection, returning a SPDY connection
// containing a WebSockets connection (which implements "net.Conn"). Also
// returns the protocol negotiated, or an error.
func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
tConn, protocol, err := negotiateSPDYOverWebsocket(d.url, d.transport, d.holder, protocols...)
if err != nil {
return nil, "", err
}
// Create SPDY connection injecting the previously created tunneling connection.
spdyConn, err := spdy.NewClientConnectionWithPings(tConn, PingPeriod)
spdyConn, err := httpstreamspdy.NewClientConnectionWithPings(tConn, PingPeriod)
return spdyConn, protocol, err
}
// Dial upgrades to a tunneling streaming connection for callers using
// k8s.io/streaming/pkg/httpstream types.
func (d *streamingTunnelingDialer) Dial(protocols ...string) (streamhttp.Connection, string, error) {
tConn, protocol, err := negotiateSPDYOverWebsocket(d.url, d.transport, d.holder, protocols...)
if err != nil {
return nil, "", err
}
// Create SPDY connection injecting the previously created tunneling connection.
spdyConn, err := streamspdy.NewClientConnectionWithPings(tConn, PingPeriod)
return spdyConn, protocol, err
}

View File

@@ -25,17 +25,21 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/httpstream"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnettesting "k8s.io/apimachinery/pkg/util/net/testing"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
utilexec "k8s.io/client-go/util/exec"
"k8s.io/streaming/pkg/httpstream"
)
func TestFallbackClient_WebSocketPrimarySucceeds(t *testing.T) {
@@ -234,6 +238,74 @@ func TestFallbackClient_PrimaryAndSecondaryFail(t *testing.T) {
}
}
func TestFallbackClient_SPDYSecondaryNonZeroExitCode(t *testing.T) {
const expectedExitCode = 23
const expectedStdout = "stdout-before-exit"
// Create fake SPDY server that writes stdout followed by a v4 status error.
spdyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx, err := createHTTPStreams(w, req, &StreamOptions{
Stdin: strings.NewReader("input"),
Stdout: &bytes.Buffer{},
})
if err != nil {
w.WriteHeader(http.StatusForbidden)
return
}
defer ctx.conn.Close()
if _, err := io.WriteString(ctx.stdoutStream, expectedStdout); err != nil {
t.Fatalf("error writing stdout stream: %v", err)
}
statusErr := &apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Reason: remotecommand.NonZeroExitCodeReason,
Details: &metav1.StatusDetails{
Causes: []metav1.StatusCause{
{
Type: remotecommand.ExitCodeCauseType,
Message: "23",
},
},
},
Message: "command terminated with non-zero exit code: 23",
}}
if err := ctx.writeStatus(statusErr); err != nil {
t.Fatalf("error writing status stream: %v", err)
}
}))
defer spdyServer.Close()
spdyLocation, err := url.Parse(spdyServer.URL)
require.NoError(t, err)
// Primary websocket executor points at a SPDY-only endpoint and should fail.
websocketExecutor, err := NewWebSocketExecutor(&rest.Config{Host: spdyLocation.Host}, "GET", spdyServer.URL+"?stdin=true&stdout=true")
require.NoError(t, err)
spdyExecutor, err := NewSPDYExecutor(&rest.Config{Host: spdyLocation.Host}, "POST", spdyLocation)
require.NoError(t, err)
var sawPrimaryError atomic.Bool
exec, err := NewFallbackExecutor(websocketExecutor, spdyExecutor, func(err error) bool {
sawPrimaryError.Store(true)
return true
})
require.NoError(t, err)
var stdout bytes.Buffer
err = exec.StreamWithContext(context.Background(), StreamOptions{
Stdin: strings.NewReader("input"),
Stdout: &stdout,
})
require.Error(t, err)
require.True(t, sawPrimaryError.Load(), "expected primary websocket path to fail and trigger fallback")
var exitErr utilexec.ExitError
require.ErrorAs(t, err, &exitErr, "expected ExitError from secondary SPDY path, got: %T: %v", err, err)
require.Equal(t, expectedExitCode, exitErr.ExitStatus())
require.Equal(t, expectedStdout, stdout.String())
}
// localhostCert was generated from crypto/tls/generate_cert.go with the following command:
//
// go run generate_cert.go --rsa-bits 2048 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h

View File

@@ -21,8 +21,8 @@ import (
"io"
"net/http"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/klog/v2"
"k8s.io/streaming/pkg/httpstream"
)
// StreamOptions holds information pertaining to the current streaming session:

View File

@@ -22,11 +22,11 @@ import (
"net/http"
"net/url"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"
"k8s.io/streaming/pkg/httpstream"
)
// spdyStreamExecutor handles transporting standard shell streams over an httpstream connection.
@@ -109,7 +109,7 @@ func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options
return fmt.Errorf("redirect not allowed")
}
}
conn, protocol, err := spdy.Negotiate(
conn, protocol, err := spdy.NegotiateStreaming(
e.upgrader,
&client,
req,

View File

@@ -33,12 +33,13 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
remotecommandconsts "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
utilexec "k8s.io/client-go/util/exec"
"k8s.io/klog/v2/ktesting"
"k8s.io/streaming/pkg/httpstream"
"k8s.io/streaming/pkg/httpstream/spdy"
)
type AttachFunc func(in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan TerminalSize) error
@@ -183,6 +184,71 @@ func TestSPDYExecutorStream(t *testing.T) {
}
}
// TestSPDYExecutorNonZeroExitCode verifies SPDY v4 status-stream non-zero exit
// code handling remains compatible with CRI streaming servers.
func TestSPDYExecutorNonZeroExitCode(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
ctx, err := createHTTPStreams(writer, request, &StreamOptions{
Stdin: strings.NewReader("input"),
Stdout: &bytes.Buffer{},
})
if err != nil {
t.Errorf("unexpected stream setup error: %v", err)
return
}
defer ctx.conn.Close()
if _, err := io.WriteString(ctx.stdoutStream, "stdout-before-exit"); err != nil {
t.Errorf("unexpected stdout write error: %v", err)
return
}
err = ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{
Status: metav1.StatusFailure,
Reason: remotecommandconsts.NonZeroExitCodeReason,
Details: &metav1.StatusDetails{
Causes: []metav1.StatusCause{
{
Type: remotecommandconsts.ExitCodeCauseType,
Message: "17",
},
},
},
Message: "command terminated with non-zero exit code: 17",
}})
if err != nil {
t.Errorf("unexpected status write error: %v", err)
}
}))
defer server.Close()
uri, _ := url.Parse(server.URL)
exec, err := NewSPDYExecutor(&rest.Config{Host: uri.Host}, "POST", uri)
if err != nil {
t.Fatalf("unexpected executor error: %v", err)
}
var stdout bytes.Buffer
err = exec.StreamWithContext(context.Background(), StreamOptions{
Stdin: strings.NewReader("input"),
Stdout: &stdout,
})
if err == nil {
t.Fatal("expected non-zero exit error, got nil")
}
var exitErr utilexec.ExitError
if !errors.As(err, &exitErr) {
t.Fatalf("expected ExitError, got: %T: %v", err, err)
}
if exitErr.ExitStatus() != 17 {
t.Fatalf("expected exit code 17, got %d", exitErr.ExitStatus())
}
if got := stdout.String(); got != "stdout-before-exit" {
t.Fatalf("unexpected stdout: %q", got)
}
}
func newTestHTTPServer(f AttachFunc, options *StreamOptions) *httptest.Server {
//nolint:errcheck
server := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {

View File

@@ -22,8 +22,8 @@ import (
"net/http"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/klog/v2"
"k8s.io/streaming/pkg/httpstream"
)
// streamProtocolV1 implements the first version of the streaming exec & attach

View File

@@ -26,9 +26,9 @@ import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2/ktesting"
"k8s.io/streaming/pkg/httpstream"
)
type fakeReader struct {

View File

@@ -29,11 +29,11 @@ import (
gwebsocket "github.com/gorilla/websocket"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport/websocket"
"k8s.io/klog/v2"
"k8s.io/streaming/pkg/httpstream"
)
// writeDeadline defines the time that a client-side write to the websocket

View File

@@ -42,13 +42,13 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
utilnettesting "k8s.io/apimachinery/pkg/util/net/testing"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2/ktesting"
"k8s.io/streaming/pkg/httpstream/wsstream"
)
// TestWebSocketClient_LoopbackStdinToStdout returns random data sent on the STDIN channel

View File

@@ -23,8 +23,10 @@ import (
"time"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
httpstreamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy"
restclient "k8s.io/client-go/rest"
"k8s.io/klog/v2"
streamhttp "k8s.io/streaming/pkg/httpstream"
)
// Upgrader validates a response from the server after a SPDY upgrade.
@@ -43,7 +45,7 @@ func RoundTripperFor(config *restclient.Config) (http.RoundTripper, Upgrader, er
if config.Proxy != nil {
proxy = config.Proxy
}
upgradeRoundTripper, err := spdy.NewRoundTripperWithConfig(spdy.RoundTripperConfig{
upgradeRoundTripper, err := httpstreamspdy.NewRoundTripperWithConfig(httpstreamspdy.RoundTripperConfig{
TLS: tlsConfig,
Proxier: proxy,
PingPeriod: time.Second * 5,
@@ -79,6 +81,18 @@ func NewDialer(upgrader Upgrader, client *http.Client, method string, url *url.U
}
}
// NewDialerForStreaming creates a SPDY dialer for in-tree callers that use
// k8s.io/streaming/pkg/httpstream types.
func NewDialerForStreaming(upgrader Upgrader, client *http.Client, method string, url *url.URL) streamhttp.Dialer {
return &streamingDialerAdapter{delegate: NewDialer(upgrader, client, method, url)}
}
// NewUpgraderForStreaming adapts a streaming upgrader for callers that need
// the compatibility Upgrader interface.
func NewUpgraderForStreaming(upgrader streamhttp.UpgradeRoundTripper) Upgrader {
return &compatUpgraderAdapter{delegate: upgrader}
}
func (d *dialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
req, err := http.NewRequest(d.method, d.url.String(), nil)
if err != nil {
@@ -105,3 +119,199 @@ func Negotiate(upgrader Upgrader, client *http.Client, req *http.Request, protoc
}
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}
// NegotiateStreaming is for in-tree callers that still operate on
// k8s.io/streaming/pkg/httpstream types.
func NegotiateStreaming(upgrader Upgrader, client *http.Client, req *http.Request, protocols ...string) (streamhttp.Connection, string, error) {
conn, protocol, err := Negotiate(upgrader, client, req, protocols...)
if err != nil {
return nil, "", err
}
return wrapStreamingConnection(conn), protocol, nil
}
type streamingDialerAdapter struct {
delegate httpstream.Dialer
}
func (d *streamingDialerAdapter) Dial(protocols ...string) (streamhttp.Connection, string, error) {
conn, protocol, err := d.delegate.Dial(protocols...)
if err != nil {
return nil, "", err
}
return wrapStreamingConnection(conn), protocol, nil
}
type compatUpgraderAdapter struct {
delegate streamhttp.UpgradeRoundTripper
}
func (u *compatUpgraderAdapter) NewConnection(resp *http.Response) (httpstream.Connection, error) {
conn, err := u.delegate.NewConnection(resp)
if err != nil {
return nil, err
}
return wrapCompatConnection(conn), nil
}
type streamingStreamAdapter struct {
delegate httpstream.Stream
}
func (s *streamingStreamAdapter) Read(p []byte) (int, error) {
return s.delegate.Read(p)
}
func (s *streamingStreamAdapter) Write(p []byte) (int, error) {
return s.delegate.Write(p)
}
func (s *streamingStreamAdapter) Close() error {
return s.delegate.Close()
}
func (s *streamingStreamAdapter) Reset() error {
return s.delegate.Reset()
}
func (s *streamingStreamAdapter) Headers() http.Header {
return s.delegate.Headers()
}
func (s *streamingStreamAdapter) Identifier() uint32 {
return s.delegate.Identifier()
}
type streamingConnectionAdapter struct {
delegate httpstream.Connection
}
func (c *streamingConnectionAdapter) CreateStream(headers http.Header) (streamhttp.Stream, error) {
stream, err := c.delegate.CreateStream(headers)
if err != nil {
return nil, err
}
return &streamingStreamAdapter{delegate: stream}, nil
}
func (c *streamingConnectionAdapter) Close() error {
return c.delegate.Close()
}
func (c *streamingConnectionAdapter) CloseChan() <-chan bool {
return c.delegate.CloseChan()
}
func (c *streamingConnectionAdapter) SetIdleTimeout(timeout time.Duration) {
c.delegate.SetIdleTimeout(timeout)
}
func (c *streamingConnectionAdapter) RemoveStreams(streams ...streamhttp.Stream) {
compatStreams := make([]httpstream.Stream, 0, len(streams))
for _, stream := range streams {
if stream == nil {
continue
}
if s, ok := stream.(*streamingStreamAdapter); ok {
compatStreams = append(compatStreams, s.delegate)
continue
}
if s, ok := stream.(httpstream.Stream); ok {
compatStreams = append(compatStreams, s)
continue
}
klog.V(5).Infof("dropping unadaptable streaming stream %T in RemoveStreams", stream)
}
c.delegate.RemoveStreams(compatStreams...)
}
func wrapStreamingConnection(conn httpstream.Connection) streamhttp.Connection {
if conn == nil {
return nil
}
if wrapped, ok := conn.(*compatConnectionAdapter); ok {
return wrapped.delegate
}
return &streamingConnectionAdapter{delegate: conn}
}
type compatStreamAdapter struct {
delegate streamhttp.Stream
}
func (s *compatStreamAdapter) Read(p []byte) (int, error) {
return s.delegate.Read(p)
}
func (s *compatStreamAdapter) Write(p []byte) (int, error) {
return s.delegate.Write(p)
}
func (s *compatStreamAdapter) Close() error {
return s.delegate.Close()
}
func (s *compatStreamAdapter) Reset() error {
return s.delegate.Reset()
}
func (s *compatStreamAdapter) Headers() http.Header {
return s.delegate.Headers()
}
func (s *compatStreamAdapter) Identifier() uint32 {
return s.delegate.Identifier()
}
type compatConnectionAdapter struct {
delegate streamhttp.Connection
}
func (c *compatConnectionAdapter) CreateStream(headers http.Header) (httpstream.Stream, error) {
stream, err := c.delegate.CreateStream(headers)
if err != nil {
return nil, err
}
return &compatStreamAdapter{delegate: stream}, nil
}
func (c *compatConnectionAdapter) Close() error {
return c.delegate.Close()
}
func (c *compatConnectionAdapter) CloseChan() <-chan bool {
return c.delegate.CloseChan()
}
func (c *compatConnectionAdapter) SetIdleTimeout(timeout time.Duration) {
c.delegate.SetIdleTimeout(timeout)
}
func (c *compatConnectionAdapter) RemoveStreams(streams ...httpstream.Stream) {
streamingStreams := make([]streamhttp.Stream, 0, len(streams))
for _, stream := range streams {
if stream == nil {
continue
}
if s, ok := stream.(*compatStreamAdapter); ok {
streamingStreams = append(streamingStreams, s.delegate)
continue
}
if s, ok := stream.(streamhttp.Stream); ok {
streamingStreams = append(streamingStreams, s)
continue
}
klog.V(5).Infof("dropping unadaptable compat stream %T in RemoveStreams", stream)
}
c.delegate.RemoveStreams(streamingStreams...)
}
func wrapCompatConnection(conn streamhttp.Connection) httpstream.Connection {
if conn == nil {
return nil
}
if wrapped, ok := conn.(*streamingConnectionAdapter); ok {
return wrapped.delegate
}
return &compatConnectionAdapter{delegate: conn}
}

View File

@@ -31,11 +31,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport"
"k8s.io/streaming/pkg/httpstream"
"k8s.io/streaming/pkg/httpstream/wsstream"
)
var (

View File

@@ -32,10 +32,10 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/streaming/pkg/httpstream"
"k8s.io/streaming/pkg/httpstream/wsstream"
)
func TestWebSocketRoundTripper_RoundTripperSucceeds(t *testing.T) {