Update libcni cache when default-route net selection is used

To keep consistency between actual network and CNI result in cache,
update libcni cache when multus add/del default routes by
`default-route` network selection.
This commit is contained in:
Tomofumi Hayashi 2021-12-04 00:48:50 +09:00
parent b9d0d93d6e
commit d52f2b6a45
6 changed files with 1439 additions and 64 deletions

View File

@ -366,10 +366,13 @@ func TryLoadPodDelegates(pod *v1.Pod, conf *types.NetConf, clientInfo *ClientInf
} }
if isGatewayConfigured == true { if isGatewayConfigured == true {
types.CheckGatewayConfig(conf.Delegates) err = types.CheckGatewayConfig(conf.Delegates)
if err != nil {
return 0, nil, err
}
} }
return len(delegates), clientInfo, nil return len(delegates), clientInfo, err
} }
if _, ok := err.(*NoK8sNetworkError); ok { if _, ok := err.(*NoK8sNetworkError); ok {

View File

@ -636,10 +636,11 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
} }
} }
netName := ""
tmpResult, err = delegateAdd(exec, kubeClient, pod, ifName, delegate, rt, n, cniArgs) tmpResult, err = delegateAdd(exec, kubeClient, pod, ifName, delegate, rt, n, cniArgs)
if err != nil { if err != nil {
// If the add failed, tear down all networks we already added // If the add failed, tear down all networks we already added
netName := delegate.Conf.Name netName = delegate.Conf.Name
if netName == "" { if netName == "" {
netName = delegate.ConfList.Name netName = delegate.ConfList.Name
} }
@ -649,11 +650,12 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
} }
// Remove gateway from routing table if the gateway is not used // Remove gateway from routing table if the gateway is not used
deletegateway := false deleteV4gateway := false
deleteV6gateway := false
adddefaultgateway := false adddefaultgateway := false
if delegate.IsFilterGateway { if delegate.IsFilterV4Gateway {
deletegateway = true deleteV4gateway = true
logging.Debugf("Marked interface %v for gateway deletion", ifName) logging.Debugf("Marked interface %v for v4 gateway deletion", ifName)
} else { } else {
// Otherwise, determine if this interface now gets our default route. // Otherwise, determine if this interface now gets our default route.
// According to // According to
@ -661,25 +663,54 @@ func CmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient *k8s.ClientInfo) (c
// the list can be empty; if it is, we'll assume the CNI's config for the default gateway holds, // the list can be empty; if it is, we'll assume the CNI's config for the default gateway holds,
// else we'll update the defaultgateway to the one specified. // else we'll update the defaultgateway to the one specified.
if delegate.GatewayRequest != nil && delegate.GatewayRequest[0] != nil { if delegate.GatewayRequest != nil && delegate.GatewayRequest[0] != nil {
deletegateway = true deleteV4gateway = true
adddefaultgateway = true adddefaultgateway = true
logging.Debugf("Detected gateway override on interface %v to %v", ifName, delegate.GatewayRequest) logging.Debugf("Detected gateway override on interface %v to %v", ifName, delegate.GatewayRequest)
} }
} }
if deletegateway { if delegate.IsFilterV6Gateway {
tmpResult, err = netutils.DeleteDefaultGW(args, ifName, &tmpResult) deleteV6gateway = true
if err != nil { logging.Debugf("Marked interface %v for v6 gateway deletion", ifName)
return nil, cmdErr(k8sArgs, "error deleting default gateway: %v", err) } else {
// Otherwise, determine if this interface now gets our default route.
// According to
// https://docs.google.com/document/d/1Ny03h6IDVy_e_vmElOqR7UdTPAG_RNydhVE1Kx54kFQ (4.1.2.1.9)
// the list can be empty; if it is, we'll assume the CNI's config for the default gateway holds,
// else we'll update the defaultgateway to the one specified.
if delegate.GatewayRequest != nil && delegate.GatewayRequest[0] != nil {
deleteV6gateway = true
adddefaultgateway = true
logging.Debugf("Detected gateway override on interface %v to %v", ifName, delegate.GatewayRequest)
} }
} }
// Here we'll set the default gateway // Remove namespace from delegate.Name for Add/Del CNI cache
nameSlice := strings.Split(delegate.Name, "/")
netName = nameSlice[len(nameSlice) - 1]
// Remove gateway if `default-route` network selection is specified
if deleteV4gateway || deleteV6gateway {
err = netutils.DeleteDefaultGW(args, ifName)
if err != nil {
return nil, cmdErr(k8sArgs, "error deleting default gateway: %v", err)
}
err = netutils.DeleteDefaultGWCache(n.CNIDir, rt, netName, ifName, deleteV4gateway, deleteV6gateway)
if err != nil {
return nil, cmdErr(k8sArgs, "error deleting default gateway in cache: %v", err)
}
}
// Here we'll set the default gateway which specified in `default-route` network selection
if adddefaultgateway { if adddefaultgateway {
tmpResult, err = netutils.SetDefaultGW(args, ifName, delegate.GatewayRequest, &tmpResult) err = netutils.SetDefaultGW(args, ifName, delegate.GatewayRequest)
if err != nil { if err != nil {
return nil, cmdErr(k8sArgs, "error setting default gateway: %v", err) return nil, cmdErr(k8sArgs, "error setting default gateway: %v", err)
} }
err = netutils.AddDefaultGWCache(n.CNIDir, rt, netName, ifName, delegate.GatewayRequest)
if err != nil {
return nil, cmdErr(k8sArgs, "error setting default gateway in cache: %v", err)
}
} }
// Master plugin result is always used if present // Master plugin result is always used if present

