adding network status features

This commit is contained in:
rkamudhan
2018-07-30 11:59:13 +01:00
committed by Kuralamudhan Ramakrishnan
parent 583cbe476a
commit 8bb09518df
6 changed files with 213 additions and 59 deletions

View File

@@ -21,11 +21,12 @@ import (
"regexp"
"strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel"
@@ -38,6 +39,12 @@ type NoK8sNetworkError struct {
message string
}
type clientInfo struct {
Client KubeClient
Podnamespace string
Podname string
}
func (e *NoK8sNetworkError) Error() string { return string(e.message) }
type defaultKubeClient struct {
@@ -55,6 +62,71 @@ func (d *defaultKubeClient) GetPod(namespace, name string) (*v1.Pod, error) {
return d.client.Core().Pods(namespace).Get(name, metav1.GetOptions{})
}
func (d *defaultKubeClient) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
return d.client.Core().Pods(pod.Namespace).UpdateStatus(pod)
}
func setKubeClientInfo(c *clientInfo, client KubeClient, k8sArgs *types.K8sArgs) {
c.Client = client
c.Podnamespace = string(k8sArgs.K8S_POD_NAMESPACE)
c.Podname = string(k8sArgs.K8S_POD_NAME)
}
func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error {
pod, err := k.Client.GetPod(k.Podnamespace, k.Podname)
if err != nil {
return fmt.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", k.Podname, err)
}
var ns string
if netStatus != nil {
var networkStatus []string
for _, nets := range netStatus {
data, err := json.MarshalIndent(nets, "", " ")
if err != nil {
return fmt.Errorf("SetNetworkStatus: error with Marshal Indent: %v", err)
}
networkStatus = append(networkStatus, string(data))
}
ns = fmt.Sprintf("[%s]", strings.Join(networkStatus, ","))
}
_, err = setPodNetworkAnnotation(k.Client, k.Podnamespace, pod, ns)
if err != nil {
return fmt.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", k.Podname, err)
}
return nil
}
func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, networkstatus string) (*v1.Pod, error) {
//if pod annotations is empty, make sure it allocatable
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
pod.Annotations["k8s.v1.cni.cncf.io/networks-status"] = networkstatus
pod = pod.DeepCopy()
var err error
if resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err != nil {
// Re-get the pod unless it's the first attempt to update
pod, err = client.GetPod(pod.Namespace, pod.Name)
if err != nil {
return err
}
}
pod, err = client.UpdatePodStatus(pod)
return err
}); resultErr != nil {
return nil, fmt.Errorf("status update failed for pod %s/%s: %v", pod.Namespace, pod.Name, resultErr)
}
return pod, nil
}
func getPodNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, string, error) {
var err error
@@ -271,6 +343,7 @@ func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement
type KubeClient interface {
GetRawWithPath(path string) ([]byte, error)
GetPod(namespace, name string) (*v1.Pod, error)
UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error)
}
func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) {
@@ -284,6 +357,41 @@ func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) {
return k8sArgs, nil
}
// Attempts to load Kubernetes-defined delegates and add them to the Multus config.
// Returns the number of Kubernetes-defined delegates added or an error.
func TryLoadK8sDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient KubeClient) (int, *clientInfo, error) {
var err error
clientInfo := &clientInfo{}
kubeClient, err = GetK8sClient(conf.Kubeconfig, kubeClient)
if err != nil {
return 0, nil, err
}
if kubeClient == nil {
if len(conf.Delegates) == 0 {
// No available kube client and no delegates, we can't do anything
return 0, nil, fmt.Errorf("must have either Kubernetes config or delegates, refer Multus README.md for the usage guide")
}
return 0, nil, nil
}
setKubeClientInfo(clientInfo, kubeClient, k8sArgs)
delegates, err := GetK8sNetwork(kubeClient, k8sArgs, conf.ConfDir)
if err != nil {
if _, ok := err.(*NoK8sNetworkError); ok {
return 0, nil, nil
}
return 0, nil, fmt.Errorf("Multus: Err in getting k8s network from pod: %v", err)
}
if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err
}
return len(delegates), clientInfo, nil
}
func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) {
// If we get a valid kubeClient (eg from testcases) just return that
// one.

View File

@@ -204,39 +204,6 @@ func delPlugins(exec invoke.Exec, argIfname string, delegates []*types.DelegateN
return nil
}
// Attempts to load Kubernetes-defined delegates and add them to the Multus config.
// Returns the number of Kubernetes-defined delegates added or an error.
func tryLoadK8sDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient k8s.KubeClient) (int, error) {
var err error
kubeClient, err = k8s.GetK8sClient(conf.Kubeconfig, kubeClient)
if err != nil {
return 0, err
}
if kubeClient == nil {
if len(conf.Delegates) == 0 {
// No available kube client and no delegates, we can't do anything
return 0, fmt.Errorf("must have either Kubernetes config or delegates, refer Multus README.md for the usage guide")
}
return 0, nil
}
delegates, err := k8s.GetK8sNetwork(kubeClient, k8sArgs, conf.ConfDir)
if err != nil {
if _, ok := err.(*k8s.NoK8sNetworkError); ok {
return 0, nil
}
return 0, fmt.Errorf("Multus: Err in getting k8s network from pod: %v", err)
}
if err = conf.AddDelegates(delegates); err != nil {
return 0, err
}
return len(delegates), nil
}
func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cnitypes.Result, error) {
n, err := types.LoadNetConf(args.StdinData)
if err != nil {
@@ -248,9 +215,9 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn
return nil, fmt.Errorf("Multus: Err in getting k8s args: %v", err)
}
numK8sDelegates, err := tryLoadK8sDelegates(k8sArgs, n, kubeClient)
numK8sDelegates, kc, err := k8s.TryLoadK8sDelegates(k8sArgs, n, kubeClient)
if err != nil {
return nil, err
return nil, fmt.Errorf("Multus: Err in loading K8s Delegates k8s args: %v", err)
}
if numK8sDelegates == 0 {
@@ -261,6 +228,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn
}
var result, tmpResult cnitypes.Result
var netStatus []*types.NetworkStatus
var rt *libcni.RuntimeConf
lastIdx := 0
for idx, delegate := range n.Delegates {
@@ -276,12 +244,30 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn
if delegate.MasterPlugin || result == nil {
result = tmpResult
}
//create the network status, only in case Multus as kubeconfig
if n.Kubeconfig != "" && kc.Podnamespace != "kube-system" {
delegateNetStatus, err := types.LoadNetworkStatus(tmpResult, delegate.Conf.Name, delegate.MasterPlugin)
if err != nil {
return nil, fmt.Errorf("Multus: Err in setting networks status: %v", err)
}
netStatus = append(netStatus, delegateNetStatus)
}
}
if err != nil {
// Ignore errors; DEL must be idempotent anyway
_ = delPlugins(exec, args.IfName, n.Delegates, lastIdx, rt, n.BinDir)
return nil, err
return nil, fmt.Errorf("Multus: Err in tearing down failed plugins: %v", err)
}
//set the network status annotation in apiserver, only in case Multus as kubeconfig
if n.Kubeconfig != "" && kc.Podnamespace != "kube-system" {
err = k8s.SetNetworkStatus(kc, netStatus)
if err != nil {
return nil, fmt.Errorf("Multus: Err set the networks status: %v", err)
}
}
return result, nil
@@ -309,7 +295,7 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) err
return fmt.Errorf("Multus: Err in getting k8s args: %v", err)
}
numK8sDelegates, err := tryLoadK8sDelegates(k8sArgs, in, kubeClient)
numK8sDelegates, kc, err := k8s.TryLoadK8sDelegates(k8sArgs, in, kubeClient)
if err != nil {
return err
}
@@ -330,6 +316,14 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) err
}
}
//unset the network status annotation in apiserver, only in case Multus as kubeconfig
if in.Kubeconfig != "" && kc.Podnamespace != "kube-system" {
err := k8s.SetNetworkStatus(kc, nil)
if err != nil {
return fmt.Errorf("Multus: Err unset the networks status: %v", err)
}
}
rt, _ := types.LoadCNIRuntimeConf(args, k8sArgs, "")
return delPlugins(exec, args.IfName, in.Delegates, len(in.Delegates)-1, rt, in.BinDir)
}

