api: add sandbox iostream API

It returns stdin, stdout and stderr stream of the specified process in
the container.

Fixes: #258

Signed-off-by: Peng Tao <bergwolf@gmail.com>
This commit is contained in:
Peng Tao 2018-04-25 23:20:00 +08:00
parent bf4ef4324e
commit 1bb6ab9e22
12 changed files with 361 additions and 1 deletions

View File

@ -168,6 +168,18 @@ type agent interface {
// winsizeProcess will tell the agent to set a process' tty size // winsizeProcess will tell the agent to set a process' tty size
winsizeProcess(c *Container, processID string, height, width uint32) error winsizeProcess(c *Container, processID string, height, width uint32) error
// writeProcessStdin will tell the agent to write a process stdin
writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error)
// closeProcessStdin will tell the agent to close a process stdin
closeProcessStdin(c *Container, ProcessID string) error
// readProcessStdout will tell the agent to read a process stdout
readProcessStdout(c *Container, processID string, data []byte) (int, error)
// readProcessStderr will tell the agent to read a process stderr
readProcessStderr(c *Container, processID string, data []byte) (int, error)
// processListContainer will list the processes running inside the container // processListContainer will list the processes running inside the container
processListContainer(sandbox *Sandbox, c Container, options ProcessListOptions) (ProcessList, error) processListContainer(sandbox *Sandbox, c Container, options ProcessListOptions) (ProcessList, error)

View File

@ -8,6 +8,7 @@ package virtcontainers
import ( import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"syscall" "syscall"
@ -758,6 +759,16 @@ func (c *Container) winsizeProcess(processID string, height, width uint32) error
return c.sandbox.agent.winsizeProcess(c, processID, height, width) return c.sandbox.agent.winsizeProcess(c, processID, height, width)
} }
func (c *Container) ioStream(processID string) (io.WriteCloser, io.Reader, io.Reader, error) {
if c.state.State != StateReady && c.state.State != StateRunning {
return nil, nil, nil, fmt.Errorf("Container not ready or running, impossible to signal the container")
}
stream := newIOStream(c.sandbox, c, processID)
return stream.stdin(), stream.stdout(), stream.stderr(), nil
}
func (c *Container) processList(options ProcessListOptions) (ProcessList, error) { func (c *Container) processList(options ProcessListOptions) (ProcessList, error) {
if err := c.checkSandboxRunning("ps"); err != nil { if err := c.checkSandboxRunning("ps"); err != nil {
return nil, err return nil, err

View File

@ -435,3 +435,29 @@ func TestWinsizeProcessErrorState(t *testing.T) {
err = c.winsizeProcess(processID, 100, 200) err = c.winsizeProcess(processID, 100, 200)
assert.Error(err) assert.Error(err)
} }
func TestProcessIOStream(t *testing.T) {
assert := assert.New(t)
c := &Container{
sandbox: &Sandbox{
state: State{
State: StateRunning,
},
},
}
processID := "foobar"
// Container state undefined
_, _, _, err := c.ioStream(processID)
assert.Error(err)
// Container paused
c.state.State = StatePaused
_, _, _, err = c.ioStream(processID)
assert.Error(err)
// Container stopped
c.state.State = StateStopped
_, _, _, err = c.ioStream(processID)
assert.Error(err)
}

View File

@ -816,3 +816,23 @@ func (h *hyper) winsizeProcess(c *Container, processID string, height, width uin
// cc-agent does not support winsize process // cc-agent does not support winsize process
return nil return nil
} }
func (h *hyper) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
// cc-agent does not support stdin write request
return 0, nil
}
func (h *hyper) closeProcessStdin(c *Container, ProcessID string) error {
// cc-agent does not support stdin close request
return nil
}
func (h *hyper) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
// cc-agent does not support stdout read request
return 0, nil
}
func (h *hyper) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
// cc-agent does not support stderr read request
return 0, nil
}

View File

