Merge pull request #440 from lifupan/2.0-dev

shimv2: fix the issue  of close IO stream
This commit is contained in:
Bin Liu 2020-08-07 11:28:44 +08:00 committed by GitHub
commit 0a233ff4bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 70 additions and 47 deletions

View File

@ -6,6 +6,7 @@
package containerdshim package containerdshim
import ( import (
"io"
"time" "time"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
@ -23,6 +24,8 @@ type container struct {
exitTime time.Time exitTime time.Time
execs map[string]*exec execs map[string]*exec
exitIOch chan struct{} exitIOch chan struct{}
stdinPipe io.WriteCloser
stdinCloser chan struct{}
exitCh chan uint32 exitCh chan uint32
id string id string
stdin string stdin string
@ -60,6 +63,7 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
status: task.StatusCreated, status: task.StatusCreated,
exitIOch: make(chan struct{}), exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1), exitCh: make(chan uint32, 1),
stdinCloser: make(chan struct{}),
mounted: mounted, mounted: mounted,
} }
return c, nil return c, nil

View File

@ -7,6 +7,7 @@ package containerdshim
import ( import (
"fmt" "fmt"
"io"
"strings" "strings"
"time" "time"
@ -32,6 +33,9 @@ type exec struct {
exitIOch chan struct{} exitIOch chan struct{}
exitCh chan uint32 exitCh chan uint32
stdinCloser chan struct{}
stdinPipe io.WriteCloser
exitTime time.Time exitTime time.Time
} }
@ -113,6 +117,7 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
tty: tty, tty: tty,
exitCode: exitCode255, exitCode: exitCode255,
exitIOch: make(chan struct{}), exitIOch: make(chan struct{}),
stdinCloser: make(chan struct{}),
exitCh: make(chan uint32, 1), exitCh: make(chan uint32, 1),
status: task.StatusCreated, status: task.StatusCreated,
} }

View File

@ -753,20 +753,24 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
return nil, err return nil, err
} }
tty := c.ttyio stdin := c.stdinPipe
stdinCloser := c.stdinCloser
if r.ExecID != "" { if r.ExecID != "" {
execs, err := c.getExec(r.ExecID) execs, err := c.getExec(r.ExecID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tty = execs.ttyio stdin = execs.stdinPipe
stdinCloser = execs.stdinCloser
} }
if tty != nil && tty.Stdin != nil { // wait until the stdin io copy terminated, otherwise
if err := tty.Stdin.Close(); err != nil { // some contents would not be forwarded to the process.
<-stdinCloser
if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin") return nil, errors.Wrap(err, "close stdin")
} }
}
return empty, nil return empty, nil
} }

View File

@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
return err return err
} }
c.stdinPipe = stdin
if c.stdin != "" || c.stdout != "" || c.stderr != "" { if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil { if err != nil {
return err return err
} }
c.ttyio = tty c.ttyio = tty
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr) go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
} else { } else {
//close the io exit channel, since there is no io for this container, //close the io exit channel, since there is no io for this container,
//otherwise the following wait goroutine will hang on this channel. //otherwise the following wait goroutine will hang on this channel.
close(c.exitIOch) close(c.exitIOch)
//close the stdin closer channel to notify that it's safe to close process's
// io.
close(c.stdinCloser)
} }
go wait(s, c, "") go wait(s, c, "")
@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
if err != nil { if err != nil {
return nil, err return nil, err
} }
execs.stdinPipe = stdin
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil { if err != nil {
return nil, err return nil, err
} }
execs.ttyio = tty execs.ttyio = tty
go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr) go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
go wait(s, c, execID) go wait(s, c, execID)

View File

@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
return ttyIO, nil return ttyIO, nil
} }
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup var wg sync.WaitGroup
var closeOnce sync.Once var closeOnce sync.Once
@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
p := bufPool.Get().(*[]byte) p := bufPool.Get().(*[]byte)
defer bufPool.Put(p) defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p) io.CopyBuffer(stdinPipe, tty.Stdin, *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done() wg.Done()
}() }()
} }