diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index 1a7807f7105..eb1dce4d8d8 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -260,7 +260,6 @@ filegroup( "//pkg/kubelet/configmap:all-srcs", "//pkg/kubelet/container:all-srcs", "//pkg/kubelet/custommetrics:all-srcs", - "//pkg/kubelet/deviceplugin:all-srcs", "//pkg/kubelet/dockershim:all-srcs", "//pkg/kubelet/envvars:all-srcs", "//pkg/kubelet/events:all-srcs", diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index b107c893320..a50202332fe 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -7,8 +7,6 @@ go_library( "container_manager.go", "container_manager_stub.go", "container_manager_unsupported.go", - "device_plugin_handler.go", - "device_plugin_handler_stub.go", "fake_internal_container_lifecycle.go", "helpers_unsupported.go", "internal_container_lifecycle.go", @@ -34,18 +32,15 @@ go_library( deps = [ "//pkg/features:go_default_library", "//pkg/kubelet/apis/cri:go_default_library", - "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/cm/cpumanager:go_default_library", "//pkg/kubelet/container:go_default_library", - "//pkg/kubelet/deviceplugin:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", "//pkg/kubelet/status:go_default_library", "//pkg/util/mount:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", @@ -55,6 +50,7 @@ go_library( "//pkg/api/v1/helper:go_default_library", "//pkg/api/v1/helper/qos:go_default_library", "//pkg/api/v1/resource:go_default_library", + 
"//pkg/kubelet/cm/deviceplugin:go_default_library", "//pkg/kubelet/cm/util:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/metrics:go_default_library", @@ -69,6 +65,7 @@ go_library( "//vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs:go_default_library", "//vendor/github.com/opencontainers/runc/libcontainer/cgroups/systemd:go_default_library", "//vendor/github.com/opencontainers/runc/libcontainer/configs:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], @@ -80,7 +77,6 @@ go_test( name = "go_default_test", srcs = [ "container_manager_unsupported_test.go", - "device_plugin_handler_test.go", ] + select({ "@io_bazel_rules_go//go/platform:linux_amd64": [ "cgroup_manager_linux_test.go", @@ -94,19 +90,15 @@ go_test( importpath = "k8s.io/kubernetes/pkg/kubelet/cm", library = ":go_default_library", deps = [ - "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", "//pkg/util/mount:go_default_library", - "//vendor/github.com/stretchr/testify/assert:go_default_library", - "//vendor/k8s.io/api/core/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", ] + select({ "@io_bazel_rules_go//go/platform:linux_amd64": [ "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", ], "//conditions:default": [], }), @@ -125,6 
+117,7 @@ filegroup( ":package-srcs", "//pkg/kubelet/cm/cpumanager:all-srcs", "//pkg/kubelet/cm/cpuset:all-srcs", + "//pkg/kubelet/cm/deviceplugin:all-srcs", "//pkg/kubelet/cm/util:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index d320c90d2ec..0bc5a333936 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -45,6 +45,7 @@ import ( internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/qos" @@ -125,7 +126,7 @@ type containerManagerImpl struct { // Interface for QoS cgroup management qosContainerManager QOSContainerManager // Interface for exporting and allocating devices reported by device plugins. - devicePluginHandler DevicePluginHandler + devicePluginHandler deviceplugin.Handler // Interface for CPU affinity management. cpuManager cpumanager.Manager } @@ -273,9 +274,9 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I glog.Infof("Creating device plugin handler: %t", devicePluginEnabled) if devicePluginEnabled { - cm.devicePluginHandler, err = NewDevicePluginHandlerImpl(updateDeviceCapacityFunc) + cm.devicePluginHandler, err = deviceplugin.NewHandlerImpl(updateDeviceCapacityFunc) } else { - cm.devicePluginHandler, err = NewDevicePluginHandlerStub() + cm.devicePluginHandler, err = deviceplugin.NewHandlerStub() } if err != nil { return nil, err @@ -618,72 +619,17 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe opts := &kubecontainer.RunContainerOptions{} // Gets devices, mounts, and envs from device plugin handler. 
glog.V(3).Infof("Calling devicePluginHandler AllocateDevices") - // Maps to detect duplicate settings. - devsMap := make(map[string]string) - mountsMap := make(map[string]string) - envsMap := make(map[string]string) - allocResps, err := cm.devicePluginHandler.Allocate(pod, container, activePods) + err := cm.devicePluginHandler.Allocate(pod, container, activePods) if err != nil { return opts, err } - // Loops through AllocationResponses of all required extended resources. - for _, resp := range allocResps { - // Each Allocate response has the following artifacts. - // Environment variables - // Mount points - // Device files - // These artifacts are per resource per container. - // Updates RunContainerOptions.Envs. - for k, v := range resp.Envs { - if e, ok := envsMap[k]; ok { - glog.V(3).Infof("skip existing envs %s %s", k, v) - if e != v { - glog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v) - } - continue - } - envsMap[k] = v - opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v}) - } - - // Updates RunContainerOptions.Devices. - for _, dev := range resp.Devices { - if d, ok := devsMap[dev.ContainerPath]; ok { - glog.V(3).Infof("skip existing device %s %s", dev.ContainerPath, dev.HostPath) - if d != dev.HostPath { - glog.Errorf("Container device %s has conflicting mapping host devices: %s and %s", - dev.ContainerPath, d, dev.HostPath) - } - continue - } - devsMap[dev.ContainerPath] = dev.HostPath - opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{ - PathOnHost: dev.HostPath, - PathInContainer: dev.ContainerPath, - Permissions: dev.Permissions, - }) - } - // Updates RunContainerOptions.Mounts. 
- for _, mount := range resp.Mounts { - if m, ok := mountsMap[mount.ContainerPath]; ok { - glog.V(3).Infof("skip existing mount %s %s", mount.ContainerPath, mount.HostPath) - if m != mount.HostPath { - glog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s", - mount.ContainerPath, m, mount.HostPath) - } - continue - } - mountsMap[mount.ContainerPath] = mount.HostPath - opts.Mounts = append(opts.Mounts, kubecontainer.Mount{ - Name: mount.ContainerPath, - ContainerPath: mount.ContainerPath, - HostPath: mount.HostPath, - ReadOnly: mount.ReadOnly, - // TODO: This may need to be part of Device plugin API. - SELinuxRelabel: false, - }) - } + devOpts := cm.devicePluginHandler.GetDeviceRunContainerOptions(pod, container) + if devOpts == nil { + return opts, nil } + opts.Devices = append(opts.Devices, devOpts.Devices...) + opts.Mounts = append(opts.Mounts, devOpts.Mounts...) + opts.Envs = append(opts.Envs, devOpts.Envs...) return opts, nil } diff --git a/pkg/kubelet/cm/device_plugin_handler.go b/pkg/kubelet/cm/device_plugin_handler.go deleted file mode 100644 index afaa05c74bd..00000000000 --- a/pkg/kubelet/cm/device_plugin_handler.go +++ /dev/null @@ -1,294 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package cm - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "sync" - - "github.com/golang/glog" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/sets" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" - "k8s.io/kubernetes/pkg/kubelet/deviceplugin" -) - -// podDevices represents a list of pod to device Id mappings. -type containerDevices map[string]sets.String -type podDevices map[string]containerDevices - -func (pdev podDevices) pods() sets.String { - ret := sets.NewString() - for k := range pdev { - ret.Insert(k) - } - return ret -} - -func (pdev podDevices) insert(podUID, contName string, device string) { - if _, exists := pdev[podUID]; !exists { - pdev[podUID] = make(containerDevices) - } - if _, exists := pdev[podUID][contName]; !exists { - pdev[podUID][contName] = sets.NewString() - } - pdev[podUID][contName].Insert(device) -} - -func (pdev podDevices) getDevices(podUID, contName string) sets.String { - containers, exists := pdev[podUID] - if !exists { - return nil - } - devices, exists := containers[contName] - if !exists { - return nil - } - return devices -} - -func (pdev podDevices) delete(pods []string) { - for _, uid := range pods { - delete(pdev, uid) - } -} - -func (pdev podDevices) devices() sets.String { - ret := sets.NewString() - for _, containerDevices := range pdev { - for _, deviceSet := range containerDevices { - ret = ret.Union(deviceSet) - } - } - return ret -} - -type DevicePluginHandler interface { - // Start starts device plugin registration service. - Start() error - // Devices returns all of registered devices keyed by resourceName. - Devices() map[string][]pluginapi.Device - // Allocate attempts to allocate all of required extended resources for - // the input container, issues an Allocate rpc request for each of such - // resources, and returns their AllocateResponses on success. 
- Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) -} - -type DevicePluginHandlerImpl struct { - sync.Mutex - devicePluginManager deviceplugin.Manager - // devicePluginManagerMonitorCallback is used for testing only. - devicePluginManagerMonitorCallback deviceplugin.MonitorCallback - // allDevices contains all of registered resourceNames and their exported device IDs. - allDevices map[string]sets.String - // allocatedDevices contains pod to allocated device mapping, keyed by resourceName. - allocatedDevices map[string]podDevices -} - -// NewDevicePluginHandler create a DevicePluginHandler -// updateCapacityFunc is called to update ContainerManager capacity when -// device capacity changes. -func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*DevicePluginHandlerImpl, error) { - glog.V(2).Infof("Creating Device Plugin Handler") - handler := &DevicePluginHandlerImpl{ - allDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]podDevices), - } - - deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) { - var capacity = v1.ResourceList{} - kept := append(updated, added...) - if _, ok := handler.allDevices[resourceName]; !ok { - handler.allDevices[resourceName] = sets.NewString() - } - // For now, DevicePluginHandler only keeps track of healthy devices. - // We can revisit this later when the need comes to track unhealthy devices here. 
- for _, dev := range kept { - if dev.Health == pluginapi.Healthy { - handler.allDevices[resourceName].Insert(dev.ID) - } else { - handler.allDevices[resourceName].Delete(dev.ID) - } - } - for _, dev := range deleted { - handler.allDevices[resourceName].Delete(dev.ID) - } - capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(handler.allDevices[resourceName].Len()), resource.DecimalSI) - updateCapacityFunc(capacity) - } - - mgr, err := deviceplugin.NewManagerImpl(pluginapi.KubeletSocket, deviceManagerMonitorCallback) - if err != nil { - return nil, fmt.Errorf("Failed to initialize device plugin manager: %+v", err) - } - - handler.devicePluginManager = mgr - handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback - // Loads in allocatedDevices information from disk. - err = handler.readCheckpoint() - if err != nil { - glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err) - } - return handler, nil -} - -func (h *DevicePluginHandlerImpl) Start() error { - return h.devicePluginManager.Start() -} - -func (h *DevicePluginHandlerImpl) Devices() map[string][]pluginapi.Device { - return h.devicePluginManager.Devices() -} - -func (h *DevicePluginHandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) { - var ret []*pluginapi.AllocateResponse - h.updateAllocatedDevices(activePods) - for k, v := range container.Resources.Limits { - resource := string(k) - needed := int(v.Value()) - glog.V(3).Infof("needs %d %s", needed, resource) - _, registeredResource := h.allDevices[resource] - if !registeredResource || needed == 0 { - continue - } - h.Lock() - // Gets list of devices that have already been allocated. - // This can happen if a container restarts for example. 
- if h.allocatedDevices[resource] == nil { - h.allocatedDevices[resource] = make(podDevices) - } - devices := h.allocatedDevices[resource].getDevices(string(pod.UID), container.Name) - if devices != nil { - glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, container.Name, pod.UID, devices.List()) - needed = needed - devices.Len() - } - // Get Devices in use. - devicesInUse := h.allocatedDevices[resource].devices() - // Get a list of available devices. - available := h.allDevices[resource].Difference(devicesInUse) - if int(available.Len()) < needed { - h.Unlock() - return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len()) - } - allocated := available.UnsortedList()[:needed] - for _, device := range allocated { - // Update internal allocated device cache. - h.allocatedDevices[resource].insert(string(pod.UID), container.Name, device) - } - h.Unlock() - // devicePluginManager.Allocate involves RPC calls to device plugin, which - // could be heavy-weight. Therefore we want to perform this operation outside - // mutex lock. Note if Allcate call fails, we may leave container resources - // partially allocated for the failed container. We rely on updateAllocatedDevices() - // to garbage collect these resources later. Another side effect is that if - // we have X resource A and Y resource B in total, and two containers, container1 - // and container2 both require X resource A and Y resource B. Both allocation - // requests may fail if we serve them in mixed order. - // TODO: may revisit this part later if we see inefficient resource allocation - // in real use as the result of this. - resp, err := h.devicePluginManager.Allocate(resource, append(devices.UnsortedList(), allocated...)) - if err != nil { - return nil, err - } - ret = append(ret, resp) - } - // Checkpoints device to container allocation information. 
- if err := h.writeCheckpoint(); err != nil { - return nil, err - } - return ret, nil -} - -// updateAllocatedDevices updates the list of GPUs in use. -// It gets a list of active pods and then frees any GPUs that are bound to -// terminated pods. Returns error on failure. -func (h *DevicePluginHandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) { - h.Lock() - defer h.Unlock() - activePodUids := sets.NewString() - for _, pod := range activePods { - activePodUids.Insert(string(pod.UID)) - } - for _, podDevs := range h.allocatedDevices { - allocatedPodUids := podDevs.pods() - podsToBeRemoved := allocatedPodUids.Difference(activePodUids) - glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List()) - podDevs.delete(podsToBeRemoved.List()) - } -} - -type checkpointEntry struct { - PodUID string - ContainerName string - ResourceName string - DeviceID string -} - -// checkpointData struct is used to store pod to device allocation information -// in a checkpoint file. -// TODO: add version control when we need to change checkpoint format. -type checkpointData struct { - Entries []checkpointEntry -} - -// Checkpoints device to container allocation information to disk. -func (h *DevicePluginHandlerImpl) writeCheckpoint() error { - filepath := h.devicePluginManager.CheckpointFile() - var data checkpointData - for resourceName, podDev := range h.allocatedDevices { - for podUID, conDev := range podDev { - for conName, devs := range conDev { - for _, devId := range devs.UnsortedList() { - data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resourceName, devId}) - } - } - } - } - dataJson, err := json.Marshal(data) - if err != nil { - return err - } - return ioutil.WriteFile(filepath, dataJson, 0644) -} - -// Reads device to container allocation information from disk, and populates -// h.allocatedDevices accordingly. 
-func (h *DevicePluginHandlerImpl) readCheckpoint() error { - filepath := h.devicePluginManager.CheckpointFile() - content, err := ioutil.ReadFile(filepath) - if err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err) - } - glog.V(2).Infof("Read checkpoint file %s\n", filepath) - var data checkpointData - if err := json.Unmarshal(content, &data); err != nil { - return fmt.Errorf("failed to unmarshal checkpoint data: %v", err) - } - for _, entry := range data.Entries { - glog.V(2).Infof("Get checkpoint entry: %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceID) - if h.allocatedDevices[entry.ResourceName] == nil { - h.allocatedDevices[entry.ResourceName] = make(podDevices) - } - h.allocatedDevices[entry.ResourceName].insert(entry.PodUID, entry.ContainerName, entry.DeviceID) - } - return nil -} diff --git a/pkg/kubelet/deviceplugin/BUILD b/pkg/kubelet/cm/deviceplugin/BUILD similarity index 57% rename from pkg/kubelet/deviceplugin/BUILD rename to pkg/kubelet/cm/deviceplugin/BUILD index 987c76001a7..437ba6fcf5f 100644 --- a/pkg/kubelet/deviceplugin/BUILD +++ b/pkg/kubelet/cm/deviceplugin/BUILD @@ -9,19 +9,25 @@ load( go_library( name = "go_default_library", srcs = [ + "device_plugin_handler.go", + "device_plugin_handler_stub.go", "device_plugin_stub.go", "endpoint.go", "manager.go", + "pod_devices.go", "types.go", ], - importpath = "k8s.io/kubernetes/pkg/kubelet/deviceplugin", + importpath = "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin", deps = [ "//pkg/api/v1/helper:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", + "//pkg/kubelet/container:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/golang.org/x/net/context:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + 
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", ], ) @@ -41,13 +47,20 @@ filegroup( go_test( name = "go_default_test", srcs = [ + "device_plugin_handler_test.go", "endpoint_test.go", "manager_test.go", ], - importpath = "k8s.io/kubernetes/pkg/kubelet/deviceplugin", + importpath = "k8s.io/kubernetes/pkg/kubelet/cm/deviceplugin", library = ":go_default_library", deps = [ "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library", + "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library", ], ) diff --git a/pkg/kubelet/deviceplugin/OWNERS b/pkg/kubelet/cm/deviceplugin/OWNERS similarity index 100% rename from pkg/kubelet/deviceplugin/OWNERS rename to pkg/kubelet/cm/deviceplugin/OWNERS diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go new file mode 100644 index 00000000000..23d67c3fd6d --- /dev/null +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_handler.go @@ -0,0 +1,291 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package deviceplugin + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "sync" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/sets" + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" +) + +// Handler defines the functions used to manage and access device plugin resources. +type Handler interface { + // Start starts device plugin registration service. + Start() error + // Devices returns all of registered devices keyed by resourceName. + Devices() map[string][]pluginapi.Device + // Allocate attempts to allocate all of required extended resources for + // the input container, issues an Allocate rpc request for each of such + // resources, processes their AllocateResponses, and updates the cached + // containerDevices on success. + Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error + // GetDeviceRunContainerOptions checks whether we have cached containerDevices + // for the passed-in and returns its DeviceRunContainerOptions + // for the found one. An empty struct is returned in case no cached state is found. + GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions +} + +// HandlerImpl implements the actual functionality to manage device plugin resources. +type HandlerImpl struct { + // TODO: consider to change this to RWMutex. + sync.Mutex + devicePluginManager Manager + // devicePluginManagerMonitorCallback is used for testing only. + devicePluginManagerMonitorCallback MonitorCallback + // allDevices contains all of registered resourceNames and their exported device IDs. + allDevices map[string]sets.String + // allocatedDevices contains allocated deviceIds, keyed by resourceName. + allocatedDevices map[string]sets.String + // podDevices contains pod to allocated device mapping. + podDevices podDevices +} + +// NewHandlerImpl creates a HandlerImpl to manage device plugin resources. 
+// updateCapacityFunc is called to update ContainerManager capacity when +// device capacity changes. +func NewHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*HandlerImpl, error) { + glog.V(2).Infof("Creating Device Plugin Handler") + handler := &HandlerImpl{ + allDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + } + + deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) { + var capacity = v1.ResourceList{} + kept := append(updated, added...) + if _, ok := handler.allDevices[resourceName]; !ok { + handler.allDevices[resourceName] = sets.NewString() + } + // For now, Handler only keeps track of healthy devices. + // We can revisit this later when the need comes to track unhealthy devices here. + for _, dev := range kept { + if dev.Health == pluginapi.Healthy { + handler.allDevices[resourceName].Insert(dev.ID) + } else { + handler.allDevices[resourceName].Delete(dev.ID) + } + } + for _, dev := range deleted { + handler.allDevices[resourceName].Delete(dev.ID) + } + capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(handler.allDevices[resourceName].Len()), resource.DecimalSI) + updateCapacityFunc(capacity) + } + + mgr, err := NewManagerImpl(pluginapi.KubeletSocket, deviceManagerMonitorCallback) + if err != nil { + return nil, fmt.Errorf("Failed to initialize device plugin manager: %+v", err) + } + + handler.devicePluginManager = mgr + handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback + // Loads in allocatedDevices information from disk. + err = handler.readCheckpoint() + if err != nil { + glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err) + } + return handler, nil +} + +// Start starts device plugin registration service. 
+func (h *HandlerImpl) Start() error { + return h.devicePluginManager.Start() +} + +// Devices returns all of registered devices keyed by resourceName. +func (h *HandlerImpl) Devices() map[string][]pluginapi.Device { + return h.devicePluginManager.Devices() +} + +// Returns list of device Ids we need to allocate with Allocate rpc call. +// Returns empty list in case we don't need to issue the Allocate rpc call. +func (h *HandlerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) { + h.Lock() + defer h.Unlock() + needed := required + // Gets list of devices that have already been allocated. + // This can happen if a container restarts for example. + devices := h.podDevices.containerDevices(podUID, contName, resource) + if devices != nil { + glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List()) + needed = needed - devices.Len() + // A pod's resource is not expected to change once admitted by the API server, + // so just fail loudly here. We can revisit this part if this no longer holds. + if needed != 0 { + return nil, fmt.Errorf("pod %v container %v changed request for resource %v from %v to %v", podUID, contName, resource, devices.Len(), required) + } + } + if needed == 0 { + // No change, no work. + return nil, nil + } + devices = sets.NewString() + // Needs to allocate additional devices. + if h.allocatedDevices[resource] == nil { + h.allocatedDevices[resource] = sets.NewString() + } + // Gets Devices in use. + devicesInUse := h.allocatedDevices[resource] + // Gets a list of available devices. + available := h.allDevices[resource].Difference(devicesInUse) + if int(available.Len()) < needed { + return nil, fmt.Errorf("requested number of devices unavailable for %s. 
Requested: %d, Available: %d", resource, needed, available.Len()) + } + allocated := available.UnsortedList()[:needed] + // Updates h.allocatedDevices with allocated devices to prevent them + // from being allocated to other pods/containers, given that we are + // not holding lock during the rpc call. + for _, device := range allocated { + h.allocatedDevices[resource].Insert(device) + devices.Insert(device) + } + return devices, nil +} + +// Allocate attempts to allocate all of required extended resources for +// the input container, issues an Allocate rpc request for each of such +// resources, processes their AllocateResponses, and updates the cached +// containerDevices on success. +func (h *HandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error { + podUID := string(pod.UID) + contName := container.Name + allocatedDevicesUpdated := false + for k, v := range container.Resources.Limits { + resource := string(k) + needed := int(v.Value()) + glog.V(3).Infof("needs %d %s", needed, resource) + if _, registeredResource := h.allDevices[resource]; !registeredResource { + continue + } + // Updates allocatedDevices to garbage collect any stranded resources + // before doing the device plugin allocation. + if !allocatedDevicesUpdated { + h.updateAllocatedDevices(activePods) + allocatedDevicesUpdated = true + } + allocDevices, err := h.devicesToAllocate(podUID, contName, resource, needed) + if err != nil { + return err + } + if allocDevices == nil || len(allocDevices) <= 0 { + continue + } + // devicePluginManager.Allocate involves RPC calls to device plugin, which + // could be heavy-weight. Therefore we want to perform this operation outside + // mutex lock. Note if Allcate call fails, we may leave container resources + // partially allocated for the failed container. We rely on updateAllocatedDevices() + // to garbage collect these resources later. 
Another side effect is that if + // we have X resource A and Y resource B in total, and two containers, container1 + // and container2 both require X resource A and Y resource B. Both allocation + // requests may fail if we serve them in mixed order. + // TODO: may revisit this part later if we see inefficient resource allocation + // in real use as the result of this. Should also consider to parallize device + // plugin Allocate grpc calls if it becomes common that a container may require + // resources from multiple device plugins. + resp, err := h.devicePluginManager.Allocate(resource, allocDevices.UnsortedList()) + if err != nil { + // In case of allocation failure, we want to restore h.allocatedDevices + // to the actual allocated state from h.podDevices. + h.Lock() + h.allocatedDevices = h.podDevices.devices() + h.Unlock() + return err + } + + // Update internal cached podDevices state. + h.Lock() + h.podDevices.insert(podUID, contName, resource, allocDevices, resp) + h.Unlock() + } + + // Checkpoints device to container allocation information. + return h.writeCheckpoint() +} + +// GetDeviceRunContainerOptions checks whether we have cached containerDevices +// for the passed-in and returns its DeviceRunContainerOptions +// for the found one. An empty struct is returned in case no cached state is found. +func (h *HandlerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions { + h.Lock() + defer h.Unlock() + return h.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name) +} + +// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to +// terminated pods. Returns error on failure. 
+func (h *HandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) { + h.Lock() + defer h.Unlock() + activePodUids := sets.NewString() + for _, pod := range activePods { + activePodUids.Insert(string(pod.UID)) + } + allocatedPodUids := h.podDevices.pods() + podsToBeRemoved := allocatedPodUids.Difference(activePodUids) + if len(podsToBeRemoved) <= 0 { + return + } + glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List()) + h.podDevices.delete(podsToBeRemoved.List()) + // Regenerated allocatedDevices after we update pod allocation information. + h.allocatedDevices = h.podDevices.devices() +} + +// Checkpoints device to container allocation information to disk. +func (h *HandlerImpl) writeCheckpoint() error { + h.Lock() + data := h.podDevices.toCheckpointData() + h.Unlock() + + dataJSON, err := json.Marshal(data) + if err != nil { + return err + } + filepath := h.devicePluginManager.CheckpointFile() + return ioutil.WriteFile(filepath, dataJSON, 0644) +} + +// Reads device to container allocation information from disk, and populates +// h.allocatedDevices accordingly. 
+func (h *HandlerImpl) readCheckpoint() error { + filepath := h.devicePluginManager.CheckpointFile() + content, err := ioutil.ReadFile(filepath) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err) + } + glog.V(2).Infof("Read checkpoint file %s\n", filepath) + var data checkpointData + if err := json.Unmarshal(content, &data); err != nil { + return fmt.Errorf("failed to unmarshal checkpoint data: %v", err) + } + + h.Lock() + defer h.Unlock() + h.podDevices.fromCheckpointData(data) + h.allocatedDevices = h.podDevices.devices() + return nil +} diff --git a/pkg/kubelet/cm/device_plugin_handler_stub.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go similarity index 50% rename from pkg/kubelet/cm/device_plugin_handler_stub.go rename to pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go index 46cb40b1f3a..c3735d486c9 100644 --- a/pkg/kubelet/cm/device_plugin_handler_stub.go +++ b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_stub.go @@ -14,29 +14,37 @@ See the License for the specific language governing permissions and limitations under the License. */ -package cm +package deviceplugin import ( "k8s.io/api/core/v1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" ) -// A simple stub implementation for DevicePluginHandler. -type DevicePluginHandlerStub struct{} +// HandlerStub provides a simple stub implementation for Handler. +type HandlerStub struct{} -func NewDevicePluginHandlerStub() (*DevicePluginHandlerStub, error) { - return &DevicePluginHandlerStub{}, nil +// NewHandlerStub creates a HandlerStub. +func NewHandlerStub() (*HandlerStub, error) { + return &HandlerStub{}, nil } -func (h *DevicePluginHandlerStub) Start() error { +// Start simply returns nil. +func (h *HandlerStub) Start() error { return nil } -func (h *DevicePluginHandlerStub) Devices() map[string][]pluginapi.Device { +// Devices returns an empty map. 
// Devices returns an empty map: the stub tracks no device plugins.
func (h *HandlerStub) Devices() map[string][]pluginapi.Device {
	return make(map[string][]pluginapi.Device)
}

// Allocate simply returns nil: the stub performs no device allocation.
func (h *HandlerStub) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) error {
	return nil
}

// GetDeviceRunContainerOptions simply returns nil: the stub caches no
// per-container device state.
func (h *HandlerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
	return nil
}
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{}) + testHandler.devicePluginManagerMonitorCallback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{}) // Deletes an unhealthy device should NOT change capacity. - testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) + testHandler.devicePluginManagerMonitorCallback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) // Updates a healthy device to unhealthy should reduce capacity by 1. expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI) // Deletes a healthy device should reduce capacity by 1. @@ -62,7 +63,7 @@ func TestUpdateCapacity(t *testing.T) { delete(expected, v1.ResourceName(resourceName)) resourceName2 := "resource2" expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) + testHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) } type stringPairType struct { @@ -96,10 +97,12 @@ func (m *DevicePluginManagerTestStub) Devices() map[string][]pluginapi.Device { func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) { resp := new(pluginapi.AllocateResponse) + resp.Envs = make(map[string]string) for _, id := range devIds { key := resourceName + id - fmt.Printf("Alloc device %q for resource %q\n", id, resourceName) + fmt.Printf("Alloc device %v for resource %v\n", id, resourceName) for _, dev := range m.devRuntimeDevices[key] { + fmt.Printf("Add dev %v %v\n", dev.value1, dev.value2) 
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ ContainerPath: dev.value1, HostPath: dev.value2, @@ -107,15 +110,15 @@ func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []str }) } for _, mount := range m.devRuntimeMounts[key] { - fmt.Printf("Add mount %q %q\n", mount.value1, mount.value2) + fmt.Printf("Add mount %v %v\n", mount.value1, mount.value2) resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ ContainerPath: mount.value1, HostPath: mount.value2, ReadOnly: true, }) } - resp.Envs = make(map[string]string) for _, env := range m.devRuntimeEnvs[key] { + fmt.Printf("Add env %v %v\n", env.value1, env.value2) resp.Envs[env.value1] = env.value2 } } @@ -130,6 +133,37 @@ func (m *DevicePluginManagerTestStub) CheckpointFile() string { return "/tmp/device-plugin-checkpoint" } +func constructDevices(devices []string) sets.String { + ret := sets.NewString() + for _, dev := range devices { + ret.Insert(dev) + } + return ret +} + +func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse { + resp := &pluginapi.AllocateResponse{} + for k, v := range devices { + resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{ + HostPath: k, + ContainerPath: v, + Permissions: "mrw", + }) + } + for k, v := range mounts { + resp.Mounts = append(resp.Mounts, &pluginapi.Mount{ + ContainerPath: k, + HostPath: v, + ReadOnly: true, + }) + } + resp.Envs = make(map[string]string) + for k, v := range envs { + resp.Envs[k] = v + } + return resp +} + func TestCheckpoint(t *testing.T) { resourceName1 := "domain1.com/resource1" resourceName2 := "domain2.com/resource2" @@ -138,31 +172,59 @@ func TestCheckpoint(t *testing.T) { as := assert.New(t) as.Nil(err) - testDevicePluginHandler := &DevicePluginHandlerImpl{ + testHandler := &HandlerImpl{ devicePluginManager: m, allDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]podDevices), + allocatedDevices: make(map[string]sets.String), + podDevices: 
make(podDevices), } - testDevicePluginHandler.allocatedDevices[resourceName1] = make(podDevices) - testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev1") - testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con1", "dev2") - testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod1", "con2", "dev1") - testDevicePluginHandler.allocatedDevices[resourceName1].insert("pod2", "con1", "dev1") - testDevicePluginHandler.allocatedDevices[resourceName2] = make(podDevices) - testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev3") - testDevicePluginHandler.allocatedDevices[resourceName2].insert("pod1", "con1", "dev4") - err = testDevicePluginHandler.writeCheckpoint() + testHandler.podDevices.insert("pod1", "con1", resourceName1, + constructDevices([]string{"dev1", "dev2"}), + constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + testHandler.podDevices.insert("pod1", "con1", resourceName2, + constructDevices([]string{"dev1", "dev2"}), + constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}, + map[string]string{"/home/r2lib1": "/usr/r2lib1"}, + map[string]string{"r2devices": "dev1 dev2"})) + testHandler.podDevices.insert("pod1", "con2", resourceName1, + constructDevices([]string{"dev3"}), + constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + testHandler.podDevices.insert("pod2", "con1", resourceName1, + constructDevices([]string{"dev4"}), + constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"}, + map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + + expectedPodDevices := testHandler.podDevices + expectedAllocatedDevices := testHandler.podDevices.devices() + + err = testHandler.writeCheckpoint() 
as.Nil(err) - expected := testDevicePluginHandler.allocatedDevices - testDevicePluginHandler.allocatedDevices = make(map[string]podDevices) - err = testDevicePluginHandler.readCheckpoint() + testHandler.podDevices = make(podDevices) + err = testHandler.readCheckpoint() as.Nil(err) - as.Equal(expected, testDevicePluginHandler.allocatedDevices) + + as.Equal(len(expectedPodDevices), len(testHandler.podDevices)) + for podUID, containerDevices := range expectedPodDevices { + for conName, resources := range containerDevices { + for resource := range resources { + as.True(reflect.DeepEqual( + expectedPodDevices.containerDevices(podUID, conName, resource), + testHandler.podDevices.containerDevices(podUID, conName, resource))) + opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName) + opts2 := testHandler.podDevices.deviceRunContainerOptions(podUID, conName) + as.Equal(len(opts1.Envs), len(opts2.Envs)) + as.Equal(len(opts1.Mounts), len(opts2.Mounts)) + as.Equal(len(opts1.Devices), len(opts2.Devices)) + } + } + } + as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices)) } func TestPodContainerDeviceAllocation(t *testing.T) { - t.Skip("Skipping due to issue #54100") flag.Set("alsologtostderr", fmt.Sprintf("%t", true)) var logLevel string flag.StringVar(&logLevel, "logLevel", "4", "test") @@ -171,39 +233,40 @@ func TestPodContainerDeviceAllocation(t *testing.T) { var activePods []*v1.Pod resourceName1 := "domain1.com/resource1" resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI) - devId1 := "dev1" - devId2 := "dev2" + devID1 := "dev1" + devID2 := "dev2" resourceName2 := "domain2.com/resource2" resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI) - devId3 := "dev3" - devId4 := "dev4" + devID3 := "dev3" + devID4 := "dev4" m, err := NewDevicePluginManagerTestStub() as := assert.New(t) as.Nil(err) monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} - 
testDevicePluginHandler := &DevicePluginHandlerImpl{ + testHandler := &HandlerImpl{ devicePluginManager: m, devicePluginManagerMonitorCallback: monitorCallback, allDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]podDevices), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), } - testDevicePluginHandler.allDevices[resourceName1] = sets.NewString() - testDevicePluginHandler.allDevices[resourceName1].Insert(devId1) - testDevicePluginHandler.allDevices[resourceName1].Insert(devId2) - testDevicePluginHandler.allDevices[resourceName2] = sets.NewString() - testDevicePluginHandler.allDevices[resourceName2].Insert(devId3) - testDevicePluginHandler.allDevices[resourceName2].Insert(devId4) + testHandler.allDevices[resourceName1] = sets.NewString() + testHandler.allDevices[resourceName1].Insert(devID1) + testHandler.allDevices[resourceName1].Insert(devID2) + testHandler.allDevices[resourceName2] = sets.NewString() + testHandler.allDevices[resourceName2].Insert(devID3) + testHandler.allDevices[resourceName2].Insert(devID4) - m.devRuntimeDevices[resourceName1+devId1] = append(m.devRuntimeDevices[resourceName1+devId1], stringPairType{"/dev/aaa", "/dev/aaa"}) - m.devRuntimeDevices[resourceName1+devId1] = append(m.devRuntimeDevices[resourceName1+devId1], stringPairType{"/dev/bbb", "/dev/bbb"}) - m.devRuntimeDevices[resourceName1+devId2] = append(m.devRuntimeDevices[resourceName1+devId2], stringPairType{"/dev/ccc", "/dev/ccc"}) - m.devRuntimeMounts[resourceName1+devId1] = append(m.devRuntimeMounts[resourceName1+devId1], stringPairType{"/container_dir1/file1", "host_dir1/file1"}) - m.devRuntimeMounts[resourceName1+devId2] = append(m.devRuntimeMounts[resourceName1+devId2], stringPairType{"/container_dir1/file1", "host_dir1/file1"}) - m.devRuntimeEnvs[resourceName1+devId2] = append(m.devRuntimeEnvs[resourceName1+devId2], stringPairType{"key1", "val1"}) - m.devRuntimeEnvs[resourceName2+devId3] = 
append(m.devRuntimeEnvs[resourceName2+devId3], stringPairType{"key2", "val2"}) - m.devRuntimeEnvs[resourceName2+devId4] = append(m.devRuntimeEnvs[resourceName2+devId4], stringPairType{"key2", "val2"}) + m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/aaa", "/dev/aaa"}) + m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/bbb", "/dev/bbb"}) + m.devRuntimeDevices[resourceName1+devID2] = append(m.devRuntimeDevices[resourceName1+devID2], stringPairType{"/dev/ccc", "/dev/ccc"}) + m.devRuntimeMounts[resourceName1+devID1] = append(m.devRuntimeMounts[resourceName1+devID1], stringPairType{"/container_dir1/file1", "host_dir1/file1"}) + m.devRuntimeMounts[resourceName1+devID2] = append(m.devRuntimeMounts[resourceName1+devID2], stringPairType{"/container_dir1/file2", "host_dir1/file2"}) + m.devRuntimeEnvs[resourceName1+devID2] = append(m.devRuntimeEnvs[resourceName1+devID2], stringPairType{"key1", "val1"}) + m.devRuntimeEnvs[resourceName2+devID3] = append(m.devRuntimeEnvs[resourceName2+devID3], stringPairType{"key2", "val2"}) + m.devRuntimeEnvs[resourceName2+devID4] = append(m.devRuntimeEnvs[resourceName2+devID4], stringPairType{"key2", "val3"}) pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -225,18 +288,16 @@ func TestPodContainerDeviceAllocation(t *testing.T) { }, } - cm := &containerManagerImpl{ - devicePluginHandler: testDevicePluginHandler, - } activePods = append(activePods, pod) - runContainerOpts, err := cm.GetResources(pod, &pod.Spec.Containers[0], activePods) + err = testHandler.Allocate(pod, &pod.Spec.Containers[0], activePods) + as.Nil(err) + runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) as.Equal(len(runContainerOpts.Devices), 3) - // Two devices require to mount the same path. Expects a single mount entry to be created. 
- as.Equal(len(runContainerOpts.Mounts), 1) - as.Equal(runContainerOpts.Mounts[0].ContainerPath, "/container_dir1/file1") + as.Equal(len(runContainerOpts.Mounts), 2) as.Equal(len(runContainerOpts.Envs), 2) // Requesting to create a pod without enough resources should fail. + as.Equal(2, testHandler.allocatedDevices[resourceName1].Len()) failPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ UID: uuid.NewUUID(), @@ -247,18 +308,17 @@ func TestPodContainerDeviceAllocation(t *testing.T) { Name: string(uuid.NewUUID()), Resources: v1.ResourceRequirements{ Limits: v1.ResourceList{ - v1.ResourceName(resourceName1): resourceQuantity1, + v1.ResourceName(resourceName1): resourceQuantity2, }, }, }, }, }, } - runContainerOpts2, err := cm.GetResources(failPod, &failPod.Spec.Containers[0], activePods) + err = testHandler.Allocate(failPod, &failPod.Spec.Containers[0], activePods) as.NotNil(err) - as.Equal(len(runContainerOpts2.Devices), 0) - as.Equal(len(runContainerOpts2.Mounts), 0) - as.Equal(len(runContainerOpts2.Envs), 0) + runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0]) + as.Nil(runContainerOpts2) // Requesting to create a new pod with a single resourceName2 should succeed. 
newPod := &v1.Pod{ @@ -278,7 +338,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) { }, }, } - runContainerOpts3, err := cm.GetResources(newPod, &newPod.Spec.Containers[0], activePods) + err = testHandler.Allocate(newPod, &newPod.Spec.Containers[0], activePods) as.Nil(err) - as.Equal(len(runContainerOpts3.Envs), 1) + runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0]) + as.Equal(1, len(runContainerOpts3.Envs)) } diff --git a/pkg/kubelet/deviceplugin/device_plugin_stub.go b/pkg/kubelet/cm/deviceplugin/device_plugin_stub.go similarity index 100% rename from pkg/kubelet/deviceplugin/device_plugin_stub.go rename to pkg/kubelet/cm/deviceplugin/device_plugin_stub.go diff --git a/pkg/kubelet/deviceplugin/endpoint.go b/pkg/kubelet/cm/deviceplugin/endpoint.go similarity index 100% rename from pkg/kubelet/deviceplugin/endpoint.go rename to pkg/kubelet/cm/deviceplugin/endpoint.go diff --git a/pkg/kubelet/deviceplugin/endpoint_test.go b/pkg/kubelet/cm/deviceplugin/endpoint_test.go similarity index 100% rename from pkg/kubelet/deviceplugin/endpoint_test.go rename to pkg/kubelet/cm/deviceplugin/endpoint_test.go diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go similarity index 99% rename from pkg/kubelet/deviceplugin/manager.go rename to pkg/kubelet/cm/deviceplugin/manager.go index a8de2aa5e9f..39a5c713dcf 100644 --- a/pkg/kubelet/deviceplugin/manager.go +++ b/pkg/kubelet/cm/deviceplugin/manager.go @@ -141,7 +141,7 @@ func (m *ManagerImpl) Devices() map[string][]pluginapi.Device { devs := make(map[string][]pluginapi.Device) for k, e := range m.endpoints { - glog.V(3).Infof("Endpoint: %+v: %+v", k, e) + glog.V(3).Infof("Endpoint: %+v: %p", k, e) devs[k] = e.getDevices() } @@ -221,8 +221,8 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { m.mutex.Lock() old, ok := m.endpoints[r.ResourceName] m.endpoints[r.ResourceName] = e - m.mutex.Unlock() glog.V(2).Infof("Registered 
package deviceplugin

import (
	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/util/sets"
	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// deviceAllocateInfo records one resource's allocation for one container.
type deviceAllocateInfo struct {
	// deviceIds contains device Ids allocated to this container for the given resourceName.
	deviceIds sets.String
	// allocResp contains cached rpc AllocateResponse.
	allocResp *pluginapi.AllocateResponse
}

type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
type containerDevices map[string]resourceAllocateInfo   // Keyed by containerName.
type podDevices map[string]containerDevices             // Keyed by podUID.
+ +func (pdev podDevices) pods() sets.String { + ret := sets.NewString() + for k := range pdev { + ret.Insert(k) + } + return ret +} + +func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.AllocateResponse) { + if _, podExists := pdev[podUID]; !podExists { + pdev[podUID] = make(containerDevices) + } + if _, contExists := pdev[podUID][contName]; !contExists { + pdev[podUID][contName] = make(resourceAllocateInfo) + } + pdev[podUID][contName][resource] = deviceAllocateInfo{ + deviceIds: devices, + allocResp: resp, + } +} + +func (pdev podDevices) delete(pods []string) { + for _, uid := range pods { + delete(pdev, uid) + } +} + +// Returns list of device Ids allocated to the given container for the given resource. +// Returns nil if we don't have cached state for the given . +func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String { + if _, podExists := pdev[podUID]; !podExists { + return nil + } + if _, contExists := pdev[podUID][contName]; !contExists { + return nil + } + devs, resourceExists := pdev[podUID][contName][resource] + if !resourceExists { + return nil + } + return devs.deviceIds +} + +// Returns all of devices allocated to the pods being tracked, keyed by resourceName. +func (pdev podDevices) devices() map[string]sets.String { + ret := make(map[string]sets.String) + for _, containerDevices := range pdev { + for _, resources := range containerDevices { + for resource, devices := range resources { + if _, exists := ret[resource]; !exists { + ret[resource] = sets.NewString() + } + ret[resource] = ret[resource].Union(devices.deviceIds) + } + } + } + return ret +} + +type checkpointEntry struct { + PodUID string + ContainerName string + ResourceName string + DeviceIDs []string + AllocResp []byte +} + +// checkpointData struct is used to store pod to device allocation information +// in a checkpoint file. +// TODO: add version control when we need to change checkpoint format. 
type checkpointData struct {
	Entries []checkpointEntry
}

// Turns podDevices to checkpointData.
// Entries whose AllocateResponse cannot be marshaled are logged and skipped
// rather than failing the whole checkpoint.
func (pdev podDevices) toCheckpointData() checkpointData {
	var data checkpointData
	for podUID, containerDevices := range pdev {
		for conName, resources := range containerDevices {
			for resource, devices := range resources {
				devIds := devices.deviceIds.UnsortedList()
				// AllocResp is stored as protobuf wire bytes.
				allocResp, err := devices.allocResp.Marshal()
				if err != nil {
					glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)
					continue
				}
				data.Entries = append(data.Entries, checkpointEntry{podUID, conName, resource, devIds, allocResp})
			}
		}
	}
	return data
}

// Populates podDevices from the passed in checkpointData.
// Entries whose AllocResp bytes fail to unmarshal are logged and skipped.
func (pdev podDevices) fromCheckpointData(data checkpointData) {
	for _, entry := range data.Entries {
		glog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
			entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
		devIDs := sets.NewString()
		for _, devID := range entry.DeviceIDs {
			devIDs.Insert(devID)
		}
		allocResp := &pluginapi.AllocateResponse{}
		err := allocResp.Unmarshal(entry.AllocResp)
		if err != nil {
			glog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err)
			continue
		}
		pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, devIDs, allocResp)
	}
}

