Merge pull request #5194 from ncdc/4882-fix-flaky-spdy-tests-osx

bump(docker/spdystream):e731c8f9f19ffd7e51a469a2de1580c1dfbb4fae
This commit is contained in:
Alex Mohr 2015-03-10 10:07:18 -07:00
commit 420b6cde6c
4 changed files with 131 additions and 38 deletions

2
Godeps/Godeps.json generated
View File

@ -114,7 +114,7 @@
}, },
{ {
"ImportPath": "github.com/docker/spdystream", "ImportPath": "github.com/docker/spdystream",
"Rev": "e9bf9912b85eec0ed6aaf317808a0eab25e3ca43" "Rev": "e731c8f9f19ffd7e51a469a2de1580c1dfbb4fae"
}, },
{ {
"ImportPath": "github.com/elazarl/go-bindata-assetfs", "ImportPath": "github.com/elazarl/go-bindata-assetfs",

View File

@ -31,6 +31,7 @@ type AuthHandler func(header http.Header, slot uint8, parent uint32) bool
type idleAwareFramer struct { type idleAwareFramer struct {
f *spdy.Framer f *spdy.Framer
conn *Connection conn *Connection
writeLock sync.Mutex
resetChan chan struct{} resetChan chan struct{}
setTimeoutChan chan time.Duration setTimeoutChan chan time.Duration
timeout time.Duration timeout time.Duration
@ -49,6 +50,7 @@ func (i *idleAwareFramer) monitor() {
var ( var (
timer *time.Timer timer *time.Timer
expired <-chan time.Time expired <-chan time.Time
resetChan = i.resetChan
) )
Loop: Loop:
for { for {
@ -67,7 +69,7 @@ Loop:
timer.Reset(timeout) timer.Reset(timeout)
} }
} }
case <-i.resetChan: case <-resetChan:
if timer != nil && i.timeout > 0 { if timer != nil && i.timeout > 0 {
timer.Reset(i.timeout) timer.Reset(i.timeout)
} }
@ -87,12 +89,25 @@ Loop:
if timer != nil { if timer != nil {
timer.Stop() timer.Stop()
} }
i.writeLock.Lock()
close(resetChan)
i.resetChan = nil
i.writeLock.Unlock()
break Loop break Loop
} }
} }
// Drain resetChan
for _ = range resetChan {
}
} }
func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error { func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
i.writeLock.Lock()
defer i.writeLock.Unlock()
if i.resetChan == nil {
return io.EOF
}
err := i.f.WriteFrame(frame) err := i.f.WriteFrame(frame)
if err != nil { if err != nil {
return err return err
@ -109,6 +124,10 @@ func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
return nil, err return nil, err
} }
// resetChan should never be closed since it is only closed
// when the connection has closed its closeChan. This closure
// only occurs after all Reads have finished
// TODO (dmcgowan): refactor relationship into connection
i.resetChan <- struct{}{} i.resetChan <- struct{}{}
return frame, nil return frame, nil
@ -117,7 +136,6 @@ func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
type Connection struct { type Connection struct {
conn net.Conn conn net.Conn
framer *idleAwareFramer framer *idleAwareFramer
writeLock sync.Mutex
closeChan chan bool closeChan chan bool
goneAway bool goneAway bool
@ -209,9 +227,7 @@ func (s *Connection) Ping() (time.Duration, error) {
frame := &spdy.PingFrame{Id: pid} frame := &spdy.PingFrame{Id: pid}
startTime := time.Now() startTime := time.Now()
s.writeLock.Lock()
writeErr := s.framer.WriteFrame(frame) writeErr := s.framer.WriteFrame(frame)
s.writeLock.Unlock()
if writeErr != nil { if writeErr != nil {
return time.Duration(0), writeErr return time.Duration(0), writeErr
} }
@ -512,8 +528,6 @@ func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error { func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error {
if s.pingId&0x01 != frame.Id&0x01 { if s.pingId&0x01 != frame.Id&0x01 {
s.writeLock.Lock()
defer s.writeLock.Unlock()
return s.framer.WriteFrame(frame) return s.framer.WriteFrame(frame)
} }
pingChan, pingOk := s.pingChans[frame.Id] pingChan, pingOk := s.pingChans[frame.Id]
@ -663,9 +677,7 @@ func (s *Connection) Close() error {
Status: spdy.GoAwayOK, Status: spdy.GoAwayOK,
} }
s.writeLock.Lock()
err := s.framer.WriteFrame(goAwayFrame) err := s.framer.WriteFrame(goAwayFrame)
s.writeLock.Unlock()
if err != nil { if err != nil {
return err return err
} }
@ -750,8 +762,6 @@ func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool)
CFHeader: spdy.ControlFrameHeader{Flags: flags}, CFHeader: spdy.ControlFrameHeader{Flags: flags},
} }
s.writeLock.Lock()
defer s.writeLock.Unlock()
return s.framer.WriteFrame(headerFrame) return s.framer.WriteFrame(headerFrame)
} }
@ -767,8 +777,6 @@ func (s *Connection) sendReply(headers http.Header, stream *Stream, fin bool) er
CFHeader: spdy.ControlFrameHeader{Flags: flags}, CFHeader: spdy.ControlFrameHeader{Flags: flags},
} }
s.writeLock.Lock()
defer s.writeLock.Unlock()
return s.framer.WriteFrame(replyFrame) return s.framer.WriteFrame(replyFrame)
} }
@ -778,8 +786,6 @@ func (s *Connection) sendResetFrame(status spdy.RstStreamStatus, streamId spdy.S
Status: status, Status: status,
} }
s.writeLock.Lock()
defer s.writeLock.Unlock()
return s.framer.WriteFrame(resetFrame) return s.framer.WriteFrame(resetFrame)
} }
@ -806,8 +812,6 @@ func (s *Connection) sendStream(stream *Stream, fin bool) error {
CFHeader: spdy.ControlFrameHeader{Flags: flags}, CFHeader: spdy.ControlFrameHeader{Flags: flags},
} }
s.writeLock.Lock()
defer s.writeLock.Unlock()
return s.framer.WriteFrame(streamFrame) return s.framer.WriteFrame(streamFrame)
} }

