mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-27 15:57:09 +00:00
shimv2: fix the issue of close IO stream
It should wait until the stdin io copy termianted to close the process's io stream, otherwise, it would miss forwarding some contents to process stdin. Fixes: #439 Signed-off-by: fupan.lfp <fupan.lfp@antgroup.com>
This commit is contained in:
parent
c5c3f5c31d
commit
bd78ccaf31
@ -6,6 +6,7 @@
|
|||||||
package containerdshim
|
package containerdshim
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd/api/types/task"
|
"github.com/containerd/containerd/api/types/task"
|
||||||
@ -17,23 +18,25 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type container struct {
|
type container struct {
|
||||||
s *service
|
s *service
|
||||||
ttyio *ttyIO
|
ttyio *ttyIO
|
||||||
spec *specs.Spec
|
spec *specs.Spec
|
||||||
exitTime time.Time
|
exitTime time.Time
|
||||||
execs map[string]*exec
|
execs map[string]*exec
|
||||||
exitIOch chan struct{}
|
exitIOch chan struct{}
|
||||||
exitCh chan uint32
|
stdinPipe io.WriteCloser
|
||||||
id string
|
stdinCloser chan struct{}
|
||||||
stdin string
|
exitCh chan uint32
|
||||||
stdout string
|
id string
|
||||||
stderr string
|
stdin string
|
||||||
bundle string
|
stdout string
|
||||||
cType vc.ContainerType
|
stderr string
|
||||||
exit uint32
|
bundle string
|
||||||
status task.Status
|
cType vc.ContainerType
|
||||||
terminal bool
|
exit uint32
|
||||||
mounted bool
|
status task.Status
|
||||||
|
terminal bool
|
||||||
|
mounted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) {
|
func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.ContainerType, spec *specs.Spec, mounted bool) (*container, error) {
|
||||||
@ -47,20 +50,21 @@ func newContainer(s *service, r *taskAPI.CreateTaskRequest, containerType vc.Con
|
|||||||
}
|
}
|
||||||
|
|
||||||
c := &container{
|
c := &container{
|
||||||
s: s,
|
s: s,
|
||||||
spec: spec,
|
spec: spec,
|
||||||
id: r.ID,
|
id: r.ID,
|
||||||
bundle: r.Bundle,
|
bundle: r.Bundle,
|
||||||
stdin: r.Stdin,
|
stdin: r.Stdin,
|
||||||
stdout: r.Stdout,
|
stdout: r.Stdout,
|
||||||
stderr: r.Stderr,
|
stderr: r.Stderr,
|
||||||
terminal: r.Terminal,
|
terminal: r.Terminal,
|
||||||
cType: containerType,
|
cType: containerType,
|
||||||
execs: make(map[string]*exec),
|
execs: make(map[string]*exec),
|
||||||
status: task.StatusCreated,
|
status: task.StatusCreated,
|
||||||
exitIOch: make(chan struct{}),
|
exitIOch: make(chan struct{}),
|
||||||
exitCh: make(chan uint32, 1),
|
exitCh: make(chan uint32, 1),
|
||||||
mounted: mounted,
|
stdinCloser: make(chan struct{}),
|
||||||
|
mounted: mounted,
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,7 @@ package containerdshim
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -32,6 +33,9 @@ type exec struct {
|
|||||||
exitIOch chan struct{}
|
exitIOch chan struct{}
|
||||||
exitCh chan uint32
|
exitCh chan uint32
|
||||||
|
|
||||||
|
stdinCloser chan struct{}
|
||||||
|
stdinPipe io.WriteCloser
|
||||||
|
|
||||||
exitTime time.Time
|
exitTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,13 +112,14 @@ func newExec(c *container, stdin, stdout, stderr string, terminal bool, jspec *g
|
|||||||
}
|
}
|
||||||
|
|
||||||
exec := &exec{
|
exec := &exec{
|
||||||
container: c,
|
container: c,
|
||||||
cmds: cmds,
|
cmds: cmds,
|
||||||
tty: tty,
|
tty: tty,
|
||||||
exitCode: exitCode255,
|
exitCode: exitCode255,
|
||||||
exitIOch: make(chan struct{}),
|
exitIOch: make(chan struct{}),
|
||||||
exitCh: make(chan uint32, 1),
|
stdinCloser: make(chan struct{}),
|
||||||
status: task.StatusCreated,
|
exitCh: make(chan uint32, 1),
|
||||||
|
status: task.StatusCreated,
|
||||||
}
|
}
|
||||||
|
|
||||||
return exec, nil
|
return exec, nil
|
||||||
|
@ -747,19 +747,23 @@ func (s *service) CloseIO(ctx context.Context, r *taskAPI.CloseIORequest) (_ *pt
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tty := c.ttyio
|
stdin := c.stdinPipe
|
||||||
|
stdinCloser := c.stdinCloser
|
||||||
|
|
||||||
if r.ExecID != "" {
|
if r.ExecID != "" {
|
||||||
execs, err := c.getExec(r.ExecID)
|
execs, err := c.getExec(r.ExecID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tty = execs.ttyio
|
stdin = execs.stdinPipe
|
||||||
|
stdinCloser = execs.stdinCloser
|
||||||
}
|
}
|
||||||
|
|
||||||
if tty != nil && tty.Stdin != nil {
|
// wait until the stdin io copy terminated, otherwise
|
||||||
if err := tty.Stdin.Close(); err != nil {
|
// some contents would not be forwarded to the process.
|
||||||
return nil, errors.Wrap(err, "close stdin")
|
<-stdinCloser
|
||||||
}
|
if err := stdin.Close(); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "close stdin")
|
||||||
}
|
}
|
||||||
|
|
||||||
return empty, nil
|
return empty, nil
|
||||||
|
@ -62,17 +62,22 @@ func startContainer(ctx context.Context, s *service, c *container) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.stdinPipe = stdin
|
||||||
|
|
||||||
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
|
if c.stdin != "" || c.stdout != "" || c.stderr != "" {
|
||||||
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
|
tty, err := newTtyIO(ctx, c.stdin, c.stdout, c.stderr, c.terminal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.ttyio = tty
|
c.ttyio = tty
|
||||||
go ioCopy(c.exitIOch, tty, stdin, stdout, stderr)
|
go ioCopy(c.exitIOch, c.stdinCloser, tty, stdin, stdout, stderr)
|
||||||
} else {
|
} else {
|
||||||
//close the io exit channel, since there is no io for this container,
|
//close the io exit channel, since there is no io for this container,
|
||||||
//otherwise the following wait goroutine will hang on this channel.
|
//otherwise the following wait goroutine will hang on this channel.
|
||||||
close(c.exitIOch)
|
close(c.exitIOch)
|
||||||
|
//close the stdin closer channel to notify that it's safe to close process's
|
||||||
|
// io.
|
||||||
|
close(c.stdinCloser)
|
||||||
}
|
}
|
||||||
|
|
||||||
go wait(s, c, "")
|
go wait(s, c, "")
|
||||||
@ -111,13 +116,16 @@ func startExec(ctx context.Context, s *service, containerID, execID string) (*ex
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
execs.stdinPipe = stdin
|
||||||
|
|
||||||
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
|
tty, err := newTtyIO(ctx, execs.tty.stdin, execs.tty.stdout, execs.tty.stderr, execs.tty.terminal)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
execs.ttyio = tty
|
execs.ttyio = tty
|
||||||
|
|
||||||
go ioCopy(execs.exitIOch, tty, stdin, stdout, stderr)
|
go ioCopy(execs.exitIOch, execs.stdinCloser, tty, stdin, stdout, stderr)
|
||||||
|
|
||||||
go wait(s, c, execID)
|
go wait(s, c, execID)
|
||||||
|
|
||||||
|
@ -85,7 +85,7 @@ func newTtyIO(ctx context.Context, stdin, stdout, stderr string, console bool) (
|
|||||||
return ttyIO, nil
|
return ttyIO, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
func ioCopy(exitch, stdinCloser chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPipe, stderrPipe io.Reader) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var closeOnce sync.Once
|
var closeOnce sync.Once
|
||||||
|
|
||||||
@ -95,6 +95,8 @@ func ioCopy(exitch chan struct{}, tty *ttyIO, stdinPipe io.WriteCloser, stdoutPi
|
|||||||
p := bufPool.Get().(*[]byte)
|
p := bufPool.Get().(*[]byte)
|
||||||
defer bufPool.Put(p)
|
defer bufPool.Put(p)
|
||||||
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
|
io.CopyBuffer(stdinPipe, tty.Stdin, *p)
|
||||||
|
// notify that we can close process's io safely.
|
||||||
|
close(stdinCloser)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user