virtcontainers: Make the Network structure manage endpoints

Endpoints creations, attachement and hotplug are bound to the networking
namespace described through the Network structure.
Making them Network methods is natural and simplifies the code.

Signed-off-by: Samuel Ortiz <s.ortiz@apple.com>
This commit is contained in:
Samuel Ortiz 2021-11-04 13:34:08 +01:00 committed by Samuel Ortiz
parent 8f48e28325
commit 2edea88369
2 changed files with 211 additions and 146 deletions

View File

@ -236,6 +236,208 @@ func closeSpan(span otelTrace.Span, err error) {
span.End()
}
func (n *Network) attachEndpoint(ctx context.Context, s *Sandbox, netInfo NetworkInfo, link netlink.Link, hotplug bool) (Endpoint, error) {
var endpoint Endpoint
// TODO: This is the incoming interface
// based on the incoming interface we should create
// an appropriate EndPoint based on interface type
// This should be a switch
// Check if interface is a physical interface. Do not create
// tap interface/bridge if it is.
isPhysical, err := isPhysicalIface(netInfo.Iface.Name)
if err != nil {
return nil, err
}
if isPhysical {
networkLogger().WithField("interface", netInfo.Iface.Name).Info("Physical network interface found")
endpoint, err = createPhysicalEndpoint(netInfo)
} else {
var socketPath string
idx := len(n.Endpoints)
// Check if this is a dummy interface which has a vhost-user socket associated with it
socketPath, err = vhostUserSocketPath(netInfo)
if err != nil {
return nil, err
}
if socketPath != "" {
networkLogger().WithField("interface", netInfo.Iface.Name).Info("VhostUser network interface found")
endpoint, err = createVhostUserEndpoint(netInfo, socketPath)
} else if netInfo.Iface.Type == "macvlan" {
networkLogger().Infof("macvlan interface found")
endpoint, err = createMacvlanNetworkEndpoint(idx, netInfo.Iface.Name, n.InterworkingModel)
} else if netInfo.Iface.Type == "macvtap" {
networkLogger().Infof("macvtap interface found")
endpoint, err = createMacvtapNetworkEndpoint(netInfo)
} else if netInfo.Iface.Type == "tap" {
networkLogger().Info("tap interface found")
endpoint, err = createTapNetworkEndpoint(idx, netInfo.Iface.Name)
} else if netInfo.Iface.Type == "tuntap" {
if link != nil {
switch link.(*netlink.Tuntap).Mode {
case 0:
// mount /sys/class/net to get links
return nil, fmt.Errorf("Network device mode not determined correctly. Mount sysfs in caller")
case 1:
return nil, fmt.Errorf("tun networking device not yet supported")
case 2:
networkLogger().Info("tuntap tap interface found")
endpoint, err = createTuntapNetworkEndpoint(idx, netInfo.Iface.Name, netInfo.Iface.HardwareAddr, n.InterworkingModel)
default:
return nil, fmt.Errorf("tuntap network %v mode unsupported", link.(*netlink.Tuntap).Mode)
}
}
} else if netInfo.Iface.Type == "veth" {
networkLogger().Info("veth interface found")
endpoint, err = createVethNetworkEndpoint(idx, netInfo.Iface.Name, n.InterworkingModel)
} else if netInfo.Iface.Type == "ipvlan" {
networkLogger().Info("ipvlan interface found")
endpoint, err = createIPVlanNetworkEndpoint(idx, netInfo.Iface.Name)
} else {
return nil, fmt.Errorf("Unsupported network interface: %s", netInfo.Iface.Type)
}
}
endpoint.SetProperties(netInfo)
if err := doNetNS(n.NetNSPath, func(_ ns.NetNS) error {
networkLogger().WithField("endpoint-type", endpoint.Type()).WithField("hotplug", hotplug).Info("Attaching endpoint")
if hotplug {
if err := endpoint.HotAttach(ctx, s.hypervisor); err != nil {
return err
}
} else {
if err := endpoint.Attach(ctx, s); err != nil {
return err
}
}
if !s.hypervisor.IsRateLimiterBuiltin() {
rxRateLimiterMaxRate := s.hypervisor.HypervisorConfig().RxRateLimiterMaxRate
if rxRateLimiterMaxRate > 0 {
networkLogger().Info("Add Rx Rate Limiter")
if err := addRxRateLimiter(endpoint, rxRateLimiterMaxRate); err != nil {
return err
}
}
txRateLimiterMaxRate := s.hypervisor.HypervisorConfig().TxRateLimiterMaxRate
if txRateLimiterMaxRate > 0 {
networkLogger().Info("Add Tx Rate Limiter")
if err := addTxRateLimiter(endpoint, txRateLimiterMaxRate); err != nil {
return err
}
}
}
return nil
}); err != nil {
return nil, err
}
n.Endpoints = append(n.Endpoints, endpoint)
return endpoint, nil
}
func (n *Network) detachEndpoint(ctx context.Context, s *Sandbox, idx int, hotplug bool) error {
if idx > len(n.Endpoints)-1 {
return fmt.Errorf("Enpoint index overflow")
}
endpoint := n.Endpoints[idx]
if endpoint.GetRxRateLimiter() {
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Deleting rx rate limiter")
// Deleting rx rate limiter should enter the network namespace.
if err := removeRxRateLimiter(endpoint, n.NetNSPath); err != nil {
return err
}
}
if endpoint.GetTxRateLimiter() {
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Deleting tx rate limiter")
// Deleting tx rate limiter should enter the network namespace.
if err := removeTxRateLimiter(endpoint, n.NetNSPath); err != nil {
return err
}
}
// Detach for an endpoint should enter the network namespace
// if required.
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Detaching endpoint")
if hotplug && s != nil {
if err := endpoint.HotDetach(ctx, s.hypervisor, n.NetNSCreated, n.NetNSPath); err != nil {
return err
}
} else {
if err := endpoint.Detach(ctx, n.NetNSCreated, n.NetNSPath); err != nil {
return err
}
}
n.Endpoints = append(n.Endpoints[:idx], n.Endpoints[idx+1:]...)
return nil
}
// Scan the networking namespace through netlink and then:
// 1. Create the endpoints for the relevant interfaces found there.
// 2. Attach them to the VM.
func (n *Network) attachEndpoints(ctx context.Context, s *Sandbox, hotplug bool) error {
netnsHandle, err := netns.GetFromPath(n.NetNSPath)
if err != nil {
return err
}
defer netnsHandle.Close()
netlinkHandle, err := netlink.NewHandleAt(netnsHandle)
if err != nil {
return err
}
defer netlinkHandle.Close()
linkList, err := netlinkHandle.LinkList()
if err != nil {
return err
}
for _, link := range linkList {
netInfo, err := networkInfoFromLink(netlinkHandle, link)
if err != nil {
return err
}
// Ignore unconfigured network interfaces. These are
// either base tunnel devices that are not namespaced
// like gre0, gretap0, sit0, ipip0, tunl0 or incorrectly
// setup interfaces.
if len(netInfo.Addrs) == 0 {
continue
}
// Skip any loopback interfaces:
if (netInfo.Iface.Flags & net.FlagLoopback) != 0 {
continue
}
_, err = n.attachEndpoint(ctx, s, netInfo, link, hotplug)
if err != nil {
return err
}
}
sort.Slice(n.Endpoints, func(i, j int) bool {
return n.Endpoints[i].Name() < n.Endpoints[j].Name()
})
networkLogger().WithField("endpoints", n.Endpoints).Info("Endpoints found after scan")
return nil
}
// Run runs a callback in the specified network namespace.
func (n *Network) Run(ctx context.Context, _ string, cb func() error) error {
span, _ := n.trace(ctx, "Run")
@ -252,54 +454,14 @@ func (n *Network) Add(ctx context.Context, config *NetworkConfig, s *Sandbox, ho
katatrace.AddTags(span, "type", n.InterworkingModel.GetModel())
defer span.End()
endpoints, err := createEndpointsFromScan(n.NetNSPath, config)
if err != nil {
return endpoints, err
}
katatrace.AddTags(span, "endpoints", endpoints, "hotplug", hotplug)
err = doNetNS(n.NetNSPath, func(_ ns.NetNS) error {
for _, endpoint := range endpoints {
networkLogger().WithField("endpoint-type", endpoint.Type()).WithField("hotplug", hotplug).Info("Attaching endpoint")
if hotplug {
if err := endpoint.HotAttach(ctx, s.hypervisor); err != nil {
return err
}
} else {
if err := endpoint.Attach(ctx, s); err != nil {
return err
}
}
if !s.hypervisor.IsRateLimiterBuiltin() {
rxRateLimiterMaxRate := s.hypervisor.HypervisorConfig().RxRateLimiterMaxRate
if rxRateLimiterMaxRate > 0 {
networkLogger().Info("Add Rx Rate Limiter")
if err := addRxRateLimiter(endpoint, rxRateLimiterMaxRate); err != nil {
return err
}
}
txRateLimiterMaxRate := s.hypervisor.HypervisorConfig().TxRateLimiterMaxRate
if txRateLimiterMaxRate > 0 {
networkLogger().Info("Add Tx Rate Limiter")
if err := addTxRateLimiter(endpoint, txRateLimiterMaxRate); err != nil {
return err
}
}
}
}
return nil
})
if err != nil {
return []Endpoint{}, err
if err := n.attachEndpoints(ctx, s, hotplug); err != nil {
return n.Endpoints, err
}
n.Endpoints = append(n.Endpoints, endpoints...)
katatrace.AddTags(span, "endpoints", n.Endpoints, "hotplug", hotplug)
networkLogger().Debug("Network added")
return endpoints, nil
return n.Endpoints, nil
}
func (n *Network) PostAdd(ctx context.Context, _ *NetworkNamespace, hotplug bool) error {
@ -334,27 +496,8 @@ func (n *Network) Remove(ctx context.Context, _ *NetworkNamespace, hypervisor Hy
span, ctx := n.trace(ctx, "Remove")
defer span.End()
for _, endpoint := range n.Endpoints {
if endpoint.GetRxRateLimiter() {
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Deleting rx rate limiter")
// Deleting rx rate limiter should enter the network namespace.
if err := removeRxRateLimiter(endpoint, n.NetNSPath); err != nil {
return err
}
}
if endpoint.GetTxRateLimiter() {
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Deleting tx rate limiter")
// Deleting tx rate limiter should enter the network namespace.
if err := removeTxRateLimiter(endpoint, n.NetNSPath); err != nil {
return err
}
}
// Detach for an endpoint should enter the network namespace
// if required.
networkLogger().WithField("endpoint-type", endpoint.Type()).Info("Detaching endpoint")
if err := endpoint.Detach(ctx, n.NetNSCreated, n.NetNSPath); err != nil {
for i, _ := range n.Endpoints {
if err := n.detachEndpoint(ctx, nil, i, false); err != nil {
return err
}
}
@ -1204,73 +1347,6 @@ func networkInfoFromLink(handle *netlink.Handle, link netlink.Link) (NetworkInfo
}, nil
}
func createEndpointsFromScan(networkNSPath string, config *NetworkConfig) ([]Endpoint, error) {
var endpoints []Endpoint
netnsHandle, err := netns.GetFromPath(networkNSPath)
if err != nil {
return []Endpoint{}, err
}
defer netnsHandle.Close()
netlinkHandle, err := netlink.NewHandleAt(netnsHandle)
if err != nil {
return []Endpoint{}, err
}
defer netlinkHandle.Close()
linkList, err := netlinkHandle.LinkList()
if err != nil {
return []Endpoint{}, err
}
idx := 0
for _, link := range linkList {
var (
endpoint Endpoint
errCreate error
)
netInfo, err := networkInfoFromLink(netlinkHandle, link)
if err != nil {
return []Endpoint{}, err
}
// Ignore unconfigured network interfaces. These are
// either base tunnel devices that are not namespaced
// like gre0, gretap0, sit0, ipip0, tunl0 or incorrectly
// setup interfaces.
if len(netInfo.Addrs) == 0 {
continue
}
// Skip any loopback interfaces:
if (netInfo.Iface.Flags & net.FlagLoopback) != 0 {
continue
}
if err := doNetNS(networkNSPath, func(_ ns.NetNS) error {
endpoint, errCreate = createEndpoint(netInfo, idx, config.InterworkingModel, link)
return errCreate
}); err != nil {
return []Endpoint{}, err
}
endpoint.SetProperties(netInfo)
endpoints = append(endpoints, endpoint)
idx++
}
sort.Slice(endpoints, func(i, j int) bool {
return endpoints[i].Name() < endpoints[j].Name()
})
networkLogger().WithField("endpoints", endpoints).Info("Endpoints found after scan")
return endpoints, nil
}
func createEndpoint(netInfo NetworkInfo, idx int, model NetInterworkingModel, link netlink.Link) (Endpoint, error) {
var endpoint Endpoint
// TODO: This is the incoming interface

View File

@ -20,7 +20,6 @@ import (
"sync"
"syscall"
"github.com/containernetworking/plugins/pkg/ns"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -884,22 +883,14 @@ func (s *Sandbox) AddInterface(ctx context.Context, inf *pbTypes.Interface) (*pb
return nil, err
}
endpoint, err := createEndpoint(netInfo, len(s.networkNS.Endpoints), s.config.NetworkConfig.InterworkingModel, nil)
endpoint, err := s.network.attachEndpoint(ctx, s, netInfo, nil, true)
if err != nil {
return nil, err
}
endpoint.SetProperties(netInfo)
if err := doNetNS(s.networkNS.NetNsPath, func(_ ns.NetNS) error {
s.Logger().WithField("endpoint-type", endpoint.Type()).Info("Hot attaching endpoint")
return endpoint.HotAttach(ctx, s.hypervisor)
}); err != nil {
return nil, err
}
defer func() {
if err != nil {
if errDetach := endpoint.HotDetach(ctx, s.hypervisor, s.networkNS.NetNsCreated, s.networkNS.NetNsPath); errDetach != nil {
if errDetach := s.network.detachEndpoint(ctx, s, len(s.network.Endpoints)-1, true); err != nil {
s.Logger().WithField("endpoint-type", endpoint.Type()).WithError(errDetach).Error("rollback hot attaching endpoint failed")
}
}
@ -913,7 +904,6 @@ func (s *Sandbox) AddInterface(ctx context.Context, inf *pbTypes.Interface) (*pb
}
// Update the sandbox storage
s.networkNS.Endpoints = append(s.networkNS.Endpoints, endpoint)
if err = s.Save(); err != nil {
return nil, err
}
@ -926,10 +916,9 @@ func (s *Sandbox) RemoveInterface(ctx context.Context, inf *pbTypes.Interface) (
for i, endpoint := range s.networkNS.Endpoints {
if endpoint.HardwareAddr() == inf.HwAddr {
s.Logger().WithField("endpoint-type", endpoint.Type()).Info("Hot detaching endpoint")
if err := endpoint.HotDetach(ctx, s.hypervisor, s.networkNS.NetNsCreated, s.networkNS.NetNsPath); err != nil {
if err := s.network.detachEndpoint(ctx, s, i, true); err != nil {
return inf, err
}
s.networkNS.Endpoints = append(s.networkNS.Endpoints[:i], s.networkNS.Endpoints[i+1:]...)
if err := s.Save(); err != nil {
return inf, err