From 31aa192ad7145c14d952866987837ebf7fa706dc Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Thu, 22 Oct 2015 15:18:24 -0400 Subject: [PATCH] bump(github.com/docker/spdystream): 43bffc4 Address a potential race condition where pending data frames aren't delivered when a goaway frame is received. --- Godeps/Godeps.json | 2 +- .../docker/spdystream/connection.go | 47 +++++++++++++++---- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 565bcd006dc..570929d704e 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -355,7 +355,7 @@ }, { "ImportPath": "github.com/docker/spdystream", - "Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207" + "Rev": "43bffc458d55aa784be658c9867fbefcfcb7fecf" }, { "ImportPath": "github.com/elazarl/go-bindata-assetfs", diff --git a/Godeps/_workspace/src/github.com/docker/spdystream/connection.go b/Godeps/_workspace/src/github.com/docker/spdystream/connection.go index bbc450edc0e..f5ef2a999dd 100644 --- a/Godeps/_workspace/src/github.com/docker/spdystream/connection.go +++ b/Godeps/_workspace/src/github.com/docker/spdystream/connection.go @@ -174,6 +174,9 @@ type Connection struct { shutdownLock sync.Mutex shutdownChan chan error hasShutdown bool + + // for testing https://github.com/docker/spdystream/pull/56 + dataFrameHandler func(*spdy.DataFrame) error } // NewConnection creates a new spdy connection from an existing @@ -219,6 +222,7 @@ func NewConnection(conn net.Conn, server bool) (*Connection, error) { shutdownChan: make(chan error), } + session.dataFrameHandler = session.handleDataFrame idleAwareFramer.conn = session go idleAwareFramer.monitor() @@ -262,28 +266,42 @@ func (s *Connection) Ping() (time.Duration, error) { // which are needed to fully initiate connections. Both clients and servers // should call Serve in a separate goroutine before creating streams. func (s *Connection) Serve(newHandler StreamHandler) { + // use a WaitGroup to wait for all frames to be drained after receiving + // go-away. + var wg sync.WaitGroup + // Parition queues to ensure stream frames are handled // by the same worker, ensuring order is maintained frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS) for i := 0; i < FRAME_WORKERS; i++ { frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE) + // Ensure frame queue is drained when connection is closed go func(frameQueue *PriorityFrameQueue) { <-s.closeChan frameQueue.Drain() }(frameQueues[i]) - go s.frameHandler(frameQueues[i], newHandler) + wg.Add(1) + go func(frameQueue *PriorityFrameQueue) { + // let the WaitGroup know this worker is done + defer wg.Done() + + s.frameHandler(frameQueue, newHandler) + }(frameQueues[i]) } - var partitionRoundRobin int + var ( + partitionRoundRobin int + goAwayFrame *spdy.GoAwayFrame + ) for { readFrame, err := s.framer.ReadFrame() if err != nil { if err != io.EOF { fmt.Errorf("frame read error: %s", err) } else { - debugMessage("EOF received") + debugMessage("(%p) EOF received", s) } break } @@ -317,9 +335,9 @@ func (s *Connection) Serve(newHandler StreamHandler) { partition = partitionRoundRobin partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS case *spdy.GoAwayFrame: - priority = 0 - partition = partitionRoundRobin - partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS + // hold on to the go away frame and exit the loop + goAwayFrame = frame + break default: priority = 7 partition = partitionRoundRobin @@ -329,6 +347,15 @@ func (s *Connection) Serve(newHandler StreamHandler) { } close(s.closeChan) + // wait for all frame handler workers to indicate they've drained their queues + // before handling the go away frame + wg.Wait() + + if goAwayFrame != nil { + s.handleGoAwayFrame(goAwayFrame) + } + + // now it's safe to close remote channels and empty s.streams s.streamCond.L.Lock() // notify streams that they're now closed, which will // unblock any stream Read() calls @@ -354,7 +381,7 @@ func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler Str case *spdy.SynReplyFrame: frameErr = s.handleReplyFrame(frame) case *spdy.DataFrame: - frameErr = s.handleDataFrame(frame) + frameErr = s.dataFrameHandler(frame) case *spdy.RstStreamFrame: frameErr = s.handleResetFrame(frame) case *spdy.HeadersFrame: @@ -514,12 +541,12 @@ func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error { debugMessage("(%p) Data frame received for %d", s, frame.StreamId) stream, streamOk := s.getStream(frame.StreamId) if !streamOk { - debugMessage("Data frame gone away for %d", frame.StreamId) + debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId) // Stream has already gone away return nil } if !stream.replied { - debugMessage("Data frame not replied %d", frame.StreamId) + debugMessage("(%p) Data frame not replied %d", s, frame.StreamId) // No reply received...Protocol error? return nil } @@ -871,7 +898,7 @@ func (s *Connection) addStream(stream *Stream) { func (s *Connection) removeStream(stream *Stream) { s.streamCond.L.Lock() delete(s.streams, stream.streamId) - debugMessage("Stream removed, broadcasting: %d", stream.streamId) + debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId) s.streamCond.Broadcast() s.streamCond.L.Unlock() }