mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-04 02:56:18 +00:00
netmon: Monitor network changes
This commit introduces a new watcher dedicated to the monitoring of a specific network namespace in order to detect any change that could happen to the network. As a result of such a detection, the watcher should call into the appropriate runtime path with the correct arguments to modify the pod network accordingly. Fixes #170 Signed-off-by: Sebastien Boeuf <sebastien.boeuf@intel.com>
This commit is contained in:
parent
d6e4a98387
commit
b708a4a05c
1
.gitignore
vendored
1
.gitignore
vendored
@ -6,6 +6,7 @@
|
||||
/cli/coverage.html
|
||||
/cli/config-generated.go
|
||||
/cli/config/configuration.toml
|
||||
/kata-netmon
|
||||
/virtcontainers/hack/virtc/virtc
|
||||
/virtcontainers/hook/mock/hook
|
||||
/virtcontainers/shim/mock/shim
|
||||
|
565
netmon/netmon.go
Normal file
565
netmon/netmon.go
Normal file
@ -0,0 +1,565 @@
|
||||
// Copyright (c) 2018 Intel Corporation
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/kata-containers/agent/protocols/grpc"
|
||||
"github.com/vishvananda/netlink"
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
const (
|
||||
netmonName = "kata-netmon"
|
||||
netmonVersion = "0.0.1"
|
||||
|
||||
kataCmd = "kata-network"
|
||||
kataCLIAddIfaceCmd = "add-iface"
|
||||
kataCLIDelIfaceCmd = "del-iface"
|
||||
kataCLIUpdtRoutesCmd = "update-routes"
|
||||
|
||||
kataSuffix = "kata"
|
||||
|
||||
// For simplicity the code will only focus on IPv4 addresses for now.
|
||||
netlinkFamily = netlink.FAMILY_V4
|
||||
|
||||
storageParentPath = "/var/run/kata-containers/netmon/sbs"
|
||||
storageDirPerm = os.FileMode(0750)
|
||||
|
||||
// sharedFile is the name of the file that will be used to share
|
||||
// the data between this process and the kata-runtime process
|
||||
// responsible for updating the network.
|
||||
sharedFile = "shared.json"
|
||||
storageFilePerm = os.FileMode(0640)
|
||||
)
|
||||
|
||||
type netmonParams struct {
|
||||
sandboxID string
|
||||
runtimePath string
|
||||
debug bool
|
||||
}
|
||||
|
||||
type netmon struct {
|
||||
netmonParams
|
||||
|
||||
storagePath string
|
||||
sharedFile string
|
||||
|
||||
netIfaces map[int]grpc.Interface
|
||||
|
||||
linkUpdateCh chan netlink.LinkUpdate
|
||||
linkDoneCh chan struct{}
|
||||
|
||||
rtUpdateCh chan netlink.RouteUpdate
|
||||
rtDoneCh chan struct{}
|
||||
|
||||
netHandler *netlink.Handle
|
||||
}
|
||||
|
||||
func printVersion() {
|
||||
fmt.Printf("%s version %s\n", netmonName, netmonVersion)
|
||||
}
|
||||
|
||||
const componentDescription = `is a network monitoring process that is intended to be started in the
|
||||
appropriate network namespace so that it can listen to any event related to
|
||||
link and routes. Whenever a new interface or route is created/updated, it is
|
||||
responsible for calling into the kata-runtime CLI to ask for the actual
|
||||
creation/update of the given interface or route.
|
||||
`
|
||||
|
||||
func printComponentDescription() {
|
||||
fmt.Printf("\n%s %s\n", netmonName, componentDescription)
|
||||
}
|
||||
|
||||
func parseOptions() netmonParams {
|
||||
var version, help bool
|
||||
|
||||
params := netmonParams{}
|
||||
|
||||
flag.BoolVar(&help, "h", false, "describe component usage")
|
||||
flag.BoolVar(&help, "help", false, "")
|
||||
flag.BoolVar(¶ms.debug, "d", false, "enable debug mode")
|
||||
flag.BoolVar(&version, "v", false, "display program version and exit")
|
||||
flag.BoolVar(&version, "version", false, "")
|
||||
flag.StringVar(¶ms.sandboxID, "s", "", "sandbox id (required)")
|
||||
flag.StringVar(¶ms.runtimePath, "r", "", "runtime path (required)")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
if help {
|
||||
printComponentDescription()
|
||||
flag.PrintDefaults()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if version {
|
||||
printVersion()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
if params.sandboxID == "" {
|
||||
fmt.Fprintf(os.Stderr, "Error: sandbox id is empty, one must be provided\n")
|
||||
flag.PrintDefaults()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
if params.runtimePath == "" {
|
||||
fmt.Fprintf(os.Stderr, "Error: runtime path is empty, one must be provided\n")
|
||||
flag.PrintDefaults()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return params
|
||||
}
|
||||
|
||||
func newNetmon(params netmonParams) (*netmon, error) {
|
||||
handler, err := netlink.NewHandle(netlinkFamily)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
n := &netmon{
|
||||
netmonParams: params,
|
||||
storagePath: filepath.Join(storageParentPath, params.sandboxID),
|
||||
sharedFile: filepath.Join(storageParentPath, params.sandboxID, sharedFile),
|
||||
netIfaces: make(map[int]grpc.Interface),
|
||||
linkUpdateCh: make(chan netlink.LinkUpdate),
|
||||
linkDoneCh: make(chan struct{}),
|
||||
rtUpdateCh: make(chan netlink.RouteUpdate),
|
||||
rtDoneCh: make(chan struct{}),
|
||||
netHandler: handler,
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(n.storagePath, storageDirPerm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (n *netmon) cleanup() {
|
||||
os.RemoveAll(n.storagePath)
|
||||
n.netHandler.Delete()
|
||||
close(n.linkDoneCh)
|
||||
close(n.rtDoneCh)
|
||||
}
|
||||
|
||||
func initLogs() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *netmon) listenNetlinkEvents() error {
|
||||
if err := netlink.LinkSubscribe(n.linkUpdateCh, n.linkDoneCh); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return netlink.RouteSubscribe(n.rtUpdateCh, n.rtDoneCh)
|
||||
}
|
||||
|
||||
// convertInterface converts a link and its IP addresses as defined by netlink
|
||||
// package, into the Interface structure format expected by kata-runtime to
|
||||
// describe an interface and its associated IP addresses.
|
||||
func convertInterface(linkAttrs *netlink.LinkAttrs, addrs []netlink.Addr) grpc.Interface {
|
||||
if linkAttrs == nil {
|
||||
fmt.Printf("Link attributes are nil")
|
||||
return grpc.Interface{}
|
||||
}
|
||||
|
||||
var ipAddrs []*grpc.IPAddress
|
||||
|
||||
for _, addr := range addrs {
|
||||
if addr.IPNet == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
netMask, _ := addr.Mask.Size()
|
||||
|
||||
ipAddr := &grpc.IPAddress{
|
||||
Family: grpc.IPFamily(netlinkFamily),
|
||||
Address: addr.IP.String(),
|
||||
Mask: fmt.Sprintf("%d", netMask),
|
||||
}
|
||||
|
||||
ipAddrs = append(ipAddrs, ipAddr)
|
||||
}
|
||||
|
||||
return grpc.Interface{
|
||||
Device: linkAttrs.Name,
|
||||
Name: linkAttrs.Name,
|
||||
IPAddresses: ipAddrs,
|
||||
Mtu: uint64(linkAttrs.MTU),
|
||||
HwAddr: linkAttrs.HardwareAddr.String(),
|
||||
}
|
||||
}
|
||||
|
||||
// convertRoutes converts a list of routes as defined by netlink package,
|
||||
// into a list of Route structure format expected by kata-runtime to
|
||||
// describe a set of routes.
|
||||
func convertRoutes(netRoutes []netlink.Route) []grpc.Route {
|
||||
var routes []grpc.Route
|
||||
|
||||
// Ignore routes with IPv6 addresses as this is not supported
|
||||
// by Kata yet.
|
||||
for _, netRoute := range netRoutes {
|
||||
dst := ""
|
||||
if netRoute.Dst != nil && netRoute.Dst.IP.To4() != nil {
|
||||
dst = netRoute.Dst.IP.String()
|
||||
}
|
||||
|
||||
src := ""
|
||||
if netRoute.Src.To4() != nil {
|
||||
src = netRoute.Src.String()
|
||||
}
|
||||
|
||||
gw := ""
|
||||
if netRoute.Gw.To4() != nil {
|
||||
gw = netRoute.Gw.String()
|
||||
}
|
||||
|
||||
dev := ""
|
||||
iface, err := net.InterfaceByIndex(netRoute.LinkIndex)
|
||||
if err == nil {
|
||||
dev = iface.Name
|
||||
}
|
||||
|
||||
route := grpc.Route{
|
||||
Dest: dst,
|
||||
Gateway: gw,
|
||||
Device: dev,
|
||||
Source: src,
|
||||
Scope: uint32(netRoute.Scope),
|
||||
}
|
||||
|
||||
routes = append(routes, route)
|
||||
}
|
||||
|
||||
return routes
|
||||
}
|
||||
|
||||
// scanNetwork lists all the interfaces it can find inside the current
|
||||
// network namespace, and store them in-memory to keep track of them.
|
||||
func (n *netmon) scanNetwork() error {
|
||||
links, err := n.netHandler.LinkList()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, link := range links {
|
||||
addrs, err := n.netHandler.AddrList(link, netlinkFamily)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
linkAttrs := link.Attrs()
|
||||
if linkAttrs == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
iface := convertInterface(linkAttrs, addrs)
|
||||
n.netIfaces[linkAttrs.Index] = iface
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *netmon) storeDataToSend(data interface{}) error {
|
||||
// Marshal the data structure into a JSON bytes array.
|
||||
jsonArray, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Store the JSON bytes array at the specified path.
|
||||
return ioutil.WriteFile(n.sharedFile, jsonArray, storageFilePerm)
|
||||
}
|
||||
|
||||
func (n *netmon) execKataCmd(subCmd string) error {
|
||||
execCmd := exec.Command(n.runtimePath, kataCmd, subCmd, n.sandboxID, n.sharedFile)
|
||||
|
||||
// Make use of Run() to ensure the kata-runtime process has correctly
|
||||
// terminated before to go further.
|
||||
out, err := execCmd.Output()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Printf("COMMAND OUTPUT: %q\n", string(out))
|
||||
|
||||
// Remove the shared file after the command returned. At this point
|
||||
// we know the content of the file is not going to be used anymore,
|
||||
// and the file path can be reused for further commands.
|
||||
return os.Remove(n.sharedFile)
|
||||
}
|
||||
|
||||
func (n *netmon) addInterfaceCLI(iface grpc.Interface) error {
|
||||
fmt.Printf("%s %s %+v\n", n.runtimePath, kataCLIAddIfaceCmd, iface)
|
||||
|
||||
if err := n.storeDataToSend(iface); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return n.execKataCmd(kataCLIAddIfaceCmd)
|
||||
}
|
||||
|
||||
func (n *netmon) delInterfaceCLI(iface grpc.Interface) error {
|
||||
fmt.Printf("%s %s %+v\n", n.runtimePath, kataCLIDelIfaceCmd, iface)
|
||||
|
||||
if err := n.storeDataToSend(iface); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return n.execKataCmd(kataCLIDelIfaceCmd)
|
||||
}
|
||||
|
||||
func (n *netmon) updateRoutesCLI(routes []grpc.Route) error {
|
||||
fmt.Printf("%s %s %+v\n", n.runtimePath, kataCLIUpdtRoutesCmd, routes)
|
||||
|
||||
if err := n.storeDataToSend(routes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return n.execKataCmd(kataCLIUpdtRoutesCmd)
|
||||
}
|
||||
|
||||
func (n *netmon) updateRoutes() error {
|
||||
// Get all the routes.
|
||||
netlinkRoutes, err := n.netHandler.RouteList(nil, netlinkFamily)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Translate them into grpc.Route structures.
|
||||
routes := convertRoutes(netlinkRoutes)
|
||||
|
||||
// Update the routes through the Kata CLI.
|
||||
return n.updateRoutesCLI(routes)
|
||||
}
|
||||
|
||||
func (n *netmon) handleRTMNewAddr(ev netlink.LinkUpdate) error {
|
||||
fmt.Printf("handleRTMNewAddr: Interface update not supported\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *netmon) handleRTMDelAddr(ev netlink.LinkUpdate) error {
|
||||
fmt.Printf("handleRTMDelAddr: Interface update not supported\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *netmon) handleRTMNewLink(ev netlink.LinkUpdate) error {
|
||||
// NEWLINK might be a lot of different things. We're interested in
|
||||
// adding the interface (both to our list and by calling into the
|
||||
// Kata CLI API) only if this has the flags UP and RUNNING, meaning
|
||||
// we don't expect any further change on the interface, and that we
|
||||
// are ready to add it.
|
||||
|
||||
linkAttrs := ev.Link.Attrs()
|
||||
if linkAttrs == nil {
|
||||
fmt.Printf("The link attributes are nil\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
// First, ignore if the interface name contains "kata". This way we
|
||||
// are preventing from adding interfaces created by Kata Containers.
|
||||
if strings.HasSuffix(linkAttrs.Name, kataSuffix) {
|
||||
fmt.Printf("Ignore the interface %s because found %q\n",
|
||||
linkAttrs.Name, kataSuffix)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if the interface exist in the internal list.
|
||||
if _, exist := n.netIfaces[int(ev.Index)]; exist {
|
||||
fmt.Printf("Ignoring interface %s because already exist\n",
|
||||
linkAttrs.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Now, check if the interface has been enabled to UP and RUNNING.
|
||||
if (ev.Flags&unix.IFF_UP) != unix.IFF_UP ||
|
||||
(ev.Flags&unix.IFF_RUNNING) != unix.IFF_RUNNING {
|
||||
fmt.Printf("Ignore the interface %s because not UP and RUNNING\n",
|
||||
linkAttrs.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the list of IP addresses associated with this interface.
|
||||
addrs, err := n.netHandler.AddrList(ev.Link, netlinkFamily)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Convert the interfaces in the appropriate structure format.
|
||||
iface := convertInterface(linkAttrs, addrs)
|
||||
|
||||
// Add the interface through the Kata CLI.
|
||||
if err := n.addInterfaceCLI(iface); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Add the interface to the internal list.
|
||||
n.netIfaces[linkAttrs.Index] = iface
|
||||
|
||||
// Complete by updating the routes.
|
||||
return n.updateRoutes()
|
||||
}
|
||||
|
||||
func (n *netmon) handleRTMDelLink(ev netlink.LinkUpdate) error {
|
||||
// It can only delete if identical interface is found in the internal
|
||||
// list of interfaces. Otherwise, the deletion will be ignored.
|
||||
linkAttrs := ev.Link.Attrs()
|
||||
if linkAttrs == nil {
|
||||
fmt.Printf("Link attributes are nil\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
// First, ignore if the interface name contains "kata". This way we
|
||||
// are preventing from deleting interfaces created by Kata Containers.
|
||||
if strings.Contains(linkAttrs.Name, kataSuffix) {
|
||||
fmt.Printf("Ignore the interface %s because found %q\n",
|
||||
linkAttrs.Name, kataSuffix)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if the interface exist in the internal list.
|
||||
iface, exist := n.netIfaces[int(ev.Index)]
|
||||
if !exist {
|
||||
fmt.Printf("Ignoring interface %s because not found\n",
|
||||
linkAttrs.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := n.delInterfaceCLI(iface); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete the interface from the internal list.
|
||||
delete(n.netIfaces, linkAttrs.Index)
|
||||
|
||||
// Complete by updating the routes.
|
||||
return n.updateRoutes()
|
||||
}
|
||||
|
||||
func (n *netmon) handleRTMNewRoute(ev netlink.RouteUpdate) error {
|
||||
// Add the route through updateRoutes(), only if the route refer to an
|
||||
// interface that already exists in the internal list of interfaces.
|
||||
if _, exist := n.netIfaces[ev.Route.LinkIndex]; !exist {
|
||||
fmt.Printf("Ignoring route %+v since interface %d not found\n",
|
||||
ev.Route, ev.Route.LinkIndex)
|
||||
return nil
|
||||
}
|
||||
|
||||
return n.updateRoutes()
|
||||
}
|
||||
|
||||
func (n *netmon) handleRTMDelRoute(ev netlink.RouteUpdate) error {
|
||||
// Remove the route through updateRoutes(), only if the route refer to
|
||||
// an interface that already exists in the internal list of interfaces.
|
||||
return n.updateRoutes()
|
||||
}
|
||||
|
||||
func (n *netmon) handleLinkEvent(ev netlink.LinkUpdate) error {
|
||||
switch ev.Header.Type {
|
||||
case unix.NLMSG_DONE:
|
||||
fmt.Printf("netlink msg type: NLMSG_DONE\n")
|
||||
return nil
|
||||
case unix.NLMSG_ERROR:
|
||||
fmt.Printf("netlink msg type: NLMSG_ERROR\n")
|
||||
return fmt.Errorf("Error while listening on netlink socket")
|
||||
case unix.RTM_NEWADDR:
|
||||
fmt.Printf("netlink msg type: RTM_NEWADDR\n")
|
||||
return n.handleRTMNewAddr(ev)
|
||||
case unix.RTM_DELADDR:
|
||||
fmt.Printf("handle_netlink_message: RTM_DELADDR\n")
|
||||
return n.handleRTMDelAddr(ev)
|
||||
case unix.RTM_NEWLINK:
|
||||
fmt.Printf("handle_netlink_message: RTM_NEWLINK\n")
|
||||
return n.handleRTMNewLink(ev)
|
||||
case unix.RTM_DELLINK:
|
||||
fmt.Printf("handle_netlink_message: RTM_DELLINK\n")
|
||||
return n.handleRTMDelLink(ev)
|
||||
default:
|
||||
fmt.Printf("Unknown msg type %v\n", ev.Header.Type)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *netmon) handleRouteEvent(ev netlink.RouteUpdate) error {
|
||||
switch ev.Type {
|
||||
case unix.RTM_NEWROUTE:
|
||||
fmt.Printf("handle_netlink_message: RTM_NEWROUTE\n")
|
||||
return n.handleRTMNewRoute(ev)
|
||||
case unix.RTM_DELROUTE:
|
||||
fmt.Printf("handle_netlink_message: RTM_DELROUTE\n")
|
||||
return n.handleRTMDelRoute(ev)
|
||||
default:
|
||||
fmt.Printf("Unknown msg type %v\n", ev.Type)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *netmon) handleEvents() (err error) {
|
||||
for {
|
||||
select {
|
||||
case ev := <-n.linkUpdateCh:
|
||||
if err = n.handleLinkEvent(ev); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: handleLinkEvent() %v", err)
|
||||
return err
|
||||
}
|
||||
case ev := <-n.rtUpdateCh:
|
||||
if err = n.handleRouteEvent(ev); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: handleRouteEvent() %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Parse parameters.
|
||||
params := parseOptions()
|
||||
|
||||
// Create netmon handler.
|
||||
n, err := newNetmon(params)
|
||||
if err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: newNetmon() %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer n.cleanup()
|
||||
|
||||
// Init logging.
|
||||
if err := initLogs(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: initLogs() %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Scan the current interfaces.
|
||||
if err := n.scanNetwork(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: scanNetwork() %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Subscribe to the link listener.
|
||||
if err := n.listenNetlinkEvents(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: listenNetlinkEvents() %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Go into the main loop.
|
||||
if err := n.handleEvents(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "Error: handleEvents() %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user