mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-04 11:06:21 +00:00
vendor: fix dep warning and update yamux dependency
I got following warning after upgrading dep tool: Warning: the following project(s) have [[constraint]] stanzas in Gopkg.toml: ✗ github.com/hashicorp/yamux However, these projects are not direct dependencies of the current project: they are not imported in any .go files, nor are they in the 'required' list in Gopkg.toml. Dep only applies [[constraint]] rules to direct dependencies, so these rules will have no effect. Either import/require packages from these projects so that they become direct dependencies, or convert each [[constraint]] to an [[override]] to enforce rules on these projects, if they happen to be transitive dependencies, So let's convert constraint to override over yamux. In the meanwhile, update the yamux vendor. Full commit list: 4c2fe0d (origin/b-consul-3040) Dont output keepalive error when the session is closed f21aae5 Make sure to drain the timer channel on defer, and a clarifying comment 601ccd8 Make receive window update logic a bit cleaner 02d320c Uses timer pool in sendNoWait, like in waitForSendErr cf433c5 window update unit test for partial read; benchmark large buffer ca8dfd0 improve memory utilization in receive buffer, fix flow control 683f491 Fix race around read and write deadlines in Stream (#52) 40b86b2 Add public session CloseChan method (#44) Note that commit 4c2fe0d might also help kata-containers/agent/issues/231. Signed-off-by: Peng Tao <bergwolf@gmail.com>
This commit is contained in:
parent
0f20b6b81b
commit
0646a39ff0
5
Gopkg.lock
generated
5
Gopkg.lock
generated
@ -84,9 +84,10 @@
|
||||
version = "v1.0.0"
|
||||
|
||||
[[projects]]
|
||||
branch = "master"
|
||||
name = "github.com/hashicorp/yamux"
|
||||
packages = ["."]
|
||||
revision = "f5742cb6b85602e7fa834e9d5d91a7d7fa850824"
|
||||
revision = "3520598351bb3500a49ae9563f5539666ae0a27c"
|
||||
|
||||
[[projects]]
|
||||
name = "github.com/intel/govmm"
|
||||
@ -263,6 +264,6 @@
|
||||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "4d57a771261fe6b0e1f86bf2de82f8c39cc0047170f4277754972e2feab4796f"
|
||||
inputs-digest = "ea3d6532c4375832a1c79d70af45e6722e526bde97f6caf23d90b91267a3cf0b"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
@ -70,11 +70,11 @@
|
||||
name = "github.com/safchain/ethtool"
|
||||
revision = "79559b488d8848b53a8e34c330140c3fc37ee246"
|
||||
|
||||
[[override]]
|
||||
branch = "master"
|
||||
name = "github.com/hashicorp/yamux"
|
||||
|
||||
[prune]
|
||||
non-go = true
|
||||
go-tests = true
|
||||
unused-packages = true
|
||||
|
||||
[[constraint]]
|
||||
branch = "master"
|
||||
name = "github.com/hashicorp/yamux"
|
||||
|
65
vendor/github.com/hashicorp/yamux/session.go
generated
vendored
65
vendor/github.com/hashicorp/yamux/session.go
generated
vendored
@ -123,6 +123,12 @@ func (s *Session) IsClosed() bool {
|
||||
}
|
||||
}
|
||||
|
||||
// CloseChan returns a read-only channel which is closed as
|
||||
// soon as the session is closed.
|
||||
func (s *Session) CloseChan() <-chan struct{} {
|
||||
return s.shutdownCh
|
||||
}
|
||||
|
||||
// NumStreams returns the number of currently open streams
|
||||
func (s *Session) NumStreams() int {
|
||||
s.streamLock.Lock()
|
||||
@ -303,8 +309,10 @@ func (s *Session) keepalive() {
|
||||
case <-time.After(s.config.KeepAliveInterval):
|
||||
_, err := s.Ping()
|
||||
if err != nil {
|
||||
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
|
||||
s.exitErr(ErrKeepAliveTimeout)
|
||||
if err != ErrSessionShutdown {
|
||||
s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
|
||||
s.exitErr(ErrKeepAliveTimeout)
|
||||
}
|
||||
return
|
||||
}
|
||||
case <-s.shutdownCh:
|
||||
@ -323,8 +331,17 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error {
|
||||
// potential shutdown. Since there's the expectation that sends can happen
|
||||
// in a timely manner, we enforce the connection write timeout here.
|
||||
func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
|
||||
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
|
||||
defer timer.Stop()
|
||||
t := timerPool.Get()
|
||||
timer := t.(*time.Timer)
|
||||
timer.Reset(s.config.ConnectionWriteTimeout)
|
||||
defer func() {
|
||||
timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
timerPool.Put(t)
|
||||
}()
|
||||
|
||||
ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
|
||||
select {
|
||||
@ -349,8 +366,17 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e
|
||||
// the send happens right here, we enforce the connection write timeout if we
|
||||
// can't queue the header to be sent.
|
||||
func (s *Session) sendNoWait(hdr header) error {
|
||||
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
|
||||
defer timer.Stop()
|
||||
t := timerPool.Get()
|
||||
timer := t.(*time.Timer)
|
||||
timer.Reset(s.config.ConnectionWriteTimeout)
|
||||
defer func() {
|
||||
timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
}
|
||||
timerPool.Put(t)
|
||||
}()
|
||||
|
||||
select {
|
||||
case s.sendCh <- sendReady{Hdr: hdr}:
|
||||
@ -408,11 +434,20 @@ func (s *Session) recv() {
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
|
||||
var (
|
||||
handlers = []func(*Session, header) error{
|
||||
typeData: (*Session).handleStreamMessage,
|
||||
typeWindowUpdate: (*Session).handleStreamMessage,
|
||||
typePing: (*Session).handlePing,
|
||||
typeGoAway: (*Session).handleGoAway,
|
||||
}
|
||||
)
|
||||
|
||||
// recvLoop continues to receive data until a fatal error is encountered
|
||||
func (s *Session) recvLoop() error {
|
||||
defer close(s.recvDoneCh)
|
||||
hdr := header(make([]byte, headerSize))
|
||||
var handler func(header) error
|
||||
for {
|
||||
// Read the header
|
||||
if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
|
||||
@ -428,22 +463,12 @@ func (s *Session) recvLoop() error {
|
||||
return ErrInvalidVersion
|
||||
}
|
||||
|
||||
// Switch on the type
|
||||
switch hdr.MsgType() {
|
||||
case typeData:
|
||||
handler = s.handleStreamMessage
|
||||
case typeWindowUpdate:
|
||||
handler = s.handleStreamMessage
|
||||
case typeGoAway:
|
||||
handler = s.handleGoAway
|
||||
case typePing:
|
||||
handler = s.handlePing
|
||||
default:
|
||||
mt := hdr.MsgType()
|
||||
if mt < typeData || mt > typeGoAway {
|
||||
return ErrInvalidMsgType
|
||||
}
|
||||
|
||||
// Invoke the handler
|
||||
if err := handler(hdr); err != nil {
|
||||
if err := handlers[mt](s, hdr); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
45
vendor/github.com/hashicorp/yamux/stream.go
generated
vendored
45
vendor/github.com/hashicorp/yamux/stream.go
generated
vendored
@ -47,8 +47,8 @@ type Stream struct {
|
||||
recvNotifyCh chan struct{}
|
||||
sendNotifyCh chan struct{}
|
||||
|
||||
readDeadline time.Time
|
||||
writeDeadline time.Time
|
||||
readDeadline atomic.Value // time.Time
|
||||
writeDeadline atomic.Value // time.Time
|
||||
}
|
||||
|
||||
// newStream is used to construct a new stream within
|
||||
@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
|
||||
recvNotifyCh: make(chan struct{}, 1),
|
||||
sendNotifyCh: make(chan struct{}, 1),
|
||||
}
|
||||
s.readDeadline.Store(time.Time{})
|
||||
s.writeDeadline.Store(time.Time{})
|
||||
return s
|
||||
}
|
||||
|
||||
@ -122,8 +124,9 @@ START:
|
||||
WAIT:
|
||||
var timeout <-chan time.Time
|
||||
var timer *time.Timer
|
||||
if !s.readDeadline.IsZero() {
|
||||
delay := s.readDeadline.Sub(time.Now())
|
||||
readDeadline := s.readDeadline.Load().(time.Time)
|
||||
if !readDeadline.IsZero() {
|
||||
delay := readDeadline.Sub(time.Now())
|
||||
timer = time.NewTimer(delay)
|
||||
timeout = timer.C
|
||||
}
|
||||
@ -188,7 +191,7 @@ START:
|
||||
|
||||
// Send the header
|
||||
s.sendHdr.encode(typeData, flags, s.id, max)
|
||||
if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
|
||||
if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@ -200,8 +203,9 @@ START:
|
||||
|
||||
WAIT:
|
||||
var timeout <-chan time.Time
|
||||
if !s.writeDeadline.IsZero() {
|
||||
delay := s.writeDeadline.Sub(time.Now())
|
||||
writeDeadline := s.writeDeadline.Load().(time.Time)
|
||||
if !writeDeadline.IsZero() {
|
||||
delay := writeDeadline.Sub(time.Now())
|
||||
timeout = time.After(delay)
|
||||
}
|
||||
select {
|
||||
@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error {
|
||||
|
||||
// Determine the delta update
|
||||
max := s.session.config.MaxStreamWindowSize
|
||||
delta := max - atomic.LoadUint32(&s.recvWindow)
|
||||
var bufLen uint32
|
||||
s.recvLock.Lock()
|
||||
if s.recvBuf != nil {
|
||||
bufLen = uint32(s.recvBuf.Len())
|
||||
}
|
||||
delta := (max - bufLen) - s.recvWindow
|
||||
|
||||
// Determine the flags if any
|
||||
flags := s.sendFlags()
|
||||
|
||||
// Check if we can omit the update
|
||||
if delta < (max/2) && flags == 0 {
|
||||
s.recvLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update our window
|
||||
atomic.AddUint32(&s.recvWindow, delta)
|
||||
s.recvWindow += delta
|
||||
s.recvLock.Unlock()
|
||||
|
||||
// Send the header
|
||||
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
|
||||
@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
|
||||
if length == 0 {
|
||||
return nil
|
||||
}
|
||||
if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
|
||||
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
|
||||
return ErrRecvWindowExceeded
|
||||
}
|
||||
|
||||
// Wrap in a limited reader
|
||||
conn = &io.LimitedReader{R: conn, N: int64(length)}
|
||||
|
||||
// Copy into buffer
|
||||
s.recvLock.Lock()
|
||||
|
||||
if length > s.recvWindow {
|
||||
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
|
||||
return ErrRecvWindowExceeded
|
||||
}
|
||||
|
||||
if s.recvBuf == nil {
|
||||
// Allocate the receive buffer just-in-time to fit the full data frame.
|
||||
// This way we can read in the whole packet without further allocations.
|
||||
@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
|
||||
}
|
||||
|
||||
// Decrement the receive window
|
||||
atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
|
||||
s.recvWindow -= length
|
||||
s.recvLock.Unlock()
|
||||
|
||||
// Unblock any readers
|
||||
@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error {
|
||||
|
||||
// SetReadDeadline sets the deadline for future Read calls.
|
||||
func (s *Stream) SetReadDeadline(t time.Time) error {
|
||||
s.readDeadline = t
|
||||
s.readDeadline.Store(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets the deadline for future Write calls
|
||||
func (s *Stream) SetWriteDeadline(t time.Time) error {
|
||||
s.writeDeadline = t
|
||||
s.writeDeadline.Store(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
15
vendor/github.com/hashicorp/yamux/util.go
generated
vendored
15
vendor/github.com/hashicorp/yamux/util.go
generated
vendored
@ -1,5 +1,20 @@
|
||||
package yamux
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
timerPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
timer := time.NewTimer(time.Hour * 1e6)
|
||||
timer.Stop()
|
||||
return timer
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
// asyncSendErr is used to try an async send of an error
|
||||
func asyncSendErr(ch chan error, err error) {
|
||||
if ch == nil {
|
||||
|
Loading…
Reference in New Issue
Block a user