feature(nydusd): add nydusd support to introduce lazyload ability

Pulling the image is the most time-consuming step in the container lifecycle. This PR
introduces nydus to Kata Containers so that image data can be pulled lazily when the
container starts, which speeds up Kata Container creation and startup.

Fixes #2724

Signed-off-by: luodaowen.backend <luodaowen.backend@bytedance.com>
luodaowen.backend 2021-08-20 18:41:53 +08:00
parent 88b3e9e848
commit 2d9f89aec7
31 changed files with 1520 additions and 114 deletions


@ -10,6 +10,7 @@ Kata Containers design documents:
- [Host cgroups](host-cgroups.md)
- [`Inotify` support](inotify.md)
- [Metrics(Kata 2.0)](kata-2-0-metrics.md)
- [Design for Kata Containers `Lazyload` ability with `nydus`](kata-nydus-design.md)
---


(New file: `draw.io` source of the kata-nydus design diagram; XML source omitted.)

(Four binary image files added: the design diagrams and benchmark chart referenced in the docs below; not shown.)


@ -0,0 +1,93 @@
# Background
[Research](https://www.usenix.org/conference/fast16/technical-sessions/presentation/harter) shows that the image pull operation accounts for 76% of container startup time, yet only 6.4% of the pulled data is actually read. So if we fetch data on demand (lazy load), container startup can be sped up significantly. [`Nydus`](https://github.com/dragonflyoss/image-service) is a project that builds images in a new format and fetches data on demand when the container starts.
The following benchmark shows the improvement over OCI images in container cold-startup time on containerd. As the OCI image size increases, the startup time with a `nydus` image remains consistently short. [Click here](https://github.com/dragonflyoss/image-service/blob/master/docs/nydus-design.md) to see the `nydus` design.
![`nydus`-performance](arch-images/nydus-performance.png)
## Proposal - Bring `lazyload` ability to Kata Containers
`Nydusd` is a FUSE/`virtiofs` daemon provided by the `nydus` project. It natively supports `PassthroughFS` and [RAFS](https://github.com/dragonflyoss/image-service/blob/master/docs/nydus-design.md) (Registry Acceleration File System), so in Kata Containers we can use `nydusd` in place of `virtiofsd` and, at the same time, mount the `nydus` image into the guest.
The process of creating/starting a Kata Container with `virtiofsd` is:
1. When creating the sandbox, the Kata Containers Containerd v2 [shim](https://github.com/kata-containers/kata-containers/blob/main/docs/design/architecture/README.md#runtime) launches `virtiofsd` before the VM starts and shares directories with the VM.
2. When creating a container, the Kata Containers Containerd v2 shim bind mounts the container rootfs to `kataShared` (/run/kata-containers/shared/sandboxes/\<SANDBOX\>/mounts/\<CONTAINER\>/rootfs), so it can be seen at `/run/kata-containers/shared/containers/\<CONTAINER\>/rootfs` in the guest and used as the container's rootfs; a minimal sketch of this bind mount follows.
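For concreteness, a minimal sketch of the host-side bind mount in step 2; the paths and the helper name are illustrative, not the shim's actual code:
```go
package main

import "golang.org/x/sys/unix"

// shareRootfs bind mounts a container rootfs into the virtiofsd shared
// directory so that it becomes visible to the guest. Error handling,
// directory creation and mount-propagation flags are omitted for brevity.
func shareRootfs(hostRootfs, sandboxID, containerID string) error {
	dst := "/run/kata-containers/shared/sandboxes/" + sandboxID +
		"/mounts/" + containerID + "/rootfs"
	return unix.Mount(hostRootfs, dst, "bind", unix.MS_BIND, "")
}
```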
The process of creating/starting a Kata Container with `nydusd` is:
![kata-`nydus`](arch-images/kata-nydus.png)
1. When creating the sandbox, the Kata Containers Containerd v2 shim launches the `nydusd` daemon before the VM starts.
After the VM starts, `kata-agent` mounts `virtiofs` at the path `/run/kata-containers/shared`, and the Kata Containers Containerd v2 shim asks `nydusd` to mount a `passthroughfs` filesystem at the path `/run/kata-containers/shared/containers`.
```bash
# start nydusd
$ sandbox_id=my-test-sandbox
$ sudo /usr/local/bin/nydusd --log-level info --sock /run/vc/vm/${sandbox_id}/vhost-user-fs.sock --apisock /run/vc/vm/${sandbox_id}/api.sock
```
```bash
# source: the host sharedir that will be passed through to the guest
$ sudo curl -v --unix-socket /run/vc/vm/${sandbox_id}/api.sock \
-X POST "http://localhost/api/v1/mount?mountpoint=/containers" -H "accept: */*" \
-H "Content-Type: application/json" \
-d '{
"source":"/path/to/sharedir",
"fs_type":"passthrough_fs",
"config":""
}'
```
2. When creating a normal container, the Kata Containers Containerd v2 shim sends a request to `nydusd` to mount `rafs` at the path `/run/kata-containers/shared/rafs/<container_id>/lowerdir` in the guest.
```bash
# source: the metafile of nydus image
# config: the config of this image
$ sudo curl --unix-socket /run/vc/vm/${sandbox_id}/api.sock \
-X POST "http://localhost/api/v1/mount?mountpoint=/rafs/<container_id>/lowerdir" -H "accept: */*" \
-H "Content-Type: application/json" \
-d '{
"source":"/path/to/bootstrap",
"fs_type":"rafs",
"config":"config":"{\"device\":{\"backend\":{\"type\":\"localfs\",\"config\":{\"dir\":\"blobs\"}},\"cache\":{\"type\":\"blobcache\",\"config\":{\"work_dir\":\"cache\"}}},\"mode\":\"direct\",\"digest_validate\":true}",
}'
```
The Kata Containers Containerd v2 shim also bind mounts the `snapshotdir` that `nydus-snapshotter` allocates into the `sharedir`.
So in the guest, container rootfs = overlay(`lowerdir=rafs`, `upperdir=snapshotdir/fs`, `workdir=snapshotdir/work`), as the sketch below illustrates.
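To make the overlay composition concrete, here is a minimal sketch of the mount the `kata-agent` ends up performing in the guest. The `cid`-based paths follow the layout described above, and `index=off` matches the option the runtime appends; this is an illustration, not the agent's actual code:
```go
package main

import (
	"fmt"

	"golang.org/x/sys/unix"
)

// mountNydusRootfs assembles the container rootfs from the rafs lowerdir
// and the shared snapshot dir, exactly as the overlay equation above reads.
func mountNydusRootfs(cid string) error {
	opts := fmt.Sprintf("lowerdir=/run/kata-containers/shared/rafs/%s/lowerdir,"+
		"upperdir=/run/kata-containers/shared/containers/%s/snapshotdir/fs,"+
		"workdir=/run/kata-containers/shared/containers/%s/snapshotdir/work,index=off",
		cid, cid, cid)
	target := fmt.Sprintf("/run/kata-containers/shared/containers/%s/rootfs", cid)
	return unix.Mount("overlay", target, "overlay", 0, opts)
}
```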
> How is the `rafs` info transferred from `nydus-snapshotter` to the Kata Containers Containerd v2 shim?

By default, when creating an `OCI` image container, `nydus-snapshotter` returns a [`Mount` struct slice](https://github.com/containerd/containerd/blob/main/mount/mount.go#L21) like the one below to containerd, which uses it to mount the rootfs:
```
[
{
Type: "overlay",
Source: "overlay",
Options: [lowerdir=/var/lib/containerd/io.containerd.snapshotter.v1.nydus/snapshots/<snapshot_A>/mnt,upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.nydus/snapshots/<snapshot_B>/fs,workdir=/var/lib/containerd/io.containerd.snapshotter.v1.nydus/snapshots/<snapshot_B>/work],
}
]
```
We could append the `rafs` info to `Options`, but if we did that, containerd's mount would fail, because containerd cannot interpret the `rafs` info. Instead, we can follow the [containerd mount helper](https://github.com/containerd/containerd/blob/main/mount/mount_linux.go#L42) convention and provide a binary called `nydus-overlayfs`. The `Mount` slice returned by `nydus-snapshotter` then becomes (an encoding sketch follows the snippet):
```
[
{
Type: "fuse.nydus-overlayfs",
Source: "overlay",
Options: [lowerdir=/var/lib/containerd/io.containerd.snapshotter.v1.nydus/snapshots/<snapshot_A>/mnt,upperdir=/var/lib/containerd/io.containerd.snapshotter.v1.nydus/snapshots/<snapshot_B>/fs,workdir=/var/lib/containerd/io.containerd.snapshotter.v1.nydus/snapshots/<snapshot_B>/work,extraoption=base64({source:xxx,config:xxx,snapshotdir:xxx})],
}
]
```
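To illustrate the `extraoption` encoding, a small self-contained sketch; the struct and function names are hypothetical, and the decoding counterpart is the `parseExtraOption` helper added by this PR in `nydusd.go`:
```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// rafsInfo mirrors the JSON fields carried inside extraoption.
type rafsInfo struct {
	Source      string `json:"source"`      // metafile (bootstrap) of the nydus image
	Config      string `json:"config"`      // nydus image config
	Snapshotdir string `json:"snapshotdir"` // snapshot dir allocated by nydus-snapshotter
}

func main() {
	info := rafsInfo{
		Source:      "/path/to/bootstrap",
		Config:      `{"device":{"backend":{"type":"localfs"}}}`,
		Snapshotdir: "/path/to/snapshotdir",
	}
	b, _ := json.Marshal(info)
	// This string is appended to the overlay mount Options above.
	fmt.Println("extraoption=" + base64.StdEncoding.EncodeToString(b))
}
```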
When containerd finds that `Type` is `fuse.nydus-overlayfs`:
1. containerd calls the `mount.fuse` command;
2. `mount.fuse` invokes `nydus-overlayfs`;
3. `nydus-overlayfs` ignores the `extraoption` and performs the plain overlay mount.

Finally, the Kata Containers Containerd v2 shim parses `extraoption` (see `parseExtraOption` in the new `nydusd.go` below) to recover the `rafs` info it needs to mount the image in the guest.


@ -37,3 +37,4 @@
- [How to setup swap devices in guest kernel](how-to-setup-swap-devices-in-guest-kernel.md)
- [How to run rootless vmm](how-to-run-rootless-vmm.md)
- [How to run Docker with Kata Containers](how-to-run-docker-with-kata.md)
- [How to run Kata Containers with `nydus`](how-to-use-virtio-fs-nydus-with-kata.md)


@ -0,0 +1,57 @@
# Kata Containers with virtio-fs-nydus
## Introduction
Refer to [kata-`nydus`-design](../design/kata-nydus-design.md)
## How to
You can use Kata Containers with `nydus` as follows:
1. Use the [`nydus` latest branch](https://github.com/dragonflyoss/image-service);
2. Deploy the `nydus` environment as described in [`Nydus` Setup for Containerd Environment](https://github.com/dragonflyoss/image-service/blob/master/docs/containerd-env-setup.md);
3. Start `nydus-snapshotter` with `enable_nydus_overlayfs` enabled;
4. Use the [kata-containers](https://github.com/kata-containers/kata-containers) `latest` branch to compile and build `kata-containers.img`;
5. Update `configuration-qemu.toml` to include:
```toml
shared_fs = "virtio-fs-nydus"
virtio_fs_daemon = "<nydusd binary path>"
virtio_fs_extra_args = []
```
6. Run `crictl run -r kata-qemu nydus-container.yaml nydus-sandbox.yaml`.
The `nydus-sandbox.yaml` looks like this:
```yaml
metadata:
  attempt: 1
  name: nydus-sandbox
  namespace: default
log_directory: /tmp
linux:
  security_context:
    namespace_options:
      network: 2
annotations:
  "io.containerd.osfeature": "nydus.remoteimage.v1"
```
The `nydus-container.yaml` looks like this:
```yaml
metadata:
  name: nydus-container
image:
  image: localhost:5000/ubuntu-nydus:latest
command:
- /bin/sleep
args:
- 600
log_path: container.1.log
```


@ -51,6 +51,7 @@ pub const DRIVER_VFIO_GK_TYPE: &str = "vfio-gk";
// VFIO device to be bound to vfio-pci and made available inside the
// container as a VFIO device node
pub const DRIVER_VFIO_TYPE: &str = "vfio";
pub const DRIVER_OVERLAYFS_TYPE: &str = "overlayfs";
#[instrument]
pub fn online_device(path: &str) -> Result<()> {


@ -23,8 +23,8 @@ use regex::Regex;
use crate::device::{
get_scsi_device_name, get_virtio_blk_pci_device_name, online_device, wait_for_pmem_device,
DRIVER_9P_TYPE, DRIVER_BLK_CCW_TYPE, DRIVER_BLK_TYPE, DRIVER_EPHEMERAL_TYPE, DRIVER_LOCAL_TYPE,
DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_SCSI_TYPE, DRIVER_VIRTIOFS_TYPE,
DRIVER_WATCHABLE_BIND_TYPE,
DRIVER_MMIO_BLK_TYPE, DRIVER_NVDIMM_TYPE, DRIVER_OVERLAYFS_TYPE, DRIVER_SCSI_TYPE,
DRIVER_VIRTIOFS_TYPE, DRIVER_WATCHABLE_BIND_TYPE,
};
use crate::linux_abi::*;
use crate::pci;
@ -130,6 +130,7 @@ pub const STORAGE_HANDLER_LIST: &[&str] = &[
DRIVER_9P_TYPE,
DRIVER_VIRTIOFS_TYPE,
DRIVER_EPHEMERAL_TYPE,
DRIVER_OVERLAYFS_TYPE,
DRIVER_MMIO_BLK_TYPE,
DRIVER_LOCAL_TYPE,
DRIVER_SCSI_TYPE,
@ -233,6 +234,15 @@ async fn ephemeral_storage_handler(
Ok("".to_string())
}
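// Storage handler for the overlayfs driver used by the nydus rootfs: the
// runtime has already assembled the overlay options (rafs lowerdir plus the
// snapshot upperdir/workdir), so the common storage handler can mount it
// directly.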
#[instrument]
async fn overlayfs_storage_handler(
logger: &Logger,
storage: &Storage,
_sandbox: Arc<Mutex<Sandbox>>,
) -> Result<String> {
common_storage_handler(logger, storage)
}
#[instrument]
async fn local_storage_handler(
_logger: &Logger,
@ -546,6 +556,9 @@ pub async fn add_storages(
DRIVER_EPHEMERAL_TYPE => {
ephemeral_storage_handler(&logger, &storage, sandbox.clone()).await
}
DRIVER_OVERLAYFS_TYPE => {
overlayfs_storage_handler(&logger, &storage, sandbox.clone()).await
}
DRIVER_MMIO_BLK_TYPE => {
virtiommio_blk_storage_handler(&logger, &storage, sandbox.clone()).await
}


@ -141,6 +141,7 @@ disable_block_device_use = @DEFDISABLEBLOCK@
# Shared file system type:
# - virtio-fs (default)
# - virtio-9p
# - virtio-fs-nydus
shared_fs = "@DEFSHAREDFS_QEMU_VIRTIOFS@"
# Path to vhost-user-fs daemon.


@ -263,6 +263,10 @@ func checkAndMount(s *service, r *taskAPI.CreateTaskRequest) (bool, error) {
if katautils.IsBlockDevice(m.Source) && !s.config.HypervisorConfig.DisableBlockDeviceUse {
return false, nil
}
if m.Type == vc.NydusRootFSType {
// with nydus, the rootfs is mounted inside the guest, so do not mount it here
return false, nil
}
}
rootfs := filepath.Join(r.Bundle, "rootfs")
if err := doMount(r.Rootfs, rootfs); err != nil {


@ -41,10 +41,10 @@ type HypervisorState struct {
// HotpluggedCPUs is the list of CPUs that were hot-added
HotpluggedVCPUs []CPUDevice
HotpluggedMemory int
VirtiofsdPid int
Pid int
PCIeRootPort int
HotpluggedMemory int
VirtiofsDaemonPid int
Pid int
PCIeRootPort int
HotplugVFIOOnRootBus bool
}


@ -423,7 +423,7 @@ func (h hypervisor) blockDeviceDriver() (string, error) {
}
func (h hypervisor) sharedFS() (string, error) {
supportedSharedFS := []string{config.Virtio9P, config.VirtioFS}
supportedSharedFS := []string{config.Virtio9P, config.VirtioFS, config.VirtioFSNydus}
if h.SharedFS == "" {
return config.Virtio9P, nil
@ -649,6 +649,11 @@ func newQemuHypervisorConfig(h hypervisor) (vc.HypervisorConfig, error) {
errors.New("cannot enable virtio-fs without daemon path in configuration file")
}
if sharedFS == config.VirtioFSNydus && h.VirtioFSDaemon == "" {
return vc.HypervisorConfig{},
errors.New("cannot enable virtio nydus without nydusd daemon path in configuration file")
}
if vSock, err := utils.SupportsVsocks(); !vSock {
return vc.HypervisorConfig{}, err
}


@ -225,7 +225,7 @@ func CreateContainer(ctx context.Context, sandbox vc.VCSandbox, ociSpec specs.Sp
}
if !rootFs.Mounted {
if rootFs.Source != "" {
if rootFs.Source != "" && rootFs.Type != vc.NydusRootFSType {
realPath, err := ResolvePath(rootFs.Source)
if err != nil {
return vc.Process{}, err
@ -234,7 +234,6 @@ func CreateContainer(ctx context.Context, sandbox vc.VCSandbox, ociSpec specs.Sp
}
contConfig.RootFs = rootFs
}
sandboxID, err := oci.SandboxID(ociSpec)
if err != nil {
return vc.Process{}, err


@ -158,7 +158,7 @@ func (s *CloudHypervisorState) reset() {
type cloudHypervisor struct {
console console.Console
virtiofsd Virtiofsd
virtiofsd VirtiofsDaemon
APIClient clhClient
ctx context.Context
id string
@ -759,14 +759,14 @@ func (clh *cloudHypervisor) toGrpc(ctx context.Context) ([]byte, error) {
func (clh *cloudHypervisor) Save() (s hv.HypervisorState) {
s.Pid = clh.state.PID
s.Type = string(ClhHypervisor)
s.VirtiofsdPid = clh.state.VirtiofsdPID
s.VirtiofsDaemonPid = clh.state.VirtiofsdPID
s.APISocket = clh.state.apiSocket
return
}
func (clh *cloudHypervisor) Load(s hv.HypervisorState) {
clh.state.PID = s.Pid
clh.state.VirtiofsdPID = s.VirtiofsdPid
clh.state.VirtiofsdPID = s.VirtiofsDaemonPid
clh.state.apiSocket = s.APISocket
}


@ -860,8 +860,15 @@ func (c *Container) rollbackFailingContainerCreation(ctx context.Context) {
if err := c.unmountHostMounts(ctx); err != nil {
c.Logger().WithError(err).Error("rollback failed unmountHostMounts()")
}
if err := bindUnmountContainerRootfs(ctx, getMountPath(c.sandbox.id), c.id); err != nil {
c.Logger().WithError(err).Error("rollback failed bindUnmountContainerRootfs()")
if c.rootFs.Type == NydusRootFSType {
if err := nydusContainerCleanup(ctx, getMountPath(c.sandbox.id), c); err != nil {
c.Logger().WithError(err).Error("rollback failed nydusContainerCleanup()")
}
} else {
if err := bindUnmountContainerRootfs(ctx, getMountPath(c.sandbox.id), c.id); err != nil {
c.Logger().WithError(err).Error("rollback failed bindUnmountContainerRootfs()")
}
}
}
@ -890,7 +897,7 @@ func (c *Container) create(ctx context.Context) (err error) {
}
}()
if c.checkBlockDeviceSupport(ctx) {
if c.checkBlockDeviceSupport(ctx) && c.rootFs.Type != NydusRootFSType {
// If the rootfs is backed by a block device, go ahead and hotplug it to the guest
if err = c.hotplugDrive(ctx); err != nil {
return
@ -1076,8 +1083,14 @@ func (c *Container) stop(ctx context.Context, force bool) error {
return err
}
if err := bindUnmountContainerRootfs(ctx, getMountPath(c.sandbox.id), c.id); err != nil && !force {
return err
if c.rootFs.Type == NydusRootFSType {
if err := nydusContainerCleanup(ctx, getMountPath(c.sandbox.id), c); err != nil && !force {
return err
}
} else {
if err := bindUnmountContainerRootfs(ctx, getMountPath(c.sandbox.id), c.id); err != nil && !force {
return err
}
}
if err := c.detachDevices(ctx); err != nil && !force {


@ -67,6 +67,9 @@ const (
// VirtioFS means use virtio-fs for the shared file system
VirtioFS = "virtio-fs"
// VirtioFSNydus means use nydus for the shared file system
VirtioFSNydus = "virtio-fs-nydus"
)
const (


@ -63,6 +63,7 @@ const (
sandboxMountsDir = "sandbox-mounts"
NydusRootFSType = "fuse.nydus-overlayfs"
// enable debug console
kernelParamDebugConsole = "agent.debug_console"
kernelParamDebugConsoleVPort = "agent.debug_console_vport"
@ -73,36 +74,39 @@ const (
)
var (
checkRequestTimeout = 30 * time.Second
defaultRequestTimeout = 60 * time.Second
errorMissingOCISpec = errors.New("Missing OCI specification")
defaultKataHostSharedDir = "/run/kata-containers/shared/sandboxes/"
defaultKataGuestSharedDir = "/run/kata-containers/shared/containers/"
mountGuestTag = "kataShared"
defaultKataGuestSandboxDir = "/run/kata-containers/sandbox/"
type9pFs = "9p"
typeVirtioFS = "virtiofs"
typeVirtioFSNoCache = "none"
kata9pDevType = "9p"
kataMmioBlkDevType = "mmioblk"
kataBlkDevType = "blk"
kataBlkCCWDevType = "blk-ccw"
kataSCSIDevType = "scsi"
kataNvdimmDevType = "nvdimm"
kataVirtioFSDevType = "virtio-fs"
kataWatchableBindDevType = "watchable-bind"
kataVfioDevType = "vfio" // VFIO device to used as VFIO in the container
kataVfioGuestKernelDevType = "vfio-gk" // VFIO device for consumption by the guest kernel
sharedDir9pOptions = []string{"trans=virtio,version=9p2000.L,cache=mmap", "nodev"}
sharedDirVirtioFSOptions = []string{}
sharedDirVirtioFSDaxOptions = "dax"
shmDir = "shm"
kataEphemeralDevType = "ephemeral"
defaultEphemeralPath = filepath.Join(defaultKataGuestSandboxDir, kataEphemeralDevType)
grpcMaxDataSize = int64(1024 * 1024)
localDirOptions = []string{"mode=0777"}
maxHostnameLen = 64
GuestDNSFile = "/etc/resolv.conf"
checkRequestTimeout = 30 * time.Second
defaultRequestTimeout = 60 * time.Second
errorMissingOCISpec = errors.New("Missing OCI specification")
defaultKataHostSharedDir = "/run/kata-containers/shared/sandboxes/"
defaultKataGuestSharedDir = "/run/kata-containers/shared/containers/"
defaultKataGuestNydusRootDir = "/run/kata-containers/shared/"
mountGuestTag = "kataShared"
defaultKataGuestSandboxDir = "/run/kata-containers/sandbox/"
type9pFs = "9p"
typeVirtioFS = "virtiofs"
typeOverlayFS = "overlay"
typeVirtioFSNoCache = "none"
kata9pDevType = "9p"
kataMmioBlkDevType = "mmioblk"
kataBlkDevType = "blk"
kataBlkCCWDevType = "blk-ccw"
kataSCSIDevType = "scsi"
kataNvdimmDevType = "nvdimm"
kataVirtioFSDevType = "virtio-fs"
kataOverlayDevType = "overlayfs"
kataWatchableBindDevType = "watchable-bind"
kataVfioDevType = "vfio" // VFIO device to used as VFIO in the container
kataVfioGuestKernelDevType = "vfio-gk" // VFIO device for consumption by the guest kernel
sharedDir9pOptions = []string{"trans=virtio,version=9p2000.L,cache=mmap", "nodev"}
sharedDirVirtioFSOptions = []string{}
sharedDirVirtioFSDaxOptions = "dax"
shmDir = "shm"
kataEphemeralDevType = "ephemeral"
defaultEphemeralPath = filepath.Join(defaultKataGuestSandboxDir, kataEphemeralDevType)
grpcMaxDataSize = int64(1024 * 1024)
localDirOptions = []string{"mode=0777"}
maxHostnameLen = 64
GuestDNSFile = "/etc/resolv.conf"
)
const (
@ -177,6 +181,32 @@ func getSandboxPath(id string) string {
return filepath.Join(kataHostSharedDir(), id)
}
// Used in the nydus case; the guest shared dir is compatible with the virtiofsd sharedir.
// nydus images are presented in kataGuestNydusImageDir
//
// virtiofs mountpoint: "/run/kata-containers/shared/"
// kataGuestSharedDir: "/run/kata-containers/shared/containers"
// kataGuestNydusImageDir: "/run/kata-containers/shared/rafs"
var kataGuestNydusRootDir = func() string {
if rootless.IsRootless() {
// filepath.Join removes trailing slashes, but it is necessary for mounting
return filepath.Join(rootless.GetRootlessDir(), defaultKataGuestNydusRootDir) + "/"
}
return defaultKataGuestNydusRootDir
}
var rafsMountPath = func(cid string) string {
return filepath.Join("/", nydusRafs, cid, lowerDir)
}
var kataGuestNydusImageDir = func() string {
if rootless.IsRootless() {
// filepath.Join removes trailing slashes, but it is necessary for mounting
return filepath.Join(rootless.GetRootlessDir(), defaultKataGuestNydusRootDir, nydusRafs) + "/"
}
return filepath.Join(defaultKataGuestNydusRootDir, nydusRafs) + "/"
}
// The function is declared this way for mocking in unit tests
var kataGuestSharedDir = func() string {
if rootless.IsRootless() {
@ -852,7 +882,8 @@ func setupStorages(ctx context.Context, sandbox *Sandbox) []*grpc.Storage {
// This is where at least some of the host config files
// (resolv.conf, etc...) and potentially all container
// rootfs will reside.
if sandbox.config.HypervisorConfig.SharedFS == config.VirtioFS {
sharedFS := sandbox.config.HypervisorConfig.SharedFS
if sharedFS == config.VirtioFS || sharedFS == config.VirtioFSNydus {
// If virtio-fs uses either of the two cache options 'auto, always',
// the guest directory can be mounted with option 'dax' allowing it to
// directly map contents from the host. When set to 'none', the mount
@ -864,10 +895,14 @@ func setupStorages(ctx context.Context, sandbox *Sandbox) []*grpc.Storage {
sharedDirVirtioFSOptions = append(sharedDirVirtioFSOptions, sharedDirVirtioFSDaxOptions)
}
}
mountPoint := kataGuestSharedDir()
if sharedFS == config.VirtioFSNydus {
mountPoint = kataGuestNydusRootDir()
}
sharedVolume := &grpc.Storage{
Driver: kataVirtioFSDevType,
Source: mountGuestTag,
MountPoint: kataGuestSharedDir(),
MountPoint: mountPoint,
Fstype: typeVirtioFS,
Options: sharedDirVirtioFSOptions,
}
@ -1225,13 +1260,71 @@ func (k *kataAgent) rollbackFailingContainerCreation(ctx context.Context, c *Con
k.Logger().WithError(err2).Error("rollback failed unmountHostMounts()")
}
if err2 := bindUnmountContainerRootfs(ctx, getMountPath(c.sandbox.id), c.id); err2 != nil {
k.Logger().WithError(err2).Error("rollback failed bindUnmountContainerRootfs()")
if c.rootFs.Type == NydusRootFSType {
if err2 := nydusContainerCleanup(ctx, getMountPath(c.sandbox.id), c); err2 != nil {
k.Logger().WithError(err2).Error("rollback failed nydusContainerCleanup")
}
} else {
if err2 := bindUnmountContainerRootfs(ctx, getMountPath(c.sandbox.id), c.id); err2 != nil {
k.Logger().WithError(err2).Error("rollback failed bindUnmountContainerRootfs()")
}
}
}
}
func (k *kataAgent) buildContainerRootfsWithNydus(sandbox *Sandbox, c *Container, rootPathParent string) (*grpc.Storage, error) {
if sandbox.GetHypervisorType() != string(QemuHypervisor) {
// qemu is supported first; other hypervisors will follow
// https://github.com/kata-containers/kata-containers/issues/2724
return nil, errNydusdNotSupport
}
q, _ := sandbox.hypervisor.(*qemu)
extraOption, err := parseExtraOption(c.rootFs.Options)
if err != nil {
return nil, err
}
mountOpt := &MountOption{
mountpoint: rafsMountPath(c.id),
source: extraOption.Source,
config: extraOption.Config,
}
k.Logger().Infof("nydus option: %v", extraOption)
// mount lowerdir to guest /run/kata-containers/shared/rafs/<cid>/lowerdir
if err := q.virtiofsDaemon.Mount(*mountOpt); err != nil {
return nil, err
}
rootfs := &grpc.Storage{}
containerShareDir := filepath.Join(getMountPath(c.sandbox.id), c.id)
// mkdir rootfs, guest at /run/kata-containers/shared/containers/<cid>/rootfs
rootfsDir := filepath.Join(containerShareDir, c.rootfsSuffix)
if err := os.MkdirAll(rootfsDir, DirMode); err != nil {
return nil, err
}
// bindmount snapshot dir which snapshotter allocated
// to guest /run/kata-containers/shared/containers/<cid>/snapshotdir
snapshotShareDir := filepath.Join(containerShareDir, snapshotDir)
if err := bindMount(k.ctx, extraOption.Snapshotdir, snapshotShareDir, true, "slave"); err != nil {
return nil, err
}
// so rootfs = overlay(upperdir, workdir, lowerdir)
rootfs.MountPoint = filepath.Join(rootPathParent, c.rootfsSuffix)
rootfs.Source = typeOverlayFS
rootfs.Fstype = typeOverlayFS
rootfs.Driver = kataOverlayDevType
rootfs.Options = append(rootfs.Options, fmt.Sprintf("%s=%s", upperDir, filepath.Join(kataGuestSharedDir(), c.id, snapshotDir, "fs")))
rootfs.Options = append(rootfs.Options, fmt.Sprintf("%s=%s", workDir, filepath.Join(kataGuestSharedDir(), c.id, snapshotDir, "work")))
rootfs.Options = append(rootfs.Options, fmt.Sprintf("%s=%s", lowerDir, filepath.Join(kataGuestNydusImageDir(), c.id, lowerDir)))
rootfs.Options = append(rootfs.Options, "index=off")
k.Logger().Infof("rootfs info: %#v\n", rootfs)
return rootfs, nil
}
func (k *kataAgent) buildContainerRootfs(ctx context.Context, sandbox *Sandbox, c *Container, rootPathParent string) (*grpc.Storage, error) {
if c.rootFs.Type == NydusRootFSType {
return k.buildContainerRootfsWithNydus(sandbox, c, rootPathParent)
}
if c.state.Fstype != "" && c.state.BlockDeviceID != "" {
// The rootfs storage volume represents the container rootfs
// mount point inside the guest.
@ -1301,7 +1394,6 @@ func (k *kataAgent) buildContainerRootfs(ctx context.Context, sandbox *Sandbox,
func (k *kataAgent) createContainer(ctx context.Context, sandbox *Sandbox, c *Container) (p *Process, err error) {
span, ctx := katatrace.Trace(ctx, k.Logger(), "createContainer", kataAgentTracingTags)
defer span.End()
var ctrStorages []*grpc.Storage
var ctrDevices []*grpc.Device
var rootfs *grpc.Storage


@ -1095,18 +1095,27 @@ func TestKataAgentDirs(t *testing.T) {
uidmap := strings.Fields(string(line))
expectedRootless := (uidmap[0] == "0" && uidmap[1] != "0")
assert.Equal(expectedRootless, rootless.IsRootless())
if expectedRootless {
assert.Equal(kataHostSharedDir(), os.Getenv("XDG_RUNTIME_DIR")+defaultKataHostSharedDir)
assert.Equal(kataGuestSharedDir(), os.Getenv("XDG_RUNTIME_DIR")+defaultKataGuestSharedDir)
assert.Equal(kataGuestSandboxDir(), os.Getenv("XDG_RUNTIME_DIR")+defaultKataGuestSandboxDir)
assert.Equal(ephemeralPath(), os.Getenv("XDG_RUNTIME_DIR")+defaultEphemeralPath)
assert.Equal(kataGuestNydusRootDir(), os.Getenv("XDG_RUNTIME_DIR")+defaultKataGuestNydusRootDir)
assert.Equal(kataGuestNydusImageDir(), os.Getenv("XDG_RUNTIME_DIR")+defaultKataGuestNydusRootDir+"rafs"+"/")
assert.Equal(kataGuestSharedDir(), os.Getenv("XDG_RUNTIME_DIR")+defaultKataGuestNydusRootDir+"containers"+"/")
} else {
assert.Equal(kataHostSharedDir(), defaultKataHostSharedDir)
assert.Equal(kataGuestSharedDir(), defaultKataGuestSharedDir)
assert.Equal(kataGuestSandboxDir(), defaultKataGuestSandboxDir)
assert.Equal(ephemeralPath(), defaultEphemeralPath)
assert.Equal(kataGuestNydusRootDir(), defaultKataGuestNydusRootDir)
assert.Equal(kataGuestNydusImageDir(), defaultKataGuestNydusRootDir+"rafs"+"/")
assert.Equal(kataGuestSharedDir(), defaultKataGuestNydusRootDir+"containers"+"/")
}
cid := "123"
expected := "/rafs/123/lowerdir"
assert.Equal(rafsMountPath(cid), expected)
}
func TestSandboxBindMount(t *testing.T) {


@ -7,7 +7,6 @@ package virtcontainers
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@ -17,6 +16,7 @@ import (
merr "github.com/hashicorp/go-multierror"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils/katatrace"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
otelLabel "go.opentelemetry.io/otel/attribute"
)
@ -28,7 +28,13 @@ const DefaultShmSize = 65536 * 1024
// Sadly golang/sys doesn't have UmountNoFollow although it's there since Linux 2.6.34
const UmountNoFollow = 0x8
var rootfsDir = "rootfs"
const (
rootfsDir = "rootfs"
lowerDir = "lowerdir"
upperDir = "upperdir"
workDir = "workdir"
snapshotDir = "snapshotdir"
)
var systemMountPrefixes = []string{"/proc", "/sys"}
@ -349,33 +355,64 @@ func isSymlink(path string) bool {
return stat.Mode()&os.ModeSymlink != 0
}
func bindUnmountContainerRootfs(ctx context.Context, sharedDir, cID string) error {
span, _ := katatrace.Trace(ctx, nil, "bindUnmountContainerRootfs", mountTracingTags)
defer span.End()
span.SetAttributes(otelLabel.String("shared_dir", sharedDir), otelLabel.String("container_id", cID))
rootfsDest := filepath.Join(sharedDir, cID, rootfsDir)
if isSymlink(filepath.Join(sharedDir, cID)) || isSymlink(rootfsDest) {
func bindUnmountContainerShareDir(ctx context.Context, sharedDir, cID, target string) error {
destDir := filepath.Join(sharedDir, cID, target)
if isSymlink(filepath.Join(sharedDir, cID)) || isSymlink(destDir) {
mountLogger().WithField("container", cID).Warnf("container dir is a symlink, malicious guest?")
return nil
}
err := syscall.Unmount(rootfsDest, syscall.MNT_DETACH|UmountNoFollow)
err := syscall.Unmount(destDir, syscall.MNT_DETACH|UmountNoFollow)
if err == syscall.ENOENT {
mountLogger().WithError(err).WithField("rootfs-dir", rootfsDest).Warn()
mountLogger().WithError(err).WithField("share-dir", destDir).Warn()
return nil
}
if err := syscall.Rmdir(rootfsDest); err != nil {
mountLogger().WithError(err).WithField("rootfs-dir", rootfsDest).Warn("Could not remove container rootfs dir")
if err := syscall.Rmdir(destDir); err != nil {
mountLogger().WithError(err).WithField("share-dir", destDir).Warn("Could not remove container share dir")
}
return err
}
func bindUnmountContainerRootfs(ctx context.Context, sharedDir, cID string) error {
span, _ := katatrace.Trace(ctx, nil, "bindUnmountContainerRootfs", mountTracingTags)
defer span.End()
span.SetAttributes(otelLabel.String("shared-dir", sharedDir), otelLabel.String("container-id", cID))
return bindUnmountContainerShareDir(ctx, sharedDir, cID, rootfsDir)
}
func bindUnmountContainerSnapshotDir(ctx context.Context, sharedDir, cID string) error {
span, _ := katatrace.Trace(ctx, nil, "bindUnmountContainerSnapshotDir", mountTracingTags)
defer span.End()
span.SetAttributes(otelLabel.String("shared-dir", sharedDir), otelLabel.String("container-id", cID))
return bindUnmountContainerShareDir(ctx, sharedDir, cID, snapshotDir)
}
func nydusContainerCleanup(ctx context.Context, sharedDir string, c *Container) error {
sandbox := c.sandbox
if sandbox.GetHypervisorType() != string(QemuHypervisor) {
// qemu is supported first; other hypervisors will follow
// https://github.com/kata-containers/kata-containers/issues/2724
return errNydusdNotSupport
}
q, _ := sandbox.hypervisor.(*qemu)
if err := q.virtiofsDaemon.Umount(rafsMountPath(c.id)); err != nil {
return errors.Wrap(err, "umount rafs failed")
}
if err := bindUnmountContainerSnapshotDir(ctx, sharedDir, c.id); err != nil {
return errors.Wrap(err, "umount snapshotdir err")
}
destDir := filepath.Join(sharedDir, c.id, c.rootfsSuffix)
if err := syscall.Rmdir(destDir); err != nil {
return errors.Wrap(err, "remove container rootfs err")
}
return nil
}
func bindUnmountAllRootfs(ctx context.Context, sharedDir string, sandbox *Sandbox) error {
span, ctx := katatrace.Trace(ctx, nil, "bindUnmountAllRootfs", mountTracingTags)
defer span.End()
span.SetAttributes(otelLabel.String("shared_dir", sharedDir), otelLabel.String("sandbox_id", sandbox.id))
span.SetAttributes(otelLabel.String("shared-dir", sharedDir), otelLabel.String("sandbox-id", sandbox.id))
var errors *merr.Error
for _, c := range sandbox.containers {
@ -387,7 +424,11 @@ func bindUnmountAllRootfs(ctx context.Context, sharedDir string, sandbox *Sandbo
if c.state.Fstype == "" {
// even if error found, don't break out of loop until all mounts attempted
// to be unmounted, and collect all errors
errors = merr.Append(errors, bindUnmountContainerRootfs(ctx, sharedDir, c.id))
if c.rootFs.Type == NydusRootFSType {
errors = merr.Append(errors, nydusContainerCleanup(ctx, sharedDir, c))
} else {
errors = merr.Append(errors, bindUnmountContainerRootfs(ctx, sharedDir, c.id))
}
}
}
return errors.ErrorOrNil()


@ -0,0 +1,479 @@
// Copyright (c) 2017 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"syscall"
"time"
"github.com/containernetworking/plugins/pkg/ns"
"github.com/kata-containers/kata-containers/src/runtime/pkg/katautils/katatrace"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils"
"github.com/kata-containers/kata-containers/src/runtime/virtcontainers/utils/retry"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const (
infoEndpoint = "http://unix/api/v1/daemon"
mountEndpoint = "http://unix/api/v1/mount"
nydusdStopTimeoutSecs = 5
defaultHttpClientTimeoutSecs = 30 * time.Second
contentType = "application/json"
maxIdleConns = 10
idleConnTimeoutSecs = 10 * time.Second
dialTimeoutSecs = 5 * time.Second
keepAliveSecs = 5 * time.Second
expectContinueTimeoutSecs = 1 * time.Second
// Registry Acceleration File System, which nydus provides to accelerate image loading
nydusRafs = "rafs"
// used to share directories between host and guest
nydusPassthroughfs = "passthrough_fs"
sharedPathInGuest = "/containers"
shimNsPath = "/proc/self/ns/net"
)
var (
nydusdTracingTags = map[string]string{
"source": "runtime",
"package": "virtcontainers",
"subsystem": "nydusd",
}
errNydusdDaemonPathInvalid = errors.New("nydusd daemon path is invalid")
errNydusdSockPathInvalid = errors.New("nydusd sock path is invalid")
errNydusdAPISockPathInvalid = errors.New("nydusd api sock path is invalid")
errNydusdSourcePathInvalid = errors.New("nydusd source path is invalid")
errNydusdNotSupport = errors.New("nydusd only supports the QEMU hypervisor currently (see https://github.com/kata-containers/kata-containers/issues/2724)")
)
type nydusd struct {
startFn func(cmd *exec.Cmd) error // for mock testing
setupShareDirFn func() error // for mock testing
path string
sockPath string
apiSockPath string
sourcePath string
extraArgs []string
pid int
debug bool
}
func startInShimNS(cmd *exec.Cmd) error {
// Create nydusd in shim netns as it needs to access host network
return doNetNS(shimNsPath, func(_ ns.NetNS) error {
return cmd.Start()
})
}
func (nd *nydusd) Start(ctx context.Context, onQuit onQuitFunc) (int, error) {
span, _ := katatrace.Trace(ctx, nd.Logger(), "Start", nydusdTracingTags)
defer span.End()
pid := 0
if err := nd.valid(); err != nil {
return pid, err
}
args, err := nd.args()
if err != nil {
return pid, err
}
cmd := exec.Command(nd.path, args...)
r, w, err := os.Pipe()
if err != nil {
return pid, err
}
cmd.Stdout = w
cmd.Stderr = w
fields := logrus.Fields{
"path": nd.path,
"args": strings.Join(args, " "),
}
nd.Logger().WithFields(fields).Info()
if err := nd.startFn(cmd); err != nil {
return pid, err
}
// Monitor nydusd's stdout/stderr and stop sandbox if nydusd quits
go func() {
scanner := bufio.NewScanner(r)
for scanner.Scan() {
nd.Logger().Info(scanner.Text())
}
nd.Logger().Info("nydusd quits")
// Wait to release resources of nydusd process
_, err = cmd.Process.Wait()
if err != nil {
nd.Logger().WithError(err).Warn("nydusd quits")
}
if onQuit != nil {
onQuit()
}
}()
if err := nd.setupShareDirFn(); err != nil {
return pid, err
}
nd.pid = cmd.Process.Pid
return nd.pid, nil
}
func (nd *nydusd) args() ([]string, error) {
logLevel := "info"
if nd.debug {
logLevel = "debug"
}
args := []string{
"--log-level", logLevel,
"--apisock", nd.apiSockPath,
"--sock", nd.sockPath,
}
if len(nd.extraArgs) > 0 {
args = append(args, nd.extraArgs...)
}
return args, nil
}
func checkPathValid(path string) error {
if len(path) == 0 {
return errors.New("path is empty")
}
absPath, err := filepath.Abs(path)
if err != nil {
return err
}
dir := filepath.Dir(absPath)
if _, err := os.Stat(dir); err != nil {
return err
}
return nil
}
func (nd *nydusd) valid() error {
if err := checkPathValid(nd.sockPath); err != nil {
nd.Logger().WithError(err).Info("check nydusd sock path err")
return errNydusdSockPathInvalid
}
if err := checkPathValid(nd.apiSockPath); err != nil {
nd.Logger().WithError(err).Info("check nydusd api sock path err")
return errNydusdAPISockPathInvalid
}
if err := checkPathValid(nd.path); err != nil {
nd.Logger().WithError(err).Info("check nydusd daemon path err")
return errNydusdDaemonPathInvalid
}
if err := checkPathValid(nd.sourcePath); err != nil {
nd.Logger().WithError(err).Info("check nydusd daemon path err")
return errNydusdSourcePathInvalid
}
return nil
}
func (nd *nydusd) setupPassthroughFS() error {
nc, err := NewNydusClient(nd.apiSockPath)
if err != nil {
return err
}
nd.Logger().WithField("from", nd.sourcePath).
WithField("dest", sharedPathInGuest).Info("prepare mount passthroughfs")
mr := NewMountRequest(nydusPassthroughfs, nd.sourcePath, "")
return nc.Mount(sharedPathInGuest, mr)
}
func (nd *nydusd) Mount(opt MountOption) error {
nc, err := NewNydusClient(nd.apiSockPath)
if err != nil {
return err
}
nd.Logger().WithField("from", opt.source).
WithField("dest", opt.mountpoint).Info("prepare mount rafs")
mr := NewMountRequest(nydusRafs, opt.source, opt.config)
return nc.Mount(opt.mountpoint, mr)
}
func (nd *nydusd) Umount(mountpoint string) error {
nc, err := NewNydusClient(nd.apiSockPath)
if err != nil {
return err
}
nd.Logger().WithField("mountpoint", mountpoint).Info("umount rafs")
return nc.Umount(mountpoint)
}
func (nd *nydusd) Stop(ctx context.Context) error {
if err := nd.kill(ctx); err != nil {
nd.Logger().WithError(err).WithField("pid", nd.pid).Warn("kill nydusd failed")
return nil
}
err := os.Remove(nd.sockPath)
if err != nil {
nd.Logger().WithError(err).WithField("path", nd.sockPath).Warn("removing nydusd socket failed")
}
err = os.Remove(nd.apiSockPath)
if err != nil {
nd.Logger().WithError(err).WithField("path", nd.apiSockPath).Warn("removing nydusd api socket failed")
}
return nil
}
func (nd *nydusd) Logger() *logrus.Entry {
return hvLogger.WithField("subsystem", "nydusd")
}
func (nd *nydusd) kill(ctx context.Context) (err error) {
span, _ := katatrace.Trace(ctx, nd.Logger(), "kill", nydusdTracingTags)
defer span.End()
if nd.pid <= 0 {
nd.Logger().WithField("invalid-nydusd-pid", nd.pid).Warn("cannot kill nydusd")
return nil
}
if err := utils.WaitLocalProcess(nd.pid, nydusdStopTimeoutSecs, syscall.SIGTERM, nd.Logger()); err != nil {
nd.Logger().WithError(err).Warn("kill nydusd err")
}
nd.pid = 0
return err
}
type BuildTimeInfo struct {
PackageVer string `json:"package_ver"`
GitCommit string `json:"git_commit"`
BuildTime string `json:"build_time"`
Profile string `json:"profile"`
Rustc string `json:"rustc"`
}
type DaemonInfo struct {
ID string `json:"id"`
Version BuildTimeInfo `json:"version"`
State string `json:"state"`
}
type ErrorMessage struct {
Code string `json:"code"`
Message string `json:"message"`
}
type MountRequest struct {
FsType string `json:"fs_type"`
Source string `json:"source"`
Config string `json:"config"`
}
func NewMountRequest(fsType, source, config string) *MountRequest {
return &MountRequest{
FsType: fsType,
Source: source,
Config: config,
}
}
type Interface interface {
CheckStatus() (DaemonInfo, error)
Mount(string, *MountRequest) error
Umount(sharedMountPoint string) error
}
type NydusClient struct {
httpClient *http.Client
}
func NewNydusClient(sock string) (Interface, error) {
transport, err := buildTransport(sock)
if err != nil {
return nil, err
}
return &NydusClient{
httpClient: &http.Client{
Timeout: defaultHttpClientTimeoutSecs,
Transport: transport,
},
}, nil
}
func waitUntilSocketReady(sock string) error {
return retry.Do(func() error {
if _, err := os.Stat(sock); err != nil {
return err
}
return nil
},
retry.Attempts(3),
retry.LastErrorOnly(true),
retry.Delay(100*time.Millisecond))
}
func buildTransport(sock string) (http.RoundTripper, error) {
err := waitUntilSocketReady(sock)
if err != nil {
return nil, err
}
return &http.Transport{
MaxIdleConns: maxIdleConns,
IdleConnTimeout: idleConnTimeoutSecs,
ExpectContinueTimeout: expectContinueTimeoutSecs,
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
dialer := &net.Dialer{
Timeout: dialTimeoutSecs,
KeepAlive: keepAliveSecs,
}
return dialer.DialContext(ctx, "unix", sock)
},
}, nil
}
func (c *NydusClient) CheckStatus() (DaemonInfo, error) {
resp, err := c.httpClient.Get(infoEndpoint)
if err != nil {
return DaemonInfo{}, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return DaemonInfo{}, err
}
var info DaemonInfo
if err = json.Unmarshal(b, &info); err != nil {
return DaemonInfo{}, err
}
return info, nil
}
func checkRafsMountPointValid(mp string) bool {
// refer to https://github.com/opencontainers/runc/blob/master/libcontainer/factory_linux.go#L30
re := regexp.MustCompile(`/rafs/[\w+-\.]+/lowerdir`)
return re.MatchString(mp)
}
func (c *NydusClient) checkMountPoint(mountPoint string, fsType string) error {
switch fsType {
case nydusPassthroughfs:
// sharedir has been checked in args check.
return nil
case nydusRafs:
// nydusRafs mountpoint path format: /rafs/<container_id>/lowerdir
if checkRafsMountPointValid(mountPoint) {
return nil
}
return fmt.Errorf("rafs mountpoint %s is invalid", mountPoint)
default:
return errors.New("unsupported filesystem type")
}
}
func (c *NydusClient) Mount(mountPoint string, mr *MountRequest) error {
if err := c.checkMountPoint(mountPoint, mr.FsType); err != nil {
return errors.Wrap(err, "check mount point err")
}
requestURL := fmt.Sprintf("%s?mountpoint=%s", mountEndpoint, mountPoint)
body, err := json.Marshal(mr)
if err != nil {
return errors.Wrap(err, "failed to create mount request")
}
resp, err := c.httpClient.Post(requestURL, contentType, bytes.NewBuffer(body))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNoContent {
return nil
}
return handleMountError(resp.Body)
}
func (c *NydusClient) Umount(mountPoint string) error {
requestURL := fmt.Sprintf("%s?mountpoint=%s", mountEndpoint, mountPoint)
req, err := http.NewRequest(http.MethodDelete, requestURL, nil)
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNoContent {
return nil
}
return handleMountError(resp.Body)
}
func handleMountError(r io.Reader) error {
b, err := ioutil.ReadAll(r)
if err != nil {
return err
}
var errMessage ErrorMessage
if err = json.Unmarshal(b, &errMessage); err != nil {
return err
}
return errors.New(errMessage.Message)
}
/*
rootfs mount format: Type: fuse.nydus-overlayfs, Source: overlay
Options[lowerdir=/foo/lower2:/foo/lower1,upperdir=/foo/upper,workdir=/foo/work,extraoption={source: xxx, config: xxx, snapshotdir: xxx}]
*/
type extraOption struct {
Source string `json:"source"`
Config string `json:"config"`
Snapshotdir string `json:"snapshotdir"`
}
const extraOptionKey = "extraoption="
func parseExtraOption(options []string) (*extraOption, error) {
extraOpt := ""
for _, opt := range options {
if strings.HasPrefix(opt, extraOptionKey) {
extraOpt = strings.TrimPrefix(opt, extraOptionKey)
}
}
if len(extraOpt) == 0 {
return nil, errors.New("no extraoption found")
}
opt, err := base64.StdEncoding.DecodeString(extraOpt)
if err != nil {
return nil, errors.Wrap(err, "base64 decoding err")
}
no := &extraOption{}
if err := json.Unmarshal(opt, no); err != nil {
return nil, errors.Wrapf(err, "json unmarshal err")
}
if len(no.Config) == 0 || len(no.Snapshotdir) == 0 || len(no.Source) == 0 {
return nil, fmt.Errorf("extra option is not correct, %+v", no)
}
return no, nil
}


@ -0,0 +1,241 @@
// Copyright (c) 2017 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
//
package virtcontainers
import (
"context"
"encoding/base64"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestNydusdStart(t *testing.T) {
assert := assert.New(t)
// nolint: govet
type fields struct {
pid int
path string
sockPath string
apiSockPath string
sourcePath string
debug bool
extraArgs []string
startFn func(cmd *exec.Cmd) error
setupShareDirFn func() error
}
sourcePath, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(sourcePath)
socketDir, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(socketDir)
sockPath := filepath.Join(socketDir, "vhost-user.sock")
apiSockPath := filepath.Join(socketDir, "api.sock")
validConfig := fields{
path: "/usr/bin/nydusd",
sockPath: sockPath,
apiSockPath: apiSockPath,
sourcePath: sourcePath,
startFn: func(cmd *exec.Cmd) error {
cmd.Process = &os.Process{}
return nil
},
setupShareDirFn: func() error { return nil },
}
SourcePathNoExist := validConfig
SourcePathNoExist.sourcePath = "/tmp/path/to/nydusd/sourcepath"
// nolint: govet
tests := []struct {
name string
fields fields
wantErr bool
}{
{"empty config", fields{}, true},
{"directory source path not exist", SourcePathNoExist, true},
{"valid config", validConfig, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nd := &nydusd{
path: tt.fields.path,
sockPath: tt.fields.sockPath,
apiSockPath: tt.fields.apiSockPath,
sourcePath: tt.fields.sourcePath,
extraArgs: tt.fields.extraArgs,
debug: tt.fields.debug,
pid: tt.fields.pid,
startFn: tt.fields.startFn,
setupShareDirFn: tt.fields.setupShareDirFn,
}
ctx := context.Background()
_, err := nd.Start(ctx, nil)
if (err != nil) != tt.wantErr {
t.Errorf("nydusd.Start() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}
}
func TestNydusdArgs(t *testing.T) {
assert := assert.New(t)
nd := &nydusd{
path: "/usr/bin/nydusd",
sockPath: "/var/lib/vhost-user.sock",
apiSockPath: "/var/lib/api.sock",
debug: true,
}
expected := "--log-level debug --apisock /var/lib/api.sock --sock /var/lib/vhost-user.sock"
args, err := nd.args()
assert.NoError(err)
assert.Equal(expected, strings.Join(args, " "))
nd.debug = false
expected = "--log-level info --apisock /var/lib/api.sock --sock /var/lib/vhost-user.sock"
args, err = nd.args()
assert.NoError(err)
assert.Equal(expected, strings.Join(args, " "))
}
func TestNydusdValid(t *testing.T) {
assert := assert.New(t)
sourcePath, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(sourcePath)
socketDir, err := ioutil.TempDir("", "")
assert.NoError(err)
defer os.RemoveAll(socketDir)
sockPath := filepath.Join(socketDir, "vhost-user.sock")
apiSockPath := filepath.Join(socketDir, "api.sock")
newNydusdFunc := func() *nydusd {
return &nydusd{
path: "/usr/bin/nydusd",
sourcePath: sourcePath,
sockPath: sockPath,
apiSockPath: apiSockPath,
}
}
nd := newNydusdFunc()
err = nd.valid()
assert.NoError(err)
nd = newNydusdFunc()
nd.path = ""
err = nd.valid()
assert.Equal(errNydusdDaemonPathInvalid, err)
nd = newNydusdFunc()
nd.sockPath = ""
err = nd.valid()
assert.Equal(errNydusdSockPathInvalid, err)
nd = newNydusdFunc()
nd.apiSockPath = ""
err = nd.valid()
assert.Equal(errNydusdAPISockPathInvalid, err)
nd = newNydusdFunc()
nd.sourcePath = ""
err = nd.valid()
assert.Equal(errNydusdSourcePathInvalid, err)
}
func TestParseExtraOption(t *testing.T) {
tests := []struct {
name string
option string
wantErr bool
}{
{
name: "valid option",
option: "extraoption=" + base64.StdEncoding.EncodeToString([]byte("{\"source\":\"/path/to/bootstrap\",\"config\":\"config content\",\"snapshotdir\":\"/path/to/snapshotdir\"}")),
wantErr: false,
},
{
name: "no extra option",
option: base64.StdEncoding.EncodeToString([]byte("{\"source\":/path/to/bootstrap,\"config\":config content,\"snapshotdir\":/path/to/snapshotdir}")),
wantErr: true,
},
{
name: "no source",
option: "extraoption=" + base64.StdEncoding.EncodeToString([]byte("{\"config\":config content,\"snapshotdir\":/path/to/snapshotdir}")),
wantErr: true,
},
{
name: "no config",
option: "extraoption=" + base64.StdEncoding.EncodeToString([]byte("{\"source\":/path/to/bootstrap,\"snapshotdir\":/path/to/snapshotdir}")),
wantErr: true,
},
{
name: "no snapshotdir",
option: "extraoption=" + base64.StdEncoding.EncodeToString([]byte("{\"source\":/path/to/bootstrap,\"config\":config content}")),
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := parseExtraOption([]string{tt.option})
if (err != nil) != tt.wantErr {
t.Errorf("parseExtraOption error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}
}
func TestCheckRafsMountPointValid(t *testing.T) {
tests := []struct {
mountPoint string
valid bool
}{
{
mountPoint: "/rafs/xxxxaaa/lowerdir",
valid: true,
},
{
mountPoint: "/",
valid: false,
},
{
mountPoint: "/rafs",
valid: false,
},
{
mountPoint: "/xxxx",
valid: false,
},
{
mountPoint: "/rafs/aaaaa/xxxx",
valid: false,
},
{
mountPoint: "/rafs//lowerdir",
valid: false,
},
}
for _, tt := range tests {
res := checkRafsMountPointValid(tt.mountPoint)
if res != tt.valid {
t.Errorf("test %v get %v, but want %v", tt, res, tt.valid)
}
}
}


@ -73,7 +73,7 @@ type QemuState struct {
// HotpluggedCPUs is the list of CPUs that were hot-added
HotpluggedVCPUs []hv.CPUDevice
HotpluggedMemory int
VirtiofsdPid int
VirtiofsDaemonPid int
PCIeRootPort int
HotplugVFIOOnRootBus bool
}
@ -83,7 +83,7 @@ type QemuState struct {
type qemu struct {
arch qemuArch
virtiofsd Virtiofsd
virtiofsDaemon VirtiofsDaemon
ctx context.Context
@ -113,6 +113,7 @@ const (
consoleSocket = "console.sock"
qmpSocket = "qmp.sock"
vhostFSSocket = "vhost-fs.sock"
nydusdAPISock = "nydusd-api.sock"
// memory dump format will be set to elf
memoryDumpFormat = "elf"
@ -467,6 +468,41 @@ func (q *qemu) setConfig(config *HypervisorConfig) error {
return nil
}
func (q *qemu) createVirtiofsDaemon(sharedPath string) (VirtiofsDaemon, error) {
virtiofsdSocketPath, err := q.vhostFSSocketPath(q.id)
if err != nil {
return nil, err
}
if q.config.SharedFS == config.VirtioFSNydus {
apiSockPath, err := q.nydusdAPISocketPath(q.id)
if err != nil {
return nil, err
}
nd := &nydusd{
path: q.config.VirtioFSDaemon,
sockPath: virtiofsdSocketPath,
apiSockPath: apiSockPath,
sourcePath: sharedPath,
debug: q.config.Debug,
extraArgs: q.config.VirtioFSExtraArgs,
startFn: startInShimNS,
}
nd.setupShareDirFn = nd.setupPassthroughFS
return nd, nil
}
// default: use virtiofsd
return &virtiofsd{
path: q.config.VirtioFSDaemon,
sourcePath: sharedPath,
socketPath: virtiofsdSocketPath,
extraArgs: q.config.VirtioFSExtraArgs,
debug: q.config.Debug,
cache: q.config.VirtioFSCache,
}, nil
}
// CreateVM is the Hypervisor VM creation implementation for govmmQemu.
func (q *qemu) CreateVM(ctx context.Context, id string, network Network, hypervisorConfig *HypervisorConfig) error {
// Save the tracing context
@ -527,7 +563,8 @@ func (q *qemu) CreateVM(ctx context.Context, id string, network Network, hypervi
// builds the first VM with file-backed memory and shared=on and the
// subsequent ones with shared=off. virtio-fs always requires shared=on for
// memory.
if q.config.SharedFS == config.VirtioFS || q.config.FileBackedMemRootDir != "" {
if q.config.SharedFS == config.VirtioFS || q.config.SharedFS == config.VirtioFSNydus ||
q.config.FileBackedMemRootDir != "" {
if !(q.config.BootToBeTemplate || q.config.BootFromTemplate) {
q.setupFileBackedMem(&knobs, &memory)
} else {
@ -642,50 +679,41 @@ func (q *qemu) CreateVM(ctx context.Context, id string, network Network, hypervi
q.qemuConfig = qemuConfig
virtiofsdSocketPath, err := q.vhostFSSocketPath(q.id)
if err != nil {
return err
}
q.virtiofsd = &virtiofsd{
path: q.config.VirtioFSDaemon,
sourcePath: hypervisorConfig.SharedPath,
socketPath: virtiofsdSocketPath,
extraArgs: q.config.VirtioFSExtraArgs,
debug: q.config.Debug,
cache: q.config.VirtioFSCache,
}
return nil
q.virtiofsDaemon, err = q.createVirtiofsDaemon(hypervisorConfig.SharedPath)
return err
}
func (q *qemu) vhostFSSocketPath(id string) (string, error) {
return utils.BuildSocketPath(q.config.VMStorePath, id, vhostFSSocket)
}
func (q *qemu) setupVirtiofsd(ctx context.Context) (err error) {
pid, err := q.virtiofsd.Start(ctx, func() {
func (q *qemu) nydusdAPISocketPath(id string) (string, error) {
return utils.BuildSocketPath(q.config.VMStorePath, id, nydusdAPISock)
}
func (q *qemu) setupVirtiofsDaemon(ctx context.Context) (err error) {
pid, err := q.virtiofsDaemon.Start(ctx, func() {
q.StopVM(ctx, false)
})
if err != nil {
return err
}
q.state.VirtiofsdPid = pid
q.state.VirtiofsDaemonPid = pid
return nil
}
func (q *qemu) stopVirtiofsd(ctx context.Context) (err error) {
if q.state.VirtiofsdPid == 0 {
func (q *qemu) stopVirtiofsDaemon(ctx context.Context) (err error) {
if q.state.VirtiofsDaemonPid == 0 {
q.Logger().Warn("The virtiofs daemon had already stopped")
return nil
}
err = q.virtiofsd.Stop(ctx)
err = q.virtiofsDaemon.Stop(ctx)
if err != nil {
return err
}
q.state.VirtiofsdPid = 0
q.state.VirtiofsDaemonPid = 0
return nil
}
@ -707,7 +735,8 @@ func (q *qemu) getMemArgs() (bool, string, string, error) {
return share, target, "", fmt.Errorf("Vhost-user-blk/scsi requires hugepage memory")
}
if q.config.SharedFS == config.VirtioFS || q.config.FileBackedMemRootDir != "" {
if q.config.SharedFS == config.VirtioFS || q.config.SharedFS == config.VirtioFSNydus ||
q.config.FileBackedMemRootDir != "" {
target = q.qemuConfig.Memory.Path
memoryBack = "memory-backend-file"
}
@ -817,15 +846,15 @@ func (q *qemu) StartVM(ctx context.Context, timeout int) error {
}
defer label.SetProcessLabel("")
if q.config.SharedFS == config.VirtioFS {
err = q.setupVirtiofsd(ctx)
if q.config.SharedFS == config.VirtioFS || q.config.SharedFS == config.VirtioFSNydus {
err = q.setupVirtiofsDaemon(ctx)
if err != nil {
return err
}
defer func() {
if err != nil {
if shutdownErr := q.stopVirtiofsd(ctx); shutdownErr != nil {
q.Logger().WithError(shutdownErr).Warn("failed to stop virtiofsd")
if shutdownErr := q.stopVirtiofsDaemon(ctx); shutdownErr != nil {
q.Logger().WithError(shutdownErr).Warn("failed to stop virtiofsDaemon")
}
}
}()
@ -986,8 +1015,8 @@ func (q *qemu) StopVM(ctx context.Context, waitOnly bool) error {
}
}
if q.config.SharedFS == config.VirtioFS {
if err := q.stopVirtiofsd(ctx); err != nil {
if q.config.SharedFS == config.VirtioFS || q.config.SharedFS == config.VirtioFSNydus {
if err := q.stopVirtiofsDaemon(ctx); err != nil {
return err
}
}
@ -1970,7 +1999,7 @@ func (q *qemu) AddDevice(ctx context.Context, devInfo interface{}, devType Devic
switch v := devInfo.(type) {
case types.Volume:
if q.config.SharedFS == config.VirtioFS {
if q.config.SharedFS == config.VirtioFS || q.config.SharedFS == config.VirtioFSNydus {
q.Logger().WithField("volume-type", "virtio-fs").Info("adding volume")
var randBytes []byte
@ -2399,15 +2428,15 @@ func (q *qemu) GetPids() []int {
}
pids := []int{pid}
if q.state.VirtiofsdPid != 0 {
pids = append(pids, q.state.VirtiofsdPid)
if q.state.VirtiofsDaemonPid != 0 {
pids = append(pids, q.state.VirtiofsDaemonPid)
}
return pids
}
func (q *qemu) GetVirtioFsPid() *int {
return &q.state.VirtiofsdPid
return &q.state.VirtiofsDaemonPid
}
type qemuGrpc struct {
@ -2476,7 +2505,7 @@ func (q *qemu) Save() (s hv.HypervisorState) {
if len(pids) != 0 {
s.Pid = pids[0]
}
s.VirtiofsdPid = q.state.VirtiofsdPid
s.VirtiofsDaemonPid = q.state.VirtiofsDaemonPid
s.Type = string(QemuHypervisor)
s.UUID = q.state.UUID
s.HotpluggedMemory = q.state.HotpluggedMemory
@ -2504,7 +2533,7 @@ func (q *qemu) Load(s hv.HypervisorState) {
q.state.UUID = s.UUID
q.state.HotpluggedMemory = s.HotpluggedMemory
q.state.HotplugVFIOOnRootBus = s.HotplugVFIOOnRootBus
q.state.VirtiofsdPid = s.VirtiofsdPid
q.state.VirtiofsDaemonPid = s.VirtiofsDaemonPid
q.state.PCIeRootPort = s.PCIeRootPort
for _, bridge := range s.Bridges {


@ -613,7 +613,7 @@ func TestQemuGetpids(t *testing.T) {
assert.True(len(pids) == 1)
assert.True(pids[0] == 100)
q.state.VirtiofsdPid = 200
q.state.VirtiofsDaemonPid = 200
pids = q.GetPids()
assert.True(len(pids) == 2)
assert.True(pids[0] == 100)


@ -1286,7 +1286,6 @@ func (s *Sandbox) CreateContainer(ctx context.Context, contConfig ContainerConfi
if err != nil {
return nil, err
}
// create and start the container
if err = c.create(ctx); err != nil {
return nil, err


@ -0,0 +1,294 @@
/*
* Copyright (c) 2020. Ant Group. All rights reserved.
*
* SPDX-License-Identifier: Apache-2.0
*/
// copy from https://github.com/containerd/nydus-snapshotter/blob/38b23bcd0658f2bd9d99083d320b727bf73129b7/pkg/utils/retry/retry.go
package retry
import (
"fmt"
"math/rand"
"strings"
"time"
)
type RetryableFunc func() error
var (
DefaultAttempts = uint(10)
DefaultDelayMS = 100 * time.Millisecond
DefaultMaxJitterMS = 100 * time.Millisecond
DefaultOnRetry = func(n uint, err error) {}
DefaultRetryIf = IsRecoverable
DefaultDelayType = CombineDelay(BackOffDelay, RandomDelay)
DefaultLastErrorOnly = false
)
// RetryIfFunc is the function signature of the retry-if predicate
type RetryIfFunc func(error) bool
// OnRetryFunc is the function signature of the OnRetry callback
// n = count of attempts made so far
type OnRetryFunc func(n uint, err error)
type DelayTypeFunc func(n uint, config *Config) time.Duration
type Config struct {
onRetry OnRetryFunc
retryIf RetryIfFunc
delayType DelayTypeFunc
delay time.Duration
maxDelay time.Duration
maxJitter time.Duration
attempts uint
lastErrorOnly bool
}
// Option represents an option for retry.
type Option func(*Config)
// LastErrorOnly makes Do return only the last error from the retried function
// default is false (return an Error wrapping every attempt's error)
func LastErrorOnly(lastErrorOnly bool) Option {
return func(c *Config) {
c.lastErrorOnly = lastErrorOnly
}
}
// Attempts sets the number of retry attempts
// default is 10
func Attempts(attempts uint) Option {
return func(c *Config) {
c.attempts = attempts
}
}
// Delay sets the base delay between retries
// default is 100ms
func Delay(delay time.Duration) Option {
return func(c *Config) {
c.delay = delay
}
}
// MaxDelay sets the maximum delay between retries
// no cap is applied by default
func MaxDelay(maxDelay time.Duration) Option {
return func(c *Config) {
c.maxDelay = maxDelay
}
}
// MaxJitter sets the maximum random jitter between retries for RandomDelay
func MaxJitter(maxJitter time.Duration) Option {
return func(c *Config) {
c.maxJitter = maxJitter
}
}
// DelayType sets the delay strategy used between retries
// default is CombineDelay(BackOffDelay, RandomDelay)
func DelayType(delayType DelayTypeFunc) Option {
return func(c *Config) {
c.delayType = delayType
}
}
// BackOffDelay is a DelayType which increases the delay exponentially between consecutive retries
func BackOffDelay(n uint, config *Config) time.Duration {
return config.delay * (1 << n)
}
// FixedDelay is a DelayType which keeps delay the same through all iterations
func FixedDelay(_ uint, config *Config) time.Duration {
return config.delay
}
// RandomDelay is a DelayType which picks a random delay up to config.maxJitter
func RandomDelay(_ uint, config *Config) time.Duration {
return time.Duration(rand.Int63n(int64(config.maxJitter)))
}
// CombineDelay is a DelayType that sums all of the specified delays into a new DelayTypeFunc
func CombineDelay(delays ...DelayTypeFunc) DelayTypeFunc {
return func(n uint, config *Config) time.Duration {
var total time.Duration
for _, delay := range delays {
total += delay(n, config)
}
return total
}
}
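// Illustrative note (not in the copied source): with the defaults above
// (delay = 100 ms, maxJitter = 100 ms), DefaultDelayType waits delay<<n of
// backoff plus up to 100 ms of random jitter before retry n, i.e. roughly
// 100-200 ms, then 200-300 ms, then 400-500 ms, and so on.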
// OnRetry sets a callback that is called after each failed attempt
//
// log each retry example:
//
// retry.Do(
// func() error {
// return errors.New("some error")
// },
// retry.OnRetry(func(n uint, err error) {
// log.Printf("#%d: %s\n", n, err)
// }),
// )
func OnRetry(onRetry OnRetryFunc) Option {
return func(c *Config) {
c.onRetry = onRetry
}
}
// RetryIf controls whether a retry should be attempted after an error
// (assuming there are any retry attempts remaining)
//
// skip retry if special error example:
//
// retry.Do(
// func() error {
// return errors.New("special error")
// },
// retry.RetryIf(func(err error) bool {
// if err.Error() == "special error" {
// return false
// }
// return true
// })
// )
//
// By default RetryIf stops execution if the error is wrapped using `retry.Unrecoverable`,
// so the above example may also be shortened to:
//
// retry.Do(
// func() error {
// return retry.Unrecoverable(errors.New("special error"))
// }
// )
func RetryIf(retryIf RetryIfFunc) Option {
return func(c *Config) {
c.retryIf = retryIf
}
}
func Do(retryableFunc RetryableFunc, opts ...Option) error {
var n uint
// default
config := &Config{
attempts: DefaultAttempts,
delay: DefaultDelayMS,
maxJitter: DefaultMaxJitterMS,
onRetry: DefaultOnRetry,
retryIf: DefaultRetryIf,
delayType: DefaultDelayType,
lastErrorOnly: DefaultLastErrorOnly,
}
// apply opts
for _, opt := range opts {
opt(config)
}
var errorLog Error
if !config.lastErrorOnly {
errorLog = make(Error, config.attempts)
} else {
errorLog = make(Error, 1)
}
lastErrIndex := n
for n < config.attempts {
err := retryableFunc()
if err != nil {
errorLog[lastErrIndex] = unpackUnrecoverable(err)
if !config.retryIf(err) {
break
}
config.onRetry(n, err)
// if this is last attempt - don't wait
if n == config.attempts-1 {
break
}
delayTime := config.delayType(n, config)
if config.maxDelay > 0 && delayTime > config.maxDelay {
delayTime = config.maxDelay
}
time.Sleep(delayTime)
} else {
return nil
}
n++
if !config.lastErrorOnly {
lastErrIndex = n
}
}
if config.lastErrorOnly {
return errorLog[lastErrIndex]
}
return errorLog
}
// Error represents the list of errors collected across retry attempts
type Error []error
// Error returns the string representation of the error list
// It implements the error interface
func (e Error) Error() string {
logWithNumber := make([]string, lenWithoutNil(e))
for i, l := range e {
if l != nil {
logWithNumber[i] = fmt.Sprintf("#%d: %s", i+1, l.Error())
}
}
return fmt.Sprintf("All attempts fail:\n%s", strings.Join(logWithNumber, "\n"))
}
func lenWithoutNil(e Error) (count int) {
for _, v := range e {
if v != nil {
count++
}
}
return
}
// WrappedErrors returns the list of errors that this Error is wrapping.
// It is an implementation of the `errwrap.Wrapper` interface
// in package [errwrap](https://github.com/hashicorp/errwrap) so that
// `retry.Error` can be used with that library.
func (e Error) WrappedErrors() []error {
return e
}
type unrecoverableError struct {
error
}
// Unrecoverable wraps an error in the `unrecoverableError` struct so that it is not retried
func Unrecoverable(err error) error {
return unrecoverableError{err}
}
// IsRecoverable reports whether err is recoverable, i.e. not wrapped by `Unrecoverable`
func IsRecoverable(err error) bool {
_, isUnrecoverable := err.(unrecoverableError)
return !isUnrecoverable
}
func unpackUnrecoverable(err error) error {
if unrecoverable, isUnrecoverable := err.(unrecoverableError); isUnrecoverable {
return unrecoverable.error
}
return err
}
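As a usage sketch (the call site and import path below are hypothetical, not part of this PR), callers drive the package through `retry.Do` with functional options:

```go
package main

import (
	"errors"
	"log"
	"time"

	"example.com/pkg/retry" // hypothetical import path for the package above
)

func main() {
	attempt := 0
	err := retry.Do(
		func() error {
			attempt++
			if attempt < 3 {
				return errors.New("transient failure") // retried with backoff plus jitter
			}
			return nil
		},
		retry.Attempts(5),
		retry.Delay(50*time.Millisecond),
		retry.MaxJitter(20*time.Millisecond),
		retry.LastErrorOnly(true), // return only the final attempt's error
	)
	log.Printf("attempts=%d err=%v", attempt, err)
}
```

With the default `LastErrorOnly(false)`, `Do` instead returns the aggregated `Error` list whose `Error()` method prints every attempt's failure; wrapping a failure with `retry.Unrecoverable` stops retrying immediately.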


@@ -34,13 +34,24 @@ var (
errVirtiofsdSocketPathEmpty = errors.New("virtiofsd socket path is empty")
errVirtiofsdSourcePathEmpty = errors.New("virtiofsd source path is empty")
errVirtiofsdSourceNotAvailable = errors.New("virtiofsd source path not available")
errUnimplemented = errors.New("unimplemented")
)
type Virtiofsd interface {
// Start virtiofsd, return pid of virtiofsd process
type VirtiofsDaemon interface {
// Start the virtiofs daemon and return the PID of the daemon process
Start(context.Context, onQuitFunc) (pid int, err error)
// Stop virtiofsd process
// Stop virtiofs daemon process
Stop(context.Context) error
// Mount attaches a rafs submount to the virtiofs mountpoint
Mount(opt MountOption) error
// Umount detaches a rafs submount from the virtiofs mountpoint
Umount(mountpoint string) error
}
type MountOption struct {
source string
mountpoint string
config string
}
// Helper function to execute when virtiofsd quits
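The interface now covers both the daemon lifecycle and rafs submounts. As a minimal sketch of driving it from inside the package (the paths and the `mountRafs` helper are hypothetical; the nydusd implementation added elsewhere in this PR supplies the real behavior, while plain virtiofsd returns `errUnimplemented` for these calls, as the hunks below show):

```go
// mountRafs attaches a nydus rafs bootstrap as a submount of the virtio-fs
// share. MountOption's fields are unexported, so this only compiles inside
// this package.
func mountRafs(daemon VirtiofsDaemon) error {
	opt := MountOption{
		source:     "/path/to/rafs/bootstrap",     // rafs metadata blob (hypothetical path)
		mountpoint: "/rafs/container-id/lowerdir", // submount under the share (hypothetical)
		config:     "/path/to/nydusd-config.json", // daemon config (hypothetical path)
	}
	return daemon.Mount(opt)
}

// The matching cleanup is daemon.Umount("/rafs/container-id/lowerdir").
```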
@@ -155,6 +166,7 @@ func (v *virtiofsd) Start(ctx context.Context, onQuit onQuitFunc) (int, error) {
func (v *virtiofsd) Stop(ctx context.Context) error {
if err := v.kill(ctx); err != nil {
v.Logger().WithError(err).WithField("pid", v.PID).Warn("kill virtiofsd failed")
return nil
}
@@ -165,6 +177,14 @@ func (v *virtiofsd) Stop(ctx context.Context) error {
return nil
}
func (v *virtiofsd) Mount(opt MountOption) error {
return errUnimplemented
}
func (v *virtiofsd) Umount(mountpoint string) error {
return errUnimplemented
}
func (v *virtiofsd) args(FdSocketNumber uint) ([]string, error) {
args := []string{
@@ -232,7 +252,6 @@ func (v *virtiofsd) kill(ctx context.Context) (err error) {
if err != nil {
v.PID = 0
}
return err
}
@@ -245,6 +264,14 @@ func (v *virtiofsdMock) Start(ctx context.Context, onQuit onQuitFunc) (int, erro
return 9999999, nil
}
func (v *virtiofsdMock) Mount(opt MountOption) error {
return errUnimplemented
}
func (v *virtiofsdMock) Umount(mountpoint string) error {
return errUnimplemented
}
func (v *virtiofsdMock) Stop(ctx context.Context) error {
return nil
}


@@ -74,7 +74,6 @@ retries=5
EOF
if [ "$BASE_URL" != "" ]; then
cat >> "${DNF_CONF}" << EOF
[base]
name=${OS_NAME}-${OS_VERSION} ${REPO_NAME}
failovermethod=priority
@@ -83,7 +82,6 @@ enabled=1
EOF
elif [ "$MIRROR_LIST" != "" ]; then
cat >> "${DNF_CONF}" << EOF
[base]
name=${OS_NAME}-${OS_VERSION} ${REPO_NAME}
mirrorlist=${MIRROR_LIST}


@@ -233,6 +233,11 @@ externals:
'newest-version' is the latest version known to work.
newest-version: "1.1.23"
nydus:
description: "Nydus image acceleration service"
url: "https://github.com/dragonflyoss/image-service"
version: "v1.1.2"
languages:
description: |
Details of programming languages required to build system