shimv2: fix the issue of close IO stream

It should wait until the stdin io copy
termianted to close the process's io stream,
otherwise, it would miss forwarding some contents
to process stdin.

Fixes: #439

Signed-off-by: fupan.lfp <fupan.lfp@antgroup.com>
This commit is contained in:
fupan.lfp 2020-07-21 20:48:47 +08:00 committed by fupan.lfp
parent c5c3f5c31d
commit bd78ccaf31
5 changed files with 70 additions and 47 deletions

View File

@ -6,6 +6,7 @@
package containerdshim package containerdshim
import ( import (
"io"
"time" "time"
"github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/api/types/task"
@ -23,6 +24,8 @@ type container struct {
exitTime time.Time exitTime time.Time
execs map[string]*exec execs map[string]*exec
exitIOch chan struct{} exitIOch chan struct{}
stdinPipe io.WriteCloser
stdinCloser chan struct{}
exitCh chan uint32 exitCh chan uint32
id string id string
stdin string stdin string
@ -60,6 +63,7 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
status: task.StatusCreated, status: task.StatusCreated,
exitIOch: make(chan struct{}), exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1), exitCh: make(chan uint32, 1),
stdinCloser: make(chan struct{}),
mounted: mounted, mounted: mounted,
} }
return c, nil return c, nil

View File

@ -7,6 +7,7 @@ package containerdshim
import ( import (
"fmt" "fmt"
"io"
"strings" "strings"
"time" "time"
@ -32,6 +33,9 @@ type exec struct {
exitIOch chan struct{} exitIOch chan struct{}
exitCh chan uint32 exitCh chan uint32
stdinCloser chan struct{}
stdinPipe io.WriteCloser
exitTime time.Time exitTime time.Time
} }
@ -113,6 +117,7 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
tty: tty, tty: tty,
exitCode: exitCode255, exitCode: exitCode255,
exitIOch: make(chan struct{}), exitIOch: make(chan struct{}),
stdinCloser: make(chan struct{}),
exitCh: make(chan uint32, 1), exitCh: make(chan uint32, 1),
status: task.StatusCreated, status: task.StatusCreated,
} }

View File

@ -747,20 +747,24 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
return nil, err return nil, err
} }
tty := c.ttyio stdin := c.stdinPipe
stdinCloser := c.stdinCloser
if r.ExecID != "" { if r.ExecID != "" {
execs, err := c.getExec(r.ExecID) execs, err := c.getExec(r.ExecID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
tty = execs.ttyio stdin = execs.stdinPipe
stdinCloser = execs.stdinCloser
} }
if tty != nil && tty.Stdin != nil { // wait until the stdin io copy terminated, otherwise
if err := tty.Stdin.Close(); err != nil { // some contents would not be forwarded to the process.
<-stdinCloser
if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin") return nil, errors.Wrap(err, "close stdin")
} }
}
return empty, nil return empty, nil
} }

View File

@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
return err return err
} }
c.stdinPipe = stdin
if c.stdin != "" || c.stdout != "" || c.stderr != "" { if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal) tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil { if err != nil {
return err return err
} }
c.ttyio = tty c.ttyio = tty
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr) go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
} else { } else {
//close the io exit channel, since there is no io for this container, //close the io exit channel, since there is no io for this container,
//otherwise the following wait goroutine will hang on this channel. //otherwise the following wait goroutine will hang on this channel.
close(c.exitIOch) close(c.exitIOch)
//close the stdin closer channel to notify that it's safe to close process's
// io.
close(c.stdinCloser)
} }
go wait(s, c, "") go wait(s, c, "")
@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
if err != nil { if err != nil {
return nil, err return nil, err
} }
execs.stdinPipe = stdin
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal) tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil { if err != nil {
return nil, err return nil, err
} }
execs.ttyio = tty execs.ttyio = tty
go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr) go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
go wait(s, c, execID) go wait(s, c, execID)

View File

@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
return ttyIO, nil return ttyIO, nil
} }
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) { func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup var wg sync.WaitGroup
var closeOnce sync.Once var closeOnce sync.Once
@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
p := bufPool.Get().(*[]byte) p := bufPool.Get().(*[]byte)
defer bufPool.Put(p) defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p) io.CopyBuffer(stdinPipe, tty.Stdin, *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done() wg.Done()
}() }()
} }