// Returns combined container runtime settings to consume the container's allocated devices.
// Returns nil when no state is cached for <podUID, contName>. Duplicate
// settings across the container's resources are emitted once; conflicting
// duplicates (same key, different value) keep the first value seen and log an error.
func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
	containers, exists := pdev[podUID]
	if !exists {
		return nil
	}
	resources, exists := containers[contName]
	if !exists {
		return nil
	}
	opts := &DeviceRunContainerOptions{}
	// Maps to detect duplicate settings.
	devsMap := make(map[string]string)   // container device path -> host path
	mountsMap := make(map[string]string) // container mount path -> host path
	envsMap := make(map[string]string)   // env name -> value
	// Loops through AllocationResponses of all cached device resources.
	for _, devices := range resources {
		resp := devices.allocResp
		// Each Allocate response has the following artifacts.
		// Environment variables
		// Mount points
		// Device files
		// These artifacts are per resource per container.
		// Updates RunContainerOptions.Envs.
		for k, v := range resp.Envs {
			if e, ok := envsMap[k]; ok {
				glog.V(3).Infof("skip existing env %s %s", k, v)
				if e != v {
					glog.Errorf("Environment variable %s has conflicting setting: %s and %s", k, e, v)
				}
				continue
			}
			glog.V(4).Infof("add env %s %s", k, v)
			envsMap[k] = v
			opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
		}

		// Updates RunContainerOptions.Devices.
		for _, dev := range resp.Devices {
			if d, ok := devsMap[dev.ContainerPath]; ok {
				glog.V(3).Infof("skip existing device %s %s", dev.ContainerPath, dev.HostPath)
				if d != dev.HostPath {
					glog.Errorf("Container device %s has conflicting mapping host devices: %s and %s",
						dev.ContainerPath, d, dev.HostPath)
				}
				continue
			}
			glog.V(4).Infof("add device %s %s", dev.ContainerPath, dev.HostPath)
			devsMap[dev.ContainerPath] = dev.HostPath
			opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{
				PathOnHost:      dev.HostPath,
				PathInContainer: dev.ContainerPath,
				Permissions:     dev.Permissions,
			})
		}
		// Updates RunContainerOptions.Mounts.
		for _, mount := range resp.Mounts {
			if m, ok := mountsMap[mount.ContainerPath]; ok {
				glog.V(3).Infof("skip existing mount %s %s", mount.ContainerPath, mount.HostPath)
				if m != mount.HostPath {
					glog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s",
						mount.ContainerPath, m, mount.HostPath)
				}
				continue
			}
			glog.V(4).Infof("add mount %s %s", mount.ContainerPath, mount.HostPath)
			mountsMap[mount.ContainerPath] = mount.HostPath
			opts.Mounts = append(opts.Mounts, kubecontainer.Mount{
				Name:          mount.ContainerPath,
				ContainerPath: mount.ContainerPath,
				HostPath:      mount.HostPath,
				ReadOnly:      mount.ReadOnly,
				// TODO: This may need to be part of Device plugin API.
				SELinuxRelabel: false,
			})
		}
	}
	return opts
}

// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
type DeviceRunContainerOptions struct {
	// The environment variables list.
	Envs []kubecontainer.EnvVar
	// The mounts for the container.
	Mounts []kubecontainer.Mount
	// The host devices mapped into the container.
	Devices []kubecontainer.DeviceInfo
}