View File

@ -16,26 +16,24 @@
package netutils package netutils
import ( import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"path/filepath"
"github.com/containernetworking/cni/libcni"
"github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/skel"
cnitypes "github.com/containernetworking/cni/pkg/types"
"github.com/containernetworking/cni/pkg/types/current"
"github.com/containernetworking/plugins/pkg/ns" "github.com/containernetworking/plugins/pkg/ns"
"github.com/vishvananda/netlink" "github.com/vishvananda/netlink"
"gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/logging" "gopkg.in/k8snetworkplumbingwg/multus-cni.v3/pkg/logging"
"net"
"strings"
) )
// DeleteDefaultGW removes the default gateway from marked interfaces. // DeleteDefaultGW removes the default gateway from marked interfaces.
func DeleteDefaultGW(args *skel.CmdArgs, ifName string, res *cnitypes.Result) (*current.Result, error) { func DeleteDefaultGW(args *skel.CmdArgs, ifName string) error {
result, err := current.NewResultFromResult(*res)
if err != nil {
return nil, logging.Errorf("DeleteDefaultGW: Error creating new from current CNI result: %v", err)
}
netns, err := ns.GetNS(args.Netns) netns, err := ns.GetNS(args.Netns)
if err != nil { if err != nil {
return nil, logging.Errorf("DeleteDefaultGW: Error getting namespace %v", err) return logging.Errorf("DeleteDefaultGW: Error getting namespace %v", err)
} }
defer netns.Close() defer netns.Close()
@ -50,40 +48,27 @@ func DeleteDefaultGW(args *skel.CmdArgs, ifName string, res *cnitypes.Result) (*
} }
return err return err
}) })
var newRoutes []*cnitypes.Route return err
for _, route := range result.Routes {
if mask, _ := route.Dst.Mask.Size(); mask != 0 {
newRoutes = append(newRoutes, route)
}
}
result.Routes = newRoutes
return result, err
} }
// SetDefaultGW adds a default gateway on a specific interface // SetDefaultGW adds a default gateway on a specific interface
func SetDefaultGW(args *skel.CmdArgs, ifName string, gateways []net.IP, res *cnitypes.Result) (*current.Result, error) { func SetDefaultGW(args *skel.CmdArgs, ifName string, gateways []net.IP) error {
// Use the current CNI result...
result, err := current.NewResultFromResult(*res)
if err != nil {
return nil, logging.Errorf("SetDefaultGW: Error creating new CNI result from current: %v", err)
}
// This ensures we're acting within the net namespace for the pod. // This ensures we're acting within the net namespace for the pod.
netns, err := ns.GetNS(args.Netns) netns, err := ns.GetNS(args.Netns)
if err != nil { if err != nil {
return nil, logging.Errorf("SetDefaultGW: Error getting namespace %v", err) return logging.Errorf("SetDefaultGW: Error getting namespace %v", err)
} }
defer netns.Close() defer netns.Close()
var newResultDefaultRoutes []*cnitypes.Route
// Do this within the net namespace. // Do this within the net namespace.
err = netns.Do(func(_ ns.NetNS) error { err = netns.Do(func(_ ns.NetNS) error {
var err error var err error
// Pick up the link info as we need the index. // Pick up the link info as we need the index.
link, _ := netlink.LinkByName(ifName) link, err := netlink.LinkByName(ifName)
if err != nil {
return logging.Errorf("SetDefaultGW: Error getting link %v", err)
}
// Cycle through all the desired gateways. // Cycle through all the desired gateways.
for _, gw := range gateways { for _, gw := range gateways {
@ -95,15 +80,6 @@ func SetDefaultGW(args *skel.CmdArgs, ifName string, gateways []net.IP, res *cni
Gw: gw, Gw: gw,
} }
// Build a new element for the results route
// Set a correct CIDR depending on IP type
_, dstipnet, _ := net.ParseCIDR("::0/0")
if strings.Count(gw.String(), ":") < 2 {
_, dstipnet, _ = net.ParseCIDR("0.0.0.0/0")
}
newResultDefaultRoutes = append(newResultDefaultRoutes, &cnitypes.Route{Dst: *dstipnet, GW: gw})
// Perform the creation of the default route.... // Perform the creation of the default route....
err = netlink.RouteAdd(&newDefaultRoute) err = netlink.RouteAdd(&newDefaultRoute)
if err != nil { if err != nil {
@ -113,7 +89,317 @@ func SetDefaultGW(args *skel.CmdArgs, ifName string, gateways []net.IP, res *cni
return err return err
}) })
result.Routes = newResultDefaultRoutes return err
return result, err }
// DeleteDefaultGWCache updates libcni cache to remove default gateway routes in result
func DeleteDefaultGWCache(cacheDir string, rt *libcni.RuntimeConf, netName string, ifName string, ipv4, ipv6 bool) error {
cacheFile := filepath.Join(cacheDir, "results", fmt.Sprintf("%s-%s-%s", netName, rt.ContainerID, rt.IfName))
cache, err := ioutil.ReadFile(cacheFile)
if err != nil {
return err
}
logging.Debugf("DeleteDefaultGWCache: update cache to delete GW from: %s", string(cache))
newCache, err := deleteDefaultGWCacheBytes(cache, ipv4, ipv6)
if err != nil {
return err
}
logging.Debugf("DeleteDefaultGWCache: update cache to delete GW: %s", string(newCache))
return ioutil.WriteFile(cacheFile, newCache, 0600)
}
func deleteDefaultGWCacheBytes(cacheFile []byte, ipv4, ipv6 bool) ([]byte, error) {
var cachedInfo map[string]interface{}
if err := json.Unmarshal(cacheFile, &cachedInfo); err != nil {
return nil, err
}
// try to get result
_, ok := cachedInfo["result"]
if !ok {
return nil, fmt.Errorf("cannot get result from cache")
}
resultJSON, ok := cachedInfo["result"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("wrong result type: %v", cachedInfo["result"])
}
newResult, err := deleteDefaultGWResult(resultJSON, ipv4, ipv6)
if err != nil {
return nil, err
}
cachedInfo["result"] = newResult
newCache, err := json.Marshal(cachedInfo)
if err != nil {
return nil, fmt.Errorf("failed to encode json: %v", err)
}
return newCache, nil
}
func deleteDefaultGWResultRoutes(routes []interface{}, dstGW string) ([]interface{}, error) {
for i, r := range routes {
route, ok := r.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("wrong route format: %v", r)
}
_, ok = route["dst"]
if ok {
dst, ok := route["dst"].(string)
if !ok {
return nil, fmt.Errorf("wrong dst format: %v", route["dst"])
}
if dst == dstGW {
routes = append(routes[:i], routes[i+1:]...)
}
}
}
return routes, nil
}
func deleteDefaultGWResult(result map[string]interface{}, ipv4, ipv6 bool) (map[string]interface{}, error) {
// try to get cniVersion from result
_, ok := result["cniVersion"]
if !ok {
// fallback to processing result for old cni version(0.1.0/0.2.0)
return deleteDefaultGWResult020(result, ipv4, ipv6)
}
cniVersion, ok := result["cniVersion"].(string)
if !ok {
return nil, fmt.Errorf("wrong cniVersion format: %v", result["cniVersion"])
}
if cniVersion == "0.1.0" || cniVersion == "0.2.0" {
// fallback to processing result for old cni version(0.1.0/0.2.0)
return deleteDefaultGWResult020(result, ipv4, ipv6)
}
if cniVersion != "0.3.0" && cniVersion != "0.3.1" && cniVersion != "0.4.0" && cniVersion != "1.0.0" {
return nil, fmt.Errorf("not supported version: %s", cniVersion)
}
_, ok = result["routes"]
if !ok {
// No route in result, hence we do nothing
return result, nil
}
routes, ok := result["routes"].([]interface{})
if !ok {
return nil, fmt.Errorf("wrong routes format: %v", result["routes"])
}
var err error
// delete IPv4 default routes
if ipv4 {
routes, err = deleteDefaultGWResultRoutes(routes, "0.0.0.0/0")
if err != nil {
return nil, err
}
}
if ipv6 {
routes, err = deleteDefaultGWResultRoutes(routes, "::0/0")
if err != nil {
return nil, err
}
}
result["routes"] = routes
return result, nil
}
func deleteDefaultGWResult020(result map[string]interface{}, ipv4, ipv6 bool) (map[string]interface{}, error) {
var err error
if ipv4 {
_, ok := result["ip4"]
if ok {
ip4, ok := result["ip4"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip4 format: %v", result["ip4"])
}
_, ok = ip4["routes"]
if ok {
routes, ok := ip4["routes"].([]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip4 routes format: %v", ip4["routes"])
}
routes, err = deleteDefaultGWResultRoutes(routes, "0.0.0.0/0")
if err != nil {
return nil, err
}
ip4["routes"] = routes
}
}
}
if ipv6 {
_, ok := result["ip6"]
if ok {
ip6, ok := result["ip6"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip6 format: %v", result["ip6"])
}
_, ok = ip6["routes"]
if ok {
routes, ok := ip6["routes"].([]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip6 routes format: %v", ip6["routes"])
}
routes, err = deleteDefaultGWResultRoutes(routes, "::0/0")
if err != nil {
return nil, err
}
ip6["routes"] = routes
}
}
}
return result, nil
}
// AddDefaultGWCache updates libcni cache to add default gateway result
func AddDefaultGWCache(cacheDir string, rt *libcni.RuntimeConf, netName string, ifName string, gw []net.IP) error {
cacheFile := filepath.Join(cacheDir, "results", fmt.Sprintf("%s-%s-%s", netName, rt.ContainerID, rt.IfName))
cache, err := ioutil.ReadFile(cacheFile)
if err != nil {
return err
}
logging.Debugf("AddDefaultGWCache: update cache to add GW from: %s", string(cache))
newCache, err := addDefaultGWCacheBytes(cache, gw)
if err != nil {
return err
}
logging.Debugf("AddDefaultGWCache: update cache to add GW: %s", string(newCache))
return ioutil.WriteFile(cacheFile, newCache, 0600)
}
func addDefaultGWCacheBytes(cacheFile []byte, gw []net.IP) ([]byte, error) {
var cachedInfo map[string]interface{}
if err := json.Unmarshal(cacheFile, &cachedInfo); err != nil {
return nil, err
}
// try to get result
_, ok := cachedInfo["result"]
if !ok {
return nil, fmt.Errorf("cannot get result from cache")
}
resultJSON, ok := cachedInfo["result"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("wrong result type: %v", cachedInfo["result"])
}
newResult, err := addDefaultGWResult(resultJSON, gw)
if err != nil {
return nil, err
}
cachedInfo["result"] = newResult
newCache, err := json.Marshal(cachedInfo)
if err != nil {
return nil, fmt.Errorf("failed to encode json: %v", err)
}
return newCache, nil
}
func addDefaultGWResult(result map[string]interface{}, gw []net.IP) (map[string]interface{}, error) {
// try to get cniVersion from result
_, ok := result["cniVersion"]
if !ok {
// fallback to processing result for old cni version(0.1.0/0.2.0)
return addDefaultGWResult020(result, gw)
}
cniVersion, ok := result["cniVersion"].(string)
if !ok {
return nil, fmt.Errorf("wrong cniVersion format: %v", result["cniVersion"])
}
if cniVersion == "0.1.0" || cniVersion == "0.2.0" {
// fallback to processing result for old cni version(0.1.0/0.2.0)
return addDefaultGWResult020(result, gw)
}
if cniVersion != "0.3.0" && cniVersion != "0.3.1" && cniVersion != "0.4.0" && cniVersion != "1.0.0" {
return nil, fmt.Errorf("not supported version: %s", cniVersion)
}
routes := []interface{}{}
_, ok = result["routes"]
if ok {
routes, ok = result["routes"].([]interface{})
if !ok {
return nil, fmt.Errorf("wrong routes format: %v", result["routes"])
}
}
for _, g := range gw {
dst := "0.0.0.0/0"
if g.To4() == nil {
dst = "::0/0"
}
routes = append(routes, map[string]string{
"dst": dst,
"gw": g.String(),
})
}
result["routes"] = routes
return result, nil
}
func addDefaultGWResult020(result map[string]interface{}, gw []net.IP) (map[string]interface{}, error) {
for _, g := range gw {
if g.To4() != nil {
_, ok := result["ip4"]
if ok {
ip4, ok := result["ip4"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip4 format: %v", result["ip4"])
}
routes := []interface{}{}
_, ok = ip4["routes"]
if ok {
routes, ok = ip4["routes"].([]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip4 routes format: %v", ip4["routes"])
}
}
ip4["routes"] = append(routes, map[string]string{
"dst": "0.0.0.0/0",
"gw": g.String(),
})
}
} else {
_, ok := result["ip6"]
if ok {
ip6, ok := result["ip6"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip6 format: %v", result["ip4"])
}
routes := []interface{}{}
_, ok = ip6["routes"]
if ok {
routes, ok = ip6["routes"].([]interface{})
if !ok {
return nil, fmt.Errorf("wrong ip6 routes format: %v", ip6["routes"])
}
}
ip6["routes"] = append(routes, map[string]string{
"dst": "::/0",
"gw": g.String(),
})
}
}
}
return result, nil
} }

File diff suppressed because it is too large Load Diff

View File

@ -504,15 +504,45 @@ func addCNIArgsInConfList(inBytes []byte, cniArgs *map[string]interface{}) ([]by
return configBytes, nil return configBytes, nil
} }
// CheckGatewayConfig check gatewayRequest and mark IsFilterGateway flag if // CheckGatewayConfig check gatewayRequest and mark IsFilter{V4,V6}Gateway flag if
// gw filtering is required // gw filtering is required
func CheckGatewayConfig(delegates []*DelegateNetConf) { func CheckGatewayConfig(delegates []*DelegateNetConf) error {
// Check the Gateway
for i, delegate := range delegates { v4Gateways := 0
if delegate.GatewayRequest == nil { v6Gateways := 0
delegates[i].IsFilterGateway = true
// Check the gateway
for _, delegate := range delegates {
for _, gw := range delegate.GatewayRequest {
if gw.To4() != nil {
v4Gateways++
} else {
v6Gateways++
}
} }
} }
if v4Gateways > 1 || v6Gateways > 1 {
return fmt.Errorf("multus does not support ECMP for default-route")
}
// set filter flag for each delegate
for i, delegate := range delegates {
// no GatewayRequest
if delegate.GatewayRequest == nil {
delegates[i].IsFilterV4Gateway = true
delegates[i].IsFilterV6Gateway = true
} else {
for _, gw := range delegate.GatewayRequest {
if gw.To4() != nil {
delegates[i].IsFilterV6Gateway = true
} else {
delegates[i].IsFilterV4Gateway = true
}
}
}
}
return nil
} }
// CheckSystemNamespaces checks whether given namespace is in systemNamespaces or not. // CheckSystemNamespaces checks whether given namespace is in systemNamespaces or not.

View File

@ -98,7 +98,8 @@ type DelegateNetConf struct {
PortMappingsRequest []*PortMapEntry `json:"-"` PortMappingsRequest []*PortMapEntry `json:"-"`
BandwidthRequest *BandwidthEntry `json:"-"` BandwidthRequest *BandwidthEntry `json:"-"`
GatewayRequest []net.IP `json:"default-route,omitempty"` GatewayRequest []net.IP `json:"default-route,omitempty"`
IsFilterGateway bool IsFilterV4Gateway bool
IsFilterV6Gateway bool
// MasterPlugin is only used internal housekeeping // MasterPlugin is only used internal housekeeping
MasterPlugin bool `json:"-"` MasterPlugin bool `json:"-"`
// Conflist plugin is only used internal housekeeping // Conflist plugin is only used internal housekeeping