diff --git a/virtcontainers/clh.go b/virtcontainers/clh.go
index b4ca05155f..3dd1da84ca 100644
--- a/virtcontainers/clh.go
+++ b/virtcontainers/clh.go
@@ -6,7 +6,6 @@ package virtcontainers

 import (
-	"bufio"
 	"bytes"
 	"context"
 	"encoding/json"
@@ -52,7 +51,6 @@ const (
 const (
 	// Values are mandatory by http API
 	// Values based on:
-	// github.com/cloud-hypervisor/cloud-hypervisor/blob/v0.3.0/vmm/src/config.rs#L395
 	clhTimeout            = 10
 	clhAPITimeout         = 1
 	clhStopSandboxTimeout = 3
@@ -113,6 +111,7 @@ type cloudHypervisor struct {
 	version   CloudHypervisorVersion
 	vmconfig  chclient.VmConfig
 	cmdOutput bytes.Buffer
+	virtiofsd Virtiofsd
 }

 var clhKernelParams = []Param{
@@ -182,10 +181,26 @@ func (clh *cloudHypervisor) createSandbox(ctx context.Context, id string, networ

 	clh.Logger().WithField("function", "createSandbox").Info("creating Sandbox")

+	virtiofsdSocketPath, err := clh.virtioFsSocketPath(clh.id)
+	if err != nil {
+		return err
+	}
+
 	// No need to return an error from there since there might be nothing
 	// to fetch if this is the first time the hypervisor is created.
-	if err := clh.store.Load(store.Hypervisor, &clh.state); err != nil {
-		clh.Logger().WithField("function", "createSandbox").WithError(err).Info("No info could be fetched")
+	err = clh.store.Load(store.Hypervisor, &clh.state)
+	if err != nil {
+		clh.Logger().WithField("function", "createSandbox").WithError(err).Info("Sandbox not found, creating")
+	} else {
+		clh.Logger().WithField("function", "createSandbox").Info("Sandbox already exists, loading from state")
+		clh.virtiofsd = &virtiofsd{
+			PID:        clh.state.VirtiofsdPID,
+			sourcePath: filepath.Join(kataHostSharedDir(), clh.id),
+			debug:      clh.config.Debug,
+			socketPath: virtiofsdSocketPath,
+		}
+		return nil
 	}

 	// Set initial memomory size of the virtual machine
@@ -269,6 +284,15 @@ func (clh *cloudHypervisor) createSandbox(ctx context.Context, id string, networ
 	}
 	clh.state.apiSocket = apiSocketPath

+	clh.virtiofsd = &virtiofsd{
+		path:       clh.config.VirtioFSDaemon,
+		sourcePath: filepath.Join(kataHostSharedDir(), clh.id),
+		socketPath: virtiofsdSocketPath,
+		extraArgs:  clh.config.VirtioFSExtraArgs,
+		debug:      clh.config.Debug,
+		cache:      clh.config.VirtioFSCache,
+	}
+
 	return nil
 }

@@ -288,12 +312,17 @@ func (clh *cloudHypervisor) startSandbox(timeout int) error {
 		return err
 	}

+	if clh.virtiofsd == nil {
+		return errors.New("Missing virtiofsd configuration")
+	}
+
 	if clh.config.SharedFS == config.VirtioFS {
 		clh.Logger().WithField("function", "startSandbox").Info("Starting virtiofsd")
-		_, err = clh.setupVirtiofsd(timeout)
+		pid, err := clh.virtiofsd.Start(ctx)
 		if err != nil {
 			return err
 		}
+		clh.state.VirtiofsdPID = pid
 		if err = clh.storeState(); err != nil {
 			return err
 		}
@@ -310,7 +339,7 @@ func (clh *cloudHypervisor) startSandbox(timeout int) error {

 	if err := clh.waitVMM(clhTimeout); err != nil {
 		clh.Logger().WithField("error", err).WithField("output", clh.cmdOutput.String()).Warn("cloud-hypervisor init failed")
-		if shutdownErr := clh.shutdownVirtiofsd(); shutdownErr != nil {
+		if shutdownErr := clh.virtiofsd.Stop(); shutdownErr != nil {
 			clh.Logger().WithField("error", shutdownErr).Warn("error shutting down Virtiofsd")
 		}
 		return err
@@ -501,64 +530,36 @@ func (clh *cloudHypervisor) terminate() (err error) {
 	span, _ := clh.trace("terminate")
 	defer span.Finish()

-	defer func() {
-		if err != nil {
-			clh.Logger().Info("Terminate Cloud Hypervisor failed")
-		} else {
-			clh.Logger().Info("Cloud Hypervisor stopped")
-			clh.reset()
-			clh.Logger().Debug("removing virtiofsd and vm sockets")
-			path, err := clh.virtioFsSocketPath(clh.id)
-			if err == nil {
-				rerr := os.Remove(path)
-				if rerr != nil {
-					clh.Logger().WithField("path", path).Warn("removing virtiofsd socket failed")
-				}
-			}
-			path, err = clh.vsockSocketPath(clh.id)
-			if err == nil {
-				rerr := os.Remove(path)
-				if rerr != nil {
-					clh.Logger().WithField("path", path).Warn("removing vm socket failed")
-				}
-			}
-		}
-
-		_ = clh.cleanupVM(true)
-	}()
-
 	pid := clh.state.PID
+	pidRunning := true
 	if pid == 0 {
-		clh.Logger().WithField("PID", pid).Info("Skipping kill cloud hypervisor. invalid pid")
-		return nil
+		pidRunning = false
 	}
+
 	clh.Logger().WithField("PID", pid).Info("Stopping Cloud Hypervisor")

-	clhRunning, err := clh.isClhRunning(clhStopSandboxTimeout)
-
-	if err != nil {
-		return err
-	}
-
-	if !clhRunning {
-		return nil
-	}
-
-	ctx, cancel := context.WithTimeout(context.Background(), clhStopSandboxTimeout*time.Second)
-	defer cancel()
-
-	if _, err = clh.client().ShutdownVMM(ctx); err != nil {
-		return err
+	if pidRunning {
+		clhRunning, _ := clh.isClhRunning(clhStopSandboxTimeout)
+		if clhRunning {
+			ctx, cancel := context.WithTimeout(context.Background(), clhStopSandboxTimeout*time.Second)
+			defer cancel()
+			if _, err = clh.client().ShutdownVMM(ctx); err != nil {
+				return err
+			}
+		}
 	}

+	// At this point the VMM was stopped nicely, but we need to check if the PID is still running
 	// Wait for the VM process to terminate
 	tInit := time.Now()
 	for {
 		if err = syscall.Kill(pid, syscall.Signal(0)); err != nil {
-			return nil
+			pidRunning = false
+			break
 		}

 		if time.Since(tInit).Seconds() >= clhStopSandboxTimeout {
+			pidRunning = true
 			clh.Logger().Warnf("VM still running after waiting %ds", clhStopSandboxTimeout)
 			break
 		}
@@ -569,7 +570,21 @@ func (clh *cloudHypervisor) terminate() (err error) {

 	// Let's try with a hammer now, a SIGKILL should get rid of the
 	// VM process.
-	return syscall.Kill(pid, syscall.SIGKILL)
+	if pidRunning {
+		if err = syscall.Kill(pid, syscall.SIGKILL); err != nil {
+			return fmt.Errorf("Fatal, failed to kill hypervisor process, error: %s", err)
+		}
+	}
+
+	if clh.virtiofsd == nil {
+		return errors.New("virtiofsd config is nil, failed to stop it")
+	}
+
+	if err := clh.cleanupVM(true); err != nil {
+		return err
+	}
+
+	return clh.virtiofsd.Stop()
 }

 func (clh *cloudHypervisor) reset() {
@@ -598,133 +613,6 @@ func (clh *cloudHypervisor) generateSocket(id string, useVsock bool) (interface{
 	}, nil
 }

-func (clh *cloudHypervisor) setupVirtiofsd(timeout int) (remain int, err error) {
-
-	if clh.config.VirtioFSDaemon == "" {
-		return timeout, errors.New("Virtiofsd path is empty")
-	}
-
-	sockPath, perr := clh.virtioFsSocketPath(clh.id)
-	if perr != nil {
-		return 0, perr
-	}
-
-	theArgs, err := clh.virtiofsdArgs(sockPath)
-	if err != nil {
-		return 0, err
-	}
-
-	clh.Logger().WithField("path", clh.config.VirtioFSDaemon).Info()
-	clh.Logger().WithField("args", strings.Join(theArgs, " ")).Info()
-
-	cmd := exec.Command(clh.config.VirtioFSDaemon, theArgs...)
-	stderr, err := cmd.StderrPipe()
-	if err != nil {
-		return 0, err
-	}
-
-	if err = cmd.Start(); err != nil {
-		return 0, err
-	}
-	defer func() {
-		if err != nil {
-			clh.state.VirtiofsdPID = 0
-			cmd.Process.Kill()
-		} else {
-			clh.state.VirtiofsdPID = cmd.Process.Pid
-		}
-		clh.storeState()
-	}()
-
-	// Wait for socket to become available
-	sockReady := make(chan error, 1)
-	timeStart := time.Now()
-	go func() {
-		scanner := bufio.NewScanner(stderr)
-		var sent bool
-		for scanner.Scan() {
-			if clh.config.Debug {
-				clh.Logger().WithField("source", "virtiofsd").Debug(scanner.Text())
-			}
-			if !sent && strings.Contains(scanner.Text(), "Waiting for vhost-user socket connection...") {
-				sockReady <- nil
-				sent = true
-			}
-		}
-		if !sent {
-			if err := scanner.Err(); err != nil {
-				sockReady <- err
-			} else {
-				sockReady <- fmt.Errorf("virtiofsd did not announce socket connection")
-			}
-		}
-		clh.Logger().Info("virtiofsd quits")
-		// Wait to release resources of virtiofsd process
-		cmd.Process.Wait()
-	}()
-
-	return clh.waitVirtiofsd(timeStart, timeout, sockReady,
-		fmt.Sprintf("virtiofsd (pid=%d) socket %s", cmd.Process.Pid, sockPath))
-}
-
-func (clh *cloudHypervisor) waitVirtiofsd(start time.Time, timeout int, ready chan error, errMsg string) (int, error) {
-	var err error
-
-	timeoutDuration := time.Duration(timeout) * time.Second
-	select {
-	case err = <-ready:
-	case <-time.After(timeoutDuration):
-		err = fmt.Errorf("timed out waiting for %s", errMsg)
-	}
-	if err != nil {
-		return 0, err
-	}
-
-	// Now reduce timeout by the elapsed time
-	elapsed := time.Since(start)
-	if elapsed < timeoutDuration {
-		timeout = timeout - int(elapsed.Seconds())
-	} else {
-		timeout = 0
-	}
-	return timeout, nil
-}
-
-func (clh *cloudHypervisor) virtiofsdArgs(sockPath string) ([]string, error) {
-
-	sourcePath := filepath.Join(kataHostSharedDir(), clh.id)
-	if _, err := os.Stat(sourcePath); os.IsNotExist(err) {
-		if err = os.MkdirAll(sourcePath, os.ModePerm); err != nil {
-			return nil, err
-		}
-	}
-
-	args := []string{
-		"-f",
-		"-o", "vhost_user_socket=" + sockPath,
-		"-o", "source=" + sourcePath,
-		"-o", "cache=" + clh.config.VirtioFSCache}
-
-	if len(clh.config.VirtioFSExtraArgs) != 0 {
-		args = append(args, clh.config.VirtioFSExtraArgs...)
-	}
-	return args, nil
-}
-
-func (clh *cloudHypervisor) shutdownVirtiofsd() (err error) {
-
-	err = syscall.Kill(clh.state.VirtiofsdPID, syscall.SIGKILL)
-
-	if err != nil {
-		clh.state.VirtiofsdPID = 0
-		clh.storeState()
-	}
-	return err
-
-}
-
 func (clh *cloudHypervisor) virtioFsSocketPath(id string) (string, error) {
 	return utils.BuildSocketPath(store.RunVMStoragePath(), id, virtioFsSocket)
 }
@@ -868,7 +756,7 @@ func (clh *cloudHypervisor) LaunchClh() (string, int, error) {
 		cmd.Env = append(cmd.Env, "RUST_BACKTRACE=full")
 	}

-	if err := cmd.Start(); err != nil {
+	if err := utils.StartCmd(cmd); err != nil {
 		fmt.Println("Error starting cloudHypervisor", err)
 		if cmd.Process != nil {
 			cmd.Process.Kill()
@@ -1120,6 +1008,15 @@ func (clh *cloudHypervisor) cleanupVM(force bool) error {
 		return errors.New("Hypervisor ID is empty")
 	}

+	clh.Logger().Debug("removing vm sockets")
+
+	path, err := clh.vsockSocketPath(clh.id)
+	if err == nil {
+		if err := os.Remove(path); err != nil {
+			clh.Logger().WithField("path", path).Warn("removing vm socket failed")
+		}
+	}
+
 	// cleanup vm path
 	dir := filepath.Join(store.RunVMStoragePath(), clh.id)

@@ -1166,5 +1063,7 @@ func (clh *cloudHypervisor) cleanupVM(force bool) error {
 		}
 	}

+	clh.reset()
+
 	return nil
 }
diff --git a/virtcontainers/clh_test.go b/virtcontainers/clh_test.go
index 8bd9870dad..0d336ae005 100644
--- a/virtcontainers/clh_test.go
+++ b/virtcontainers/clh_test.go
@@ -233,3 +233,35 @@ func TestClhCreateSandbox(t *testing.T) {
 	assert.NoError(os.RemoveAll(parentDir))
 	assert.Exactly(clhConfig, clh.config)
 }
+
+func TestCloudHypervisorStartSandbox(t *testing.T) {
+	assert := assert.New(t)
+	clhConfig, err := newClhConfig()
+	assert.NoError(err)
+
+	clh := &cloudHypervisor{
+		config:    clhConfig,
+		APIClient: &clhClientMock{},
+		virtiofsd: &virtiofsdMock{},
+	}
+
+	sandbox := &Sandbox{
+		ctx: context.Background(),
+		id:  "testSandbox",
+		config: &SandboxConfig{
+			HypervisorConfig: clhConfig,
+		},
+	}
+
+	vcStore, err := store.NewVCSandboxStore(sandbox.ctx, sandbox.id)
+	assert.NoError(err)
+
+	sandbox.store = vcStore
+
+	// Create parent dir path for hypervisor.json
+	parentDir := store.SandboxConfigurationRootPath(sandbox.id)
+	assert.NoError(os.MkdirAll(parentDir, store.DirMode))
+
+	err = clh.startSandbox(10)
+	assert.NoError(err)
+}
diff --git a/virtcontainers/utils/utils.go b/virtcontainers/utils/utils.go
index 29c4a17321..02bb50b2e9 100644
--- a/virtcontainers/utils/utils.go
+++ b/virtcontainers/utils/utils.go
@@ -252,3 +252,9 @@ func ValidCgroupPath(path string) string {
 	// clean up path and return a new path relative to defaultCgroupPath
 	return filepath.Join(DefaultCgroupPath, filepath.Clean("/"+path))
 }
+
+// StartCmd is a pointer to a function used to start a command.
+// Defined this way to allow mock testing.
+var StartCmd = func(c *exec.Cmd) error {
+	return c.Start()
+}
diff --git a/virtcontainers/virtcontainers_test.go b/virtcontainers/virtcontainers_test.go
index 289516104b..516face355 100644
--- a/virtcontainers/virtcontainers_test.go
+++ b/virtcontainers/virtcontainers_test.go
@@ -11,11 +11,13 @@ import (
 	"fmt"
 	"io/ioutil"
 	"os"
+	"os/exec"
 	"path/filepath"
 	"testing"

 	"github.com/kata-containers/runtime/virtcontainers/persist/fs"
 	"github.com/kata-containers/runtime/virtcontainers/store"
+	"github.com/kata-containers/runtime/virtcontainers/utils"
 	"github.com/sirupsen/logrus"
 )

@@ -130,6 +132,14 @@ func TestMain(m *testing.M) {
 		os.Exit(1)
 	}

+	utils.StartCmd = func(c *exec.Cmd) error {
+		// startSandbox will check that the hypervisor is alive and
+		// that its PID is running; let's fake it by using our
+		// own PID
+		c.Process = &os.Process{Pid: os.Getpid()}
+		return nil
+	}
+
 	testQemuKernelPath = filepath.Join(testDir, testKernel)
 	testQemuInitrdPath = filepath.Join(testDir, testInitrd)
 	testQemuImagePath = filepath.Join(testDir, testImage)
diff --git a/virtcontainers/virtiofsd.go b/virtcontainers/virtiofsd.go
new file mode 100644
index 0000000000..c004a51674
--- /dev/null
+++ b/virtcontainers/virtiofsd.go
@@ -0,0 +1,239 @@
+// Copyright (c) 2019 Intel Corporation
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+package virtcontainers
+
+import (
+	"bufio"
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"os/exec"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/kata-containers/runtime/virtcontainers/utils"
+	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	// Timeout to wait in seconds
+	virtiofsdStartTimeout = 5
+)
+
+type Virtiofsd interface {
+	// Start virtiofsd, return pid of virtiofsd process
+	Start(context.Context) (pid int, err error)
+	// Stop virtiofsd process
+	Stop() error
+}
+
+// Helper function to check virtiofsd is serving
+type virtiofsdWaitFunc func(runningCmd *exec.Cmd, stderr io.ReadCloser, debug bool) error
+
+type virtiofsd struct {
+	// path to virtiofsd daemon
+	path string
+	// socketPath where daemon will serve
+	socketPath string
+	// cache size for virtiofsd
+	cache string
+	// extraArgs list of extra args to append to virtiofsd command
+	extraArgs []string
+	// sourcePath path that daemon will help to share
+	sourcePath string
+	// debug flag
+	debug bool
+	// PID process ID of virtiofsd process
+	PID int
+	// Needed by tracing
+	ctx context.Context
+	// wait helper function to check if virtiofsd is serving
+	wait virtiofsdWaitFunc
+}
+
+// Start the virtiofsd daemon
+func (v *virtiofsd) Start(ctx context.Context) (int, error) {
+	span, _ := v.trace("Start")
+	defer span.Finish()
+	pid := 0
+
+	if err := v.valid(); err != nil {
+		return pid, err
+	}
+
+	args, err := v.args()
+	if err != nil {
+		return pid, err
+	}
+
+	v.Logger().WithField("path", v.path).Info()
+	v.Logger().WithField("args", strings.Join(args, " ")).Info()
+
+	cmd := exec.Command(v.path, args...)
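+	// Capture virtiofsd's stderr so the wait helper below can scan it for the
+	// "Waiting for vhost-user socket connection..." message, which signals
+	// that the daemon is ready to accept the vhost-user connection from the VMM.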
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return pid, fmt.Errorf("failed to get stderr from virtiofsd command, error: %s", err)
+	}
+
+	if err = utils.StartCmd(cmd); err != nil {
+		return pid, err
+	}
+
+	defer func() {
+		if err != nil {
+			cmd.Process.Kill()
+		}
+	}()
+
+	if v.wait == nil {
+		v.wait = waitVirtiofsReady
+	}
+
+	return cmd.Process.Pid, v.wait(cmd, stderr, v.debug)
+}
+
+func (v *virtiofsd) Stop() error {
+	if err := v.kill(); err != nil {
+		return nil
+	}
+
+	if v.socketPath == "" {
+		return errors.New("virtiofsd socket path is empty")
+	}
+
+	err := os.Remove(v.socketPath)
+	if err != nil {
+		v.Logger().WithError(err).WithField("path", v.socketPath).Warn("removing virtiofsd socket failed")
+	}
+	return nil
+}
+
+func (v *virtiofsd) args() ([]string, error) {
+	if v.sourcePath == "" {
+		return []string{}, errors.New("virtiofsd source path is empty")
+	}
+
+	if _, err := os.Stat(v.sourcePath); os.IsNotExist(err) {
+		return nil, err
+	}
+
+	args := []string{
+		"-f",
+		"-o", "vhost_user_socket=" + v.socketPath,
+		"-o", "source=" + v.sourcePath,
+		"-o", "cache=" + v.cache}
+
+	if len(v.extraArgs) != 0 {
+		args = append(args, v.extraArgs...)
+	}
+
+	return args, nil
+}
+
+func (v *virtiofsd) valid() error {
+	if v.path == "" {
+		return errors.New("virtiofsd path is empty")
+	}
+
+	if v.socketPath == "" {
+		return errors.New("virtiofsd socket path is empty")
+	}
+
+	if v.sourcePath == "" {
+		return errors.New("virtiofsd source path is empty")
+	}
+
+	return nil
+}
+
+func (v *virtiofsd) Logger() *log.Entry {
+	return virtLog.WithField("subsystem", "virtiofsd")
+}
+
+func (v *virtiofsd) trace(name string) (opentracing.Span, context.Context) {
+	if v.ctx == nil {
+		v.ctx = context.Background()
+	}
+
+	span, ctx := opentracing.StartSpanFromContext(v.ctx, name)
+
+	span.SetTag("subsystem", "virtiofsd")
+
+	return span, ctx
+}
+
+func waitVirtiofsReady(cmd *exec.Cmd, stderr io.ReadCloser, debug bool) error {
+	if cmd == nil {
+		return errors.New("cmd is nil")
+	}
+
+	sockReady := make(chan error, 1)
+	go func() {
+		scanner := bufio.NewScanner(stderr)
+		var sent bool
+		for scanner.Scan() {
+			if debug {
+				virtLog.WithField("source", "virtiofsd").Debug(scanner.Text())
+			}
+			if !sent && strings.Contains(scanner.Text(), "Waiting for vhost-user socket connection...") {
+				sockReady <- nil
+				sent = true
+			}
+		}
+		if !sent {
+			if err := scanner.Err(); err != nil {
+				sockReady <- err
+			} else {
+				sockReady <- fmt.Errorf("virtiofsd did not announce socket connection")
+			}
+		}
+		// Wait to release resources of virtiofsd process
+		cmd.Process.Wait()
+	}()
+
+	var err error
+	select {
+	case err = <-sockReady:
+	case <-time.After(virtiofsdStartTimeout * time.Second):
+		err = fmt.Errorf("timed out waiting for virtiofsd ready message pid=%d", cmd.Process.Pid)
+	}
+
+	return err
+}
+
+func (v *virtiofsd) kill() (err error) {
+	span, _ := v.trace("kill")
+	defer span.Finish()
+
+	err = syscall.Kill(v.PID, syscall.SIGKILL)
+	if err != nil {
+		v.PID = 0
+	}
+
+	return err
+}
+
+// virtiofsdMock mock implementation for unit test
+type virtiofsdMock struct {
+}
+
+// Start the virtiofsd daemon
+func (v *virtiofsdMock) Start(ctx context.Context) (int, error) {
+	return 9999999, nil
+}
+
+func (v *virtiofsdMock) Stop() error {
+	return nil
+}
diff --git a/virtcontainers/virtiofsd_test.go b/virtcontainers/virtiofsd_test.go
new file mode 100644
index 0000000000..ebac2644b4
--- /dev/null
+++ b/virtcontainers/virtiofsd_test.go
@@ -0,0 +1,74 @@
+// Copyright (c) 2019 Intel Corporation
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+
+package virtcontainers
+
+import (
+	"context"
+	"io"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestVirtiofsdStart(t *testing.T) {
+	assert := assert.New(t)
+	type fields struct {
+		path       string
+		socketPath string
+		cache      string
+		extraArgs  []string
+		sourcePath string
+		debug      bool
+		PID        int
+		ctx        context.Context
+	}
+
+	sourcePath, err := ioutil.TempDir("", "")
+	assert.NoError(err)
+	defer os.RemoveAll(sourcePath)
+
+	validConfig := fields{
+		path:       "/tmp/a/path",
+		socketPath: "/tmp/a/path/to/sock.sock",
+		sourcePath: sourcePath,
+	}
+
+	tests := []struct {
+		name    string
+		fields  fields
+		wantErr bool
+	}{
+		{"empty config", fields{}, true},
+		{"valid config", validConfig, false},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			v := &virtiofsd{
+				path:       tt.fields.path,
+				socketPath: tt.fields.socketPath,
+				cache:      tt.fields.cache,
+				extraArgs:  tt.fields.extraArgs,
+				sourcePath: tt.fields.sourcePath,
+				debug:      tt.fields.debug,
+				PID:        tt.fields.PID,
+				ctx:        tt.fields.ctx,
+				// Mock wait function
+				wait: func(runningCmd *exec.Cmd, stderr io.ReadCloser, debug bool) error {
+					return nil
+				},
+			}
+			var ctx context.Context
+			_, err := v.Start(ctx)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("virtiofsd.Start() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+		})
+	}
+}
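+
+// TestVirtiofsdArgs is a minimal sketch that exercises virtiofsd.args()
+// directly; it assumes the option layout built in args() above
+// ("-f", "-o" vhost_user_socket/source/cache) and uses a temporary
+// directory as the shared source path.
+func TestVirtiofsdArgs(t *testing.T) {
+	assert := assert.New(t)
+
+	sourcePath, err := ioutil.TempDir("", "")
+	assert.NoError(err)
+	defer os.RemoveAll(sourcePath)
+
+	v := &virtiofsd{
+		path:       "/tmp/a/path",
+		socketPath: "/tmp/a/path/to/sock.sock",
+		sourcePath: sourcePath,
+		cache:      "none",
+	}
+
+	args, err := v.args()
+	assert.NoError(err)
+	assert.Contains(args, "-f")
+	assert.Contains(args, "vhost_user_socket="+v.socketPath)
+	assert.Contains(args, "source="+sourcePath)
+	assert.Contains(args, "cache=none")
+
+	// The source path is mandatory: an empty sourcePath must be rejected.
+	v.sourcePath = ""
+	_, err = v.args()
+	assert.Error(err)
+}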