View File

@@ -300,7 +300,7 @@ var _ = Describe("multus operations", func() {
result, err := cmdAdd(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.PodCount).To(Equal(2))
Expect(fKubeClient.NetCount).To(Equal(2))
r := result.(*types020.Result)
// plugin 1 is the masterplugin

View File

@@ -21,8 +21,8 @@ import (
"net"
"strings"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/pkg/api/v1"
. "github.com/onsi/gomega"
)
@@ -92,6 +92,12 @@ func (f *FakeKubeClient) GetPod(namespace, name string) (*v1.Pod, error) {
return pod, nil
}
func (f *FakeKubeClient) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
key := fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
f.pods[key] = pod
return f.pods[key], nil
}
func (f *FakeKubeClient) AddPod(pod *v1.Pod) {
key := fmt.Sprintf("%s/%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
f.pods[key] = pod

View File

@@ -21,6 +21,7 @@ import (
"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel"
"github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"github.com/containernetworking/cni/pkg/version"
"github.com/intel/multus-cni/logging"
@@ -48,25 +49,6 @@ func LoadDelegateNetConfList(bytes []byte, delegateConf *DelegateNetConf) error
return nil
}
func LoadCNIRuntimeConf(args *skel.CmdArgs, k8sArgs *K8sArgs, ifName string) (*libcni.RuntimeConf, error) {
// In part, adapted from K8s pkg/kubelet/dockershim/network/cni/cni.go#buildCNIRuntimeConf
// Todo
// ingress, egress and bandwidth capability features as same as kubelet.
rt := &libcni.RuntimeConf{
ContainerID: args.ContainerID,
NetNS: args.Netns,
IfName: ifName,
Args: [][2]string{
{"IgnoreUnknown", "1"},
{"K8S_POD_NAMESPACE", string(k8sArgs.K8S_POD_NAMESPACE)},
{"K8S_POD_NAME", string(k8sArgs.K8S_POD_NAME)},
{"K8S_POD_INFRA_CONTAINER_ID", string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID)},
},
}
return rt, nil
}
// Convert raw CNI JSON into a DelegateNetConf structure
func LoadDelegateNetConf(bytes []byte, ifnameRequest string) (*DelegateNetConf, error) {
delegateConf := &DelegateNetConf{}
@@ -90,6 +72,60 @@ func LoadDelegateNetConf(bytes []byte, ifnameRequest string) (*DelegateNetConf,
return delegateConf, nil
}
func LoadCNIRuntimeConf(args *skel.CmdArgs, k8sArgs *K8sArgs, ifName string) (*libcni.RuntimeConf, error) {
// In part, adapted from K8s pkg/kubelet/dockershim/network/cni/cni.go#buildCNIRuntimeConf
// Todo
// ingress, egress and bandwidth capability features as same as kubelet.
rt := &libcni.RuntimeConf{
ContainerID: args.ContainerID,
NetNS: args.Netns,
IfName: ifName,
Args: [][2]string{
{"IgnoreUnknown", "1"},
{"K8S_POD_NAMESPACE", string(k8sArgs.K8S_POD_NAMESPACE)},
{"K8S_POD_NAME", string(k8sArgs.K8S_POD_NAME)},
{"K8S_POD_INFRA_CONTAINER_ID", string(k8sArgs.K8S_POD_INFRA_CONTAINER_ID)},
},
}
return rt, nil
}
func LoadNetworkStatus(r types.Result, netName string, defaultNet bool) (*NetworkStatus, error) {
// Convert whatever the IPAM result was into the current Result type
result, err := current.NewResultFromResult(r)
if err != nil {
return nil, fmt.Errorf("error convert the type.Result to current.Result: %v", err)
}
netstatus := &NetworkStatus{}
netstatus.Name = netName
netstatus.Default = defaultNet
for _, ifs := range result.Interfaces {
//Only pod interfaces can have sandbox information
if ifs.Sandbox != "" {
netstatus.Interface = ifs.Name
netstatus.Mac = ifs.Mac
}
}
for _, ipconfig := range result.IPs {
if ipconfig.Version == "4" && ipconfig.Address.IP.To4() != nil {
netstatus.IPs = append(netstatus.IPs, ipconfig.Address.IP.String())
}
if ipconfig.Version == "6" && ipconfig.Address.IP.To16() != nil {
netstatus.IPs = append(netstatus.IPs, ipconfig.Address.IP.String())
}
}
netstatus.DNS = result.DNS
return netstatus, nil
}
func LoadNetConf(bytes []byte) (*NetConf, error) {
netconf := &NetConf{}
if err := json.Unmarshal(bytes, netconf); err != nil {

View File

@@ -38,11 +38,21 @@ type NetConf struct {
// RawDelegates is private to the NetConf class; use Delegates instead
RawDelegates []map[string]interface{} `json:"delegates"`
Delegates []*DelegateNetConf `json:"-"`
NetStatus []*NetworkStatus `json:"-"`
Kubeconfig string `json:"kubeconfig"`
LogFile string `json:"logFile"`
LogLevel string `json:"logLevel"`
}
type NetworkStatus struct {
Name string `json:"name"`
Interface string `json:"interface,omitempty"`
IPs []string `json:"ips,omitempty"`
Mac string `json:"mac,omitempty"`
Default bool `json:"default,omitempty"`
DNS types.DNS `json:"dns,omitempty"`
}
type DelegateNetConf struct {
Conf types.NetConf
ConfList types.NetConfList