View File

@ -1,10 +1,12 @@
package spdystream package spdystream
import ( import (
"bufio"
"bytes" "bytes"
"io" "io"
"net" "net"
"net/http" "net/http"
"net/http/httptest"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -322,8 +324,6 @@ func TestUnexpectedRemoteConnectionClosed(t *testing.T) {
if e == nil || e != io.EOF { if e == nil || e != io.EOF {
t.Fatalf("(%d) Expected to get an EOF stream error", tix) t.Fatalf("(%d) Expected to get an EOF stream error", tix)
} }
case <-time.After(500 * time.Millisecond):
t.Fatalf("(%d) Timeout waiting for stream closure", tix)
} }
closeErr = conn.Close() closeErr = conn.Close()
@ -381,8 +381,6 @@ func TestCloseNotification(t *testing.T) {
var serverConn net.Conn var serverConn net.Conn
select { select {
case serverConn = <-serverConnChan: case serverConn = <-serverConnChan:
case <-time.After(500 * time.Millisecond):
t.Fatal("Timed out waiting for connection closed notification")
} }
err = serverConn.Close() err = serverConn.Close()
@ -522,11 +520,7 @@ func TestIdleNoData(t *testing.T) {
go spdyConn.Serve(NoOpStreamHandler) go spdyConn.Serve(NoOpStreamHandler)
spdyConn.SetIdleTimeout(10 * time.Millisecond) spdyConn.SetIdleTimeout(10 * time.Millisecond)
select { <-spdyConn.CloseChan()
case <-spdyConn.CloseChan():
case <-time.After(20 * time.Millisecond):
t.Fatal("Timed out waiting for idle connection closure")
}
closeErr := server.Close() closeErr := server.Close()
if closeErr != nil { if closeErr != nil {
@ -577,8 +571,6 @@ func TestIdleWithData(t *testing.T) {
writesFinished := false writesFinished := false
expired := time.NewTimer(200 * time.Millisecond)
Loop: Loop:
for { for {
select { select {
@ -589,8 +581,6 @@ Loop:
t.Fatal("Connection closed before all writes finished") t.Fatal("Connection closed before all writes finished")
} }
break Loop break Loop
case <-expired.C:
t.Fatal("Timed out waiting for idle connection closure")
} }
} }
@ -784,6 +774,109 @@ func TestStreamResetWithDataRemaining(t *testing.T) {
wg.Wait() wg.Wait()
} }
type roundTripper struct {
conn net.Conn
}
func (s *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
r := *req
req = &r
conn, err := net.Dial("tcp", req.URL.Host)
if err != nil {
return nil, err
}
err = req.Write(conn)
if err != nil {
return nil, err
}
resp, err := http.ReadResponse(bufio.NewReader(conn), req)
if err != nil {
return nil, err
}
s.conn = conn
return resp, nil
}
// see https://github.com/GoogleCloudPlatform/kubernetes/issues/4882
func TestFramingAfterRemoteConnectionClosed(t *testing.T) {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
streamCh := make(chan *Stream)
w.WriteHeader(http.StatusSwitchingProtocols)
netconn, _, _ := w.(http.Hijacker).Hijack()
conn, _ := NewConnection(netconn, true)
go conn.Serve(func(s *Stream) {
s.SendReply(http.Header{}, false)
streamCh <- s
})
stream := <-streamCh
io.Copy(stream, stream)
closeChan := make(chan struct{})
go func() {
stream.Reset()
conn.Close()
close(closeChan)
}()
<-closeChan
}))
server.Start()
defer server.Close()
req, err := http.NewRequest("GET", server.URL, nil)
if err != nil {
t.Fatalf("Error creating request: %s", err)
}
rt := &roundTripper{}
client := &http.Client{Transport: rt}
_, err = client.Do(req)
if err != nil {
t.Fatalf("unexpected error from client.Do: %s", err)
}
conn, err := NewConnection(rt.conn, false)
go conn.Serve(NoOpStreamHandler)
stream, err := conn.CreateStream(http.Header{}, nil, false)
if err != nil {
t.Fatalf("error creating client stream: %s", err)
}
n, err := stream.Write([]byte("hello"))
if err != nil {
t.Fatalf("error writing to stream: %s", err)
}
if n != 5 {
t.Fatalf("Expected to write 5 bytes, but actually wrote %d", n)
}
b := make([]byte, 5)
n, err = stream.Read(b)
if err != nil {
t.Fatalf("error reading from stream: %s", err)
}
if n != 5 {
t.Fatalf("Expected to read 5 bytes, but actually read %d", n)
}
if e, a := "hello", string(b[0:n]); e != a {
t.Fatalf("expected '%s', got '%s'", e, a)
}
stream.Reset()
conn.Close()
}
var authenticated bool var authenticated bool
func authStreamHandler(stream *Stream) { func authStreamHandler(stream *Stream) {

View File

@ -59,8 +59,6 @@ func (s *Stream) WriteData(data []byte, fin bool) error {
Data: data, Data: data,
} }
s.conn.writeLock.Lock()
defer s.conn.writeLock.Unlock()
debugMessage("(%p) (%d) Writing data frame", s, s.streamId) debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
return s.conn.framer.WriteFrame(dataFrame) return s.conn.framer.WriteFrame(dataFrame)
} }
@ -186,8 +184,6 @@ func (s *Stream) resetStream() error {
StreamId: s.streamId, StreamId: s.streamId,
Status: spdy.Cancel, Status: spdy.Cancel,
} }
s.conn.writeLock.Lock()
defer s.conn.writeLock.Unlock()
return s.conn.framer.WriteFrame(resetFrame) return s.conn.framer.WriteFrame(resetFrame)
} }