From bd78ccaf31d6f30c74031481d79c6bfe9dfc87f1 Mon Sep 17 00:00:00 2001 From: "fupan.lfp" Date: Tue, 21 Jul 2020 20:48:47 +0800 Subject: [PATCH] shimv2: fix the issue of close IO stream It should wait until the stdin io copy termianted to close the process's io stream, otherwise, it would miss forwarding some contents to process stdin. Fixes: #439 Signed-off-by: fupan.lfp --- src/runtime/containerd-shim-v2/container.go | 66 +++++++++++---------- src/runtime/containerd-shim-v2/exec.go | 19 +++--- src/runtime/containerd-shim-v2/service.go | 16 +++-- src/runtime/containerd-shim-v2/start.go | 12 +++- src/runtime/containerd-shim-v2/stream.go | 4 +- 5 files changed, 70 insertions(+), 47 deletions(-) diff --git a/src/runtime/containerd-shim-v2/container.go b/src/runtime/containerd-shim-v2/container.go index 8649176a88..faea0e26ab 100644 --- a/src/runtime/containerd-shim-v2/container.go +++ b/src/runtime/containerd-shim-v2/container.go @@ -6,6 +6,7 @@ package containerdshim import ( + "io" "time" "github.com/containerd/containerd/api/types/task" @@ -17,23 +18,25 @@ import ( ) type container struct { - s *service - ttyio *ttyIO - spec *specs.Spec - exitTime time.Time - execs map[string]*exec - exitIOch chan struct{} - exitCh chan uint32 - id string - stdin string - stdout string - stderr string - bundle string - cType vc.ContainerType - exit uint32 - status task.Status - terminal bool - mounted bool + s *service + ttyio *ttyIO + spec *specs.Spec + exitTime time.Time + execs map[string]*exec + exitIOch chan struct{} + stdinPipe io.WriteCloser + stdinCloser chan struct{} + exitCh chan uint32 + id string + stdin string + stdout string + stderr string + bundle string + cType vc.ContainerType + exit uint32 + status task.Status + terminal bool + mounted bool } func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) { @@ -47,20 +50,21 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con } c := &container{ - s: s, - spec: spec, - id: r.ID, - bundle: r.Bundle, - stdin: r.Stdin, - stdout: r.Stdout, - stderr: r.Stderr, - terminal: r.Terminal, - cType: containerType, - execs: make(map[string]*exec), - status: task.StatusCreated, - exitIOch: make(chan struct{}), - exitCh: make(chan uint32, 1), - mounted: mounted, + s: s, + spec: spec, + id: r.ID, + bundle: r.Bundle, + stdin: r.Stdin, + stdout: r.Stdout, + stderr: r.Stderr, + terminal: r.Terminal, + cType: containerType, + execs: make(map[string]*exec), + status: task.StatusCreated, + exitIOch: make(chan struct{}), + exitCh: make(chan uint32, 1), + stdinCloser: make(chan struct{}), + mounted: mounted, } return c, nil } diff --git a/src/runtime/containerd-shim-v2/exec.go b/src/runtime/containerd-shim-v2/exec.go index 7fa2b016c5..a4cbff922d 100644 --- a/src/runtime/containerd-shim-v2/exec.go +++ b/src/runtime/containerd-shim-v2/exec.go @@ -7,6 +7,7 @@ package containerdshim import ( "fmt" + "io" "strings" "time" @@ -32,6 +33,9 @@ type exec struct { exitIOch chan struct{} exitCh chan uint32 + stdinCloser chan struct{} + stdinPipe io.WriteCloser + exitTime time.Time } @@ -108,13 +112,14 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g } exec := &exec{ - container: c, - cmds: cmds, - tty: tty, - exitCode: exitCode255, - exitIOch: make(chan struct{}), - exitCh: make(chan uint32, 1), - status: task.StatusCreated, + container: c, + cmds: cmds, + tty: tty, + exitCode: exitCode255, + exitIOch: make(chan struct{}), + stdinCloser: make(chan struct{}), + exitCh: make(chan uint32, 1), + status: task.StatusCreated, } return exec, nil diff --git a/src/runtime/containerd-shim-v2/service.go b/src/runtime/containerd-shim-v2/service.go index 59b81a9148..0690bc8888 100644 --- a/src/runtime/containerd-shim-v2/service.go +++ b/src/runtime/containerd-shim-v2/service.go @@ -747,19 +747,23 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt return nil, err } - tty := c.ttyio + stdin := c.stdinPipe + stdinCloser := c.stdinCloser + if r.ExecID != "" { execs, err := c.getExec(r.ExecID) if err != nil { return nil, err } - tty = execs.ttyio + stdin = execs.stdinPipe + stdinCloser = execs.stdinCloser } - if tty != nil && tty.Stdin != nil { - if err := tty.Stdin.Close(); err != nil { - return nil, errors.Wrap(err, "close stdin") - } + // wait until the stdin io copy terminated, otherwise + // some contents would not be forwarded to the process. + <-stdinCloser + if err := stdin.Close(); err != nil { + return nil, errors.Wrap(err, "close stdin") } return empty, nil diff --git a/src/runtime/containerd-shim-v2/start.go b/src/runtime/containerd-shim-v2/start.go index afcb327fc8..2f57948bac 100644 --- a/src/runtime/containerd-shim-v2/start.go +++ b/src/runtime/containerd-shim-v2/start.go @@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error { return err } + c.stdinPipe = stdin + if c.stdin != "" || c.stdout != "" || c.stderr != "" { tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) if err != nil { return err } c.ttyio = tty - go ioCopy(c.exitIOch, tty, stdin, stdout, stderr) + go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr) } else { //close the io exit channel, since there is no io for this container, //otherwise the following wait goroutine will hang on this channel. close(c.exitIOch) + //close the stdin closer channel to notify that it's safe to close process's + // io. + close(c.stdinCloser) } go wait(s, c, "") @@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex if err != nil { return nil, err } + + execs.stdinPipe = stdin + tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) if err != nil { return nil, err } execs.ttyio = tty - go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr) + go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr) go wait(s, c, execID) diff --git a/src/runtime/containerd-shim-v2/stream.go b/src/runtime/containerd-shim-v2/stream.go index a16cbba0c4..5f3f3c2a91 100644 --- a/src/runtime/containerd-shim-v2/stream.go +++ b/src/runtime/containerd-shim-v2/stream.go @@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) ( return ttyIO, nil } -func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { +func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { var wg sync.WaitGroup var closeOnce sync.Once @@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi p := bufPool.Get().(*[]byte) defer bufPool.Put(p) io.CopyBuffer(stdinPipe, tty.Stdin, *p) + // notify that we can close process's io safely. + close(stdinCloser) wg.Done() }() }