Merge pull request #8618 from Apokleos/csi-for-directvol

runtime-rs: Add dedicated CSI driver for DirectVolume support in Kata
This commit is contained in:
Alex.Lyn 2023-12-27 21:27:29 +08:00 committed by GitHub
commit 990a3adf39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 3105 additions and 0 deletions

View File

@ -224,3 +224,7 @@ In the case, `ctr run --mount type=X, src=source, dst=dest`, the X will be set `
$ # ctr run with --mount type=spdkvol,src=/kubelet/kata-test-vol-001/volume001,dst=/disk001
$ sudo ctr run -t --rm --runtime io.containerd.kata.v2 --mount type=spdkvol,src=/kubelet/kata-test-vol-001/volume001,dst=/disk001,options=rbind:rw "$image" kata-spdk-vol-xx0530 /bin/bash
```
## Integrate Direct Volume with K8S
For details, see [`csi-kata-directvolume`](../../src/tools/csi-kata-directvolume/README.md).

View File

@ -0,0 +1,11 @@
#
# Copyright 2019 The Kubernetes Authors.
# Copyright (c) 2023 Ant Group
#
# SPDX-License-Identifier: Apache-2.0
#
CMDS=directvolplugin
all: build
include release-tools/build.make

View File

@ -0,0 +1,79 @@
# CSI Direct Volume Driver
The `Direct Volume CSI driver` is heavily inspired by the [`K8s CSI HostPath driver`](https://github.com/kubernetes-csi/csi-driver-host-path). It aims to serve as both a production-ready implementation and a reference implementation for connecting Kubernetes to `Direct Volume`.
This repository houses the `Direct Volume CSI driver`, along with all build and dependent configuration files needed for deployment.
*WARNING* This driver is still under development.
## Prerequisites
- K8S cluster running version 1.20 or later
- Access to a terminal with `kubectl` installed
## Features
The driver provisions volumes backed by single raw files stored on the host and exposes them to the guest as direct block devices, eliminating the need for loop devices.
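The Kata runtime consumes such volumes through `mountInfo.json` files under `/run/kata-containers/shared/direct-volumes/` (this path is mounted into the plugin in the DaemonSet manifest). A hypothetical example; the path encoding and field values below are illustrative:
```shell
$ # <base64-path> encodes the volume path; device and fstype are illustrative
$ cat /run/kata-containers/shared/direct-volumes/<base64-path>/mountInfo.json
{"volume-type":"directvol","device":"/tmp/kata-directvol-storages/<vol-id>","fstype":"ext4","options":[]}
```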
## Deployment
[Deployment for K8S 1.20+](docs/deploy-csi-kata-directvol.md)
## Building the Binary
If you want to build the driver yourself, you can do so with the following command (run from the directory containing `tools/csi-kata-directvolume`):
```shell
cd tools/csi-kata-directvolume/ && make
```
## Building the Container Image
If you want to build the container image yourself, you can do so with the following commands from a directory laid out as shown below.
Here, we just use `buildah/podman` as an example:
```shell
$ tree -L 2 buildah-directv/
buildah-directv/
├── bin
│   └── directvolplugin
└── Dockerfile
$ buildah bud -t kata-directvolume:v1.0.19
STEP 1/7: FROM alpine
STEP 2/7: LABEL maintainers="Kata Containers Authors"
STEP 3/7: LABEL description="Kata DirectVolume Driver"
STEP 4/7: ARG binary=./bin/directvolplugin
STEP 5/7: RUN apk add util-linux coreutils e2fsprogs xfsprogs xfsprogs-extra btrfs-progs && apk update && apk upgrade
fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/main/x86_64/APKINDEX.tar.gz
fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/community/x86_64/APKINDEX.tar.gz
(1/66) Installing libblkid (2.39.3-r0)
...
(66/66) Installing xfsprogs-extra (6.5.0-r0)
Executing busybox-1.36.1-r15.trigger
OK: 64 MiB in 81 packages
fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/main/x86_64/APKINDEX.tar.gz
fetch https://dl-cdn.alpinelinux.org/alpine/v3.19/community/x86_64/APKINDEX.tar.gz
v3.19.0-19-ga0ddaee500e [https://dl-cdn.alpinelinux.org/alpine/v3.19/main]
v3.19.0-18-gec62a609516 [https://dl-cdn.alpinelinux.org/alpine/v3.19/community]
OK: 22983 distinct packages available
OK: 64 MiB in 81 packages
STEP 6/7: COPY ${binary} /kata-directvol-plugin
STEP 7/7: ENTRYPOINT ["/kata-directvol-plugin"]
COMMIT kata-directvolume:v1.0.19
Getting image source signatures
Copying blob 5af4f8f59b76 skipped: already exists
Copying blob a55645705de3 done
Copying config 244001cc51 done
Writing manifest to image destination
Storing signatures
--> 244001cc51d
Successfully tagged localhost/kata-directvolume:v1.0.19
244001cc51d77302c4ed5e1a0ec347d12d85dec4576ea1313f700f66e2a7d36d
$ podman save localhost/kata-directvolume:v1.0.19 -o kata-directvolume-v1.0.19.tar
$ ctr -n k8s.io image import kata-directvolume-v1.0.19.tar
unpacking localhost/kata-directvolume:v1.0.19 (sha256:1bdc33ff7f9cee92e74cbf77a9d79d00dce6dbb9ba19b9811f683e1a087f8fbf)...done
$ crictl images |grep 1.0.19
localhost/kata-directvolume v1.0.19 244001cc51d77 83.8MB
```
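For reference, a `Dockerfile` matching the seven build steps shown above (reconstructed from the build output):
```shell
$ cat buildah-directv/Dockerfile
FROM alpine
LABEL maintainers="Kata Containers Authors"
LABEL description="Kata DirectVolume Driver"
ARG binary=./bin/directvolplugin
RUN apk add util-linux coreutils e2fsprogs xfsprogs xfsprogs-extra btrfs-progs && apk update && apk upgrade
COPY ${binary} /kata-directvol-plugin
ENTRYPOINT ["/kata-directvol-plugin"]
```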

View File

@ -0,0 +1,64 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package main
import (
"flag"
"kata-containers/csi-kata-directvolume/pkg/directvolume"
"os"
"path"
"k8s.io/klog/v2"
)
func init() {
if err := flag.Set("logtostderr", "true"); err != nil {
klog.Errorln("flag setting failed.")
}
}
var (
// Set by the build process
version = ""
)
func main() {
cfg := directvolume.Config{
VendorVersion: version,
}
flag.StringVar(&cfg.Endpoint, "endpoint", "unix:///var/run/csi.sock", "CSI endpoint")
flag.StringVar(&cfg.DriverName, "drivername", "directvolume.csi.katacontainers.io", "name of the driver")
flag.StringVar(&cfg.StateDir, "statedir", "/csi-persist-data", "directory for storing state information across driver restarts")
flag.StringVar(&cfg.StoragePath, "storagepath", "", "storage path for storing the backend files on host")
flag.StringVar(&cfg.NodeID, "nodeid", "", "node id")
flag.Var(&cfg.Capacity, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
flag.Int64Var(&cfg.MaxVolumeSize, "max-volume-size", 1024*1024*1024*1024, "maximum size of volumes in bytes (inclusive)")
flag.BoolVar(&cfg.EnableTopology, "enable-topology", true, "Enables PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS capability.")
showVersion := flag.Bool("version", false, "Show version.")
flag.Parse()
if *showVersion {
baseName := path.Base(os.Args[0])
klog.Infof("%s version: %s", baseName, version)
return
}
driver, err := directvolume.NewDirectVolumeDriver(cfg)
if err != nil {
klog.Errorf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
}
if err := driver.Run(); err != nil {
klog.Errorf("Failed to run driver: %s", err.Error())
os.Exit(1)
}
}

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
#
# Copyright (c) 2023 Ant Group
#
# SPDX-License-Identifier: Apache-2.0
#
set -e
set -o pipefail
BASE_DIR=$(dirname "$0")
${BASE_DIR}/rbac-deploy.sh
${BASE_DIR}/directvol-deploy.sh

View File

@ -0,0 +1,115 @@
#!/usr/bin/env bash
#
# Copyright 2017 The Kubernetes Authors.
# Copyright (c) 2023 Ant Group
#
# SPDX-License-Identifier: Apache-2.0
#
set -e
set -o pipefail
BASE_DIR=$(dirname "$0")
# KUBELET_DATA_DIR can be set to replace the default /var/lib/kubelet.
# All nodes must use the same directory.
default_kubelet_data_dir=/var/lib/kubelet
: ${KUBELET_DATA_DIR:=${default_kubelet_data_dir}}
# namespace kata-directvolume
DIRECTVOL_NAMESPACE="kata-directvolume"
# CSIStorageCapacity is GA in the supported Kubernetes versions; default to
# keeping the related settings in the manifests (set to false to strip them below).
: ${have_csistoragecapacity:=true}
# Some images are not affected by *_REGISTRY/*_TAG and IMAGE_* variables.
# The default is to update unless explicitly excluded.
update_image () {
case "$1" in socat) return 1;; esac
}
run () {
echo "$@" >&2
"$@"
}
# deploy kata directvolume plugin and registrar sidecar
echo "deploying kata directvolume components"
for i in $(ls ${BASE_DIR}/kata-directvolume/csi-directvol-*.yaml | sort); do
echo " $i"
modified="$(cat "$i" | sed -e "s;${default_kubelet_data_dir}/;${KUBELET_DATA_DIR}/;" | while IFS= read -r line; do
nocomments="$(echo "$line" | sed -e 's/ *#.*$//')"
if echo "$nocomments" | grep -q '^[[:space:]]*image:[[:space:]]*'; then
# Split 'image: quay.io/k8scsi/csi-attacher:vx.y.z'
# into image (quay.io/k8scsi/csi-attacher:vx.y.z),
# registry (quay.io/k8scsi),
# name (csi-attacher),
# tag (vx.y.z).
image=$(echo "$nocomments" | sed -e 's;.*image:[[:space:]]*;;')
registry=$(echo "$image" | sed -e 's;\(.*\)/.*;\1;')
name=$(echo "$image" | sed -e 's;.*/\([^:]*\).*;\1;')
tag=$(echo "$image" | sed -e 's;.*:;;')
# Variables are with underscores and upper case.
varname=$(echo $name | tr - _ | tr a-z A-Z)
# Now replace registry and/or tag, if set as env variables.
# If not set, the replacement is the same as the original value.
# Only do this for the images which are meant to be configurable.
if update_image "$name"; then
prefix=$(eval echo \${${varname}_REGISTRY:-${IMAGE_REGISTRY:-${registry}}}/ | sed -e 's;none/;;')
if [ "$IMAGE_TAG" = "canary" ] &&
[ -f ${BASE_DIR}/canary-blacklist.txt ] &&
grep -q "^$name\$" ${BASE_DIR}/canary-blacklist.txt; then
# Ignore IMAGE_TAG=canary for this particular image because its
# canary image is blacklisted in the deployment blacklist.
suffix=$(eval echo :\${${varname}_TAG:-${tag}})
else
suffix=$(eval echo :\${${varname}_TAG:-${IMAGE_TAG:-${tag}}})
fi
line="$(echo "$nocomments" | sed -e "s;$image;${prefix}${name}${suffix};")"
fi
echo "kata-directvolume plugin using $line" >&2
fi
if ! $have_csistoragecapacity; then
line="$(echo "$line" | grep -v -e 'storageCapacity: true' -e '--enable-capacity')"
fi
echo "$line"
done)"
if ! echo "$modified" | kubectl apply -f -; then
echo "modified version of $i:"
echo "$modified"
exit 1
fi
done
wait_for_daemonset () {
retries=10
while [ $retries -ge 0 ]; do
ready=$(kubectl get -n $1 daemonset $2 -o jsonpath="{.status.numberReady}")
required=$(kubectl get -n $1 daemonset $2 -o jsonpath="{.status.desiredNumberScheduled}")
if [ $ready -gt 0 ] && [ $ready -eq $required ]; then
return 0
fi
retries=$((retries - 1))
sleep 3
done
return 1
}
# Wait until the DaemonSet is running on all nodes.
if ! wait_for_daemonset ${DIRECTVOL_NAMESPACE} csi-kata-directvol-plugin; then
echo
echo "driver not ready"
echo "Deployment:"
(set +e; set -x; kubectl describe all,role,clusterrole,rolebinding,clusterrolebinding,serviceaccount,storageclass,csidriver --all-namespaces -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io)
echo
echo "Pod logs:"
kubectl get pods -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io --all-namespaces -o=jsonpath='{range .items[*]}{.metadata.name}{" "}{range .spec.containers[*]}{.name}{" "}{end}{"\n"}{end}' | while read -r pod containers; do
for c in $containers; do
echo
(set +e; set -x; kubectl logs $pod $c)
done
done
exit 1
fi
kubectl get po,ds -A

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
#
# Copyright 2017 The Kubernetes Authors.
# Copyright (c) 2023 Ant Group
#
# SPDX-License-Identifier: Apache-2.0
#
set -e
set -o pipefail
# Delete all the resources installed by the directvol-deploy script.
# Every resource in the driver installation carries labels identifying the installation instance.
# The app.kubernetes.io/instance: directvolume.csi.katacontainers.io and app.kubernetes.io/part-of:
# csi-driver-kata-directvolume labels are used to identify the installation set.
kubectl delete all --all-namespaces -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io,app.kubernetes.io/part-of=csi-driver-kata-directvolume --wait=true
kubectl delete role,clusterrole,rolebinding,clusterrolebinding,serviceaccount,storageclass,csidriver --all-namespaces -l app.kubernetes.io/instance=directvolume.csi.katacontainers.io,app.kubernetes.io/part-of=csi-driver-kata-directvolume --wait=true

View File

@ -0,0 +1,21 @@
apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
name: directvolume.csi.katacontainers.io
labels:
app.kubernetes.io/instance: directvolume.csi.katacontainers.io
app.kubernetes.io/part-of: csi-driver-kata-directvolume
app.kubernetes.io/name: directvolume.csi.katacontainers.io
app.kubernetes.io/component: csi-driver
spec:
# Supports persistent volume.
volumeLifecycleModes:
- Persistent
# Pod info is passed to the driver at mount time so it can determine which mode a volume uses.
podInfoOnMount: true
# No attacher needed.
attachRequired: false
storageCapacity: false
# Kubernetes may use fsGroup to change permissions and ownership
# of the volume to match user requested fsGroup in the pod's SecurityPolicy
fsGroupPolicy: File

View File

@ -0,0 +1,190 @@
kind: DaemonSet
apiVersion: apps/v1
metadata:
namespace: kata-directvolume
name: csi-kata-directvol-plugin
labels:
app.kubernetes.io/instance: directvolume.csi.katacontainers.io
app.kubernetes.io/part-of: csi-driver-kata-directvolume
app.kubernetes.io/name: csi-kata-directvol-plugin
app.kubernetes.io/component: plugin
spec:
selector:
matchLabels:
app.kubernetes.io/instance: directvolume.csi.katacontainers.io
app.kubernetes.io/part-of: csi-driver-kata-directvolume
app.kubernetes.io/name: csi-kata-directvol-plugin
app.kubernetes.io/component: plugin
template:
metadata:
labels:
app.kubernetes.io/instance: directvolume.csi.katacontainers.io
app.kubernetes.io/part-of: csi-driver-kata-directvolume
app.kubernetes.io/name: csi-kata-directvol-plugin
app.kubernetes.io/component: plugin
spec:
serviceAccountName: csi-provisioner
containers:
- name: csi-provisioner
image: registry.k8s.io/sig-storage/csi-provisioner:v3.6.0
args:
- -v=3
- --csi-address=/csi/csi.sock
- --feature-gates=Topology=true
- --node-deployment=true
- --strict-topology=true
- --immediate-topology=false
- --worker-threads=5
#- --enable-capacity
#- --capacity-ownerref-level=0 # pod is owner
env:
- name: NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
# This is necessary only for systems with SELinux, where
# non-privileged sidecar containers cannot access unix domain socket
# created by privileged CSI driver container.
privileged: true
volumeMounts:
- mountPath: /csi
name: socket-dir
- name: node-driver-registrar
image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.9.0
args:
- --v=3
- --csi-address=/csi/csi.sock
- --kubelet-registration-path=/var/lib/kubelet/plugins/csi-kata-directvolume/csi.sock
securityContext:
# This is necessary only for systems with SELinux, where
# non-privileged sidecar containers cannot access unix domain socket
# created by privileged CSI driver container.
privileged: true
env:
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /registration
name: registration-dir
- mountPath: /csi-persist-data
name: csi-persist-data
- name: kata-directvolume
# build the image and import it into the node's image store or push it to a registry (see the README)
image: localhost/kata-directvolume:v1.0.18
args:
- --drivername=directvolume.csi.katacontainers.io
- --v=5
- --endpoint=$(CSI_ENDPOINT)
- --statedir=$(STATE_DIR)
- --storagepath=$(STORAGE_POOL)
- --nodeid=$(KUBE_NODE_NAME)
env:
- name: CSI_ENDPOINT
value: unix:///csi/csi.sock
- name: STORAGE_POOL
value: /tmp/kata-directvol-storages
- name: STATE_DIR
value: /csi-persist-data
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: spec.nodeName
securityContext:
privileged: true
ports:
- containerPort: 9898
name: healthz
protocol: TCP
livenessProbe:
failureThreshold: 5
httpGet:
path: /healthz
port: healthz
initialDelaySeconds: 10
timeoutSeconds: 3
periodSeconds: 2
volumeMounts:
- mountPath: /csi
name: socket-dir
- mountPath: /var/lib/kubelet/pods
mountPropagation: Bidirectional
name: mountpoint-dir
- mountPath: /var/lib/kubelet/plugins
mountPropagation: Bidirectional
name: plugins-dir
- mountPath: /csi-persist-data
name: csi-persist-data
- mountPath: /dev
name: dev-dir
# backend block file stored at storage-pool
- mountPath: /tmp/kata-directvol-storages
name: storage-pool
# direct volume mountInfo.json stored at shared-directvols
- mountPath: /run/kata-containers/shared/direct-volumes
name: shared-directvols
- name: liveness-probe
volumeMounts:
- mountPath: /csi
name: socket-dir
image: registry.k8s.io/sig-storage/livenessprobe:v2.8.0
args:
- --csi-address=/csi/csi.sock
- --health-port=9898
volumes:
- hostPath:
path: /var/lib/kubelet/plugins/csi-kata-directvolume
type: DirectoryOrCreate
name: socket-dir
- hostPath:
path: /var/lib/kubelet/pods
type: DirectoryOrCreate
name: mountpoint-dir
- hostPath:
path: /var/lib/kubelet/plugins_registry
type: Directory
name: registration-dir
- hostPath:
path: /var/lib/kubelet/plugins
type: Directory
name: plugins-dir
- hostPath:
# 'path' is where PV data is persisted on the host.
# Using /tmp is also possible, but the PVs will not be available after plugin container re-creation or host reboot.
path: /var/lib/csi-directvolume-data/
type: DirectoryOrCreate
name: csi-persist-data
- hostPath:
path: /dev
type: Directory
name: dev-dir
# kata-containers backend raw block files are stored here.
- hostPath:
path: /tmp/kata-directvol-storages
type: DirectoryOrCreate
name: storage-pool
# kata-containers direct volume mount info is stored here.
- hostPath:
path: /run/kata-containers/shared/direct-volumes/
type: DirectoryOrCreate
name: shared-directvols

View File

@ -0,0 +1,81 @@
#!/usr/bin/env bash
#
# Copyright 2017 The Kubernetes Authors.
# Copyright (c) 2023 Ant Group
#
# SPDX-License-Identifier: Apache-2.0
#
set -e
set -o pipefail
BASE_DIR=$(dirname "$0")
DEPLOY_DIR=${BASE_DIR}/kata-directvolume
TEMP_DIR="$( mktemp -d )"
trap 'rm -rf ${TEMP_DIR}' EXIT
: ${UPDATE_RBAC_RULES:=true}
function rbac_version () {
yaml="$1"
image="$2"
update_rbac="$3"
# get version from `image: quay.io/k8scsi/csi-attacher:v1.0.1`, ignoring comments
version="$(sed -e 's/ *#.*$//' "$yaml" | grep "image:.*$image" | sed -e 's/ *#.*//' -e 's/.*://')"
if $update_rbac; then
# apply overrides
varname=$(echo $image | tr - _ | tr a-z A-Z)
eval version=\${${varname}_TAG:-\${IMAGE_TAG:-\$version}}
fi
echo "$version"
}
# https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/${VERSION}/deploy/kubernetes/rbac.yaml
CSI_PROVISIONER_RBAC_YAML="https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/$(rbac_version "${BASE_DIR}/kata-directvolume/csi-directvol-plugin.yaml" csi-provisioner false)/deploy/kubernetes/rbac.yaml"
: ${CSI_PROVISIONER_RBAC:=https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/$(rbac_version "${BASE_DIR}/kata-directvolume/csi-directvol-plugin.yaml" csi-provisioner "${UPDATE_RBAC_RULES}")/deploy/kubernetes/rbac.yaml}
run () {
echo "$@" >&2
"$@"
}
# namespace kata-directvolume
DIRECTVOL_NAMESPACE="kata-directvolume"
# create namespace kata-directvolume
echo "Creating Namespace kata-directvolume ..."
cat <<- EOF > "${TEMP_DIR}"/kata-directvol-ns.yaml
apiVersion: v1
kind: Namespace
metadata:
labels:
kubernetes.io/metadata.name: ${DIRECTVOL_NAMESPACE}
name: ${DIRECTVOL_NAMESPACE}
spec:
finalizers:
- kubernetes
EOF
run kubectl apply -f "${TEMP_DIR}"/kata-directvol-ns.yaml
echo "Namespace kata-directvolume created Done !"
# rbac rules
echo "Applying RBAC rules ..."
eval component="CSI_PROVISIONER"
eval current="\${${component}_RBAC}"
eval original="\${${component}_RBAC_YAML}"
if [[ "${current}" =~ ^http:// ]] || [[ "${current}" =~ ^https:// ]]; then
run curl "${current}" --output "${TEMP_DIR}"/rbac.yaml --silent --location
fi
# replace the default namespace with the specified namespace kata-directvolume
sed -e "s/namespace: default/namespace: kata-directvolume/g" "${TEMP_DIR}"/rbac.yaml > "${DEPLOY_DIR}/kata-directvol-rbac.yaml"
# apply the kata-directvol-rbac.yaml
run kubectl apply -f "${DEPLOY_DIR}/kata-directvol-rbac.yaml"
echo "Applying RBAC rules Done!"

View File

@ -0,0 +1,178 @@
# Deploy Kata Direct Volume CSI and Do Validation
## How to Deploy Kata Direct Volume CSI
First, you need to make sure you have a healthy Kubernetes (1.20+) cluster and permission to create Kata pods.
*WARNING* Older `K8S` versions are not guaranteed to work well with this driver.
The `CSI driver` is deployed as a `daemonset`, and each pod of the `daemonset` contains 4 containers:
1. `Kata Direct Volume CSI Driver`, which is the key implementation in it
2. [CSI-External-Provisioner](https://github.com/kubernetes-csi/external-provisioner)
3. [CSI-Liveness-Probe](https://github.com/kubernetes-csi/livenessprobe)
4. [CSI-Node-Driver-Registrar](https://github.com/kubernetes-csi/node-driver-registrar)
The easiest way to deploy the `Direct Volume CSI driver` is to run the `deploy.sh` script, as shown below for Kubernetes 1.28.2.
```shell
sudo deploy/deploy.sh
```
You'll get output similar to the following, indicating the application of `RBAC rules` and the successful deployment of `csi-provisioner`, `node-driver-registrar`, the `kata directvolume csi driver` (`csi-kata-directvol-plugin`), and `liveness-probe`. Please note that the following output is specific to Kubernetes 1.28.2.
```shell
Creating Namespace kata-directvolume ...
kubectl apply -f /tmp/tmp.kN43BWUGQ5/kata-directvol-ns.yaml
namespace/kata-directvolume created
Namespace kata-directvolume created Done !
Applying RBAC rules ...
curl https://raw.githubusercontent.com/kubernetes-csi/external-provisioner/v3.6.0/deploy/kubernetes/rbac.yaml --output /tmp/tmp.kN43BWUGQ5/rbac.yaml --silent --location
kubectl apply -f ./kata-directvolume/kata-directvol-rbac.yaml
serviceaccount/csi-provisioner created
clusterrole.rbac.authorization.k8s.io/external-provisioner-runner created
clusterrolebinding.rbac.authorization.k8s.io/csi-provisioner-role created
role.rbac.authorization.k8s.io/external-provisioner-cfg created
rolebinding.rbac.authorization.k8s.io/csi-provisioner-role-cfg created
$ ./directvol-deploy.sh
deploying kata directvolume components
./kata-directvolume/csi-directvol-driverinfo.yaml
csidriver.storage.k8s.io/directvolume.csi.katacontainers.io created
./kata-directvolume/csi-directvol-plugin.yaml
kata-directvolume plugin using image: registry.k8s.io/sig-storage/csi-provisioner:v3.6.0
kata-directvolume plugin using image: registry.k8s.io/sig-storage/csi-node-driver-registrar:v2.9.0
kata-directvolume plugin using image: localhost/kata-directvolume:v1.0.52
kata-directvolume plugin using image: registry.k8s.io/sig-storage/livenessprobe:v2.8.0
daemonset.apps/csi-kata-directvol-plugin created
./kata-directvolume/kata-directvol-ns.yaml
namespace/kata-directvolume unchanged
./kata-directvolume/kata-directvol-rbac.yaml
serviceaccount/csi-provisioner unchanged
clusterrole.rbac.authorization.k8s.io/external-provisioner-runner configured
clusterrolebinding.rbac.authorization.k8s.io/csi-provisioner-role unchanged
role.rbac.authorization.k8s.io/external-provisioner-cfg unchanged
rolebinding.rbac.authorization.k8s.io/csi-provisioner-role-cfg unchanged
NAMESPACE NAME READY STATUS RESTARTS AGE
default pod/kata-driectvol-01 1/1 Running 0 3h57m
kata-directvolume pod/csi-kata-directvol-plugin-92smp 4/4 Running 0 4s
kube-flannel pod/kube-flannel-ds-vq796 1/1 Running 1 (67d ago) 67d
kube-system pod/coredns-66f779496c-9bmp2 1/1 Running 3 (67d ago) 67d
kube-system pod/coredns-66f779496c-qlq6d 1/1 Running 1 (67d ago) 67d
kube-system pod/etcd-tnt001 1/1 Running 19 (67d ago) 67d
kube-system pod/kube-apiserver-tnt001 1/1 Running 5 (67d ago) 67d
kube-system pod/kube-controller-manager-tnt001 1/1 Running 8 (67d ago) 67d
kube-system pod/kube-proxy-p9t6t 1/1 Running 6 (67d ago) 67d
kube-system pod/kube-scheduler-tnt001 1/1 Running 8 (67d ago) 67d
NAMESPACE NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE
kata-directvolume daemonset.apps/csi-kata-directvol-plugin 1 1 1 1 1 <none> 4s
kube-flannel daemonset.apps/kube-flannel-ds 1 1 1 1 1 <none> 67d
kube-system daemonset.apps/kube-proxy 1 1 1 1 1 kubernetes.io/os=linux 67d
```
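You can also confirm that the plugin pod runs the four expected containers; the container names come from the DaemonSet manifest, and the pod name suffix will differ:
```shell
$ kubectl get pod -n kata-directvolume -l app.kubernetes.io/name=csi-kata-directvol-plugin \
    -o jsonpath='{.items[0].spec.containers[*].name}'
csi-provisioner node-driver-registrar kata-directvolume liveness-probe
```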
## How to Run a Kata Pod and Validate it
First, ensure all expected pods are running properly, including `csi-provisioner`, `node-driver-registrar`, the `kata-directvolume` `csi driver` (`csi-kata-directvol-plugin`), and `liveness-probe`:
```shell
$ kubectl get po -A
NAMESPACE NAME READY STATUS RESTARTS AGE
default csi-kata-directvol-plugin-dlphw 4/4 Running 0 68m
kube-flannel kube-flannel-ds-vq796 1/1 Running 1 (52d ago) 52d
kube-system coredns-66f779496c-9bmp2 1/1 Running 3 (52d ago) 52d
kube-system coredns-66f779496c-qlq6d 1/1 Running 1 (52d ago) 52d
kube-system etcd-node001 1/1 Running 19 (52d ago) 52d
kube-system kube-apiserver-node001 1/1 Running 5 (52d ago) 52d
kube-system kube-controller-manager-node001 1/1 Running 8 (52d ago) 52d
kube-system kube-proxy-p9t6t 1/1 Running 6 (52d ago) 52d
kube-system kube-scheduler-node001 1/1 Running 8 (52d ago) 52d
```
From the root directory, deploy the application pods, including a storage class, a `PVC`, and a pod which uses a direct-block-device-based volume; `${BASE_DIR}` below refers to `examples/pod-with-directvol`. The details can be seen in `examples/pod-with-directvol/*.yaml`:
```shell
kubectl apply -f ${BASE_DIR}/csi-storageclass.yaml
kubectl apply -f ${BASE_DIR}/csi-pvc.yaml
kubectl apply -f ${BASE_DIR}/csi-app.yaml
```
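Expected output (resource names per the example manifests):
```shell
storageclass.storage.k8s.io/csi-kata-directvolume-sc created
persistentvolumeclaim/csi-directvolume-pvc created
pod/kata-driectvol-01 created
```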
Let's validate that the components are deployed:
```shell
$ kubectl get po -A
NAMESPACE NAME READY STATUS RESTARTS AGE
kata-directvolume csi-kata-directvol-plugin-dlphw 4/4 Running 0 68m
default kata-driectvol-01 1/1 Running 0 67m
$ kubectl get sc,pvc -A
NAME PROVISIONER RECLAIMPOLICY VOLUMEBINDINGMODE ALLOWVOLUMEEXPANSION AGE
storageclass.storage.k8s.io/csi-kata-directvolume-sc directvolume.csi.katacontainers.io Delete Immediate false 71m
NAMESPACE NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
default persistentvolumeclaim/csi-directvolume-pvc Bound pvc-d7644547-f850-4bdf-8c93-aa745c7f31b5 1Gi RWO csi-kata-directvolume-sc 71m
```
Finally, inspect the application pod `kata-driectvol-01`, which is running with a direct-block-device-based volume:
```shell
$ kubectl describe po kata-driectvol-01
Name: kata-driectvol-01
Namespace: kata-directvolume
Priority: 0
Runtime Class Name: kata
Service Account: default
Node: node001/10.10.1.19
Start Time: Sat, 09 Dec 2023 23:06:49 +0800
Labels: <none>
Annotations: <none>
Status: Running
IP: 10.244.0.232
IPs:
IP: 10.244.0.232
Containers:
first-container:
Container ID: containerd://c5eec9d645a67b982549321f382d83c56297d9a2a705857e8f3eaa6c6676908e
Image: ubuntu:22.04
Image ID: docker.io/library/ubuntu@sha256:2b7412e6465c3c7fc5bb21d3e6f1917c167358449fecac8176c6e496e5c1f05f
Port: <none>
Host Port: <none>
Command:
sleep
1000000
State: Running
Started: Sat, 09 Dec 2023 23:06:51 +0800
Ready: True
Restart Count: 0
Environment: <none>
Mounts:
/data from kata-driectvol0-volume (rw)
/var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-zs9tm (ro)
Conditions:
Type Status
Initialized True
Ready True
ContainersReady True
PodScheduled True
Volumes:
kata-driectvol0-volume:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: csi-directvolume-pvc
ReadOnly: false
kube-api-access-zs9tm:
Type: Projected (a volume that contains injected data from multiple sources)
TokenExpirationSeconds: 3607
ConfigMapName: kube-root-ca.crt
ConfigMapOptional: <nil>
DownwardAPI: true
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events: <none>
```
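As a final check you can inspect the mount from inside the pod; the device name and sizes below are illustrative, but the filesystem type should match the `fstype` from the storage class (`ext4`):
```shell
$ kubectl exec kata-driectvol-01 -- df -hT /data
Filesystem     Type  Size  Used Avail Use% Mounted on
/dev/sdb       ext4  974M   24K  907M   1% /data
```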

View File

@ -0,0 +1,15 @@
#!/bin/bash
#
# Copyright (c) 2023 Ant Group
#
# SPDX-License-Identifier: Apache-2.0
#
set -e
set -o pipefail
BASE_DIR=$(dirname "$0")/pod-with-directvol
kubectl apply -f ${BASE_DIR}/csi-storageclass.yaml
kubectl apply -f ${BASE_DIR}/csi-pvc.yaml
kubectl apply -f ${BASE_DIR}/csi-app.yaml

View File

@ -0,0 +1,15 @@
#!/bin/bash
#
# Copyright (c) 2023 Ant Group
#
# SPDX-License-Identifier: Apache-2.0
#
set -e
set -o pipefail
BASE_DIR=$(dirname "$0")/pod-with-directvol
kubectl delete -f ${BASE_DIR}/csi-app.yaml
kubectl delete -f ${BASE_DIR}/csi-pvc.yaml
kubectl delete -f ${BASE_DIR}/csi-storageclass.yaml

View File

@ -0,0 +1,18 @@
kind: Pod
apiVersion: v1
metadata:
name: kata-driectvol-01
spec:
runtimeClassName: kata
containers:
- name: first-container
image: ubuntu:22.04
volumeMounts:
- mountPath: "/data"
name: kata-driectvol0-volume
command: [ "sleep", "1000000" ]
volumes:
- name: kata-driectvol0-volume
persistentVolumeClaim:
claimName: csi-directvolume-pvc # defined in csi-pvc.yaml

View File

@ -0,0 +1,11 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: csi-directvolume-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
storageClassName: csi-kata-directvolume-sc # defined in csi-storageclass.yaml

View File

@ -0,0 +1,12 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: csi-kata-directvolume-sc
parameters:
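# These parameters are consumed by the driver (see CreateVolume): "volumetype:
# directvol" requests a Kata direct volume, and "fstype" selects the filesystem
# the backing file is formatted with.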
katacontainers.direct.volume/volumetype: directvol
katacontainers.direct.volume/fstype: ext4
provisioner: directvolume.csi.katacontainers.io
reclaimPolicy: Delete
volumeBindingMode: Immediate
allowVolumeExpansion: false

View File

@ -0,0 +1,5 @@
apiVersion: node.k8s.io/v1
kind: RuntimeClass
metadata:
name: kata
handler: kata

View File

@ -0,0 +1,97 @@
module kata-containers/csi-kata-directvolume
go 1.20
require (
github.com/container-storage-interface/spec v1.9.0
github.com/diskfs/go-diskfs v1.4.0
github.com/golang/glog v1.2.0
github.com/golang/protobuf v1.5.3
github.com/kubernetes-csi/csi-lib-utils v0.16.0
github.com/pborman/uuid v1.2.1
github.com/stretchr/testify v1.8.4
golang.org/x/net v0.19.0
google.golang.org/grpc v1.59.0
k8s.io/apimachinery v0.28.2
k8s.io/klog/v2 v2.110.1
k8s.io/mount-utils v0.28.2
k8s.io/utils v0.0.0-20231127182322-b307cd553661
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/xattr v0.4.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/ulikunitz/xz v0.5.11 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/djherbis/times.v1 v1.3.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace k8s.io/api => k8s.io/api v0.28.2
replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.28.2
replace k8s.io/apimachinery => k8s.io/apimachinery v0.28.2
replace k8s.io/apiserver => k8s.io/apiserver v0.28.2
replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.28.2
replace k8s.io/client-go => k8s.io/client-go v0.28.2
replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.28.2
replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.28.2
replace k8s.io/code-generator => k8s.io/code-generator v0.28.2
replace k8s.io/component-base => k8s.io/component-base v0.28.2
replace k8s.io/component-helpers => k8s.io/component-helpers v0.28.2
replace k8s.io/controller-manager => k8s.io/controller-manager v0.28.2
replace k8s.io/cri-api => k8s.io/cri-api v0.28.2
replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.28.2
replace k8s.io/dynamic-resource-allocation => k8s.io/dynamic-resource-allocation v0.28.2
replace k8s.io/kms => k8s.io/kms v0.28.2
replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.28.2
replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.28.2
replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.28.2
replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.28.2
replace k8s.io/kubectl => k8s.io/kubectl v0.28.2
replace k8s.io/kubelet => k8s.io/kubelet v0.28.2
replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.28.2
replace k8s.io/metrics => k8s.io/metrics v0.28.2
replace k8s.io/mount-utils => k8s.io/mount-utils v0.28.2
replace k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.28.2
replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.28.2
replace k8s.io/endpointslice => k8s.io/endpointslice v0.28.2

View File

@ -0,0 +1,114 @@
github.com/container-storage-interface/spec v1.9.0 h1:zKtX4STsq31Knz3gciCYCi1SXtO2HJDecIjDVboYavY=
github.com/container-storage-interface/spec v1.9.0/go.mod h1:ZfDu+3ZRyeVqxZM0Ds19MVLkN2d1XJ5MAfi1L3VjlT0=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/diskfs/go-diskfs v1.4.0 h1:MAybY6TPD+fmhY+a2qFhmdvMeIKvCqlgh4QIc1uCmBs=
github.com/diskfs/go-diskfs v1.4.0/go.mod h1:G8cyy+ngM+3yKlqjweMmtqvE+TxsnIo1xumbJX1AeLg=
github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab h1:h1UgjJdAAhj+uPL68n7XASS6bU+07ZX1WJvVS2eyoeY=
github.com/elliotwutingfeng/asciiset v0.0.0-20230602022725-51bbb787efab/go.mod h1:GLo/8fDswSAniFG+BFIaiSPcK610jyzgEhWYPQwuQdw=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubernetes-csi/csi-lib-utils v0.16.0 h1:LXCvkhXHtFOkl7LoDqFdho/MuebccZqWxLwhKiRGiBg=
github.com/kubernetes-csi/csi-lib-utils v0.16.0/go.mod h1:fp1Oik+45tP2o4X9SD/SBWXLTQYT9wtLxGasBE3+vBI=
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE=
github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/ulikunitz/xz v0.5.11 h1:kpFauv27b6ynzBNT/Xy+1k+fK4WswhN/6PN5WhFAGw8=
github.com/ulikunitz/xz v0.5.11/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/djherbis/times.v1 v1.3.0 h1:uxMS4iMtH6Pwsxog094W0FYldiNnfY/xba00vq6C2+o=
gopkg.in/djherbis/times.v1 v1.3.0/go.mod h1:AQlg6unIsrsCEdQYhTzERy542dz6SFdQFZFv6mUY0P8=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/apimachinery v0.28.2 h1:KCOJLrc6gu+wV1BYgwik4AF4vXOlVJPdiqn0yAWWwXQ=
k8s.io/apimachinery v0.28.2/go.mod h1:RdzF87y/ngqk9H4z3EL2Rppv5jj95vGS/HaFXrLDApU=
k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0=
k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo=
k8s.io/mount-utils v0.28.2 h1:sIdMH7fRhcU48V1oYJ9cLmLm/TG+2jLhhe8eS3I+FWg=
k8s.io/mount-utils v0.28.2/go.mod h1:AyP8LmZSLgpGdFQr+vzHTerlPiGvXUdP99n98Er47jw=
k8s.io/utils v0.0.0-20231127182322-b307cd553661 h1:FepOBzJ0GXm8t0su67ln2wAZjbQ6RxQGZDnzuLcrUTI=
k8s.io/utils v0.0.0-20231127182322-b307cd553661/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=

View File

@ -0,0 +1,51 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package endpoint
import (
"fmt"
"net"
"os"
"strings"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
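// Parse splits a CSI endpoint of the form "unix://<path>" or "tcp://<addr>"
// into its scheme and address; a bare path is treated as a unix socket.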
func Parse(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
return "", "", status.Error(codes.InvalidArgument, fmt.Sprintf("Invalid endpoint: %v", ep))
}
return "unix", ep, nil
}
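// Listen resolves the endpoint and returns a listener for it. For unix
// sockets, any stale socket file is removed first, and the returned cleanup
// function removes the socket again; callers should defer it, e.g.:
//
//	l, cleanup, err := endpoint.Listen("unix:///var/run/csi.sock")
//	if err != nil { /* handle error */ }
//	defer cleanup()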
func Listen(endpoint string) (net.Listener, func(), error) {
proto, addr, err := Parse(endpoint)
if err != nil {
return nil, nil, err
}
cleanup := func() {}
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
return nil, nil, status.Error(codes.Internal, fmt.Sprintf("%s: %q", addr, err))
}
cleanup = func() {
os.Remove(addr)
}
}
l, err := net.Listen(proto, addr)
return l, cleanup, err
}

View File

@ -0,0 +1,315 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package directvolume
import (
"fmt"
"strings"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/pborman/uuid"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi"
"k8s.io/klog/v2"
"kata-containers/csi-kata-directvolume/pkg/utils"
)
func (dv *directVolume) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (resp *csi.CreateVolumeResponse, finalErr error) {
if err := dv.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
klog.V(3).Infof("invalid create volume req: %v", req)
return nil, err
}
if len(req.GetName()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Name missing in request")
}
caps := req.GetVolumeCapabilities()
if caps == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities missing in request")
}
klog.Infof("createVolume with request: %+v", req)
dv.mutex.Lock()
defer dv.mutex.Unlock()
capacity := int64(req.GetCapacityRange().GetRequiredBytes())
topologies := []*csi.Topology{}
if dv.config.EnableTopology {
topologies = append(topologies, &csi.Topology{Segments: map[string]string{TopologyKeyNode: dv.config.NodeID}})
}
volumeCtx := make(map[string]string)
volumeCtx[utils.IsDirectVolume] = "False"
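// Propagate the driver-specific storage class parameters (volumetype,
// fstype) into the volume context returned with the volume; unrecognized
// parameters are ignored.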
for key, value := range req.GetParameters() {
switch strings.ToLower(key) {
case utils.KataContainersDirectVolumeType:
if value == utils.DirectVolumeTypeName {
volumeCtx[utils.IsDirectVolume] = "True"
}
case utils.KataContainersDirectFsType:
volumeCtx[utils.KataContainersDirectFsType] = value
default:
continue
}
}
contentSrc := req.GetVolumeContentSource()
// Check whether a volume with this name already exists; if it does,
// verify that its allocated capacity and content source are compatible
// with the new request before returning it (CreateVolume must be idempotent).
if exVol, err := dv.state.GetVolumeByName(req.GetName()); err == nil {
if exVol.VolSize < capacity {
return nil, status.Errorf(codes.AlreadyExists, "Volume with the same name: %s but with different size already exists", req.GetName())
}
if contentSrc != nil {
volumeSource := req.VolumeContentSource
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Volume:
if volumeSource.GetVolume() != nil && exVol.ParentVolID != volumeSource.GetVolume().GetVolumeId() {
return nil, status.Error(codes.AlreadyExists, "existing volume source volume id not matching")
}
default:
return nil, status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource)
}
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: exVol.VolID,
CapacityBytes: int64(exVol.VolSize),
VolumeContext: volumeCtx,
ContentSource: contentSrc,
AccessibleTopology: topologies,
},
}, nil
}
volumeID := uuid.NewUUID().String()
kind := volumeCtx[storageKind]
vol, err := dv.createVolume(volumeID, req.GetName(), capacity, kind)
if err != nil {
klog.Errorf("created volume %s at path %s failed with error: %v", vol.VolID, vol.VolPath, err.Error())
return nil, err
}
klog.Infof("created volume %s at path %s", vol.VolID, vol.VolPath)
if contentSrc != nil {
path := dv.getVolumePath(volumeID)
volumeSource := req.VolumeContentSource
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Volume:
if srcVolume := volumeSource.GetVolume(); srcVolume != nil {
err = dv.loadFromVolume(capacity, srcVolume.GetVolumeId(), path)
vol.ParentVolID = srcVolume.GetVolumeId()
}
default:
err = status.Errorf(codes.InvalidArgument, "%v not a proper volume source", volumeSource)
}
if err != nil {
klog.V(4).Infof("VolumeSource error: %v", err)
if delErr := dv.deleteVolume(volumeID); delErr != nil {
klog.Infof("deleting direct volume %v failed: %v", volumeID, delErr)
}
return nil, err
}
klog.Infof("successfully populated volume %s", vol.VolID)
}
volumeCtx[utils.DirectVolumeName] = req.GetName()
volumeCtx[utils.CapabilityInBytes] = fmt.Sprintf("%d", capacity)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: volumeID,
CapacityBytes: capacity,
VolumeContext: volumeCtx,
ContentSource: contentSrc,
AccessibleTopology: topologies,
},
}, nil
}
func (dv *directVolume) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := dv.validateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
klog.V(3).Infof("invalid delete volume req: %v", req)
return nil, err
}
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
dv.mutex.Lock()
defer dv.mutex.Unlock()
volId := req.GetVolumeId()
vol, err := dv.state.GetVolumeByID(volId)
if err != nil {
klog.Warningf("Volume ID %s not found: might have already deleted", volId)
return &csi.DeleteVolumeResponse{}, nil
}
if vol.Attached || !vol.Published.Empty() || !vol.Staged.Empty() {
msg := fmt.Sprintf("Volume '%s' is still used (attached: %v, staged: %v, published: %v) by '%s' node",
vol.VolID, vol.Attached, vol.Staged, vol.Published, vol.NodeID)
klog.Warning(msg)
}
if err := dv.deleteVolume(volId); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to delete volume %v: %v", volId, err))
}
klog.Infof("volume %v successfully deleted", volId)
return &csi.DeleteVolumeResponse{}, nil
}
func (dv *directVolume) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return &csi.ControllerGetCapabilitiesResponse{
Capabilities: dv.getControllerServiceCapabilities(),
}, nil
}
func (dv *directVolume) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
}
if len(req.VolumeCapabilities) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
}
dv.mutex.Lock()
defer dv.mutex.Unlock()
if _, err := dv.state.GetVolumeByID(req.GetVolumeId()); err != nil {
klog.Warning("Validate volume vapability failed. Volume not found: might have already deleted")
return nil, err
}
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
VolumeContext: req.GetVolumeContext(),
VolumeCapabilities: req.GetVolumeCapabilities(),
Parameters: req.GetParameters(),
},
}, nil
}
func (dv *directVolume) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
dv.mutex.Lock()
defer dv.mutex.Unlock()
// Topology and capabilities are irrelevant. We only
// distinguish based on the "kind" parameter, if at all.
// Without configured capacity, we just have the maximum size.
available := dv.config.MaxVolumeSize
if dv.config.Capacity.Enabled() {
// Empty "kind" will return "zero capacity". There is no fallback
// to some arbitrary kind here because in practice it always should
// be set.
kind := req.GetParameters()[storageKind]
quantity := dv.config.Capacity[kind]
allocated := dv.sumVolumeSizes(kind)
available = quantity.Value() - allocated
}
maxVolumeSize := dv.config.MaxVolumeSize
if maxVolumeSize > available {
maxVolumeSize = available
}
return &csi.GetCapacityResponse{
AvailableCapacity: available,
MaximumVolumeSize: &wrappers.Int64Value{Value: maxVolumeSize},
MinimumVolumeSize: &wrappers.Int64Value{Value: 0},
}, nil
}
func (dv *directVolume) validateControllerServiceRequest(c csi.ControllerServiceCapability_RPC_Type) error {
if c == csi.ControllerServiceCapability_RPC_UNKNOWN {
return nil
}
for _, cap := range dv.getControllerServiceCapabilities() {
if c == cap.GetRpc().GetType() {
return nil
}
}
return status.Errorf(codes.InvalidArgument, "unsupported capability %s", c)
}
func (dv *directVolume) getControllerServiceCapabilities() []*csi.ControllerServiceCapability {
cl := []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
}
var csc []*csi.ControllerServiceCapability
for _, cap := range cl {
csc = append(csc, &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
})
}
return csc
}
func (dv *directVolume) ControllerModifyVolume(context.Context, *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "controllerModifyVolume is not supported")
}
func (dv *directVolume) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return &csi.ListVolumesResponse{}, nil
}
func (dv *directVolume) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return &csi.ControllerGetVolumeResponse{}, nil
}
func (dv *directVolume) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return &csi.ControllerPublishVolumeResponse{}, nil
}
func (dv *directVolume) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
func (dv *directVolume) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return &csi.CreateSnapshotResponse{}, nil
}
func (dv *directVolume) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return &csi.DeleteSnapshotResponse{}, nil
}
func (dv *directVolume) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return &csi.ListSnapshotsResponse{}, nil
}
func (dv *directVolume) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return &csi.ControllerExpandVolumeResponse{}, nil
}

View File

@ -0,0 +1,236 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package directvolume
import (
"errors"
"fmt"
"os"
"path"
"path/filepath"
"sync"
"kata-containers/csi-kata-directvolume/pkg/state"
"kata-containers/csi-kata-directvolume/pkg/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
utilexec "k8s.io/utils/exec"
)
const (
// storageKind is the special parameter which requests
// storage of a certain kind (only affects capacity checks).
storageKind = "kind"
)
type directVolume struct {
mutex sync.Mutex
config Config
state state.State
}
type Config struct {
DriverName string
Endpoint string
NodeID string
VendorVersion string
MaxVolumeSize int64
Capacity utils.Capacity
ShowVersion bool
EnableAttach bool
EnableTopology bool
StateDir string
VolumeDevices map[string]string
StoragePath string
IsDirectVolume bool
safeMounter *utils.SafeMountFormater
}
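// NewDirectVolumeDriver validates the configuration, prepares the storage
// and state directories, and loads any volume state persisted in state.json
// from a previous run.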
func NewDirectVolumeDriver(cfg Config) (*directVolume, error) {
if cfg.DriverName == "" {
return nil, errors.New("no driver name provided")
}
if cfg.NodeID == "" {
return nil, errors.New("no node id provided")
}
if cfg.Endpoint == "" {
return nil, errors.New("no driver endpoint provided")
}
if cfg.StoragePath == "" {
return nil, errors.New("no storage path provided")
}
if err := utils.MakeFullPath(cfg.StoragePath); err != nil {
return nil, fmt.Errorf("failed to mkdir -p storage path %v", cfg.StoragePath)
}
if err := utils.MakeFullPath(cfg.StateDir); err != nil {
return nil, fmt.Errorf("failed to mkdir -p state dir%v", cfg.StateDir)
}
if cfg.safeMounter == nil {
safeMnt := utils.NewSafeMountFormater()
cfg.safeMounter = &safeMnt
}
cfg.VolumeDevices = make(map[string]string)
klog.Infof("\nDriver: %v \nVersion: %s\nStoragePath: %s\nStatePath: %s\n", cfg.DriverName, cfg.VendorVersion, cfg.StoragePath, cfg.StateDir)
s, err := state.New(path.Join(cfg.StateDir, "state.json"))
if err != nil {
return nil, err
}
dv := &directVolume{
config: cfg,
state: s,
}
return dv, nil
}
func (dv *directVolume) Run() error {
s := NewNonBlockingGRPCServer()
// dv itself implements ControllerServer, NodeServer, and IdentityServer.
s.Start(dv.config.Endpoint, dv, dv, dv)
s.Wait()
return nil
}
// getVolumePath returns the canonical path for the direct volume
func (dv *directVolume) getVolumePath(volID string) string {
return filepath.Join(dv.config.StateDir, volID)
}
// createVolume allocates capacity, creates the directory for the direct volume, and
// adds the volume to the list.
// It returns the volume path or err if one occurs. That error is suitable as result of a gRPC call.
func (dv *directVolume) createVolume(volID, name string, cap int64, kind string) (*state.Volume, error) {
// Check for maximum available capacity
if cap > dv.config.MaxVolumeSize {
return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", cap, dv.config.MaxVolumeSize)
}
if dv.config.Capacity.Enabled() {
if kind == "" {
// Pick some kind with sufficient remaining capacity.
for k, c := range dv.config.Capacity {
if dv.sumVolumeSizes(k)+cap <= c.Value() {
kind = k
break
}
}
}
used := dv.sumVolumeSizes(kind)
available := dv.config.Capacity[kind]
if used+cap > available.Value() {
return nil, status.Errorf(codes.ResourceExhausted, "requested capacity %d exceeds remaining capacity for %q, %s out of %s already used",
cap, kind, resource.NewQuantity(used, resource.BinarySI).String(), available.String())
}
} else if kind != "" {
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("capacity tracking disabled, specifying kind %q is invalid", kind))
}
path := dv.getVolumePath(volID)
if err := os.MkdirAll(path, utils.PERM); err != nil {
klog.Errorf("mkdirAll for path %s failed with error: %v", path, err.Error())
return nil, err
}
volume := state.Volume{
VolID: volID,
VolName: name,
VolSize: cap,
VolPath: path,
Kind: kind,
}
klog.Infof("adding direct volume: %s = %+v", volID, volume)
if err := dv.state.UpdateVolume(volume); err != nil {
return nil, err
}
return &volume, nil
}
// deleteVolume deletes the directory for the direct volume.
func (dv *directVolume) deleteVolume(volID string) error {
klog.V(4).Infof("starting to delete direct volume: %s", volID)
vol, err := dv.state.GetVolumeByID(volID)
if err != nil {
klog.Warning("deleteVolume with Volume not found.")
// Return OK if the volume is not found.
return nil
}
path := dv.getVolumePath(volID)
if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) {
return err
}
if err := dv.state.DeleteVolume(volID); err != nil {
return err
}
klog.V(4).Infof("deleted direct volume: %s = %+v", volID, vol)
return nil
}
func (dv *directVolume) sumVolumeSizes(kind string) (sum int64) {
for _, volume := range dv.state.GetVolumes() {
if volume.Kind == kind {
sum += volume.VolSize
}
}
return
}
// loadFromVolume populates the given destPath with data from the srcVolumeID
func (dv *directVolume) loadFromVolume(size int64, srcVolumeId, destPath string) error {
directVolume, err := dv.state.GetVolumeByID(srcVolumeId)
if err != nil {
klog.Error("loadFromVolume failed with get volume by ID error Volume not found")
return err
}
if directVolume.VolSize > size {
return status.Errorf(codes.InvalidArgument, "volume %v size %v is greater than requested volume size %v", srcVolumeId, directVolume.VolSize, size)
}
return loadFromPersistStorage(directVolume, destPath)
}
func loadFromPersistStorage(directVolume state.Volume, destPath string) error {
srcPath := directVolume.VolPath
isEmpty, err := utils.IsPathEmpty(srcPath)
if err != nil {
return fmt.Errorf("failed verification check of source direct volume %v: %w", directVolume.VolID, err)
}
// If the source direct volume is empty this is a no-op; otherwise the
// cp call would fail with a file-not-found stat error.
if !isEmpty {
args := []string{"-a", srcPath + "/.", destPath + "/"}
executor := utilexec.New()
out, err := executor.Command("cp", args...).CombinedOutput()
if err != nil {
return fmt.Errorf("failed pre-populate data from volume %v: %s: %w", directVolume.VolID, out, err)
}
}
return nil
}
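
For orientation, here is a minimal sketch of how a `main` package might construct and run this driver. The import path, flag names, driver name, and default paths below are illustrative assumptions, not the plugin's actual CLI:

```go
package main

import (
	"flag"
	"fmt"
	"os"

	"kata-containers/csi-kata-directvolume/pkg/directvolume"
)

func main() {
	endpoint := flag.String("endpoint", "unix:///csi/csi.sock", "CSI endpoint")
	nodeID := flag.String("nodeid", "", "node ID")
	flag.Parse()

	cfg := directvolume.Config{
		DriverName:    "directvolume.csi.katacontainers.io", // assumed name
		Endpoint:      *endpoint,
		NodeID:        *nodeID,
		VendorVersion: "v1.0.0",
		StateDir:      "/csi-data-dir",
		StoragePath:   "/var/lib/kata-directvolume",
	}
	drv, err := directvolume.NewDirectVolumeDriver(cfg)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to initialize driver: %v\n", err)
		os.Exit(1)
	}
	// Run blocks, serving the Identity, Controller, and Node services on
	// the endpoint until the process is terminated.
	if err := drv.Run(); err != nil {
		fmt.Fprintf(os.Stderr, "driver exited: %v\n", err)
		os.Exit(1)
	}
}
```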

View File

@ -0,0 +1,61 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package directvolume
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
)
func (dv *directVolume) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
klog.V(5).Infof("Using default GetPluginInfo")
if dv.config.DriverName == "" {
return nil, status.Error(codes.Unavailable, "Driver name not configured")
}
if dv.config.VendorVersion == "" {
return nil, status.Error(codes.Unavailable, "Driver is missing version")
}
return &csi.GetPluginInfoResponse{
Name: dv.config.DriverName,
VendorVersion: dv.config.VendorVersion,
}, nil
}
func (dv *directVolume) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
func (dv *directVolume) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
klog.V(5).Infof("Using default capabilities")
caps := []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
}
if dv.config.EnableTopology {
caps = append(caps, &csi.PluginCapability{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
})
}
return &csi.GetPluginCapabilitiesResponse{Capabilities: caps}, nil
}

View File

@ -0,0 +1,390 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package directvolume
import (
"fmt"
"os"
"path/filepath"
"strconv"
"kata-containers/csi-kata-directvolume/pkg/utils"
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
)
const (
TopologyKeyNode = "topology.directvolume.csi/node"
)
func (dv *directVolume) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
klog.V(4).Infof("node publish volume with request %v", req)
// Check arguments
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability missing in request")
}
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
if !isDirectVolume(req.VolumeContext) {
return nil, status.Errorf(codes.FailedPrecondition, "volume %q is not direct-volume.", req.VolumeId)
}
dv.mutex.Lock()
defer dv.mutex.Unlock()
targetPath := req.GetTargetPath()
if req.GetVolumeCapability().GetMount() == nil {
return nil, status.Error(codes.InvalidArgument, "It Must be mount access type")
}
fsType := req.VolumeContext[utils.KataContainersDirectFsType]
if len(fsType) == 0 {
fsType = utils.DefaultFsType
klog.Warningf("volume context has no fsType, set default fstype %v\n", fsType)
}
volType := req.VolumeContext[utils.KataContainersDirectVolumeType]
if len(volType) == 0 {
volType = "directvol"
klog.Warningf("volume context has no volumeType, set default volume type %v\n", volType)
}
readOnly := req.GetReadonly()
volumeID := req.GetVolumeId()
attrib := req.GetVolumeContext()
devicePath := dv.config.VolumeDevices[volumeID]
klog.Infof("target %v\nfstype %v\ndevice %v\nreadonly %v\nvolumeID %v\n",
targetPath, fsType, devicePath, readOnly, volumeID)
options := []string{"bind"}
if readOnly {
options = append(options, "ro")
} else {
options = append(options, "rw")
}
stagingTargetPath := req.GetStagingTargetPath()
if canDoMnt, err := utils.CanDoBindmount(dv.config.safeMounter, targetPath); err != nil {
return nil, err
} else if !canDoMnt {
klog.V(3).Infof("cannot do bindmount target path: %s", targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
if err := dv.config.safeMounter.DoBindmount(stagingTargetPath, targetPath, "", options); err != nil {
errMsg := fmt.Sprintf("failed to bindmount device: %s at %s: %s", stagingTargetPath, targetPath, err.Error())
klog.Infof("do bindmount failed: %v.", errMsg)
return nil, status.Error(codes.Aborted, errMsg)
}
// kata-containers DirectVolume add
mountInfo := utils.MountInfo{
VolumeType: volType,
Device: devicePath,
FsType: fsType,
Metadata: attrib,
Options: options,
}
if err := utils.AddDirectVolume(targetPath, mountInfo); err != nil {
klog.Errorf("add direct volume with source %s and mountInfo %v failed", targetPath, mountInfo)
return nil, err
}
klog.Infof("add direct volume successfully.")
volInStat, err := dv.state.GetVolumeByID(volumeID)
if err != nil {
capInt64, _ := strconv.ParseInt(req.VolumeContext[utils.CapabilityInBytes], 10, 64)
volName := req.VolumeContext[utils.DirectVolumeName]
kind := req.VolumeContext[storageKind]
vol, err := dv.createVolume(volumeID, volName, capInt64, kind)
if err != nil {
return nil, err
}
vol.NodeID = dv.config.NodeID
vol.Published.Add(targetPath)
klog.Infof("create volume %v successfully", vol)
return &csi.NodePublishVolumeResponse{}, nil
}
volInStat.NodeID = dv.config.NodeID
volInStat.Published.Add(targetPath)
if err := dv.state.UpdateVolume(volInStat); err != nil {
return nil, err
}
klog.Infof("directvolume: volume %s has been published.", targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (dv *directVolume) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
// Check arguments
if len(req.GetVolumeId()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if len(req.GetTargetPath()) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
targetPath := req.GetTargetPath()
volumeID := req.GetVolumeId()
// Lock before acting on global state. A production-quality
// driver might use more fine-grained locking.
dv.mutex.Lock()
defer dv.mutex.Unlock()
// Unmount only if the target path is really a mount point.
if isMnt, err := dv.config.safeMounter.IsMountPoint(targetPath); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("check target path: %v", err))
} else if isMnt {
// Unmounting the image or filesystem.
err = dv.config.safeMounter.Unmount(targetPath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("unmount target path: %v", err))
}
}
// Delete the mount point.
// Does not return error for non-existent path, repeated calls OK for idempotency.
if err := os.RemoveAll(targetPath); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("remove target path: %v", err))
}
if err := utils.RemoveDirectVolume(targetPath); err != nil {
klog.V(4).Infof("remove direct volume failed.")
return nil, status.Error(codes.Internal, fmt.Sprintf("remove direct volume failed: %v", err))
}
klog.Infof("direct volume %s has been cleaned up.", targetPath)
vol, err := dv.state.GetVolumeByID(volumeID)
if err != nil {
klog.Warningf("volume id %s not found in volume list, nothing to do.", volumeID)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
if !vol.Published.Has(targetPath) {
klog.V(4).Infof("volume %q is not published at %q, nothing to do.", volumeID, targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
vol.Published.Remove(targetPath)
if err := dv.state.UpdateVolume(vol); err != nil {
return nil, err
}
klog.Infof("volume %s has been unpublished.", targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func isDirectVolume(VolumeCtx map[string]string) bool {
return VolumeCtx[utils.IsDirectVolume] == "True"
}
func (dv *directVolume) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
klog.V(4).Infof("NodeStageVolumeRequest with request %v", req)
volumeID := req.GetVolumeId()
// Check arguments
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
stagingTargetPath := req.GetStagingTargetPath()
if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
if req.GetVolumeCapability() == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capability missing in request")
}
if !isDirectVolume(req.VolumeContext) {
return nil, status.Errorf(codes.FailedPrecondition, "volume %q is not direct-volume.", req.VolumeId)
}
dv.mutex.Lock()
defer dv.mutex.Unlock()
capacityInBytes := req.VolumeContext[utils.CapabilityInBytes]
devicePath, err := utils.CreateDirectBlockDevice(volumeID, capacityInBytes, dv.config.StoragePath)
if err != nil {
return nil, status.Errorf(codes.Internal, "setup storage for volume '%s' failed: %v", volumeID, err)
}
// /full_path_on_host/VolumeId/
deviceUpperPath := filepath.Dir(*devicePath)
if canMnt, err := utils.CanDoBindmount(dv.config.safeMounter, stagingTargetPath); err != nil {
return nil, err
} else if !canMnt {
klog.Infof("staging target path: %s already mounted", stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
options := []string{"bind"}
if err := dv.config.safeMounter.DoBindmount(deviceUpperPath, stagingTargetPath, "", options); err != nil {
klog.Errorf("safe mounter: %v do bind mount %v failed, with error: %v", deviceUpperPath, stagingTargetPath, err.Error())
return nil, err
}
fsType, ok := req.VolumeContext[utils.KataContainersDirectFsType]
if !ok {
klog.Infof("fstype not set, default fstype will be set: %v\n", utils.DefaultFsType)
fsType = utils.DefaultFsType
}
if err := dv.config.safeMounter.SafeFormatWithFstype(*devicePath, fsType, options); err != nil {
return nil, err
}
dv.config.VolumeDevices[volumeID] = *devicePath
klog.Infof("directvolume: volume %s has been staged.", stagingTargetPath)
volInStat, err := dv.state.GetVolumeByID(req.VolumeId)
if err != nil {
capInt64, _ := strconv.ParseInt(req.VolumeContext[utils.CapabilityInBytes], 10, 64)
volName := req.VolumeContext[utils.DirectVolumeName]
kind := req.VolumeContext[storageKind]
vol, err := dv.createVolume(volumeID, volName, capInt64, kind)
if err != nil {
return nil, err
}
vol.Staged.Add(stagingTargetPath)
klog.Infof("create volume %v successfully", vol)
return &csi.NodeStageVolumeResponse{}, nil
}
if volInStat.Staged.Has(stagingTargetPath) {
klog.V(4).Infof("Volume %q is already staged at %q, nothing to do.", req.VolumeId, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
if !volInStat.Staged.Empty() {
return nil, status.Errorf(codes.FailedPrecondition, "volume %q is already staged at %v", req.VolumeId, volInStat.Staged)
}
volInStat.Staged.Add(stagingTargetPath)
if err := dv.state.UpdateVolume(volInStat); err != nil {
return nil, err
}
return &csi.NodeStageVolumeResponse{}, nil
}
func (dv *directVolume) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
// Check arguments
volumeID := req.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
stagingTargetPath := req.GetStagingTargetPath()
if stagingTargetPath == "" {
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
}
dv.mutex.Lock()
defer dv.mutex.Unlock()
// Unmount only if the target path is really a mount point.
if isMnt, err := dv.config.safeMounter.IsMountPoint(stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("check staging target path: %v", err))
} else if isMnt {
err = dv.config.safeMounter.Unmount(stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("unmount staging target path: %v", err))
}
}
if deviceUpperPath, err := utils.GetStoragePath(dv.config.StoragePath, volumeID); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("get device UpperPath %s failed: %v", deviceUpperPath, err))
} else {
if err = os.RemoveAll(deviceUpperPath); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("remove device upper path: %s failed %v", deviceUpperPath, err.Error()))
}
klog.Infof("direct volume %s has been removed.", deviceUpperPath)
}
if err := os.RemoveAll(stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("remove staging target path: %v", err))
}
klog.Infof("directvolume: volume %s has been unstaged.", stagingTargetPath)
vol, err := dv.state.GetVolumeByID(volumeID)
if err != nil {
klog.Warning("Volume not found: might have already deleted")
return &csi.NodeUnstageVolumeResponse{}, nil
}
if !vol.Staged.Has(stagingTargetPath) {
klog.V(4).Infof("Volume %q is not staged at %q, nothing to do.", volumeID, stagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil
}
if !vol.Published.Empty() {
return nil, status.Errorf(codes.Internal, "volume %q is still published at %q on node %q", vol.VolID, vol.Published, vol.NodeID)
}
vol.Staged.Remove(stagingTargetPath)
if err := dv.state.UpdateVolume(vol); err != nil {
return nil, err
}
return &csi.NodeUnstageVolumeResponse{}, nil
}
func (dv *directVolume) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
resp := &csi.NodeGetInfoResponse{
NodeId: dv.config.NodeID,
}
if dv.config.EnableTopology {
resp.AccessibleTopology = &csi.Topology{
Segments: map[string]string{TopologyKeyNode: dv.config.NodeID},
}
}
return resp, nil
}
func (dv *directVolume) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
caps := []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
}
return &csi.NodeGetCapabilitiesResponse{Capabilities: caps}, nil
}
func (dv *directVolume) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return &csi.NodeGetVolumeStatsResponse{}, nil
}
func (dv *directVolume) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return &csi.NodeExpandVolumeResponse{}, nil
}
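
To make the node-side flow above concrete, here is a hedged sketch of the order in which a CSI client (the kubelet) drives these handlers: stage first (create and format the backing raw file, bind-mount its directory onto the staging path), then publish (bind-mount onto the pod's target path and write the Kata mount info). The helper function, volume ID, and paths are illustrative assumptions:

```go
package directvolume

import (
	"context"
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"

	"kata-containers/csi-kata-directvolume/pkg/utils"
)

// demoStageAndPublish is a hypothetical helper showing the expected call
// order against the node server: NodeStageVolume, then NodePublishVolume.
func demoStageAndPublish(ns csi.NodeServer) error {
	ctx := context.Background()
	mountCap := &csi.VolumeCapability{
		AccessType: &csi.VolumeCapability_Mount{Mount: &csi.VolumeCapability_MountVolume{}},
	}
	// Volume context keys match the constants in pkg/utils; the values
	// here are illustrative.
	volCtx := map[string]string{
		utils.IsDirectVolume:             "True",
		utils.KataContainersDirectFsType: "ext4",
		utils.CapabilityInBytes:          fmt.Sprintf("%d", 2*utils.GiB),
		utils.DirectVolumeName:           "vol-demo",
	}
	staging := "/var/lib/kubelet/plugins/staging/vol-0001" // illustrative
	target := "/var/lib/kubelet/pods/uid/volumes/vol-0001/mount"

	if _, err := ns.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
		VolumeId:          "vol-0001",
		StagingTargetPath: staging,
		VolumeCapability:  mountCap,
		VolumeContext:     volCtx,
	}); err != nil {
		return err
	}

	_, err := ns.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
		VolumeId:          "vol-0001",
		StagingTargetPath: staging,
		TargetPath:        target,
		VolumeCapability:  mountCap,
		VolumeContext:     volCtx,
	})
	return err
}
```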

View File

@ -0,0 +1,96 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package directvolume
import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/klog/v2"
endpoint "kata-containers/csi-kata-directvolume/internal"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
)
func NewNonBlockingGRPCServer() *nonBlockingGRPCServer {
return &nonBlockingGRPCServer{}
}
// NonBlocking server
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
cleanup func()
}
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
s.wg.Add(1)
go s.serve(endpoint, ids, cs, ns)
}
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
s.cleanup()
}
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
s.cleanup()
}
func (s *nonBlockingGRPCServer) serve(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
listener, cleanup, err := endpoint.Listen(ep)
if err != nil {
klog.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
server := grpc.NewServer(opts...)
s.server = server
s.cleanup = cleanup
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
klog.Infof("Listening for connections on address: %#v", listener.Addr())
if err := server.Serve(listener); err != nil {
klog.Fatalf("Failed to server: %v", err)
}
}
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
glog.V(3).Infof("GRPC call: %s", info.FullMethod)
glog.V(5).Infof("GRPC request: %+v", protosanitizer.StripSecrets(req))
resp, err := handler(ctx, req)
if err != nil {
glog.Errorf("GRPC error: %v", err)
} else {
glog.V(5).Infof("GRPC response: %+v", protosanitizer.StripSecrets(resp))
}
return resp, err
}
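
Stop() and ForceStop() are provided but unused above; a sketch of hooking the graceful variant to process signals follows. It assumes it lives inside package directvolume, since the server and driver types are unexported:

```go
package directvolume

import (
	"os"
	"os/signal"
	"syscall"
)

// runWithGracefulStop is a sketch of shutting the server down cleanly on
// SIGTERM/SIGINT. Note that Wait() would block even after Stop(), because
// serve() never marks the WaitGroup done, so this returns after Stop.
func runWithGracefulStop(dv *directVolume) {
	s := NewNonBlockingGRPCServer()
	s.Start(dv.config.Endpoint, dv, dv, dv)

	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
	<-sig

	// GracefulStop drains in-flight RPCs; the cleanup callback removes
	// the listening socket.
	s.Stop()
}
```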

View File

@ -0,0 +1,161 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
// Package state manages the internal state of the driver which needs to be maintained
// across driver restarts.
package state
import (
"encoding/json"
"errors"
"os"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type Volume struct {
VolName string
VolID string
VolSize int64
VolPath string
// VolAccessType AccessType
ParentVolID string
ParentSnapID string
NodeID string
Kind string
ReadOnlyAttach bool
Attached bool
// Staged contains the staging target path at which the volume
// was staged. A set of paths is used for consistency
// with Published.
Staged Strings
// Published contains the target paths where the volume
// was published.
Published Strings
}
// State is the interface that the rest of the code has to use to
// access and change state. All error messages contain gRPC
// status codes and can be returned without wrapping.
type State interface {
// GetVolumeByID retrieves a volume by its unique ID or returns
// an error including that ID when not found.
GetVolumeByID(volID string) (Volume, error)
// GetVolumeByName retrieves a volume by its name or returns
// an error including that name when not found.
GetVolumeByName(volName string) (Volume, error)
// GetVolumes returns all currently existing volumes.
GetVolumes() []Volume
// UpdateVolume updates the existing direct volume,
// identified by its volume ID, or adds it if it does
// not exist yet.
UpdateVolume(volume Volume) error
// DeleteVolume deletes the volume with the given
// volume ID. It is not an error when such a volume
// does not exist.
DeleteVolume(volID string) error
}
type resources struct {
Volumes []Volume
}
type state struct {
resources
statefilePath string
}
var _ State = &state{}
// New retrieves the complete state of the driver from the file if given
// and then ensures that all changes are mirrored immediately in the
// given file. If not given, the initial state is empty and changes
// are not saved.
func New(statefilePath string) (State, error) {
s := &state{
statefilePath: statefilePath,
}
return s, s.restore()
}
func (s *state) dump() error {
data, err := json.Marshal(&s.resources)
if err != nil {
return status.Errorf(codes.Internal, "error encoding volumes: %v", err)
}
if err := os.WriteFile(s.statefilePath, data, 0600); err != nil {
return status.Errorf(codes.Internal, "error writing state file: %v", err)
}
return nil
}
func (s *state) restore() error {
s.Volumes = nil
data, err := os.ReadFile(s.statefilePath)
switch {
case errors.Is(err, os.ErrNotExist):
// Nothing to do.
return nil
case err != nil:
return status.Errorf(codes.Internal, "error reading state file: %v", err)
}
if err := json.Unmarshal(data, &s.resources); err != nil {
return status.Errorf(codes.Internal, "error encoding volumes from state file %q: %v", s.statefilePath, err)
}
return nil
}
func (s *state) GetVolumeByID(volID string) (Volume, error) {
for _, volume := range s.Volumes {
if volume.VolID == volID {
return volume, nil
}
}
return Volume{}, status.Errorf(codes.NotFound, "volume id %s does not exist in the volumes list", volID)
}
func (s *state) GetVolumeByName(volName string) (Volume, error) {
for _, volume := range s.Volumes {
if volume.VolName == volName {
return volume, nil
}
}
return Volume{}, status.Errorf(codes.NotFound, "volume name %s does not exist in the volumes list", volName)
}
func (s *state) GetVolumes() []Volume {
volumes := make([]Volume, len(s.Volumes))
copy(volumes, s.Volumes)
return volumes
}
func (s *state) UpdateVolume(update Volume) error {
for i, volume := range s.Volumes {
if volume.VolID == update.VolID {
s.Volumes[i] = update
return s.dump()
}
}
s.Volumes = append(s.Volumes, update)
return s.dump()
}
func (s *state) DeleteVolume(volID string) error {
for i, volume := range s.Volumes {
if volume.VolID == volID {
s.Volumes = append(s.Volumes[:i], s.Volumes[i+1:]...)
return s.dump()
}
}
return nil
}

View File

@ -0,0 +1,48 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package state
import (
"path"
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func TestVolumes(t *testing.T) {
tmp := t.TempDir()
statefileName := path.Join(tmp, "state.json")
s, err := New(statefileName)
require.NoError(t, err, "construct state")
require.Empty(t, s.GetVolumes(), "initial volumes")
_, err = s.GetVolumeByID("foo")
require.Equal(t, codes.NotFound, status.Convert(err).Code(), "GetVolumeByID of non-existent volume")
require.Contains(t, status.Convert(err).Message(), "foo")
err = s.UpdateVolume(Volume{VolID: "foo", VolName: "bar"})
require.NoError(t, err, "add volume")
s, err = New(statefileName)
require.NoError(t, err, "reconstruct state")
_, err = s.GetVolumeByID("foo")
require.NoError(t, err, "get existing volume by ID")
_, err = s.GetVolumeByName("bar")
require.NoError(t, err, "get existing volume by name")
err = s.DeleteVolume("foo")
require.NoError(t, err, "delete existing volume")
err = s.DeleteVolume("foo")
require.NoError(t, err, "delete non-existent volume")
require.Empty(t, s.GetVolumes(), "final volumes")
}

View File

@ -0,0 +1,42 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package state
// Strings is an ordered set of strings with helper functions for
// adding, searching and removing entries.
type Strings []string
// Add appends at the end.
func (s *Strings) Add(str string) {
*s = append(*s, str)
}
// Has checks whether the string is already present.
func (s *Strings) Has(str string) bool {
for _, str2 := range *s {
if str == str2 {
return true
}
}
return false
}
// Empty returns true if the list is empty.
func (s *Strings) Empty() bool {
return len(*s) == 0
}
// Remove removes the first matched target of the string, if present.
func (s *Strings) Remove(str string) {
for i, str2 := range *s {
if str == str2 {
*s = append((*s)[:i], (*s)[i+1:]...)
return
}
}
}

View File

@ -0,0 +1,62 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package utils
import (
"errors"
"flag"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/api/resource"
)
// Capacity simulates linear storage of certain types ("fast",
// "slow"). To calculate the amount of allocated space, the size of
// all currently existing volumes of the same kind is summed up.
//
// Available capacity is configurable with a command line flag
// -capacity <type>=<size> where <type> is a string and <size>
// is a quantity (1T, 1Gi). More than one of those
// flags can be used.
//
// The underlying map will be initialized if needed by Set,
// which makes it possible to define and use a Capacity instance
// without explicit initialization (`var capacity Capacity` or as
// member in a struct).
type Capacity map[string]resource.Quantity
// Set is an implementation of flag.Value.Set.
func (c *Capacity) Set(arg string) error {
parts := strings.SplitN(arg, "=", 2)
if len(parts) != 2 {
return errors.New("must be of format <type>=<size>")
}
quantity, err := resource.ParseQuantity(parts[1])
if err != nil {
return err
}
// We overwrite any previous value.
if *c == nil {
*c = Capacity{}
}
(*c)[parts[0]] = quantity
return nil
}
func (c *Capacity) String() string {
return fmt.Sprintf("%v", map[string]resource.Quantity(*c))
}
var _ flag.Value = &Capacity{}
// Enabled returns true if capacities are configured.
func (c *Capacity) Enabled() bool {
return len(*c) > 0
}
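
Since Capacity implements flag.Value, registering it takes a single flag.Var call. A minimal sketch follows; the flag name matches the doc comment above, while the kinds and sizes are illustrative:

```go
package main

import (
	"flag"
	"fmt"

	"kata-containers/csi-kata-directvolume/pkg/utils"
)

func main() {
	var capacity utils.Capacity
	// May be repeated on the command line, e.g.:
	//   -capacity fast=100Gi -capacity slow=1Ti
	flag.Var(&capacity, "capacity", "storage capacity per kind, format <kind>=<quantity>")
	flag.Parse()

	if capacity.Enabled() {
		fmt.Println("configured capacities:", capacity.String())
	}
}
```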

View File

@ -0,0 +1,65 @@
// Copyright (c) 2022 Databricks Inc.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
package utils
import (
b64 "encoding/base64"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
mountInfoFileName = "mountInfo.json"
kataDirectVolumeRootPath = "/run/kata-containers/shared/direct-volumes"
)
// MountInfo contains the information needed by Kata to consume a host block device and mount it as a filesystem inside the guest VM.
type MountInfo struct {
// The type of the volume (i.e. block)
VolumeType string `json:"volume-type"`
// The device backing the volume.
Device string `json:"device"`
// The filesystem type to be mounted on the volume.
FsType string `json:"fstype"`
// Additional metadata to pass to the agent regarding this volume.
Metadata map[string]string `json:"metadata,omitempty"`
// Additional mount options.
Options []string `json:"options,omitempty"`
}
// Add writes the mount info of a direct volume into a filesystem path known to Kata Containers.
func Add(volumePath string, mountInfo string) error {
volumeDir := filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath)))
stat, err := os.Stat(volumeDir)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return err
}
if err := os.MkdirAll(volumeDir, 0700); err != nil {
return err
}
}
if stat != nil && !stat.IsDir() {
return status.Error(codes.Unknown, fmt.Sprintf("%s should be a directory", volumeDir))
}
var deserialized MountInfo
if err := json.Unmarshal([]byte(mountInfo), &deserialized); err != nil {
return err
}
return os.WriteFile(filepath.Join(volumeDir, mountInfoFileName), []byte(mountInfo), 0600)
}
// Remove deletes the direct volume path including all the files inside it.
func Remove(volumePath string) error {
return os.RemoveAll(filepath.Join(kataDirectVolumeRootPath, b64.URLEncoding.EncodeToString([]byte(volumePath))))
}
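
For reference, a hedged sketch of what Add produces on disk. The device and target paths are illustrative; the root path and file name come from the constants above:

```go
package main

import (
	b64 "encoding/base64"
	"encoding/json"
	"fmt"
	"path/filepath"

	"kata-containers/csi-kata-directvolume/pkg/utils"
)

func main() {
	info := utils.MountInfo{
		VolumeType: "directvol",
		Device:     "/var/lib/kata-directvolume/vol-0001/directvol-rawdisk.2048M",
		FsType:     "ext4",
		Options:    []string{"bind", "rw"},
	}
	raw, err := json.Marshal(&info)
	if err != nil {
		panic(err)
	}

	target := "/var/lib/kubelet/pods/uid/volumes/vol-0001/mount"
	if err := utils.Add(target, string(raw)); err != nil {
		panic(err)
	}

	// The metadata lands under the base64url-encoded target path:
	//   /run/kata-containers/shared/direct-volumes/<b64url(target)>/mountInfo.json
	fmt.Println(filepath.Join(
		"/run/kata-containers/shared/direct-volumes",
		b64.URLEncoding.EncodeToString([]byte(target)),
		"mountInfo.json"))
}
```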

View File

@ -0,0 +1,132 @@
//
// Copyright 2017 The Kubernetes Authors.
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package utils
import (
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
mountutils "k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
)
const (
// 'fsck' found errors and corrected them
fsckErrorsCorrected = 1
// 'fsck' found errors but exited without correcting them
fsckErrorsUncorrected = 4
)
type SafeMountFormater struct {
*mountutils.SafeFormatAndMount
}
func NewSafeMountFormater() SafeMountFormater {
return SafeMountFormater{
&mountutils.SafeFormatAndMount{
Interface: mountutils.New(""),
Exec: utilexec.New(),
},
}
}
func (mounter *SafeMountFormater) IsNotSafeMountPoint(filePath string) (bool, error) {
isMnt, err := mounter.IsMountPoint(filePath)
if err != nil {
return true, err
}
return !isMnt, nil
}
func (mounter *SafeMountFormater) DoBindmount(sourcePath, targetPath, fsType string, options []string) error {
if err := mounter.Mount(sourcePath, targetPath, fsType, options); err != nil {
errMsg := fmt.Sprintf("failed to mount device: %s at %s: %s", sourcePath, targetPath, err)
return status.Error(codes.Aborted, errMsg)
}
return nil
}
// SafeFormatWithFstype uses unix utils to format disk
func (mounter *SafeMountFormater) SafeFormatWithFstype(source string, fstype string, options []string) error {
readOnly := false
for _, option := range options {
if option == "ro" {
readOnly = true
break
}
}
// Check if the disk is already formatted
existingFormat, err := mounter.GetDiskFormat(source)
if err != nil {
return mountutils.NewMountError(mountutils.GetDiskFormatFailed, "failed to get disk format of disk %s: %v", source, err)
}
// Use 'ext4' as the default
if len(fstype) == 0 {
fstype = DefaultFsType
}
if existingFormat == "" {
// Do not attempt to format the disk if mounting as readonly, return an error to reflect this.
if readOnly {
return mountutils.NewMountError(mountutils.UnformattedReadOnly, "cannot mount unformatted disk %s as it is in read-only mode", source)
}
// Disk is unformatted so format it.
args := []string{source}
if fstype == "ext4" || fstype == "ext3" {
args = []string{
"-F", // Force flag
"-m0", // Zero blocks reserved for super-user
source,
}
}
klog.Infof("Disk %q is unformatted, do format with type: %q and options: %v", source, fstype, args)
mkfsCmd := fmt.Sprintf("mkfs.%s", fstype)
if output, err := doSafeCommand(mkfsCmd, args...); err != nil {
detailedErr := fmt.Sprintf("format disk %q failed: type:(%q) errcode:(%v) output:(%v) ", source, fstype, err, string(output))
klog.Error(detailedErr)
return mountutils.NewMountError(mountutils.FormatFailed, detailedErr)
}
klog.Infof("Disk successfully formatted (mkfs): %s - %s", fstype, source)
} else {
if fstype != existingFormat {
// Verify the disk is formatted with the expected fs type.
return mountutils.NewMountError(mountutils.FilesystemMismatch, "requested fstype %q does not match existing filesystem %q on %s", fstype, existingFormat, source)
}
if !readOnly {
// Run check tools on the disk to fix repairable issues, only do this for formatted volumes requested as rw.
klog.V(4).Infof("Checking for issues with fsck on disk: %s", source)
args := []string{"-a", source}
if output, err := doSafeCommand("fsck", args...); err != nil {
ee, isExitError := err.(utilexec.ExitError)
switch {
case err == utilexec.ErrExecutableNotFound:
klog.Warningf("'fsck' not found on system; continuing mount without running 'fsck'.")
case isExitError && ee.ExitStatus() == fsckErrorsCorrected:
klog.Infof("Device %s has errors which were corrected by fsck.", source)
case isExitError && ee.ExitStatus() == fsckErrorsUncorrected:
return mountutils.NewMountError(mountutils.HasFilesystemErrors, "'fsck' found errors on device %s but could not correct them: %s", source, string(output))
case isExitError && ee.ExitStatus() > fsckErrorsUncorrected:
klog.Infof("`fsck` failed with error %v", string(output))
default:
klog.Warningf("fsck on device %s failed with error %v", source, err.Error())
}
}
}
}
return nil
}
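
A minimal usage sketch of the formatter as the node server employs it: format the raw file once (or fsck it when a filesystem already exists), then bind-mount its directory. Paths are illustrative:

```go
package main

import (
	"path/filepath"

	"k8s.io/klog/v2"

	"kata-containers/csi-kata-directvolume/pkg/utils"
)

func main() {
	mounter := utils.NewSafeMountFormater()

	device := "/var/lib/kata-directvolume/vol-0001/directvol-rawdisk.2048M" // illustrative
	staging := "/mnt/staging/vol-0001"                                      // illustrative

	// Formats with ext4 if the file carries no filesystem yet; for an
	// existing rw filesystem this runs a repairing fsck instead.
	if err := mounter.SafeFormatWithFstype(device, "ext4", []string{"rw"}); err != nil {
		klog.Fatalf("format failed: %v", err)
	}

	// Bind-mount the directory holding the raw file onto the staging path.
	if err := mounter.DoBindmount(filepath.Dir(device), staging, "", []string{"bind"}); err != nil {
		klog.Fatalf("bind mount failed: %v", err)
	}
}
```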

View File

@ -0,0 +1,228 @@
//
// Copyright (c) 2023 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
package utils
import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
diskfs "github.com/diskfs/go-diskfs"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"
utilexec "k8s.io/utils/exec"
)
const (
KataContainersDirectVolumeType = "katacontainers.direct.volume/volumetype"
KataContainersDirectFsType = "katacontainers.direct.volume/fstype"
DirectVolumeTypeName = "directvol"
IsDirectVolume = "is_directvolume"
)
const (
CapabilityInBytes = "capacity_in_bytes"
DirectVolumeName = "direct_volume_name"
)
const PERM os.FileMode = 0750
const DefaultFsType = "ext4"
const (
KiB int64 = 1024
MiB int64 = KiB * 1024
GiB int64 = MiB * 1024
GiB100 int64 = GiB * 100
TiB int64 = GiB * 1024
TiB100 int64 = TiB * 100
)
func AddDirectVolume(targetPath string, mountInfo MountInfo) error {
mntArg, err := json.Marshal(&mountInfo)
if err != nil {
klog.Errorf("marshal mount info into bytes failed with error: %+v", err)
return err
}
return Add(targetPath, string(mntArg))
}
func RemoveDirectVolume(targetPath string) error {
return Remove(targetPath)
}
// storagePath/VolID/directvol.rawdisk
func SetupStoragePath(storagePath, volID string) (*string, error) {
upperDir := filepath.Join(storagePath, volID)
return MkPathIfNotExit(upperDir)
}
func MkPathIfNotExit(path string) (*string, error) {
if exist, err := CheckPathExist(path); err != nil {
return nil, fmt.Errorf("stat path %s failed: %w", path, err)
} else if !exist {
if err := os.MkdirAll(path, PERM); err != nil {
return nil, fmt.Errorf("mkdir -p %s failed: %w", path, err)
}
klog.Infof("mkdir full path successfully")
}
return &path, nil
}
func MakeFullPath(path string) error {
stat, err := os.Stat(path)
if err != nil {
if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("stat path %s failed: %w", path, err)
}
if err := os.MkdirAll(path, PERM); err != nil {
return fmt.Errorf("mkdir -p %s failed: %w", path, err)
}
}
if stat != nil && !stat.IsDir() {
return errors.New("path should be a directory")
}
return nil
}
// IsPathEmpty is a simple check to determine if the specified directvolume directory
// is empty or not.
func IsPathEmpty(path string) (bool, error) {
f, err := os.Open(path)
if err != nil {
return true, err
}
defer f.Close()
_, err = f.Readdir(1)
if err == io.EOF {
return true, nil
}
return false, err
}
func CheckPathExist(path string) (bool, error) {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false, nil
} else {
return false, err
}
}
return true, nil
}
func CanDoBindmount(mounter *SafeMountFormater, targetPath string) (bool, error) {
notMnt, err := mounter.IsNotSafeMountPoint(targetPath)
if err != nil {
if _, err = MkPathIfNotExit(targetPath); err != nil {
return false, err
} else {
notMnt = true
}
}
return notMnt, nil
}
func doSafeCommand(rawCmd string, args ...string) ([]byte, error) {
executor := utilexec.New()
path, err := executor.LookPath(rawCmd)
if err != nil {
if errors.Is(err, exec.ErrNotFound) {
return []byte{}, status.Error(codes.Internal, fmt.Sprintf("%s executable file not found in $PATH", rawCmd))
}
return []byte{}, err
}
absCmdPath, err := filepath.Abs(path)
if err != nil {
return []byte{}, err
}
out, err := executor.Command(absCmdPath, args...).CombinedOutput()
if err != nil {
detailedErr := fmt.Sprintf("exec command %v failed with errcode:(%v)", rawCmd, err)
klog.Errorf("do command: %v failed with %v", absCmdPath, detailedErr)
return out, status.Error(codes.Internal, detailedErr)
}
return out, nil
}
// storagePath/VolID/directvol.rawdisk
func GetStoragePath(storagePath, volID string) (string, error) {
upperPath := filepath.Join(storagePath, volID)
return upperPath, nil
}
// CreateDirectBlockDevice creates the backing raw-disk file for the direct volume.
// It returns the device path, or an error if one occurs.
func CreateDirectBlockDevice(volID, capacityInBytesStr, storagePath string) (*string, error) {
capacityInBytes, err := strconv.ParseInt(capacityInBytesStr, 10, 64)
if err != nil {
errMsg := status.Error(codes.Internal, err.Error())
klog.Errorf("capacity in bytes convert to int failed with error: %v", errMsg)
return nil, errMsg
}
diskSize := fmt.Sprintf("%dM", capacityInBytes/MiB)
upperDir, err := SetupStoragePath(storagePath, volID)
if err != nil {
klog.Errorf("setup storage path failed with error: %v", err)
return nil, err
}
// ensure the upper path for the device exists.
if _, err = os.Stat(*upperDir); err != nil {
return nil, err
}
// storagePath/62a268d9-893a-11ee-97cb-d89d6725e7b0/directvol-rawdisk.2048M
devicePath := filepath.Join(*upperDir, fmt.Sprintf("directvol-rawdisk.%s", diskSize))
if _, err = os.Stat(devicePath); !os.IsNotExist(err) {
klog.Warning("direct block device exists, just skip creating it.")
return &devicePath, nil
}
// create raw disk
if _, err = diskfs.Create(devicePath, capacityInBytes, diskfs.Raw, diskfs.SectorSizeDefault); err != nil {
errMsg := fmt.Errorf("diskfs create disk failed: %v", err)
klog.Errorf(errMsg.Error())
return nil, errMsg
}
// Create a block file.
// storagePath/62a268d9-893a-11ee-97cb-d89d6725e7b0/directvol-rawdisk.2048M
if _, err = os.Stat(devicePath); err != nil {
return nil, err
}
// fallocate -z -l diskSize filePath
fallocateCmd := "fallocate"
// TODO: "-z" to be added
args := []string{"-l", diskSize, devicePath}
if _, err := doSafeCommand(fallocateCmd, args...); err != nil {
klog.Infof("do fallocate %v failed with error(%v)", args, err)
return nil, err
}
klog.Infof("create backend rawdisk successfully!")
return &devicePath, nil
}
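
And a short sketch of calling it directly, assuming a 2 GiB volume and a hypothetical storage root:

```go
package main

import (
	"fmt"

	"kata-containers/csi-kata-directvolume/pkg/utils"
)

func main() {
	capacity := fmt.Sprintf("%d", 2*utils.GiB) // 2 GiB in bytes
	devicePath, err := utils.CreateDirectBlockDevice("vol-0001", capacity, "/var/lib/kata-directvolume")
	if err != nil {
		panic(err)
	}
	// e.g. /var/lib/kata-directvolume/vol-0001/directvol-rawdisk.2048M
	fmt.Println(*devicePath)
}
```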

View File

@ -0,0 +1,122 @@
# Copyright 2019 The Kubernetes Authors.
#
# SPDX-License-Identifier: Apache-2.0
#
.PHONY: build-% build container-% container push-% push clean test
# A space-separated list of all commands in the repository, must be
# set in main Makefile of a repository.
# CMDS=
# This is the default. It can be overridden in the main Makefile after
# including build.make.
REGISTRY_NAME=quay.io/k8scsi
# Can be set to -mod=vendor to ensure that the "vendor" directory is used.
GOFLAGS_VENDOR=
# Revision that gets built into each binary via the main.version
# string. Uses the `git describe` output based on the most recent
# version tag with a short revision suffix or, if nothing has been
# tagged yet, just the revision.
#
# Beware that tags may also be missing in shallow clones as done by
# some CI systems (like TravisCI, which pulls only 50 commits).
REV=$(shell git describe --long --tags --match='v*' --dirty 2>/dev/null || git rev-list -n1 HEAD)
# A space-separated list of image tags under which the current build is to be pushed.
# Determined dynamically.
IMAGE_TAGS=
# A "canary" image gets built if the current commit is the head of the remote "master" branch.
# That branch does not exist when building some other branch in TravisCI.
IMAGE_TAGS+=$(shell if [ "$$(git rev-list -n1 HEAD)" = "$$(git rev-list -n1 origin/master 2>/dev/null)" ]; then echo "canary"; fi)
# A "X.Y.Z-canary" image gets built if the current commit is the head of a "origin/release-X.Y.Z" branch.
# The actual suffix does not matter, only the "release-" prefix is checked.
IMAGE_TAGS+=$(shell git branch -r --points-at=HEAD | grep 'origin/release-' | grep -v -e ' -> ' | sed -e 's;.*/release-\(.*\);\1-canary;')
# A release image "vX.Y.Z" gets built if there is a tag of that format for the current commit.
# --abbrev=0 suppresses long format, only showing the closest tag.
IMAGE_TAGS+=$(shell tagged="$$(git describe --tags --match='v*' --abbrev=0)"; if [ "$$tagged" ] && [ "$$(git rev-list -n1 HEAD)" = "$$(git rev-list -n1 $$tagged)" ]; then echo $$tagged; fi)
# Images are named after the command contained in them.
IMAGE_NAME=$(REGISTRY_NAME)/$*
ifdef V
# Adding "-alsologtostderr" assumes that all test binaries contain glog. This is not guaranteed.
TESTARGS = -v -args -alsologtostderr -v 5
else
TESTARGS =
endif
ARCH := $(if $(GOARCH),$(GOARCH),$(shell go env GOARCH))
# Specific packages can be excluded from each of the tests below by setting the *_FILTER_CMD variables
# to something like "| grep -v 'github.com/kubernetes-csi/project/pkg/foobar'". See usage below.
build-%: check-go-version-go
mkdir -p bin
CGO_ENABLED=0 GOOS=linux go build $(GOFLAGS_VENDOR) -a -ldflags '-X main.version=$(REV) -extldflags "-static"' -o ./bin/$* ./cmd/$*
if [ "$$ARCH" = "amd64" ]; then \
CGO_ENABLED=0 GOOS=windows go build $(GOFLAGS_VENDOR) -a -ldflags '-X main.version=$(REV) -extldflags "-static"' -o ./bin/$*.exe ./cmd/$* ; \
CGO_ENABLED=0 GOOS=linux GOARCH=ppc64le go build $(GOFLAGS_VENDOR) -a -ldflags '-X main.version=$(REV) -extldflags "-static"' -o ./bin/$*-ppc64le ./cmd/$* ; \
fi
container-%: build-%
docker build -t $*:latest -f $(shell if [ -e ./cmd/$*/Dockerfile ]; then echo ./cmd/$*/Dockerfile; else echo Dockerfile; fi) --label revision=$(REV) .
push-%: container-%
set -ex; \
push_image () { \
docker tag $*:latest $(IMAGE_NAME):$$tag; \
docker push $(IMAGE_NAME):$$tag; \
}; \
for tag in $(IMAGE_TAGS); do \
if [ "$$tag" = "canary" ] || echo "$$tag" | grep -q -e '-canary$$'; then \
: "creating or overwriting canary image"; \
push_image; \
elif docker pull $(IMAGE_NAME):$$tag 2>&1 | tee /dev/stderr | grep -q "manifest for $(IMAGE_NAME):$$tag not found"; then \
: "creating release image"; \
push_image; \
else \
: "release image $(IMAGE_NAME):$$tag already exists, skipping push"; \
fi; \
done
build: $(CMDS:%=build-%)
container: $(CMDS:%=container-%)
push: $(CMDS:%=push-%)
clean:
-rm -rf bin
test: check-go-version-go
.PHONY: test-vet
test: test-vet
test-vet:
@ echo; echo "### $@:"
go vet $(GOFLAGS_VENDOR) `go list $(GOFLAGS_VENDOR) ./... | grep -v vendor $(TEST_VET_FILTER_CMD)`
.PHONY: test-fmt
test: test-fmt
test-fmt:
@ echo; echo "### $@:"
files=$$(find . -name '*.go' | grep -v './vendor' $(TEST_FMT_FILTER_CMD)); \
if [ $$(gofmt -d $$files | wc -l) -ne 0 ]; then \
echo "formatting errors:"; \
gofmt -d $$files; \
false; \
fi
# Targets in the makefile can depend on check-go-version-<path to go binary>
# to trigger a warning if the x.y version of that binary does not match
# what the project uses. Make ensures that this is only checked once per
# invocation.
.PHONY: check-go-version-%
check-go-version-%:
./release-tools/verify-go-version.sh "$*"

View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
#
# Copyright 2019 The Kubernetes Authors.
#
# SPDX-License-Identifier: Apache-2.0
#
GO="$1"
if [ ! "$GO" ]; then
echo >&2 "usage: $0 <path to go binary>"
exit 1
fi
die () {
echo "ERROR: $*"
exit 1
}
version=$("$GO" version) || die "determining version of $GO failed"
# shellcheck disable=SC2001
majorminor=$(echo "$version" | sed -e 's/.*go\([0-9]*\)\.\([0-9]*\).*/\1.\2/')
if [ "$majorminor" != "$expected" ]; then
cat >&2 <<EOF
======================================================
WARNING
This project expects Go v$expected but found v$majorminor.
======================================================
EOF
fi