Merge pull request #16118 from ncdc/bump-spdystream

bump(github.com/docker/spdystream): 43bffc4
This commit is contained in:
Wojciech Tyczynski 2015-10-30 13:33:27 +01:00
commit 57a745a404
2 changed files with 38 additions and 11 deletions

2
Godeps/Godeps.json generated
View File

@ -355,7 +355,7 @@
}, },
{ {
"ImportPath": "github.com/docker/spdystream", "ImportPath": "github.com/docker/spdystream",
"Rev": "b2c3287865f3ad6aa22821ddb7b4692b896ac207" "Rev": "43bffc458d55aa784be658c9867fbefcfcb7fecf"
}, },
{ {
"ImportPath": "github.com/elazarl/go-bindata-assetfs", "ImportPath": "github.com/elazarl/go-bindata-assetfs",

View File

@ -174,6 +174,9 @@ type Connection struct {
shutdownLock sync.Mutex shutdownLock sync.Mutex
shutdownChan chan error shutdownChan chan error
hasShutdown bool hasShutdown bool
// for testing https://github.com/docker/spdystream/pull/56
dataFrameHandler func(*spdy.DataFrame) error
} }
// NewConnection creates a new spdy connection from an existing // NewConnection creates a new spdy connection from an existing
@ -219,6 +222,7 @@ func NewConnection(conn net.Conn, server bool) (*Connection, error) {
shutdownChan: make(chan error), shutdownChan: make(chan error),
} }
session.dataFrameHandler = session.handleDataFrame
idleAwareFramer.conn = session idleAwareFramer.conn = session
go idleAwareFramer.monitor() go idleAwareFramer.monitor()
@ -262,28 +266,42 @@ func (s *Connection) Ping() (time.Duration, error) {
// which are needed to fully initiate connections. Both clients and servers // which are needed to fully initiate connections. Both clients and servers
// should call Serve in a separate goroutine before creating streams. // should call Serve in a separate goroutine before creating streams.
func (s *Connection) Serve(newHandler StreamHandler) { func (s *Connection) Serve(newHandler StreamHandler) {
// use a WaitGroup to wait for all frames to be drained after receiving
// go-away.
var wg sync.WaitGroup
// Parition queues to ensure stream frames are handled // Parition queues to ensure stream frames are handled
// by the same worker, ensuring order is maintained // by the same worker, ensuring order is maintained
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS) frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
for i := 0; i < FRAME_WORKERS; i++ { for i := 0; i < FRAME_WORKERS; i++ {
frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE) frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)
// Ensure frame queue is drained when connection is closed // Ensure frame queue is drained when connection is closed
go func(frameQueue *PriorityFrameQueue) { go func(frameQueue *PriorityFrameQueue) {
<-s.closeChan <-s.closeChan
frameQueue.Drain() frameQueue.Drain()
}(frameQueues[i]) }(frameQueues[i])
go s.frameHandler(frameQueues[i], newHandler) wg.Add(1)
go func(frameQueue *PriorityFrameQueue) {
// let the WaitGroup know this worker is done
defer wg.Done()
s.frameHandler(frameQueue, newHandler)
}(frameQueues[i])
} }
var partitionRoundRobin int var (
partitionRoundRobin int
goAwayFrame *spdy.GoAwayFrame
)
for { for {
readFrame, err := s.framer.ReadFrame() readFrame, err := s.framer.ReadFrame()
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
fmt.Errorf("frame read error: %s", err) fmt.Errorf("frame read error: %s", err)
} else { } else {
debugMessage("EOF received") debugMessage("(%p) EOF received", s)
} }
break break
} }
@ -317,9 +335,9 @@ func (s *Connection) Serve(newHandler StreamHandler) {
partition = partitionRoundRobin partition = partitionRoundRobin
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
case *spdy.GoAwayFrame: case *spdy.GoAwayFrame:
priority = 0 // hold on to the go away frame and exit the loop
partition = partitionRoundRobin goAwayFrame = frame
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS break
default: default:
priority = 7 priority = 7
partition = partitionRoundRobin partition = partitionRoundRobin
@ -329,6 +347,15 @@ func (s *Connection) Serve(newHandler StreamHandler) {
} }
close(s.closeChan) close(s.closeChan)
// wait for all frame handler workers to indicate they've drained their queues
// before handling the go away frame
wg.Wait()
if goAwayFrame != nil {
s.handleGoAwayFrame(goAwayFrame)
}
// now it's safe to close remote channels and empty s.streams
s.streamCond.L.Lock() s.streamCond.L.Lock()
// notify streams that they're now closed, which will // notify streams that they're now closed, which will
// unblock any stream Read() calls // unblock any stream Read() calls
@ -354,7 +381,7 @@ func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler Str
case *spdy.SynReplyFrame: case *spdy.SynReplyFrame:
frameErr = s.handleReplyFrame(frame) frameErr = s.handleReplyFrame(frame)
case *spdy.DataFrame: case *spdy.DataFrame:
frameErr = s.handleDataFrame(frame) frameErr = s.dataFrameHandler(frame)
case *spdy.RstStreamFrame: case *spdy.RstStreamFrame:
frameErr = s.handleResetFrame(frame) frameErr = s.handleResetFrame(frame)
case *spdy.HeadersFrame: case *spdy.HeadersFrame:
@ -514,12 +541,12 @@ func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
debugMessage("(%p) Data frame received for %d", s, frame.StreamId) debugMessage("(%p) Data frame received for %d", s, frame.StreamId)
stream, streamOk := s.getStream(frame.StreamId) stream, streamOk := s.getStream(frame.StreamId)
if !streamOk { if !streamOk {
debugMessage("Data frame gone away for %d", frame.StreamId) debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId)
// Stream has already gone away // Stream has already gone away
return nil return nil
} }
if !stream.replied { if !stream.replied {
debugMessage("Data frame not replied %d", frame.StreamId) debugMessage("(%p) Data frame not replied %d", s, frame.StreamId)
// No reply received...Protocol error? // No reply received...Protocol error?
return nil return nil
} }
@ -871,7 +898,7 @@ func (s *Connection) addStream(stream *Stream) {
func (s *Connection) removeStream(stream *Stream) { func (s *Connection) removeStream(stream *Stream) {
s.streamCond.L.Lock() s.streamCond.L.Lock()
delete(s.streams, stream.streamId) delete(s.streams, stream.streamId)
debugMessage("Stream removed, broadcasting: %d", stream.streamId) debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId)
s.streamCond.Broadcast() s.streamCond.Broadcast()
s.streamCond.L.Unlock() s.streamCond.L.Unlock()
} }