diff --git a/.gitignore b/.gitignore index 29f801ccdc..58b1b58a28 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ /cli/coverage.html /cli/config-generated.go /cli/config/configuration.toml +/kata-netmon /virtcontainers/hack/virtc/virtc /virtcontainers/hook/mock/hook /virtcontainers/shim/mock/shim diff --git a/netmon/netmon.go b/netmon/netmon.go new file mode 100644 index 0000000000..2e7ebc765d --- /dev/null +++ b/netmon/netmon.go @@ -0,0 +1,565 @@ +// Copyright (c) 2018 Intel Corporation +// +// SPDX-License-Identifier: Apache-2.0 +// + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "net" + "os" + "os/exec" + "path/filepath" + "strings" + + "github.com/kata-containers/agent/protocols/grpc" + "github.com/vishvananda/netlink" + "golang.org/x/sys/unix" +) + +const ( + netmonName = "kata-netmon" + netmonVersion = "0.0.1" + + kataCmd = "kata-network" + kataCLIAddIfaceCmd = "add-iface" + kataCLIDelIfaceCmd = "del-iface" + kataCLIUpdtRoutesCmd = "update-routes" + + kataSuffix = "kata" + + // For simplicity the code will only focus on IPv4 addresses for now. + netlinkFamily = netlink.FAMILY_V4 + + storageParentPath = "/var/run/kata-containers/netmon/sbs" + storageDirPerm = os.FileMode(0750) + + // sharedFile is the name of the file that will be used to share + // the data between this process and the kata-runtime process + // responsible for updating the network. + sharedFile = "shared.json" + storageFilePerm = os.FileMode(0640) +) + +type netmonParams struct { + sandboxID string + runtimePath string + debug bool +} + +type netmon struct { + netmonParams + + storagePath string + sharedFile string + + netIfaces map[int]grpc.Interface + + linkUpdateCh chan netlink.LinkUpdate + linkDoneCh chan struct{} + + rtUpdateCh chan netlink.RouteUpdate + rtDoneCh chan struct{} + + netHandler *netlink.Handle +} + +func printVersion() { + fmt.Printf("%s version %s\n", netmonName, netmonVersion) +} + +const componentDescription = `is a network monitoring process that is intended to be started in the +appropriate network namespace so that it can listen to any event related to +link and routes. Whenever a new interface or route is created/updated, it is +responsible for calling into the kata-runtime CLI to ask for the actual +creation/update of the given interface or route. +` + +func printComponentDescription() { + fmt.Printf("\n%s %s\n", netmonName, componentDescription) +} + +func parseOptions() netmonParams { + var version, help bool + + params := netmonParams{} + + flag.BoolVar(&help, "h", false, "describe component usage") + flag.BoolVar(&help, "help", false, "") + flag.BoolVar(¶ms.debug, "d", false, "enable debug mode") + flag.BoolVar(&version, "v", false, "display program version and exit") + flag.BoolVar(&version, "version", false, "") + flag.StringVar(¶ms.sandboxID, "s", "", "sandbox id (required)") + flag.StringVar(¶ms.runtimePath, "r", "", "runtime path (required)") + + flag.Parse() + + if help { + printComponentDescription() + flag.PrintDefaults() + os.Exit(0) + } + + if version { + printVersion() + os.Exit(0) + } + + if params.sandboxID == "" { + fmt.Fprintf(os.Stderr, "Error: sandbox id is empty, one must be provided\n") + flag.PrintDefaults() + os.Exit(1) + } + + if params.runtimePath == "" { + fmt.Fprintf(os.Stderr, "Error: runtime path is empty, one must be provided\n") + flag.PrintDefaults() + os.Exit(1) + } + + return params +} + +func newNetmon(params netmonParams) (*netmon, error) { + handler, err := netlink.NewHandle(netlinkFamily) + if err != nil { + return nil, err + } + + n := &netmon{ + netmonParams: params, + storagePath: filepath.Join(storageParentPath, params.sandboxID), + sharedFile: filepath.Join(storageParentPath, params.sandboxID, sharedFile), + netIfaces: make(map[int]grpc.Interface), + linkUpdateCh: make(chan netlink.LinkUpdate), + linkDoneCh: make(chan struct{}), + rtUpdateCh: make(chan netlink.RouteUpdate), + rtDoneCh: make(chan struct{}), + netHandler: handler, + } + + if err := os.MkdirAll(n.storagePath, storageDirPerm); err != nil { + return nil, err + } + + return n, nil +} + +func (n *netmon) cleanup() { + os.RemoveAll(n.storagePath) + n.netHandler.Delete() + close(n.linkDoneCh) + close(n.rtDoneCh) +} + +func initLogs() error { + return nil +} + +func (n *netmon) listenNetlinkEvents() error { + if err := netlink.LinkSubscribe(n.linkUpdateCh, n.linkDoneCh); err != nil { + return err + } + + return netlink.RouteSubscribe(n.rtUpdateCh, n.rtDoneCh) +} + +// convertInterface converts a link and its IP addresses as defined by netlink +// package, into the Interface structure format expected by kata-runtime to +// describe an interface and its associated IP addresses. +func convertInterface(linkAttrs *netlink.LinkAttrs, addrs []netlink.Addr) grpc.Interface { + if linkAttrs == nil { + fmt.Printf("Link attributes are nil") + return grpc.Interface{} + } + + var ipAddrs []*grpc.IPAddress + + for _, addr := range addrs { + if addr.IPNet == nil { + continue + } + + netMask, _ := addr.Mask.Size() + + ipAddr := &grpc.IPAddress{ + Family: grpc.IPFamily(netlinkFamily), + Address: addr.IP.String(), + Mask: fmt.Sprintf("%d", netMask), + } + + ipAddrs = append(ipAddrs, ipAddr) + } + + return grpc.Interface{ + Device: linkAttrs.Name, + Name: linkAttrs.Name, + IPAddresses: ipAddrs, + Mtu: uint64(linkAttrs.MTU), + HwAddr: linkAttrs.HardwareAddr.String(), + } +} + +// convertRoutes converts a list of routes as defined by netlink package, +// into a list of Route structure format expected by kata-runtime to +// describe a set of routes. +func convertRoutes(netRoutes []netlink.Route) []grpc.Route { + var routes []grpc.Route + + // Ignore routes with IPv6 addresses as this is not supported + // by Kata yet. + for _, netRoute := range netRoutes { + dst := "" + if netRoute.Dst != nil && netRoute.Dst.IP.To4() != nil { + dst = netRoute.Dst.IP.String() + } + + src := "" + if netRoute.Src.To4() != nil { + src = netRoute.Src.String() + } + + gw := "" + if netRoute.Gw.To4() != nil { + gw = netRoute.Gw.String() + } + + dev := "" + iface, err := net.InterfaceByIndex(netRoute.LinkIndex) + if err == nil { + dev = iface.Name + } + + route := grpc.Route{ + Dest: dst, + Gateway: gw, + Device: dev, + Source: src, + Scope: uint32(netRoute.Scope), + } + + routes = append(routes, route) + } + + return routes +} + +// scanNetwork lists all the interfaces it can find inside the current +// network namespace, and store them in-memory to keep track of them. +func (n *netmon) scanNetwork() error { + links, err := n.netHandler.LinkList() + if err != nil { + return err + } + + for _, link := range links { + addrs, err := n.netHandler.AddrList(link, netlinkFamily) + if err != nil { + return err + } + + linkAttrs := link.Attrs() + if linkAttrs == nil { + continue + } + + iface := convertInterface(linkAttrs, addrs) + n.netIfaces[linkAttrs.Index] = iface + } + + return nil +} + +func (n *netmon) storeDataToSend(data interface{}) error { + // Marshal the data structure into a JSON bytes array. + jsonArray, err := json.Marshal(data) + if err != nil { + return err + } + + // Store the JSON bytes array at the specified path. + return ioutil.WriteFile(n.sharedFile, jsonArray, storageFilePerm) +} + +func (n *netmon) execKataCmd(subCmd string) error { + execCmd := exec.Command(n.runtimePath, kataCmd, subCmd, n.sandboxID, n.sharedFile) + + // Make use of Run() to ensure the kata-runtime process has correctly + // terminated before to go further. + out, err := execCmd.Output() + if err != nil { + return err + } + + fmt.Printf("COMMAND OUTPUT: %q\n", string(out)) + + // Remove the shared file after the command returned. At this point + // we know the content of the file is not going to be used anymore, + // and the file path can be reused for further commands. + return os.Remove(n.sharedFile) +} + +func (n *netmon) addInterfaceCLI(iface grpc.Interface) error { + fmt.Printf("%s %s %+v\n", n.runtimePath, kataCLIAddIfaceCmd, iface) + + if err := n.storeDataToSend(iface); err != nil { + return err + } + + return n.execKataCmd(kataCLIAddIfaceCmd) +} + +func (n *netmon) delInterfaceCLI(iface grpc.Interface) error { + fmt.Printf("%s %s %+v\n", n.runtimePath, kataCLIDelIfaceCmd, iface) + + if err := n.storeDataToSend(iface); err != nil { + return err + } + + return n.execKataCmd(kataCLIDelIfaceCmd) +} + +func (n *netmon) updateRoutesCLI(routes []grpc.Route) error { + fmt.Printf("%s %s %+v\n", n.runtimePath, kataCLIUpdtRoutesCmd, routes) + + if err := n.storeDataToSend(routes); err != nil { + return err + } + + return n.execKataCmd(kataCLIUpdtRoutesCmd) +} + +func (n *netmon) updateRoutes() error { + // Get all the routes. + netlinkRoutes, err := n.netHandler.RouteList(nil, netlinkFamily) + if err != nil { + return err + } + + // Translate them into grpc.Route structures. + routes := convertRoutes(netlinkRoutes) + + // Update the routes through the Kata CLI. + return n.updateRoutesCLI(routes) +} + +func (n *netmon) handleRTMNewAddr(ev netlink.LinkUpdate) error { + fmt.Printf("handleRTMNewAddr: Interface update not supported\n") + return nil +} + +func (n *netmon) handleRTMDelAddr(ev netlink.LinkUpdate) error { + fmt.Printf("handleRTMDelAddr: Interface update not supported\n") + return nil +} + +func (n *netmon) handleRTMNewLink(ev netlink.LinkUpdate) error { + // NEWLINK might be a lot of different things. We're interested in + // adding the interface (both to our list and by calling into the + // Kata CLI API) only if this has the flags UP and RUNNING, meaning + // we don't expect any further change on the interface, and that we + // are ready to add it. + + linkAttrs := ev.Link.Attrs() + if linkAttrs == nil { + fmt.Printf("The link attributes are nil\n") + return nil + } + + // First, ignore if the interface name contains "kata". This way we + // are preventing from adding interfaces created by Kata Containers. + if strings.HasSuffix(linkAttrs.Name, kataSuffix) { + fmt.Printf("Ignore the interface %s because found %q\n", + linkAttrs.Name, kataSuffix) + return nil + } + + // Check if the interface exist in the internal list. + if _, exist := n.netIfaces[int(ev.Index)]; exist { + fmt.Printf("Ignoring interface %s because already exist\n", + linkAttrs.Name) + return nil + } + + // Now, check if the interface has been enabled to UP and RUNNING. + if (ev.Flags&unix.IFF_UP) != unix.IFF_UP || + (ev.Flags&unix.IFF_RUNNING) != unix.IFF_RUNNING { + fmt.Printf("Ignore the interface %s because not UP and RUNNING\n", + linkAttrs.Name) + return nil + } + + // Get the list of IP addresses associated with this interface. + addrs, err := n.netHandler.AddrList(ev.Link, netlinkFamily) + if err != nil { + return err + } + + // Convert the interfaces in the appropriate structure format. + iface := convertInterface(linkAttrs, addrs) + + // Add the interface through the Kata CLI. + if err := n.addInterfaceCLI(iface); err != nil { + return err + } + + // Add the interface to the internal list. + n.netIfaces[linkAttrs.Index] = iface + + // Complete by updating the routes. + return n.updateRoutes() +} + +func (n *netmon) handleRTMDelLink(ev netlink.LinkUpdate) error { + // It can only delete if identical interface is found in the internal + // list of interfaces. Otherwise, the deletion will be ignored. + linkAttrs := ev.Link.Attrs() + if linkAttrs == nil { + fmt.Printf("Link attributes are nil\n") + return nil + } + + // First, ignore if the interface name contains "kata". This way we + // are preventing from deleting interfaces created by Kata Containers. + if strings.Contains(linkAttrs.Name, kataSuffix) { + fmt.Printf("Ignore the interface %s because found %q\n", + linkAttrs.Name, kataSuffix) + return nil + } + + // Check if the interface exist in the internal list. + iface, exist := n.netIfaces[int(ev.Index)] + if !exist { + fmt.Printf("Ignoring interface %s because not found\n", + linkAttrs.Name) + return nil + } + + if err := n.delInterfaceCLI(iface); err != nil { + return err + } + + // Delete the interface from the internal list. + delete(n.netIfaces, linkAttrs.Index) + + // Complete by updating the routes. + return n.updateRoutes() +} + +func (n *netmon) handleRTMNewRoute(ev netlink.RouteUpdate) error { + // Add the route through updateRoutes(), only if the route refer to an + // interface that already exists in the internal list of interfaces. + if _, exist := n.netIfaces[ev.Route.LinkIndex]; !exist { + fmt.Printf("Ignoring route %+v since interface %d not found\n", + ev.Route, ev.Route.LinkIndex) + return nil + } + + return n.updateRoutes() +} + +func (n *netmon) handleRTMDelRoute(ev netlink.RouteUpdate) error { + // Remove the route through updateRoutes(), only if the route refer to + // an interface that already exists in the internal list of interfaces. + return n.updateRoutes() +} + +func (n *netmon) handleLinkEvent(ev netlink.LinkUpdate) error { + switch ev.Header.Type { + case unix.NLMSG_DONE: + fmt.Printf("netlink msg type: NLMSG_DONE\n") + return nil + case unix.NLMSG_ERROR: + fmt.Printf("netlink msg type: NLMSG_ERROR\n") + return fmt.Errorf("Error while listening on netlink socket") + case unix.RTM_NEWADDR: + fmt.Printf("netlink msg type: RTM_NEWADDR\n") + return n.handleRTMNewAddr(ev) + case unix.RTM_DELADDR: + fmt.Printf("handle_netlink_message: RTM_DELADDR\n") + return n.handleRTMDelAddr(ev) + case unix.RTM_NEWLINK: + fmt.Printf("handle_netlink_message: RTM_NEWLINK\n") + return n.handleRTMNewLink(ev) + case unix.RTM_DELLINK: + fmt.Printf("handle_netlink_message: RTM_DELLINK\n") + return n.handleRTMDelLink(ev) + default: + fmt.Printf("Unknown msg type %v\n", ev.Header.Type) + } + + return nil +} + +func (n *netmon) handleRouteEvent(ev netlink.RouteUpdate) error { + switch ev.Type { + case unix.RTM_NEWROUTE: + fmt.Printf("handle_netlink_message: RTM_NEWROUTE\n") + return n.handleRTMNewRoute(ev) + case unix.RTM_DELROUTE: + fmt.Printf("handle_netlink_message: RTM_DELROUTE\n") + return n.handleRTMDelRoute(ev) + default: + fmt.Printf("Unknown msg type %v\n", ev.Type) + } + + return nil +} + +func (n *netmon) handleEvents() (err error) { + for { + select { + case ev := <-n.linkUpdateCh: + if err = n.handleLinkEvent(ev); err != nil { + fmt.Fprintf(os.Stderr, "Error: handleLinkEvent() %v", err) + return err + } + case ev := <-n.rtUpdateCh: + if err = n.handleRouteEvent(ev); err != nil { + fmt.Fprintf(os.Stderr, "Error: handleRouteEvent() %v", err) + return err + } + } + } +} + +func main() { + // Parse parameters. + params := parseOptions() + + // Create netmon handler. + n, err := newNetmon(params) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: newNetmon() %v", err) + os.Exit(1) + } + defer n.cleanup() + + // Init logging. + if err := initLogs(); err != nil { + fmt.Fprintf(os.Stderr, "Error: initLogs() %v", err) + os.Exit(1) + } + + // Scan the current interfaces. + if err := n.scanNetwork(); err != nil { + fmt.Fprintf(os.Stderr, "Error: scanNetwork() %v", err) + os.Exit(1) + } + + // Subscribe to the link listener. + if err := n.listenNetlinkEvents(); err != nil { + fmt.Fprintf(os.Stderr, "Error: listenNetlinkEvents() %v", err) + os.Exit(1) + } + + // Go into the main loop. + if err := n.handleEvents(); err != nil { + fmt.Fprintf(os.Stderr, "Error: handleEvents() %v", err) + os.Exit(1) + } +}