@ -6,6 +6,7 @@
package virtcontainers package virtcontainers
import ( import (
"io"
"syscall" "syscall"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -60,6 +61,7 @@ type VCSandbox interface {
WaitProcess(containerID, processID string) (int32, error) WaitProcess(containerID, processID string) (int32, error)
SignalProcess(containerID, processID string, signal syscall.Signal, all bool) error SignalProcess(containerID, processID string, signal syscall.Signal, all bool) error
WinsizeProcess(containerID, processID string, height, width uint32) error WinsizeProcess(containerID, processID string, height, width uint32) error
IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error)
} }
// VCContainer is the Container interface // VCContainer is the Container interface

View File

@ -0,0 +1,91 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"errors"
"io"
)
type iostream struct {
sandbox *Sandbox
container *Container
process string
closed bool
}
// io.WriteCloser
type stdinStream struct {
*iostream
}
// io.Reader
type stdoutStream struct {
*iostream
}
// io.Reader
type stderrStream struct {
*iostream
}
func newIOStream(s *Sandbox, c *Container, proc string) *iostream {
return &iostream{
sandbox: s,
container: c,
process: proc,
closed: false, // needed to workaround buggy structcheck
}
}
func (s *iostream) stdin() io.WriteCloser {
return &stdinStream{s}
}
func (s *iostream) stdout() io.Reader {
return &stdoutStream{s}
}
func (s *iostream) stderr() io.Reader {
return &stderrStream{s}
}
func (s *stdinStream) Write(data []byte) (n int, err error) {
if s.closed {
return 0, errors.New("stream closed")
}
return s.sandbox.agent.writeProcessStdin(s.container, s.process, data)
}
func (s *stdinStream) Close() error {
if s.closed {
return errors.New("stream closed")
}
err := s.sandbox.agent.closeProcessStdin(s.container, s.process)
if err == nil {
s.closed = true
}
return err
}
func (s *stdoutStream) Read(data []byte) (n int, err error) {
if s.closed {
return 0, errors.New("stream closed")
}
return s.sandbox.agent.readProcessStdout(s.container, s.process, data)
}
func (s *stderrStream) Read(data []byte) (n int, err error) {
if s.closed {
return 0, errors.New("stream closed")
}
return s.sandbox.agent.readProcessStderr(s.container, s.process, data)
}

View File

@ -0,0 +1,59 @@
// Copyright (c) 2018 HyperHQ Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestIOStream(t *testing.T) {
hConfig := newHypervisorConfig(nil, nil)
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, hConfig, NoopAgentType, NoopNetworkModel, NetworkConfig{}, []ContainerConfig{}, nil)
if err != nil {
t.Fatal(err)
}
defer cleanUp()
contID := "foo"
processID := "bar"
config := newTestContainerConfigNoop(contID)
c := &Container{
sandbox: s,
config: &config,
}
stream := newIOStream(s, c, processID)
stdin := stream.stdin()
stdout := stream.stdout()
stderr := stream.stderr()
buffer := []byte("randombufferdata")
_, err = stdin.Write(buffer)
assert.Nil(t, err, "stdin write failed: %s", err)
_, err = stdout.Read(buffer)
assert.Nil(t, err, "stdout read failed: %s", err)
_, err = stderr.Read(buffer)
assert.Nil(t, err, "stderr read failed: %s", err)
err = stdin.Close()
assert.Nil(t, err, "stream close failed: %s", err)
_, err = stdin.Write(buffer)
assert.NotNil(t, err, "stdin write closed should fail")
_, err = stdout.Read(buffer)
assert.NotNil(t, err, "stdout read closed should fail")
_, err = stderr.Read(buffer)
assert.NotNil(t, err, "stderr read closed should fail")
err = stdin.Close()
assert.NotNil(t, err, "stdin close closed should fail")
}

View File

