// Copyright (c) 2017 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//

package virtcontainers

import (
	b64 "encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/docker/go-units"
	"github.com/kata-containers/kata-containers/src/runtime/pkg/device/api"
	"github.com/kata-containers/kata-containers/src/runtime/pkg/device/config"
	volume "github.com/kata-containers/kata-containers/src/runtime/pkg/direct-volume"
	"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils/katatrace"
	"github.com/kata-containers/kata-containers/src/runtime/pkg/uuid"
	persistapi "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/persist/api"
	pbTypes "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols"
	kataclient "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/client"
	"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/grpc"
	vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
	"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/rootless"
	"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/types"
	"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils"

	"context"

	"github.com/gogo/protobuf/proto"
	"github.com/opencontainers/runtime-spec/specs-go"
	"github.com/opencontainers/selinux/go-selinux"
	"github.com/sirupsen/logrus"
	"golang.org/x/sys/unix"
	"google.golang.org/grpc/codes"
	grpcStatus "google.golang.org/grpc/status"
)

// kataAgentTracingTags defines tags for the trace span
var kataAgentTracingTags = map[string]string{
	"source":    "runtime",
	"package":   "virtcontainers",
	"subsystem": "agent",
}

const (
	// KataEphemeralDevType creates a tmpfs backed volume for sharing files between containers.
	KataEphemeralDevType = "ephemeral"

	// KataLocalDevType creates a local directory inside the VM for sharing files between
	// containers.
	KataLocalDevType = "local"

	// Allocating an FSGroup that owns the pod's volumes
	fsGid = "fsgid"

	// path to vfio devices
	vfioPath = "/dev/vfio/"

	NydusRootFSType = "fuse.nydus-overlayfs"

	// enable debug console
	kernelParamDebugConsole           = "agent.debug_console"
	kernelParamDebugConsoleVPort      = "agent.debug_console_vport"
	kernelParamDebugConsoleVPortValue = "1026"

	// Default SELinux type applied to the container process inside guest
	defaultSeLinuxContainerType = "container_t"
)

var (
	checkRequestTimeout          = 30 * time.Second
	defaultRequestTimeout        = 60 * time.Second
	errorMissingOCISpec          = errors.New("Missing OCI specification")
	defaultKataHostSharedDir     = "/run/kata-containers/shared/sandboxes/"
	defaultKataGuestSharedDir    = "/run/kata-containers/shared/containers/"
	defaultKataGuestNydusRootDir = "/run/kata-containers/shared/"
	mountGuestTag                = "kataShared"
	defaultKataGuestSandboxDir   = "/run/kata-containers/sandbox/"
	type9pFs                     = "9p"
	typeVirtioFS                 = "virtiofs"
	typeOverlayFS                = "overlay"
	kata9pDevType                = "9p"
	kataMmioBlkDevType           = "mmioblk"
	kataBlkDevType               = "blk"
	kataBlkCCWDevType            = "blk-ccw"
	kataSCSIDevType              = "scsi"
	kataNvdimmDevType            = "nvdimm"
	kataVirtioFSDevType          = "virtio-fs"
	kataOverlayDevType           = "overlayfs"
	kataWatchableBindDevType     = "watchable-bind"
	kataVfioPciDevType            = "vfio-pci"    // VFIO PCI device to be used as VFIO in the container
	kataVfioPciGuestKernelDevType = "vfio-pci-gk" // VFIO PCI device for consumption by the guest kernel
	kataVfioApDevType             = "vfio-ap"
	sharedDir9pOptions            = []string{"trans=virtio,version=9p2000.L,cache=mmap", "nodev"}
	sharedDirVirtioFSOptions      = []string{}
	sharedDirVirtioFSDaxOptions   = "dax"
	shmDir                        = "shm"
	kataEphemeralDevType          = "ephemeral"
	defaultEphemeralPath          = filepath.Join(defaultKataGuestSandboxDir, kataEphemeralDevType)
	grpcMaxDataSize               = int64(1024 * 1024)
	localDirOptions               = []string{"mode=0777"}
	maxHostnameLen                = 64
	GuestDNSFile                  = "/etc/resolv.conf"
)

const (
	grpcCheckRequest                 = "grpc.CheckRequest"
	grpcExecProcessRequest           = "grpc.ExecProcessRequest"
	grpcCreateSandboxRequest         = "grpc.CreateSandboxRequest"
	grpcDestroySandboxRequest        = "grpc.DestroySandboxRequest"
	grpcCreateContainerRequest       = "grpc.CreateContainerRequest"
	grpcStartContainerRequest        = "grpc.StartContainerRequest"
	grpcRemoveContainerRequest       = "grpc.RemoveContainerRequest"
	grpcSignalProcessRequest         = "grpc.SignalProcessRequest"
	grpcUpdateRoutesRequest          = "grpc.UpdateRoutesRequest"
	grpcUpdateInterfaceRequest       = "grpc.UpdateInterfaceRequest"
	grpcUpdateEphemeralMountsRequest = "grpc.UpdateEphemeralMountsRequest"
	grpcListInterfacesRequest        = "grpc.ListInterfacesRequest"
	grpcListRoutesRequest            = "grpc.ListRoutesRequest"
	grpcAddARPNeighborsRequest       = "grpc.AddARPNeighborsRequest"
	grpcOnlineCPUMemRequest          = "grpc.OnlineCPUMemRequest"
	grpcUpdateContainerRequest       = "grpc.UpdateContainerRequest"
	grpcWaitProcessRequest           = "grpc.WaitProcessRequest"
	grpcTtyWinResizeRequest          = "grpc.TtyWinResizeRequest"
	grpcWriteStreamRequest           = "grpc.WriteStreamRequest"
	grpcCloseStdinRequest            = "grpc.CloseStdinRequest"
	grpcStatsContainerRequest        = "grpc.StatsContainerRequest"
	grpcPauseContainerRequest        = "grpc.PauseContainerRequest"
	grpcResumeContainerRequest       = "grpc.ResumeContainerRequest"
	grpcReseedRandomDevRequest       = "grpc.ReseedRandomDevRequest"
	grpcGuestDetailsRequest          = "grpc.GuestDetailsRequest"
	grpcMemHotplugByProbeRequest     = "grpc.MemHotplugByProbeRequest"
	grpcCopyFileRequest              = "grpc.CopyFileRequest"
	grpcSetGuestDateTimeRequest      = "grpc.SetGuestDateTimeRequest"
	grpcGetOOMEventRequest           = "grpc.GetOOMEventRequest"
	grpcGetMetricsRequest            = "grpc.GetMetricsRequest"
	grpcAddSwapRequest               = "grpc.AddSwapRequest"
	grpcVolumeStatsRequest           = "grpc.VolumeStatsRequest"
	grpcResizeVolumeRequest          = "grpc.ResizeVolumeRequest"
	grpcGetIPTablesRequest           = "grpc.GetIPTablesRequest"
	grpcSetIPTablesRequest           = "grpc.SetIPTablesRequest"
)

// newKataAgent returns an agent from an agent type.
func newKataAgent() agent {
	return &kataAgent{}
}

// The function is declared this way for mocking in unit tests
var kataHostSharedDir = func() string {
	if rootless.IsRootless() {
		// filepath.Join removes trailing slashes, but it is necessary for mounting
		return filepath.Join(rootless.GetRootlessDir(), defaultKataHostSharedDir) + "/"
	}
	return defaultKataHostSharedDir
}

func getPagesizeFromOpt(fsOpts []string) string {
	// example options array: "rw", "relatime", "seclabel", "pagesize=2M"
	for _, opt := range fsOpts {
		if strings.HasPrefix(opt, "pagesize=") {
			return strings.TrimPrefix(opt, "pagesize=")
		}
	}
	return ""
}

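// getFSGroupChangePolicy maps a direct-volume FSGroupChangePolicy onto its
// agent protocol equivalent, defaulting to "Always".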
func getFSGroupChangePolicy(policy volume.FSGroupChangePolicy) pbTypes.FSGroupChangePolicy {
	switch policy {
	case volume.FSGroupChangeOnRootMismatch:
		return pbTypes.FSGroupChangePolicy_OnRootMismatch
	default:
		return pbTypes.FSGroupChangePolicy_Always
	}
}

// Shared path handling:
// 1. create three directories for each sandbox:
// -. /run/kata-containers/shared/sandboxes/$sbx_id/mounts/, a directory to hold all host/guest shared mounts
// -. /run/kata-containers/shared/sandboxes/$sbx_id/shared/, a host/guest shared directory (9pfs/virtiofs source dir)
// -. /run/kata-containers/shared/sandboxes/$sbx_id/private/, a directory to hold all temporary private mounts when creating ro mounts
//
// 2. /run/kata-containers/shared/sandboxes/$sbx_id/mounts/ is bind mounted readonly to /run/kata-containers/shared/sandboxes/$sbx_id/shared/, so guest cannot modify it
//
// 3. host-guest shared files/directories are mounted one-level under /run/kata-containers/shared/sandboxes/$sbx_id/mounts/ and thus present to guest at one level under /run/kata-containers/shared/sandboxes/$sbx_id/shared/
func GetSharePath(id string) string {
	return filepath.Join(kataHostSharedDir(), id, "shared")
}

func getMountPath(id string) string {
	return filepath.Join(kataHostSharedDir(), id, "mounts")
}

func getPrivatePath(id string) string {
	return filepath.Join(kataHostSharedDir(), id, "private")
}

func getSandboxPath(id string) string {
	return filepath.Join(kataHostSharedDir(), id)
}

// Used in the nydus case: the guest shared dir is compatible with the virtiofsd
// shared dir, and nydus images are presented in kataGuestNydusImageDir
//
// virtiofs mountpoint: "/run/kata-containers/shared/"
// kataGuestSharedDir: "/run/kata-containers/shared/containers"
// kataGuestNydusImageDir: "/run/kata-containers/shared/rafs"
var kataGuestNydusRootDir = func() string {
	if rootless.IsRootless() {
		// filepath.Join removes trailing slashes, but it is necessary for mounting
		return filepath.Join(rootless.GetRootlessDir(), defaultKataGuestNydusRootDir) + "/"
	}
	return defaultKataGuestNydusRootDir
}

var rafsMountPath = func(cid string) string {
	return filepath.Join("/", nydusRafs, cid, lowerDir)
}

var kataGuestNydusImageDir = func() string {
	if rootless.IsRootless() {
		// filepath.Join removes trailing slashes, but it is necessary for mounting
		return filepath.Join(rootless.GetRootlessDir(), defaultKataGuestNydusRootDir, nydusRafs) + "/"
	}
	return filepath.Join(defaultKataGuestNydusRootDir, nydusRafs) + "/"
}

// The function is declared this way for mocking in unit tests
var kataGuestSharedDir = func() string {
	if rootless.IsRootless() {
		// filepath.Join removes trailing slashes, but it is necessary for mounting
		return filepath.Join(rootless.GetRootlessDir(), defaultKataGuestSharedDir) + "/"
	}
	return defaultKataGuestSharedDir
}

// The function is declared this way for mocking in unit tests
var kataGuestSandboxDir = func() string {
	if rootless.IsRootless() {
		// filepath.Join removes trailing slashes, but it is necessary for mounting
		return filepath.Join(rootless.GetRootlessDir(), defaultKataGuestSandboxDir) + "/"
	}
	return defaultKataGuestSandboxDir
}

var kataGuestSandboxStorageDir = func() string {
	return filepath.Join(defaultKataGuestSandboxDir, "storage")
}

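// ephemeralPath returns the guest directory under which ephemeral (tmpfs)
// volumes are mounted, adjusted for rootless mode.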
func ephemeralPath() string {
	if rootless.IsRootless() {
		return filepath.Join(kataGuestSandboxDir(), kataEphemeralDevType)
	}
	return defaultEphemeralPath
}

// KataAgentConfig is a structure storing information needed
// to reach the Kata Containers agent.
type KataAgentConfig struct {
	KernelModules      []string
	ContainerPipeSize  uint32
	DialTimeout        uint32
	LongLiveConn       bool
	Debug              bool
	Trace              bool
	EnableDebugConsole bool
}

// KataAgentState is the structure describing the data stored from this
// agent implementation.
type KataAgentState struct {
	URL string
}

// nolint: govet
type kataAgent struct {
	ctx      context.Context
	vmSocket interface{}

	client *kataclient.AgentClient

	// lock protects the client pointer
	sync.Mutex

	state KataAgentState

	reqHandlers map[string]reqFunc
	kmodules    []string

	dialTimout uint32

	keepConn bool
	dead     bool
}

func (k *kataAgent) Logger() *logrus.Entry {
	return virtLog.WithField("subsystem", "kata_agent")
}

func (k *kataAgent) longLiveConn() bool {
	return k.keepConn
}

// KataAgentKernelParams returns a list of Kata Agent specific kernel
// parameters.
func KataAgentKernelParams(config KataAgentConfig) []Param {
	var params []Param

	if config.Debug {
		params = append(params, Param{Key: "agent.log", Value: "debug"})
	}

	if config.Trace {
		params = append(params, Param{Key: "agent.trace", Value: "true"})
	}

	if config.ContainerPipeSize > 0 {
		containerPipeSize := strconv.FormatUint(uint64(config.ContainerPipeSize), 10)
		params = append(params, Param{Key: vcAnnotations.ContainerPipeSizeKernelParam, Value: containerPipeSize})
	}

	if config.EnableDebugConsole {
		params = append(params, Param{Key: kernelParamDebugConsole, Value: ""})
		params = append(params, Param{Key: kernelParamDebugConsoleVPort, Value: kernelParamDebugConsoleVPortValue})
	}

	return params
}

func (k *kataAgent) handleTraceSettings(config KataAgentConfig) bool {
	disableVMShutdown := false

	if config.Trace {
		// Agent tracing requires that the agent be able to shutdown
		// cleanly. This is the only scenario where the agent is
		// responsible for stopping the VM: normally this is handled
		// by the runtime.
		disableVMShutdown = true
	}

	return disableVMShutdown
}

func (k *kataAgent) init(ctx context.Context, sandbox *Sandbox, config KataAgentConfig) (disableVMShutdown bool, err error) {
	// Save
	k.ctx = sandbox.ctx

	span, _ := katatrace.Trace(ctx, k.Logger(), "init", kataAgentTracingTags)
	defer span.End()

	disableVMShutdown = k.handleTraceSettings(config)
	k.keepConn = config.LongLiveConn
	k.kmodules = config.KernelModules
	k.dialTimout = config.DialTimeout

	return disableVMShutdown, nil
}

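// agentURL returns the string representation of the agent's vsock socket,
// erroring out on unknown socket types.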
func (k *kataAgent) agentURL() (string, error) {
	switch s := k.vmSocket.(type) {
	case types.VSock:
		return s.String(), nil
	case types.HybridVSock:
		return s.String(), nil
	case types.MockHybridVSock:
		return s.String(), nil
	default:
		return "", fmt.Errorf("Invalid socket type")
	}
}

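// capabilities returns the set of capabilities supported by this agent
// implementation.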
func (k *kataAgent) capabilities() types.Capabilities {
	var caps types.Capabilities

	// add all Capabilities supported by agent
	caps.SetBlockDeviceSupport()

	return caps
}

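// internalConfigure generates the agent socket for the given sandbox ID and
// records the connection settings; it performs no hypervisor device setup.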
func (k *kataAgent) internalConfigure(ctx context.Context, h Hypervisor, id string, config KataAgentConfig) error {
	span, _ := katatrace.Trace(ctx, k.Logger(), "configure", kataAgentTracingTags)
	defer span.End()

	var err error
	if k.vmSocket, err = h.GenerateSocket(id); err != nil {
		return err
	}
	k.keepConn = config.LongLiveConn

	katatrace.AddTags(span, "socket", k.vmSocket)

	return nil
}

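// configure sets up the agent communication socket on the hypervisor and,
// when filesystem sharing is supported, creates the shared directory and adds
// it as a device.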
func (k *kataAgent) configure(ctx context.Context, h Hypervisor, id, sharePath string, config KataAgentConfig) error {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "configure", kataAgentTracingTags)
	defer span.End()

	err := k.internalConfigure(ctx, h, id, config)
	if err != nil {
		return err
	}

	switch s := k.vmSocket.(type) {
	case types.VSock:
		if err = h.AddDevice(ctx, s, VSockPCIDev); err != nil {
			return err
		}
	case types.HybridVSock:
		err = h.AddDevice(ctx, s, HybridVirtioVsockDev)
		if err != nil {
			return err
		}
	case types.MockHybridVSock:
	default:
		return types.ErrInvalidConfigType
	}

	// Neither create shared directory nor add 9p device if hypervisor
	// doesn't support filesystem sharing.
	caps := h.Capabilities(ctx)
	if !caps.IsFsSharingSupported() {
		return nil
	}

	// Create shared directory and add the shared volume if filesystem sharing is supported.
	// This volume contains all bind mounted container bundles.
	sharedVolume := types.Volume{
		MountTag: mountGuestTag,
		HostPath: sharePath,
	}

	if err = os.MkdirAll(sharedVolume.HostPath, DirMode); err != nil {
		return err
	}

	return h.AddDevice(ctx, sharedVolume, FsDev)
}

func (k *kataAgent) configureFromGrpc(ctx context.Context, h Hypervisor, id string, config KataAgentConfig) error {
	return k.internalConfigure(ctx, h, id, config)
}

func (k *kataAgent) createSandbox(ctx context.Context, sandbox *Sandbox) error {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "createSandbox", kataAgentTracingTags)
	defer span.End()

	return k.configure(ctx, sandbox.hypervisor, sandbox.id, GetSharePath(sandbox.id), sandbox.config.AgentConfig)
}

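// cmdToKataProcess converts a virtcontainers command into the gRPC Process
// structure expected by the agent, parsing the user/group specification.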
func cmdToKataProcess(cmd types.Cmd) (process *grpc.Process, err error) {
	var i uint64
	var extraGids []uint32

	// Number of bits used to store user+group values in
	// the gRPC "User" type.
	const grpcUserBits = 32

	// User can contain only the "uid" or it can contain "uid:gid".
	parsedUser := strings.Split(cmd.User, ":")
	if len(parsedUser) > 2 {
		return nil, fmt.Errorf("cmd.User %q format is wrong", cmd.User)
	}

	i, err = strconv.ParseUint(parsedUser[0], 10, grpcUserBits)
	if err != nil {
		return nil, err
	}

	uid := uint32(i)

	var gid uint32
	if len(parsedUser) > 1 {
		i, err = strconv.ParseUint(parsedUser[1], 10, grpcUserBits)
		if err != nil {
			return nil, err
		}

		gid = uint32(i)
	}

	if cmd.PrimaryGroup != "" {
		i, err = strconv.ParseUint(cmd.PrimaryGroup, 10, grpcUserBits)
		if err != nil {
			return nil, err
		}

		gid = uint32(i)
	}

	for _, g := range cmd.SupplementaryGroups {
		var extraGid uint64

		extraGid, err = strconv.ParseUint(g, 10, grpcUserBits)
		if err != nil {
			return nil, err
		}

		extraGids = append(extraGids, uint32(extraGid))
	}

	process = &grpc.Process{
		Terminal: cmd.Interactive,
		User: grpc.User{
			UID:            uid,
			GID:            gid,
			AdditionalGids: extraGids,
		},
		Args: cmd.Args,
		Env:  cmdEnvsToStringSlice(cmd.Envs),
		Cwd:  cmd.WorkDir,
	}

	return process, nil
}

func cmdEnvsToStringSlice(ev []types.EnvVar) []string {
	var env []string

	for _, e := range ev {
		pair := []string{e.Var, e.Value}
		env = append(env, strings.Join(pair, "="))
	}

	return env
}

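// exec runs an additional process inside an existing container and returns
// the resulting Process descriptor.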
func (k *kataAgent) exec(ctx context.Context, sandbox *Sandbox, c Container, cmd types.Cmd) (*Process, error) {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "exec", kataAgentTracingTags)
	defer span.End()

	var kataProcess *grpc.Process

	kataProcess, err := cmdToKataProcess(cmd)
	if err != nil {
		return nil, err
	}

	req := &grpc.ExecProcessRequest{
		ContainerId: c.id,
		ExecId:      uuid.Generate().String(),
		Process:     kataProcess,
	}

	if _, err := k.sendReq(ctx, req); err != nil {
		return nil, err
	}

	return buildProcessFromExecID(req.ExecId)
}

func (k *kataAgent) updateInterface(ctx context.Context, ifc *pbTypes.Interface) (*pbTypes.Interface, error) {
	// send update interface request
	ifcReq := &grpc.UpdateInterfaceRequest{
		Interface: ifc,
	}
	resultingInterface, err := k.sendReq(ctx, ifcReq)
	if err != nil {
		k.Logger().WithFields(logrus.Fields{
			"interface-requested": fmt.Sprintf("%+v", ifc),
			"resulting-interface": fmt.Sprintf("%+v", resultingInterface),
		}).WithError(err).Error("update interface request failed")
	}
	if resultInterface, ok := resultingInterface.(*pbTypes.Interface); ok {
		return resultInterface, err
	}
	return nil, err
}

func (k *kataAgent) updateInterfaces(ctx context.Context, interfaces []*pbTypes.Interface) error {
	for _, ifc := range interfaces {
		if _, err := k.updateInterface(ctx, ifc); err != nil {
			return err
		}
	}
	return nil
}

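// updateRoutes sends the given routes to the agent and returns the routes
// the guest actually applied.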
func (k *kataAgent) updateRoutes(ctx context.Context, routes []*pbTypes.Route) ([]*pbTypes.Route, error) {
	if routes != nil {
		routesReq := &grpc.UpdateRoutesRequest{
			Routes: &grpc.Routes{
				Routes: routes,
			},
		}
		resultingRoutes, err := k.sendReq(ctx, routesReq)
		if err != nil {
			k.Logger().WithFields(logrus.Fields{
				"routes-requested": fmt.Sprintf("%+v", routes),
				"resulting-routes": fmt.Sprintf("%+v", resultingRoutes),
			}).WithError(err).Error("update routes request failed")
		}
		resultRoutes, ok := resultingRoutes.(*grpc.Routes)
		if ok && resultRoutes != nil {
			return resultRoutes.Routes, err
		}
		return nil, err
	}
	return nil, nil
}

func (k *kataAgent) updateEphemeralMounts(ctx context.Context, storages []*grpc.Storage) error {
	if storages != nil {
		storagesReq := &grpc.UpdateEphemeralMountsRequest{
			Storages: storages,
		}

		if _, err := k.sendReq(ctx, storagesReq); err != nil {
			k.Logger().WithError(err).Error("update mounts request failed")
			return err
		}
		return nil
	}
	return nil
}

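// addARPNeighbors pushes static ARP neighbor entries to the guest, tolerating
// older agents that do not implement the request.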
func (k *kataAgent) addARPNeighbors(ctx context.Context, neighs []*pbTypes.ARPNeighbor) error {
	if neighs != nil {
		neighsReq := &grpc.AddARPNeighborsRequest{
			Neighbors: &grpc.ARPNeighbors{
				ARPNeighbors: neighs,
			},
		}
		_, err := k.sendReq(ctx, neighsReq)
		if err != nil {
			if grpcStatus.Convert(err).Code() == codes.Unimplemented {
				k.Logger().WithFields(logrus.Fields{
					"arpneighbors-requested": fmt.Sprintf("%+v", neighs),
				}).Warn("add ARP neighbors request failed due to old agent, please upgrade Kata Containers image version")
				return nil
			}
			k.Logger().WithFields(logrus.Fields{
				"arpneighbors-requested": fmt.Sprintf("%+v", neighs),
			}).WithError(err).Error("add ARP neighbors request failed")
		}
		return err
	}
	return nil
}

func (k *kataAgent) listInterfaces(ctx context.Context) ([]*pbTypes.Interface, error) {
	req := &grpc.ListInterfacesRequest{}
	resultingInterfaces, err := k.sendReq(ctx, req)
	if err != nil {
		return nil, err
	}
	resultInterfaces, ok := resultingInterfaces.(*grpc.Interfaces)
	if !ok {
		return nil, fmt.Errorf("Unexpected type %T for interfaces", resultingInterfaces)
	}
	return resultInterfaces.Interfaces, nil
}

func (k *kataAgent) listRoutes(ctx context.Context) ([]*pbTypes.Route, error) {
	req := &grpc.ListRoutesRequest{}
	resultingRoutes, err := k.sendReq(ctx, req)
	if err != nil {
		return nil, err
	}
	resultRoutes, ok := resultingRoutes.(*grpc.Routes)
	if !ok {
		return nil, fmt.Errorf("Unexpected type %T for routes", resultingRoutes)
	}
	return resultRoutes.Routes, nil
}

func (k *kataAgent) getAgentURL() (string, error) {
	return k.agentURL()
}

func (k *kataAgent) setAgentURL() error {
	var err error
	if k.state.URL, err = k.agentURL(); err != nil {
		return err
	}

	return nil
}

func (k *kataAgent) reuseAgent(agent agent) error {
	a, ok := agent.(*kataAgent)
	if !ok {
		return fmt.Errorf("Bug: got a wrong type of agent")
	}

	k.installReqFunc(a.client)
	k.client = a.client
	return nil
}

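// getDNS extracts the DNS configuration from the sandbox's resolv.conf mount,
// if one is present in the patched OCI spec.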
func (k *kataAgent) getDNS(sandbox *Sandbox) ([]string, error) {
	ociSpec := sandbox.GetPatchedOCISpec()
	if ociSpec == nil {
		k.Logger().Debug("Sandbox OCI spec not found. Sandbox DNS will not be set.")
		return nil, nil
	}

	ociMounts := ociSpec.Mounts

	for _, m := range ociMounts {
		if m.Destination == GuestDNSFile {
			content, err := os.ReadFile(m.Source)
			if err != nil {
				return nil, fmt.Errorf("Could not read file %s: %s", m.Source, err)
			}
			dns := strings.Split(string(content), "\n")
			return dns, nil

		}
	}
	k.Logger().Debug("DNS file not present in ociMounts. Sandbox DNS will not be set.")
	return nil, nil
}

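// startSandbox finalizes VM startup: it checks agent connectivity, pushes the
// network configuration, and sends the CreateSandbox request to the agent.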
func (k *kataAgent) startSandbox(ctx context.Context, sandbox *Sandbox) error {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "StartVM", kataAgentTracingTags)
	defer span.End()

	if err := k.setAgentURL(); err != nil {
		return err
	}

	hostname := sandbox.config.Hostname
	if len(hostname) > maxHostnameLen {
		hostname = hostname[:maxHostnameLen]
	}

	dns, err := k.getDNS(sandbox)
	if err != nil {
		return err
	}

	// Check grpc server is serving
	if err = k.check(ctx); err != nil {
		return err
	}

	// Setup network interfaces and routes
	interfaces, routes, neighs, err := generateVCNetworkStructures(ctx, sandbox.network)
	if err != nil {
		return err
	}
	if err = k.updateInterfaces(ctx, interfaces); err != nil {
		return err
	}
	if _, err = k.updateRoutes(ctx, routes); err != nil {
		return err
	}
	if err = k.addARPNeighbors(ctx, neighs); err != nil {
		return err
	}

	storages := setupStorages(ctx, sandbox)

	kmodules := setupKernelModules(k.kmodules)

	req := &grpc.CreateSandboxRequest{
		Hostname:      hostname,
		Dns:           dns,
		Storages:      storages,
		SandboxPidns:  sandbox.sharePidNs,
		SandboxId:     sandbox.id,
		GuestHookPath: sandbox.config.HypervisorConfig.GuestHookPath,
		KernelModules: kmodules,
	}

	_, err = k.sendReq(ctx, req)
	if err != nil {
		return err
	}

	return nil
}

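// setupKernelModules converts "name param1 param2 ..." strings into the
// agent's KernelModule structures.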
func setupKernelModules(kmodules []string) []*grpc.KernelModule {
	modules := []*grpc.KernelModule{}

	for _, m := range kmodules {
		l := strings.Fields(strings.TrimSpace(m))
		if len(l) == 0 {
			continue
		}

		module := &grpc.KernelModule{Name: l[0]}
		modules = append(modules, module)
		if len(l) == 1 {
			continue
		}

		module.Parameters = append(module.Parameters, l[1:]...)
	}

	return modules
}

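// setupStorages builds the sandbox-level storage list: the 9p/virtio-fs
// shared volume (when filesystem sharing is supported) and the sandbox shm.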
func setupStorages(ctx context.Context, sandbox *Sandbox) []*grpc.Storage {
	storages := []*grpc.Storage{}
	caps := sandbox.hypervisor.Capabilities(ctx)

	// append 9p shared volume to storages only if filesystem sharing is supported
	if caps.IsFsSharingSupported() {
		// We mount the shared directory in a predefined location
		// in the guest.
		// This is where at least some of the host config files
		// (resolv.conf, etc...) and potentially all container
		// rootfs will reside.
		sharedFS := sandbox.config.HypervisorConfig.SharedFS
		if sharedFS == config.VirtioFS || sharedFS == config.VirtioFSNydus {
			// If virtio-fs uses either of the two cache options 'auto, always',
			// the guest directory can be mounted with option 'dax' allowing it to
			// directly map contents from the host. When set to 'never', the mount
			// options should not contain 'dax' lest the virtio-fs daemon crash
			// with an invalid address reference.
			if sandbox.config.HypervisorConfig.VirtioFSCache != typeVirtioFSCacheModeNever {
				// If virtio_fs_cache_size = 0, dax should not be used.
				if sandbox.config.HypervisorConfig.VirtioFSCacheSize != 0 {
					sharedDirVirtioFSOptions = append(sharedDirVirtioFSOptions, sharedDirVirtioFSDaxOptions)
				}
			}
			mountPoint := kataGuestSharedDir()
			if sharedFS == config.VirtioFSNydus {
				mountPoint = kataGuestNydusRootDir()
			}
			sharedVolume := &grpc.Storage{
				Driver:     kataVirtioFSDevType,
				Source:     mountGuestTag,
				MountPoint: mountPoint,
				Fstype:     typeVirtioFS,
				Options:    sharedDirVirtioFSOptions,
			}

			storages = append(storages, sharedVolume)
		} else {
			sharedDir9pOptions = append(sharedDir9pOptions, fmt.Sprintf("msize=%d", sandbox.config.HypervisorConfig.Msize9p))

			sharedVolume := &grpc.Storage{
				Driver:     kata9pDevType,
				Source:     mountGuestTag,
				MountPoint: kataGuestSharedDir(),
				Fstype:     type9pFs,
				Options:    sharedDir9pOptions,
			}

			storages = append(storages, sharedVolume)
		}
	}

	if sandbox.shmSize > 0 {
		path := filepath.Join(kataGuestSandboxDir(), shmDir)
		shmSizeOption := fmt.Sprintf("size=%d", sandbox.shmSize)

		shmStorage := &grpc.Storage{
			Driver:     KataEphemeralDevType,
			MountPoint: path,
			Source:     "shm",
			Fstype:     "tmpfs",
			Options:    []string{"noexec", "nosuid", "nodev", "mode=1777", shmSizeOption},
		}

		storages = append(storages, shmStorage)
	}

	return storages
}

func (k *kataAgent) stopSandbox(ctx context.Context, sandbox *Sandbox) error {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "stopSandbox", kataAgentTracingTags)
	defer span.End()

	req := &grpc.DestroySandboxRequest{}

	if _, err := k.sendReq(ctx, req); err != nil {
		return err
	}

	return nil
}

func (k *kataAgent) replaceOCIMountSource(spec *specs.Spec, guestMounts map[string]Mount) error {
	ociMounts := spec.Mounts

	for index, m := range ociMounts {
		if guestMount, ok := guestMounts[m.Destination]; ok {
			k.Logger().Debugf("Replacing OCI mount (%s) source %s with %s", m.Destination, m.Source, guestMount.Source)
			ociMounts[index].Source = guestMount.Source
		}
	}

	return nil
}

func (k *kataAgent) removeIgnoredOCIMount(spec *specs.Spec, ignoredMounts map[string]Mount) error {
	var mounts []specs.Mount

	for _, m := range spec.Mounts {
		if _, found := ignoredMounts[m.Source]; found {
			k.Logger().WithField("removed-mount", m.Source).Debug("Removing OCI mount")
		} else {
			mounts = append(mounts, m)
		}
	}

	// Replace the OCI mounts with the updated list.
	spec.Mounts = mounts

	return nil
}

func (k *kataAgent) constrainGRPCSpec(grpcSpec *grpc.Spec, passSeccomp bool, disableGuestSeLinux bool, guestSeLinuxLabel string, stripVfio bool) error {
	// Disable Hooks since they have been handled on the host and there is
	// no reason to send them to the agent. It would make no sense to try
	// to apply them on the guest.
	grpcSpec.Hooks = nil

	// Pass seccomp only if disable_guest_seccomp is set to false in
	// configuration.toml and guest image is seccomp capable.
	if !passSeccomp {
		grpcSpec.Linux.Seccomp = nil
	}

	// Pass SELinux label for the container process to the agent.
	if grpcSpec.Process.SelinuxLabel != "" {
		if !disableGuestSeLinux {
			k.Logger().Info("SELinux label will be applied to the container process inside guest")

			var label string
			if guestSeLinuxLabel != "" {
				label = guestSeLinuxLabel
			} else {
				label = grpcSpec.Process.SelinuxLabel
			}

			processContext, err := selinux.NewContext(label)
			if err != nil {
				return err
			}

			// Change the type from KVM to container because the type passed from the high-level
			// runtime is for the KVM process.
			if guestSeLinuxLabel == "" {
				processContext["type"] = defaultSeLinuxContainerType
			}
			grpcSpec.Process.SelinuxLabel = processContext.Get()
		} else {
			k.Logger().Info("Empty SELinux label for the process and the mount because guest SELinux is disabled")
			grpcSpec.Process.SelinuxLabel = ""
			grpcSpec.Linux.MountLabel = ""
		}
	}

	// By now only CPU constraints are supported
	// Issue: https://github.com/kata-containers/runtime/issues/158
	// Issue: https://github.com/kata-containers/runtime/issues/204
	grpcSpec.Linux.Resources.Devices = nil
	grpcSpec.Linux.Resources.Pids = nil
	grpcSpec.Linux.Resources.BlockIO = nil
	grpcSpec.Linux.Resources.Network = nil
	if grpcSpec.Linux.Resources.CPU != nil {
		grpcSpec.Linux.Resources.CPU.Cpus = ""
		grpcSpec.Linux.Resources.CPU.Mems = ""
	}

	// We need agent systemd cgroup now.
	// There are three main reasons not to apply systemd cgroups in the VM:
	// - Initrd image doesn't have systemd.
	// - Nobody will be able to modify the resources of a specific container by using systemctl set-property.
	// - docker is not running in the VM.
	// if resCtrl.IsSystemdCgroup(grpcSpec.Linux.CgroupsPath) {
	// 	// Convert systemd cgroup to cgroupfs
	// 	slice := strings.Split(grpcSpec.Linux.CgroupsPath, ":")
	// 	// 0 - slice: system.slice
	// 	// 1 - prefix: docker
	// 	// 2 - name: abc123
	// 	grpcSpec.Linux.CgroupsPath = filepath.Join("/", slice[1], slice[2])
	// }

	// Disable network namespace since it is already handled on the host by
	// virtcontainers. The network is a complex part which cannot be simply
	// passed to the agent.
	// Every other namespace's path has to be emptied. This way, there
	// is no confusion from the agent, trying to find an existing namespace
	// on the guest.
	var tmpNamespaces []grpc.LinuxNamespace
	for _, ns := range grpcSpec.Linux.Namespaces {
		switch ns.Type {
		case string(specs.CgroupNamespace):
		case string(specs.NetworkNamespace):
		default:
			ns.Path = ""
			tmpNamespaces = append(tmpNamespaces, ns)
		}
	}
	grpcSpec.Linux.Namespaces = tmpNamespaces

	if stripVfio {
		// VFIO char device shouldn't appear in the guest
		// (because the VM device driver will do something
		// with it rather than just presenting it to the
		// container unmodified)
		var linuxDevices []grpc.LinuxDevice
		for _, dev := range grpcSpec.Linux.Devices {
			if dev.Type == "c" && strings.HasPrefix(dev.Path, vfioPath) {
				k.Logger().WithField("vfio-dev", dev.Path).Debug("removing vfio device from grpcSpec")
				continue
			}
			linuxDevices = append(linuxDevices, dev)
		}
		grpcSpec.Linux.Devices = linuxDevices
	}

	return nil
}

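// handleShm rewrites each container /dev/shm mount to either share the
// sandbox-level shm or fall back to a dedicated tmpfs.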
func (k *kataAgent) handleShm(mounts []specs.Mount, sandbox *Sandbox) {
	for idx, mnt := range mounts {
		if mnt.Destination != "/dev/shm" {
			continue
		}

		// If /dev/shm for a container is backed by an ephemeral volume, skip
		// bind-mounting it to the sandbox shm.
		// A later call to handleEphemeralStorage should take care of setting up /dev/shm correctly.
		if mnt.Type == KataEphemeralDevType {
			continue
		}

		// A container shm mount is shared with sandbox shm mount.
		if sandbox.shmSize > 0 {
			mounts[idx].Type = "bind"
			mounts[idx].Options = []string{"rbind"}
			mounts[idx].Source = filepath.Join(kataGuestSandboxDir(), shmDir)
			k.Logger().WithField("shm-size", sandbox.shmSize).Info("Using sandbox shm")
		} else {
			// This should typically not happen, as a sandbox shm mount is always set up by the
			// upper stack.
			sizeOption := fmt.Sprintf("size=%d", DefaultShmSize)
			mounts[idx].Type = "tmpfs"
			mounts[idx].Source = "shm"
			mounts[idx].Options = []string{"noexec", "nosuid", "nodev", "mode=1777", sizeOption}
			k.Logger().WithField("shm-size", sizeOption).Info("Setting up a separate shm for container")
		}
	}
}

func (k *kataAgent) appendBlockDevice(dev ContainerDevice, device api.Device, c *Container) *grpc.Device {
	d, ok := device.GetDeviceInfo().(*config.BlockDrive)
	if !ok || d == nil {
		k.Logger().WithField("device", device).Error("malformed block drive")
		return nil
	}

	if d.Pmem {
		// block drive is a persistent memory device that
		// was passed as volume (-v) not as device (--device).
		// It shouldn't be visible in the container
		return nil
	}

	kataDevice := &grpc.Device{
		ContainerPath: dev.ContainerPath,
	}

	switch c.sandbox.config.HypervisorConfig.BlockDeviceDriver {
	case config.VirtioMmio:
		kataDevice.Type = kataMmioBlkDevType
		kataDevice.Id = d.VirtPath
		kataDevice.VmPath = d.VirtPath
	case config.VirtioBlockCCW:
		kataDevice.Type = kataBlkCCWDevType
		kataDevice.Id = d.DevNo
	case config.VirtioBlock:
		kataDevice.Type = kataBlkDevType
		kataDevice.Id = d.PCIPath.String()
		kataDevice.VmPath = d.VirtPath
	case config.VirtioSCSI:
		kataDevice.Type = kataSCSIDevType
		kataDevice.Id = d.SCSIAddr
	case config.Nvdimm:
		kataDevice.Type = kataNvdimmDevType
		kataDevice.VmPath = fmt.Sprintf("/dev/pmem%s", d.NvdimmID)
	}

	return kataDevice
}

func (k *kataAgent) appendVhostUserBlkDevice(dev ContainerDevice, device api.Device, c *Container) *grpc.Device {
	d, ok := device.GetDeviceInfo().(*config.VhostUserDeviceAttrs)
	if !ok || d == nil {
		k.Logger().WithField("device", device).Error("malformed vhost-user-blk drive")
		return nil
	}

	kataDevice := &grpc.Device{
		ContainerPath: dev.ContainerPath,
		Type:          kataBlkDevType,
		Id:            d.PCIPath.String(),
	}

	return kataDevice
}

func (k *kataAgent) appendVfioDevice(dev ContainerDevice, device api.Device, c *Container) *grpc.Device {
	devList, ok := device.GetDeviceInfo().([]*config.VFIODev)
	if !ok || devList == nil {
		k.Logger().WithField("device", device).Error("malformed vfio device")
		return nil
	}

	groupNum := filepath.Base(dev.ContainerPath)

	// For VFIO-PCI, each /dev/vfio/NN device represents a VFIO group,
	// which could include several PCI devices. So we give group
	// information in the main structure, then list each individual PCI
	// device in the Options array.
	//
	// Each option is formatted as "DDDD:BB:DD.F=<pcipath>"
	// DDDD:BB:DD.F is the device's PCI address on the
	// *host*. <pcipath> is the device's PCI path in the guest
	// (see qomGetPciPath() for details).
	//
	// For VFIO-AP, one VFIO group could include several queue devices. They are
	// identified by APQNs (Adjunct Processor Queue Numbers), which do not differ
	// between host and guest. They are passed as options so they can be awaited
	// by the agent.
	kataDevice := &grpc.Device{
		ContainerPath: dev.ContainerPath,
		Type:          kataVfioPciDevType,
		Id:            groupNum,
		Options:       nil,
	}

	// We always pass the device information to the agent, since
	// it needs that to wait for them to be ready. But depending
	// on the vfio_mode, we need to use a different device type so
	// the agent can handle it properly
	if c.sandbox.config.VfioMode == config.VFIOModeGuestKernel {
		kataDevice.Type = kataVfioPciGuestKernelDevType
	}

	if (*devList[0]).GetType() == config.VFIOAPDeviceMediatedType {
		kataDevice.Type = kataVfioApDevType
		kataDevice.Options = (*devList[0]).(config.VFIOAPDev).APDevices
	} else {
		kataDevice.Options = make([]string, len(devList))
		for i, device := range devList {
			pciDevice := (*device).(config.VFIOPCIDev)
			kataDevice.Options[i] = fmt.Sprintf("0000:%s=%s", pciDevice.BDF, pciDevice.GuestPciPath)
		}
	}

	return kataDevice
}

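// appendDevices translates the container's attached devices into agent Device
// structures, dispatching on the device type.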
func (k *kataAgent) appendDevices(deviceList []*grpc.Device, c *Container) []*grpc.Device {
	var kataDevice *grpc.Device

	for _, dev := range c.devices {
		device := c.sandbox.devManager.GetDeviceByID(dev.ID)
		if device == nil {
			k.Logger().WithField("device", dev.ID).Error("failed to find device by id")
			return nil
		}

		switch device.DeviceType() {
		case config.DeviceBlock:
			kataDevice = k.appendBlockDevice(dev, device, c)
		case config.VhostUserBlk:
			kataDevice = k.appendVhostUserBlkDevice(dev, device, c)
		case config.DeviceVFIO:
			kataDevice = k.appendVfioDevice(dev, device, c)
		}

		if kataDevice == nil {
			continue
		}

		deviceList = append(deviceList, kataDevice)
	}

	return deviceList
}

// rollbackFailingContainerCreation rolls back important steps that might have
// been performed before the container creation failed.
// - Unmount container volumes.
// - Unmount container rootfs.
func (k *kataAgent) rollbackFailingContainerCreation(ctx context.Context, c *Container) {
	if c != nil {
		if err2 := c.unmountHostMounts(ctx); err2 != nil {
			k.Logger().WithError(err2).Error("rollback failed unmountHostMounts()")
		}

		if err2 := c.sandbox.fsShare.UnshareRootFilesystem(ctx, c); err2 != nil {
			k.Logger().WithError(err2).Error("rollback failed UnshareRootfs()")
		}
	}
}

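// createContainer prepares rootfs, mounts, storages, and devices for a new
// container, constrains the OCI spec, and sends CreateContainer to the agent.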
func (k *kataAgent) createContainer(ctx context.Context, sandbox *Sandbox, c *Container) (p *Process, err error) {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "createContainer", kataAgentTracingTags)
	defer span.End()
	var ctrStorages []*grpc.Storage
	var ctrDevices []*grpc.Device
	var sharedRootfs *SharedFile

	// In case the container creation fails, the following defer statement
	// takes care of rolling back actions previously performed.
	defer func() {
		if err != nil {
			k.Logger().WithError(err).Error("createContainer failed")
			k.rollbackFailingContainerCreation(ctx, c)
		}
	}()

	// Share the container rootfs -- if it's block based, we'll receive a non-nil storage object representing
	// the block device for the rootfs, which is utilized for mounting in the guest. This is handled
	// already for non-block based rootfs.
	if sharedRootfs, err = sandbox.fsShare.ShareRootFilesystem(ctx, c); err != nil {
		return nil, err
	}

	if sharedRootfs.storage != nil {
		// Add rootfs to the list of container storage.
		// We only need to do this for block based rootfs, as we
		// want the agent to mount it into the right location
		// (kataGuestSharedDir/ctrID/)
		ctrStorages = append(ctrStorages, sharedRootfs.storage)
	}

	ociSpec := c.GetPatchedOCISpec()
	if ociSpec == nil {
		return nil, errorMissingOCISpec
	}

	// Handle container mounts
	sharedDirMounts := make(map[string]Mount)
	ignoredMounts := make(map[string]Mount)

	shareStorages, err := c.mountSharedDirMounts(ctx, sharedDirMounts, ignoredMounts)
	if err != nil {
		return nil, err
	}
	ctrStorages = append(ctrStorages, shareStorages...)

	k.handleShm(ociSpec.Mounts, sandbox)

	epheStorages, err := k.handleEphemeralStorage(ociSpec.Mounts)
	if err != nil {
		return nil, err
	}

	ctrStorages = append(ctrStorages, epheStorages...)

	k.Logger().WithField("ociSpec Hugepage Resources", ociSpec.Linux.Resources.HugepageLimits).Debug("ociSpec HugepageLimit")
	hugepages, err := k.handleHugepages(ociSpec.Mounts, ociSpec.Linux.Resources.HugepageLimits)
	if err != nil {
		return nil, err
	}
	ctrStorages = append(ctrStorages, hugepages...)

	localStorages, err := k.handleLocalStorage(ociSpec.Mounts, sandbox.id, c.rootfsSuffix)
	if err != nil {
		return nil, err
	}

	ctrStorages = append(ctrStorages, localStorages...)

	// We replace all OCI mount sources that match our container mount
	// with the right source path (The guest one).
	if err = k.replaceOCIMountSource(ociSpec, sharedDirMounts); err != nil {
		return nil, err
	}

	// Remove all mounts that should be ignored from the spec
	if err = k.removeIgnoredOCIMount(ociSpec, ignoredMounts); err != nil {
		return nil, err
	}

	// Append container devices for block devices passed with --device.
	ctrDevices = k.appendDevices(ctrDevices, c)

	// Block based volumes will require some adjustments in the OCI spec, and creation of
	// storage objects to pass to the agent.
	volumeStorages, err := k.handleBlkOCIMounts(c, ociSpec)
	if err != nil {
		return nil, err
	}

	ctrStorages = append(ctrStorages, volumeStorages...)

	grpcSpec, err := grpc.OCItoGRPC(ociSpec)
	if err != nil {
		return nil, err
	}

	// We need to give the OCI spec our absolute rootfs path in the guest.
	grpcSpec.Root.Path = sharedRootfs.guestPath

	sharedPidNs := k.handlePidNamespace(grpcSpec, sandbox)

	if !sandbox.config.DisableGuestSeccomp && !sandbox.seccompSupported {
		return nil, fmt.Errorf("Seccomp profiles are passed to the virtual machine, but the Kata agent does not support seccomp")
	}

	passSeccomp := !sandbox.config.DisableGuestSeccomp && sandbox.seccompSupported

	// Currently, guest SELinux can be enabled only when SELinux is enabled on the host side.
	if !sandbox.config.HypervisorConfig.DisableGuestSeLinux && !selinux.GetEnabled() {
		return nil, fmt.Errorf("Guest SELinux is enabled, but SELinux is disabled on the host side")
	}
	if sandbox.config.HypervisorConfig.DisableGuestSeLinux && sandbox.config.GuestSeLinuxLabel != "" {
		return nil, fmt.Errorf("Custom SELinux security policy is provided, but guest SELinux is disabled")
	}

	// We need to constrain the spec to make sure we're not
	// passing irrelevant information to the agent.
	err = k.constrainGRPCSpec(grpcSpec, passSeccomp, sandbox.config.HypervisorConfig.DisableGuestSeLinux, sandbox.config.GuestSeLinuxLabel, sandbox.config.VfioMode == config.VFIOModeGuestKernel)
	if err != nil {
		return nil, err
	}

	req := &grpc.CreateContainerRequest{
		ContainerId:  c.id,
		ExecId:       c.id,
		Storages:     ctrStorages,
		Devices:      ctrDevices,
		OCI:          grpcSpec,
		SandboxPidns: sharedPidNs,
	}

	if _, err = k.sendReq(ctx, req); err != nil {
		return nil, err
	}

	return buildProcessFromExecID(req.ExecId)
}

func buildProcessFromExecID(token string) (*Process, error) {
	return &Process{
		Token:     token,
		StartTime: time.Now().UTC(),
		Pid:       -1,
	}, nil
}

// handleHugepages handles hugepages storage by
// creating a Storage from the corresponding source of the mount point
func (k *kataAgent) handleHugepages(mounts []specs.Mount, hugepageLimits []specs.LinuxHugepageLimit) ([]*grpc.Storage, error) {
	//Map to hold the total memory of each type of hugepages
	optionsMap := make(map[int64]string)

	for _, hp := range hugepageLimits {
		if hp.Limit != 0 {
			k.Logger().WithFields(logrus.Fields{
				"Pagesize": hp.Pagesize,
				"Limit":    hp.Limit,
			}).Info("hugepage request")
			//example Pagesize 2MB, 1GB etc. The Limit is in bytes
			pageSize, err := units.RAMInBytes(hp.Pagesize)
			if err != nil {
				k.Logger().Error("Unable to convert pagesize to bytes")
				return nil, err
			}
			totalHpSizeStr := strconv.FormatUint(hp.Limit, 10)
			optionsMap[pageSize] = totalHpSizeStr
		}
	}

	var hugepages []*grpc.Storage
	for idx, mnt := range mounts {
		if mnt.Type != KataLocalDevType {
			continue
		}
		//HugePages mount Type is Local
		if _, fsType, fsOptions, _ := utils.GetDevicePathAndFsTypeOptions(mnt.Source); fsType == "hugetlbfs" {
			k.Logger().WithField("fsOptions", fsOptions).Debug("hugepage mount options")
			//Find the pagesize from the mountpoint options
			pagesizeOpt := getPagesizeFromOpt(fsOptions)
			if pagesizeOpt == "" {
				return nil, fmt.Errorf("No pagesize option found in filesystem mount options")
			}
			pageSize, err := units.RAMInBytes(pagesizeOpt)
			if err != nil {
				k.Logger().Error("Unable to convert pagesize from fs mount options to bytes")
				return nil, err
			}
			//Create mount option string
			options := fmt.Sprintf("pagesize=%s,size=%s", strconv.FormatInt(pageSize, 10), optionsMap[pageSize])
			k.Logger().WithField("Hugepage options string", options).Debug("hugepage mount options")
			// Set the mount source path to a path that resides inside the VM
			mounts[idx].Source = filepath.Join(ephemeralPath(), filepath.Base(mnt.Source))
			// Set the mount type to "bind"
			mounts[idx].Type = "bind"

			// Create a storage struct so that kata agent is able to create
			// hugetlbfs backed volume inside the VM
			hugepage := &grpc.Storage{
				Driver:     KataEphemeralDevType,
				Source:     "nodev",
				Fstype:     "hugetlbfs",
				MountPoint: mounts[idx].Source,
				Options:    []string{options},
			}
			hugepages = append(hugepages, hugepage)
		}

	}
	return hugepages, nil
}

// handleEphemeralStorage handles ephemeral storages by
// creating a Storage from the corresponding source of the mount point
func (k *kataAgent) handleEphemeralStorage(mounts []specs.Mount) ([]*grpc.Storage, error) {
	var epheStorages []*grpc.Storage
	for idx, mnt := range mounts {
		if mnt.Type == KataEphemeralDevType {
			origin_src := mounts[idx].Source
			stat := syscall.Stat_t{}
			err := syscall.Stat(origin_src, &stat)
			if err != nil {
				k.Logger().WithError(err).Errorf("failed to stat %s", origin_src)
				return nil, err
			}

			var dir_options []string

			// If the volume's gid isn't the root group (the default group), a
			// specific fsGroup is set on this local volume and it should be
			// passed to the guest.
			if stat.Gid != 0 {
				dir_options = append(dir_options, fmt.Sprintf("%s=%d", fsGid, stat.Gid))
			}

			// Set the mount source path to a path that resides inside the VM
			mounts[idx].Source = filepath.Join(ephemeralPath(), filepath.Base(mnt.Source))
			// Set the mount type to "bind"
			mounts[idx].Type = "bind"

			// Create a storage struct so that kata agent is able to create
			// tmpfs backed volume inside the VM
			epheStorage := &grpc.Storage{
				Driver:     KataEphemeralDevType,
				Source:     "tmpfs",
				Fstype:     "tmpfs",
				MountPoint: mounts[idx].Source,
				Options:    dir_options,
			}
			epheStorages = append(epheStorages, epheStorage)
		}
	}
	return epheStorages, nil
}

// handleLocalStorage handles local storage within the VM
// by creating a directory in the VM from the source of the mount point.
func (k *kataAgent) handleLocalStorage(mounts []specs.Mount, sandboxID string, rootfsSuffix string) ([]*grpc.Storage, error) {
	var localStorages []*grpc.Storage
	for idx, mnt := range mounts {
		if mnt.Type == KataLocalDevType {
			origin_src := mounts[idx].Source
			stat := syscall.Stat_t{}
			err := syscall.Stat(origin_src, &stat)
			if err != nil {
				k.Logger().WithError(err).Errorf("failed to stat %s", origin_src)
				return nil, err
			}

			dir_options := localDirOptions

			// If the volume's gid isn't the root group (the default group), a
			// specific fsGroup is set on this local volume and it should be
			// passed to the guest.
			if stat.Gid != 0 {
				dir_options = append(dir_options, fmt.Sprintf("%s=%d", fsGid, stat.Gid))
			}

			// Set the mount source path to the desired directory in the VM.
			// In this case it is located in the sandbox directory.
			// We rely on the fact that the first container in the VM has the same ID as the sandbox ID.
			// In Kubernetes, this is usually the pause container and we depend on it existing for
			// local directories to work.
			mounts[idx].Source = filepath.Join(kataGuestSharedDir(), sandboxID, rootfsSuffix, KataLocalDevType, filepath.Base(mnt.Source))

			// Create a storage struct so that the kata agent is able to create the
			// directory inside the VM.
			localStorage := &grpc.Storage{
				Driver:     KataLocalDevType,
				Source:     KataLocalDevType,
				Fstype:     KataLocalDevType,
				MountPoint: mounts[idx].Source,
				Options:    dir_options,
			}
			localStorages = append(localStorages, localStorage)
		}
	}
	return localStorages, nil
}

// handleDeviceBlockVolume handles volume that is block device file
|
|
// and DeviceBlock type.
|
|
func (k *kataAgent) handleDeviceBlockVolume(c *Container, m Mount, device api.Device) (*grpc.Storage, error) {
|
|
vol := &grpc.Storage{}
|
|
|
|
blockDrive, ok := device.GetDeviceInfo().(*config.BlockDrive)
|
|
if !ok || blockDrive == nil {
|
|
k.Logger().Error("malformed block drive")
|
|
return nil, fmt.Errorf("malformed block drive")
|
|
}
|
|
switch {
|
|
// pmem volumes case
|
|
case blockDrive.Pmem:
|
|
vol.Driver = kataNvdimmDevType
|
|
vol.Source = fmt.Sprintf("/dev/pmem%s", blockDrive.NvdimmID)
|
|
vol.Fstype = blockDrive.Format
|
|
vol.Options = []string{"dax"}
|
|
case c.sandbox.config.HypervisorConfig.BlockDeviceDriver == config.VirtioBlockCCW:
|
|
vol.Driver = kataBlkCCWDevType
|
|
vol.Source = blockDrive.DevNo
|
|
case c.sandbox.config.HypervisorConfig.BlockDeviceDriver == config.VirtioBlock:
|
|
vol.Driver = kataBlkDevType
|
|
vol.Source = blockDrive.PCIPath.String()
|
|
case c.sandbox.config.HypervisorConfig.BlockDeviceDriver == config.VirtioMmio:
|
|
vol.Driver = kataMmioBlkDevType
|
|
vol.Source = blockDrive.VirtPath
|
|
case c.sandbox.config.HypervisorConfig.BlockDeviceDriver == config.VirtioSCSI:
|
|
vol.Driver = kataSCSIDevType
|
|
vol.Source = blockDrive.SCSIAddr
|
|
default:
|
|
return nil, fmt.Errorf("Unknown block device driver: %s", c.sandbox.config.HypervisorConfig.BlockDeviceDriver)
|
|
}
|
|
|
|
vol.MountPoint = m.Destination
|
|
|
|
// If no explicit FS Type or Options are being set, then let's use what is provided for the particular mount:
|
|
if vol.Fstype == "" {
|
|
vol.Fstype = m.Type
|
|
}
|
|
if len(vol.Options) == 0 {
|
|
vol.Options = m.Options
|
|
}
|
|
if m.FSGroup != nil {
|
|
vol.FsGroup = &grpc.FSGroup{
|
|
GroupId: uint32(*m.FSGroup),
|
|
GroupChangePolicy: getFSGroupChangePolicy(m.FSGroupChangePolicy),
|
|
}
|
|
}
|
|
|
|
return vol, nil
|
|
}
|
|
|
|
// handleVhostUserBlkVolume handles volume that is block device file
|
|
// and VhostUserBlk type.
|
|
func (k *kataAgent) handleVhostUserBlkVolume(c *Container, m Mount, device api.Device) (*grpc.Storage, error) {
|
|
vol := &grpc.Storage{}
|
|
|
|
d, ok := device.GetDeviceInfo().(*config.VhostUserDeviceAttrs)
|
|
if !ok || d == nil {
|
|
k.Logger().Error("malformed vhost-user blk drive")
|
|
return nil, fmt.Errorf("malformed vhost-user blk drive")
|
|
}
|
|
|
|
vol.Driver = kataBlkDevType
|
|
vol.Source = d.PCIPath.String()
|
|
vol.Fstype = "bind"
|
|
vol.Options = []string{"bind"}
|
|
vol.MountPoint = m.Destination
|
|
|
|
// Assign the type from the mount, if it's specified (e.g. direct assigned volume)
|
|
if m.Type != "" {
|
|
vol.Fstype = m.Type
|
|
vol.Options = m.Options
|
|
}
|
|
|
|
return vol, nil
|
|
}
|
|
|
|
func (k *kataAgent) createBlkStorageObject(c *Container, m Mount) (*grpc.Storage, error) {
|
|
var vol *grpc.Storage
|
|
|
|
id := m.BlockDeviceID
|
|
device := c.sandbox.devManager.GetDeviceByID(id)
|
|
if device == nil {
|
|
k.Logger().WithField("device", id).Error("failed to find device by id")
|
|
return nil, fmt.Errorf("Failed to find device by id (id=%s)", id)
|
|
}
|
|
|
|
var err error
|
|
switch device.DeviceType() {
|
|
case config.DeviceBlock:
|
|
vol, err = k.handleDeviceBlockVolume(c, m, device)
|
|
case config.VhostUserBlk:
|
|
vol, err = k.handleVhostUserBlkVolume(c, m, device)
|
|
default:
|
|
return nil, fmt.Errorf("Unknown device type")
|
|
}
|
|
|
|
return vol, err
|
|
}
|
|
|
|
// handleBlkOCIMounts will create a unique destination mountpoint in the guest for each volume in the
// given container and will update the OCI spec to utilize this mount point as the new source for the
// container volume. The container mount structure is updated to store the guest destination mountpoint.
func (k *kataAgent) handleBlkOCIMounts(c *Container, spec *specs.Spec) ([]*grpc.Storage, error) {

	var volumeStorages []*grpc.Storage

	for i, m := range c.mounts {
		id := m.BlockDeviceID

		if len(id) == 0 {
			continue
		}

		// Add the block device to the list of container devices, to make sure the
		// device is detached with detachDevices() for a container.
		c.devices = append(c.devices, ContainerDevice{ID: id, ContainerPath: m.Destination})

		// Create the Storage structure
		vol, err := k.createBlkStorageObject(c, m)
		if vol == nil || err != nil {
			return nil, err
		}

		// Each device will be mounted at a unique location within the VM only once. Mounting
		// to the container-specific location is handled within the OCI spec. Let's ensure that
		// the storage mount point is unique for each device. This is then utilized as the source
		// in the OCI spec. If multiple containers mount the same block device, it's ref-counted inside
		// the guest by the Kata agent. See the illustration after this function.
		filename := b64.URLEncoding.EncodeToString([]byte(vol.Source))
		path := filepath.Join(kataGuestSandboxStorageDir(), filename)

		// Update the applicable OCI mount source
		for idx, ociMount := range spec.Mounts {
			if ociMount.Destination != vol.MountPoint {
				continue
			}
			k.Logger().WithFields(logrus.Fields{
				"original-source": ociMount.Source,
				"new-source":      path,
			}).Debug("Replacing OCI mount source")
			spec.Mounts[idx].Source = path
			break
		}

		// Update the storage mountpoint, and save the guest device mount path to the container mount struct:
		vol.MountPoint = path
		c.mounts[i].GuestDeviceMount = path

		volumeStorages = append(volumeStorages, vol)
	}

	return volumeStorages, nil
}

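// Illustration of the unique guest mountpoint derivation above, using a
// hypothetical volume source (actual values depend on the hypervisor's
// device addressing):
//
//	vol.Source = "02/03" // example guest PCI path
//	filename   = b64.URLEncoding.EncodeToString([]byte("02/03")) // "MDIvMDM="
//	path       = filepath.Join(kataGuestSandboxStorageDir(), "MDIvMDM=")
//
// Two containers sharing the same block device derive the same path, which is
// why the guest side ref-counts the mount.
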
// handlePidNamespace checks if the Pid namespace for a container needs to be shared with its sandbox
// pid namespace. This function also modifies the grpc spec to remove the pid namespace
// from the list of namespaces passed to the agent.
func (k *kataAgent) handlePidNamespace(grpcSpec *grpc.Spec, sandbox *Sandbox) bool {
	sharedPidNs := false
	pidIndex := -1

	for i, ns := range grpcSpec.Linux.Namespaces {
		if ns.Type != string(specs.PIDNamespace) {
			continue
		}

		pidIndex = i
		// A host pidns path does not make sense in kata. Let's just align it with
		// the sandbox namespace whenever it is set.
		if ns.Path != "" {
			sharedPidNs = true
		}
		break
	}

	// Remove the pid namespace.
	if pidIndex >= 0 {
		grpcSpec.Linux.Namespaces = append(grpcSpec.Linux.Namespaces[:pidIndex], grpcSpec.Linux.Namespaces[pidIndex+1:]...)
	}

	return sharedPidNs
}

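// Illustration of handlePidNamespace (hypothetical namespace list): given
//
//	Namespaces: [{Type: "network"}, {Type: "pid", Path: "/proc/123/ns/pid"}, {Type: "mount"}]
//
// the pid entry is dropped before the spec is sent to the agent, leaving
// network and mount, and the function returns true because a pid namespace
// path was set.
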
func (k *kataAgent) startContainer(ctx context.Context, sandbox *Sandbox, c *Container) error {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "startContainer", kataAgentTracingTags)
	defer span.End()

	req := &grpc.StartContainerRequest{
		ContainerId: c.id,
	}

	_, err := k.sendReq(ctx, req)
	return err
}

func (k *kataAgent) stopContainer(ctx context.Context, sandbox *Sandbox, c Container) error {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "stopContainer", kataAgentTracingTags)
	defer span.End()

	_, err := k.sendReq(ctx, &grpc.RemoveContainerRequest{ContainerId: c.id})
	return err
}

func (k *kataAgent) signalProcess(ctx context.Context, c *Container, processID string, signal syscall.Signal, all bool) error {
	execID := processID
	if all {
		// kata agent uses an empty execId to signal all processes in a container
		execID = ""
	}
	req := &grpc.SignalProcessRequest{
		ContainerId: c.id,
		ExecId:      execID,
		Signal:      uint32(signal),
	}

	_, err := k.sendReq(ctx, req)
	return err
}

func (k *kataAgent) winsizeProcess(ctx context.Context, c *Container, processID string, height, width uint32) error {
	req := &grpc.TtyWinResizeRequest{
		ContainerId: c.id,
		ExecId:      processID,
		Row:         height,
		Column:      width,
	}

	_, err := k.sendReq(ctx, req)
	return err
}

func (k *kataAgent) updateContainer(ctx context.Context, sandbox *Sandbox, c Container, resources specs.LinuxResources) error {
	grpcResources, err := grpc.ResourcesOCItoGRPC(&resources)
	if err != nil {
		return err
	}

	req := &grpc.UpdateContainerRequest{
		ContainerId: c.id,
		Resources:   grpcResources,
	}

	_, err = k.sendReq(ctx, req)
	return err
}

func (k *kataAgent) pauseContainer(ctx context.Context, sandbox *Sandbox, c Container) error {
	req := &grpc.PauseContainerRequest{
		ContainerId: c.id,
	}

	_, err := k.sendReq(ctx, req)
	return err
}

func (k *kataAgent) resumeContainer(ctx context.Context, sandbox *Sandbox, c Container) error {
	req := &grpc.ResumeContainerRequest{
		ContainerId: c.id,
	}

	_, err := k.sendReq(ctx, req)
	return err
}

func (k *kataAgent) memHotplugByProbe(ctx context.Context, addr uint64, sizeMB uint32, memorySectionSizeMB uint32) error {
	if memorySectionSizeMB == uint32(0) {
		return fmt.Errorf("memorySectionSizeMB cannot be zero")
	}
	// A hot-added memory device should be sliced into memory-section-sized pieces,
	// the basic unit for memory hotplug.
	numSection := uint64(sizeMB / memorySectionSizeMB)
	var addrList []uint64
	index := uint64(0)
	for index < numSection {
		k.Logger().WithFields(logrus.Fields{
			"addr": fmt.Sprintf("0x%x", addr+(index*uint64(memorySectionSizeMB))<<20),
		}).Debugf("notify guest kernel of the memory device address")
		addrList = append(addrList, addr+(index*uint64(memorySectionSizeMB))<<20)
		index++
	}
	req := &grpc.MemHotplugByProbeRequest{
		MemHotplugProbeAddr: addrList,
	}

	_, err := k.sendReq(ctx, req)
	return err
}

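// Worked example for the probe-address computation above (hypothetical
// values): with addr = 0x200000000, sizeMB = 256 and memorySectionSizeMB = 128,
// two sections are announced:
//
//	index 0: 0x200000000 + (0*128)<<20 = 0x200000000
//	index 1: 0x200000000 + (1*128)<<20 = 0x208000000
//
// Note that in Go, "<<" binds tighter than "+", so each expression adds the
// section offset in bytes to the base address.
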
func (k *kataAgent) onlineCPUMem(ctx context.Context, cpus uint32, cpuOnly bool) error {
	req := &grpc.OnlineCPUMemRequest{
		Wait:    false,
		NbCpus:  cpus,
		CpuOnly: cpuOnly,
	}

	_, err := k.sendReq(ctx, req)
	return err
}

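// statsContainer requests cgroup statistics for the given container from the
// agent and converts them into a ContainerStats value.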
func (k *kataAgent) statsContainer(ctx context.Context, sandbox *Sandbox, c Container) (*ContainerStats, error) {
	req := &grpc.StatsContainerRequest{
		ContainerId: c.id,
	}

	returnStats, err := k.sendReq(ctx, req)

	if err != nil {
		return nil, err
	}

	stats, ok := returnStats.(*grpc.StatsContainerResponse)
	if !ok {
		return nil, fmt.Errorf("unexpected response type for container stats")
	}

	// Convert the agent's CgroupStats into the virtcontainers type via a JSON round trip.
	data, err := json.Marshal(stats.CgroupStats)
	if err != nil {
		return nil, err
	}

	var cgroupStats CgroupStats
	err = json.Unmarshal(data, &cgroupStats)
	if err != nil {
		return nil, err
	}
	containerStats := &ContainerStats{
		CgroupStats: &cgroupStats,
	}
	return containerStats, nil
}

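// connect establishes the gRPC client connection to the agent on first use.
// It takes a lockless fast path when a client already exists, then re-checks
// under the lock (double-checked locking) so that concurrent callers only
// create one connection.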
func (k *kataAgent) connect(ctx context.Context) error {
	if k.dead {
		return errors.New("Dead agent")
	}
	// lockless quick pass
	if k.client != nil {
		return nil
	}

	span, _ := katatrace.Trace(ctx, k.Logger(), "connect", kataAgentTracingTags)
	defer span.End()

	// This is for the first connection only, to prevent a race
	k.Lock()
	defer k.Unlock()
	if k.client != nil {
		return nil
	}

	k.Logger().WithField("url", k.state.URL).Info("New client")
	client, err := kataclient.NewAgentClient(k.ctx, k.state.URL, k.dialTimout)
	if err != nil {
		k.dead = true
		return err
	}

	k.installReqFunc(client)
	k.client = client

	return nil
}

func (k *kataAgent) disconnect(ctx context.Context) error {
	span, _ := katatrace.Trace(ctx, k.Logger(), "Disconnect", kataAgentTracingTags)
	defer span.End()

	k.Lock()
	defer k.Unlock()

	if k.client == nil {
		return nil
	}

	if err := k.client.Close(); err != nil && grpcStatus.Convert(err).Code() != codes.Canceled {
		return err
	}

	k.client = nil
	k.reqHandlers = nil

	return nil
}

// check verifies that the agent's gRPC server is serving.
func (k *kataAgent) check(ctx context.Context) error {
	_, err := k.sendReq(ctx, &grpc.CheckRequest{})
	if err != nil {
		err = fmt.Errorf("Failed to check if grpc server is working: %s", err)
	}
	return err
}

func (k *kataAgent) waitProcess(ctx context.Context, c *Container, processID string) (int32, error) {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "waitProcess", kataAgentTracingTags)
	defer span.End()

	resp, err := k.sendReq(ctx, &grpc.WaitProcessRequest{
		ContainerId: c.id,
		ExecId:      processID,
	})
	if err != nil {
		return 0, err
	}

	return resp.(*grpc.WaitProcessResponse).Status, nil
}

func (k *kataAgent) writeProcessStdin(ctx context.Context, c *Container, processID string, data []byte) (int, error) {
	resp, err := k.sendReq(ctx, &grpc.WriteStreamRequest{
		ContainerId: c.id,
		ExecId:      processID,
		Data:        data,
	})

	if err != nil {
		return 0, err
	}

	return int(resp.(*grpc.WriteStreamResponse).Len), nil
}

func (k *kataAgent) closeProcessStdin(ctx context.Context, c *Container, processID string) error {
	_, err := k.sendReq(ctx, &grpc.CloseStdinRequest{
		ContainerId: c.id,
		ExecId:      processID,
	})

	return err
}

func (k *kataAgent) reseedRNG(ctx context.Context, data []byte) error {
	_, err := k.sendReq(ctx, &grpc.ReseedRandomDevRequest{
		Data: data,
	})

	return err
}

type reqFunc func(context.Context, interface{}) (interface{}, error)

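// installReqFunc populates the request dispatch table. Each handler is keyed
// by the request's protobuf message name (the same name sendReq later derives
// via proto.MessageName) and forwards the typed request to the corresponding
// agent client method.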
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
	k.reqHandlers = make(map[string]reqFunc)
	k.reqHandlers[grpcCheckRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.HealthClient.Check(ctx, req.(*grpc.CheckRequest))
	}
	k.reqHandlers[grpcExecProcessRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.ExecProcess(ctx, req.(*grpc.ExecProcessRequest))
	}
	k.reqHandlers[grpcCreateSandboxRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.CreateSandbox(ctx, req.(*grpc.CreateSandboxRequest))
	}
	k.reqHandlers[grpcDestroySandboxRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.DestroySandbox(ctx, req.(*grpc.DestroySandboxRequest))
	}
	k.reqHandlers[grpcCreateContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.CreateContainer(ctx, req.(*grpc.CreateContainerRequest))
	}
	k.reqHandlers[grpcStartContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.StartContainer(ctx, req.(*grpc.StartContainerRequest))
	}
	k.reqHandlers[grpcRemoveContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.RemoveContainer(ctx, req.(*grpc.RemoveContainerRequest))
	}
	k.reqHandlers[grpcSignalProcessRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.SignalProcess(ctx, req.(*grpc.SignalProcessRequest))
	}
	k.reqHandlers[grpcUpdateRoutesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.UpdateRoutes(ctx, req.(*grpc.UpdateRoutesRequest))
	}
	k.reqHandlers[grpcUpdateInterfaceRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.UpdateInterface(ctx, req.(*grpc.UpdateInterfaceRequest))
	}
	k.reqHandlers[grpcUpdateEphemeralMountsRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.UpdateEphemeralMounts(ctx, req.(*grpc.UpdateEphemeralMountsRequest))
	}
	k.reqHandlers[grpcListInterfacesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.ListInterfaces(ctx, req.(*grpc.ListInterfacesRequest))
	}
	k.reqHandlers[grpcListRoutesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.ListRoutes(ctx, req.(*grpc.ListRoutesRequest))
	}
	k.reqHandlers[grpcAddARPNeighborsRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.AddARPNeighbors(ctx, req.(*grpc.AddARPNeighborsRequest))
	}
	k.reqHandlers[grpcOnlineCPUMemRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.OnlineCPUMem(ctx, req.(*grpc.OnlineCPUMemRequest))
	}
	k.reqHandlers[grpcUpdateContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.UpdateContainer(ctx, req.(*grpc.UpdateContainerRequest))
	}
	k.reqHandlers[grpcWaitProcessRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.WaitProcess(ctx, req.(*grpc.WaitProcessRequest))
	}
	k.reqHandlers[grpcTtyWinResizeRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.TtyWinResize(ctx, req.(*grpc.TtyWinResizeRequest))
	}
	k.reqHandlers[grpcWriteStreamRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.WriteStdin(ctx, req.(*grpc.WriteStreamRequest))
	}
	k.reqHandlers[grpcCloseStdinRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.CloseStdin(ctx, req.(*grpc.CloseStdinRequest))
	}
	k.reqHandlers[grpcStatsContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.StatsContainer(ctx, req.(*grpc.StatsContainerRequest))
	}
	k.reqHandlers[grpcPauseContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.PauseContainer(ctx, req.(*grpc.PauseContainerRequest))
	}
	k.reqHandlers[grpcResumeContainerRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.ResumeContainer(ctx, req.(*grpc.ResumeContainerRequest))
	}
	k.reqHandlers[grpcReseedRandomDevRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.ReseedRandomDev(ctx, req.(*grpc.ReseedRandomDevRequest))
	}
	k.reqHandlers[grpcGuestDetailsRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.GetGuestDetails(ctx, req.(*grpc.GuestDetailsRequest))
	}
	k.reqHandlers[grpcMemHotplugByProbeRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.MemHotplugByProbe(ctx, req.(*grpc.MemHotplugByProbeRequest))
	}
	k.reqHandlers[grpcCopyFileRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.CopyFile(ctx, req.(*grpc.CopyFileRequest))
	}
	k.reqHandlers[grpcSetGuestDateTimeRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.SetGuestDateTime(ctx, req.(*grpc.SetGuestDateTimeRequest))
	}
	k.reqHandlers[grpcGetOOMEventRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.GetOOMEvent(ctx, req.(*grpc.GetOOMEventRequest))
	}
	k.reqHandlers[grpcGetMetricsRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.GetMetrics(ctx, req.(*grpc.GetMetricsRequest))
	}
	k.reqHandlers[grpcAddSwapRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.AddSwap(ctx, req.(*grpc.AddSwapRequest))
	}
	k.reqHandlers[grpcVolumeStatsRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.GetVolumeStats(ctx, req.(*grpc.VolumeStatsRequest))
	}
	k.reqHandlers[grpcResizeVolumeRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.ResizeVolume(ctx, req.(*grpc.ResizeVolumeRequest))
	}
	k.reqHandlers[grpcGetIPTablesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.GetIPTables(ctx, req.(*grpc.GetIPTablesRequest))
	}
	k.reqHandlers[grpcSetIPTablesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
		return k.client.AgentServiceClient.SetIPTables(ctx, req.(*grpc.SetIPTablesRequest))
	}
}

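// getReqContext returns the context to use for the given request type: Wait
// and GetOOMEvent get no timeout, health checks use checkRequestTimeout, and
// everything else uses defaultRequestTimeout. cancel is nil when no timeout
// was applied.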
func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) {
	newCtx = ctx
	switch reqName {
	case grpcWaitProcessRequest, grpcGetOOMEventRequest:
		// Wait and GetOOMEvent have no timeout
	case grpcCheckRequest:
		newCtx, cancel = context.WithTimeout(ctx, checkRequestTimeout)
	default:
		newCtx, cancel = context.WithTimeout(ctx, defaultRequestTimeout)
	}

	return newCtx, cancel
}

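// sendReq connects to the agent if necessary, looks up the handler registered
// for the request's protobuf message name, and dispatches the request with an
// appropriate timeout. The request duration is recorded in the agent RPC
// metrics, and the connection is dropped afterwards unless keepConn is set.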
func (k *kataAgent) sendReq(spanCtx context.Context, request interface{}) (interface{}, error) {
	start := time.Now()

	if err := k.connect(spanCtx); err != nil {
		return nil, err
	}
	if !k.keepConn {
		defer k.disconnect(spanCtx)
	}

	msgName := proto.MessageName(request.(proto.Message))

	k.Lock()

	if k.reqHandlers == nil {
		k.Unlock()
		return nil, errors.New("Client has already disconnected")
	}

	handler := k.reqHandlers[msgName]
	if msgName == "" || handler == nil {
		k.Unlock()
		return nil, errors.New("Invalid request type")
	}

	k.Unlock()

	message := request.(proto.Message)
	ctx, cancel := k.getReqContext(spanCtx, msgName)
	if cancel != nil {
		defer cancel()
	}
	k.Logger().WithField("name", msgName).WithField("req", message.String()).Trace("sending request")

	defer func() {
		agentRPCDurationsHistogram.WithLabelValues(msgName).Observe(float64(time.Since(start).Nanoseconds() / int64(time.Millisecond)))
	}()
	return handler(ctx, request)
}

// readStdout and readStderr are special in that we cannot differentiate them by request type...
func (k *kataAgent) readProcessStdout(ctx context.Context, c *Container, processID string, data []byte) (int, error) {
	if err := k.connect(ctx); err != nil {
		return 0, err
	}
	if !k.keepConn {
		defer k.disconnect(ctx)
	}

	return k.readProcessStream(c.id, processID, data, k.client.AgentServiceClient.ReadStdout)
}

// readStdout and readStderr are special in that we cannot differentiate them by request type...
func (k *kataAgent) readProcessStderr(ctx context.Context, c *Container, processID string, data []byte) (int, error) {
	if err := k.connect(ctx); err != nil {
		return 0, err
	}
	if !k.keepConn {
		defer k.disconnect(ctx)
	}

	return k.readProcessStream(c.id, processID, data, k.client.AgentServiceClient.ReadStderr)
}

type readFn func(context.Context, *grpc.ReadStreamRequest) (*grpc.ReadStreamResponse, error)

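// readProcessStream issues a single ReadStream request for up to len(data)
// bytes and copies the response payload into data, returning the number of
// bytes actually read.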
func (k *kataAgent) readProcessStream(containerID, processID string, data []byte, read readFn) (int, error) {
	resp, err := read(k.ctx, &grpc.ReadStreamRequest{
		ContainerId: containerID,
		ExecId:      processID,
		Len:         uint32(len(data))})
	if err == nil {
		copy(data, resp.Data)
		return len(resp.Data), nil
	}

	return 0, err
}

func (k *kataAgent) getGuestDetails(ctx context.Context, req *grpc.GuestDetailsRequest) (*grpc.GuestDetailsResponse, error) {
	resp, err := k.sendReq(ctx, req)
	if err != nil {
		return nil, err
	}

	return resp.(*grpc.GuestDetailsResponse), nil
}

func (k *kataAgent) setGuestDateTime(ctx context.Context, tv time.Time) error {
	_, err := k.sendReq(ctx, &grpc.SetGuestDateTimeRequest{
		Sec:  tv.Unix(),
		Usec: int64(tv.Nanosecond() / 1e3),
	})

	return err
}

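// copyFile copies the host file src to dst in the guest via the agent's
// CopyFile RPC, preserving the source's mode and ownership and splitting the
// payload into chunks of at most grpcMaxDataSize bytes.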
func (k *kataAgent) copyFile(ctx context.Context, src, dst string) error {
	var st unix.Stat_t

	err := unix.Stat(src, &st)
	if err != nil {
		return fmt.Errorf("Could not get file %s information: %v", src, err)
	}

	b, err := os.ReadFile(src)
	if err != nil {
		return fmt.Errorf("Could not read file %s: %v", src, err)
	}

	fileSize := int64(len(b))

	k.Logger().WithFields(logrus.Fields{
		"source": src,
		"dest":   dst,
	}).Debugf("Copying file from host to guest")

	cpReq := &grpc.CopyFileRequest{
		Path:     dst,
		DirMode:  uint32(DirMode),
		FileMode: uint32(st.Mode),
		FileSize: fileSize,
		Uid:      int32(st.Uid),
		Gid:      int32(st.Gid),
	}

	// Handle the special case where the file is empty
	if fileSize == 0 {
		_, err = k.sendReq(ctx, cpReq)
		return err
	}

	// Copy the file in chunks if needed
	remainingBytes := fileSize
	offset := int64(0)
	for remainingBytes > 0 {
		bytesToCopy := int64(len(b))
		if bytesToCopy > grpcMaxDataSize {
			bytesToCopy = grpcMaxDataSize
		}

		cpReq.Data = b[:bytesToCopy]
		cpReq.Offset = offset

		if _, err = k.sendReq(ctx, cpReq); err != nil {
			return fmt.Errorf("Could not send CopyFile request: %v", err)
		}

		b = b[bytesToCopy:]
		remainingBytes -= bytesToCopy
		offset += grpcMaxDataSize
	}

	return nil
}

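// Illustration of the chunking in copyFile above, assuming a hypothetical
// grpcMaxDataSize of 4 MiB and a 10 MiB source file: three CopyFile requests
// are sent, carrying 4 MiB at offset 0, 4 MiB at offset 4 MiB, and the final
// 2 MiB at offset 8 MiB. The agent reassembles the file at dst using the
// offsets.
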
func (k *kataAgent) addSwap(ctx context.Context, PCIPath types.PciPath) error {
	span, ctx := katatrace.Trace(ctx, k.Logger(), "addSwap", kataAgentTracingTags)
	defer span.End()

	_, err := k.sendReq(ctx, &grpc.AddSwapRequest{PCIPath: PCIPath.ToArray()})
	return err
}

func (k *kataAgent) markDead(ctx context.Context) {
	k.Logger().Infof("mark agent dead")
	k.dead = true
	k.disconnect(ctx)
}

func (k *kataAgent) cleanup(ctx context.Context) {
}

func (k *kataAgent) save() persistapi.AgentState {
	return persistapi.AgentState{
		URL: k.state.URL,
	}
}

func (k *kataAgent) load(s persistapi.AgentState) {
	k.state.URL = s.URL
}

func (k *kataAgent) getOOMEvent(ctx context.Context) (string, error) {
	req := &grpc.GetOOMEventRequest{}
	result, err := k.sendReq(ctx, req)
	if err != nil {
		return "", err
	}
	if oomEvent, ok := result.(*grpc.OOMEvent); ok {
		return oomEvent.ContainerId, nil
	}
	// err is nil at this point, so report the unexpected response type
	// explicitly instead of silently returning an empty container ID.
	return "", fmt.Errorf("unexpected OOM event response type %T", result)
}

func (k *kataAgent) getAgentMetrics(ctx context.Context, req *grpc.GetMetricsRequest) (*grpc.Metrics, error) {
	resp, err := k.sendReq(ctx, req)
	if err != nil {
		return nil, err
	}

	return resp.(*grpc.Metrics), nil
}

func (k *kataAgent) getIPTables(ctx context.Context, isIPv6 bool) ([]byte, error) {
	resp, err := k.sendReq(ctx, &grpc.GetIPTablesRequest{IsIpv6: isIPv6})
	if err != nil {
		return nil, err
	}
	return resp.(*grpc.GetIPTablesResponse).Data, nil
}

func (k *kataAgent) setIPTables(ctx context.Context, isIPv6 bool, data []byte) error {
	_, err := k.sendReq(ctx, &grpc.SetIPTablesRequest{
		IsIpv6: isIPv6,
		Data:   data,
	})
	if err != nil {
		k.Logger().WithError(err).Errorf("setIPTables request to agent failed")
	}

	return err
}

func (k *kataAgent) getGuestVolumeStats(ctx context.Context, volumeGuestPath string) ([]byte, error) {
	result, err := k.sendReq(ctx, &grpc.VolumeStatsRequest{VolumeGuestPath: volumeGuestPath})
	if err != nil {
		return nil, err
	}

	buf, err := json.Marshal(result.(*grpc.VolumeStatsResponse))
	if err != nil {
		return nil, err
	}

	return buf, nil
}

func (k *kataAgent) resizeGuestVolume(ctx context.Context, volumeGuestPath string, size uint64) error {
	_, err := k.sendReq(ctx, &grpc.ResizeVolumeRequest{VolumeGuestPath: volumeGuestPath, Size_: size})
	return err
}