mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-27 07:48:55 +00:00
Merge pull request #259 from bergwolf/sandbox_api_2
add sandbox process operation relay API support
This commit is contained in:
commit
81503d7c69
@ -160,14 +160,32 @@ type agent interface {
|
||||
// stopContainer will tell the agent to stop a container related to a Sandbox.
|
||||
stopContainer(sandbox *Sandbox, c Container) error
|
||||
|
||||
// killContainer will tell the agent to send a signal to a
|
||||
// container related to a Sandbox. If all is true, all processes in
|
||||
// signalProcess will tell the agent to send a signal to a
|
||||
// container or a process related to a Sandbox. If all is true, all processes in
|
||||
// the container will be sent the signal.
|
||||
killContainer(sandbox *Sandbox, c Container, signal syscall.Signal, all bool) error
|
||||
signalProcess(c *Container, processID string, signal syscall.Signal, all bool) error
|
||||
|
||||
// winsizeProcess will tell the agent to set a process' tty size
|
||||
winsizeProcess(c *Container, processID string, height, width uint32) error
|
||||
|
||||
// writeProcessStdin will tell the agent to write a process stdin
|
||||
writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error)
|
||||
|
||||
// closeProcessStdin will tell the agent to close a process stdin
|
||||
closeProcessStdin(c *Container, ProcessID string) error
|
||||
|
||||
// readProcessStdout will tell the agent to read a process stdout
|
||||
readProcessStdout(c *Container, processID string, data []byte) (int, error)
|
||||
|
||||
// readProcessStderr will tell the agent to read a process stderr
|
||||
readProcessStderr(c *Container, processID string, data []byte) (int, error)
|
||||
|
||||
// processListContainer will list the processes running inside the container
|
||||
processListContainer(sandbox *Sandbox, c Container, options ProcessListOptions) (ProcessList, error)
|
||||
|
||||
// waitProcess will wait for the exit code of a process
|
||||
waitProcess(c *Container, processID string) (int32, error)
|
||||
|
||||
// onlineCPUMem will online CPUs and Memory inside the Sandbox.
|
||||
// This function should be called after hot adding vCPUs or Memory.
|
||||
// cpus specifies the number of CPUs that were added and the agent should online
|
||||
|
@ -8,6 +8,7 @@ package virtcontainers
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
@ -663,7 +664,7 @@ func (c *Container) stop() error {
|
||||
// return an error, but instead try to kill it forcefully.
|
||||
if err := waitForShim(c.process.Pid); err != nil {
|
||||
// Force the container to be killed.
|
||||
if err := c.sandbox.agent.killContainer(c.sandbox, *c, syscall.SIGKILL, true); err != nil {
|
||||
if err := c.kill(syscall.SIGKILL, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -684,7 +685,7 @@ func (c *Container) stop() error {
|
||||
// this signal will ensure the container will get killed to match
|
||||
// the state of the shim. This will allow the following call to
|
||||
// stopContainer() to succeed in such particular case.
|
||||
c.sandbox.agent.killContainer(c.sandbox, *c, syscall.SIGKILL, true)
|
||||
c.kill(syscall.SIGKILL, true)
|
||||
|
||||
if err := c.sandbox.agent.stopContainer(c.sandbox, *c); err != nil {
|
||||
return err
|
||||
@ -724,7 +725,21 @@ func (c *Container) enter(cmd Cmd) (*Process, error) {
|
||||
return process, nil
|
||||
}
|
||||
|
||||
func (c *Container) wait(processID string) (int32, error) {
|
||||
if c.state.State != StateReady &&
|
||||
c.state.State != StateRunning {
|
||||
return 0, fmt.Errorf("Container not ready or running, " +
|
||||
"impossible to wait")
|
||||
}
|
||||
|
||||
return c.sandbox.agent.waitProcess(c, processID)
|
||||
}
|
||||
|
||||
func (c *Container) kill(signal syscall.Signal, all bool) error {
|
||||
return c.signalProcess(c.process.Token, signal, all)
|
||||
}
|
||||
|
||||
func (c *Container) signalProcess(processID string, signal syscall.Signal, all bool) error {
|
||||
if c.sandbox.state.State != StateReady && c.sandbox.state.State != StateRunning {
|
||||
return fmt.Errorf("Sandbox not ready or running, impossible to signal the container")
|
||||
}
|
||||
@ -733,7 +748,25 @@ func (c *Container) kill(signal syscall.Signal, all bool) error {
|
||||
return fmt.Errorf("Container not ready or running, impossible to signal the container")
|
||||
}
|
||||
|
||||
return c.sandbox.agent.killContainer(c.sandbox, *c, signal, all)
|
||||
return c.sandbox.agent.signalProcess(c, processID, signal, all)
|
||||
}
|
||||
|
||||
func (c *Container) winsizeProcess(processID string, height, width uint32) error {
|
||||
if c.state.State != StateReady && c.state.State != StateRunning {
|
||||
return fmt.Errorf("Container not ready or running, impossible to signal the container")
|
||||
}
|
||||
|
||||
return c.sandbox.agent.winsizeProcess(c, processID, height, width)
|
||||
}
|
||||
|
||||
func (c *Container) ioStream(processID string) (io.WriteCloser, io.Reader, io.Reader, error) {
|
||||
if c.state.State != StateReady && c.state.State != StateRunning {
|
||||
return nil, nil, nil, fmt.Errorf("Container not ready or running, impossible to signal the container")
|
||||
}
|
||||
|
||||
stream := newIOStream(c.sandbox, c, processID)
|
||||
|
||||
return stream.stdin(), stream.stdout(), stream.stderr(), nil
|
||||
}
|
||||
|
||||
func (c *Container) processList(options ProcessListOptions) (ProcessList, error) {
|
||||
|
@ -359,3 +359,105 @@ func TestContainerEnterErrorsOnContainerStates(t *testing.T) {
|
||||
_, err = c.enter(cmd)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
||||
func TestContainerWaitErrorState(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
c := &Container{
|
||||
sandbox: &Sandbox{
|
||||
state: State{
|
||||
State: StateRunning,
|
||||
},
|
||||
},
|
||||
}
|
||||
processID := "foobar"
|
||||
|
||||
// Container state undefined
|
||||
_, err := c.wait(processID)
|
||||
assert.Error(err)
|
||||
|
||||
// Container paused
|
||||
c.state.State = StatePaused
|
||||
_, err = c.wait(processID)
|
||||
assert.Error(err)
|
||||
|
||||
// Container stopped
|
||||
c.state.State = StateStopped
|
||||
_, err = c.wait(processID)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
||||
func TestKillContainerErrorState(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
c := &Container{
|
||||
sandbox: &Sandbox{
|
||||
state: State{
|
||||
State: StateRunning,
|
||||
},
|
||||
},
|
||||
}
|
||||
// Container state undefined
|
||||
err := c.kill(syscall.SIGKILL, true)
|
||||
assert.Error(err)
|
||||
|
||||
// Container paused
|
||||
c.state.State = StatePaused
|
||||
err = c.kill(syscall.SIGKILL, false)
|
||||
assert.Error(err)
|
||||
|
||||
// Container stopped
|
||||
c.state.State = StateStopped
|
||||
err = c.kill(syscall.SIGKILL, true)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
||||
func TestWinsizeProcessErrorState(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
c := &Container{
|
||||
sandbox: &Sandbox{
|
||||
state: State{
|
||||
State: StateRunning,
|
||||
},
|
||||
},
|
||||
}
|
||||
processID := "foobar"
|
||||
|
||||
// Container state undefined
|
||||
err := c.winsizeProcess(processID, 100, 200)
|
||||
assert.Error(err)
|
||||
|
||||
// Container paused
|
||||
c.state.State = StatePaused
|
||||
err = c.winsizeProcess(processID, 100, 200)
|
||||
assert.Error(err)
|
||||
|
||||
// Container stopped
|
||||
c.state.State = StateStopped
|
||||
err = c.winsizeProcess(processID, 100, 200)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
||||
func TestProcessIOStream(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
c := &Container{
|
||||
sandbox: &Sandbox{
|
||||
state: State{
|
||||
State: StateRunning,
|
||||
},
|
||||
},
|
||||
}
|
||||
processID := "foobar"
|
||||
|
||||
// Container state undefined
|
||||
_, _, _, err := c.ioStream(processID)
|
||||
assert.Error(err)
|
||||
|
||||
// Container paused
|
||||
c.state.State = StatePaused
|
||||
_, _, _, err = c.ioStream(processID)
|
||||
assert.Error(err)
|
||||
|
||||
// Container stopped
|
||||
c.state.State = StateStopped
|
||||
_, _, _, err = c.ioStream(processID)
|
||||
assert.Error(err)
|
||||
}
|
||||
|
@ -571,8 +571,8 @@ func (h *hyper) stopOneContainer(sandboxID string, c Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// killContainer is the agent process signal implementation for hyperstart.
|
||||
func (h *hyper) killContainer(sandbox *Sandbox, c Container, signal syscall.Signal, all bool) error {
|
||||
// signalProcess is the agent process signal implementation for hyperstart.
|
||||
func (h *hyper) signalProcess(c *Container, processID string, signal syscall.Signal, all bool) error {
|
||||
// Send the signal to the shim directly in case the container has not
|
||||
// been started yet.
|
||||
if c.state.State == StateReady {
|
||||
@ -798,11 +798,41 @@ func (h *hyper) sendCmd(proxyCmd hyperstartProxyCmd) (interface{}, error) {
|
||||
}
|
||||
|
||||
func (h *hyper) onlineCPUMem(cpus uint32) error {
|
||||
// cc-agent uses udev to online CPUs automatically
|
||||
// hyperstart-agent uses udev to online CPUs automatically
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *hyper) check() error {
|
||||
// cc-agent does not support check
|
||||
// hyperstart-agent does not support check
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *hyper) waitProcess(c *Container, processID string) (int32, error) {
|
||||
// hyperstart-agent does not support wait process
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (h *hyper) winsizeProcess(c *Container, processID string, height, width uint32) error {
|
||||
// hyperstart-agent does not support winsize process
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *hyper) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
|
||||
// hyperstart-agent does not support stdin write request
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (h *hyper) closeProcessStdin(c *Container, ProcessID string) error {
|
||||
// hyperstart-agent does not support stdin close request
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *hyper) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
|
||||
// hyperstart-agent does not support stdout read request
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (h *hyper) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
|
||||
// hyperstart-agent does not support stderr read request
|
||||
return 0, nil
|
||||
}
|
||||
|
@ -6,6 +6,7 @@
|
||||
package virtcontainers
|
||||
|
||||
import (
|
||||
"io"
|
||||
"syscall"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
@ -57,6 +58,10 @@ type VCSandbox interface {
|
||||
StartContainer(containerID string) (VCContainer, error)
|
||||
StatusContainer(containerID string) (ContainerStatus, error)
|
||||
EnterContainer(containerID string, cmd Cmd) (VCContainer, *Process, error)
|
||||
WaitProcess(containerID, processID string) (int32, error)
|
||||
SignalProcess(containerID, processID string, signal syscall.Signal, all bool) error
|
||||
WinsizeProcess(containerID, processID string, height, width uint32) error
|
||||
IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error)
|
||||
}
|
||||
|
||||
// VCContainer is the Container interface
|
||||
|
91
virtcontainers/iostream.go
Normal file
91
virtcontainers/iostream.go
Normal file
@ -0,0 +1,91 @@
|
||||
// Copyright (c) 2018 HyperHQ Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package virtcontainers
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
)
|
||||
|
||||
type iostream struct {
|
||||
sandbox *Sandbox
|
||||
container *Container
|
||||
process string
|
||||
closed bool
|
||||
}
|
||||
|
||||
// io.WriteCloser
|
||||
type stdinStream struct {
|
||||
*iostream
|
||||
}
|
||||
|
||||
// io.Reader
|
||||
type stdoutStream struct {
|
||||
*iostream
|
||||
}
|
||||
|
||||
// io.Reader
|
||||
type stderrStream struct {
|
||||
*iostream
|
||||
}
|
||||
|
||||
func newIOStream(s *Sandbox, c *Container, proc string) *iostream {
|
||||
return &iostream{
|
||||
sandbox: s,
|
||||
container: c,
|
||||
process: proc,
|
||||
closed: false, // needed to workaround buggy structcheck
|
||||
}
|
||||
}
|
||||
|
||||
func (s *iostream) stdin() io.WriteCloser {
|
||||
return &stdinStream{s}
|
||||
}
|
||||
|
||||
func (s *iostream) stdout() io.Reader {
|
||||
return &stdoutStream{s}
|
||||
}
|
||||
|
||||
func (s *iostream) stderr() io.Reader {
|
||||
return &stderrStream{s}
|
||||
}
|
||||
|
||||
func (s *stdinStream) Write(data []byte) (n int, err error) {
|
||||
if s.closed {
|
||||
return 0, errors.New("stream closed")
|
||||
}
|
||||
|
||||
return s.sandbox.agent.writeProcessStdin(s.container, s.process, data)
|
||||
}
|
||||
|
||||
func (s *stdinStream) Close() error {
|
||||
if s.closed {
|
||||
return errors.New("stream closed")
|
||||
}
|
||||
|
||||
err := s.sandbox.agent.closeProcessStdin(s.container, s.process)
|
||||
if err == nil {
|
||||
s.closed = true
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *stdoutStream) Read(data []byte) (n int, err error) {
|
||||
if s.closed {
|
||||
return 0, errors.New("stream closed")
|
||||
}
|
||||
|
||||
return s.sandbox.agent.readProcessStdout(s.container, s.process, data)
|
||||
}
|
||||
|
||||
func (s *stderrStream) Read(data []byte) (n int, err error) {
|
||||
if s.closed {
|
||||
return 0, errors.New("stream closed")
|
||||
}
|
||||
|
||||
return s.sandbox.agent.readProcessStderr(s.container, s.process, data)
|
||||
}
|
59
virtcontainers/iostream_test.go
Normal file
59
virtcontainers/iostream_test.go
Normal file
@ -0,0 +1,59 @@
|
||||
// Copyright (c) 2018 HyperHQ Inc.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package virtcontainers
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestIOStream(t *testing.T) {
|
||||
hConfig := newHypervisorConfig(nil, nil)
|
||||
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{}, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cleanUp()
|
||||
|
||||
contID := "foo"
|
||||
processID := "bar"
|
||||
config := newTestContainerConfigNoop(contID)
|
||||
c := &Container{
|
||||
sandbox: s,
|
||||
config: &config,
|
||||
}
|
||||
|
||||
stream := newIOStream(s, c, processID)
|
||||
stdin := stream.stdin()
|
||||
stdout := stream.stdout()
|
||||
stderr := stream.stderr()
|
||||
|
||||
buffer := []byte("randombufferdata")
|
||||
_, err = stdin.Write(buffer)
|
||||
assert.Nil(t, err, "stdin write failed: %s", err)
|
||||
|
||||
_, err = stdout.Read(buffer)
|
||||
assert.Nil(t, err, "stdout read failed: %s", err)
|
||||
|
||||
_, err = stderr.Read(buffer)
|
||||
assert.Nil(t, err, "stderr read failed: %s", err)
|
||||
|
||||
err = stdin.Close()
|
||||
assert.Nil(t, err, "stream close failed: %s", err)
|
||||
|
||||
_, err = stdin.Write(buffer)
|
||||
assert.NotNil(t, err, "stdin write closed should fail")
|
||||
|
||||
_, err = stdout.Read(buffer)
|
||||
assert.NotNil(t, err, "stdout read closed should fail")
|
||||
|
||||
_, err = stderr.Read(buffer)
|
||||
assert.NotNil(t, err, "stderr read closed should fail")
|
||||
|
||||
err = stdin.Close()
|
||||
assert.NotNil(t, err, "stdin close closed should fail")
|
||||
}
|
@ -6,7 +6,6 @@
|
||||
package virtcontainers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -23,6 +22,7 @@ import (
|
||||
vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations"
|
||||
ns "github.com/kata-containers/runtime/virtcontainers/pkg/nsenter"
|
||||
"github.com/kata-containers/runtime/virtcontainers/pkg/uuid"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/sirupsen/logrus"
|
||||
@ -860,10 +860,15 @@ func (k *kataAgent) stopContainer(sandbox *Sandbox, c Container) error {
|
||||
return bindUnmountContainerRootfs(kataHostSharedDir, sandbox.id, c.id)
|
||||
}
|
||||
|
||||
func (k *kataAgent) killContainer(sandbox *Sandbox, c Container, signal syscall.Signal, all bool) error {
|
||||
func (k *kataAgent) signalProcess(c *Container, processID string, signal syscall.Signal, all bool) error {
|
||||
execID := processID
|
||||
if all {
|
||||
// kata agent uses empty execId to signal all processes in a container
|
||||
execID = ""
|
||||
}
|
||||
req := &grpc.SignalProcessRequest{
|
||||
ContainerId: c.id,
|
||||
ExecId: c.process.Token,
|
||||
ExecId: execID,
|
||||
Signal: uint32(signal),
|
||||
}
|
||||
|
||||
@ -871,6 +876,18 @@ func (k *kataAgent) killContainer(sandbox *Sandbox, c Container, signal syscall.
|
||||
return err
|
||||
}
|
||||
|
||||
func (k *kataAgent) winsizeProcess(c *Container, processID string, height, width uint32) error {
|
||||
req := &grpc.TtyWinResizeRequest{
|
||||
ContainerId: c.id,
|
||||
ExecId: processID,
|
||||
Row: height,
|
||||
Column: width,
|
||||
}
|
||||
|
||||
_, err := k.sendReq(req)
|
||||
return err
|
||||
}
|
||||
|
||||
func (k *kataAgent) processListContainer(sandbox *Sandbox, c Container, options ProcessListOptions) (ProcessList, error) {
|
||||
req := &grpc.ListProcessesRequest{
|
||||
ContainerId: c.id,
|
||||
@ -937,6 +954,41 @@ func (k *kataAgent) check() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (k *kataAgent) waitProcess(c *Container, processID string) (int32, error) {
|
||||
resp, err := k.sendReq(&grpc.WaitProcessRequest{
|
||||
ContainerId: c.id,
|
||||
ExecId: processID,
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return resp.(*grpc.WaitProcessResponse).Status, nil
|
||||
}
|
||||
|
||||
func (k *kataAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
|
||||
resp, err := k.sendReq(&grpc.WriteStreamRequest{
|
||||
ContainerId: c.id,
|
||||
ExecId: ProcessID,
|
||||
Data: data,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return int(resp.(*grpc.WriteStreamResponse).Len), nil
|
||||
}
|
||||
|
||||
func (k *kataAgent) closeProcessStdin(c *Container, ProcessID string) error {
|
||||
_, err := k.sendReq(&grpc.CloseStdinRequest{
|
||||
ContainerId: c.id,
|
||||
ExecId: ProcessID,
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error)
|
||||
|
||||
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
|
||||
@ -979,6 +1031,18 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
|
||||
k.reqHandlers["grpc.ListProcessesRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.ListProcesses(ctx, req.(*grpc.ListProcessesRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.WaitProcessRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.WaitProcess(ctx, req.(*grpc.WaitProcessRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...)
|
||||
}
|
||||
k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
|
||||
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
|
||||
@ -992,8 +1056,47 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
|
||||
msgName := proto.MessageName(request.(proto.Message))
|
||||
handler := k.reqHandlers[msgName]
|
||||
if msgName == "" || handler == nil {
|
||||
return nil, fmt.Errorf("Invalid request type")
|
||||
return nil, errors.New("Invalid request type")
|
||||
}
|
||||
|
||||
return handler(context.Background(), request)
|
||||
}
|
||||
|
||||
// readStdout and readStderr are special that we cannot differentiate them with the request types...
|
||||
func (k *kataAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
|
||||
if err := k.connect(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if !k.keepConn {
|
||||
defer k.disconnect()
|
||||
}
|
||||
|
||||
return k.readProcessStream(c.id, processID, data, k.client.ReadStdout)
|
||||
}
|
||||
|
||||
// readStdout and readStderr are special that we cannot differentiate them with the request types...
|
||||
func (k *kataAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
|
||||
if err := k.connect(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if !k.keepConn {
|
||||
defer k.disconnect()
|
||||
}
|
||||
|
||||
return k.readProcessStream(c.id, processID, data, k.client.ReadStderr)
|
||||
}
|
||||
|
||||
type readFn func(context.Context, *grpc.ReadStreamRequest, ...golangGrpc.CallOption) (*grpc.ReadStreamResponse, error)
|
||||
|
||||
func (k *kataAgent) readProcessStream(containerID, processID string, data []byte, read readFn) (int, error) {
|
||||
resp, err := read(context.Background(), &grpc.ReadStreamRequest{
|
||||
ContainerId: containerID,
|
||||
ExecId: processID,
|
||||
Len: uint32(len(data))})
|
||||
if err == nil {
|
||||
copy(data, resp.Data)
|
||||
return len(resp.Data), nil
|
||||
}
|
||||
|
||||
return 0, err
|
||||
}
|
||||
|
@ -217,6 +217,7 @@ var reqList = []interface{}{
|
||||
&pb.RemoveContainerRequest{},
|
||||
&pb.SignalProcessRequest{},
|
||||
&pb.CheckRequest{},
|
||||
&pb.WaitProcessRequest{},
|
||||
}
|
||||
|
||||
func TestKataAgentSendReq(t *testing.T) {
|
||||
|
@ -64,8 +64,8 @@ func (n *noopAgent) stopContainer(sandbox *Sandbox, c Container) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// killContainer is the Noop agent Container signaling implementation. It does nothing.
|
||||
func (n *noopAgent) killContainer(sandbox *Sandbox, c Container, signal syscall.Signal, all bool) error {
|
||||
// signalProcess is the Noop agent Container signaling implementation. It does nothing.
|
||||
func (n *noopAgent) signalProcess(c *Container, processID string, signal syscall.Signal, all bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -83,3 +83,33 @@ func (n *noopAgent) onlineCPUMem(cpus uint32) error {
|
||||
func (n *noopAgent) check() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitProcess is the Noop agent process waiter. It does nothing.
|
||||
func (n *noopAgent) waitProcess(c *Container, processID string) (int32, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// winsizeProcess is the Noop agent process tty resizer. It does nothing.
|
||||
func (n *noopAgent) winsizeProcess(c *Container, processID string, height, width uint32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeProcessStdin is the Noop agent process stdin writer. It does nothing.
|
||||
func (n *noopAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// closeProcessStdin is the Noop agent process stdin closer. It does nothing.
|
||||
func (n *noopAgent) closeProcessStdin(c *Container, ProcessID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// readProcessStdout is the Noop agent process stdout reader. It does nothing.
|
||||
func (n *noopAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// readProcessStderr is the Noop agent process stderr reader. It does nothing.
|
||||
func (n *noopAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
@ -6,6 +6,9 @@
|
||||
package vcmock
|
||||
|
||||
import (
|
||||
"io"
|
||||
"syscall"
|
||||
|
||||
vc "github.com/kata-containers/runtime/virtcontainers"
|
||||
)
|
||||
|
||||
@ -104,3 +107,23 @@ func (p *Sandbox) EnterContainer(containerID string, cmd vc.Cmd) (vc.VCContainer
|
||||
func (p *Sandbox) Monitor() (chan error, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// WaitProcess implements the VCSandbox function of the same name.
|
||||
func (p *Sandbox) WaitProcess(containerID, processID string) (int32, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// SignalProcess implements the VCSandbox function of the same name.
|
||||
func (p *Sandbox) SignalProcess(containerID, processID string, signal syscall.Signal, all bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// WinsizeProcess implements the VCSandbox function of the same name.
|
||||
func (p *Sandbox) WinsizeProcess(containerID, processID string, height, width uint32) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// IOStream implements the VCSandbox function of the same name.
|
||||
func (p *Sandbox) IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error) {
|
||||
return nil, nil, nil, nil
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ package virtcontainers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@ -587,6 +588,63 @@ func (s *Sandbox) Monitor() (chan error, error) {
|
||||
return s.monitor.newWatcher()
|
||||
}
|
||||
|
||||
// WaitProcess waits on a container process and return its exit code
|
||||
func (s *Sandbox) WaitProcess(containerID, processID string) (int32, error) {
|
||||
if s.state.State != StateRunning {
|
||||
return 0, fmt.Errorf("Sandbox not running")
|
||||
}
|
||||
|
||||
c, err := s.findContainer(containerID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return c.wait(processID)
|
||||
}
|
||||
|
||||
// SignalProcess sends a signal to a process of a container when all is false.
|
||||
// When all is true, it sends the signal to all processes of a container.
|
||||
func (s *Sandbox) SignalProcess(containerID, processID string, signal syscall.Signal, all bool) error {
|
||||
if s.state.State != StateRunning {
|
||||
return fmt.Errorf("Sandbox not running")
|
||||
}
|
||||
|
||||
c, err := s.findContainer(containerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.signalProcess(processID, signal, all)
|
||||
}
|
||||
|
||||
// WinsizeProcess resizes the tty window of a process
|
||||
func (s *Sandbox) WinsizeProcess(containerID, processID string, height, width uint32) error {
|
||||
if s.state.State != StateRunning {
|
||||
return fmt.Errorf("Sandbox not running")
|
||||
}
|
||||
|
||||
c, err := s.findContainer(containerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.winsizeProcess(processID, height, width)
|
||||
}
|
||||
|
||||
// IOStream returns stdin writer, stdout reader and stderr reader of a process
|
||||
func (s *Sandbox) IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error) {
|
||||
if s.state.State != StateRunning {
|
||||
return nil, nil, nil, fmt.Errorf("Sandbox not running")
|
||||
}
|
||||
|
||||
c, err := s.findContainer(containerID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return c.ioStream(processID)
|
||||
}
|
||||
|
||||
func createAssets(sandboxConfig *SandboxConfig) error {
|
||||
kernel, err := newAsset(sandboxConfig, kernelAsset)
|
||||
if err != nil {
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync"
|
||||
"syscall"
|
||||
"testing"
|
||||
|
||||
"github.com/kata-containers/runtime/virtcontainers/pkg/annotations"
|
||||
@ -1408,3 +1409,123 @@ func TestMonitor(t *testing.T) {
|
||||
|
||||
s.monitor.stop()
|
||||
}
|
||||
|
||||
func TestWaitProcess(t *testing.T) {
|
||||
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
|
||||
assert.Nil(t, err, "VirtContainers should not allow empty sandboxes")
|
||||
defer cleanUp()
|
||||
|
||||
contID := "foo"
|
||||
execID := "bar"
|
||||
_, err = s.WaitProcess(contID, execID)
|
||||
assert.NotNil(t, err, "Wait process in stopped sandbox should fail")
|
||||
|
||||
err = s.start()
|
||||
assert.Nil(t, err, "Failed to start sandbox: %v", err)
|
||||
|
||||
_, err = s.WaitProcess(contID, execID)
|
||||
assert.NotNil(t, err, "Wait process in non-existing container should fail")
|
||||
|
||||
contConfig := newTestContainerConfigNoop(contID)
|
||||
_, err = s.CreateContainer(contConfig)
|
||||
assert.Nil(t, err, "Failed to create container %+v in sandbox %+v: %v", contConfig, s, err)
|
||||
|
||||
_, err = s.WaitProcess(contID, execID)
|
||||
assert.Nil(t, err, "Wait process in ready container failed: %v", err)
|
||||
|
||||
_, err = s.StartContainer(contID)
|
||||
assert.Nil(t, err, "Start container failed: %v", err)
|
||||
|
||||
_, err = s.WaitProcess(contID, execID)
|
||||
assert.Nil(t, err, "Wait process failed: %v", err)
|
||||
}
|
||||
|
||||
func TestSignalProcess(t *testing.T) {
|
||||
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
|
||||
assert.Nil(t, err, "VirtContainers should not allow empty sandboxes")
|
||||
defer cleanUp()
|
||||
|
||||
contID := "foo"
|
||||
execID := "bar"
|
||||
err = s.SignalProcess(contID, execID, syscall.SIGKILL, true)
|
||||
assert.NotNil(t, err, "Wait process in stopped sandbox should fail")
|
||||
|
||||
err = s.start()
|
||||
assert.Nil(t, err, "Failed to start sandbox: %v", err)
|
||||
|
||||
err = s.SignalProcess(contID, execID, syscall.SIGKILL, false)
|
||||
assert.NotNil(t, err, "Wait process in non-existing container should fail")
|
||||
|
||||
contConfig := newTestContainerConfigNoop(contID)
|
||||
_, err = s.CreateContainer(contConfig)
|
||||
assert.Nil(t, err, "Failed to create container %+v in sandbox %+v: %v", contConfig, s, err)
|
||||
|
||||
err = s.SignalProcess(contID, execID, syscall.SIGKILL, true)
|
||||
assert.Nil(t, err, "Wait process in ready container failed: %v", err)
|
||||
|
||||
_, err = s.StartContainer(contID)
|
||||
assert.Nil(t, err, "Start container failed: %v", err)
|
||||
|
||||
err = s.SignalProcess(contID, execID, syscall.SIGKILL, false)
|
||||
assert.Nil(t, err, "Wait process failed: %v", err)
|
||||
}
|
||||
|
||||
func TestWinsizeProcess(t *testing.T) {
|
||||
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
|
||||
assert.Nil(t, err, "VirtContainers should not allow empty sandboxes")
|
||||
defer cleanUp()
|
||||
|
||||
contID := "foo"
|
||||
execID := "bar"
|
||||
err = s.WinsizeProcess(contID, execID, 100, 200)
|
||||
assert.NotNil(t, err, "Winsize process in stopped sandbox should fail")
|
||||
|
||||
err = s.start()
|
||||
assert.Nil(t, err, "Failed to start sandbox: %v", err)
|
||||
|
||||
err = s.WinsizeProcess(contID, execID, 100, 200)
|
||||
assert.NotNil(t, err, "Winsize process in non-existing container should fail")
|
||||
|
||||
contConfig := newTestContainerConfigNoop(contID)
|
||||
_, err = s.CreateContainer(contConfig)
|
||||
assert.Nil(t, err, "Failed to create container %+v in sandbox %+v: %v", contConfig, s, err)
|
||||
|
||||
err = s.WinsizeProcess(contID, execID, 100, 200)
|
||||
assert.Nil(t, err, "Winsize process in ready container failed: %v", err)
|
||||
|
||||
_, err = s.StartContainer(contID)
|
||||
assert.Nil(t, err, "Start container failed: %v", err)
|
||||
|
||||
err = s.WinsizeProcess(contID, execID, 100, 200)
|
||||
assert.Nil(t, err, "Winsize process failed: %v", err)
|
||||
}
|
||||
|
||||
func TestContainerProcessIOStream(t *testing.T) {
|
||||
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
|
||||
assert.Nil(t, err, "VirtContainers should not allow empty sandboxes")
|
||||
defer cleanUp()
|
||||
|
||||
contID := "foo"
|
||||
execID := "bar"
|
||||
_, _, _, err = s.IOStream(contID, execID)
|
||||
assert.NotNil(t, err, "Winsize process in stopped sandbox should fail")
|
||||
|
||||
err = s.start()
|
||||
assert.Nil(t, err, "Failed to start sandbox: %v", err)
|
||||
|
||||
_, _, _, err = s.IOStream(contID, execID)
|
||||
assert.NotNil(t, err, "Winsize process in non-existing container should fail")
|
||||
|
||||
contConfig := newTestContainerConfigNoop(contID)
|
||||
_, err = s.CreateContainer(contConfig)
|
||||
assert.Nil(t, err, "Failed to create container %+v in sandbox %+v: %v", contConfig, s, err)
|
||||
|
||||
_, _, _, err = s.IOStream(contID, execID)
|
||||
assert.Nil(t, err, "Winsize process in ready container failed: %v", err)
|
||||
|
||||
_, err = s.StartContainer(contID)
|
||||
assert.Nil(t, err, "Start container failed: %v", err)
|
||||
|
||||
_, _, _, err = s.IOStream(contID, execID)
|
||||
assert.Nil(t, err, "Winsize process failed: %v", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user