Merge pull request #131203 from xuzhenglun/fix-ws-race

make sure all streams are created before start demux websocket

Kubernetes-commit: 2dfb3e4ffd31c8b6b9f7c55328bcf8c7da53aa68
This commit is contained in:
Kubernetes Publisher 2025-07-08 06:45:26 -07:00
commit 879be6242f
11 changed files with 56 additions and 18 deletions

2
go.mod
View File

@ -26,7 +26,7 @@ require (
google.golang.org/protobuf v1.36.5
gopkg.in/evanphx/json-patch.v4 v4.12.0
k8s.io/api v0.0.0-20250705010445-839e6c7fb630
k8s.io/apimachinery v0.0.0-20250703090149-a9de165b70c8
k8s.io/apimachinery v0.0.0-20250708050202-b18bb6a9e8e6
k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20250628140032-d90c4fd18f59
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397

4
go.sum
View File

@ -153,8 +153,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20250705010445-839e6c7fb630 h1:pnI9Db0bmtO4qa+X6jGK8WslPvzLwW8wrAe5B2//yGU=
k8s.io/api v0.0.0-20250705010445-839e6c7fb630/go.mod h1:cQb0K/knyMnN0b7QfEoYB+YzMbFk6PMoa/XTGxEJ7iw=
k8s.io/apimachinery v0.0.0-20250703090149-a9de165b70c8 h1:cyn8l4zqyrfCZFbRwIXfhaFQ4KOK9oDCdUMBy5RVeck=
k8s.io/apimachinery v0.0.0-20250703090149-a9de165b70c8/go.mod h1:Th679JJyaVRDNFk3vKPKY43ypziDeoGnbEiEgBCz8s4=
k8s.io/apimachinery v0.0.0-20250708050202-b18bb6a9e8e6 h1:yU8UMv2l/HGGPqnHLc88hXDE//i7q/fxHS+alFSlpkM=
k8s.io/apimachinery v0.0.0-20250708050202-b18bb6a9e8e6/go.mod h1:Th679JJyaVRDNFk3vKPKY43ypziDeoGnbEiEgBCz8s4=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20250628140032-d90c4fd18f59 h1:Jc4GiFTK2HHOpfQFoQEGXTBTs2pETwHukmoD4yoTqwo=

View File

@ -54,5 +54,5 @@ type streamCreator interface {
}
type streamProtocolHandler interface {
stream(conn streamCreator) error
stream(conn streamCreator, ready chan<- struct{}) error
}

View File

@ -157,7 +157,11 @@ func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options Stre
panicChan <- p
}
}()
errorChan <- streamer.stream(conn)
// The SPDY executor does not need to synchronize stream creation, so we pass a nil
// ready channel. The underlying spdystream library handles stream multiplexing
// without a race condition.
errorChan <- streamer.stream(conn, nil)
}()
select {

View File

@ -352,7 +352,7 @@ func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
errorChan := make(chan error)
go func() {
errorChan <- streamer.stream(conn)
errorChan <- streamer.stream(conn, nil)
}()
// Wait until stream goroutine starts.

View File

@ -47,7 +47,7 @@ func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
}
}
func (p *streamProtocolV1) stream(conn streamCreator) error {
func (p *streamProtocolV1) stream(conn streamCreator, ready chan<- struct{}) error {
doneChan := make(chan struct{}, 2)
errorChan := make(chan error)
@ -106,6 +106,11 @@ func (p *streamProtocolV1) stream(conn streamCreator) error {
defer p.remoteStderr.Reset()
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
// always read from errorStream

View File

@ -169,11 +169,16 @@ func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
}()
}
func (p *streamProtocolV2) stream(conn streamCreator) error {
func (p *streamProtocolV2) stream(conn streamCreator, ready chan<- struct{}) error {
if err := p.createStreams(conn); err != nil {
return err
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})

View File

@ -82,11 +82,16 @@ func (p *streamProtocolV3) handleResizes() {
}()
}
func (p *streamProtocolV3) stream(conn streamCreator) error {
func (p *streamProtocolV3) stream(conn streamCreator, ready chan<- struct{}) error {
if err := p.createStreams(conn); err != nil {
return err
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})

View File

@ -51,11 +51,16 @@ func (p *streamProtocolV4) handleResizes() {
p.streamProtocolV3.handleResizes()
}
func (p *streamProtocolV4) stream(conn streamCreator) error {
func (p *streamProtocolV4) stream(conn streamCreator, ready chan<- struct{}) error {
if err := p.createStreams(conn); err != nil {
return err
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})

View File

@ -30,6 +30,6 @@ func newStreamProtocolV5(options StreamOptions) streamProtocolHandler {
}
}
func (p *streamProtocolV5) stream(conn streamCreator) error {
return p.streamProtocolV4.stream(conn)
func (p *streamProtocolV5) stream(conn streamCreator, ready chan<- struct{}) error {
return p.streamProtocolV4.stream(conn, ready)
}

View File

@ -157,13 +157,27 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
panicChan <- p
}
}()
readyChan := make(chan struct{})
creator := newWSStreamCreator(conn)
go creator.readDemuxLoop(
e.upgrader.DataBufferSize(),
e.heartbeatPeriod,
e.heartbeatDeadline,
)
errorChan <- streamer.stream(creator)
go func() {
select {
// Wait until all streams have been created before starting the readDemuxLoop.
// This is to avoid a race condition where the readDemuxLoop receives a message
// for a stream that has not yet been created.
case <-readyChan:
case <-ctx.Done():
creator.closeAllStreamReaders(ctx.Err())
return
}
creator.readDemuxLoop(
e.upgrader.DataBufferSize(),
e.heartbeatPeriod,
e.heartbeatDeadline,
)
}()
errorChan <- streamer.stream(creator, readyChan)
}()
select {