multus-cni/k8sclient/k8sclient.go

545 lines
18 KiB
Go
Raw Normal View History

// Copyright (c) 2017 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package k8sclient
import (
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
2018-07-30 10:59:13 +00:00
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
2018-07-30 10:59:13 +00:00
"k8s.io/client-go/util/retry"
"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/intel/multus-cni/logging"
"github.com/intel/multus-cni/types"
)
// Annotation keys consulted by this package.
const (
// resourceNameAnnot is the annotation key looked up on a network attachment
// definition's metadata; its value is used as the key into the pod's
// resource-name to device-ID map when selecting a device for a delegate.
resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName"
)
// NoK8sNetworkError is the error returned when a pod carries no Kubernetes
// network annotation, i.e. there is no additional network to attach.
type NoK8sNetworkError struct {
// message is the text returned by Error.
message string
}
// ResourceInfo holds the device IDs collected for a single resource name and
// tracks how many of them have already been assigned to delegates.
type ResourceInfo struct {
// Index is the position in deviceIDs of the next device ID to hand out; it
// is incremented each time a delegate consumes a device ID.
Index int
// deviceIDs lists the device IDs gathered from the pod's containers for
// this resource name.
deviceIDs []string
}
2018-07-30 10:59:13 +00:00
// clientInfo bundles a KubeClient with the identity of the pod it is used for.
type clientInfo struct {
// Client is the Kubernetes client used to fetch the pod and update its status.
Client KubeClient
// Podnamespace is the pod's namespace, copied from the K8S_POD_NAMESPACE CNI argument.
Podnamespace string
// Podname is the pod's name, copied from the K8S_POD_NAME CNI argument.
Podname string
}
// Error implements the error interface by returning the stored message.
func (e *NoK8sNetworkError) Error() string {
	return e.message
}
// defaultKubeClient is the KubeClient implementation backed by a client-go
// kubernetes.Interface clientset.
type defaultKubeClient struct {
// client is the clientset every KubeClient method delegates to.
client kubernetes.Interface
}
// Compile-time check that *defaultKubeClient implements KubeClient.
var _ KubeClient = &defaultKubeClient{}
// GetRawWithPath issues a GET request against the absolute API path and
// returns the raw response body.
func (d *defaultKubeClient) GetRawWithPath(path string) ([]byte, error) {
	request := d.client.ExtensionsV1beta1().RESTClient().Get().AbsPath(path)
	return request.DoRaw()
}
// GetPod fetches the pod called name from the given namespace.
func (d *defaultKubeClient) GetPod(namespace, name string) (*v1.Pod, error) {
	pods := d.client.Core().Pods(namespace)
	return pods.Get(name, metav1.GetOptions{})
}
2018-07-30 10:59:13 +00:00
// UpdatePodStatus submits pod through the UpdateStatus call of the pod's own
// namespace and returns the pod as reported back by the API server.
func (d *defaultKubeClient) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
	pods := d.client.Core().Pods(pod.Namespace)
	return pods.UpdateStatus(pod)
}
// GetComputeDeviceMap returns a map of resourceName to the device IDs that the
// pod's containers list for that resource under ComputeDevices.
//
// Device IDs of the same resource name found in several containers (or several
// ComputeDevices entries) are concatenated in iteration order. The returned
// ResourceInfo values own their slices: nothing in the map aliases memory of
// the pod spec. A nil pod yields an empty map.
func GetComputeDeviceMap(pod *v1.Pod) map[string]*ResourceInfo {
	resourceMap := make(map[string]*ResourceInfo)
	if pod == nil {
		return resourceMap
	}

	for _, cntr := range pod.Spec.Containers {
		for _, cd := range cntr.ComputeDevices {
			entry, ok := resourceMap[cd.ResourceName]
			if !ok {
				entry = &ResourceInfo{}
				resourceMap[cd.ResourceName] = entry
			}
			// Appending to the entry's own slice copies the IDs, so a later
			// append never writes into the pod spec's backing array.
			entry.deviceIDs = append(entry.deviceIDs, cd.DeviceIDs...)
		}
	}
	return resourceMap
}
2018-07-30 10:59:13 +00:00
func setKubeClientInfo(c *clientInfo, client KubeClient, k8sArgs *types.K8sArgs) {
logging.Debugf("setKubeClientInfo: %v, %v, %v", c, client, k8sArgs)
2018-07-30 10:59:13 +00:00
c.Client = client
c.Podnamespace = string(k8sArgs.K8S_POD_NAMESPACE)
c.Podname = string(k8sArgs.K8S_POD_NAME)
}
func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error {
logging.Debugf("SetNetworkStatus: %v, %v", k, netStatus)
2018-07-30 10:59:13 +00:00
pod, err := k.Client.GetPod(k.Podnamespace, k.Podname)
if err != nil {
return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", k.Podname, err)
2018-07-30 10:59:13 +00:00
}
var ns string
if netStatus != nil {
var networkStatus []string
for _, nets := range netStatus {
data, err := json.MarshalIndent(nets, "", " ")
if err != nil {
return logging.Errorf("SetNetworkStatus: error with Marshal Indent: %v", err)
2018-07-30 10:59:13 +00:00
}
networkStatus = append(networkStatus, string(data))
}
ns = fmt.Sprintf("[%s]", strings.Join(networkStatus, ","))
}
_, err = setPodNetworkAnnotation(k.Client, k.Podnamespace, pod, ns)
if err != nil {
return logging.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", k.Podname, err)
2018-07-30 10:59:13 +00:00
}
return nil
}
func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, networkstatus string) (*v1.Pod, error) {
logging.Debugf("setPodNetworkAnnotation: %v, %s, %v, %s", client, namespace, pod, networkstatus)
2018-07-30 10:59:13 +00:00
//if pod annotations is empty, make sure it allocatable
if len(pod.Annotations) == 0 {
pod.Annotations = make(map[string]string)
}
pod.Annotations["k8s.v1.cni.cncf.io/networks-status"] = networkstatus
pod = pod.DeepCopy()
var err error
if resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err != nil {
// Re-get the pod unless it's the first attempt to update
pod, err = client.GetPod(pod.Namespace, pod.Name)
if err != nil {
return err
}
}
pod, err = client.UpdatePodStatus(pod)
return err
}); resultErr != nil {
return nil, logging.Errorf("status update failed for pod %s/%s: %v", pod.Namespace, pod.Name, resultErr)
2018-07-30 10:59:13 +00:00
}
return pod, nil
}
// getPodNetworkAnnotation returns the value of the pod's
// "k8s.v1.cni.cncf.io/networks" annotation (empty when absent) together with
// the pod's namespace. The error result is always nil.
func getPodNetworkAnnotation(pod *v1.Pod) (string, string, error) {
	logging.Debugf("getPodNetworkAnnotation: %v", pod)

	networks := pod.Annotations["k8s.v1.cni.cncf.io/networks"]
	namespace := pod.Namespace
	return networks, namespace, nil
}
// podNetworkItemRegexp matches one unit of a network attachment name: the
// DNS-1123 label format, i.e. lower case alphanumerics and '-', starting and
// ending with an alphanumeric character.
var podNetworkItemRegexp = regexp.MustCompile("^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")

