mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-10-23 21:28:10 +00:00
shimv2: fix the issue of close IO stream
It should wait until the stdin io copy termianted to close the process's io stream, otherwise, it would miss forwarding some contents to process stdin. Fixes: #439 Signed-off-by: fupan.lfp <fupan.lfp@antgroup.com>
This commit is contained in:
@@ -6,6 +6,7 @@
|
||||
package containerdshim
|
||||
|
||||
import (
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/api/types/task"
|
||||
@@ -23,6 +24,8 @@ type container struct {
|
||||
exitTime time.Time
|
||||
execs map[string]*exec
|
||||
exitIOch chan struct{}
|
||||
stdinPipe io.WriteCloser
|
||||
stdinCloser chan struct{}
|
||||
exitCh chan uint32
|
||||
id string
|
||||
stdin string
|
||||
@@ -60,6 +63,7 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
|
||||
status: task.StatusCreated,
|
||||
exitIOch: make(chan struct{}),
|
||||
exitCh: make(chan uint32, 1),
|
||||
stdinCloser: make(chan struct{}),
|
||||
mounted: mounted,
|
||||
}
|
||||
return c, nil
|
||||
|
@@ -7,6 +7,7 @@ package containerdshim
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -32,6 +33,9 @@ type exec struct {
|
||||
exitIOch chan struct{}
|
||||
exitCh chan uint32
|
||||
|
||||
stdinCloser chan struct{}
|
||||
stdinPipe io.WriteCloser
|
||||
|
||||
exitTime time.Time
|
||||
}
|
||||
|
||||
@@ -113,6 +117,7 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
|
||||
tty: tty,
|
||||
exitCode: exitCode255,
|
||||
exitIOch: make(chan struct{}),
|
||||
stdinCloser: make(chan struct{}),
|
||||
exitCh: make(chan uint32, 1),
|
||||
status: task.StatusCreated,
|
||||
}
|
||||
|
@@ -747,20 +747,24 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tty := c.ttyio
|
||||
stdin := c.stdinPipe
|
||||
stdinCloser := c.stdinCloser
|
||||
|
||||
if r.ExecID != "" {
|
||||
execs, err := c.getExec(r.ExecID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tty = execs.ttyio
|
||||
stdin = execs.stdinPipe
|
||||
stdinCloser = execs.stdinCloser
|
||||
}
|
||||
|
||||
if tty != nil && tty.Stdin != nil {
|
||||
if err := tty.Stdin.Close(); err != nil {
|
||||
// wait until the stdin io copy terminated, otherwise
|
||||
// some contents would not be forwarded to the process.
|
||||
<-stdinCloser
|
||||
if err := stdin.Close(); err != nil {
|
||||
return nil, errors.Wrap(err, "close stdin")
|
||||
}
|
||||
}
|
||||
|
||||
return empty, nil
|
||||
}
|
||||
|
@@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
|
||||
return err
|
||||
}
|
||||
|
||||
c.stdinPipe = stdin
|
||||
|
||||
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
|
||||
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.ttyio = tty
|
||||
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
|
||||
go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
|
||||
} else {
|
||||
//close the io exit channel, since there is no io for this container,
|
||||
//otherwise the following wait goroutine will hang on this channel.
|
||||
close(c.exitIOch)
|
||||
//close the stdin closer channel to notify that it's safe to close process's
|
||||
// io.
|
||||
close(c.stdinCloser)
|
||||
}
|
||||
|
||||
go wait(s, c, "")
|
||||
@@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
execs.stdinPipe = stdin
|
||||
|
||||
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
execs.ttyio = tty
|
||||
|
||||
go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr)
|
||||
go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
|
||||
|
||||
go wait(s, c, execID)
|
||||
|
||||
|
@@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
|
||||
return ttyIO, nil
|
||||
}
|
||||
|
||||
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
||||
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
||||
var wg sync.WaitGroup
|
||||
var closeOnce sync.Once
|
||||
|
||||
@@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
|
||||
p := bufPool.Get().(*[]byte)
|
||||
defer bufPool.Put(p)
|
||||
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
|
||||
// notify that we can close process's io safely.
|
||||
close(stdinCloser)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
Reference in New Issue
Block a user