mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-27 15:57:09 +00:00
shimv2: fix the issue of close IO stream
It should wait until the stdin io copy termianted to close the process's io stream, otherwise, it would miss forwarding some contents to process stdin. Fixes: #439 Signed-off-by: fupan.lfp <fupan.lfp@antgroup.com>
This commit is contained in:
parent
c5c3f5c31d
commit
bd78ccaf31
@ -6,6 +6,7 @@
|
|||||||
package containerdshim
|
package containerdshim
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/api/types/task"
|
"github.com/containerd/containerd/api/types/task"
|
||||||
@ -23,6 +24,8 @@ type container struct {
|
|||||||
exitTime time.Time
|
exitTime time.Time
|
||||||
execs map[string]*exec
|
execs map[string]*exec
|
||||||
exitIOch chan struct{}
|
exitIOch chan struct{}
|
||||||
|
stdinPipe io.WriteCloser
|
||||||
|
stdinCloser chan struct{}
|
||||||
exitCh chan uint32
|
exitCh chan uint32
|
||||||
id string
|
id string
|
||||||
stdin string
|
stdin string
|
||||||
@ -60,6 +63,7 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
|
|||||||
status: task.StatusCreated,
|
status: task.StatusCreated,
|
||||||
exitIOch: make(chan struct{}),
|
exitIOch: make(chan struct{}),
|
||||||
exitCh: make(chan uint32, 1),
|
exitCh: make(chan uint32, 1),
|
||||||
|
stdinCloser: make(chan struct{}),
|
||||||
mounted: mounted,
|
mounted: mounted,
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
|
@ -7,6 +7,7 @@ package containerdshim
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -32,6 +33,9 @@ type exec struct {
|
|||||||
exitIOch chan struct{}
|
exitIOch chan struct{}
|
||||||
exitCh chan uint32
|
exitCh chan uint32
|
||||||
|
|
||||||
|
stdinCloser chan struct{}
|
||||||
|
stdinPipe io.WriteCloser
|
||||||
|
|
||||||
exitTime time.Time
|
exitTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,6 +117,7 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
|
|||||||
tty: tty,
|
tty: tty,
|
||||||
exitCode: exitCode255,
|
exitCode: exitCode255,
|
||||||
exitIOch: make(chan struct{}),
|
exitIOch: make(chan struct{}),
|
||||||
|
stdinCloser: make(chan struct{}),
|
||||||
exitCh: make(chan uint32, 1),
|
exitCh: make(chan uint32, 1),
|
||||||
status: task.StatusCreated,
|
status: task.StatusCreated,
|
||||||
}
|
}
|
||||||
|
@ -747,20 +747,24 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tty := c.ttyio
|
stdin := c.stdinPipe
|
||||||
|
stdinCloser := c.stdinCloser
|
||||||
|
|
||||||
if r.ExecID != "" {
|
if r.ExecID != "" {
|
||||||
execs, err := c.getExec(r.ExecID)
|
execs, err := c.getExec(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tty = execs.ttyio
|
stdin = execs.stdinPipe
|
||||||
|
stdinCloser = execs.stdinCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
if tty != nil && tty.Stdin != nil {
|
// wait until the stdin io copy terminated, otherwise
|
||||||
if err := tty.Stdin.Close(); err != nil {
|
// some contents would not be forwarded to the process.
|
||||||
|
<-stdinCloser
|
||||||
|
if err := stdin.Close(); err != nil {
|
||||||
return nil, errors.Wrap(err, "close stdin")
|
return nil, errors.Wrap(err, "close stdin")
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return empty, nil
|
return empty, nil
|
||||||
}
|
}
|
||||||
|
@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.stdinPipe = stdin
|
||||||
|
|
||||||
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
|
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
|
||||||
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
|
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.ttyio = tty
|
c.ttyio = tty
|
||||||
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
|
go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
|
||||||
} else {
|
} else {
|
||||||
//close the io exit channel, since there is no io for this container,
|
//close the io exit channel, since there is no io for this container,
|
||||||
//otherwise the following wait goroutine will hang on this channel.
|
//otherwise the following wait goroutine will hang on this channel.
|
||||||
close(c.exitIOch)
|
close(c.exitIOch)
|
||||||
|
//close the stdin closer channel to notify that it's safe to close process's
|
||||||
|
// io.
|
||||||
|
close(c.stdinCloser)
|
||||||
}
|
}
|
||||||
|
|
||||||
go wait(s, c, "")
|
go wait(s, c, "")
|
||||||
@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
execs.stdinPipe = stdin
|
||||||
|
|
||||||
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
|
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
execs.ttyio = tty
|
execs.ttyio = tty
|
||||||
|
|
||||||
go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr)
|
go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
|
||||||
|
|
||||||
go wait(s, c, execID)
|
go wait(s, c, execID)
|
||||||
|
|
||||||
|
@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
|
|||||||
return ttyIO, nil
|
return ttyIO, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var closeOnce sync.Once
|
var closeOnce sync.Once
|
||||||
|
|
||||||
@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
|
|||||||
p := bufPool.Get().(*[]byte)
|
p := bufPool.Get().(*[]byte)
|
||||||
defer bufPool.Put(p)
|
defer bufPool.Put(p)
|
||||||
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
|
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
|
||||||
|
// notify that we can close process's io safely.
|
||||||
|
close(stdinCloser)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user