make sure all streams are created before starting demux websocket

Kubernetes-commit: 3379d5ac4b6a1afbbaead06689a8584ce546a275
This commit is contained in:
xuzhenglun
2025-07-08 18:31:30 +08:00
committed by Kubernetes Publisher
parent 6c24674413
commit 2f016580ef
9 changed files with 53 additions and 15 deletions

View File

@@ -54,5 +54,5 @@ type streamCreator interface {
}
type streamProtocolHandler interface {
stream(conn streamCreator) error
stream(conn streamCreator, ready chan<- struct{}) error
}

View File

@@ -157,7 +157,11 @@ func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options Stre
panicChan <- p
}
}()
errorChan <- streamer.stream(conn)
// The SPDY executor does not need to synchronize stream creation, so we pass a nil
// ready channel. The underlying spdystream library handles stream multiplexing
// without a race condition.
errorChan <- streamer.stream(conn, nil)
}()
select {

View File

@@ -352,7 +352,7 @@ func TestStreamExitsAfterConnectionIsClosed(t *testing.T) {
errorChan := make(chan error)
go func() {
errorChan <- streamer.stream(conn)
errorChan <- streamer.stream(conn, nil)
}()
// Wait until stream goroutine starts.

View File

@@ -47,7 +47,7 @@ func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
}
}
func (p *streamProtocolV1) stream(conn streamCreator) error {
func (p *streamProtocolV1) stream(conn streamCreator, ready chan<- struct{}) error {
doneChan := make(chan struct{}, 2)
errorChan := make(chan error)
@@ -106,6 +106,11 @@ func (p *streamProtocolV1) stream(conn streamCreator) error {
defer p.remoteStderr.Reset()
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
// always read from errorStream

View File

@@ -169,11 +169,16 @@ func (p *streamProtocolV2) copyStderr(wg *sync.WaitGroup) {
}()
}
func (p *streamProtocolV2) stream(conn streamCreator) error {
func (p *streamProtocolV2) stream(conn streamCreator, ready chan<- struct{}) error {
if err := p.createStreams(conn); err != nil {
return err
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV2{})

View File

@@ -82,11 +82,16 @@ func (p *streamProtocolV3) handleResizes() {
}()
}
func (p *streamProtocolV3) stream(conn streamCreator) error {
func (p *streamProtocolV3) stream(conn streamCreator, ready chan<- struct{}) error {
if err := p.createStreams(conn); err != nil {
return err
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV3{})

View File

@@ -51,11 +51,16 @@ func (p *streamProtocolV4) handleResizes() {
p.streamProtocolV3.handleResizes()
}
func (p *streamProtocolV4) stream(conn streamCreator) error {
func (p *streamProtocolV4) stream(conn streamCreator, ready chan<- struct{}) error {
if err := p.createStreams(conn); err != nil {
return err
}
// Signal that all streams have been created.
if ready != nil {
close(ready)
}
// now that all the streams have been created, proceed with reading & copying
errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})

View File

@@ -30,6 +30,6 @@ func newStreamProtocolV5(options StreamOptions) streamProtocolHandler {
}
}
func (p *streamProtocolV5) stream(conn streamCreator) error {
return p.streamProtocolV4.stream(conn)
func (p *streamProtocolV5) stream(conn streamCreator, ready chan<- struct{}) error {
return p.streamProtocolV4.stream(conn, ready)
}

View File

@@ -157,13 +157,27 @@ func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options Stream
panicChan <- p
}
}()
readyChan := make(chan struct{})
creator := newWSStreamCreator(conn)
go creator.readDemuxLoop(
e.upgrader.DataBufferSize(),
e.heartbeatPeriod,
e.heartbeatDeadline,
)
errorChan <- streamer.stream(creator)
go func() {
select {
// Wait until all streams have been created before starting the readDemuxLoop.
// This is to avoid a race condition where the readDemuxLoop receives a message
// for a stream that has not yet been created.
case <-readyChan:
case <-ctx.Done():
creator.closeAllStreamReaders(ctx.Err())
return
}
creator.readDemuxLoop(
e.upgrader.DataBufferSize(),
e.heartbeatPeriod,
e.heartbeatDeadline,
)
}()
errorChan <- streamer.stream(creator, readyChan)
}()
select {