diff --git a/virtcontainers/agent.go b/virtcontainers/agent.go index f785ec6038..568708e3bb 100644 --- a/virtcontainers/agent.go +++ b/virtcontainers/agent.go @@ -168,6 +168,18 @@ type agent interface { // winsizeProcess will tell the agent to set a process' tty size winsizeProcess(c *Container, processID string, height, width uint32) error + // writeProcessStdin will tell the agent to write a process stdin + writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) + + // closeProcessStdin will tell the agent to close a process stdin + closeProcessStdin(c *Container, ProcessID string) error + + // readProcessStdout will tell the agent to read a process stdout + readProcessStdout(c *Container, processID string, data []byte) (int, error) + + // readProcessStderr will tell the agent to read a process stderr + readProcessStderr(c *Container, processID string, data []byte) (int, error) + // processListContainer will list the processes running inside the container processListContainer(sandbox *Sandbox, c Container, options ProcessListOptions) (ProcessList, error) diff --git a/virtcontainers/container.go b/virtcontainers/container.go index 11d6fdb37c..363b923d6c 100644 --- a/virtcontainers/container.go +++ b/virtcontainers/container.go @@ -8,6 +8,7 @@ package virtcontainers import ( "encoding/hex" "fmt" + "io" "os" "path/filepath" "syscall" @@ -758,6 +759,16 @@ func (c *Container) winsizeProcess(processID string, height, width uint32) error return c.sandbox.agent.winsizeProcess(c, processID, height, width) } +func (c *Container) ioStream(processID string) (io.WriteCloser, io.Reader, io.Reader, error) { + if c.state.State != StateReady && c.state.State != StateRunning { + return nil, nil, nil, fmt.Errorf("Container not ready or running, impossible to signal the container") + } + + stream := newIOStream(c.sandbox, c, processID) + + return stream.stdin(), stream.stdout(), stream.stderr(), nil +} + func (c *Container) processList(options ProcessListOptions) (ProcessList, error) { if err := c.checkSandboxRunning("ps"); err != nil { return nil, err diff --git a/virtcontainers/container_test.go b/virtcontainers/container_test.go index 79303807ab..bab57911ce 100644 --- a/virtcontainers/container_test.go +++ b/virtcontainers/container_test.go @@ -435,3 +435,29 @@ func TestWinsizeProcessErrorState(t *testing.T) { err = c.winsizeProcess(processID, 100, 200) assert.Error(err) } + +func TestProcessIOStream(t *testing.T) { + assert := assert.New(t) + c := &Container{ + sandbox: &Sandbox{ + state: State{ + State: StateRunning, + }, + }, + } + processID := "foobar" + + // Container state undefined + _, _, _, err := c.ioStream(processID) + assert.Error(err) + + // Container paused + c.state.State = StatePaused + _, _, _, err = c.ioStream(processID) + assert.Error(err) + + // Container stopped + c.state.State = StateStopped + _, _, _, err = c.ioStream(processID) + assert.Error(err) +} diff --git a/virtcontainers/hyperstart_agent.go b/virtcontainers/hyperstart_agent.go index cbc03b4768..ab9b35fbdf 100644 --- a/virtcontainers/hyperstart_agent.go +++ b/virtcontainers/hyperstart_agent.go @@ -816,3 +816,23 @@ func (h *hyper) winsizeProcess(c *Container, processID string, height, width uin // cc-agent does not support winsize process return nil } + +func (h *hyper) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) { + // cc-agent does not support stdin write request + return 0, nil +} + +func (h *hyper) closeProcessStdin(c *Container, ProcessID string) error { + // cc-agent does not support stdin close request + return nil +} + +func (h *hyper) readProcessStdout(c *Container, processID string, data []byte) (int, error) { + // cc-agent does not support stdout read request + return 0, nil +} + +func (h *hyper) readProcessStderr(c *Container, processID string, data []byte) (int, error) { + // cc-agent does not support stderr read request + return 0, nil +} diff --git a/virtcontainers/interfaces.go b/virtcontainers/interfaces.go index f1dcc9d7a5..fe2b2bb468 100644 --- a/virtcontainers/interfaces.go +++ b/virtcontainers/interfaces.go @@ -6,6 +6,7 @@ package virtcontainers import ( + "io" "syscall" "github.com/sirupsen/logrus" @@ -60,6 +61,7 @@ type VCSandbox interface { WaitProcess(containerID, processID string) (int32, error) SignalProcess(containerID, processID string, signal syscall.Signal, all bool) error WinsizeProcess(containerID, processID string, height, width uint32) error + IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error) } // VCContainer is the Container interface diff --git a/virtcontainers/iostream.go b/virtcontainers/iostream.go new file mode 100644 index 0000000000..5b96ba79f6 --- /dev/null +++ b/virtcontainers/iostream.go @@ -0,0 +1,91 @@ +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package virtcontainers + +import ( + "errors" + "io" +) + +type iostream struct { + sandbox *Sandbox + container *Container + process string + closed bool +} + +// io.WriteCloser +type stdinStream struct { + *iostream +} + +// io.Reader +type stdoutStream struct { + *iostream +} + +// io.Reader +type stderrStream struct { + *iostream +} + +func newIOStream(s *Sandbox, c *Container, proc string) *iostream { + return &iostream{ + sandbox: s, + container: c, + process: proc, + closed: false, // needed to workaround buggy structcheck + } +} + +func (s *iostream) stdin() io.WriteCloser { + return &stdinStream{s} +} + +func (s *iostream) stdout() io.Reader { + return &stdoutStream{s} +} + +func (s *iostream) stderr() io.Reader { + return &stderrStream{s} +} + +func (s *stdinStream) Write(data []byte) (n int, err error) { + if s.closed { + return 0, errors.New("stream closed") + } + + return s.sandbox.agent.writeProcessStdin(s.container, s.process, data) +} + +func (s *stdinStream) Close() error { + if s.closed { + return errors.New("stream closed") + } + + err := s.sandbox.agent.closeProcessStdin(s.container, s.process) + if err == nil { + s.closed = true + } + + return err +} + +func (s *stdoutStream) Read(data []byte) (n int, err error) { + if s.closed { + return 0, errors.New("stream closed") + } + + return s.sandbox.agent.readProcessStdout(s.container, s.process, data) +} + +func (s *stderrStream) Read(data []byte) (n int, err error) { + if s.closed { + return 0, errors.New("stream closed") + } + + return s.sandbox.agent.readProcessStderr(s.container, s.process, data) +} diff --git a/virtcontainers/iostream_test.go b/virtcontainers/iostream_test.go new file mode 100644 index 0000000000..af1c423a92 --- /dev/null +++ b/virtcontainers/iostream_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2018 HyperHQ Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// + +package virtcontainers + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIOStream(t *testing.T) { + hConfig := newHypervisorConfig(nil, nil) + s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{}, nil) + if err != nil { + t.Fatal(err) + } + defer cleanUp() + + contID := "foo" + processID := "bar" + config := newTestContainerConfigNoop(contID) + c := &Container{ + sandbox: s, + config: &config, + } + + stream := newIOStream(s, c, processID) + stdin := stream.stdin() + stdout := stream.stdout() + stderr := stream.stderr() + + buffer := []byte("randombufferdata") + _, err = stdin.Write(buffer) + assert.Nil(t, err, "stdin write failed: %s", err) + + _, err = stdout.Read(buffer) + assert.Nil(t, err, "stdout read failed: %s", err) + + _, err = stderr.Read(buffer) + assert.Nil(t, err, "stderr read failed: %s", err) + + err = stdin.Close() + assert.Nil(t, err, "stream close failed: %s", err) + + _, err = stdin.Write(buffer) + assert.NotNil(t, err, "stdin write closed should fail") + + _, err = stdout.Read(buffer) + assert.NotNil(t, err, "stdout read closed should fail") + + _, err = stderr.Read(buffer) + assert.NotNil(t, err, "stderr read closed should fail") + + err = stdin.Close() + assert.NotNil(t, err, "stdin close closed should fail") +} diff --git a/virtcontainers/kata_agent.go b/virtcontainers/kata_agent.go index fc60a6906a..b76956ed6b 100644 --- a/virtcontainers/kata_agent.go +++ b/virtcontainers/kata_agent.go @@ -6,7 +6,6 @@ package virtcontainers import ( - "context" "encoding/json" "errors" "fmt" @@ -23,6 +22,7 @@ import ( vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations" ns "github.com/kata-containers/runtime/virtcontainers/pkg/nsenter" "github.com/kata-containers/runtime/virtcontainers/pkg/uuid" + "golang.org/x/net/context" "github.com/opencontainers/runtime-spec/specs-go" "github.com/sirupsen/logrus" @@ -966,6 +966,29 @@ func (k *kataAgent) waitProcess(c *Container, processID string) (int32, error) { return resp.(*grpc.WaitProcessResponse).Status, nil } +func (k *kataAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) { + resp, err := k.sendReq(&grpc.WriteStreamRequest{ + ContainerId: c.id, + ExecId: ProcessID, + Data: data, + }) + + if err != nil { + return 0, err + } + + return int(resp.(*grpc.WriteStreamResponse).Len), nil +} + +func (k *kataAgent) closeProcessStdin(c *Container, ProcessID string) error { + _, err := k.sendReq(&grpc.CloseStdinRequest{ + ContainerId: c.id, + ExecId: ProcessID, + }) + + return err +} + type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error) func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { @@ -1014,6 +1037,12 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...) } + k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...) + } + k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { + return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...) + } } func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { @@ -1032,3 +1061,42 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { return handler(context.Background(), request) } + +// readStdout and readStderr are special that we cannot differentiate them with the request types... +func (k *kataAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) { + if err := k.connect(); err != nil { + return 0, err + } + if !k.keepConn { + defer k.disconnect() + } + + return k.readProcessStream(c.id, processID, data, k.client.ReadStdout) +} + +// readStdout and readStderr are special that we cannot differentiate them with the request types... +func (k *kataAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) { + if err := k.connect(); err != nil { + return 0, err + } + if !k.keepConn { + defer k.disconnect() + } + + return k.readProcessStream(c.id, processID, data, k.client.ReadStderr) +} + +type readFn func(context.Context, *grpc.ReadStreamRequest, ...golangGrpc.CallOption) (*grpc.ReadStreamResponse, error) + +func (k *kataAgent) readProcessStream(containerID, processID string, data []byte, read readFn) (int, error) { + resp, err := read(context.Background(), &grpc.ReadStreamRequest{ + ContainerId: containerID, + ExecId: processID, + Len: uint32(len(data))}) + if err == nil { + copy(data, resp.Data) + return len(resp.Data), nil + } + + return 0, err +} diff --git a/virtcontainers/noop_agent.go b/virtcontainers/noop_agent.go index 08ee17ccf6..c6905f8090 100644 --- a/virtcontainers/noop_agent.go +++ b/virtcontainers/noop_agent.go @@ -93,3 +93,23 @@ func (n *noopAgent) waitProcess(c *Container, processID string) (int32, error) { func (n *noopAgent) winsizeProcess(c *Container, processID string, height, width uint32) error { return nil } + +// writeProcessStdin is the Noop agent process stdin writer. It does nothing. +func (n *noopAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) { + return 0, nil +} + +// closeProcessStdin is the Noop agent process stdin closer. It does nothing. +func (n *noopAgent) closeProcessStdin(c *Container, ProcessID string) error { + return nil +} + +// readProcessStdout is the Noop agent process stdout reader. It does nothing. +func (n *noopAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) { + return 0, nil +} + +// readProcessStderr is the Noop agent process stderr reader. It does nothing. +func (n *noopAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) { + return 0, nil +} diff --git a/virtcontainers/pkg/vcmock/sandbox.go b/virtcontainers/pkg/vcmock/sandbox.go index c100c829bf..720e2d3afe 100644 --- a/virtcontainers/pkg/vcmock/sandbox.go +++ b/virtcontainers/pkg/vcmock/sandbox.go @@ -6,6 +6,7 @@ package vcmock import ( + "io" "syscall" vc "github.com/kata-containers/runtime/virtcontainers" @@ -121,3 +122,8 @@ func (p *Sandbox) SignalProcess(containerID, processID string, signal syscall.Si func (p *Sandbox) WinsizeProcess(containerID, processID string, height, width uint32) error { return nil } + +// IOStream implements the VCSandbox function of the same name. +func (p *Sandbox) IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error) { + return nil, nil, nil, nil +} diff --git a/virtcontainers/sandbox.go b/virtcontainers/sandbox.go index 4dc0ebee80..720e1a1947 100644 --- a/virtcontainers/sandbox.go +++ b/virtcontainers/sandbox.go @@ -7,6 +7,7 @@ package virtcontainers import ( "fmt" + "io" "os" "path/filepath" "strings" @@ -630,6 +631,20 @@ func (s *Sandbox) WinsizeProcess(containerID, processID string, height, width ui return c.winsizeProcess(processID, height, width) } +// IOStream returns stdin writer, stdout reader and stderr reader of a process +func (s *Sandbox) IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error) { + if s.state.State != StateRunning { + return nil, nil, nil, fmt.Errorf("Sandbox not running") + } + + c, err := s.findContainer(containerID) + if err != nil { + return nil, nil, nil, err + } + + return c.ioStream(processID) +} + func createAssets(sandboxConfig *SandboxConfig) error { kernel, err := newAsset(sandboxConfig, kernelAsset) if err != nil { diff --git a/virtcontainers/sandbox_test.go b/virtcontainers/sandbox_test.go index 67e7ddd032..291df54dda 100644 --- a/virtcontainers/sandbox_test.go +++ b/virtcontainers/sandbox_test.go @@ -1499,3 +1499,33 @@ func TestWinsizeProcess(t *testing.T) { err = s.WinsizeProcess(contID, execID, 100, 200) assert.Nil(t, err, "Winsize process failed: %v", err) } + +func TestContainerProcessIOStream(t *testing.T) { + s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil) + assert.Nil(t, err, "VirtContainers should not allow empty sandboxes") + defer cleanUp() + + contID := "foo" + execID := "bar" + _, _, _, err = s.IOStream(contID, execID) + assert.NotNil(t, err, "Winsize process in stopped sandbox should fail") + + err = s.start() + assert.Nil(t, err, "Failed to start sandbox: %v", err) + + _, _, _, err = s.IOStream(contID, execID) + assert.NotNil(t, err, "Winsize process in non-existing container should fail") + + contConfig := newTestContainerConfigNoop(contID) + _, err = s.CreateContainer(contConfig) + assert.Nil(t, err, "Failed to create container %+v in sandbox %+v: %v", contConfig, s, err) + + _, _, _, err = s.IOStream(contID, execID) + assert.Nil(t, err, "Winsize process in ready container failed: %v", err) + + _, err = s.StartContainer(contID) + assert.Nil(t, err, "Start container failed: %v", err) + + _, _, _, err = s.IOStream(contID, execID) + assert.Nil(t, err, "Winsize process failed: %v", err) +}