shimv2: fix the issue of close IO stream

It should wait until the stdin io copy
termianted to close the process's io stream,
otherwise, it would miss forwarding some contents
to process stdin.

Fixes: #439

Signed-off-by: fupan.lfp <fupan.lfp@antgroup.com>
This commit is contained in:
fupan.lfp 2020-07-21 20:48:47 +08:00 committed by fupan.lfp
parent c5c3f5c31d
commit bd78ccaf31
5 changed files with 70 additions and 47 deletions

View File

@ -6,6 +6,7 @@
package containerdshim
import (
"io"
"time"
"github.com/containerd/containerd/api/types/task"
@ -17,23 +18,25 @@ import (
)
type container struct {
s *service
ttyio *ttyIO
spec *specs.Spec
exitTime time.Time
execs map[string]*exec
exitIOch chan struct{}
exitCh chan uint32
id string
stdin string
stdout string
stderr string
bundle string
cType vc.ContainerType
exit uint32
status task.Status
terminal bool
mounted bool
s *service
ttyio *ttyIO
spec *specs.Spec
exitTime time.Time
execs map[string]*exec
exitIOch chan struct{}
stdinPipe io.WriteCloser
stdinCloser chan struct{}
exitCh chan uint32
id string
stdin string
stdout string
stderr string
bundle string
cType vc.ContainerType
exit uint32
status task.Status
terminal bool
mounted bool
}
func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) {
@ -47,20 +50,21 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
}
c := &container{
s: s,
spec: spec,
id: r.ID,
bundle: r.Bundle,
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
cType: containerType,
execs: make(map[string]*exec),
status: task.StatusCreated,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
mounted: mounted,
s: s,
spec: spec,
id: r.ID,
bundle: r.Bundle,
stdin: r.Stdin,
stdout: r.Stdout,
stderr: r.Stderr,
terminal: r.Terminal,
cType: containerType,
execs: make(map[string]*exec),
status: task.StatusCreated,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
stdinCloser: make(chan struct{}),
mounted: mounted,
}
return c, nil
}

View File

@ -7,6 +7,7 @@ package containerdshim
import (
"fmt"
"io"
"strings"
"time"
@ -32,6 +33,9 @@ type exec struct {
exitIOch chan struct{}
exitCh chan uint32
stdinCloser chan struct{}
stdinPipe io.WriteCloser
exitTime time.Time
}
@ -108,13 +112,14 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
}
exec := &exec{
container: c,
cmds: cmds,
tty: tty,
exitCode: exitCode255,
exitIOch: make(chan struct{}),
exitCh: make(chan uint32, 1),
status: task.StatusCreated,
container: c,
cmds: cmds,
tty: tty,
exitCode: exitCode255,
exitIOch: make(chan struct{}),
stdinCloser: make(chan struct{}),
exitCh: make(chan uint32, 1),
status: task.StatusCreated,
}
return exec, nil

View File

@ -747,19 +747,23 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
return nil, err
}
tty := c.ttyio
stdin := c.stdinPipe
stdinCloser := c.stdinCloser
if r.ExecID != "" {
execs, err := c.getExec(r.ExecID)
if err != nil {
return nil, err
}
tty = execs.ttyio
stdin = execs.stdinPipe
stdinCloser = execs.stdinCloser
}
if tty != nil && tty.Stdin != nil {
if err := tty.Stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}
// wait until the stdin io copy terminated, otherwise
// some contents would not be forwarded to the process.
<-stdinCloser
if err := stdin.Close(); err != nil {
return nil, errors.Wrap(err, "close stdin")
}
return empty, nil

View File

@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
return err
}
c.stdinPipe = stdin
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
if err != nil {
return err
}
c.ttyio = tty
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
} else {
//close the io exit channel, since there is no io for this container,
//otherwise the following wait goroutine will hang on this channel.
close(c.exitIOch)
//close the stdin closer channel to notify that it's safe to close process's
// io.
close(c.stdinCloser)
}
go wait(s, c, "")
@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
if err != nil {
return nil, err
}
execs.stdinPipe = stdin
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
if err != nil {
return nil, err
}
execs.ttyio = tty
go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr)
go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
go wait(s, c, execID)

View File

@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
return ttyIO, nil
}
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
var wg sync.WaitGroup
var closeOnce sync.Once
@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
p := bufPool.Get().(*[]byte)
defer bufPool.Put(p)
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
// notify that we can close process's io safely.
close(stdinCloser)
wg.Done()
}()
}