@ -6,7 +6,6 @@
package virtcontainers package virtcontainers
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -23,6 +22,7 @@ import (
vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations" vcAnnotations "github.com/kata-containers/runtime/virtcontainers/pkg/annotations"
ns "github.com/kata-containers/runtime/virtcontainers/pkg/nsenter" ns "github.com/kata-containers/runtime/virtcontainers/pkg/nsenter"
"github.com/kata-containers/runtime/virtcontainers/pkg/uuid" "github.com/kata-containers/runtime/virtcontainers/pkg/uuid"
"golang.org/x/net/context"
"github.com/opencontainers/runtime-spec/specs-go" "github.com/opencontainers/runtime-spec/specs-go"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -966,6 +966,29 @@ func (k *kataAgent) waitProcess(c *Container, processID string) (int32, error) {
return resp.(*grpc.WaitProcessResponse).Status, nil return resp.(*grpc.WaitProcessResponse).Status, nil
} }
func (k *kataAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
resp, err := k.sendReq(&grpc.WriteStreamRequest{
ContainerId: c.id,
ExecId: ProcessID,
Data: data,
})
if err != nil {
return 0, err
}
return int(resp.(*grpc.WriteStreamResponse).Len), nil
}
func (k *kataAgent) closeProcessStdin(c *Container, ProcessID string) error {
_, err := k.sendReq(&grpc.CloseStdinRequest{
ContainerId: c.id,
ExecId: ProcessID,
})
return err
}
type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error) type reqFunc func(context.Context, interface{}, ...golangGrpc.CallOption) (interface{}, error)
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
@ -1014,6 +1037,12 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) { k.reqHandlers["grpc.TtyWinResizeRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...) return k.client.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest), opts...)
} }
k.reqHandlers["grpc.WriteStreamRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.WriteStdin(ctx, req.(*grpc.WriteStreamRequest), opts...)
}
k.reqHandlers["grpc.CloseStdinRequest"] = func(ctx context.Context, req interface{}, opts ...golangGrpc.CallOption) (interface{}, error) {
return k.client.CloseStdin(ctx, req.(*grpc.CloseStdinRequest), opts...)
}
} }
func (k *kataAgent) sendReq(request interface{}) (interface{}, error) { func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
@ -1032,3 +1061,42 @@ func (k *kataAgent) sendReq(request interface{}) (interface{}, error) {
return handler(context.Background(), request) return handler(context.Background(), request)
} }
// readStdout and readStderr are special that we cannot differentiate them with the request types...
func (k *kataAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
if err := k.connect(); err != nil {
return 0, err
}
if !k.keepConn {
defer k.disconnect()
}
return k.readProcessStream(c.id, processID, data, k.client.ReadStdout)
}
// readStdout and readStderr are special that we cannot differentiate them with the request types...
func (k *kataAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
if err := k.connect(); err != nil {
return 0, err
}
if !k.keepConn {
defer k.disconnect()
}
return k.readProcessStream(c.id, processID, data, k.client.ReadStderr)
}
type readFn func(context.Context, *grpc.ReadStreamRequest, ...golangGrpc.CallOption) (*grpc.ReadStreamResponse, error)
func (k *kataAgent) readProcessStream(containerID, processID string, data []byte, read readFn) (int, error) {
resp, err := read(context.Background(), &grpc.ReadStreamRequest{
ContainerId: containerID,
ExecId: processID,
Len: uint32(len(data))})
if err == nil {
copy(data, resp.Data)
return len(resp.Data), nil
}
return 0, err
}

View File

