Merge pull request #5736 from gkurz/no-qemu-daemonize

runtime: Start QEMU undaemonized and get logs
This commit is contained in:
Greg Kurz
2023-01-27 16:33:48 +01:00
committed by GitHub
6 changed files with 183 additions and 44 deletions

View File

@@ -27,13 +27,16 @@ func Example() {
// resources // resources
params = append(params, "-m", "370", "-smp", "cpus=2") params = append(params, "-m", "370", "-smp", "cpus=2")
// LaunchCustomQemu should return as soon as the instance has launched as we // LaunchCustomQemu should return immediately. We must then wait
// are using the --daemonize flag. It will set up a unix domain socket // the returned process to terminate as we are using the --daemonize
// called /tmp/qmp-socket that we can use to manage the instance. // flag.
_, err := qemu.LaunchCustomQemu(context.Background(), "", params, nil, nil, nil) // It will set up a unix domain socket called /tmp/qmp-socket that we
// can use to manage the instance.
proc, _, err := qemu.LaunchCustomQemu(context.Background(), "", params, nil, nil, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
proc.Wait()
// This channel will be closed when the instance dies. // This channel will be closed when the instance dies.
disconnectedCh := make(chan struct{}) disconnectedCh := make(chan struct{})

View File

@@ -14,9 +14,9 @@
package qemu package qemu
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io"
"log" "log"
"os" "os"
"os/exec" "os/exec"
@@ -2340,6 +2340,9 @@ type QMPSocket struct {
// Type is the socket type (e.g. "unix"). // Type is the socket type (e.g. "unix").
Type QMPSocketType Type QMPSocketType
// QMP listener file descriptor to be passed to qemu
FD *os.File
// Name is the socket name. // Name is the socket name.
Name string Name string
@@ -2352,7 +2355,8 @@ type QMPSocket struct {
// Valid returns true if the QMPSocket structure is valid and complete. // Valid returns true if the QMPSocket structure is valid and complete.
func (qmp QMPSocket) Valid() bool { func (qmp QMPSocket) Valid() bool {
if qmp.Type == "" || qmp.Name == "" { // Exactly one of Name of FD must be set.
if qmp.Type == "" || (qmp.Name == "") == (qmp.FD == nil) {
return false return false
} }
@@ -2692,7 +2696,13 @@ func (config *Config) appendQMPSockets() {
continue continue
} }
qmpParams := append([]string{}, fmt.Sprintf("%s:%s", q.Type, q.Name)) var qmpParams []string
if q.FD != nil {
qemuFDs := config.appendFDs([]*os.File{q.FD})
qmpParams = append([]string{}, fmt.Sprintf("%s:fd=%d", q.Type, qemuFDs[0]))
} else {
qmpParams = append([]string{}, fmt.Sprintf("%s:path=%s", q.Type, q.Name))
}
if q.Server { if q.Server {
qmpParams = append(qmpParams, "server=on") qmpParams = append(qmpParams, "server=on")
if q.NoWait { if q.NoWait {
@@ -2975,12 +2985,8 @@ func (config *Config) appendFwCfg(logger QMPLog) {
// //
// The Config parameter contains a set of qemu parameters and settings. // The Config parameter contains a set of qemu parameters and settings.
// //
// This function writes its log output via logger parameter. // See LaunchCustomQemu for more information.
// func LaunchQemu(config Config, logger QMPLog) (*exec.Cmd, io.ReadCloser, error) {
// The function will block until the launched qemu process exits. "", nil
// will be returned if the launch succeeds. Otherwise a string containing
// the contents of stderr + a Go error object will be returned.
func LaunchQemu(config Config, logger QMPLog) (string, error) {
config.appendName() config.appendName()
config.appendUUID() config.appendUUID()
config.appendMachine() config.appendMachine()
@@ -3003,7 +3009,7 @@ func LaunchQemu(config Config, logger QMPLog) (string, error) {
config.appendSeccompSandbox() config.appendSeccompSandbox()
if err := config.appendCPUs(); err != nil { if err := config.appendCPUs(); err != nil {
return "", err return nil, nil, err
} }
ctx := config.Ctx ctx := config.Ctx
@@ -3034,17 +3040,16 @@ func LaunchQemu(config Config, logger QMPLog) (string, error) {
// //
// This function writes its log output via logger parameter. // This function writes its log output via logger parameter.
// //
// The function will block until the launched qemu process exits. "", nil // The function returns cmd, reader, nil where cmd is a Go exec.Cmd object
// will be returned if the launch succeeds. Otherwise a string containing // representing the QEMU process and reader a Go io.ReadCloser object
// the contents of stderr + a Go error object will be returned. // connected to QEMU's stderr, if launched successfully. Otherwise
// nil, nil, err where err is a Go error object is returned.
func LaunchCustomQemu(ctx context.Context, path string, params []string, fds []*os.File, func LaunchCustomQemu(ctx context.Context, path string, params []string, fds []*os.File,
attr *syscall.SysProcAttr, logger QMPLog) (string, error) { attr *syscall.SysProcAttr, logger QMPLog) (*exec.Cmd, io.ReadCloser, error) {
if logger == nil { if logger == nil {
logger = qmpNullLogger{} logger = qmpNullLogger{}
} }
errStr := ""
if path == "" { if path == "" {
path = "qemu-system-x86_64" path = "qemu-system-x86_64"
} }
@@ -3058,15 +3063,17 @@ func LaunchCustomQemu(ctx context.Context, path string, params []string, fds []*
cmd.SysProcAttr = attr cmd.SysProcAttr = attr
var stderr bytes.Buffer reader, err := cmd.StderrPipe()
cmd.Stderr = &stderr if err != nil {
logger.Errorf("Unable to connect stderr to a pipe")
return nil, nil, err
}
logger.Infof("launching %s with: %v", path, params) logger.Infof("launching %s with: %v", path, params)
err := cmd.Run() err = cmd.Start()
if err != nil { if err != nil {
logger.Errorf("Unable to launch %s: %v", path, err) logger.Errorf("Unable to launch %s: %v", path, err)
errStr = stderr.String() return nil, nil, err
logger.Errorf("%s", errStr)
} }
return errStr, err return cmd, reader, nil
} }

View File

@@ -698,8 +698,8 @@ func TestFailToAppendCPUs(t *testing.T) {
} }
} }
var qmpSingleSocketServerString = "-qmp unix:cc-qmp,server=on,wait=off" var qmpSingleSocketServerString = "-qmp unix:path=cc-qmp,server=on,wait=off"
var qmpSingleSocketString = "-qmp unix:cc-qmp" var qmpSingleSocketString = "-qmp unix:path=cc-qmp"
func TestAppendSingleQMPSocketServer(t *testing.T) { func TestAppendSingleQMPSocketServer(t *testing.T) {
qmp := QMPSocket{ qmp := QMPSocket{
@@ -722,7 +722,27 @@ func TestAppendSingleQMPSocket(t *testing.T) {
testAppend(qmp, qmpSingleSocketString, t) testAppend(qmp, qmpSingleSocketString, t)
} }
var qmpSocketServerString = "-qmp unix:cc-qmp-1,server=on,wait=off -qmp unix:cc-qmp-2,server=on,wait=off" var qmpSocketServerFdString = "-qmp unix:fd=3,server=on,wait=off"
func TestAppendQMPSocketServerFd(t *testing.T) {
foo, _ := os.CreateTemp(os.TempDir(), "govmm-qemu-test")
defer func() {
_ = foo.Close()
_ = os.Remove(foo.Name())
}()
qmp := QMPSocket{
Type: "unix",
FD: foo,
Server: true,
NoWait: true,
}
testAppend(qmp, qmpSocketServerFdString, t)
}
var qmpSocketServerString = "-qmp unix:path=cc-qmp-1,server=on,wait=off -qmp unix:path=cc-qmp-2,server=on,wait=off"
func TestAppendQMPSocketServer(t *testing.T) { func TestAppendQMPSocketServer(t *testing.T) {
qmp := []QMPSocket{ qmp := []QMPSocket{

View File

@@ -702,6 +702,16 @@ func QMPStart(ctx context.Context, socket string, cfg QMPConfig, disconnectedCh
return nil, nil, err return nil, nil, err
} }
return QMPStartWithConn(ctx, conn, cfg, disconnectedCh)
}
// Same as QMPStart but with a pre-established connection
func QMPStartWithConn(ctx context.Context, conn net.Conn, cfg QMPConfig, disconnectedCh chan struct{}) (*QMP, *QMPVersion, error) {
if conn == nil {
close(disconnectedCh)
return nil, nil, fmt.Errorf("invalid connection")
}
connectedCh := make(chan *QMPVersion) connectedCh := make(chan *QMPVersion)
q := startQMPLoop(conn, cfg, connectedCh, disconnectedCh) q := startQMPLoop(conn, cfg, connectedCh, disconnectedCh)

View File

@@ -273,6 +273,22 @@ func TestQMPStartBadPath(t *testing.T) {
<-disconnectedCh <-disconnectedCh
} }
// Checks that a call to QMPStartWithConn with a nil connection exits gracefully.
//
// We call QMPStartWithConn with a nil connection.
//
// An error should be returned and the disconnected channel should be closed.
func TestQMPStartWithConnNil(t *testing.T) {
cfg := QMPConfig{Logger: qmpTestLogger{}}
disconnectedCh := make(chan struct{})
q, _, err := QMPStartWithConn(context.Background(), nil, cfg, disconnectedCh)
if err == nil {
t.Errorf("Expected error")
q.Shutdown()
}
<-disconnectedCh
}
// Checks that the qmp_capabilities command is correctly sent. // Checks that the qmp_capabilities command is correctly sent.
// //
// We start a QMPLoop, send the qmp_capabilities command and stop the // We start a QMPLoop, send the qmp_capabilities command and stop the

View File

@@ -13,10 +13,14 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"math" "math"
"net"
"os" "os"
"os/exec"
"os/user" "os/user"
"path/filepath" "path/filepath"
"regexp"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@@ -368,7 +372,6 @@ func (q *qemu) createQmpSocket() ([]govmmQemu.QMPSocket, error) {
return []govmmQemu.QMPSocket{ return []govmmQemu.QMPSocket{
{ {
Type: "unix", Type: "unix",
Name: q.qmpMonitorCh.path,
Server: true, Server: true,
NoWait: true, NoWait: true,
}, },
@@ -531,7 +534,7 @@ func (q *qemu) CreateVM(ctx context.Context, id string, network Network, hypervi
NoDefaults: true, NoDefaults: true,
NoGraphic: true, NoGraphic: true,
NoReboot: true, NoReboot: true,
Daemonize: true, Daemonize: false,
MemPrealloc: q.config.MemPrealloc, MemPrealloc: q.config.MemPrealloc,
HugePages: q.config.HugePages, HugePages: q.config.HugePages,
IOMMUPlatform: q.config.IOMMUPlatform, IOMMUPlatform: q.config.IOMMUPlatform,
@@ -812,6 +815,77 @@ func (q *qemu) setupVirtioMem(ctx context.Context) error {
return err return err
} }
// setupEarlyQmpConnection creates a listener socket to be passed to QEMU
// as a QMP listening endpoint. An initial connection is established, to
// be used as the QMP client socket. This allows to detect an early failure
// of QEMU instead of looping on connect until some timeout expires.
func (q *qemu) setupEarlyQmpConnection() (net.Conn, error) {
monitorSockPath := q.qmpMonitorCh.path
qmpListener, err := net.Listen("unix", monitorSockPath)
if err != nil {
q.Logger().WithError(err).Errorf("Unable to listen on unix socket address (%s)", monitorSockPath)
return nil, err
}
// A duplicate fd of this socket will be passed to QEMU. We must
// close the original one when we're done.
defer qmpListener.Close()
if rootless.IsRootless() {
err = syscall.Chown(monitorSockPath, int(q.config.Uid), int(q.config.Gid))
if err != nil {
q.Logger().WithError(err).Errorf("Unable to make unix socket (%s) rootless", monitorSockPath)
return nil, err
}
}
VMFd, err := qmpListener.(*net.UnixListener).File()
if err != nil {
return nil, err
}
defer func() {
if err != nil {
VMFd.Close()
}
}()
// This socket will be used to establish the initial QMP connection
dialer := net.Dialer{Cancel: q.qmpMonitorCh.ctx.Done()}
conn, err := dialer.Dial("unix", monitorSockPath)
if err != nil {
q.Logger().WithError(err).Errorf("Unable to connect to unix socket (%s)", monitorSockPath)
return nil, err
}
// We need to keep the socket file around to be able to re-connect
qmpListener.(*net.UnixListener).SetUnlinkOnClose(false)
// Pass the duplicated fd of the listener socket to QEMU
q.qemuConfig.QMPSockets[0].FD = VMFd
q.fds = append(q.fds, q.qemuConfig.QMPSockets[0].FD)
return conn, nil
}
func (q *qemu) LogAndWait(qemuCmd *exec.Cmd, reader io.ReadCloser) {
pid := qemuCmd.Process.Pid
q.Logger().Infof("Start logging QEMU (qemuPid=%d)", pid)
scanner := bufio.NewScanner(reader)
warnRE := regexp.MustCompile("(^[^:]+: )warning: ")
for scanner.Scan() {
text := scanner.Text()
if warnRE.MatchString(text) {
text = warnRE.ReplaceAllString(text, "$1")
q.Logger().WithField("qemuPid", pid).Warning(text)
} else {
q.Logger().WithField("qemuPid", pid).Error(text)
}
}
q.Logger().Infof("Stop logging QEMU (qemuPid=%d)", pid)
qemuCmd.Wait()
}
// StartVM will start the Sandbox's VM. // StartVM will start the Sandbox's VM.
func (q *qemu) StartVM(ctx context.Context, timeout int) error { func (q *qemu) StartVM(ctx context.Context, timeout int) error {
span, ctx := katatrace.Trace(ctx, q.Logger(), "StartVM", qemuTracingTags, map[string]string{"sandbox_id": q.id}) span, ctx := katatrace.Trace(ctx, q.Logger(), "StartVM", qemuTracingTags, map[string]string{"sandbox_id": q.id})
@@ -857,6 +931,12 @@ func (q *qemu) StartVM(ctx context.Context, timeout int) error {
} }
}() }()
var qmpConn net.Conn
qmpConn, err = q.setupEarlyQmpConnection()
if err != nil {
return err
}
// This needs to be done as late as possible, just before launching // This needs to be done as late as possible, just before launching
// virtiofsd are executed by kata-runtime after this call, run with // virtiofsd are executed by kata-runtime after this call, run with
// the SELinux label. If these processes require privileged, we do // the SELinux label. If these processes require privileged, we do
@@ -882,20 +962,23 @@ func (q *qemu) StartVM(ctx context.Context, timeout int) error {
} }
var strErr string qemuCmd, reader, err := govmmQemu.LaunchQemu(q.qemuConfig, newQMPLogger())
strErr, err = govmmQemu.LaunchQemu(q.qemuConfig, newQMPLogger())
if err != nil { if err != nil {
if q.config.Debug && q.qemuConfig.LogFile != "" { q.Logger().WithError(err).Error("failed to launch qemu")
b, err := os.ReadFile(q.qemuConfig.LogFile) return fmt.Errorf("failed to launch qemu: %s", err)
if err == nil {
strErr += string(b)
} }
} if q.qemuConfig.Knobs.Daemonize {
q.Logger().WithError(err).Errorf("failed to launch qemu: %s", strErr) // LaunchQemu returns a handle on the upper QEMU process.
return fmt.Errorf("failed to launch qemu: %s, error messages from qemu log: %s", err, strErr) // Wait for it to exit to assume that the QEMU daemon was
// actually started.
qemuCmd.Wait()
} else {
// Log QEMU errors and ensure the QEMU process is reaped after
// termination.
go q.LogAndWait(qemuCmd, reader)
} }
err = q.waitVM(ctx, timeout) err = q.waitVM(ctx, qmpConn, timeout)
if err != nil { if err != nil {
return err return err
} }
@@ -933,7 +1016,7 @@ func (q *qemu) bootFromTemplate() error {
} }
// waitVM will wait for the Sandbox's VM to be up and running. // waitVM will wait for the Sandbox's VM to be up and running.
func (q *qemu) waitVM(ctx context.Context, timeout int) error { func (q *qemu) waitVM(ctx context.Context, qmpConn net.Conn, timeout int) error {
span, _ := katatrace.Trace(ctx, q.Logger(), "waitVM", qemuTracingTags, map[string]string{"sandbox_id": q.id}) span, _ := katatrace.Trace(ctx, q.Logger(), "waitVM", qemuTracingTags, map[string]string{"sandbox_id": q.id})
defer span.End() defer span.End()
@@ -953,7 +1036,7 @@ func (q *qemu) waitVM(ctx context.Context, timeout int) error {
timeStart := time.Now() timeStart := time.Now()
for { for {
disconnectCh = make(chan struct{}) disconnectCh = make(chan struct{})
qmp, ver, err = govmmQemu.QMPStart(q.qmpMonitorCh.ctx, q.qmpMonitorCh.path, cfg, disconnectCh) qmp, ver, err = govmmQemu.QMPStartWithConn(q.qmpMonitorCh.ctx, qmpConn, cfg, disconnectCh)
if err == nil { if err == nil {
break break
} }