diff --git a/docs/how-to/how-to-run-kata-containers-with-kinds-of-Block-Volumes.md b/docs/how-to/how-to-run-kata-containers-with-kinds-of-Block-Volumes.md
index e53a3b5345..b8401ed1b3 100644
--- a/docs/how-to/how-to-run-kata-containers-with-kinds-of-Block-Volumes.md
+++ b/docs/how-to/how-to-run-kata-containers-with-kinds-of-Block-Volumes.md
@@ -224,3 +224,7 @@ In the case, `ctr run --mount type=X, src=source, dst=dest`, the X will be set `
 $ # ctr run with --mount type=spdkvol,src=/kubelet/kata-test-vol-001/volume001,dst=/disk001
 $ sudo ctr run -t --rm --runtime io.containerd.kata.v2 --mount type=spdkvol,src=/kubelet/kata-test-vol-001/volume001,dst=/disk001,options=rbind:rw "$image" kata-spdk-vol-xx0530 /bin/bash
 ```
+
+## Integrate Direct Volume with K8S
+
+For details, see [`csi-kata-directvolume`](../../src/tools/csi-kata-directvolume/README.md).
diff --git a/src/tools/csi-kata-directvolume/Makefile b/src/tools/csi-kata-directvolume/Makefile
new file mode 100644
index 0000000000..c5a58c5558
--- /dev/null
+++ b/src/tools/csi-kata-directvolume/Makefile
@@ -0,0 +1,11 @@
+#
+# Copyright 2019 The Kubernetes Authors.
+# Copyright (c) 2023 Ant Group
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+
+CMDS=directvolplugin
+all: build
+
+include release-tools/build.make
diff --git a/src/tools/csi-kata-directvolume/README.md b/src/tools/csi-kata-directvolume/README.md
new file mode 100644
index 0000000000..7c9eccb040
--- /dev/null
+++ b/src/tools/csi-kata-directvolume/README.md
@@ -0,0 +1,79 @@
+# CSI Direct Volume Driver
+
+The `Direct Volume CSI driver` is heavily inspired by the [`K8s CSI HostPath driver`](https://github.com/kubernetes-csi/csi-driver-host-path). It aims to provide a production-ready implementation as well as a reference implementation for connecting Kubernetes to `Direct Volume`.
+
+This repository houses the `Direct Volume CSI driver`, along with all build and dependent configuration files needed for deployment.
+
+*WARNING* This driver is still under development.
+
+## Prerequisites
+
+- A K8S cluster
+- Kubernetes version 1.20 or later
+- Access to a terminal with `kubectl` installed
+
+## Features
+
+The driver can provision volumes as direct block devices backed by single files stored on the host, eliminating the need for loop devices.
+
+## Deployment
+
+[Deployment for K8S 1.20+](docs/deploy-csi-kata-directvol.md)
+
+## Building the Binary
+
+If you want to build the driver yourself, you can do so by running the following command from the `csi-kata-directvolume` path:
+
+```shell
+cd tools/csi-kata-directvolume/ && make
+```
+
+## Building the Container Image
+
+If you want to build the container image yourself, you can do so from a build directory laid out as shown below.
+Here, we use `buildah`/`podman` as an example:
+
+```shell
+$ tree -L 2 buildah-directv/
+buildah-directv/
+├── bin
+│   └── directvolplugin
+└── Dockerfile
+
+$ buildah bud -t kata-directvolume:v1.0.19
+STEP 1/7: FROM alpine
+STEP 2/7: LABEL maintainers="Kata Containers Authors"
+STEP 3/7: LABEL description="Kata DirectVolume Driver"
+STEP 4/7: ARG binary=./bin/directvolplugin
+STEP 5/7: RUN apk add util-linux coreutils e2fsprogs xfsprogs xfsprogs-extra btrfs-progs && apk update && apk upgrade
+fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/main/x86_64/APKINDEX.tar.gz
+fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/community/x86_64/APKINDEX.tar.gz
+(1/66) Installing libblkid (2.39.3-r0)
+...
+(66/66) Installing xfsprogs-extra (6.5.0-r0) +Executing busybox-1.36.1-r15.trigger +OK: 64 MiB in 81 packages +fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/main/x86_64/APKINDEX.tar.gz +fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/community/x86_64/APKINDEX.tar.gz +v3.19.0-19-ga0ddaee500e [https://dl-cdn.alpinelinux.org/alpine/v3.19/main] +v3.19.0-18-gec62a609516 [https://dl-cdn.alpinelinux.org/alpine/v3.19/community] +OK: 22983 distinct packages available +OK: 64 MiB in 81 packages +STEP 6/7: COPY ${binary} /kata-directvol-plugin +STEP 7/7: ENTRYPOINT ["/kata-directvol-plugin"] +COMMIT kata-directvolume:v1.0.19 +Getting image source signatures +Copying blob 5af4f8f59b76 skipped: already exists +Copying blob a55645705de3 done +Copying config 244001cc51 done +Writing manifest to image destination +Storing signatures +--> 244001cc51d +Successfully tagged localhost/kata-directvolume:v1.0.19 +244001cc51d77302c4ed5e1a0ec347d12d85dec4576ea1313f700f66e2a7d36d +$ podman save localhost/kata-directvolume:v1.0.19 -o kata-directvolume-v1.0.19.tar +$ ctr -n k8s.io image import kata-directvolume-v1.0.19.tar +unpacking localhost/kata-directvolume:v1.0.19 (sha256:1bdc33ff7f9cee92e74cbf77a9d79d00dce6dbb9ba19b9811f683e1a087f8fbf)...done +$ crictl images |grep 1.0.19 +localhost/kata-directvolume v1.0.19 244001cc51d77 83.8MB +``` diff --git a/src/tools/csi-kata-directvolume/cmd/directvolplugin/main.go b/src/tools/csi-kata-directvolume/cmd/directvolplugin/main.go new file mode 100644 index 0000000000..b2e46a3ad3 --- /dev/null +++ b/src/tools/csi-kata-directvolume/cmd/directvolplugin/main.go @@ -0,0 +1,64 @@ +// +// Copyright 2017 The Kubernetes Authors. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package main + +import ( + "flag" + "kata-containers/csi-kata-directvolume/pkg/directvolume" + "os" + "path" + + "k8s.io/klog/v2" +) + +func init() { + if err := flag.Set("logtostderr", "true"); err != nil { + klog.Errorln("flag setting failed.") + } +} + +var ( + // Set by the build process + version = "" +) + +func main() { + cfg := directvolume.Config{ + VendorVersion: version, + } + + flag.StringVar(&cfg.Endpoint, "endpoint", "unix:///var/run/csi.sock", "CSI endpoint") + flag.StringVar(&cfg.DriverName, "drivername", "directvolume.csi.katacontainers.io", "name of the driver") + flag.StringVar(&cfg.StateDir, "statedir", "/csi-persist-data", "directory for storing state information across driver restarts, volumes ") + flag.StringVar(&cfg.StoragePath, "storagepath", "", "storage path for storing the backend files on host") + flag.StringVar(&cfg.NodeID, "nodeid", "", "node id") + flag.Var(&cfg.Capacity, "capacity", "Simulate storage capacity. The parameter is = where is the value of a 'kind' storage class parameter and is the total amount of bytes for that kind. 
The flag may be used multiple times to configure different kinds.") + flag.Int64Var(&cfg.MaxVolumeSize, "max-volume-size", 1024*1024*1024*1024, "maximum size of volumes in bytes (inclusive)") + flag.BoolVar(&cfg.EnableTopology, "enable-topology", true, "Enables PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS capability.") + + showVersion := flag.Bool("version", false, "Show version.") + + flag.Parse() + + if *showVersion { + baseName := path.Base(os.Args[0]) + klog.Infof(baseName, version) + return + } + + driver, err := directvolume.NewDirectVolumeDriver(cfg) + if err != nil { + klog.Errorf("Failed to initialize driver: %s", err.Error()) + os.Exit(1) + } + + if err := driver.Run(); err != nil { + klog.Errorf("Failed to run driver: %s", err.Error()) + os.Exit(1) + } +} diff --git a/src/tools/csi-kata-directvolume/deploy/deploy.sh b/src/tools/csi-kata-directvolume/deploy/deploy.sh new file mode 100644 index 0000000000..0cde476efa --- /dev/null +++ b/src/tools/csi-kata-directvolume/deploy/deploy.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# +# Copyright (c) 2023 Ant Group +# +# SPDX-License-Identifier: Apache-2.0 +# + +set -e +set -o pipefail + +BASE_DIR=$(dirname "$0") + +${BASE_DIR}/rbac-deploy.sh +${BASE_DIR}/directvol-deploy.sh diff --git a/src/tools/csi-kata-directvolume/deploy/directvol-deploy.sh b/src/tools/csi-kata-directvolume/deploy/directvol-deploy.sh new file mode 100755 index 0000000000..43dd54e492 --- /dev/null +++ b/src/tools/csi-kata-directvolume/deploy/directvol-deploy.sh @@ -0,0 +1,115 @@ +#!/usr/bin/env bash +# +# Copyright 2017 The Kubernetes Authors. +# Copyright (c) 2023 Ant Group +# +# SPDX-License-Identifier: Apache-2.0 +# + +set -e +set -o pipefail + +BASE_DIR=$(dirname "$0") + +# KUBELET_DATA_DIR can be set to replace the default /var/lib/kubelet. +# All nodes must use the same directory. +default_kubelet_data_dir=/var/lib/kubelet +: ${KUBELET_DATA_DIR:=${default_kubelet_data_dir}} + +# namespace kata-directvolume +DIRECTVOL_NAMESPACE="kata-directvolume" + +# Some images are not affected by *_REGISTRY/*_TAG and IMAGE_* variables. +# The default is to update unless explicitly excluded. +update_image () { + case "$1" in socat) return 1;; esac +} + +run () { + echo "$@" >&2 + "$@" +} + +# deploy kata directvolume plugin and registrar sidecar +echo "deploying kata directvolume components" +for i in $(ls ${BASE_DIR}/kata-directvolume/csi-directvol-*.yaml | sort); do + echo " $i" + modified="$(cat "$i" | sed -e "s;${default_kubelet_data_dir}/;${KUBELET_DATA_DIR}/;" | while IFS= read -r line; do + nocomments="$(echo "$line" | sed -e 's/ *#.*$//')" + if echo "$nocomments" | grep -q '^[[:space:]]*image:[[:space:]]*'; then + # Split 'image: quay.io/k8scsi/csi-attacher:vx.y.z' + # into image (quay.io/k8scsi/csi-attacher:vx.y.z), + # registry (quay.io/k8scsi), + # name (csi-attacher), + # tag (vx.y.z). + image=$(echo "$nocomments" | sed -e 's;.*image:[[:space:]]*;;') + registry=$(echo "$image" | sed -e 's;\(.*\)/.*;\1;') + name=$(echo "$image" | sed -e 's;.*/\([^:]*\).*;\1;') + tag=$(echo "$image" | sed -e 's;.*:;;') + + # Variables are with underscores and upper case. + varname=$(echo $name | tr - _ | tr a-z A-Z) + + # Now replace registry and/or tag, if set as env variables. + # If not set, the replacement is the same as the original value. + # Only do this for the images which are meant to be configurable. 
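+            # For example (hypothetical registry and tag values):
+            #   CSI_PROVISIONER_REGISTRY=registry.example.com CSI_PROVISIONER_TAG=v3.6.3 ./directvol-deploy.sh
+            # The generic IMAGE_REGISTRY and IMAGE_TAG variables override all configurable images at once.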
+ if update_image "$name"; then + prefix=$(eval echo \${${varname}_REGISTRY:-${IMAGE_REGISTRY:-${registry}}}/ | sed -e 's;none/;;') + if [ "$IMAGE_TAG" = "canary" ] && + [ -f ${BASE_DIR}/canary-blacklist.txt ] && + grep -q "^$name\$" ${BASE_DIR}/canary-blacklist.txt; then + # Ignore IMAGE_TAG=canary for this particular image because its + # canary image is blacklisted in the deployment blacklist. + suffix=$(eval echo :\${${varname}_TAG:-${tag}}) + else + suffix=$(eval echo :\${${varname}_TAG:-${IMAGE_TAG:-${tag}}}) + fi + line="$(echo "$nocomments" | sed -e "s;$image;${prefix}${name}${suffix};")" + fi + echo "kata-directvolume plugin using $line" >&2 + fi + if ! $have_csistoragecapacity; then + line="$(echo "$line" | grep -v -e 'storageCapacity: true' -e '--enable-capacity')" + fi + echo "$line" + done)" + if ! echo "$modified" | kubectl apply -f -; then + echo "modified version of $i:" + echo "$modified" + exit 1 + fi +done + +wait_for_daemonset () { + retries=10 + while [ $retries -ge 0 ]; do + ready=$(kubectl get -n $1 daemonset $2 -o jsonpath="{.status.numberReady}") + required=$(kubectl get -n $1 daemonset $2 -o jsonpath="{.status.desiredNumberScheduled}") + if [ $ready -gt 0 ] && [ $ready -eq $required ]; then + return 0 + fi + retries=$((retries - 1)) + sleep 3 + done + return 1 +} + + +# Wait until the DaemonSet is running on all nodes. +if ! wait_for_daemonset ${DIRECTVOL_NAMESPACE} csi-kata-directvol-plugin; then + echo + echo "driver not ready" + echo "Deployment:" + (set +e; set -x; kubectl describe all,role,clusterrole,rolebinding,clusterrolebinding,serviceaccount,storageclass,csidriver --all-namespaces -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io) + echo + echo "Pod logs:" + kubectl get pods -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io --all-namespaces -o=jsonpath='{range .items[*]}{.metadata.name}{" "}{range .spec.containers[*]}{.name}{" "}{end}{"\n"}{end}' | while read -r pod containers; do + for c in $containers; do + echo + (set +e; set -x; kubectl logs $pod $c) + done + done + exit 1 +fi + +kubectl get po,ds -A diff --git a/src/tools/csi-kata-directvolume/deploy/directvol-destroy.sh b/src/tools/csi-kata-directvolume/deploy/directvol-destroy.sh new file mode 100755 index 0000000000..59fca37f6b --- /dev/null +++ b/src/tools/csi-kata-directvolume/deploy/directvol-destroy.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# +# Copyright 2017 The Kubernetes Authors. +# Copyright (c) 2023 Ant Group +# +# SPDX-License-Identifier: Apache-2.0 +# + +set -e +set -o pipefail + +# Deleting all the resources installed by the directvol-deploy script. +# Every resource in the driver installation has the label representing the installation instance. 
+# Using app.kubernetes.io/instance: directvolume.csi.katacontainers.io and app.kubernetes.io/part-of: +# csi-driver-kata-directvolume labels to identify the installation set +kubectl delete all --all-namespaces -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io,app.kubernetes.io/part-of=csi-driver-kata-directvolume --wait=true +kubectl delete role,clusterrole,rolebinding,clusterrolebinding,serviceaccount,storageclass,csidriver --all-namespaces -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io,app.kubernetes.io/part-of=csi-driver-kata-directvolume --wait=true diff --git a/src/tools/csi-kata-directvolume/deploy/kata-directvolume/csi-directvol-driverinfo.yaml b/src/tools/csi-kata-directvolume/deploy/kata-directvolume/csi-directvol-driverinfo.yaml new file mode 100644 index 0000000000..f12b2d5dd1 --- /dev/null +++ b/src/tools/csi-kata-directvolume/deploy/kata-directvolume/csi-directvol-driverinfo.yaml @@ -0,0 +1,21 @@ +apiVersion: storage.k8s.io/v1 +kind: CSIDriver +metadata: + name: directvolume.csi.katacontainers.io + labels: + app.kubernetes.io/instance: directvolume.csi.katacontainers.io + app.kubernetes.io/part-of: csi-driver-kata-directvolume + app.kubernetes.io/name: directvolume.csi.katacontainers.io + app.kubernetes.io/component: csi-driver +spec: + # Supports persistent volume. + volumeLifecycleModes: + - Persistent + # To determine at runtime which mode a volume uses, pod info. + podInfoOnMount: true + # No attacher needed. + attachRequired: false + storageCapacity: false + # Kubernetes may use fsGroup to change permissions and ownership + # of the volume to match user requested fsGroup in the pod's SecurityPolicy + fsGroupPolicy: File diff --git a/src/tools/csi-kata-directvolume/deploy/kata-directvolume/csi-directvol-plugin.yaml b/src/tools/csi-kata-directvolume/deploy/kata-directvolume/csi-directvol-plugin.yaml new file mode 100644 index 0000000000..a243956254 --- /dev/null +++ b/src/tools/csi-kata-directvolume/deploy/kata-directvolume/csi-directvol-plugin.yaml @@ -0,0 +1,190 @@ +kind: DaemonSet +apiVersion: apps/v1 +metadata: + namespace: kata-directvolume + name: csi-kata-directvol-plugin + labels: + app.kubernetes.io/instance: directvolume.csi.katacontainers.io + app.kubernetes.io/part-of: csi-driver-kata-directvolume + app.kubernetes.io/name: csi-kata-directvol-plugin + app.kubernetes.io/component: plugin +spec: + selector: + matchLabels: + app.kubernetes.io/instance: directvolume.csi.katacontainers.io + app.kubernetes.io/part-of: csi-driver-kata-directvolume + app.kubernetes.io/name: csi-kata-directvol-plugin + app.kubernetes.io/component: plugin + template: + metadata: + labels: + app.kubernetes.io/instance: directvolume.csi.katacontainers.io + app.kubernetes.io/part-of: csi-driver-kata-directvolume + app.kubernetes.io/name: csi-kata-directvol-plugin + app.kubernetes.io/component: plugin + spec: + serviceAccountName: csi-provisioner + containers: + - name: csi-provisioner + image: registry.k8s.io/sig-storage/csi-provisioner:v3.6.0 + args: + - -v=3 + - --csi-address=/csi/csi.sock + - --feature-gates=Topology=true + - --node-deployment=true + - --strict-topology=true + - --immediate-topology=false + - --worker-threads=5 + #- --enable-capacity + #- --capacity-ownerref-level=0 # pod is owner + env: + - name: NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + securityContext: 
+ # This is necessary only for systems with SELinux, where + # non-privileged sidecar containers cannot access unix domain socket + # created by privileged CSI driver container. + privileged: true + volumeMounts: + - mountPath: /csi + name: socket-dir + + - name: node-driver-registrar + image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.9.0 + args: + - --v=3 + - --csi-address=/csi/csi.sock + - --kubelet-registration-path=/var/lib/kubelet/plugins/csi-kata-directvolume/csi.sock + securityContext: + # This is necessary only for systems with SELinux, where + # non-privileged sidecar containers cannot access unix domain socket + # created by privileged CSI driver container. + privileged: true + env: + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + volumeMounts: + - mountPath: /csi + name: socket-dir + - mountPath: /registration + name: registration-dir + - mountPath: /csi-persist-data + name: csi-persist-data + + - name: kata-directvolume + # build and push it into registry + image: localhost/kata-directvolume:v1.0.18 + args: + - --drivername=directvolume.csi.katacontainers.io + - --v=5 + - --endpoint=$(CSI_ENDPOINT) + - --statedir=$(STATE_DIR) + - --storagepath=$(STORAGE_POOL) + - --nodeid=$(KUBE_NODE_NAME) + env: + - name: CSI_ENDPOINT + value: unix:///csi/csi.sock + - name: STORAGE_POOL + value: /tmp/kata-directvol-storages + - name: STATE_DIR + value: /csi-persist-data + - name: KUBE_NODE_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: spec.nodeName + securityContext: + privileged: true + ports: + - containerPort: 9898 + name: healthz + protocol: TCP + livenessProbe: + failureThreshold: 5 + httpGet: + path: /healthz + port: healthz + initialDelaySeconds: 10 + timeoutSeconds: 3 + periodSeconds: 2 + volumeMounts: + - mountPath: /csi + name: socket-dir + - mountPath: /var/lib/kubelet/pods + mountPropagation: Bidirectional + name: mountpoint-dir + - mountPath: /var/lib/kubelet/plugins + mountPropagation: Bidirectional + name: plugins-dir + - mountPath: /csi-persist-data + name: csi-persist-data + - mountPath: /dev + name: dev-dir + # backend block file stored at storage-pool + - mountPath: /tmp/kata-directvol-storages + name: storage-pool + # direct volume mountInfo.json stored at shared-directvols + - mountPath: /run/kata-containers/shared/direct-volumes + name: shared-directvols + + - name: liveness-probe + volumeMounts: + - mountPath: /csi + name: socket-dir + image: registry.k8s.io/sig-storage/livenessprobe:v2.8.0 + args: + - --csi-address=/csi/csi.sock + - --health-port=9898 + + volumes: + - hostPath: + path: /var/lib/kubelet/plugins/csi-kata-directvolume + type: DirectoryOrCreate + name: socket-dir + - hostPath: + path: /var/lib/kubelet/pods + type: DirectoryOrCreate + name: mountpoint-dir + - hostPath: + path: /var/lib/kubelet/plugins_registry + type: Directory + name: registration-dir + - hostPath: + path: /var/lib/kubelet/plugins + type: Directory + name: plugins-dir + - hostPath: + # 'path' is where PV data is persisted on host. + # using /tmp is also possible while the PVs will not available after plugin container recreation or host reboot + path: /var/lib/csi-directvolume-data/ + type: DirectoryOrCreate + name: csi-persist-data + - hostPath: + path: /dev + type: Directory + name: dev-dir + # kata-containers backend rawblock stored there. + - hostPath: + path: /tmp/kata-directvol-storages + type: DirectoryOrCreate + name: storage-pool + # kata-containers direct volumes stored there. 
+ - hostPath: + path: /run/kata-containers/shared/direct-volumes/ + type: DirectoryOrCreate + name: shared-directvols + diff --git a/src/tools/csi-kata-directvolume/deploy/rbac-deploy.sh b/src/tools/csi-kata-directvolume/deploy/rbac-deploy.sh new file mode 100755 index 0000000000..4fc1d01ee9 --- /dev/null +++ b/src/tools/csi-kata-directvolume/deploy/rbac-deploy.sh @@ -0,0 +1,81 @@ +#!/usr/bin/env bash +# +# Copyright 2017 The Kubernetes Authors. +# Copyright (c) 2023 Ant Group +# +# SPDX-License-Identifier: Apache-2.0 +# + +set -e +set -o pipefail + +BASE_DIR=$(dirname "$0") +DEPLOY_DIR=${BASE_DIR}/kata-directvolume + +TEMP_DIR="$( mktemp -d )" +trap 'rm -rf ${TEMP_DIR}' EXIT + +: ${UPDATE_RBAC_RULES:=true} +function rbac_version () { + yaml="$1" + image="$2" + update_rbac="$3" + + # get version from `image: quay.io/k8scsi/csi-attacher:v1.0.1`, ignoring comments + version="$(sed -e 's/ *#.*$//' "$yaml" | grep "image:.*$image" | sed -e 's/ *#.*//' -e 's/.*://')" + + if $update_rbac; then + # apply overrides + varname=$(echo $image | tr - _ | tr a-z A-Z) + eval version=\${${varname}_TAG:-\${IMAGE_TAG:-\$version}} + fi + + echo "$version" +} + +# https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/${VERSION}/deploy/kubernetes/rbac.yaml +CSI_PROVISIONER_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/$(rbac_version "${BASE_DIR}/kata-directvolume/csi-directvol-plugin.yaml" csi-provisioner false)/deploy/kubernetes/rbac.yaml" +: ${CSI_PROVISIONER_RBAC:=https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/$(rbac_version "${BASE_DIR}/kata-directvolume/csi-directvol-plugin.yaml" csi-provisioner "${UPDATE_RBAC_RULES}")/deploy/kubernetes/rbac.yaml} + +run () { + echo "$@" >&2 + "$@" +} + +# namespace kata-directvolume +DIRECTVOL_NAMESPACE="kata-directvolume" + +# create namespace kata-directvolume +echo "Creating Namespace kata-directvolume ..." + cat <<- EOF > "${TEMP_DIR}"/kata-directvol-ns.yaml +apiVersion: v1 +kind: Namespace +metadata: + labels: + kubernetes.io/metadata.name: ${DIRECTVOL_NAMESPACE} + name: ${DIRECTVOL_NAMESPACE} +spec: + finalizers: + - kubernetes +EOF + +run kubectl apply -f "${TEMP_DIR}"/kata-directvol-ns.yaml +echo "Namespace kata-directvolume created Done !" + +# rbac rules +echo "Applying RBAC rules ..." + +eval component="CSI_PROVISIONER" +eval current="\${${component}_RBAC}" +eval original="\${${component}_RBAC_YAML}" + +if [[ "${current}" =~ ^http:// ]] || [[ "${current}" =~ ^https:// ]]; then + run curl "${current}" --output "${TEMP_DIR}"/rbac.yaml --silent --location +fi + +# replace the default namespace with specified namespace kata-directvolume +sed -e "s/namespace: default/namespace: kata-directvolume/g" "${TEMP_DIR}"/rbac.yaml > "${DEPLOY_DIR}/kata-directvol-rbac.yaml" + +# apply the kata-directvol-rbac.yaml +run kubectl apply -f "${DEPLOY_DIR}/kata-directvol-rbac.yaml" +echo "Applying RBAC rules Done!" \ No newline at end of file diff --git a/src/tools/csi-kata-directvolume/docs/deploy-csi-kata-directvol.md b/src/tools/csi-kata-directvolume/docs/deploy-csi-kata-directvol.md new file mode 100644 index 0000000000..c57eda4480 --- /dev/null +++ b/src/tools/csi-kata-directvolume/docs/deploy-csi-kata-directvol.md @@ -0,0 +1,178 @@ +# Deploy Kata Direct Volume CSI and Do Validation + +## How to Deploy Kata Direct Volume CSI + +First, you need to make sure you have a healthy Kubernetes(1.20+) cluster and have the permissions to create Kata pods. 
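+
+Before deploying, it can help to confirm that the prerequisites are actually met. The commands below are a minimal, optional sanity check, not part of the deploy scripts; they assume the `kata` RuntimeClass from `examples/pod-with-directvol/runtimeclass.yaml` and the default `/var/lib/kubelet` data directory used by the deployment manifests:
+
+```shell
+# The server version reported here should be 1.20 or later.
+kubectl version
+
+# A RuntimeClass named "kata" must exist before the example pod can run;
+# if it is missing, apply examples/pod-with-directvol/runtimeclass.yaml first.
+kubectl get runtimeclass kata
+
+# On each node, the manifests assume the default kubelet data directory;
+# set KUBELET_DATA_DIR for directvol-deploy.sh if your nodes use another path.
+ls -d /var/lib/kubelet/plugins_registry
+```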
+
+*WARNING* If you use an older `K8S` version, the driver is not guaranteed to work correctly.
+
+The `CSI driver` is deployed as a `daemonset`, and each of its pods contains 4 containers:
+
+1. `Kata Direct Volume CSI Driver`, which contains the core implementation
+2. [CSI-External-Provisioner](https://github.com/kubernetes-csi/external-provisioner)
+3. [CSI-Liveness-Probe](https://github.com/kubernetes-csi/livenessprobe)
+4. [CSI-Node-Driver-Registrar](https://github.com/kubernetes-csi/node-driver-registrar)
+
+The easiest way to deploy the `Direct Volume CSI driver` is to run the `deploy.sh` script for the Kubernetes version used by
+the cluster, as shown below for Kubernetes 1.28.2.
+
+```shell
+sudo deploy/deploy.sh
+```
+
+You'll get output similar to the following, indicating that the `RBAC rules` were applied and that `csi-provisioner`, `node-driver-registrar`, the `kata directvolume csi driver` (`csi-kata-directvol-plugin`), and `liveness-probe` were deployed successfully. Please note that the following output is specific to Kubernetes 1.28.2.
+
+```shell
+Creating Namespace kata-directvolume ...
+kubectl apply -f /tmp/tmp.kN43BWUGQ5/kata-directvol-ns.yaml
+namespace/kata-directvolume created
+Namespace kata-directvolume created Done !
+Applying RBAC rules ...
+curl https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/v3.6.0/deploy/kubernetes/rbac.yaml --output /tmp/tmp.kN43BWUGQ5/rbac.yaml --silent --location
+kubectl apply -f ./kata-directvolume/kata-directvol-rbac.yaml
+serviceaccount/csi-provisioner created
+clusterrole.rbac.authorization.k8s.io/external-provisioner-runner created
+clusterrolebinding.rbac.authorization.k8s.io/csi-provisioner-role created
+role.rbac.authorization.k8s.io/external-provisioner-cfg created
+rolebinding.rbac.authorization.k8s.io/csi-provisioner-role-cfg created
+
+$ ./directvol-deploy.sh
+deploying kata directvolume components
+   ./kata-directvolume/csi-directvol-driverinfo.yaml
+csidriver.storage.k8s.io/directvolume.csi.katacontainers.io created
+   ./kata-directvolume/csi-directvol-plugin.yaml
+kata-directvolume plugin using image: registry.k8s.io/sig-storage/csi-provisioner:v3.6.0
+kata-directvolume plugin using image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.9.0
+kata-directvolume plugin using image: localhost/kata-directvolume:v1.0.52
+kata-directvolume plugin using image: registry.k8s.io/sig-storage/livenessprobe:v2.8.0
+daemonset.apps/csi-kata-directvol-plugin created
+   ./kata-directvolume/kata-directvol-ns.yaml
+namespace/kata-directvolume unchanged
+   ./kata-directvolume/kata-directvol-rbac.yaml
+serviceaccount/csi-provisioner unchanged
+clusterrole.rbac.authorization.k8s.io/external-provisioner-runner configured
+clusterrolebinding.rbac.authorization.k8s.io/csi-provisioner-role unchanged
+role.rbac.authorization.k8s.io/external-provisioner-cfg unchanged
+rolebinding.rbac.authorization.k8s.io/csi-provisioner-role-cfg unchanged
+NAMESPACE           NAME                                     READY   STATUS    RESTARTS      AGE
+default             pod/kata-driectvol-01                    1/1     Running   0             3h57m
+kata-directvolume   pod/csi-kata-directvol-plugin-92smp      4/4     Running   0             4s
+kube-flannel        pod/kube-flannel-ds-vq796                1/1     Running   1 (67d ago)   67d
+kube-system         pod/coredns-66f779496c-9bmp2             1/1     Running   3 (67d ago)   67d
+kube-system         pod/coredns-66f779496c-qlq6d             1/1     Running   1 (67d ago)   67d
+kube-system         pod/etcd-tnt001                          1/1     Running   19 (67d ago)  67d
+kube-system         pod/kube-apiserver-tnt001                1/1     Running   5 (67d ago)   67d
+kube-system         pod/kube-controller-manager-tnt001       1/1     Running   8 (67d ago)   67d
+kube-system 
pod/kube-proxy-p9t6t 1/1 Running 6 (67d ago) 67d +kube-system pod/kube-scheduler-tnt001 1/1 Running 8 (67d ago) 67d + +NAMESPACE NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE +kata-directvolume daemonset.apps/csi-kata-directvol-plugin 1 1 1 1 1 4s +kube-flannel daemonset.apps/kube-flannel-ds 1 1 1 1 1 67d +kube-system daemonset.apps/kube-proxy 1 1 1 1 1 kubernetes.io/os=linux 67d +``` + + +## How to Run a Kata Pod and Validate it + + +First, ensure all expected pods are running properly, including `csi-provisioner`, `node-driver-registrar`, `kata-directvolume` `csi driver(csi-kata-directvol-plugin)`, liveness-probe: + +```shell +$ kubectl get po -A +NAMESPACE NAME READY STATUS RESTARTS AGE +default csi-kata-directvol-plugin-dlphw 4/4 Running 0 68m +kube-flannel kube-flannel-ds-vq796 1/1 Running 1 (52d ago) 52d +kube-system coredns-66f779496c-9bmp2 1/1 Running 3 (52d ago) 52d +kube-system coredns-66f779496c-qlq6d 1/1 Running 1 (52d ago) 52d +kube-system etcd-node001 1/1 Running 19 (52d ago) 52d +kube-system kube-apiserver-node001 1/1 Running 5 (52d ago) 52d +kube-system kube-controller-manager-node001 1/1 Running 8 (52d ago) 52d +kube-system kube-proxy-p9t6t 1/1 Running 6 (52d ago) 52d +kube-system kube-scheduler-node001 1/1 Running 8 (52d ago) 52d +``` + +From the root directory, deploy the application pods including a storage class, a `PVC`, and a pod which uses direct block device based volume. The details can be seen in `/examples/pod-with-directvol/*.yaml`: + +```shell +kubectl apply -f ${BASE_DIR}/csi-storageclass.yaml +kubectl apply -f ${BASE_DIR}/csi-pvc.yaml +kubectl apply -f ${BASE_DIR}/csi-app.yaml +``` + +Let's validate the components are deployed: + +```shell +$ kubectl get po -A +NAMESPACE NAME READY STATUS RESTARTS AGE +kata-directvolume csi-kata-directvol-plugin-dlphw 4/4 Running 0 68m +default kata-driectvol-01 1/1 Running 0 67m + +$ kubectl get sc,pvc -A +NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE +storageclass.storage.k8s.io/csi-kata-directvolume-sc directvolume.csi.katacontainers.io Delete Immediate false 71m + +NAMESPACE NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE +default persistentvolumeclaim/csi-directvolume-pvc Bound pvc-d7644547-f850-4bdf-8c93-aa745c7f31b5 1Gi RWO csi-kata-directvolume-sc 71m + +``` + +Finally, inspect the application pod `kata-driectvol-01` which running with direct block device based volume: + +```shell +$ kubectl describe po kata-driectvol-01 +Name: kata-driectvol-01 +Namespace: kata-directvolume +Priority: 0 +Runtime Class Name: kata +Service Account: default +Node: node001/10.10.1.19 +Start Time: Sat, 09 Dec 2023 23:06:49 +0800 +Labels: +Annotations: +Status: Running +IP: 10.244.0.232 +IPs: + IP: 10.244.0.232 +Containers: + first-container: + Container ID: containerd://c5eec9d645a67b982549321f382d83c56297d9a2a705857e8f3eaa6c6676908e + Image: ubuntu:22.04 + Image ID: docker.io/library/ubuntu@sha256:2b7412e6465c3c7fc5bb21d3e6f1917c167358449fecac8176c6e496e5c1f05f + Port: + Host Port: + Command: + sleep + 1000000 + State: Running + Started: Sat, 09 Dec 2023 23:06:51 +0800 + Ready: True + Restart Count: 0 + Environment: + Mounts: + /data from kata-driectvol0-volume (rw) + /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-zs9tm (ro) +Conditions: + Type Status + Initialized True + Ready True + ContainersReady True + PodScheduled True +Volumes: + kata-driectvol0-volume: + Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace) + 
ClaimName: csi-directvolume-pvc + ReadOnly: false + kube-api-access-zs9tm: + Type: Projected (a volume that contains injected data from multiple sources) + TokenExpirationSeconds: 3607 + ConfigMapName: kube-root-ca.crt + ConfigMapOptional: + DownwardAPI: true +QoS Class: BestEffort +Node-Selectors: +Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s + node.kubernetes.io/unreachable:NoExecute op=Exists for 300s +Events: + +``` diff --git a/src/tools/csi-kata-directvolume/examples/pod-apply.sh b/src/tools/csi-kata-directvolume/examples/pod-apply.sh new file mode 100755 index 0000000000..f5ba9d5a88 --- /dev/null +++ b/src/tools/csi-kata-directvolume/examples/pod-apply.sh @@ -0,0 +1,15 @@ +#!/bin/bash +# +# Copyright (c) 2023 Ant Group +# +# SPDX-License-Identifier: Apache-2.0 +# + +set -e +set -o pipefail + +BASE_DIR=$(dirname "$0")/pod-with-directvol + +kubectl apply -f ${BASE_DIR}/csi-storageclass.yaml +kubectl apply -f ${BASE_DIR}/csi-pvc.yaml +kubectl apply -f ${BASE_DIR}/csi-app.yaml diff --git a/src/tools/csi-kata-directvolume/examples/pod-delete.sh b/src/tools/csi-kata-directvolume/examples/pod-delete.sh new file mode 100755 index 0000000000..b4f3000843 --- /dev/null +++ b/src/tools/csi-kata-directvolume/examples/pod-delete.sh @@ -0,0 +1,15 @@ +#!/bin/bash +# +# Copyright (c) 2023 Ant Group +# +# SPDX-License-Identifier: Apache-2.0 +# + +set -e +set -o pipefail + +BASE_DIR=$(dirname "$0")/pod-with-directvol + +kubectl delete -f ${BASE_DIR}/csi-app.yaml +kubectl delete -f ${BASE_DIR}/csi-pvc.yaml +kubectl delete -f ${BASE_DIR}/csi-storageclass.yaml diff --git a/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-app.yaml b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-app.yaml new file mode 100644 index 0000000000..1dd3a48d5e --- /dev/null +++ b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-app.yaml @@ -0,0 +1,18 @@ +kind: Pod +apiVersion: v1 +metadata: + name: kata-driectvol-01 +spec: + runtimeClassName: kata + containers: + - name: first-container + image: ubuntu:22.04 + volumeMounts: + - mountPath: "/data" + name: kata-driectvol0-volume + command: [ "sleep", "1000000" ] + volumes: + - name: kata-driectvol0-volume + persistentVolumeClaim: + claimName: csi-directvolume-pvc # defined in csi-pvc.yaml + diff --git a/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-pvc.yaml b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-pvc.yaml new file mode 100644 index 0000000000..4d9fedd740 --- /dev/null +++ b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-pvc.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: csi-directvolume-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + storageClassName: csi-kata-directvolume-sc # defined in csi-storageclass.yaml diff --git a/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-storageclass.yaml b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-storageclass.yaml new file mode 100644 index 0000000000..bc517b0930 --- /dev/null +++ b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/csi-storageclass.yaml @@ -0,0 +1,12 @@ +apiVersion: storage.k8s.io/v1 +kind: StorageClass +metadata: + name: csi-kata-directvolume-sc +parameters: + katacontainers.direct.volume/volumetype: directvol + katacontainers.direct.volume/fstype: ext4 +provisioner: directvolume.csi.katacontainers.io +reclaimPolicy: Delete +volumeBindingMode: Immediate 
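+# Volume expansion is not implemented by this driver, so it stays disabled below.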
+allowVolumeExpansion: false + diff --git a/src/tools/csi-kata-directvolume/examples/pod-with-directvol/runtimeclass.yaml b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/runtimeclass.yaml new file mode 100644 index 0000000000..c04de8ed09 --- /dev/null +++ b/src/tools/csi-kata-directvolume/examples/pod-with-directvol/runtimeclass.yaml @@ -0,0 +1,5 @@ +apiVersion: node.k8s.io/v1 +kind: RuntimeClass +metadata: + name: kata +handler: kata diff --git a/src/tools/csi-kata-directvolume/go.mod b/src/tools/csi-kata-directvolume/go.mod new file mode 100644 index 0000000000..75263e23a7 --- /dev/null +++ b/src/tools/csi-kata-directvolume/go.mod @@ -0,0 +1,97 @@ +module kata-containers/csi-kata-directvolume + +go 1.20 + +require ( + github.com/container-storage-interface/spec v1.9.0 + github.com/diskfs/go-diskfs v1.4.0 + github.com/golang/glog v1.2.0 + github.com/golang/protobuf v1.5.3 + github.com/kubernetes-csi/csi-lib-utils v0.16.0 + github.com/pborman/uuid v1.2.1 + github.com/stretchr/testify v1.8.4 + golang.org/x/net v0.19.0 + google.golang.org/grpc v1.59.0 + k8s.io/apimachinery v0.28.2 + k8s.io/klog/v2 v2.110.1 + k8s.io/mount-utils v0.28.2 + k8s.io/utils v0.0.0-20231127182322-b307cd553661 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/google/uuid v1.3.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/moby/sys/mountinfo v0.6.2 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/pkg/xattr v0.4.9 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/sirupsen/logrus v1.9.0 // indirect + github.com/ulikunitz/xz v0.5.11 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect + google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/djherbis/times.v1 v1.3.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace k8s.io/api => k8s.io/api v0.28.2 + +replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.28.2 + +replace k8s.io/apimachinery => k8s.io/apimachinery v0.28.2 + +replace k8s.io/apiserver => k8s.io/apiserver v0.28.2 + +replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.28.2 + +replace k8s.io/client-go => k8s.io/client-go v0.28.2 + +replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.28.2 + +replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.28.2 + +replace k8s.io/code-generator => k8s.io/code-generator v0.28.2 + +replace k8s.io/component-base => k8s.io/component-base v0.28.2 + +replace k8s.io/component-helpers => k8s.io/component-helpers v0.28.2 + +replace k8s.io/controller-manager => k8s.io/controller-manager v0.28.2 + +replace k8s.io/cri-api => k8s.io/cri-api v0.28.2 + +replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.28.2 + +replace k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.28.2 + +replace k8s.io/kms => k8s.io/kms v0.28.2 + +replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.28.2 + +replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.28.2 + +replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.28.2 + +replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.28.2 + +replace k8s.io/kubectl => k8s.io/kubectl v0.28.2 + +replace 
k8s.io/kubelet => k8s.io/kubelet v0.28.2 + +replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.28.2 + +replace k8s.io/metrics => k8s.io/metrics v0.28.2 + +replace k8s.io/mount-utils => k8s.io/mount-utils v0.28.2 + +replace k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.28.2 + +replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.28.2 + +replace k8s.io/endpointslice => k8s.io/endpointslice v0.28.2 diff --git a/src/tools/csi-kata-directvolume/go.sum b/src/tools/csi-kata-directvolume/go.sum new file mode 100644 index 0000000000..c402d2affb --- /dev/null +++ b/src/tools/csi-kata-directvolume/go.sum @@ -0,0 +1,114 @@ +github.com/container-storage-interface/spec v1.9.0 h1:zKtX4STsq31Knz3gciCYCi1SXtO2HJDecIjDVboYavY= +github.com/container-storage-interface/spec v1.9.0/go.mod h1:ZfDu+3ZRyeVqxZM0Ds19MVLkN2d1XJ5MAfi1L3VjlT0= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/diskfs/go-diskfs v1.4.0 h1:MAybY6TPD+fmhY+a2qFhmdvMeIKvCqlgh4QIc1uCmBs= +github.com/diskfs/go-diskfs v1.4.0/go.mod h1:G8cyy+ngM+3yKlqjweMmtqvE+TxsnIo1xumbJX1AeLg= +github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab h1:h1UgjJdAAhj+uPL68n7XASS6bU+07ZX1WJvVS2eyoeY= +github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab/go.mod h1:GLo/8fDswSAniFG+BFIaiSPcK610jyzgEhWYPQwuQdw= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kubernetes-csi/csi-lib-utils v0.16.0 h1:LXCvkhXHtFOkl7LoDqFdho/MuebccZqWxLwhKiRGiBg= +github.com/kubernetes-csi/csi-lib-utils 
v0.16.0/go.mod h1:fp1Oik+45tP2o4X9SD/SBWXLTQYT9wtLxGasBE3+vBI= +github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78= +github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= +github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= +github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= +github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8= +github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/djherbis/times.v1 v1.3.0 h1:uxMS4iMtH6Pwsxog094W0FYldiNnfY/xba00vq6C2+o= +gopkg.in/djherbis/times.v1 v1.3.0/go.mod h1:AQlg6unIsrsCEdQYhTzERy542dz6SFdQFZFv6mUY0P8= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +k8s.io/apimachinery v0.28.2 h1:KCOJLrc6gu+wV1BYgwik4AF4vXOlVJPdiqn0yAWWwXQ= +k8s.io/apimachinery v0.28.2/go.mod 
h1:RdzF87y/ngqk9H4z3EL2Rppv5jj95vGS/HaFXrLDApU= +k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= +k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= +k8s.io/mount-utils v0.28.2 h1:sIdMH7fRhcU48V1oYJ9cLmLm/TG+2jLhhe8eS3I+FWg= +k8s.io/mount-utils v0.28.2/go.mod h1:AyP8LmZSLgpGdFQr+vzHTerlPiGvXUdP99n98Er47jw= +k8s.io/utils v0.0.0-20231127182322-b307cd553661 h1:FepOBzJ0GXm8t0su67ln2wAZjbQ6RxQGZDnzuLcrUTI= +k8s.io/utils v0.0.0-20231127182322-b307cd553661/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= diff --git a/src/tools/csi-kata-directvolume/internal/endpoint.go b/src/tools/csi-kata-directvolume/internal/endpoint.go new file mode 100644 index 0000000000..6e19fb0707 --- /dev/null +++ b/src/tools/csi-kata-directvolume/internal/endpoint.go @@ -0,0 +1,51 @@ +// +// Copyright 2017 The Kubernetes Authors. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package endpoint + +import ( + "fmt" + "net" + "os" + "strings" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func Parse(ep string) (string, string, error) { + if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") { + s := strings.SplitN(ep, "://", 2) + if s[1] != "" { + return s[0], s[1], nil + } + return "", "", status.Error(codes.InvalidArgument, fmt.Sprintf("Invalid endpoint: %v", ep)) + } + + return "unix", ep, nil +} + +func Listen(endpoint string) (net.Listener, func(), error) { + proto, addr, err := Parse(endpoint) + if err != nil { + return nil, nil, err + } + + cleanup := func() {} + if proto == "unix" { + addr = "/" + addr + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + return nil, nil, status.Error(codes.Internal, fmt.Sprintf("%s: %q", addr, err)) + } + cleanup = func() { + os.Remove(addr) + } + } + + l, err := net.Listen(proto, addr) + return l, cleanup, err +} diff --git a/src/tools/csi-kata-directvolume/pkg/directvolume/controllerserver.go b/src/tools/csi-kata-directvolume/pkg/directvolume/controllerserver.go new file mode 100644 index 0000000000..7ee6be4d71 --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/directvolume/controllerserver.go @@ -0,0 +1,315 @@ +// +// Copyright 2017 The Kubernetes Authors. 
+// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package directvolume + +import ( + "fmt" + "strings" + + "github.com/golang/protobuf/ptypes/wrappers" + "github.com/pborman/uuid" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/container-storage-interface/spec/lib/go/csi" + "k8s.io/klog/v2" + + "kata-containers/csi-kata-directvolume/pkg/utils" +) + +func (dv *directVolume) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, finalErr error) { + if err := dv.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { + klog.V(3).Infof("invalid create volume req: %v", req) + return nil, err + } + + if len(req.GetName()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Name missing in request") + } + caps := req.GetVolumeCapabilities() + if caps == nil { + return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request") + } + klog.Infof("createVolume with request: %+v", req) + + dv.mutex.Lock() + defer dv.mutex.Unlock() + + capacity := int64(req.GetCapacityRange().GetRequiredBytes()) + topologies := []*csi.Topology{} + if dv.config.EnableTopology { + topologies = append(topologies, &csi.Topology{Segments: map[string]string{TopologyKeyNode: dv.config.NodeID}}) + } + + volumeCtx := make(map[string]string) + volumeCtx[utils.IsDirectVolume] = "False" + + for key, value := range req.GetParameters() { + switch strings.ToLower(key) { + case utils.KataContainersDirectVolumeType: + if value == utils.DirectVolumeTypeName { + volumeCtx[utils.IsDirectVolume] = "True" + } + case utils.KataContainersDirectFsType: + volumeCtx[utils.KataContainersDirectFsType] = value + default: + continue + } + } + + contentSrc := req.GetVolumeContentSource() + + // Need to check for already existing volume name, and if found + // check for the requested capacity and already allocated capacity + // If err is nil, it means the volume with the same name already exists + // need to check if the size of existing volume is the same as in new + // request + if exVol, err := dv.state.GetVolumeByName(req.GetName()); err == nil { + if exVol.VolSize < capacity { + return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exist", req.GetName()) + } + + if contentSrc != nil { + volumeSource := req.VolumeContentSource + switch volumeSource.Type.(type) { + case *csi.VolumeContentSource_Volume: + if volumeSource.GetVolume() != nil && exVol.ParentVolID != volumeSource.GetVolume().GetVolumeId() { + return nil, status.Error(codes.AlreadyExists, "existing volume source volume id not matching") + } + default: + return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource) + } + } + + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: exVol.VolID, + CapacityBytes: int64(exVol.VolSize), + VolumeContext: volumeCtx, + ContentSource: contentSrc, + AccessibleTopology: topologies, + }, + }, nil + } + + volumeID := uuid.NewUUID().String() + kind := volumeCtx[storageKind] + + vol, err := dv.createVolume(volumeID, req.GetName(), capacity, kind) + if err != nil { + klog.Errorf("created volume %s at path %s failed with error: %v", vol.VolID, vol.VolPath, err.Error()) + return nil, err + } + klog.Infof("created volume %s at path %s", vol.VolID, vol.VolPath) + + if contentSrc != nil { + path := dv.getVolumePath(volumeID) + volumeSource := 
req.VolumeContentSource + switch volumeSource.Type.(type) { + case *csi.VolumeContentSource_Volume: + if srcVolume := volumeSource.GetVolume(); srcVolume != nil { + err = dv.loadFromVolume(capacity, srcVolume.GetVolumeId(), path) + vol.ParentVolID = srcVolume.GetVolumeId() + } + default: + err = status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource) + } + + if err != nil { + klog.V(4).Infof("VolumeSource error: %v", err) + if delErr := dv.deleteVolume(volumeID); delErr != nil { + klog.Infof("deleting direct volume %v failed: %v", volumeID, delErr) + } + return nil, err + } + klog.Infof("successfully populated volume %s", vol.VolID) + } + + volumeCtx[utils.DirectVolumeName] = req.GetName() + volumeCtx[utils.CapabilityInBytes] = fmt.Sprintf("%d", capacity) + + return &csi.CreateVolumeResponse{ + Volume: &csi.Volume{ + VolumeId: volumeID, + CapacityBytes: capacity, + VolumeContext: volumeCtx, + ContentSource: contentSrc, + AccessibleTopology: topologies, + }, + }, nil +} + +func (dv *directVolume) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + if err := dv.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { + klog.V(3).Infof("invalid delete volume req: %v", req) + return nil, err + } + + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + + dv.mutex.Lock() + defer dv.mutex.Unlock() + + volId := req.GetVolumeId() + vol, err := dv.state.GetVolumeByID(volId) + if err != nil { + klog.Warningf("Volume ID %s not found: might have already deleted", volId) + return &csi.DeleteVolumeResponse{}, nil + } + + if vol.Attached || !vol.Published.Empty() || !vol.Staged.Empty() { + msg := fmt.Sprintf("Volume '%s' is still used (attached: %v, staged: %v, published: %v) by '%s' node", + vol.VolID, vol.Attached, vol.Staged, vol.Published, vol.NodeID) + klog.Warning(msg) + } + + if err := dv.deleteVolume(volId); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume %v: %v", volId, err)) + } + klog.Infof("volume %v successfully deleted", volId) + + return &csi.DeleteVolumeResponse{}, nil +} + +func (dv *directVolume) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { + return &csi.ControllerGetCapabilitiesResponse{ + Capabilities: dv.getControllerServiceCapabilities(), + }, nil +} + +func (dv *directVolume) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty") + } + if len(req.VolumeCapabilities) == 0 { + return nil, status.Error(codes.InvalidArgument, req.VolumeId) + } + + dv.mutex.Lock() + defer dv.mutex.Unlock() + + if _, err := dv.state.GetVolumeByID(req.GetVolumeId()); err != nil { + klog.Warning("Validate volume vapability failed. 
Volume not found: might have already deleted") + return nil, err + } + + return &csi.ValidateVolumeCapabilitiesResponse{ + Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{ + VolumeContext: req.GetVolumeContext(), + VolumeCapabilities: req.GetVolumeCapabilities(), + Parameters: req.GetParameters(), + }, + }, nil +} + +func (dv *directVolume) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) { + dv.mutex.Lock() + defer dv.mutex.Unlock() + + // Topology and capabilities are irrelevant. We only + // distinguish based on the "kind" parameter, if at all. + // Without configured capacity, we just have the maximum size. + available := dv.config.MaxVolumeSize + if dv.config.Capacity.Enabled() { + // Empty "kind" will return "zero capacity". There is no fallback + // to some arbitrary kind here because in practice it always should + // be set. + kind := req.GetParameters()[storageKind] + quantity := dv.config.Capacity[kind] + allocated := dv.sumVolumeSizes(kind) + available = quantity.Value() - allocated + } + maxVolumeSize := dv.config.MaxVolumeSize + if maxVolumeSize > available { + maxVolumeSize = available + } + + return &csi.GetCapacityResponse{ + AvailableCapacity: available, + MaximumVolumeSize: &wrappers.Int64Value{Value: maxVolumeSize}, + MinimumVolumeSize: &wrappers.Int64Value{Value: 0}, + }, nil +} + +func (dv *directVolume) validateControllerServiceRequest(c csi.ControllerServiceCapability_RPC_Type) error { + if c == csi.ControllerServiceCapability_RPC_UNKNOWN { + return nil + } + + for _, cap := range dv.getControllerServiceCapabilities() { + if c == cap.GetRpc().GetType() { + return nil + } + } + return status.Errorf(codes.InvalidArgument, "unsupported capability %s", c) +} + +func (dv *directVolume) getControllerServiceCapabilities() []*csi.ControllerServiceCapability { + cl := []csi.ControllerServiceCapability_RPC_Type{ + csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + } + + var csc []*csi.ControllerServiceCapability + + for _, cap := range cl { + csc = append(csc, &csi.ControllerServiceCapability{ + Type: &csi.ControllerServiceCapability_Rpc{ + Rpc: &csi.ControllerServiceCapability_RPC{ + Type: cap, + }, + }, + }) + } + + return csc +} + +func (dv *directVolume) ControllerModifyVolume(context.Context, *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) { + return nil, status.Error(codes.Unimplemented, "controllerModifyVolume is not supported") +} + +func (dv *directVolume) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) { + return &csi.ListVolumesResponse{}, nil +} + +func (dv *directVolume) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) { + return &csi.ControllerGetVolumeResponse{}, nil +} + +func (dv *directVolume) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { + + return &csi.ControllerPublishVolumeResponse{}, nil +} + +func (dv *directVolume) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { + + return &csi.ControllerUnpublishVolumeResponse{}, nil +} + +func (dv *directVolume) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { + + return &csi.CreateSnapshotResponse{}, nil +} + +func (dv *directVolume) DeleteSnapshot(ctx 
context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { + + return &csi.DeleteSnapshotResponse{}, nil +} + +func (dv *directVolume) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { + + return &csi.ListSnapshotsResponse{}, nil +} + +func (dv *directVolume) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { + + return &csi.ControllerExpandVolumeResponse{}, nil +} diff --git a/src/tools/csi-kata-directvolume/pkg/directvolume/directvolume.go b/src/tools/csi-kata-directvolume/pkg/directvolume/directvolume.go new file mode 100644 index 0000000000..becef40b2f --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/directvolume/directvolume.go @@ -0,0 +1,236 @@ +// +// Copyright 2017 The Kubernetes Authors. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package directvolume + +import ( + "errors" + "fmt" + "os" + "path" + "path/filepath" + "sync" + + "kata-containers/csi-kata-directvolume/pkg/state" + "kata-containers/csi-kata-directvolume/pkg/utils" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" + utilexec "k8s.io/utils/exec" +) + +const ( + // storageKind is the special parameter which requests + // storage of a certain kind (only affects capacity checks). + storageKind = "kind" +) + +type directVolume struct { + mutex sync.Mutex + + config Config + state state.State +} + +type Config struct { + DriverName string + Endpoint string + NodeID string + VendorVersion string + MaxVolumeSize int64 + Capacity utils.Capacity + ShowVersion bool + EnableAttach bool + EnableTopology bool + + StateDir string + VolumeDevices map[string]string + StoragePath string + IsDirectVolume bool + safeMounter *utils.SafeMountFormater +} + +func NewDirectVolumeDriver(cfg Config) (*directVolume, error) { + if cfg.DriverName == "" { + return nil, errors.New("no driver name provided") + } + + if cfg.NodeID == "" { + return nil, errors.New("no node id provided") + } + + if cfg.Endpoint == "" { + return nil, errors.New("no driver endpoint provided") + } + + if cfg.StoragePath == "" { + return nil, errors.New("no storage path provided") + } + + if err := utils.MakeFullPath(cfg.StoragePath); err != nil { + return nil, fmt.Errorf("failed to mkdir -p storage path %v", cfg.StoragePath) + } + + if err := utils.MakeFullPath(cfg.StateDir); err != nil { + return nil, fmt.Errorf("failed to mkdir -p state dir%v", cfg.StateDir) + } + + if cfg.safeMounter == nil { + safeMnt := utils.NewSafeMountFormater() + cfg.safeMounter = &safeMnt + } + + cfg.VolumeDevices = make(map[string]string) + + klog.Infof("\nDriver: %v \nVersion: %s\nStoragePath: %s\nStatePath: %s\n", cfg.DriverName, cfg.VendorVersion, cfg.StoragePath, cfg.StateDir) + + s, err := state.New(path.Join(cfg.StateDir, "state.json")) + if err != nil { + return nil, err + } + dv := &directVolume{ + config: cfg, + state: s, + } + + return dv, nil +} + +func (dv *directVolume) Run() error { + s := NewNonBlockingGRPCServer() + + // dv itself implements ControllerServer, NodeServer, and IdentityServer. 
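+	// Start serves the CSI gRPC endpoint in a background goroutine; Wait blocks until the server exits.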
+ s.Start(dv.config.Endpoint, dv, dv, dv) + s.Wait() + + return nil +} + +// getVolumePath returns the canonical path for direct volume +func (dv *directVolume) getVolumePath(volID string) string { + return filepath.Join(dv.config.StateDir, volID) +} + +// createVolume allocates capacity, creates the directory for the direct volume, and +// adds the volume to the list. +// It returns the volume path or err if one occurs. That error is suitable as result of a gRPC call. +func (dv *directVolume) createVolume(volID, name string, cap int64, kind string) (*state.Volume, error) { + // Check for maximum available capacity + if cap > dv.config.MaxVolumeSize { + return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", cap, dv.config.MaxVolumeSize) + } + if dv.config.Capacity.Enabled() { + if kind == "" { + // Pick some kind with sufficient remaining capacity. + for k, c := range dv.config.Capacity { + if dv.sumVolumeSizes(k)+cap <= c.Value() { + kind = k + break + } + } + } + + used := dv.sumVolumeSizes(kind) + available := dv.config.Capacity[kind] + if used+cap > available.Value() { + return nil, status.Errorf(codes.ResourceExhausted, "requested capacity %d exceeds remaining capacity for %q, %s out of %s already used", + cap, kind, resource.NewQuantity(used, resource.BinarySI).String(), available.String()) + } + } else if kind != "" { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("capacity tracking disabled, specifying kind %q is invalid", kind)) + } + + path := dv.getVolumePath(volID) + + if err := os.MkdirAll(path, utils.PERM); err != nil { + klog.Errorf("mkdirAll for path %s failed with error: %v", path, err.Error()) + return nil, err + } + + volume := state.Volume{ + VolID: volID, + VolName: name, + VolSize: cap, + VolPath: path, + Kind: kind, + } + + klog.Infof("adding direct volume: %s = %+v", volID, volume) + if err := dv.state.UpdateVolume(volume); err != nil { + return nil, err + } + + return &volume, nil +} + +// deleteVolume deletes the directory for the direct volume. +func (dv *directVolume) deleteVolume(volID string) error { + klog.V(4).Infof("starting to delete direct volume: %s", volID) + + vol, err := dv.state.GetVolumeByID(volID) + if err != nil { + klog.Warning("deleteVolume with Volume not found.") + // Return OK if the volume is not found. 
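+		// (deleteVolume is expected to be idempotent, so a missing volume is not treated as an error.)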
+		return nil
+	}
+
+	path := dv.getVolumePath(volID)
+	if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) {
+		return err
+	}
+	if err := dv.state.DeleteVolume(volID); err != nil {
+		return err
+	}
+	klog.V(4).Infof("deleted direct volume: %s = %+v", volID, vol)
+
+	return nil
+}
+
+func (dv *directVolume) sumVolumeSizes(kind string) (sum int64) {
+	for _, volume := range dv.state.GetVolumes() {
+		if volume.Kind == kind {
+			sum += volume.VolSize
+		}
+	}
+	return
+}
+
+// loadFromVolume populates the given destPath with data from the srcVolumeId
+func (dv *directVolume) loadFromVolume(size int64, srcVolumeId, destPath string) error {
+	directVolume, err := dv.state.GetVolumeByID(srcVolumeId)
+	if err != nil {
+		klog.Errorf("loadFromVolume: failed to get source volume %v by ID: %v", srcVolumeId, err)
+		return err
+	}
+	if directVolume.VolSize > size {
+		return status.Errorf(codes.InvalidArgument, "volume %v size %v is greater than requested volume size %v", srcVolumeId, directVolume.VolSize, size)
+	}
+
+	return loadFromPersistStorage(directVolume, destPath)
+}
+
+func loadFromPersistStorage(directVolume state.Volume, destPath string) error {
+	srcPath := directVolume.VolPath
+	isEmpty, err := utils.IsPathEmpty(srcPath)
+	if err != nil {
+		return fmt.Errorf("failed verification check of source direct volume %v: %w", directVolume.VolID, err)
+	}
+
+	// If the source direct volume is empty it's a noop and we just move along, otherwise the cp call will
+	// fail with a 'file does not exist' stat error
+	if !isEmpty {
+		args := []string{"-a", srcPath + "/.", destPath + "/"}
+		executor := utilexec.New()
+		out, err := executor.Command("cp", args...).CombinedOutput()
+		if err != nil {
+			return fmt.Errorf("failed to pre-populate data from volume %v: %s: %w", directVolume.VolID, out, err)
+		}
+	}
+	return nil
+}
diff --git a/src/tools/csi-kata-directvolume/pkg/directvolume/identityserver.go b/src/tools/csi-kata-directvolume/pkg/directvolume/identityserver.go
new file mode 100644
index 0000000000..7acd2d7b99
--- /dev/null
+++ b/src/tools/csi-kata-directvolume/pkg/directvolume/identityserver.go
@@ -0,0 +1,61 @@
+//
+// Copyright 2017 The Kubernetes Authors. 
+// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package directvolume + +import ( + "github.com/container-storage-interface/spec/lib/go/csi" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" +) + +func (dv *directVolume) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) { + klog.V(5).Infof("Using default GetPluginInfo") + + if dv.config.DriverName == "" { + return nil, status.Error(codes.Unavailable, "Driver name not configured") + } + + if dv.config.VendorVersion == "" { + return nil, status.Error(codes.Unavailable, "Driver is missing version") + } + + return &csi.GetPluginInfoResponse{ + Name: dv.config.DriverName, + VendorVersion: dv.config.VendorVersion, + }, nil +} + +func (dv *directVolume) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) { + return &csi.ProbeResponse{}, nil +} + +func (dv *directVolume) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { + klog.V(5).Infof("Using default capabilities") + caps := []*csi.PluginCapability{ + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, + }, + }, + }, + } + if dv.config.EnableTopology { + caps = append(caps, &csi.PluginCapability{ + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, + }, + }, + }) + } + + return &csi.GetPluginCapabilitiesResponse{Capabilities: caps}, nil +} diff --git a/src/tools/csi-kata-directvolume/pkg/directvolume/nodeserver.go b/src/tools/csi-kata-directvolume/pkg/directvolume/nodeserver.go new file mode 100644 index 0000000000..9847c38722 --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/directvolume/nodeserver.go @@ -0,0 +1,390 @@ +// +// Copyright 2017 The Kubernetes Authors. 
+// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package directvolume + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + + "kata-containers/csi-kata-directvolume/pkg/utils" + + "github.com/container-storage-interface/spec/lib/go/csi" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" +) + +const ( + TopologyKeyNode = "topology.directvolume.csi/node" +) + +func (dv *directVolume) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + klog.V(4).Infof("node publish volume with request %v", req) + + // Check arguments + if req.GetVolumeCapability() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request") + } + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if len(req.GetTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + if !isDirectVolume(req.VolumeContext) { + return nil, status.Errorf(codes.FailedPrecondition, "volume %q is not direct-volume.", req.VolumeId) + } + + dv.mutex.Lock() + defer dv.mutex.Unlock() + + targetPath := req.GetTargetPath() + if req.GetVolumeCapability().GetMount() == nil { + return nil, status.Error(codes.InvalidArgument, "It Must be mount access type") + } + + fsType := req.VolumeContext[utils.KataContainersDirectFsType] + if len(fsType) == 0 { + fsType = utils.DefaultFsType + klog.Warningf("volume context has no fsType, set default fstype %v\n", fsType) + } + + volType := req.VolumeContext[utils.KataContainersDirectVolumeType] + if len(volType) == 0 { + volType = "directvol" + klog.Warningf("volume context has no volumeType, set default volume type %v\n", volType) + } + + readOnly := req.GetReadonly() + volumeID := req.GetVolumeId() + attrib := req.GetVolumeContext() + + devicePath := dv.config.VolumeDevices[volumeID] + klog.Infof("target %v\nfstype %v\ndevice %v\nreadonly %v\nvolumeID %v\n", + targetPath, fsType, devicePath, readOnly, volumeID) + + options := []string{"bind"} + if readOnly { + options = append(options, "ro") + } else { + options = append(options, "rw") + } + + stagingTargetPath := req.GetStagingTargetPath() + + if canDoMnt, err := utils.CanDoBindmount(dv.config.safeMounter, targetPath); err != nil { + return nil, err + } else if !canDoMnt { + klog.V(3).Infof("cannot do bindmount target path: %s", targetPath) + return &csi.NodePublishVolumeResponse{}, nil + } + + if err := dv.config.safeMounter.DoBindmount(stagingTargetPath, targetPath, "", options); err != nil { + errMsg := fmt.Sprintf("failed to bindmount device: %s at %s: %s", stagingTargetPath, targetPath, err.Error()) + klog.Infof("do bindmount failed: %v.", errMsg) + return nil, status.Error(codes.Aborted, errMsg) + } + + // kata-containers DirectVolume add + mountInfo := utils.MountInfo{ + VolumeType: volType, + Device: devicePath, + FsType: fsType, + Metadata: attrib, + Options: options, + } + if err := utils.AddDirectVolume(targetPath, mountInfo); err != nil { + klog.Errorf("add direct volume with source %s and mountInfo %v failed", targetPath, mountInfo) + return nil, err + } + klog.Infof("add direct volume successfully.") + + volInStat, err := dv.state.GetVolumeByID(volumeID) + if err != nil { + capInt64, _ := strconv.ParseInt(req.VolumeContext[utils.CapabilityInBytes], 10, 64) + volName := req.VolumeContext[utils.DirectVolumeName] + kind := 
req.VolumeContext[storageKind] + vol, err := dv.createVolume(volumeID, volName, capInt64, kind) + if err != nil { + return nil, err + } + vol.NodeID = dv.config.NodeID + vol.Published.Add(targetPath) + klog.Infof("create volume %v successfully", vol) + + return &csi.NodePublishVolumeResponse{}, nil + } + + volInStat.NodeID = dv.config.NodeID + volInStat.Published.Add(targetPath) + if err := dv.state.UpdateVolume(volInStat); err != nil { + return nil, err + } + + klog.Infof("directvolume: volume %s has been published.", targetPath) + + return &csi.NodePublishVolumeResponse{}, nil +} + +func (dv *directVolume) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + + // Check arguments + if len(req.GetVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + if len(req.GetTargetPath()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + targetPath := req.GetTargetPath() + volumeID := req.GetVolumeId() + + // Lock before acting on global state. A production-quality + // driver might use more fine-grained locking. + dv.mutex.Lock() + defer dv.mutex.Unlock() + + // Unmount only if the target path is really a mount point. + if isMnt, err := dv.config.safeMounter.IsMountPoint(targetPath); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("check target path: %v", err)) + } else if isMnt { + // Unmounting the image or filesystem. + err = dv.config.safeMounter.Unmount(targetPath) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("unmount target path: %v", err)) + } + } + + // Delete the mount point. + // Does not return error for non-existent path, repeated calls OK for idempotency. 
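+	// The corresponding Kata mount info entry is removed right after via RemoveDirectVolume.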
+ if err := os.RemoveAll(targetPath); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("remove target path: %v", err)) + } + + if err := utils.RemoveDirectVolume(targetPath); err != nil { + klog.V(4).Infof("remove direct volume failed.") + return nil, status.Error(codes.Internal, fmt.Sprintf("remove direct volume failed: %v", err)) + } + + klog.Infof("direct volume %s has been cleaned up.", targetPath) + + vol, err := dv.state.GetVolumeByID(volumeID) + if err != nil { + klog.Warningf("volume id %s not found in volume list, nothing to do.", volumeID) + return &csi.NodeUnpublishVolumeResponse{}, nil + } + + if !vol.Published.Has(targetPath) { + klog.V(4).Infof("volume %q is not published at %q, nothing to do.", volumeID, targetPath) + return &csi.NodeUnpublishVolumeResponse{}, nil + } + + vol.Published.Remove(targetPath) + if err := dv.state.UpdateVolume(vol); err != nil { + return nil, err + } + klog.Infof("volume %s has been unpublished.", targetPath) + + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +func isDirectVolume(VolumeCtx map[string]string) bool { + return VolumeCtx[utils.IsDirectVolume] == "True" +} + +func (dv *directVolume) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + klog.V(4).Infof("NodeStageVolumeRequest with request %v", req) + + volumeID := req.GetVolumeId() + // Check arguments + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + stagingTargetPath := req.GetStagingTargetPath() + if stagingTargetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + if req.GetVolumeCapability() == nil { + return nil, status.Error(codes.InvalidArgument, "Volume Capability missing in request") + } + + if !isDirectVolume(req.VolumeContext) { + return nil, status.Errorf(codes.FailedPrecondition, "volume %q is not direct-volume.", req.VolumeId) + } + + dv.mutex.Lock() + defer dv.mutex.Unlock() + + capacityInBytes := req.VolumeContext[utils.CapabilityInBytes] + devicePath, err := utils.CreateDirectBlockDevice(volumeID, capacityInBytes, dv.config.StoragePath) + if err != nil { + errMsg := status.Errorf(codes.Internal, "setup storage for volume '%s' failed", volumeID) + return &csi.NodeStageVolumeResponse{}, errMsg + } + + // /full_path_on_host/VolumeId/ + deviceUpperPath := filepath.Dir(*devicePath) + if canMnt, err := utils.CanDoBindmount(dv.config.safeMounter, stagingTargetPath); err != nil { + return nil, err + } else if !canMnt { + klog.Infof("staging target path: %s already mounted", stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil + } + + options := []string{"bind"} + if err := dv.config.safeMounter.DoBindmount(deviceUpperPath, stagingTargetPath, "", options); err != nil { + klog.Errorf("safe mounter: %v do bind mount %v failed, with error: %v", deviceUpperPath, stagingTargetPath, err.Error()) + return nil, err + } + + fsType, ok := req.VolumeContext[utils.KataContainersDirectFsType] + if !ok { + klog.Infof("fstype not set, default fstype will be set: %v\n", utils.DefaultFsType) + fsType = utils.DefaultFsType + } + + if err := dv.config.safeMounter.SafeFormatWithFstype(*devicePath, fsType, options); err != nil { + return nil, err + } + + dv.config.VolumeDevices[volumeID] = *devicePath + + klog.Infof("directvolume: volume %s has been staged.", stagingTargetPath) + + volInStat, err := dv.state.GetVolumeByID(req.VolumeId) + if err != nil { + capInt64, _ := 
strconv.ParseInt(req.VolumeContext[utils.CapabilityInBytes], 10, 64) + volName := req.VolumeContext[utils.DirectVolumeName] + kind := req.VolumeContext[storageKind] + vol, err := dv.createVolume(volumeID, volName, capInt64, kind) + if err != nil { + return nil, err + } + vol.Staged.Add(stagingTargetPath) + + klog.Infof("create volume %v successfully", vol) + return &csi.NodeStageVolumeResponse{}, nil + } + + if volInStat.Staged.Has(stagingTargetPath) { + klog.V(4).Infof("Volume %q is already staged at %q, nothing to do.", req.VolumeId, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil + } + + if !volInStat.Staged.Empty() { + return nil, status.Errorf(codes.FailedPrecondition, "volume %q is already staged at %v", req.VolumeId, volInStat.Staged) + } + + volInStat.Staged.Add(stagingTargetPath) + if err := dv.state.UpdateVolume(volInStat); err != nil { + return nil, err + } + + return &csi.NodeStageVolumeResponse{}, nil +} + +func (dv *directVolume) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + // Check arguments + volumeID := req.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request") + } + stagingTargetPath := req.GetStagingTargetPath() + if stagingTargetPath == "" { + return nil, status.Error(codes.InvalidArgument, "Target path missing in request") + } + + dv.mutex.Lock() + defer dv.mutex.Unlock() + + // Unmount only if the target path is really a mount point. + if isMnt, err := dv.config.safeMounter.IsMountPoint(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("check staging target path: %v", err)) + } else if isMnt { + err = dv.config.safeMounter.Unmount(stagingTargetPath) + if err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("unmount staging target path: %v", err)) + } + } + + if deviceUpperPath, err := utils.GetStoragePath(dv.config.StoragePath, volumeID); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("get device UpperPath %s failed: %v", deviceUpperPath, err)) + } else { + if err = os.RemoveAll(deviceUpperPath); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("remove device upper path: %s failed %v", deviceUpperPath, err.Error())) + } + klog.Infof("direct volume %s has been removed.", deviceUpperPath) + } + + if err := os.RemoveAll(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, fmt.Sprintf("remove staging target path: %v", err)) + } + + klog.Infof("directvolume: volume %s has been unstaged.", stagingTargetPath) + vol, err := dv.state.GetVolumeByID(volumeID) + if err != nil { + klog.Warning("Volume not found: might have already deleted") + return &csi.NodeUnstageVolumeResponse{}, nil + } + + if !vol.Staged.Has(stagingTargetPath) { + klog.V(4).Infof("Volume %q is not staged at %q, nothing to do.", volumeID, stagingTargetPath) + return &csi.NodeUnstageVolumeResponse{}, nil + } + + if !vol.Published.Empty() { + return nil, status.Errorf(codes.Internal, "volume %q is still published at %q on node %q", vol.VolID, vol.Published, vol.NodeID) + } + + vol.Staged.Remove(stagingTargetPath) + if err := dv.state.UpdateVolume(vol); err != nil { + return nil, err + } + + return &csi.NodeUnstageVolumeResponse{}, nil +} + +func (dv *directVolume) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { + resp := &csi.NodeGetInfoResponse{ + NodeId: dv.config.NodeID, + } + + if 
dv.config.EnableTopology { + resp.AccessibleTopology = &csi.Topology{ + Segments: map[string]string{TopologyKeyNode: dv.config.NodeID}, + } + } + + return resp, nil +} + +func (dv *directVolume) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { + caps := []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, + } + + return &csi.NodeGetCapabilitiesResponse{Capabilities: caps}, nil +} + +func (dv *directVolume) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + return &csi.NodeGetVolumeStatsResponse{}, nil +} + +func (dv *directVolume) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) { + + return &csi.NodeExpandVolumeResponse{}, nil +} diff --git a/src/tools/csi-kata-directvolume/pkg/directvolume/server.go b/src/tools/csi-kata-directvolume/pkg/directvolume/server.go new file mode 100644 index 0000000000..1bf3fb8ad0 --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/directvolume/server.go @@ -0,0 +1,96 @@ +// +// Copyright 2017 The Kubernetes Authors. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package directvolume + +import ( + "sync" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "k8s.io/klog/v2" + + endpoint "kata-containers/csi-kata-directvolume/internal" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/golang/glog" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" +) + +func NewNonBlockingGRPCServer() *nonBlockingGRPCServer { + return &nonBlockingGRPCServer{} +} + +// NonBlocking server +type nonBlockingGRPCServer struct { + wg sync.WaitGroup + server *grpc.Server + cleanup func() +} + +func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + + s.wg.Add(1) + + go s.serve(endpoint, ids, cs, ns) +} + +func (s *nonBlockingGRPCServer) Wait() { + s.wg.Wait() +} + +func (s *nonBlockingGRPCServer) Stop() { + s.server.GracefulStop() + s.cleanup() +} + +func (s *nonBlockingGRPCServer) ForceStop() { + s.server.Stop() + s.cleanup() +} + +func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { + listener, cleanup, err := endpoint.Listen(ep) + if err != nil { + klog.Fatalf("Failed to listen: %v", err) + } + + opts := []grpc.ServerOption{ + grpc.UnaryInterceptor(logGRPC), + } + server := grpc.NewServer(opts...) 
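+	// Keep the server and listener cleanup handles so Stop/ForceStop can shut everything down later.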
+ s.server = server + s.cleanup = cleanup + + if ids != nil { + csi.RegisterIdentityServer(server, ids) + } + if cs != nil { + csi.RegisterControllerServer(server, cs) + } + if ns != nil { + csi.RegisterNodeServer(server, ns) + } + + klog.Infof("Listening for connections on address: %#v", listener.Addr()) + + if err := server.Serve(listener); err != nil { + klog.Fatalf("Failed to server: %v", err) + } +} + +func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + glog.V(3).Infof("GRPC call: %s", info.FullMethod) + glog.V(5).Infof("GRPC request: %+v", protosanitizer.StripSecrets(req)) + resp, err := handler(ctx, req) + if err != nil { + glog.Errorf("GRPC error: %v", err) + } else { + glog.V(5).Infof("GRPC response: %+v", protosanitizer.StripSecrets(resp)) + } + return resp, err +} diff --git a/src/tools/csi-kata-directvolume/pkg/state/state.go b/src/tools/csi-kata-directvolume/pkg/state/state.go new file mode 100644 index 0000000000..2ebdcd37ce --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/state/state.go @@ -0,0 +1,161 @@ +// +// Copyright 2017 The Kubernetes Authors. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +// Package state manages the internal state of the driver which needs to be maintained +// across driver restarts. +package state + +import ( + "encoding/json" + "errors" + "os" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type Volume struct { + VolName string + VolID string + VolSize int64 + VolPath string + // VolAccessType AccessType + ParentVolID string + ParentSnapID string + NodeID string + Kind string + ReadOnlyAttach bool + Attached bool + // Staged contains the staging target path at which the volume + // was staged. A set of paths is used for consistency + // with Published. + Staged Strings + // Published contains the target paths where the volume + // was published. + Published Strings +} + +// State is the interface that the rest of the code has to use to +// access and change state. All error messages contain gRPC +// status codes and can be returned without wrapping. +type State interface { + // GetVolumeByID retrieves a volume by its unique ID or returns + // an error including that ID when not found. + GetVolumeByID(volID string) (Volume, error) + + // GetVolumeByName retrieves a volume by its name or returns + // an error including that name when not found. + GetVolumeByName(volName string) (Volume, error) + + // GetVolumes returns all currently existing volumes. + GetVolumes() []Volume + + // UpdateVolume updates the existing direct volume, + // identified by its volume ID, or adds it if it does + // not exist yet. + UpdateVolume(volume Volume) error + + // DeleteVolume deletes the volume with the given + // volume ID. It is not an error when such a volume + // does not exist. + DeleteVolume(volID string) error +} + +type resources struct { + Volumes []Volume +} + +type state struct { + resources + statefilePath string +} + +var _ State = &state{} + +// New retrieves the complete state of the driver from the file if given +// and then ensures that all changes are mirrored immediately in the +// given file. If not given, the initial state is empty and changes +// are not saved. 
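+// In this driver, NewDirectVolumeDriver passes <StateDir>/state.json as the state file path.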
+func New(statefilePath string) (State, error) {
+	s := &state{
+		statefilePath: statefilePath,
+	}
+
+	return s, s.restore()
+}
+
+func (s *state) dump() error {
+	data, err := json.Marshal(&s.resources)
+	if err != nil {
+		return status.Errorf(codes.Internal, "error encoding volumes: %v", err)
+	}
+	if err := os.WriteFile(s.statefilePath, data, 0600); err != nil {
+		return status.Errorf(codes.Internal, "error writing state file: %v", err)
+	}
+	return nil
+}
+
+func (s *state) restore() error {
+	s.Volumes = nil
+	data, err := os.ReadFile(s.statefilePath)
+	switch {
+	case errors.Is(err, os.ErrNotExist):
+		// Nothing to do.
+		return nil
+	case err != nil:
+		return status.Errorf(codes.Internal, "error reading state file: %v", err)
+	}
+	if err := json.Unmarshal(data, &s.resources); err != nil {
+		return status.Errorf(codes.Internal, "error decoding volumes from state file %q: %v", s.statefilePath, err)
+	}
+	return nil
+}
+
+func (s *state) GetVolumeByID(volID string) (Volume, error) {
+	for _, volume := range s.Volumes {
+		if volume.VolID == volID {
+			return volume, nil
+		}
+	}
+	return Volume{}, status.Errorf(codes.NotFound, "volume id %s does not exist in the volumes list", volID)
+}
+
+func (s *state) GetVolumeByName(volName string) (Volume, error) {
+	for _, volume := range s.Volumes {
+		if volume.VolName == volName {
+			return volume, nil
+		}
+	}
+	return Volume{}, status.Errorf(codes.NotFound, "volume name %s does not exist in the volumes list", volName)
+}
+
+func (s *state) GetVolumes() []Volume {
+	volumes := make([]Volume, len(s.Volumes))
+	copy(volumes, s.Volumes)
+	return volumes
+}
+
+func (s *state) UpdateVolume(update Volume) error {
+	for i, volume := range s.Volumes {
+		if volume.VolID == update.VolID {
+			s.Volumes[i] = update
+			return s.dump()
+		}
+	}
+	s.Volumes = append(s.Volumes, update)
+	return s.dump()
+}
+
+func (s *state) DeleteVolume(volID string) error {
+	for i, volume := range s.Volumes {
+		if volume.VolID == volID {
+			s.Volumes = append(s.Volumes[:i], s.Volumes[i+1:]...)
+			return s.dump()
+		}
+	}
+	return nil
+}
diff --git a/src/tools/csi-kata-directvolume/pkg/state/state_test.go b/src/tools/csi-kata-directvolume/pkg/state/state_test.go
new file mode 100644
index 0000000000..2c3a5d533b
--- /dev/null
+++ b/src/tools/csi-kata-directvolume/pkg/state/state_test.go
@@ -0,0 +1,48 @@
+//
+// Copyright 2017 The Kubernetes Authors. 
+// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package state + +import ( + "path" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestVolumes(t *testing.T) { + tmp := t.TempDir() + statefileName := path.Join(tmp, "state.json") + + s, err := New(statefileName) + require.NoError(t, err, "construct state") + require.Empty(t, s.GetVolumes(), "initial volumes") + + _, err = s.GetVolumeByID("foo") + require.Equal(t, codes.NotFound, status.Convert(err).Code(), "GetVolumeByID of non-existent volume") + require.Contains(t, status.Convert(err).Message(), "foo") + + err = s.UpdateVolume(Volume{VolID: "foo", VolName: "bar"}) + require.NoError(t, err, "add volume") + + s, err = New(statefileName) + require.NoError(t, err, "reconstruct state") + _, err = s.GetVolumeByID("foo") + require.NoError(t, err, "get existing volume by ID") + _, err = s.GetVolumeByName("bar") + require.NoError(t, err, "get existing volume by name") + + err = s.DeleteVolume("foo") + require.NoError(t, err, "delete existing volume") + + err = s.DeleteVolume("foo") + require.NoError(t, err, "delete non-existent volume") + + require.Empty(t, s.GetVolumes(), "final volumes") +} diff --git a/src/tools/csi-kata-directvolume/pkg/state/strings.go b/src/tools/csi-kata-directvolume/pkg/state/strings.go new file mode 100644 index 0000000000..c063388eaa --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/state/strings.go @@ -0,0 +1,42 @@ +// +// Copyright 2017 The Kubernetes Authors. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package state + +// Strings is an ordered set of strings with helper functions for +// adding, searching and removing entries. +type Strings []string + +// Add appends at the end. +func (s *Strings) Add(str string) { + *s = append(*s, str) +} + +// Has checks whether the string is already present. +func (s *Strings) Has(str string) bool { + for _, str2 := range *s { + if str == str2 { + return true + } + } + return false +} + +// Empty returns true if the list is empty. +func (s *Strings) Empty() bool { + return len(*s) == 0 +} + +// Remove removes the first matched target of the string, if present. +func (s *Strings) Remove(str string) { + for i, str2 := range *s { + if str == str2 { + *s = append((*s)[:i], (*s)[i+1:]...) + return + } + } +} diff --git a/src/tools/csi-kata-directvolume/pkg/utils/capacity.go b/src/tools/csi-kata-directvolume/pkg/utils/capacity.go new file mode 100644 index 0000000000..7c84ef3784 --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/utils/capacity.go @@ -0,0 +1,62 @@ +// +// Copyright 2017 The Kubernetes Authors. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package utils + +import ( + "errors" + "flag" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/api/resource" +) + +// Capacity simulates linear storage of certain types ("fast", +// "slow"). To calculate the amount of allocated space, the size of +// all currently existing volumes of the same kind is summed up. +// +// Available capacity is configurable with a command line flag +// -capacity = where is a string and +// is a quantity (1T, 1Gi). More than one of those +// flags can be used. +// +// The underlying map will be initialized if needed by Set, +// which makes it possible to define and use a Capacity instance +// without explicit initialization (`var capacity Capacity` or as +// member in a struct). 
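+//
+// The flag format is "-capacity <kind>=<quantity>", for example
+// "-capacity fast=100Gi -capacity slow=1Ti" to configure two kinds.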
+type Capacity map[string]resource.Quantity + +// Set is an implementation of flag.Value.Set. +func (c *Capacity) Set(arg string) error { + parts := strings.SplitN(arg, "=", 2) + if len(parts) != 2 { + return errors.New("must be of format =") + } + quantity, err := resource.ParseQuantity(parts[1]) + if err != nil { + return err + } + + // We overwrite any previous value. + if *c == nil { + *c = Capacity{} + } + (*c)[parts[0]] = quantity + return nil +} + +func (c *Capacity) String() string { + return fmt.Sprintf("%v", map[string]resource.Quantity(*c)) +} + +var _ flag.Value = &Capacity{} + +// Enabled returns true if capacities are configured. +func (c *Capacity) Enabled() bool { + return len(*c) > 0 +} diff --git a/src/tools/csi-kata-directvolume/pkg/utils/direct_volume.go b/src/tools/csi-kata-directvolume/pkg/utils/direct_volume.go new file mode 100644 index 0000000000..73cdcbc784 --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/utils/direct_volume.go @@ -0,0 +1,65 @@ +// Copyright (c) 2022 Databricks Inc. +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +package utils + +import ( + b64 "encoding/base64" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + mountInfoFileName = "mountInfo.json" + kataDirectVolumeRootPath = "/run/kata-containers/shared/direct-volumes" +) + +// MountInfo contains the information needed by Kata to consume a host block device and mount it as a filesystem inside the guest VM. +type MountInfo struct { + // The type of the volume (ie. block) + VolumeType string `json:"volume-type"` + // The device backing the volume. + Device string `json:"device"` + // The filesystem type to be mounted on the volume. + FsType string `json:"fstype"` + // Additional metadata to pass to the agent regarding this volume. + Metadata map[string]string `json:"metadata,omitempty"` + // Additional mount options. + Options []string `json:"options,omitempty"` +} + +// Add writes the mount info of a direct volume into a filesystem path known to Kata Container. +func Add(volumePath string, mountInfo string) error { + volumeDir := filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath))) + stat, err := os.Stat(volumeDir) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return err + } + if err := os.MkdirAll(volumeDir, 0700); err != nil { + return err + } + } + if stat != nil && !stat.IsDir() { + return status.Error(codes.Unknown, fmt.Sprintf("%s should be a directory", volumeDir)) + } + + var deserialized MountInfo + if err := json.Unmarshal([]byte(mountInfo), &deserialized); err != nil { + return err + } + + return os.WriteFile(filepath.Join(volumeDir, mountInfoFileName), []byte(mountInfo), 0600) +} + +// Remove deletes the direct volume path including all the files inside it. +func Remove(volumePath string) error { + return os.RemoveAll(filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath)))) +} diff --git a/src/tools/csi-kata-directvolume/pkg/utils/safe_formater.go b/src/tools/csi-kata-directvolume/pkg/utils/safe_formater.go new file mode 100644 index 0000000000..654a574d4a --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/utils/safe_formater.go @@ -0,0 +1,132 @@ +// +// Copyright 2017 The Kubernetes Authors. 
+// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package utils + +import ( + "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" + mountutils "k8s.io/mount-utils" + utilexec "k8s.io/utils/exec" +) + +const ( + // 'fsck' found errors and corrected them + fsckErrorsCorrected = 1 + // 'fsck' found errors but exited without correcting them + fsckErrorsUncorrected = 4 +) + +type SafeMountFormater struct { + *mountutils.SafeFormatAndMount +} + +func NewSafeMountFormater() SafeMountFormater { + return SafeMountFormater{ + &mountutils.SafeFormatAndMount{ + Interface: mountutils.New(""), + Exec: utilexec.New(), + }, + } +} + +func (mounter *SafeMountFormater) IsNotSafeMountPoint(filePath string) (bool, error) { + isMnt, err := mounter.IsMountPoint(filePath) + if err != nil { + return true, err + } + return !isMnt, nil +} + +func (mounter *SafeMountFormater) DoBindmount(sourcePath, targetPath, fsType string, options []string) error { + if err := mounter.Mount(sourcePath, targetPath, fsType, options); err != nil { + errMsg := fmt.Sprintf("failed to mount device: %s at %s: %s", sourcePath, targetPath, err) + return status.Error(codes.Aborted, errMsg) + } + + return nil +} + +// SafeFormatWithFstype uses unix utils to format disk +func (mounter *SafeMountFormater) SafeFormatWithFstype(source string, fstype string, options []string) error { + readOnly := false + for _, option := range options { + if option == "ro" { + readOnly = true + break + } + } + + // Check if the disk is already formatted + existingFormat, err := mounter.GetDiskFormat(source) + if err != nil { + return mountutils.NewMountError(mountutils.GetDiskFormatFailed, "failed to get disk format of disk %s: %v", source, err) + } + + // Use 'ext4' as the default + if len(fstype) == 0 { + fstype = DefaultFsType + } + + if existingFormat == "" { + // Do not attempt to format the disk if mounting as readonly, return an error to reflect this. + if readOnly { + return mountutils.NewMountError(mountutils.UnformattedReadOnly, "cannot mount unformatted disk %s as it is in read-only mode", source) + } + + // Disk is unformatted so format it. + args := []string{source} + if fstype == "ext4" || fstype == "ext3" { + args = []string{ + "-F", // Force flag + "-m0", // Zero blocks reserved for super-user + source, + } + } + + klog.Infof("Disk %q is unformatted, do format with type: %q and options: %v", source, fstype, args) + mkfsCmd := fmt.Sprintf("mkfs.%s", fstype) + if output, err := doSafeCommand(mkfsCmd, args...); err != nil { + detailedErr := fmt.Sprintf("format disk %q failed: type:(%q) errcode:(%v) output:(%v) ", source, fstype, err, string(output)) + klog.Error(detailedErr) + return mountutils.NewMountError(mountutils.FormatFailed, detailedErr) + } + + klog.Infof("Disk successfully formatted (mkfs): %s - %s", fstype, source) + } else { + if fstype != existingFormat { + // Do verify the disk formatted with expected fs type. + return mountutils.NewMountError(mountutils.FilesystemMismatch, err.Error()) + } + + if !readOnly { + // Run check tools on the disk to fix repairable issues, only do this for formatted volumes requested as rw. 
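+			// 'fsck -a' auto-repairs what it safely can; its exit status is interpreted below.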
+ klog.V(4).Infof("Checking for issues with fsck on disk: %s", source) + args := []string{"-a", source} + if output, err := doSafeCommand("fsck", args...); err != nil { + ee, isExitError := err.(utilexec.ExitError) + switch { + case err == utilexec.ErrExecutableNotFound: + klog.Warningf("'fsck' not found on system; continuing mount without running 'fsck'.") + case isExitError && ee.ExitStatus() == fsckErrorsCorrected: + klog.Infof("Device %s has errors which were corrected by fsck.", source) + case isExitError && ee.ExitStatus() == fsckErrorsUncorrected: + return mountutils.NewMountError(mountutils.HasFilesystemErrors, "'fsck' found errors on device %s but could not correct them: %s", source, string(output)) + case isExitError && ee.ExitStatus() > fsckErrorsUncorrected: + klog.Infof("`fsck` failed with error %v", string(output)) + default: + klog.Warningf("fsck on device %s failed with error %v", source, err.Error()) + } + } + } + } + + return nil +} diff --git a/src/tools/csi-kata-directvolume/pkg/utils/utils.go b/src/tools/csi-kata-directvolume/pkg/utils/utils.go new file mode 100644 index 0000000000..cdd80147a4 --- /dev/null +++ b/src/tools/csi-kata-directvolume/pkg/utils/utils.go @@ -0,0 +1,228 @@ +// +// Copyright (c) 2023 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// + +package utils + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strconv" + + diskfs "github.com/diskfs/go-diskfs" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "k8s.io/klog/v2" + utilexec "k8s.io/utils/exec" +) + +const ( + KataContainersDirectVolumeType = "katacontainers.direct.volume/volumetype" + KataContainersDirectFsType = "katacontainers.direct.volume/fstype" + DirectVolumeTypeName = "directvol" + IsDirectVolume = "is_directvolume" +) + +const ( + CapabilityInBytes = "capacity_in_bytes" + DirectVolumeName = "direct_volume_name" +) + +const PERM os.FileMode = 0750 +const DefaultFsType = "ext4" + +const ( + KiB int64 = 1024 + MiB int64 = KiB * 1024 + GiB int64 = MiB * 1024 + GiB100 int64 = GiB * 100 + TiB int64 = GiB * 1024 + TiB100 int64 = TiB * 100 +) + +func AddDirectVolume(targetPath string, mountInfo MountInfo) error { + mntArg, err := json.Marshal(&mountInfo) + if err != nil { + klog.Errorf("marshal mount info into bytes failed with error: %+v", err) + return err + } + + return Add(targetPath, string(mntArg)) +} + +func RemoveDirectVolume(targetPath string) error { + return Remove(targetPath) +} + +// storagePath/VolID/directvol.rawdisk +func SetupStoragePath(storagePath, volID string) (*string, error) { + upperDir := filepath.Join(storagePath, volID) + + return MkPathIfNotExit(upperDir) +} + +func MkPathIfNotExit(path string) (*string, error) { + if exist, err := CheckPathExist(path); err != nil { + return nil, errors.New("stat path failed") + } else if !exist { + if err := os.MkdirAll(path, PERM); err != nil { + return nil, errors.New("mkdir all failed.") + } + klog.Infof("mkdir full path successfully") + } + + return &path, nil +} + +func MakeFullPath(path string) error { + stat, err := os.Stat(path) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return errors.New("stat path failed with not exist") + } + if err := os.MkdirAll(path, PERM); err != nil { + return errors.New("mkdir all failed.") + } + } + + if stat != nil && !stat.IsDir() { + return errors.New("path should be a directory") + } + + return nil +} + +// IsPathEmpty is a simple check to determine if the specified directvolume directory +// is empty or 
not. +func IsPathEmpty(path string) (bool, error) { + f, err := os.Open(path) + if err != nil { + return true, err + } + defer f.Close() + + _, err = f.Readdir(1) + if err == io.EOF { + return true, nil + } + return false, err +} + +func CheckPathExist(path string) (bool, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return false, nil + } else { + return false, err + } + } + + return true, nil +} + +func CanDoBindmount(mounter *SafeMountFormater, targetPath string) (bool, error) { + notMnt, err := mounter.IsNotSafeMountPoint(targetPath) + if err != nil { + if _, err = MkPathIfNotExit(targetPath); err != nil { + return false, err + } else { + notMnt = true + } + } + + return notMnt, nil +} + +func doSafeCommand(rawCmd string, args ...string) ([]byte, error) { + executor := utilexec.New() + + path, err := executor.LookPath(rawCmd) + if err == exec.ErrNotFound { + return []byte{}, status.Error(codes.Internal, fmt.Sprintf("%s executable File not found in $PATH", rawCmd)) + } + + absCmdPath, err := filepath.Abs(path) + if err != nil { + return []byte{}, err + } + + out, err := executor.Command(absCmdPath, args...).CombinedOutput() + if err != nil { + detailedErr := fmt.Sprintf("exec command %v failed with errcode:(%v)", rawCmd, err) + klog.Errorf("do command: %v failed with %v", absCmdPath, detailedErr) + return out, status.Error(codes.Internal, detailedErr) + } + + return out, nil +} + +// storagePath/VolID/directvol.rawdisk +func GetStoragePath(storagePath, volID string) (string, error) { + upperPath := filepath.Join(storagePath, volID) + + return upperPath, nil +} + +// createVolume create the directory for the direct volume. +// It returns the volume path or err if one occurs. +func CreateDirectBlockDevice(volID, capacityInBytesStr, storagePath string) (*string, error) { + capacityInBytes, err := strconv.ParseInt(capacityInBytesStr, 10, 64) + if err != nil { + errMsg := status.Error(codes.Internal, err.Error()) + klog.Errorf("capacity in bytes convert to int failed with error: %v", errMsg) + return nil, errMsg + } + + diskSize := fmt.Sprintf("%dM", capacityInBytes/MiB) + upperDir, err := SetupStoragePath(storagePath, volID) + if err != nil { + klog.Errorf("setup storage path failed with error: %v", err) + return nil, err + } else { + // check the upper path for device exists. + if _, err = os.Stat(*upperDir); err != nil && os.IsNotExist(err) { + return nil, err + } + } + + // storagePath/62a268d9-893a-11ee-97cb-d89d6725e7b0/directvol-rawdisk.2048M + devicePath := filepath.Join(*upperDir, fmt.Sprintf("directvol-rawdisk.%s", diskSize)) + if _, err = os.Stat(devicePath); !os.IsNotExist(err) { + klog.Warning("direct block device exists, just skip creating it.") + return &devicePath, nil + } + + // create raw disk + if _, err = diskfs.Create(devicePath, capacityInBytes, diskfs.Raw, diskfs.SectorSizeDefault); err != nil { + errMsg := fmt.Errorf("diskfs create disk failed: %v", err) + klog.Errorf(errMsg.Error()) + + return nil, errMsg + } + + // Create a block file. 
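+	// (The raw file was already created by diskfs.Create above; the fallocate call below preallocates its full size.)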
+ // storagePath/62a268d9-893a-11ee-97cb-d89d6725e7b0/directvol-rawdisk.2048M + if _, err = os.Stat(devicePath); err != nil { + return nil, err + } + + // fallocate -z -l diskSize filePath + fallocateCmd := "fallocate" + // TODO: "-z" to be added + args := []string{"-l", diskSize, devicePath} + if _, err := doSafeCommand(fallocateCmd, args...); err != nil { + klog.Infof("do fallocate %v failed with error(%v)", args, err) + return nil, err + } + + klog.Infof("create backend rawdisk successfully!") + + return &devicePath, nil +} diff --git a/src/tools/csi-kata-directvolume/release-tools/build.make b/src/tools/csi-kata-directvolume/release-tools/build.make new file mode 100644 index 0000000000..23f3e54b62 --- /dev/null +++ b/src/tools/csi-kata-directvolume/release-tools/build.make @@ -0,0 +1,122 @@ +# Copyright 2019 The Kubernetes Authors. +# +# SPDX-License-Identifier: Apache-2.0 +# + +.PHONY: build-% build container-% container push-% push clean test + +# A space-separated list of all commands in the repository, must be +# set in main Makefile of a repository. +# CMDS= + +# This is the default. It can be overridden in the main Makefile after +# including build.make. +REGISTRY_NAME=quay.io/k8scsi + +# Can be set to -mod=vendor to ensure that the "vendor" directory is used. +GOFLAGS_VENDOR= + +# Revision that gets built into each binary via the main.version +# string. Uses the `git describe` output based on the most recent +# version tag with a short revision suffix or, if nothing has been +# tagged yet, just the revision. +# +# Beware that tags may also be missing in shallow clones as done by +# some CI systems (like TravisCI, which pulls only 50 commits). +REV=$(shell git describe --long --tags --match='v*' --dirty 2>/dev/null || git rev-list -n1 HEAD) + +# A space-separated list of image tags under which the current build is to be pushed. +# Determined dynamically. +IMAGE_TAGS= + +# A "canary" image gets built if the current commit is the head of the remote "master" branch. +# That branch does not exist when building some other branch in TravisCI. +IMAGE_TAGS+=$(shell if [ "$$(git rev-list -n1 HEAD)" = "$$(git rev-list -n1 origin/master 2>/dev/null)" ]; then echo "canary"; fi) + +# A "X.Y.Z-canary" image gets built if the current commit is the head of a "origin/release-X.Y.Z" branch. +# The actual suffix does not matter, only the "release-" prefix is checked. +IMAGE_TAGS+=$(shell git branch -r --points-at=HEAD | grep 'origin/release-' | grep -v -e ' -> ' | sed -e 's;.*/release-\(.*\);\1-canary;') + +# A release image "vX.Y.Z" gets built if there is a tag of that format for the current commit. +# --abbrev=0 suppresses long format, only showing the closest tag. +IMAGE_TAGS+=$(shell tagged="$$(git describe --tags --match='v*' --abbrev=0)"; if [ "$$tagged" ] && [ "$$(git rev-list -n1 HEAD)" = "$$(git rev-list -n1 $$tagged)" ]; then echo $$tagged; fi) + +# Images are named after the command contained in them. +IMAGE_NAME=$(REGISTRY_NAME)/$* + +ifdef V +# Adding "-alsologtostderr" assumes that all test binaries contain glog. This is not guaranteed. +TESTARGS = -v -args -alsologtostderr -v 5 +else +TESTARGS = +endif + +ARCH := $(if $(GOARCH),$(GOARCH),$(shell go env GOARCH)) + +# Specific packages can be excluded from each of the tests below by setting the *_FILTER_CMD variables +# to something like "| grep -v 'github.com/kubernetes-csi/project/pkg/foobar'". See usage below. 
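+# For example: make test-vet TEST_VET_FILTER_CMD="| grep -v 'github.com/kubernetes-csi/project/pkg/foobar'"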
+ +build-%: check-go-version-go + mkdir -p bin + CGO_ENABLED=0 GOOS=linux go build $(GOFLAGS_VENDOR) -a -ldflags '-X main.version=$(REV) -extldflags "-static"' -o ./bin/$* ./cmd/$* + if [ "$$ARCH" = "amd64" ]; then \ + CGO_ENABLED=0 GOOS=windows go build $(GOFLAGS_VENDOR) -a -ldflags '-X main.version=$(REV) -extldflags "-static"' -o ./bin/$*.exe ./cmd/$* ; \ + CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le go build $(GOFLAGS_VENDOR) -a -ldflags '-X main.version=$(REV) -extldflags "-static"' -o ./bin/$*-ppc64le ./cmd/$* ; \ + fi + +container-%: build-% + docker build -t $*:latest -f $(shell if [ -e ./cmd/$*/Dockerfile ]; then echo ./cmd/$*/Dockerfile; else echo Dockerfile; fi) --label revision=$(REV) . + +push-%: container-% + set -ex; \ + push_image () { \ + docker tag $*:latest $(IMAGE_NAME):$$tag; \ + docker push $(IMAGE_NAME):$$tag; \ + }; \ + for tag in $(IMAGE_TAGS); do \ + if [ "$$tag" = "canary" ] || echo "$$tag" | grep -q -e '-canary$$'; then \ + : "creating or overwriting canary image"; \ + push_image; \ + elif docker pull $(IMAGE_NAME):$$tag 2>&1 | tee /dev/stderr | grep -q "manifest for $(IMAGE_NAME):$$tag not found"; then \ + : "creating release image"; \ + push_image; \ + else \ + : "release image $(IMAGE_NAME):$$tag already exists, skipping push"; \ + fi; \ + done + +build: $(CMDS:%=build-%) +container: $(CMDS:%=container-%) +push: $(CMDS:%=push-%) + +clean: + -rm -rf bin + +test: check-go-version-go + + +.PHONY: test-vet +test: test-vet +test-vet: + @ echo; echo "### $@:" + go test $(GOFLAGS_VENDOR) `go list $(GOFLAGS_VENDOR) ./... | grep -v vendor $(TEST_VET_FILTER_CMD)` + +.PHONY: test-fmt +test: test-fmt +test-fmt: + @ echo; echo "### $@:" + files=$$(find . -name '*.go' | grep -v './vendor' $(TEST_FMT_FILTER_CMD)); \ + if [ $$(gofmt -d $$files | wc -l) -ne 0 ]; then \ + echo "formatting errors:"; \ + gofmt -d $$files; \ + false; \ + fi + + +# Targets in the makefile can depend on check-go-version- +# to trigger a warning if the x.y version of that binary does not match +# what the project uses. Make ensures that this is only checked once per +# invocation. +.PHONY: check-go-version-% +check-go-version-%: + ./release-tools/verify-go-version.sh "$*" diff --git a/src/tools/csi-kata-directvolume/release-tools/verify-go-version.sh b/src/tools/csi-kata-directvolume/release-tools/verify-go-version.sh new file mode 100755 index 0000000000..9c35c0c97b --- /dev/null +++ b/src/tools/csi-kata-directvolume/release-tools/verify-go-version.sh @@ -0,0 +1,35 @@ +#!/usr/bin/env bash +# +# Copyright 2019 The Kubernetes Authors. +# +# SPDX-License-Identifier: Apache-2.0 +# + +GO="$1" + +if [ ! "$GO" ]; then + echo >&2 "usage: $0 " + exit 1 +fi + +die () { + echo "ERROR: $*" + exit 1 +} + +version=$("$GO" version) || die "determining version of $GO failed" +# shellcheck disable=SC2001 +majorminor=$(echo "$version" | sed -e 's/.*go\([0-9]*\)\.\([0-9]*\).*/\1.\2/') + +if [ "$majorminor" != "$expected" ]; then + cat >&2 <