Files
kata-containers/virtcontainers/hyperstart_agent.go
Peng Tao 6d05197625 Merge pull request #68 from devimc/agent/onlineCPUs
virtcontainers: agent: use onlineCPUMem to online vCPUs
2018-03-21 10:42:25 +08:00

802 lines
18 KiB
Go

//
// Copyright (c) 2016 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package virtcontainers
import (
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"syscall"
"time"
proxyClient "github.com/clearcontainers/proxy/client"
"github.com/kata-containers/runtime/virtcontainers/pkg/hyperstart"
ns "github.com/kata-containers/runtime/virtcontainers/pkg/nsenter"
"github.com/sirupsen/logrus"
"github.com/vishvananda/netlink"
)
var defaultSockPathTemplates = []string{"%s/%s/hyper.sock", "%s/%s/tty.sock"}
var defaultChannelTemplate = "sh.hyper.channel.%d"
var defaultDeviceIDTemplate = "channel%d"
var defaultIDTemplate = "charch%d"
var defaultSharedDir = "/run/hyper/shared/pods/"
var mountTag = "hyperShared"
var maxHostnameLen = 64
// HyperConfig is a structure storing information needed for
// hyperstart agent initialization.
type HyperConfig struct {
SockCtlName string
SockTtyName string
}
func (h *hyper) generateSockets(pod Pod, c HyperConfig) {
podSocketPaths := []string{
fmt.Sprintf(defaultSockPathTemplates[0], runStoragePath, pod.id),
fmt.Sprintf(defaultSockPathTemplates[1], runStoragePath, pod.id),
}
if c.SockCtlName != "" {
podSocketPaths[0] = c.SockCtlName
}
if c.SockTtyName != "" {
podSocketPaths[1] = c.SockTtyName
}
for i := 0; i < len(podSocketPaths); i++ {
s := Socket{
DeviceID: fmt.Sprintf(defaultDeviceIDTemplate, i),
ID: fmt.Sprintf(defaultIDTemplate, i),
HostPath: podSocketPaths[i],
Name: fmt.Sprintf(defaultChannelTemplate, i),
}
h.sockets = append(h.sockets, s)
}
}
// HyperAgentState is the structure describing the data stored from this
// agent implementation.
type HyperAgentState struct {
ProxyPid int
URL string
}
// hyper is the Agent interface implementation for hyperstart.
type hyper struct {
pod Pod
shim shim
proxy proxy
client *proxyClient.Client
state HyperAgentState
sockets []Socket
}
type hyperstartProxyCmd struct {
cmd string
message interface{}
token string
}
// Logger returns a logrus logger appropriate for logging hyper messages
func (h *hyper) Logger() *logrus.Entry {
return virtLog.WithField("subsystem", "hyper")
}
func (h *hyper) buildHyperContainerProcess(cmd Cmd) (*hyperstart.Process, error) {
var envVars []hyperstart.EnvironmentVar
for _, e := range cmd.Envs {
envVar := hyperstart.EnvironmentVar{
Env: e.Var,
Value: e.Value,
}
envVars = append(envVars, envVar)
}
process := &hyperstart.Process{
Terminal: cmd.Interactive,
Args: cmd.Args,
Envs: envVars,
Workdir: cmd.WorkDir,
User: cmd.User,
Group: cmd.PrimaryGroup,
AdditionalGroups: cmd.SupplementaryGroups,
NoNewPrivileges: cmd.NoNewPrivileges,
}
process.Capabilities = hyperstart.Capabilities{
Bounding: cmd.Capabilities.Bounding,
Effective: cmd.Capabilities.Effective,
Inheritable: cmd.Capabilities.Inheritable,
Permitted: cmd.Capabilities.Permitted,
Ambient: cmd.Capabilities.Ambient,
}
return process, nil
}
func (h *hyper) processHyperRoute(route netlink.Route, deviceName string) *hyperstart.Route {
gateway := route.Gw.String()
if gateway == "<nil>" {
gateway = ""
} else if route.Gw.To4() == nil { // Skip IPv6 as it is not supported by hyperstart agent
h.Logger().WithFields(logrus.Fields{
"unsupported-route-type": "ipv6",
"gateway": gateway,
}).Warn("unsupported route")
return nil
}
var destination string
if route.Dst == nil {
destination = ""
} else {
destination = route.Dst.String()
if destination == defaultRouteDest {
destination = defaultRouteLabel
}
// Skip IPv6 because not supported by hyperstart
if route.Dst.IP.To4() == nil {
h.Logger().WithFields(logrus.Fields{
"unsupported-route-type": "ipv6",
"destination": destination,
}).Warn("unsupported route")
return nil
}
}
return &hyperstart.Route{
Dest: destination,
Gateway: gateway,
Device: deviceName,
}
}
func (h *hyper) buildNetworkInterfacesAndRoutes(pod Pod) ([]hyperstart.NetworkIface, []hyperstart.Route, error) {
if pod.networkNS.NetNsPath == "" {
return []hyperstart.NetworkIface{}, []hyperstart.Route{}, nil
}
var ifaces []hyperstart.NetworkIface
var routes []hyperstart.Route
for _, endpoint := range pod.networkNS.Endpoints {
var ipAddresses []hyperstart.IPAddress
for _, addr := range endpoint.Properties().Addrs {
// Skip IPv6 because not supported by hyperstart.
// Skip localhost interface.
if addr.IP.To4() == nil || addr.IP.IsLoopback() {
continue
}
netMask, _ := addr.Mask.Size()
ipAddress := hyperstart.IPAddress{
IPAddress: addr.IP.String(),
NetMask: fmt.Sprintf("%d", netMask),
}
ipAddresses = append(ipAddresses, ipAddress)
}
iface := hyperstart.NetworkIface{
NewDevice: endpoint.Name(),
IPAddresses: ipAddresses,
MTU: endpoint.Properties().Iface.MTU,
MACAddr: endpoint.HardwareAddr(),
}
ifaces = append(ifaces, iface)
for _, r := range endpoint.Properties().Routes {
route := h.processHyperRoute(r, endpoint.Name())
if route == nil {
continue
}
routes = append(routes, *route)
}
}
return ifaces, routes, nil
}
func fsMapFromMounts(mounts []Mount) []*hyperstart.FsmapDescriptor {
var fsmap []*hyperstart.FsmapDescriptor
for _, m := range mounts {
fsmapDesc := &hyperstart.FsmapDescriptor{
Source: m.Source,
Path: m.Destination,
ReadOnly: m.ReadOnly,
DockerVolume: false,
}
fsmap = append(fsmap, fsmapDesc)
}
return fsmap
}
// init is the agent initialization implementation for hyperstart.
func (h *hyper) init(pod *Pod, config interface{}) (err error) {
switch c := config.(type) {
case HyperConfig:
// Create agent sockets from paths provided through
// configuration, or generate them from scratch.
h.generateSockets(*pod, c)
h.pod = *pod
default:
return fmt.Errorf("Invalid config type")
}
h.proxy, err = newProxy(pod.config.ProxyType)
if err != nil {
return err
}
h.shim, err = newShim(pod.config.ShimType)
if err != nil {
return err
}
// Fetch agent runtime info.
if err := pod.storage.fetchAgentState(pod.id, &h.state); err != nil {
h.Logger().Debug("Could not retrieve anything from storage")
}
return nil
}
func (h *hyper) createPod(pod *Pod) (err error) {
for _, socket := range h.sockets {
err := pod.hypervisor.addDevice(socket, serialPortDev)
if err != nil {
return err
}
}
// Adding the hyper shared volume.
// This volume contains all bind mounted container bundles.
sharedVolume := Volume{
MountTag: mountTag,
HostPath: filepath.Join(defaultSharedDir, pod.id),
}
if err := os.MkdirAll(sharedVolume.HostPath, dirMode); err != nil {
return err
}
return pod.hypervisor.addDevice(sharedVolume, fsDev)
}
func (h *hyper) capabilities() capabilities {
var caps capabilities
// add all capabilities supported by agent
caps.setBlockDeviceSupport()
return caps
}
// exec is the agent command execution implementation for hyperstart.
func (h *hyper) exec(pod *Pod, c Container, cmd Cmd) (*Process, error) {
token, err := h.attach()
if err != nil {
return nil, err
}
hyperProcess, err := h.buildHyperContainerProcess(cmd)
if err != nil {
return nil, err
}
execCommand := hyperstart.ExecCommand{
Container: c.id,
Process: *hyperProcess,
}
enterNSList := []ns.Namespace{
{
PID: c.process.Pid,
Type: ns.NSTypeNet,
},
{
PID: c.process.Pid,
Type: ns.NSTypePID,
},
}
process, err := prepareAndStartShim(pod, h.shim, c.id,
token, h.state.URL, cmd, []ns.NSType{}, enterNSList)
if err != nil {
return nil, err
}
proxyCmd := hyperstartProxyCmd{
cmd: hyperstart.ExecCmd,
message: execCommand,
token: process.Token,
}
if _, err := h.sendCmd(proxyCmd); err != nil {
return nil, err
}
return process, nil
}
// startPod is the agent Pod starting implementation for hyperstart.
func (h *hyper) startPod(pod Pod) error {
// Start the proxy here
pid, uri, err := h.proxy.start(pod, proxyParams{})
if err != nil {
return err
}
// Fill agent state with proxy information, and store them.
h.state.ProxyPid = pid
h.state.URL = uri
if err := pod.storage.storeAgentState(pod.id, h.state); err != nil {
return err
}
h.Logger().WithField("proxy-pid", pid).Info("proxy started")
if err := h.register(); err != nil {
return err
}
ifaces, routes, err := h.buildNetworkInterfacesAndRoutes(pod)
if err != nil {
return err
}
hostname := pod.config.Hostname
if len(hostname) > maxHostnameLen {
hostname = hostname[:maxHostnameLen]
}
hyperPod := hyperstart.Pod{
Hostname: hostname,
Containers: []hyperstart.Container{},
Interfaces: ifaces,
Routes: routes,
ShareDir: mountTag,
}
proxyCmd := hyperstartProxyCmd{
cmd: hyperstart.StartPod,
message: hyperPod,
}
_, err = h.sendCmd(proxyCmd)
return err
}
// stopPod is the agent Pod stopping implementation for hyperstart.
func (h *hyper) stopPod(pod Pod) error {
proxyCmd := hyperstartProxyCmd{
cmd: hyperstart.DestroyPod,
message: nil,
}
if _, err := h.sendCmd(proxyCmd); err != nil {
return err
}
if err := h.unregister(); err != nil {
return err
}
return h.proxy.stop(pod, h.state.ProxyPid)
}
func (h *hyper) startOneContainer(pod Pod, c *Container) error {
process, err := h.buildHyperContainerProcess(c.config.Cmd)
if err != nil {
return err
}
container := hyperstart.Container{
ID: c.id,
Image: c.id,
Rootfs: rootfsDir,
Process: process,
}
if c.config.Resources.CPUQuota != 0 && c.config.Resources.CPUPeriod != 0 {
container.Constraints = hyperstart.Constraints{
CPUQuota: c.config.Resources.CPUQuota,
CPUPeriod: c.config.Resources.CPUPeriod,
}
}
if c.config.Resources.CPUShares != 0 {
container.Constraints.CPUShares = c.config.Resources.CPUShares
}
container.SystemMountsInfo.BindMountDev = c.systemMountsInfo.BindMountDev
if c.state.Fstype != "" {
// Pass a drive name only in case of block driver
if pod.config.HypervisorConfig.BlockDeviceDriver == VirtioBlock {
driveName, err := getVirtDriveName(c.state.BlockIndex)
if err != nil {
return err
}
container.Image = driveName
} else {
scsiAddr, err := getSCSIAddress(c.state.BlockIndex)
if err != nil {
return err
}
container.SCSIAddr = scsiAddr
}
container.Fstype = c.state.Fstype
} else {
if err := bindMountContainerRootfs(defaultSharedDir, pod.id, c.id, c.rootFs, false); err != nil {
bindUnmountAllRootfs(defaultSharedDir, pod)
return err
}
}
//TODO : Enter mount namespace
// Handle container mounts
newMounts, err := c.mountSharedDirMounts(defaultSharedDir, "")
if err != nil {
bindUnmountAllRootfs(defaultSharedDir, pod)
return err
}
fsmap := fsMapFromMounts(newMounts)
// Append container mounts for block devices passed with --device.
for _, device := range c.devices {
d, ok := device.(*BlockDevice)
if ok {
fsmapDesc := &hyperstart.FsmapDescriptor{
Source: d.VirtPath,
Path: d.DeviceInfo.ContainerPath,
AbsolutePath: true,
DockerVolume: false,
SCSIAddr: d.SCSIAddr,
}
fsmap = append(fsmap, fsmapDesc)
}
}
// Assign fsmap for hyperstart to mount these at the correct location within the container
container.Fsmap = fsmap
proxyCmd := hyperstartProxyCmd{
cmd: hyperstart.NewContainer,
message: container,
token: c.process.Token,
}
if _, err := h.sendCmd(proxyCmd); err != nil {
return err
}
return nil
}
// createContainer is the agent Container creation implementation for hyperstart.
func (h *hyper) createContainer(pod *Pod, c *Container) (*Process, error) {
token, err := h.attach()
if err != nil {
return nil, err
}
createNSList := []ns.NSType{ns.NSTypePID}
enterNSList := []ns.Namespace{
{
Path: pod.networkNS.NetNsPath,
Type: ns.NSTypeNet,
},
}
return prepareAndStartShim(pod, h.shim, c.id, token,
h.state.URL, c.config.Cmd, createNSList, enterNSList)
}
// startContainer is the agent Container starting implementation for hyperstart.
func (h *hyper) startContainer(pod Pod, c *Container) error {
return h.startOneContainer(pod, c)
}
// stopContainer is the agent Container stopping implementation for hyperstart.
func (h *hyper) stopContainer(pod Pod, c Container) error {
// Nothing to be done in case the container has not been started.
if c.state.State == StateReady {
return nil
}
return h.stopOneContainer(pod.id, c)
}
func (h *hyper) stopOneContainer(podID string, c Container) error {
removeCommand := hyperstart.RemoveCommand{
Container: c.id,
}
proxyCmd := hyperstartProxyCmd{
cmd: hyperstart.RemoveContainer,
message: removeCommand,
}
if _, err := h.sendCmd(proxyCmd); err != nil {
return err
}
if err := c.unmountHostMounts(); err != nil {
return err
}
if c.state.Fstype == "" {
if err := bindUnmountContainerRootfs(defaultSharedDir, podID, c.id); err != nil {
return err
}
}
return nil
}
// killContainer is the agent process signal implementation for hyperstart.
func (h *hyper) killContainer(pod Pod, c Container, signal syscall.Signal, all bool) error {
// Send the signal to the shim directly in case the container has not
// been started yet.
if c.state.State == StateReady {
return signalShim(c.process.Pid, signal)
}
return h.killOneContainer(c.id, signal, all)
}
func (h *hyper) killOneContainer(cID string, signal syscall.Signal, all bool) error {
killCmd := hyperstart.KillCommand{
Container: cID,
Signal: signal,
AllProcesses: all,
}
proxyCmd := hyperstartProxyCmd{
cmd: hyperstart.KillContainer,
message: killCmd,
}
if _, err := h.sendCmd(proxyCmd); err != nil {
return err
}
return nil
}
func (h *hyper) processListContainer(pod Pod, c Container, options ProcessListOptions) (ProcessList, error) {
return h.processListOneContainer(pod.id, c.id, options)
}
func (h *hyper) processListOneContainer(podID, cID string, options ProcessListOptions) (ProcessList, error) {
psCmd := hyperstart.PsCommand{
Container: cID,
Format: options.Format,
PsArgs: options.Args,
}
proxyCmd := hyperstartProxyCmd{
cmd: hyperstart.PsContainer,
message: psCmd,
}
response, err := h.sendCmd(proxyCmd)
if err != nil {
return nil, err
}
msg, ok := response.([]byte)
if !ok {
return nil, fmt.Errorf("failed to get response message from container %s pod %s", cID, podID)
}
return msg, nil
}
// connectProxyRetry repeatedly tries to connect to the proxy on the specified
// address until a timeout state is reached, when it will fail.
func (h *hyper) connectProxyRetry(scheme, address string) (conn net.Conn, err error) {
attempt := 1
timeoutSecs := waitForProxyTimeoutSecs * time.Second
startTime := time.Now()
lastLogTime := startTime
for {
conn, err = net.Dial(scheme, address)
if err == nil {
// If the initial connection was unsuccessful,
// ensure a log message is generated when successfully
// connected.
if attempt > 1 {
h.Logger().WithField("attempt", fmt.Sprintf("%d", attempt)).Info("Connected to proxy")
}
return conn, nil
}
attempt++
now := time.Now()
delta := now.Sub(startTime)
remaining := timeoutSecs - delta
if remaining <= 0 {
return nil, fmt.Errorf("failed to connect to proxy after %v: %v", timeoutSecs, err)
}
logDelta := now.Sub(lastLogTime)
logDeltaSecs := logDelta / time.Second
if logDeltaSecs >= 1 {
h.Logger().WithError(err).WithFields(logrus.Fields{
"attempt": fmt.Sprintf("%d", attempt),
"proxy-network": scheme,
"proxy-address": address,
"remaining-time-secs": fmt.Sprintf("%2.2f", remaining.Seconds()),
}).Warning("Retrying proxy connection")
lastLogTime = now
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
func (h *hyper) connect() error {
if h.client != nil {
return nil
}
u, err := url.Parse(h.state.URL)
if err != nil {
return err
}
if u.Scheme == "" {
return fmt.Errorf("URL scheme cannot be empty")
}
address := u.Host
if address == "" {
if u.Path == "" {
return fmt.Errorf("URL host and path cannot be empty")
}
address = u.Path
}
conn, err := h.connectProxyRetry(u.Scheme, address)
if err != nil {
return err
}
h.client = proxyClient.NewClient(conn)
return nil
}
func (h *hyper) disconnect() {
if h.client == nil {
return
}
h.client.Close()
h.client = nil
}
func (h *hyper) register() error {
if err := h.connect(); err != nil {
return err
}
defer h.disconnect()
registerVMOptions := &proxyClient.RegisterVMOptions{
Console: h.pod.hypervisor.getPodConsole(h.pod.id),
NumIOStreams: 0,
}
_, err := h.client.RegisterVM(h.pod.id, h.sockets[0].HostPath,
h.sockets[1].HostPath, registerVMOptions)
return err
}
func (h *hyper) unregister() error {
if err := h.connect(); err != nil {
return err
}
defer h.disconnect()
h.client.UnregisterVM(h.pod.id)
return nil
}
func (h *hyper) attach() (string, error) {
if err := h.connect(); err != nil {
return "", err
}
defer h.disconnect()
numTokens := 1
attachVMOptions := &proxyClient.AttachVMOptions{
NumIOStreams: numTokens,
}
attachVMReturn, err := h.client.AttachVM(h.pod.id, attachVMOptions)
if err != nil {
return "", err
}
if len(attachVMReturn.IO.Tokens) != numTokens {
return "", fmt.Errorf("%d tokens retrieved out of %d expected",
len(attachVMReturn.IO.Tokens), numTokens)
}
return attachVMReturn.IO.Tokens[0], nil
}
func (h *hyper) sendCmd(proxyCmd hyperstartProxyCmd) (interface{}, error) {
if err := h.connect(); err != nil {
return nil, err
}
defer h.disconnect()
attachVMOptions := &proxyClient.AttachVMOptions{
NumIOStreams: 0,
}
if _, err := h.client.AttachVM(h.pod.id, attachVMOptions); err != nil {
return nil, err
}
var tokens []string
if proxyCmd.token != "" {
tokens = append(tokens, proxyCmd.token)
}
return h.client.HyperWithTokens(proxyCmd.cmd, tokens, proxyCmd.message)
}
func (h *hyper) onlineCPUMem() error {
// cc-agent uses udev to online CPUs automatically
return nil
}