// parsePodNetworkObjectName splits a single network reference of the form
// [<namespace>/]<network name>[@<interface name>] into its parts.
//
// It returns the namespace, the network name and the interface name, each
// trimmed of surrounding whitespace; parts that are not present are returned
// as empty strings. An error is returned when the reference contains more
// than one '/' or more than one '@', or when a non-empty part is not a valid
// DNS-1123 label. Empty parts are not validated.
func parsePodNetworkObjectName(podnetwork string) (string, string, string, error) {
	var netNsName string
	var netIfName string
	var networkName string

	logging.Debugf("parsePodNetworkObjectName: %s", podnetwork)

	slashItems := strings.Split(podnetwork, "/")
	switch len(slashItems) {
	case 2:
		netNsName = strings.TrimSpace(slashItems[0])
		networkName = slashItems[1]
	case 1:
		networkName = slashItems[0]
	default:
		return "", "", "", logging.Errorf("Invalid network object (failed at '/')")
	}

	atItems := strings.Split(networkName, "@")
	networkName = strings.TrimSpace(atItems[0])
	switch len(atItems) {
	case 1:
		// No interface name requested.
	case 2:
		netIfName = strings.TrimSpace(atItems[1])
	default:
		return "", "", "", logging.Errorf("Invalid network object (failed at '@')")
	}

	// Check and see if each item matches the specification for valid attachment name.
	// "Valid attachment names must be comprised of units of the DNS-1123 label format"
	// [a-z0-9]([-a-z0-9]*[a-z0-9])?
	// And we allow at (@), and forward slash (/) (units separated by commas)
	// It must start and end alphanumerically.
	for _, item := range []string{netNsName, networkName, netIfName} {
		if item == "" {
			continue
		}
		if !podNetworkItemRegexp.MatchString(item) {
			// The offending item is passed as an argument: it is user input
			// and must never be interpreted as part of the format string.
			return "", "", "", logging.Errorf("Failed to parse: one or more items did not match comma-delimited format (must consist of lower case alphanumeric characters). Must start and end with an alphanumeric character), mismatch @ '%v'", item)
		}
	}

	logging.Debugf("parsePodNetworkObjectName: parsed: %s, %s, %s", netNsName, networkName, netIfName)
	return netNsName, networkName, netIfName, nil
}
// parsePodNetworkAnnotation turns the value of a pod's network annotation
// into a list of network selection elements.
//
// Two formats are accepted. If the value contains any of '[', '{' or '"' it
// is decoded as JSON into the element list; otherwise it is treated as a
// comma-delimited list of [<namespace>/]<network name>[@<ifname>] references,
// each parsed by parsePodNetworkObjectName. Elements without a namespace get
// defaultNamespace.
//
// An error is returned for an empty annotation, for malformed JSON, for a
// JSON list containing a null element and for any invalid network reference.
func parsePodNetworkAnnotation(podNetworks, defaultNamespace string) ([]*types.NetworkSelectionElement, error) {
	var networks []*types.NetworkSelectionElement

	logging.Debugf("parsePodNetworkAnnotation: %s, %s", podNetworks, defaultNamespace)

	if podNetworks == "" {
		return nil, logging.Errorf("parsePodNetworkAnnotation: pod annotation not having \"network\" as key, refer Multus README.md for the usage guide")
	}

	if strings.ContainsAny(podNetworks, "[{\"") {
		if err := json.Unmarshal([]byte(podNetworks), &networks); err != nil {
			return nil, logging.Errorf("parsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: %v", err)
		}
	} else {
		// Comma-delimited list of network attachment object names
		for _, item := range strings.Split(podNetworks, ",") {
			// Remove leading and trailing whitespace.
			item = strings.TrimSpace(item)

			// Parse network name (i.e. <namespace>/<network name>@<ifname>)
			netNsName, networkName, netIfName, err := parsePodNetworkObjectName(item)
			if err != nil {
				return nil, logging.Errorf("parsePodNetworkAnnotation: %v", err)
			}

			networks = append(networks, &types.NetworkSelectionElement{
				Name:             networkName,
				Namespace:        netNsName,
				InterfaceRequest: netIfName,
			})
		}
	}

	for i, net := range networks {
		// A JSON list such as [null] decodes to a nil element; reject it
		// instead of dereferencing it below.
		if net == nil {
			return nil, logging.Errorf("parsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: element %d is null", i)
		}
		if net.Namespace == "" {
			net.Namespace = defaultNamespace
		}
	}

	return networks, nil
}
func getCNIConfigFromFile(name string, confdir string) ([]byte, error) {
logging.Debugf("getCNIConfigFromFile: %s, %s", name, confdir)
// In the absence of valid keys in a Spec, the runtime (or
// meta-plugin) should load and execute a CNI .configlist
// or .config (in that order) file on-disk whose JSON
// “name” key matches this Network objects name.
// In part, adapted from K8s pkg/kubelet/dockershim/network/cni/cni.go#getDefaultCNINetwork
2018-07-26 23:04:52 +00:00
files, err := libcni.ConfFiles(confdir, []string{".conf", ".json", ".conflist"})
switch {
case err != nil:
return nil, logging.Errorf("No networks found in %s", confdir)
case len(files) == 0:
return nil, logging.Errorf("No networks found in %s", confdir)
}
for _, confFile := range files {
2018-07-26 23:04:52 +00:00
var confList *libcni.NetworkConfigList
if strings.HasSuffix(confFile, ".conflist") {
confList, err = libcni.ConfListFromFile(confFile)
if err != nil {
return nil, logging.Errorf("Error loading CNI conflist file %s: %v", confFile, err)
2018-07-26 23:04:52 +00:00
}
if confList.Name == name {
return confList.Bytes, nil
}
2018-07-26 23:04:52 +00:00
} else {
conf, err := libcni.ConfFromFile(confFile)
if err != nil {
return nil, logging.Errorf("Error loading CNI config file %s: %v", confFile, err)
2018-07-26 23:04:52 +00:00
}
if conf.Network.Name == name {
// Ensure the config has a "type" so we know what plugin to run.
// Also catches the case where somebody put a conflist into a conf file.
if conf.Network.Type == "" {
return nil, logging.Errorf("Error loading CNI config file %s: no 'type'; perhaps this is a .conflist?", confFile)
2018-07-26 23:04:52 +00:00
}
return conf.Bytes, nil
}
}
}
return nil, logging.Errorf("no network available in the name %s in cni dir %s", name, confdir)
}
// getCNIConfigFromSpec reads a CNI JSON configuration from the NetworkAttachmentDefinition
// object's Spec.Config field and fills in any missing details like the network name.
//
// configData must be a JSON object. When its "name" key is absent or an empty
// string, netName is injected and the re-encoded object is returned; otherwise
// the original bytes are returned unchanged. An error is returned when
// configData is not valid JSON, is not a JSON object (including the literal
// null), or cannot be re-encoded.
func getCNIConfigFromSpec(configData, netName string) ([]byte, error) {
	var rawConfig map[string]interface{}

	logging.Debugf("getCNIConfigFromSpec: %s, %s", configData, netName)

	configBytes := []byte(configData)
	if err := json.Unmarshal(configBytes, &rawConfig); err != nil {
		return nil, logging.Errorf("getCNIConfigFromSpec: failed to unmarshal Spec.Config: %v", err)
	}
	// The JSON literal null decodes without error but leaves the map nil;
	// writing the name into it below would panic.
	if rawConfig == nil {
		return nil, logging.Errorf("getCNIConfigFromSpec: failed to unmarshal Spec.Config: not a JSON object")
	}

	// Inject network name if missing from Config for the thick plugin case
	if n, ok := rawConfig["name"]; !ok || n == "" {
		rawConfig["name"] = netName
		encoded, err := json.Marshal(rawConfig)
		if err != nil {
			return nil, logging.Errorf("getCNIConfigFromSpec: failed to re-marshal Spec.Config: %v", err)
		}
		configBytes = encoded
	}

	return configBytes, nil
}
func cniConfigFromNetworkResource(customResource *types.NetworkAttachmentDefinition, confdir string) ([]byte, error) {
var config []byte
var err error
logging.Debugf("cniConfigFromNetworkResource: %v, %s", customResource, confdir)
emptySpec := types.NetworkAttachmentDefinitionSpec{}
2018-07-26 23:04:52 +00:00
if customResource.Spec == emptySpec {
// Network Spec empty; generate delegate from CNI JSON config
// from the configuration directory that has the same network
// name as the custom resource
config, err = getCNIConfigFromFile(customResource.Metadata.Name, confdir)
if err != nil {
return nil, logging.Errorf("cniConfigFromNetworkResource: err in getCNIConfigFromFile: %v", err)
}
} else {
// Config contains a standard JSON-encoded CNI configuration
// or configuration list which defines the plugin chain to
// execute.
config, err = getCNIConfigFromSpec(customResource.Spec.Config, customResource.Metadata.Name)
if err != nil {
return nil, logging.Errorf("cniConfigFromNetworkResource: err in getCNIConfigFromSpec: %v", err)
}
}
return config, nil
}
// getKubernetesDelegate builds the delegate configuration for one selected
// network.
//
// It fetches the network attachment definition named by net from the API,
// decodes it, resolves its CNI configuration via cniConfigFromNetworkResource
// and loads it with the interface name requested in net. If the definition
// carries the resource-name annotation and resourceMap still has an unused
// device ID for that resource, the ID is consumed (the entry's Index is
// advanced) and injected into the configuration as "deviceID".
func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement, confdir string, resourceMap map[string]*ResourceInfo) (*types.DelegateNetConf, error) {
	logging.Debugf("getKubernetesDelegate: %v, %v, %s", client, net, confdir)

	rawPath := fmt.Sprintf("/apis/k8s.cni.cncf.io/v1/namespaces/%s/network-attachment-definitions/%s", net.Namespace, net.Name)
	netData, err := client.GetRawWithPath(rawPath)
	if err != nil {
		return nil, logging.Errorf("getKubernetesDelegate: failed to get network resource, refer Multus README.md for the usage guide: %v", err)
	}

	customResource := &types.NetworkAttachmentDefinition{}
	if decodeErr := json.Unmarshal(netData, customResource); decodeErr != nil {
		return nil, logging.Errorf("getKubernetesDelegate: failed to get the netplugin data: %v", decodeErr)
	}

	// DEVICE_ID
	// Pick the next unused device ID of the resource named by the
	// definition's resourceName annotation, if there is one.
	var deviceID string
	if resourceName, annotated := customResource.Metadata.Annotations[resourceNameAnnot]; annotated {
		if entry, known := resourceMap[resourceName]; known {
			if total := len(entry.deviceIDs); total > 0 && entry.Index < total {
				deviceID = entry.deviceIDs[entry.Index]
				entry.Index++ // increment Index for next delegate
			}
		}
	}

	configBytes, err := cniConfigFromNetworkResource(customResource, confdir)
	if err != nil {
		return nil, err
	}

	if deviceID != "" {
		configBytes, err = delegateAddDeviceID(configBytes, deviceID)
		if err != nil {
			return nil, err
		}
	}

	delegate, err := types.LoadDelegateNetConf(configBytes, net.InterfaceRequest)
	if err != nil {
		return nil, err
	}
	return delegate, nil
}
// delegateAddDeviceID returns a copy of the JSON object in inBytes with its
// "deviceID" key set to deviceID, replacing any existing value.
//
// An error is returned when inBytes is not valid JSON, is not a JSON object
// (including the literal null), or cannot be re-encoded.
func delegateAddDeviceID(inBytes []byte, deviceID string) ([]byte, error) {
	var rawConfig map[string]interface{}

	if err := json.Unmarshal(inBytes, &rawConfig); err != nil {
		return nil, logging.Errorf("delegateAddDeviceID: failed to unmarshal inBytes: %v", err)
	}
	// The JSON literal null decodes without error but leaves the map nil;
	// writing the device ID into it below would panic.
	if rawConfig == nil {
		return nil, logging.Errorf("delegateAddDeviceID: failed to unmarshal inBytes: not a JSON object")
	}

	// Inject deviceID
	rawConfig["deviceID"] = deviceID
	configBytes, err := json.Marshal(rawConfig)
	if err != nil {
		return nil, logging.Errorf("delegateAddDeviceID: failed to re-marshal Spec.Config: %v", err)
	}
	return configBytes, nil
}
type KubeClient interface {
GetRawWithPath(path string) ([]byte, error)
GetPod(namespace, name string) (*v1.Pod, error)
2018-07-30 10:59:13 +00:00
UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error)
}
2018-07-26 23:04:52 +00:00
func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) {
k8sArgs := &types.K8sArgs{}
logging.Debugf("GetK8sNetwork: %v", args)
2018-07-26 23:04:52 +00:00
err := cnitypes.LoadArgs(args.Args, k8sArgs)
if err != nil {
return nil, err
}
return k8sArgs, nil
}
2018-07-30 10:59:13 +00:00
// Attempts to load Kubernetes-defined delegates and add them to the Multus config.
// Returns the number of Kubernetes-defined delegates added or an error.
func TryLoadK8sDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient KubeClient) (int, *clientInfo, error) {
var err error
clientInfo := &clientInfo{}
logging.Debugf("TryLoadK8sDelegates: %v, %v, %v", k8sArgs, conf, kubeClient)
2018-07-30 10:59:13 +00:00
kubeClient, err = GetK8sClient(conf.Kubeconfig, kubeClient)
if err != nil {
return 0, nil, err
}
if kubeClient == nil {
if len(conf.Delegates) == 0 {
// No available kube client and no delegates, we can't do anything
return 0, nil, logging.Errorf("must have either Kubernetes config or delegates, refer Multus README.md for the usage guide")
2018-07-30 10:59:13 +00:00
}
return 0, nil, nil
}
setKubeClientInfo(clientInfo, kubeClient, k8sArgs)
delegates, err := GetK8sNetwork(kubeClient, k8sArgs, conf.ConfDir)
if err != nil {
if _, ok := err.(*NoK8sNetworkError); ok {
return 0, clientInfo, nil
2018-07-30 10:59:13 +00:00
}
return 0, nil, logging.Errorf("Multus: Err in getting k8s network from pod: %v", err)
2018-07-30 10:59:13 +00:00
}
if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err
}
return len(delegates), clientInfo, nil
}
// GetK8sClient returns the KubeClient to use for talking to Kubernetes.
//
// A non-nil kubeClient (e.g. from test cases) is returned as-is. Otherwise a
// client is built from the kubeconfig file when kubeconfig is non-empty, or
// from the in-cluster configuration when both KUBERNETES_SERVICE_HOST and
// KUBERNETES_SERVICE_PORT are set. When neither source is available it
// returns (nil, nil), meaning Kubernetes should not be contacted at all.
func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) {
	logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient)

	// If we get a valid kubeClient (eg from testcases) just return that
	// one.
	if kubeClient != nil {
		return kubeClient, nil
	}

	var err error
	var config *rest.Config

	switch {
	case kubeconfig != "":
		// Create the client from the given kubeconfig, using its current context
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, logging.Errorf("GetK8sClient: failed to get context for the kubeconfig %v, refer Multus README.md for the usage guide: %v", kubeconfig, err)
		}
	case os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "":
		// Try in-cluster config where multus might be running in a kubernetes pod
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, logging.Errorf("GetK8sClient: failed to get context for in-cluster kube config, refer Multus README.md for the usage guide: %v", err)
		}
	default:
		// No kubernetes config; assume we shouldn't talk to Kube at all
		return nil, nil
	}

	// creates the clientset
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	return &defaultKubeClient{client: client}, nil
}
2018-07-26 23:04:52 +00:00
// GetK8sNetwork resolves the delegates requested by a pod's network
// annotation.
//
// The pod named in k8sArgs is fetched through k8sclient, its network
// annotation is parsed, and every selected network is turned into a delegate
// via getKubernetesDelegate, looking for configuration files in confdir where
// needed. A *NoK8sNetworkError is returned when the pod has no network
// annotation; any other failure is returned as a regular error.
func GetK8sNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string) ([]*types.DelegateNetConf, error) {
	logging.Debugf("GetK8sNetwork: %v, %v, %v", k8sclient, k8sArgs, confdir)

	podNamespace := string(k8sArgs.K8S_POD_NAMESPACE)
	podName := string(k8sArgs.K8S_POD_NAME)

	pod, err := k8sclient.GetPod(podNamespace, podName)
	if err != nil {
		return nil, logging.Errorf("GetK8sNetwork: failed to query the pod %v in out of cluster comm: %v", podName, err)
	}

	netAnnot, defaultNamespace, err := getPodNetworkAnnotation(pod)
	if err != nil {
		return nil, err
	}

	// Get Pod ComputeDevices info
	resourceMap := GetComputeDeviceMap(pod)

	if netAnnot == "" {
		return nil, &NoK8sNetworkError{"no kubernetes network found"}
	}

	networks, err := parsePodNetworkAnnotation(netAnnot, defaultNamespace)
	if err != nil {
		return nil, err
	}

	// Read all network objects referenced by 'networks'
	var delegates []*types.DelegateNetConf
	for _, selected := range networks {
		delegate, delegateErr := getKubernetesDelegate(k8sclient, selected, confdir, resourceMap)
		if delegateErr != nil {
			return nil, logging.Errorf("GetK8sNetwork: failed getting the delegate: %v", delegateErr)
		}
		delegates = append(delegates, delegate)
	}

	return delegates, nil
}