diff --git a/go.mod b/go.mod index 351606c6e70..d0bbac05d78 100644 --- a/go.mod +++ b/go.mod @@ -150,7 +150,7 @@ require ( github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups v1.1.0 // indirect github.com/containerd/console v1.0.3 // indirect - github.com/containerd/ttrpc v1.2.1 // indirect + github.com/containerd/ttrpc v1.2.2 // indirect github.com/coredns/caddy v1.1.1 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 1184959130c..b3fdce5bdb7 100644 --- a/go.sum +++ b/go.sum @@ -163,8 +163,8 @@ github.com/containerd/continuity v0.1.0/go.mod h1:ICJu0PwR54nI0yPEnJ6jcS+J7CZAUX github.com/containerd/fifo v1.0.0/go.mod h1:ocF/ME1SX5b1AOlWi9r677YJmCPSwwWnQ9O123vzpE4= github.com/containerd/go-runc v1.0.0/go.mod h1:cNU0ZbCgCQVZK4lgG3P+9tn9/PaJNmoDXPpoJhDR+Ok= github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= -github.com/containerd/ttrpc v1.2.1 h1:VWv/Rzx023TBLv4WQ+9WPXlBG/s3rsRjY3i9AJ2BJdE= -github.com/containerd/ttrpc v1.2.1/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= +github.com/containerd/ttrpc v1.2.2 h1:9vqZr0pxwOF5koz6N0N3kJ0zDHokrcPxIR/ZR2YFtOs= +github.com/containerd/ttrpc v1.2.2/go.mod h1:sIT6l32Ph/H9cvnJsfXM5drIVzTr5A2flTf1G5tYZak= github.com/containerd/typeurl v1.0.2 h1:Chlt8zIieDbzQFzXzAeBEF92KhExuE4p9p92/QmY7aY= github.com/containerd/typeurl v1.0.2/go.mod h1:9trJWW2sRlGub4wZJRTW83VtbOLS6hwcDZXTn6oPz9s= github.com/coredns/caddy v1.1.0/go.mod h1:A6ntJQlAWuQfFlsd9hvigKbo2WS0VUs2l1e2F+BawD4= diff --git a/vendor/github.com/containerd/ttrpc/.golangci.yml b/vendor/github.com/containerd/ttrpc/.golangci.yml index c8be4980c2b..6462e52f66f 100644 --- a/vendor/github.com/containerd/ttrpc/.golangci.yml +++ b/vendor/github.com/containerd/ttrpc/.golangci.yml @@ -1,7 +1,5 @@ linters: enable: - - structcheck - - varcheck - staticcheck - unconvert - gofmt diff --git a/vendor/github.com/containerd/ttrpc/Makefile b/vendor/github.com/containerd/ttrpc/Makefile index 474194239d8..c3a497dcac0 100644 --- a/vendor/github.com/containerd/ttrpc/Makefile +++ b/vendor/github.com/containerd/ttrpc/Makefile @@ -151,7 +151,7 @@ install-protobuild: coverage: ## generate coverprofiles from the unit tests, except tests that require root @echo "$(WHALE) $@" @rm -f coverage.txt - @$(GO) test -i ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null + @$(GO) test ${TESTFLAGS} ${TESTPACKAGES} 2> /dev/null @( for pkg in ${PACKAGES}; do \ $(GO) test ${TESTFLAGS} \ -cover \ diff --git a/vendor/github.com/containerd/ttrpc/client.go b/vendor/github.com/containerd/ttrpc/client.go index 0abc7025d63..4b1e1e709ba 100644 --- a/vendor/github.com/containerd/ttrpc/client.go +++ b/vendor/github.com/containerd/ttrpc/client.go @@ -214,60 +214,66 @@ func (cs *clientStream) RecvMsg(m interface{}) error { if cs.remoteClosed { return io.EOF } + + var msg *streamMessage select { case <-cs.ctx.Done(): return cs.ctx.Err() - case msg, ok := <-cs.s.recv: - if !ok { + case <-cs.s.recvClose: + // If recv has a pending message, process that first + select { + case msg = <-cs.s.recv: + default: return cs.s.recvErr } + case msg = <-cs.s.recv: + } - if msg.header.Type == messageTypeResponse { - resp := &Response{} - err := proto.Unmarshal(msg.payload[:msg.header.Length], resp) - // return the payload buffer for reuse - cs.c.channel.putmbuf(msg.payload) - if err != nil { - return err - } + if msg.header.Type == messageTypeResponse { + resp := &Response{} + err := proto.Unmarshal(msg.payload[:msg.header.Length], resp) + // return the payload buffer for reuse + cs.c.channel.putmbuf(msg.payload) + if err != nil { + return err + } - if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil { - return err - } + if err := cs.c.codec.Unmarshal(resp.Payload, m); err != nil { + return err + } - if resp.Status != nil && resp.Status.Code != int32(codes.OK) { - return status.ErrorProto(resp.Status) - } + if resp.Status != nil && resp.Status.Code != int32(codes.OK) { + return status.ErrorProto(resp.Status) + } + cs.c.deleteStream(cs.s) + cs.remoteClosed = true + + return nil + } else if msg.header.Type == messageTypeData { + if !cs.desc.StreamingServer { + cs.c.deleteStream(cs.s) + cs.remoteClosed = true + return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol) + } + if msg.header.Flags&flagRemoteClosed == flagRemoteClosed { cs.c.deleteStream(cs.s) cs.remoteClosed = true - return nil - } else if msg.header.Type == messageTypeData { - if !cs.desc.StreamingServer { - cs.c.deleteStream(cs.s) - cs.remoteClosed = true - return fmt.Errorf("received data from non-streaming server: %w", ErrProtocol) + if msg.header.Flags&flagNoData == flagNoData { + return io.EOF } - if msg.header.Flags&flagRemoteClosed == flagRemoteClosed { - cs.c.deleteStream(cs.s) - cs.remoteClosed = true - - if msg.header.Flags&flagNoData == flagNoData { - return io.EOF - } - } - - err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m) - cs.c.channel.putmbuf(msg.payload) - if err != nil { - return err - } - return nil } - return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + err := cs.c.codec.Unmarshal(msg.payload[:msg.header.Length], m) + cs.c.channel.putmbuf(msg.payload) + if err != nil { + return err + } + return nil } + + return fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) } // Close closes the ttrpc connection and underlying connection @@ -477,25 +483,30 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err } defer c.deleteStream(s) + var msg *streamMessage select { case <-ctx.Done(): return ctx.Err() case <-c.ctx.Done(): return ErrClosed - case msg, ok := <-s.recv: - if !ok { + case <-s.recvClose: + // If recv has a pending message, process that first + select { + case msg = <-s.recv: + default: return s.recvErr } - - if msg.header.Type == messageTypeResponse { - err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) - } else { - err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) - } - - // return the payload buffer for reuse - c.channel.putmbuf(msg.payload) - - return err + case msg = <-s.recv: } + + if msg.header.Type == messageTypeResponse { + err = proto.Unmarshal(msg.payload[:msg.header.Length], resp) + } else { + err = fmt.Errorf("unexpected %q message received: %w", msg.header.Type, ErrProtocol) + } + + // return the payload buffer for reuse + c.channel.putmbuf(msg.payload) + + return err } diff --git a/vendor/github.com/containerd/ttrpc/server.go b/vendor/github.com/containerd/ttrpc/server.go index 2efda2bc905..7af59f828e5 100644 --- a/vendor/github.com/containerd/ttrpc/server.go +++ b/vendor/github.com/containerd/ttrpc/server.go @@ -547,7 +547,7 @@ func (c *serverConn) run(sctx context.Context) { // branch. Basically, it means that we are no longer receiving // requests due to a terminal error. recvErr = nil // connection is now "closing" - if err == io.EOF || err == io.ErrUnexpectedEOF || errors.Is(err, syscall.ECONNRESET) { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.ECONNRESET) { // The client went away and we should stop processing // requests, so that the client connection is closed return diff --git a/vendor/github.com/containerd/ttrpc/stream.go b/vendor/github.com/containerd/ttrpc/stream.go index 5f264fe6710..739a4c9675b 100644 --- a/vendor/github.com/containerd/ttrpc/stream.go +++ b/vendor/github.com/containerd/ttrpc/stream.go @@ -35,27 +35,26 @@ type stream struct { closeOnce sync.Once recvErr error + recvClose chan struct{} } func newStream(id streamID, send sender) *stream { return &stream{ - id: id, - sender: send, - recv: make(chan *streamMessage, 1), + id: id, + sender: send, + recv: make(chan *streamMessage, 1), + recvClose: make(chan struct{}), } } func (s *stream) closeWithError(err error) error { s.closeOnce.Do(func() { - if s.recv != nil { - close(s.recv) - if err != nil { - s.recvErr = err - } else { - s.recvErr = ErrClosed - } - + if err != nil { + s.recvErr = err + } else { + s.recvErr = ErrClosed } + close(s.recvClose) }) return nil } @@ -65,10 +64,14 @@ func (s *stream) send(mt messageType, flags uint8, b []byte) error { } func (s *stream) receive(ctx context.Context, msg *streamMessage) error { - if s.recvErr != nil { + select { + case <-s.recvClose: return s.recvErr + default: } select { + case <-s.recvClose: + return s.recvErr case s.recv <- msg: return nil case <-ctx.Done(): diff --git a/vendor/modules.txt b/vendor/modules.txt index a97d22d0dc9..a4f04b373e7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -146,7 +146,7 @@ github.com/containerd/cgroups/stats/v1 # github.com/containerd/console v1.0.3 ## explicit; go 1.13 github.com/containerd/console -# github.com/containerd/ttrpc v1.2.1 +# github.com/containerd/ttrpc v1.2.2 ## explicit; go 1.13 github.com/containerd/ttrpc # github.com/coredns/caddy v1.1.1