vitiofsd: Add virtiofsd interaface

In oderder to make unit testing simpler,
lets add an interface that could be mocked.

Let hypervisor have a instance of virtiofsd interface,
and this makes a loose dependency to allow mock testing.

With the inteface is possible to add startSandbox unit test:

- use utils.StartCmd to mock call to start hypervisor process.

- Add unit test for startSandbox.

Fixes: #2367

Signed-off-by: Jose Carlos Venegas Munoz <jose.carlos.venegas.munoz@intel.com>
This commit is contained in:
Jose Carlos Venegas Munoz 2019-12-19 01:02:44 +00:00
parent 2a085ee67b
commit a2d3f9f32d
6 changed files with 439 additions and 179 deletions

View File

@ -6,7 +6,6 @@
package virtcontainers package virtcontainers
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
@ -52,7 +51,6 @@ const (
const ( const (
// Values are mandatory by http API // Values are mandatory by http API
// Values based on: // Values based on:
// github.com/cloud-hypervisor/cloud-hypervisor/blob/v0.3.0/vmm/src/config.rs#L395
clhTimeout = 10 clhTimeout = 10
clhAPITimeout = 1 clhAPITimeout = 1
clhStopSandboxTimeout = 3 clhStopSandboxTimeout = 3
@ -113,6 +111,7 @@ type cloudHypervisor struct {
version CloudHypervisorVersion version CloudHypervisorVersion
vmconfig chclient.VmConfig vmconfig chclient.VmConfig
cmdOutput bytes.Buffer cmdOutput bytes.Buffer
virtiofsd Virtiofsd
} }
var clhKernelParams = []Param{ var clhKernelParams = []Param{
@ -182,10 +181,26 @@ func (clh *cloudHypervisor) createSandbox(ctx context.Context, id string, networ
clh.Logger().WithField("function", "createSandbox").Info("creating Sandbox") clh.Logger().WithField("function", "createSandbox").Info("creating Sandbox")
virtiofsdSocketPath, err := clh.virtioFsSocketPath(clh.id)
if err != nil {
return nil
}
// No need to return an error from there since there might be nothing // No need to return an error from there since there might be nothing
// to fetch if this is the first time the hypervisor is created. // to fetch if this is the first time the hypervisor is created.
if err := clh.store.Load(store.Hypervisor, &clh.state); err != nil { err = clh.store.Load(store.Hypervisor, &clh.state)
clh.Logger().WithField("function", "createSandbox").WithError(err).Info("No info could be fetched") if err != nil {
clh.Logger().WithField("function", "createSandbox").WithError(err).Info("Sandbox not found creating ")
} else {
clh.Logger().WithField("function", "createSandbox").Info("Sandbox already exist, loading from state")
clh.virtiofsd = &virtiofsd{
PID: clh.state.VirtiofsdPID,
sourcePath: filepath.Join(kataHostSharedDir(), clh.id),
debug: clh.config.Debug,
socketPath: virtiofsdSocketPath,
}
return nil
} }
// Set initial memomory size of the virtual machine // Set initial memomory size of the virtual machine
@ -269,6 +284,15 @@ func (clh *cloudHypervisor) createSandbox(ctx context.Context, id string, networ
} }
clh.state.apiSocket = apiSocketPath clh.state.apiSocket = apiSocketPath
clh.virtiofsd = &virtiofsd{
path: clh.config.VirtioFSDaemon,
sourcePath: filepath.Join(kataHostSharedDir(), clh.id),
socketPath: virtiofsdSocketPath,
extraArgs: clh.config.VirtioFSExtraArgs,
debug: clh.config.Debug,
cache: clh.config.VirtioFSCache,
}
return nil return nil
} }
@ -288,12 +312,17 @@ func (clh *cloudHypervisor) startSandbox(timeout int) error {
return err return err
} }
if clh.virtiofsd == nil {
return errors.New("Missing virtiofsd configuration")
}
if clh.config.SharedFS == config.VirtioFS { if clh.config.SharedFS == config.VirtioFS {
clh.Logger().WithField("function", "startSandbox").Info("Starting virtiofsd") clh.Logger().WithField("function", "startSandbox").Info("Starting virtiofsd")
_, err = clh.setupVirtiofsd(timeout) pid, err := clh.virtiofsd.Start(ctx)
if err != nil { if err != nil {
return err return err
} }
clh.state.VirtiofsdPID = pid
if err = clh.storeState(); err != nil { if err = clh.storeState(); err != nil {
return err return err
} }
@ -310,7 +339,7 @@ func (clh *cloudHypervisor) startSandbox(timeout int) error {
if err := clh.waitVMM(clhTimeout); err != nil { if err := clh.waitVMM(clhTimeout); err != nil {
clh.Logger().WithField("error", err).WithField("output", clh.cmdOutput.String()).Warn("cloud-hypervisor init failed") clh.Logger().WithField("error", err).WithField("output", clh.cmdOutput.String()).Warn("cloud-hypervisor init failed")
if shutdownErr := clh.shutdownVirtiofsd(); shutdownErr != nil { if shutdownErr := clh.virtiofsd.Stop(); shutdownErr != nil {
clh.Logger().WithField("error", shutdownErr).Warn("error shutting down Virtiofsd") clh.Logger().WithField("error", shutdownErr).Warn("error shutting down Virtiofsd")
} }
return err return err
@ -501,64 +530,36 @@ func (clh *cloudHypervisor) terminate() (err error) {
span, _ := clh.trace("terminate") span, _ := clh.trace("terminate")
defer span.Finish() defer span.Finish()
defer func() {
if err != nil {
clh.Logger().Info("Terminate Cloud Hypervisor failed")
} else {
clh.Logger().Info("Cloud Hypervisor stopped")
clh.reset()
clh.Logger().Debug("removing virtiofsd and vm sockets")
path, err := clh.virtioFsSocketPath(clh.id)
if err == nil {
rerr := os.Remove(path)
if rerr != nil {
clh.Logger().WithField("path", path).Warn("removing virtiofsd socket failed")
}
}
path, err = clh.vsockSocketPath(clh.id)
if err == nil {
rerr := os.Remove(path)
if rerr != nil {
clh.Logger().WithField("path", path).Warn("removing vm socket failed")
}
}
}
_ = clh.cleanupVM(true)
}()
pid := clh.state.PID pid := clh.state.PID
pidRunning := true
if pid == 0 { if pid == 0 {
clh.Logger().WithField("PID", pid).Info("Skipping kill cloud hypervisor. invalid pid") pidRunning = false
return nil
} }
clh.Logger().WithField("PID", pid).Info("Stopping Cloud Hypervisor") clh.Logger().WithField("PID", pid).Info("Stopping Cloud Hypervisor")
clhRunning, err := clh.isClhRunning(clhStopSandboxTimeout) if pidRunning {
clhRunning, _ := clh.isClhRunning(clhStopSandboxTimeout)
if err != nil { if clhRunning {
return err ctx, cancel := context.WithTimeout(context.Background(), clhStopSandboxTimeout*time.Second)
} defer cancel()
if _, err = clh.client().ShutdownVMM(ctx); err != nil {
if !clhRunning { return err
return nil }
} }
ctx, cancel := context.WithTimeout(context.Background(), clhStopSandboxTimeout*time.Second)
defer cancel()
if _, err = clh.client().ShutdownVMM(ctx); err != nil {
return err
} }
// At this point the VMM was stop nicely, but need to check if PID is still running
// Wait for the VM process to terminate // Wait for the VM process to terminate
tInit := time.Now() tInit := time.Now()
for { for {
if err = syscall.Kill(pid, syscall.Signal(0)); err != nil { if err = syscall.Kill(pid, syscall.Signal(0)); err != nil {
return nil pidRunning = false
break
} }
if time.Since(tInit).Seconds() >= clhStopSandboxTimeout { if time.Since(tInit).Seconds() >= clhStopSandboxTimeout {
pidRunning = true
clh.Logger().Warnf("VM still running after waiting %ds", clhStopSandboxTimeout) clh.Logger().Warnf("VM still running after waiting %ds", clhStopSandboxTimeout)
break break
} }
@ -569,7 +570,21 @@ func (clh *cloudHypervisor) terminate() (err error) {
// Let's try with a hammer now, a SIGKILL should get rid of the // Let's try with a hammer now, a SIGKILL should get rid of the
// VM process. // VM process.
return syscall.Kill(pid, syscall.SIGKILL) if pidRunning {
if err = syscall.Kill(pid, syscall.SIGKILL); err != nil {
return fmt.Errorf("Fatal, failed to kill hypervisor process, error: %s", err)
}
}
if clh.virtiofsd == nil {
return errors.New("virtiofsd config is nil, failed to stop it")
}
if err := clh.cleanupVM(true); err != nil {
return err
}
return clh.virtiofsd.Stop()
} }
func (clh *cloudHypervisor) reset() { func (clh *cloudHypervisor) reset() {
@ -598,133 +613,6 @@ func (clh *cloudHypervisor) generateSocket(id string, useVsock bool) (interface{
}, nil }, nil
} }
func (clh *cloudHypervisor) setupVirtiofsd(timeout int) (remain int, err error) {
if clh.config.VirtioFSDaemon == "" {
return timeout, errors.New("Virtiofsd path is empty")
}
sockPath, perr := clh.virtioFsSocketPath(clh.id)
if perr != nil {
return 0, perr
}
theArgs, err := clh.virtiofsdArgs(sockPath)
if err != nil {
return 0, err
}
clh.Logger().WithField("path", clh.config.VirtioFSDaemon).Info()
clh.Logger().WithField("args", strings.Join(theArgs, " ")).Info()
cmd := exec.Command(clh.config.VirtioFSDaemon, theArgs...)
stderr, err := cmd.StderrPipe()
if err != nil {
return 0, err
}
if err = cmd.Start(); err != nil {
return 0, err
}
defer func() {
if err != nil {
clh.state.VirtiofsdPID = 0
cmd.Process.Kill()
} else {
clh.state.VirtiofsdPID = cmd.Process.Pid
}
clh.storeState()
}()
// Wait for socket to become available
sockReady := make(chan error, 1)
timeStart := time.Now()
go func() {
scanner := bufio.NewScanner(stderr)
var sent bool
for scanner.Scan() {
if clh.config.Debug {
clh.Logger().WithField("source", "virtiofsd").Debug(scanner.Text())
}
if !sent && strings.Contains(scanner.Text(), "Waiting for vhost-user socket connection...") {
sockReady <- nil
sent = true
}
}
if !sent {
if err := scanner.Err(); err != nil {
sockReady <- err
} else {
sockReady <- fmt.Errorf("virtiofsd did not announce socket connection")
}
}
clh.Logger().Info("virtiofsd quits")
// Wait to release resources of virtiofsd process
cmd.Process.Wait()
}()
return clh.waitVirtiofsd(timeStart, timeout, sockReady,
fmt.Sprintf("virtiofsd (pid=%d) socket %s", cmd.Process.Pid, sockPath))
}
func (clh *cloudHypervisor) waitVirtiofsd(start time.Time, timeout int, ready chan error, errMsg string) (int, error) {
var err error
timeoutDuration := time.Duration(timeout) * time.Second
select {
case err = <-ready:
case <-time.After(timeoutDuration):
err = fmt.Errorf("timed out waiting for %s", errMsg)
}
if err != nil {
return 0, err
}
// Now reduce timeout by the elapsed time
elapsed := time.Since(start)
if elapsed < timeoutDuration {
timeout = timeout - int(elapsed.Seconds())
} else {
timeout = 0
}
return timeout, nil
}
func (clh *cloudHypervisor) virtiofsdArgs(sockPath string) ([]string, error) {
sourcePath := filepath.Join(kataHostSharedDir(), clh.id)
if _, err := os.Stat(sourcePath); os.IsNotExist(err) {
if err = os.MkdirAll(sourcePath, os.ModePerm); err != nil {
return nil, err
}
}
args := []string{
"-f",
"-o", "vhost_user_socket=" + sockPath,
"-o", "source=" + sourcePath,
"-o", "cache=" + clh.config.VirtioFSCache}
if len(clh.config.VirtioFSExtraArgs) != 0 {
args = append(args, clh.config.VirtioFSExtraArgs...)
}
return args, nil
}
func (clh *cloudHypervisor) shutdownVirtiofsd() (err error) {
err = syscall.Kill(clh.state.VirtiofsdPID, syscall.SIGKILL)
if err != nil {
clh.state.VirtiofsdPID = 0
clh.storeState()
}
return err
}
func (clh *cloudHypervisor) virtioFsSocketPath(id string) (string, error) { func (clh *cloudHypervisor) virtioFsSocketPath(id string) (string, error) {
return utils.BuildSocketPath(store.RunVMStoragePath(), id, virtioFsSocket) return utils.BuildSocketPath(store.RunVMStoragePath(), id, virtioFsSocket)
} }
@ -868,7 +756,7 @@ func (clh *cloudHypervisor) LaunchClh() (string, int, error) {
cmd.Env = append(cmd.Env, "RUST_BACKTRACE=full") cmd.Env = append(cmd.Env, "RUST_BACKTRACE=full")
} }
if err := cmd.Start(); err != nil { if err := utils.StartCmd(cmd); err != nil {
fmt.Println("Error starting cloudHypervisor", err) fmt.Println("Error starting cloudHypervisor", err)
if cmd.Process != nil { if cmd.Process != nil {
cmd.Process.Kill() cmd.Process.Kill()
@ -1120,6 +1008,15 @@ func (clh *cloudHypervisor) cleanupVM(force bool) error {
return errors.New("Hypervisor ID is empty") return errors.New("Hypervisor ID is empty")
} }
clh.Logger().Debug("removing vm sockets")
path, err := clh.vsockSocketPath(clh.id)
if err == nil {
if err := os.Remove(path); err != nil {
clh.Logger().WithField("path", path).Warn("removing vm socket failed")
}
}
// cleanup vm path // cleanup vm path
dir := filepath.Join(store.RunVMStoragePath(), clh.id) dir := filepath.Join(store.RunVMStoragePath(), clh.id)
@ -1166,5 +1063,7 @@ func (clh *cloudHypervisor) cleanupVM(force bool) error {
} }
} }
clh.reset()
return nil return nil
} }

View File

@ -233,3 +233,35 @@ func TestClhCreateSandbox(t *testing.T) {
assert.NoError(os.RemoveAll(parentDir)) assert.NoError(os.RemoveAll(parentDir))
assert.Exactly(clhConfig, clh.config) assert.Exactly(clhConfig, clh.config)
} }
func TestClooudHypervisorStartSandbox(t *testing.T) {
assert := assert.New(t)
clhConfig, err := newClhConfig()
assert.NoError(err)
clh := &cloudHypervisor{
config: clhConfig,
APIClient: &clhClientMock{},
virtiofsd: &virtiofsdMock{},
}
sandbox := &Sandbox{
ctx: context.Background(),
id: "testSandbox",
config: &SandboxConfig{
HypervisorConfig: clhConfig,
},
}
vcStore, err := store.NewVCSandboxStore(sandbox.ctx, sandbox.id)
assert.NoError(err)
sandbox.store = vcStore
// Create parent dir path for hypervisor.json
parentDir := store.SandboxConfigurationRootPath(sandbox.id)
assert.NoError(os.MkdirAll(parentDir, store.DirMode))
err = clh.startSandbox(10)
assert.NoError(err)
}

View File

@ -252,3 +252,9 @@ func ValidCgroupPath(path string) string {
// clean up path and return a new path relative to defaultCgroupPath // clean up path and return a new path relative to defaultCgroupPath
return filepath.Join(DefaultCgroupPath, filepath.Clean("/"+path)) return filepath.Join(DefaultCgroupPath, filepath.Clean("/"+path))
} }
// StartCmd pointer to a function to start a command.
// Defined this way to allow mock testing.
var StartCmd = func(c *exec.Cmd) error {
return c.Start()
}

View File

@ -11,11 +11,13 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/kata-containers/runtime/virtcontainers/persist/fs" "github.com/kata-containers/runtime/virtcontainers/persist/fs"
"github.com/kata-containers/runtime/virtcontainers/store" "github.com/kata-containers/runtime/virtcontainers/store"
"github.com/kata-containers/runtime/virtcontainers/utils"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -130,6 +132,14 @@ func TestMain(m *testing.M) {
os.Exit(1) os.Exit(1)
} }
utils.StartCmd = func(c *exec.Cmd) error {
//startSandbox will check if the hypervisor is alive and
// checks for the PID is running, lets fake it using our
// own PID
c.Process = &os.Process{Pid: os.Getpid()}
return nil
}
testQemuKernelPath = filepath.Join(testDir, testKernel) testQemuKernelPath = filepath.Join(testDir, testKernel)
testQemuInitrdPath = filepath.Join(testDir, testInitrd) testQemuInitrdPath = filepath.Join(testDir, testInitrd)
testQemuImagePath = filepath.Join(testDir, testImage) testQemuImagePath = filepath.Join(testDir, testImage)

239
virtcontainers/virtiofsd.go Normal file
View File

@ -0,0 +1,239 @@
// Copyright (c) 2019 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"bufio"
"context"
"fmt"
"io"
"os"
"os/exec"
"strings"
"syscall"
"time"
"github.com/kata-containers/runtime/virtcontainers/utils"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
const (
//Timeout to wait in secounds
virtiofsdStartTimeout = 5
)
type Virtiofsd interface {
// Start virtiofsd, return pid of virtiofsd process
Start(context.Context) (pid int, err error)
// Stop virtiofsd process
Stop() error
}
// Helper function to check virtiofsd is serving
type virtiofsdWaitFunc func(runningCmd *exec.Cmd, stderr io.ReadCloser, debug bool) error
type virtiofsd struct {
// path to virtiofsd daemon
path string
// socketPath where daemon will serve
socketPath string
// cache size for virtiofsd
cache string
// extraArgs list of extra args to append to virtiofsd command
extraArgs []string
// sourcePath path that daemon will help to share
sourcePath string
// debug flag
debug bool
// PID process ID of virtiosd process
PID int
// Neded by tracing
ctx context.Context
// wait helper function to check if virtiofsd is serving
wait virtiofsdWaitFunc
}
// Start the virtiofsd daemon
func (v *virtiofsd) Start(ctx context.Context) (int, error) {
span, _ := v.trace("Start")
defer span.Finish()
pid := 0
if err := v.valid(); err != nil {
return pid, err
}
args, err := v.args()
if err != nil {
return pid, err
}
v.Logger().WithField("path", v.path).Info()
v.Logger().WithField("args", strings.Join(args, " ")).Info()
cmd := exec.Command(v.path, args...)
stderr, err := cmd.StderrPipe()
if err != nil {
return pid, fmt.Errorf("failed to get stderr from virtiofsd command, error: %s", err)
}
if err = utils.StartCmd(cmd); err != nil {
return pid, err
}
defer func() {
if err != nil {
cmd.Process.Kill()
}
}()
if v.wait == nil {
v.wait = waitVirtiofsReady
}
return cmd.Process.Pid, v.wait(cmd, stderr, v.debug)
}
func (v *virtiofsd) Stop() error {
if err := v.kill(); err != nil {
return nil
}
if v.socketPath == "" {
return errors.New("vitiofsd socket path is empty")
}
err := os.Remove(v.socketPath)
if err != nil {
v.Logger().WithError(err).WithField("path", v.socketPath).Warn("removing virtiofsd socket failed")
}
return nil
}
func (v *virtiofsd) args() ([]string, error) {
if v.sourcePath == "" {
return []string{}, errors.New("vitiofsd source path is empty")
}
if _, err := os.Stat(v.sourcePath); os.IsNotExist(err) {
return nil, err
}
args := []string{
"-f",
"-o", "vhost_user_socket=" + v.socketPath,
"-o", "source=" + v.sourcePath,
"-o", "cache=" + v.cache}
if len(v.extraArgs) != 0 {
args = append(args, v.extraArgs...)
}
return args, nil
}
func (v *virtiofsd) valid() error {
if v.path == "" {
errors.New("virtiofsd path is empty")
}
if v.socketPath == "" {
errors.New("Virtiofsd socket path is empty")
}
if v.sourcePath == "" {
errors.New("virtiofsd source path is empty")
}
return nil
}
func (v *virtiofsd) Logger() *log.Entry {
return virtLog.WithField("subsystem", "virtiofsd")
}
func (v *virtiofsd) trace(name string) (opentracing.Span, context.Context) {
if v.ctx == nil {
v.ctx = context.Background()
}
span, ctx := opentracing.StartSpanFromContext(v.ctx, name)
span.SetTag("subsystem", "virtiofds")
return span, ctx
}
func waitVirtiofsReady(cmd *exec.Cmd, stderr io.ReadCloser, debug bool) error {
if cmd == nil {
return errors.New("cmd is nil")
}
sockReady := make(chan error, 1)
go func() {
scanner := bufio.NewScanner(stderr)
var sent bool
for scanner.Scan() {
if debug {
virtLog.WithField("source", "virtiofsd").Debug(scanner.Text())
}
if !sent && strings.Contains(scanner.Text(), "Waiting for vhost-user socket connection...") {
sockReady <- nil
sent = true
}
}
if !sent {
if err := scanner.Err(); err != nil {
sockReady <- err
} else {
sockReady <- fmt.Errorf("virtiofsd did not announce socket connection")
}
}
// Wait to release resources of virtiofsd process
cmd.Process.Wait()
}()
var err error
select {
case err = <-sockReady:
case <-time.After(virtiofsdStartTimeout * time.Second):
err = fmt.Errorf("timed out waiting for vitiofsd ready mesage pid=%d", cmd.Process.Pid)
}
return err
}
func (v *virtiofsd) kill() (err error) {
span, _ := v.trace("kill")
defer span.Finish()
err = syscall.Kill(v.PID, syscall.SIGKILL)
if err != nil {
v.PID = 0
}
return err
}
// virtiofsdMock mock implementation for unit test
type virtiofsdMock struct {
}
// Start the virtiofsd daemon
func (v *virtiofsdMock) Start(ctx context.Context) (int, error) {
return 9999999, nil
}
func (v *virtiofsdMock) Stop() error {
return nil
}

View File

@ -0,0 +1,74 @@
// Copyright (c) 2019 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"context"
"io"
"io/ioutil"
"os"
"os/exec"
"testing"
"github.com/stretchr/testify/assert"
)
func TestVirtiofsdStart(t *testing.T) {
assert := assert.New(t)
type fields struct {
path string
socketPath string
cache string
extraArgs []string
sourcePath string
debug bool
PID int
ctx context.Context
}
sourcePath, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(sourcePath)
validConfig := fields{
path: "/tmp/a/path",
socketPath: "/tmp/a/path/to/sock.sock",
sourcePath: sourcePath,
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{"empty config", fields{}, true},
{"valid config", validConfig, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
v := &virtiofsd{
path: tt.fields.path,
socketPath: tt.fields.socketPath,
cache: tt.fields.cache,
extraArgs: tt.fields.extraArgs,
sourcePath: tt.fields.sourcePath,
debug: tt.fields.debug,
PID: tt.fields.PID,
ctx: tt.fields.ctx,
//Mock wait function
wait: func(runningCmd *exec.Cmd, stderr io.ReadCloser, debug bool) error {
return nil
},
}
var ctx context.Context
_, err := v.Start(ctx)
if (err != nil) != tt.wantErr {
t.Errorf("virtiofsd.Start() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}
}