adding custom TPR network support in pod

This commit is contained in:
Ramakrishnan, Kuralamudhan 2017-06-29 17:19:09 +01:00
parent f523946d0b
commit 62dd63b06a

View File

@ -19,15 +19,23 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"github.com/containernetworking/cni/pkg/invoke"
"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/version"
)
const defaultCNIDir = "/var/lib/cni/multus"
@ -36,8 +44,29 @@ var masterpluginEnabled bool
type NetConf struct {
types.NetConf
CNIDir string `json:"cniDir"`
Delegates []map[string]interface{} `json:"delegates"`
CNIDir string `json:"cniDir"`
Delegates []map[string]interface{} `json:"delegates"`
Kubeconfig string `json:"kubeconfig"`
}
type PodNet struct {
Networkname string `json:"name"`
}
type netplugin struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" description:"standard object metadata"`
Plugin string `json:"plugin"`
Args string `json:"args"`
}
// K8sArgs is the valid CNI_ARGS used for Kubernetes
type K8sArgs struct {
types.CommonArgs
IP net.IP
K8S_POD_NAME types.UnmarshallableString
K8S_POD_NAMESPACE types.UnmarshallableString
K8S_POD_INFRA_CONTAINER_ID types.UnmarshallableString
}
//taken from cni/plugins/meta/flannel/flannel.go
@ -57,6 +86,10 @@ func loadNetConf(bytes []byte) (*NetConf, error) {
return nil, fmt.Errorf("failed to load netconf: %v", err)
}
if netconf.Kubeconfig != "" {
return netconf, nil
}
if netconf.Delegates == nil {
return nil, fmt.Errorf(`"delegates" is must, refer README.md`)
}
@ -234,6 +267,181 @@ func clearPlugins(mIdx int, pIdx int, argIfname string, delegates []map[string]i
return nil
}
func createK8sClient(kubeconfig string) (*kubernetes.Clientset, error) {
// uses the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, fmt.Errorf("createK8sClient: failed to get context for the kubeconfig %v, refer Multus README.md for the usage guide", kubeconfig)
}
// creates the clientset
return kubernetes.NewForConfig(config)
}
func getPodNetworkAnnotation(client *kubernetes.Clientset, k8sArgs K8sArgs) (string, error) {
var annot string
var err error
pod, err := client.Pods(string(k8sArgs.K8S_POD_NAMESPACE)).Get(fmt.Sprintf("%s", string(k8sArgs.K8S_POD_NAME)), metav1.GetOptions{})
if err != nil {
return annot, fmt.Errorf("getPodNetworkAnnotation: failed to query the pod %v in out of cluster comm", string(k8sArgs.K8S_POD_NAME))
}
return pod.Annotations["networks"], nil
}
func parsePodNetworkObject(podnetwork string) ([]map[string]interface{}, error) {
var podNet []map[string]interface{}
if podnetwork == "" {
return nil, fmt.Errorf("parsePodNetworkObject: pod annotation not having \"network\" as key, refer Multus README.md for the usage guide")
}
if err := json.Unmarshal([]byte(podnetwork), &podNet); err != nil {
return nil, fmt.Errorf("parsePodNetworkObject: failed to load pod network err: %v | pod network: %v", err, podnetwork)
}
return podNet, nil
}
func getpluginargs(name string, args string, primary bool) (string, error) {
var netconf string
var tmpargs []string
if name == "" || args == "" {
return "", fmt.Errorf("getpluginargs: plugin name/args can't be empty")
}
if primary != false {
tmpargs = []string{`{"type": "`, name, `","masterplugin": true,`, args[strings.Index(args, "\"") : len(args)-1]}
} else {
tmpargs = []string{`{"type": "`, name, `",`, args[strings.Index(args, "\"") : len(args)-1]}
}
var str bytes.Buffer
for _, a := range tmpargs {
str.WriteString(a)
}
netconf = str.String()
return netconf, nil
}
func getnetplugin(client *kubernetes.Clientset, networkname string, primary bool) (string, error) {
if networkname == "" {
return "", fmt.Errorf("getnetplugin: network name can't be empty")
}
tprclient := fmt.Sprintf("/apis/kubernetes.com/v1/namespaces/default/networks/%s", networkname)
netobjdata, err := client.ExtensionsV1beta1().RESTClient().Get().AbsPath(tprclient).DoRaw()
if err != nil {
return "", fmt.Errorf("getnetplugin: failed to get TRP, refer Multus README.md for the usage guide: %v", err)
}
np := netplugin{}
if err := json.Unmarshal(netobjdata, &np); err != nil {
return "", fmt.Errorf("getnetplugin: failed to get the netplugin data: %v", err)
}
netargs, err := getpluginargs(np.Plugin, np.Args, primary)
if err != nil {
return "", err
}
return netargs, nil
}
func getPodNetworkObj(client *kubernetes.Clientset, netObjs []map[string]interface{}) (string, error) {
var np string
var err error
var str bytes.Buffer
str.WriteString("[")
for index, net := range netObjs {
var primary bool
if index == 0 {
primary = true
}
np, err = getnetplugin(client, net["name"].(string), primary)
if err != nil {
return "", fmt.Errorf("getPodNetworkObj: failed in getting the netplugin: %v", err)
}
str.WriteString(np)
if index != (len(netObjs) - 1) {
str.WriteString(",")
}
}
str.WriteString("]")
netconf := str.String()
return netconf, nil
}
func getMultusDelegates(delegate string) ([]map[string]interface{}, error) {
tmpNetconf := &NetConf{}
tmpDelegate := "{\"delegates\": " + delegate + "}"
if delegate == "" {
return nil, fmt.Errorf("getMultusDelegates: TPR network obj data can't be empty")
}
if err := json.Unmarshal([]byte(tmpDelegate), tmpNetconf); err != nil {
return nil, fmt.Errorf("getMultusDelegates: failed to load netconf: %v", err)
}
if tmpNetconf.Delegates == nil {
return nil, fmt.Errorf(`getMultusDelegates: "delegates" is must, refer Multus README.md for the usage guide`)
}
return tmpNetconf.Delegates, nil
}
func getK8sNetwork(args *skel.CmdArgs, kubeconfig string) ([]map[string]interface{}, error) {
k8sArgs := K8sArgs{}
var podNet []map[string]interface{}
err := types.LoadArgs(args.Args, &k8sArgs)
if err != nil {
return podNet, err
}
k8sclient, err := createK8sClient(kubeconfig)
if err != nil {
return podNet, err
}
netAnnot, err := getPodNetworkAnnotation(k8sclient, k8sArgs)
if err != nil {
return podNet, err
}
netObjs, err := parsePodNetworkObject(netAnnot)
if err != nil {
return podNet, err
}
multusDelegates, err := getPodNetworkObj(k8sclient, netObjs)
if err != nil {
return podNet, err
}
podNet, err = getMultusDelegates(multusDelegates)
if err != nil {
return podNet, err
}
return podNet, nil
}
func cmdAdd(args *skel.CmdArgs) error {
var result error
n, err := loadNetConf(args.StdinData)
@ -241,14 +449,25 @@ func cmdAdd(args *skel.CmdArgs) error {
return err
}
if n.Kubeconfig != "" {
podDelegate, r := getK8sNetwork(args, n.Kubeconfig)
if r != nil {
return fmt.Errorf("Multus: Err in getting k8s network from pod: %v", r)
}
n.Delegates = podDelegate
}
for _, delegate := range n.Delegates {
if err := checkDelegate(delegate); err != nil {
return fmt.Errorf("Multus: Err in delegate conf: %v", err)
}
}
if err := saveDelegates(args.ContainerID, n.CNIDir, n.Delegates); err != nil {
return fmt.Errorf("Multus: Err in saving the delegates: %v", err)
if n.Kubeconfig == "" {
if err := saveDelegates(args.ContainerID, n.CNIDir, n.Delegates); err != nil {
return fmt.Errorf("Multus: Err in saving the delegates: %v", err)
}
}
podifName := getifname()
@ -281,19 +500,30 @@ func cmdAdd(args *skel.CmdArgs) error {
func cmdDel(args *skel.CmdArgs) error {
var result error
var Delegates []map[string]interface{}
in, err := loadNetConf(args.StdinData)
if err != nil {
return err
}
netconfBytes, err := consumeScratchNetConf(args.ContainerID, in.CNIDir)
if err != nil {
return fmt.Errorf("Multus: Err in reading the delegates: %v", err)
}
if in.Kubeconfig != "" {
podDelegate, r := getK8sNetwork(args, in.Kubeconfig)
if r != nil {
return r
}
var Delegates []map[string]interface{}
if err := json.Unmarshal(netconfBytes, &Delegates); err != nil {
return fmt.Errorf("Multus: failed to load netconf: %v", err)
Delegates = podDelegate
} else {
netconfBytes, err := consumeScratchNetConf(args.ContainerID, in.CNIDir)
if err != nil {
return fmt.Errorf("Multus: Err in reading the delegates: %v", err)
}
if err := json.Unmarshal(netconfBytes, &Delegates); err != nil {
return fmt.Errorf("Multus: failed to load netconf: %v", err)
}
}
podifName := getifname()
@ -309,5 +539,5 @@ func cmdDel(args *skel.CmdArgs) error {
}
func main() {
skel.PluginMain(cmdAdd, cmdDel)
skel.PluginMain(cmdAdd, cmdDel, version.All)
}