@ -93,3 +93,23 @@ func (n *noopAgent) waitProcess(c *Container, processID string) (int32, error) {
func (n *noopAgent) winsizeProcess(c *Container, processID string, height, width uint32) error { func (n *noopAgent) winsizeProcess(c *Container, processID string, height, width uint32) error {
return nil return nil
} }
// writeProcessStdin is the Noop agent process stdin writer. It does nothing.
func (n *noopAgent) writeProcessStdin(c *Container, ProcessID string, data []byte) (int, error) {
return 0, nil
}
// closeProcessStdin is the Noop agent process stdin closer. It does nothing.
func (n *noopAgent) closeProcessStdin(c *Container, ProcessID string) error {
return nil
}
// readProcessStdout is the Noop agent process stdout reader. It does nothing.
func (n *noopAgent) readProcessStdout(c *Container, processID string, data []byte) (int, error) {
return 0, nil
}
// readProcessStderr is the Noop agent process stderr reader. It does nothing.
func (n *noopAgent) readProcessStderr(c *Container, processID string, data []byte) (int, error) {
return 0, nil
}

View File

@ -6,6 +6,7 @@
package vcmock package vcmock
import ( import (
"io"
"syscall" "syscall"
vc "github.com/kata-containers/runtime/virtcontainers" vc "github.com/kata-containers/runtime/virtcontainers"
@ -121,3 +122,8 @@ func (p *Sandbox) SignalProcess(containerID, processID string, signal syscall.Si
func (p *Sandbox) WinsizeProcess(containerID, processID string, height, width uint32) error { func (p *Sandbox) WinsizeProcess(containerID, processID string, height, width uint32) error {
return nil return nil
} }
// IOStream implements the VCSandbox function of the same name.
func (p *Sandbox) IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error) {
return nil, nil, nil, nil
}

View File

@ -7,6 +7,7 @@ package virtcontainers
import ( import (
"fmt" "fmt"
"io"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -630,6 +631,20 @@ func (s *Sandbox) WinsizeProcess(containerID, processID string, height, width ui
return c.winsizeProcess(processID, height, width) return c.winsizeProcess(processID, height, width)
} }
// IOStream returns stdin writer, stdout reader and stderr reader of a process
func (s *Sandbox) IOStream(containerID, processID string) (io.WriteCloser, io.Reader, io.Reader, error) {
if s.state.State != StateRunning {
return nil, nil, nil, fmt.Errorf("Sandbox not running")
}
c, err := s.findContainer(containerID)
if err != nil {
return nil, nil, nil, err
}
return c.ioStream(processID)
}
func createAssets(sandboxConfig *SandboxConfig) error { func createAssets(sandboxConfig *SandboxConfig) error {
kernel, err := newAsset(sandboxConfig, kernelAsset) kernel, err := newAsset(sandboxConfig, kernelAsset)
if err != nil { if err != nil {

View File

@ -1499,3 +1499,33 @@ func TestWinsizeProcess(t *testing.T) {
err = s.WinsizeProcess(contID, execID, 100, 200) err = s.WinsizeProcess(contID, execID, 100, 200)
assert.Nil(t, err, "Winsize process failed: %v", err) assert.Nil(t, err, "Winsize process failed: %v", err)
} }
func TestContainerProcessIOStream(t *testing.T) {
s, err := testCreateSandbox(t, testSandboxID, MockHypervisor, newHypervisorConfig(nil, nil), NoopAgentType, NoopNetworkModel, NetworkConfig{}, nil, nil)
assert.Nil(t, err, "VirtContainers should not allow empty sandboxes")
defer cleanUp()
contID := "foo"
execID := "bar"
_, _, _, err = s.IOStream(contID, execID)
assert.NotNil(t, err, "Winsize process in stopped sandbox should fail")
err = s.start()
assert.Nil(t, err, "Failed to start sandbox: %v", err)
_, _, _, err = s.IOStream(contID, execID)
assert.NotNil(t, err, "Winsize process in non-existing container should fail")
contConfig := newTestContainerConfigNoop(contID)
_, err = s.CreateContainer(contConfig)
assert.Nil(t, err, "Failed to create container %+v in sandbox %+v: %v", contConfig, s, err)
_, _, _, err = s.IOStream(contID, execID)
assert.Nil(t, err, "Winsize process in ready container failed: %v", err)
_, err = s.StartContainer(contID)
assert.Nil(t, err, "Start container failed: %v", err)
_, _, _, err = s.IOStream(contID, execID)
assert.Nil(t, err, "Winsize process failed: %v", err)
}