Kubelet side extension to support device allocation

This commit is contained in:
Jiaying Zhang 2017-08-22 15:03:47 -07:00
parent 7a8ad491ef
commit 02001af752
25 changed files with 839 additions and 258 deletions

View File

@ -427,6 +427,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
if err != nil {
return err
}
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
@ -450,6 +453,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
ExperimentalQOSReserved: *experimentalQOSReserved,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {

View File

@ -73,11 +73,11 @@ const (
// Works only with Docker Container Runtime.
Accelerators utilfeature.Feature = "Accelerators"
// owner: @vishh
// owner: @jiayingz
// alpha: v1.8
//
// Enables support for Device Plugins
// Only Nvidia GPUs are supported as of v1.8.
// Only Nvidia GPUs are tested as of v1.8.
DevicePlugins utilfeature.Feature = "DevicePlugins"
// owner: @gmarek

View File

@ -43,7 +43,6 @@ go_library(
"//pkg/fieldpath:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/v1alpha1:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",

View File

@ -30,22 +30,4 @@ const (
DevicePluginPath = "/var/lib/kubelet/device-plugins/"
// KubeletSocket is the path of the Kubelet registry socket
KubeletSocket = DevicePluginPath + "kubelet.sock"
// InvalidChars are the characters that may not appear in a Vendor or Kind field
InvalidChars = "/ "
// ErrFailedToDialDevicePlugin is the error raised when the device plugin could not be
// reached on the registered socket
ErrFailedToDialDevicePlugin = "Failed to dial device plugin:"
// ErrUnsuportedVersion is the error raised when the device plugin uses an API version not
// supported by the Kubelet registry
ErrUnsuportedVersion = "Unsupported version"
// ErrDevicePluginAlreadyExists is the error raised when a device plugin with the
// same Resource Name tries to register itself
ErrDevicePluginAlreadyExists = "Another device plugin already registered this Resource Name"
// ErrInvalidResourceName is the error raised when a device plugin is registering
// itself with an invalid ResourceName
ErrInvalidResourceName = "The Resource Name is invalid"
// ErrEmptyResourceName is the error raised when the resource name field is empty
ErrEmptyResourceName = "Invalid Empty ResourceName"
)

View File

@ -8,6 +8,7 @@ go_library(
"container_manager_stub.go",
"container_manager_unsupported.go",
"device_plugin_handler.go",
"device_plugin_handler_stub.go",
"helpers_unsupported.go",
"pod_container_manager_stub.go",
"pod_container_manager_unsupported.go",
@ -31,11 +32,13 @@ go_library(
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/deviceplugin:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
@ -57,7 +60,6 @@ go_library(
"//vendor/github.com/opencontainers/runc/libcontainer/cgroups/fs:go_default_library",
"//vendor/github.com/opencontainers/runc/libcontainer/cgroups/systemd:go_default_library",
"//vendor/github.com/opencontainers/runc/libcontainer/configs:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
@ -69,6 +71,7 @@ go_test(
name = "go_default_test",
srcs = [
"container_manager_unsupported_test.go",
"device_plugin_handler_test.go",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"cgroup_manager_linux_test.go",
@ -81,15 +84,19 @@ go_test(
}),
library = ":go_default_library",
deps = [
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
] + select({
"@io_bazel_rules_go//go/platform:linux_amd64": [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
],
"//conditions:default": [],
}),

View File

@ -20,8 +20,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"fmt"
@ -68,7 +68,9 @@ type ContainerManager interface {
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error
InternalContainerLifecycle() InternalContainerLifecycle
// Returns RunContainerOptions with devices, mounts, and env fields populated for
// extended resources required by container.
GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error)
}
type NodeConfig struct {

View File

@ -41,6 +41,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/qos"
utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/util/mount"
@ -117,8 +118,8 @@ type containerManagerImpl struct {
recorder record.EventRecorder
// Interface for QoS cgroup management
qosContainerManager QOSContainerManager
// Interface for device plugin management.
devicePluginHdler DevicePluginHandler
// Interface for exporting and allocating devices reported by device plugins.
devicePluginHandler DevicePluginHandler
}
type features struct {
@ -181,7 +182,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
@ -252,7 +253,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}
return &containerManagerImpl{
cm := &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
NodeConfig: nodeConfig,
@ -262,7 +263,31 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cgroupRoot: cgroupRoot,
recorder: recorder,
qosContainerManager: qosContainerManager,
}, nil
}
updateDeviceCapacityFunc := func(updates v1.ResourceList) {
cm.Lock()
defer cm.Unlock()
for k, v := range updates {
if v.Value() <= 0 {
delete(cm.capacity, k)
} else {
cm.capacity[k] = v
}
}
}
glog.Infof("Creating device plugin handler: %t", devicePluginEnabled)
if devicePluginEnabled {
cm.devicePluginHandler, err = NewDevicePluginHandlerImpl(updateDeviceCapacityFunc)
} else {
cm.devicePluginHandler, err = NewDevicePluginHandlerStub()
}
if err != nil {
return nil, err
}
return cm, nil
}
// NewPodContainerManager is a factory method that returns a PodContainerManager object
@ -545,6 +570,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc)
}
close(stopChan)
}, time.Second, stopChan)
// Starts device plugin manager.
if err := cm.devicePluginHandler.Start(); err != nil {
return err
}
return nil
}
@ -562,6 +592,76 @@ func (cm *containerManagerImpl) setFsCapacity() error {
return nil
}
// GetResources builds the RunContainerOptions for the extended resources of
// container. It calls the device plugin handler's Allocate and merges the
// devices, mounts and environment variables of every returned runtime spec
// into a single option set.
//
// An entry whose container path (or env name) has already been seen is
// skipped; when its host side differs from the first one the conflict is
// logged and the first setting is kept. If Allocate fails, the empty options
// are returned together with the error.
//
// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
	opts := &kubecontainer.RunContainerOptions{}
	glog.V(3).Infof("Calling devicePluginHandler AllocateDevices")
	allocResps, err := cm.devicePluginHandler.Allocate(pod, container, activePods)
	if err != nil {
		return opts, err
	}

	// First-seen settings, keyed by container path / env name, used to
	// detect duplicates and conflicting values.
	seenDevs := map[string]string{}
	seenMounts := map[string]string{}
	seenEnvs := map[string]string{}

	// One response per required extended resource, one spec per device.
	for _, resp := range allocResps {
		for _, spec := range resp.Spec {
			// Devices.
			for _, dev := range spec.Devices {
				prev, dup := seenDevs[dev.ContainerPath]
				if !dup {
					seenDevs[dev.ContainerPath] = dev.HostPath
					opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{
						PathOnHost:      dev.HostPath,
						PathInContainer: dev.ContainerPath,
						Permissions:     dev.Permissions,
					})
					continue
				}
				glog.V(3).Infof("skip existing device %s %s", dev.ContainerPath, dev.HostPath)
				if prev != dev.HostPath {
					glog.Errorf("Container device %s has conflicting mapping host devices: %s and %s",
						dev.ContainerPath, prev, dev.HostPath)
				}
			}

			// Mounts.
			for _, mnt := range spec.Mounts {
				prev, dup := seenMounts[mnt.ContainerPath]
				if !dup {
					seenMounts[mnt.ContainerPath] = mnt.HostPath
					opts.Mounts = append(opts.Mounts, kubecontainer.Mount{
						Name:           mnt.ContainerPath,
						ContainerPath:  mnt.ContainerPath,
						HostPath:       mnt.HostPath,
						ReadOnly:       mnt.ReadOnly,
						SELinuxRelabel: false,
					})
					continue
				}
				glog.V(3).Infof("skip existing mount %s %s", mnt.ContainerPath, mnt.HostPath)
				if prev != mnt.HostPath {
					glog.Errorf("Container mount %s has conflicting mapping host mounts: %s and %s",
						mnt.ContainerPath, prev, mnt.HostPath)
				}
			}

			// Environment variables.
			for name, value := range spec.Envs {
				prev, dup := seenEnvs[name]
				if !dup {
					seenEnvs[name] = value
					opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: name, Value: value})
					continue
				}
				glog.V(3).Infof("skip existing envs %s %s", name, value)
				if prev != value {
					glog.Errorf("Environment variable %s has conflicting setting: %s and %s", name, prev, value)
				}
			}
		}
	}
	return opts, nil
}
func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
cpuLimit := int64(0)
@ -822,13 +922,3 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
defer cm.RUnlock()
return cm.capacity
}
// GetDevicePluginHandler returns the DevicePluginHandler currently stored in
// the container manager's devicePluginHdler field. The value is returned
// as-is; no lock is taken.
func (m *containerManagerImpl) GetDevicePluginHandler() DevicePluginHandler {
return m.devicePluginHdler
}
// SetDevicePluginHandler replaces the DevicePluginHandler stored in the
// container manager's devicePluginHdler field with d. No lock is taken.
func (m *containerManagerImpl) SetDevicePluginHandler(d DevicePluginHandler) {
m.devicePluginHdler = d
}

View File

@ -19,6 +19,8 @@ package cm
import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
type containerManagerStub struct{}
@ -66,6 +68,10 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}
// GetResources is the stub implementation: it ignores its arguments and
// always returns empty RunContainerOptions with a nil error.
func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
return &kubecontainer.RunContainerOptions{}, nil
}
// NewStubContainerManager returns a new containerManagerStub as a
// ContainerManager.
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
}

View File

@ -72,6 +72,10 @@ func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerMana
return &unsupportedPodContainerManager{}
}
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
// GetResources ignores its arguments and always returns empty
// RunContainerOptions with a nil error.
func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) {
return &kubecontainer.RunContainerOptions{}, nil
}
// NewContainerManager returns an unsupportedContainerManager. None of the
// arguments (including failSwapOn, devicePluginEnabled and recorder) are
// used by this implementation, and the returned error is always nil.
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}

View File

@ -38,6 +38,6 @@ func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc) error {
return nil
}
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
// NewContainerManager returns a zero-valued containerManagerImpl. None of
// the arguments are used by this implementation, and the returned error is
// always nil.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
return &containerManagerImpl{}, nil
}

View File

@ -18,38 +18,214 @@ package cm
import (
"fmt"
"sync"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/deviceplugin"
)
type (
	// containerDevices maps a container name to the set of device IDs
	// recorded for that container.
	containerDevices map[string]sets.String

	// podDevices maps a pod UID to the device IDs recorded for each of the
	// pod's containers.
	podDevices map[string]containerDevices
)
// pods returns the UIDs of all pods that have an entry in pdev.
func (pdev podDevices) pods() sets.String {
	uids := make([]string, 0, len(pdev))
	for uid := range pdev {
		uids = append(uids, uid)
	}
	return sets.NewString(uids...)
}
// insert records device as belonging to container contName of pod podUID,
// creating the per-pod and per-container entries on first use.
func (pdev podDevices) insert(podUID, contName string, device string) {
	conts, ok := pdev[podUID]
	if !ok {
		conts = make(containerDevices)
		pdev[podUID] = conts
	}
	devs, ok := conts[contName]
	if !ok {
		devs = sets.NewString()
		conts[contName] = devs
	}
	devs.Insert(device)
}
// getDevices returns the device set recorded for container contName of pod
// podUID, or nil when either the pod or the container has no entry. The
// returned set is the stored one, not a copy.
func (pdev podDevices) getDevices(podUID, contName string) sets.String {
	// Indexing a missing key yields a nil containerDevices, and indexing
	// that yields a nil set, so both "not found" cases collapse to nil.
	return pdev[podUID][contName]
}
// delete removes the entries of the given pod UIDs from pdev. UIDs without
// an entry are ignored.
func (pdev podDevices) delete(pods []string) {
	for i := range pods {
		delete(pdev, pods[i])
	}
}
// devices returns a new set holding every device ID recorded for any
// container of any pod in pdev.
func (pdev podDevices) devices() sets.String {
	ret := sets.NewString()
	// Insert into one accumulator rather than calling Union per container,
	// which builds and copies a fresh set each time. The loop variable is
	// named contDevs so it does not shadow the containerDevices type.
	for _, contDevs := range pdev {
		for _, deviceSet := range contDevs {
			for id := range deviceSet {
				ret.Insert(id)
			}
		}
	}
	return ret
}
// DevicePluginHandler is the container manager's view of device plugins: it
// starts the registration service, lists registered devices and allocates
// devices to containers.
type DevicePluginHandler interface {
// Start starts device plugin registration service.
Start() error
// Devices returns all of registered devices keyed by resourceName.
Devices() map[string][]*pluginapi.Device
// Allocate attempts to allocate all of required extended resources for
// the input container, issues an Allocate rpc request for each of such
// resources, and returns their AllocateResponses on success.
Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error)
}
// DevicePluginHandlerImpl is the DevicePluginHandler backed by a
// deviceplugin.Manager. It keeps its own bookkeeping of which device IDs
// exist per resource and which of them are assigned to which pod/container.
type DevicePluginHandlerImpl struct {
// Embedded mutex; Allocate and updateAllocatedDevices hold it while they
// read or modify allDevices and allocatedDevices.
sync.Mutex
// devicePluginManager performs the Start, Devices and Allocate calls this
// handler delegates.
devicePluginManager deviceplugin.Manager
// devicePluginManagerMonitorCallback is used for testing only.
devicePluginManagerMonitorCallback deviceplugin.MonitorCallback
// allDevices contains all of registered resourceNames and their exported device IDs.
// The monitor callback only keeps IDs of devices reported as healthy here.
allDevices map[string]sets.String
// allocatedDevices contains pod to allocated device mapping, keyed by resourceName.
allocatedDevices map[string]podDevices
}
// NewDevicePluginHandler create a DevicePluginHandler
func NewDevicePluginHandler() (*DevicePluginHandlerImpl, error) {
glog.V(2).Infof("Starting Device Plugin Handler")
// updateCapacityFunc is called to update ContainerManager capacity when
// device capacity changes.
func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*DevicePluginHandlerImpl, error) {
glog.V(2).Infof("Creating Device Plugin Handler")
handler := &DevicePluginHandlerImpl{
allDevices: make(map[string]sets.String),
allocatedDevices: devicesInUse(),
}
mgr, err := deviceplugin.NewManagerImpl(pluginapi.DevicePluginPath,
func(r string, a, u, d []*pluginapi.Device) {})
deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {
var capacity = v1.ResourceList{}
kept := append(updated, added...)
if _, ok := handler.allDevices[resourceName]; !ok {
handler.allDevices[resourceName] = sets.NewString()
}
// For now, DevicePluginHandler only keeps track of healthy devices.
// We can revisit this later when the need comes to track unhealthy devices here.
for _, dev := range kept {
if dev.Health == pluginapi.Healthy {
handler.allDevices[resourceName].Insert(dev.ID)
} else {
handler.allDevices[resourceName].Delete(dev.ID)
}
}
for _, dev := range deleted {
handler.allDevices[resourceName].Delete(dev.ID)
}
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(handler.allDevices[resourceName].Len()), resource.DecimalSI)
updateCapacityFunc(capacity)
}
mgr, err := deviceplugin.NewManagerImpl(pluginapi.KubeletSocket, deviceManagerMonitorCallback)
if err != nil {
return nil, fmt.Errorf("Failed to initialize device plugin: %+v", err)
return nil, fmt.Errorf("Failed to initialize device plugin manager: %+v", err)
}
if err := mgr.Start(); err != nil {
return nil, err
}
return &DevicePluginHandlerImpl{
devicePluginManager: mgr,
}, nil
handler.devicePluginManager = mgr
handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback
return handler, nil
}
// Start starts the underlying device plugin manager and returns its error.
func (h *DevicePluginHandlerImpl) Start() error {
return h.devicePluginManager.Start()
}
// Devices returns whatever the underlying device plugin manager reports as
// its devices, keyed by resource name. The handler's own allDevices
// bookkeeping is not consulted.
// TODO cache this
func (h *DevicePluginHandlerImpl) Devices() map[string][]*pluginapi.Device {
return h.devicePluginManager.Devices()
}
// Allocate reserves devices for every device-plugin resource listed in the
// container's limits and issues one Allocate call to the device plugin
// manager per such resource.
//
// Before reserving anything it drops the bookkeeping of pods that are not in
// activePods. Limits that are not device resources (per
// deviceplugin.IsDeviceName) or whose value is not positive are skipped.
//
// It returns the collected AllocateResponses, or a nil slice and an error as
// soon as a resource cannot be reserved or the manager's Allocate fails.
func (h *DevicePluginHandlerImpl) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) {
	var ret []*pluginapi.AllocateResponse
	h.updateAllocatedDevices(activePods)
	for k, v := range container.Resources.Limits {
		resourceName := string(k)
		needed := int(v.Value())
		glog.V(3).Infof("needs %d %s", needed, resourceName)
		if !deviceplugin.IsDeviceName(k) || needed <= 0 {
			continue
		}
		devIDs, err := h.reserveDevices(string(pod.UID), container.Name, resourceName, needed)
		if err != nil {
			return nil, err
		}
		// devicePluginManager.Allocate involves RPC calls to device plugin, which
		// could be heavy-weight. Therefore we want to perform this operation outside
		// mutex lock. Note if Allocate call fails, we may leave container resources
		// partially allocated for the failed container. We rely on updateAllocatedDevices()
		// to garbage collect these resources later. Another side effect is that if
		// we have X resource A and Y resource B in total, and two containers, container1
		// and container2 both require X resource A and Y resource B. Both allocation
		// requests may fail if we serve them in mixed order.
		// TODO: may revisit this part later if we see inefficient resource allocation
		// in real use as the result of this.
		resp, err := h.devicePluginManager.Allocate(resourceName, devIDs)
		if err != nil {
			return nil, err
		}
		ret = append(ret, resp)
	}
	return ret, nil
}

// reserveDevices records, under the handler lock, enough devices of
// resourceName for the given container to reach needed devices in total and
// returns the IDs of all devices the container then owns (previously
// recorded ones first, newly reserved ones after).
func (h *DevicePluginHandlerImpl) reserveDevices(podUID, contName, resourceName string, needed int) ([]string, error) {
	h.Lock()
	defer h.Unlock()

	if h.allocatedDevices[resourceName] == nil {
		h.allocatedDevices[resourceName] = make(podDevices)
	}
	podDevs := h.allocatedDevices[resourceName]

	// Gets list of devices that have already been allocated.
	// This can happen if a container restarts for example.
	devices := podDevs.getDevices(podUID, contName)
	if devices != nil {
		glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resourceName, contName, podUID, devices.List())
		needed -= devices.Len()
	}
	// More devices recorded than requested: nothing new to reserve. Without
	// this clamp the slice expression below would panic on a negative bound.
	if needed < 0 {
		needed = 0
	}
	// Snapshot the pre-allocated IDs now. insert() below adds to the very
	// same set, so reading it afterwards would list the new devices twice.
	devIDs := devices.UnsortedList()

	// Devices of this resource that no pod currently holds.
	available := h.allDevices[resourceName].Difference(podDevs.devices())
	if available.Len() < needed {
		return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resourceName, needed, available.Len())
	}
	allocated := available.UnsortedList()[:needed]
	for _, device := range allocated {
		// Update internal allocated device cache.
		podDevs.insert(podUID, contName, device)
	}
	return append(devIDs, allocated...), nil
}
// devicesInUse returns the initial pod-to-device bookkeeping, keyed by
// resource name. It currently always returns a new, empty map.
func devicesInUse() map[string]podDevices {
// TODO: gets the initial state from checkpointing.
return make(map[string]podDevices)
}
// updateAllocatedDevices drops the allocation records of every pod that is
// not in activePods, for all resources, which makes the devices recorded for
// those pods available again. It holds the handler lock while doing so and
// returns nothing.
func (h *DevicePluginHandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
	h.Lock()
	defer h.Unlock()

	active := sets.NewString()
	for i := range activePods {
		active.Insert(string(activePods[i].UID))
	}
	for _, podDevs := range h.allocatedDevices {
		// Pods with recorded devices that are no longer active.
		stale := podDevs.pods().Difference(active).List()
		glog.V(5).Infof("pods to be removed: %v", stale)
		podDevs.delete(stale)
	}
}

View File

@ -0,0 +1,42 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// DevicePluginHandlerStub is a simple stub implementation for
// DevicePluginHandler: it holds no state and every method is a no-op.
type DevicePluginHandlerStub struct{}
// NewDevicePluginHandlerStub returns a new DevicePluginHandlerStub; the
// returned error is always nil.
func NewDevicePluginHandlerStub() (*DevicePluginHandlerStub, error) {
return &DevicePluginHandlerStub{}, nil
}
// Start does nothing and returns nil.
func (h *DevicePluginHandlerStub) Start() error {
return nil
}
// Devices returns a new, empty map.
func (h *DevicePluginHandlerStub) Devices() map[string][]*pluginapi.Device {
return make(map[string][]*pluginapi.Device)
}
// Allocate ignores its arguments and returns a nil slice and a nil error.
func (h *DevicePluginHandlerStub) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) {
var ret []*pluginapi.AllocateResponse
return ret, nil
}

View File

@ -0,0 +1,250 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"flag"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// TestUpdateCapacity drives the handler's monitor callback directly and
// checks, through the capacity function handed to the constructor, that
// every published capacity equals the number of healthy devices tracked for
// the resource.
func TestUpdateCapacity(t *testing.T) {
	expected := v1.ResourceList{}
	as := assert.New(t)
	// Each callback invocation below ends up here, so expected must hold the
	// update the next invocation is supposed to publish.
	verifyCapacityFunc := func(updates v1.ResourceList) {
		as.Equal(expected, updates)
	}
	testDevicePluginHandler, err := NewDevicePluginHandlerImpl(verifyCapacityFunc)
	// Stop here on failure instead of dereferencing a nil handler below.
	if !as.Nil(err) || !as.NotNil(testDevicePluginHandler) {
		return
	}
	callback := testDevicePluginHandler.devicePluginManagerMonitorCallback
	none := []*pluginapi.Device{}

	devs := []*pluginapi.Device{
		{ID: "Device1", Health: pluginapi.Healthy},
		{ID: "Device2", Health: pluginapi.Healthy},
		{ID: "Device3", Health: pluginapi.Unhealthy},
	}

	resourceName := "resource1"
	// Adds three devices for resource1, two healthy and one unhealthy.
	// Expects capacity for resource1 to be 2.
	expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
	callback(resourceName, devs, none, none)

	// Deletes an unhealthy device should NOT change capacity.
	callback(resourceName, none, none, []*pluginapi.Device{devs[2]})

	// Updates a healthy device to unhealthy should reduce capacity by 1.
	// A separate Device value is used so that devs stays untouched for the
	// second resource below.
	expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
	callback(resourceName, none, []*pluginapi.Device{{ID: "Device2", Health: pluginapi.Unhealthy}}, none)

	// Deletes a healthy device should reduce capacity by 1.
	expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
	callback(resourceName, none, none, []*pluginapi.Device{devs[0]})

	// Tests adding another resource.
	delete(expected, v1.ResourceName(resourceName))
	resourceName2 := "resource2"
	expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
	callback(resourceName2, devs, none, none)
}
// stringPairType is a pair of strings used to configure the test stub
// below. Its Allocate uses value1/value2 as (container path, host path) for
// devices and mounts, and as (name, value) for environment variables.
type stringPairType struct {
value1 string
value2 string
}
// DevicePluginManagerTestStub is a DevicePluginManager stub to test device
// Allocation behavior. Tests fill the maps below; Allocate turns the entries
// found for each requested device into a runtime spec.
type DevicePluginManagerTestStub struct {
// All data structs are keyed by resourceName+DevId
// (plain string concatenation, no separator).
devRuntimeDevices map[string][]stringPairType
devRuntimeMounts map[string][]stringPairType
devRuntimeEnvs map[string][]stringPairType
}
// NewDevicePluginManagerTestStub returns a stub with all three lookup maps
// initialized and empty; the returned error is always nil.
func NewDevicePluginManagerTestStub() (*DevicePluginManagerTestStub, error) {
return &DevicePluginManagerTestStub{
devRuntimeDevices: make(map[string][]stringPairType),
devRuntimeMounts: make(map[string][]stringPairType),
devRuntimeEnvs: make(map[string][]stringPairType),
}, nil
}
// Start does nothing and returns nil.
func (m *DevicePluginManagerTestStub) Start() error {
return nil
}
// Devices returns a new, empty map; the stub does not track devices.
func (m *DevicePluginManagerTestStub) Devices() map[string][]*pluginapi.Device {
return make(map[string][]*pluginapi.Device)
}
// Allocate builds an AllocateResponse with one runtime spec per requested
// device ID, in the order given. Each spec is filled from the stub's maps
// under the key resourceName+id: devices get "mrw" permissions, mounts are
// read-only, and Envs is always a non-nil map. The returned error is always
// nil.
func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) {
	resp := &pluginapi.AllocateResponse{}
	for i := range devIds {
		devID := devIds[i]
		fmt.Printf("Alloc device %q for resource %q\n", devID, resourceName)

		key := resourceName + devID
		spec := &pluginapi.DeviceRuntimeSpec{}
		for _, pair := range m.devRuntimeDevices[key] {
			spec.Devices = append(spec.Devices, &pluginapi.DeviceSpec{
				ContainerPath: pair.value1,
				HostPath:      pair.value2,
				Permissions:   "mrw",
			})
		}
		for _, pair := range m.devRuntimeMounts[key] {
			fmt.Printf("Add mount %q %q\n", pair.value1, pair.value2)
			spec.Mounts = append(spec.Mounts, &pluginapi.Mount{
				ContainerPath: pair.value1,
				HostPath:      pair.value2,
				ReadOnly:      true,
			})
		}
		spec.Envs = map[string]string{}
		for _, pair := range m.devRuntimeEnvs[key] {
			spec.Envs[pair.value1] = pair.value2
		}
		resp.Spec = append(resp.Spec, spec)
	}
	return resp, nil
}
// Stop does nothing and returns nil.
func (m *DevicePluginManagerTestStub) Stop() error {
return nil
}
func TestPodContainerDeviceAllocation(t *testing.T) {
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
var logLevel string
flag.StringVar(&logLevel, "logLevel", "4", "test")
flag.Lookup("v").Value.Set(logLevel)
var activePods []*v1.Pod
resourceName1 := "domain1.com/resource1"
resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
devId1 := "dev1"
devId2 := "dev2"
resourceName2 := "domain2.com/resource2"
resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI)
devId3 := "dev3"
devId4 := "dev4"
m, err := NewDevicePluginManagerTestStub()
as := assert.New(t)
as.Nil(err)
monitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {}
testDevicePluginHandler := &DevicePluginHandlerImpl{
devicePluginManager: m,
devicePluginManagerMonitorCallback: monitorCallback,
allDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]podDevices),
}
testDevicePluginHandler.allDevices[resourceName1] = sets.NewString()
testDevicePluginHandler.allDevices[resourceName1].Insert(devId1)
testDevicePluginHandler.allDevices[resourceName1].Insert(devId2)
testDevicePluginHandler.allDevices[resourceName2] = sets.NewString()
testDevicePluginHandler.allDevices[resourceName2].Insert(devId3)
testDevicePluginHandler.allDevices[resourceName2].Insert(devId4)
m.devRuntimeDevices[resourceName1+devId1] = append(m.devRuntimeDevices[resourceName1+devId1], stringPairType{"/dev/aaa", "/dev/aaa"})
m.devRuntimeDevices[resourceName1+devId1] = append(m.devRuntimeDevices[resourceName1+devId1], stringPairType{"/dev/bbb", "/dev/bbb"})
m.devRuntimeDevices[resourceName1+devId2] = append(m.devRuntimeDevices[resourceName1+devId2], stringPairType{"/dev/ccc", "/dev/ccc"})
m.devRuntimeMounts[resourceName1+devId1] = append(m.devRuntimeMounts[resourceName1+devId1], stringPairType{"/container_dir1/file1", "host_dir1/file1"})
m.devRuntimeMounts[resourceName1+devId2] = append(m.devRuntimeMounts[resourceName1+devId2], stringPairType{"/container_dir1/file1", "host_dir1/file1"})
m.devRuntimeEnvs[resourceName1+devId2] = append(m.devRuntimeEnvs[resourceName1+devId2], stringPairType{"key1", "val1"})
m.devRuntimeEnvs[resourceName2+devId3] = append(m.devRuntimeEnvs[resourceName2+devId3], stringPairType{"key2", "val2"})
m.devRuntimeEnvs[resourceName2+devId4] = append(m.devRuntimeEnvs[resourceName2+devId4], stringPairType{"key2", "val2"})
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: uuid.NewUUID(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: string(uuid.NewUUID()),
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceName(resourceName1): resourceQuantity1,
v1.ResourceName("cpu"): resourceQuantity1,
v1.ResourceName(resourceName2): resourceQuantity2,
},
},
},
},
},
}
cm := &containerManagerImpl{
devicePluginHandler: testDevicePluginHandler,
}
activePods = append(activePods, pod)
runContainerOpts, err := cm.GetResources(pod, &pod.Spec.Containers[0], activePods)
as.Equal(len(runContainerOpts.Devices), 3)
// Two devices require to mount the same path. Expects a single mount entry to be created.
as.Equal(len(runContainerOpts.Mounts), 1)
as.Equal(runContainerOpts.Mounts[0].ContainerPath, "/container_dir1/file1")
as.Equal(len(runContainerOpts.Envs), 2)
// Requesting to create a pod without enough resources should fail.
failPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: uuid.NewUUID(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: string(uuid.NewUUID()),
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceName(resourceName1): resourceQuantity1,
},
},
},
},
},
}
runContainerOpts2, err := cm.GetResources(failPod, &failPod.Spec.Containers[0], activePods)
as.NotNil(err)
as.Equal(len(runContainerOpts2.Devices), 0)
as.Equal(len(runContainerOpts2.Mounts), 0)
as.Equal(len(runContainerOpts2.Envs), 0)
// Requesting to create a new pod with a single resourceName2 should succeed.
newPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: uuid.NewUUID(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: string(uuid.NewUUID()),
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceName(resourceName2): resourceQuantity2,
},
},
},
},
},
}
runContainerOpts3, err := cm.GetResources(newPod, &newPod.Spec.Containers[0], activePods)
as.Nil(err)
as.Equal(len(runContainerOpts3.Envs), 1)
}

View File

@ -11,18 +11,20 @@ load(
go_library(
name = "go_default_library",
srcs = [
"device_plugin_stub.go",
"endpoint.go",
"manager.go",
"mock_device_plugin.go",
"types.go",
"utils.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1/helper:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
],
)

View File

@ -20,6 +20,7 @@ import (
"log"
"net"
"os"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -27,8 +28,8 @@ import (
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// MockDevicePlugin is a mock device plugin
type MockDevicePlugin struct {
// Stub implementation for DevicePlugin.
type Stub struct {
devs []*pluginapi.Device
socket string
@ -38,9 +39,9 @@ type MockDevicePlugin struct {
server *grpc.Server
}
// NewMockDevicePlugin returns an initialized MockDevicePlugin
func NewMockDevicePlugin(devs []*pluginapi.Device, socket string) *MockDevicePlugin {
return &MockDevicePlugin{
// NewDevicePluginStub returns an initialized DevicePlugin Stub.
func NewDevicePluginStub(devs []*pluginapi.Device, socket string) *Stub {
return &Stub{
devs: devs,
socket: socket,
@ -50,7 +51,7 @@ func NewMockDevicePlugin(devs []*pluginapi.Device, socket string) *MockDevicePlu
}
// Start starts the gRPC server of the device plugin
func (m *MockDevicePlugin) Start() error {
func (m *Stub) Start() error {
err := m.cleanup()
if err != nil {
return err
@ -65,20 +66,28 @@ func (m *MockDevicePlugin) Start() error {
pluginapi.RegisterDevicePluginServer(m.server, m)
go m.server.Serve(sock)
// Wait till grpc server is ready.
for i := 0; i < 10; i++ {
services := m.server.GetServiceInfo()
if len(services) > 0 {
break
}
time.Sleep(1 * time.Second)
}
log.Println("Starting to serve on", m.socket)
return nil
}
// Stop stops the gRPC server
func (m *MockDevicePlugin) Stop() error {
// Stop shuts down the stub's gRPC server and then runs cleanup, which
// removes the stub's socket file. The error from cleanup, if any, is returned.
func (m *Stub) Stop() error {
	m.server.Stop()
	cleanupErr := m.cleanup()
	return cleanupErr
}
// ListAndWatch lists devices and update that list according to the Update call
func (m *MockDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
func (m *Stub) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
log.Println("ListAndWatch")
var devs []*pluginapi.Device
@ -102,19 +111,19 @@ func (m *MockDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePl
}
// Update allows the device plugin to send new devices through ListAndWatch
func (m *MockDevicePlugin) Update(devs []*pluginapi.Device) {
func (m *Stub) Update(devs []*pluginapi.Device) {
// Hand the new device list over on the update channel.
// NOTE(review): presumably ListAndWatch is the receiver and this send
// blocks until it picks the value up; confirm against the channel's creation.
m.update <- devs
}
// Allocate does a mock allocation
func (m *MockDevicePlugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
// Allocate logs the incoming request and answers with an empty
// AllocateResponse; the stub never fails an allocation.
func (m *Stub) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	log.Printf("Allocate, %+v", r)
	return &pluginapi.AllocateResponse{}, nil
}
func (m *MockDevicePlugin) cleanup() error {
func (m *Stub) cleanup() error {
if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
return err
}

View File

@ -29,6 +29,9 @@ import (
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// endpoint maps to a single registered device plugin. It is responsible
// for managing gRPC communications with the device plugin and caching
// device states reported by the device plugin.
type endpoint struct {
client pluginapi.DevicePluginClient
@ -44,9 +47,11 @@ type endpoint struct {
ctx context.Context
}
// newEndpoint creates a new endpoint for the given resourceName.
func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) {
client, err := dial(socketPath)
if err != nil {
glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
return nil, err
}
@ -66,49 +71,65 @@ func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*en
}, nil
}
func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
glog.V(2).Infof("Starting ListAndWatch")
// getDevices returns cloned copies of the devices currently cached by this
// endpoint. The cache is read while holding e.mutex.
func (e *endpoint) getDevices() []*pluginapi.Device {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	snapshot := copyDevices(e.devices)
	return snapshot
}
// list initializes ListAndWatch gRPC call for the device plugin and gets the
// initial list of the devices. Returns ListAndWatch gRPC stream on success.
func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
glog.V(3).Infof("Starting List")
stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{})
if err != nil {
glog.Errorf(ErrListAndWatch, e.resourceName, err)
glog.Errorf(errListAndWatch, e.resourceName, err)
return nil, err
}
devs, err := stream.Recv()
if err != nil {
glog.Errorf(ErrListAndWatch, e.resourceName, err)
glog.Errorf(errListAndWatch, e.resourceName, err)
return nil, err
}
devices := make(map[string]*pluginapi.Device)
var added, updated, deleted []*pluginapi.Device
for _, d := range devs.Devices {
devices[d.ID] = d
added = append(added, cloneDevice(d))
}
e.mutex.Lock()
e.devices = devices
e.mutex.Unlock()
e.callback(e.resourceName, added, updated, deleted)
return stream, nil
}
// listAndWatch blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch
// stream update contains a new list of device states. listAndWatch compares the new
// device states with its cached states to get list of new, updated, and deleted devices.
// It then issues a callback to pass this information to the device_plugin_handler which
// will adjust the resource available information accordingly.
func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) {
glog.V(2).Infof("Starting ListAndWatch")
glog.V(3).Infof("Starting ListAndWatch")
devices := make(map[string]*pluginapi.Device)
e.mutex.Lock()
for _, d := range e.devices {
devices[d.ID] = CloneDevice(d)
devices[d.ID] = cloneDevice(d)
}
e.mutex.Unlock()
for {
response, err := stream.Recv()
if err != nil {
glog.Errorf(ErrListAndWatch, e.resourceName, err)
glog.Errorf(errListAndWatch, e.resourceName, err)
return
}
@ -126,7 +147,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d)
devices[d.ID] = d
added = append(added, CloneDevice(d))
added = append(added, cloneDevice(d))
continue
}
@ -142,7 +163,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
}
devices[d.ID] = d
updated = append(updated, CloneDevice(d))
updated = append(updated, cloneDevice(d))
}
var deleted []*pluginapi.Device
@ -153,7 +174,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
glog.Errorf("Device %s was deleted", d.ID)
deleted = append(deleted, CloneDevice(d))
deleted = append(deleted, cloneDevice(d))
delete(devices, id)
}
@ -166,14 +187,10 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
}
func (e *endpoint) allocate(devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) {
var ids []string
for _, d := range devs {
ids = append(ids, d.ID)
}
// allocate issues Allocate gRPC call to the device plugin.
func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
DevicesIDs: ids,
DevicesIDs: devs,
})
}
@ -181,6 +198,7 @@ func (e *endpoint) stop() {
e.cancel()
}
// dial establishes the gRPC communication with the registered device plugin.
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) {
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
@ -189,7 +207,7 @@ func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) {
)
if err != nil {
return nil, fmt.Errorf(pluginapi.ErrFailedToDialDevicePlugin+" %v", err)
return nil, fmt.Errorf(errFailedToDialDevicePlugin+" %v", err)
}
return pluginapi.NewDevicePluginClient(c), nil

View File

@ -17,7 +17,6 @@ limitations under the License.
package deviceplugin
import (
"os"
"path"
"testing"
"time"
@ -32,8 +31,7 @@ var (
)
func TestNewEndpoint(t *testing.T) {
wd, _ := os.Getwd()
socket := path.Join(wd, esocketName)
socket := path.Join("/tmp", esocketName)
devs := []*pluginapi.Device{
{ID: "ADeviceId", Health: pluginapi.Healthy},
@ -44,8 +42,7 @@ func TestNewEndpoint(t *testing.T) {
}
func TestList(t *testing.T) {
wd, _ := os.Getwd()
socket := path.Join(wd, esocketName)
socket := path.Join("/tmp", esocketName)
devs := []*pluginapi.Device{
{ID: "ADeviceId", Health: pluginapi.Healthy},
@ -70,8 +67,7 @@ func TestList(t *testing.T) {
}
func TestListAndWatch(t *testing.T) {
wd, _ := os.Getwd()
socket := path.Join(wd, esocketName)
socket := path.Join("/tmp", esocketName)
devs := []*pluginapi.Device{
{ID: "ADeviceId", Health: pluginapi.Healthy},
@ -118,8 +114,8 @@ func TestListAndWatch(t *testing.T) {
}
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*MockDevicePlugin, *endpoint) {
p := NewMockDevicePlugin(devs, socket)
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*Stub, *endpoint) {
p := NewDevicePluginStub(devs, socket)
err := p.Start()
require.NoError(t, err)
@ -130,7 +126,7 @@ func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string,
return p, e
}
func ecleanup(t *testing.T, p *MockDevicePlugin, e *endpoint) {
func ecleanup(t *testing.T, p *Stub, e *endpoint) {
// Tear down in order: stop the stub plugin first, then the endpoint.
// The error returned by p.Stop() is discarded and t is not used.
p.Stop()
e.stop()
}

View File

@ -21,6 +21,7 @@ import (
"net"
"os"
"path/filepath"
"sync"
"github.com/golang/glog"
"golang.org/x/net/context"
@ -29,15 +30,27 @@ import (
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// NewManagerImpl creates a new manager on the socket `socketPath` and can
// rebuild state from devices and available []Device.
// f is the callback that is called when a device becomes unhealthy
// ManagerImpl is the structure in charge of managing Device Plugins.
type ManagerImpl struct {
// socketname and socketdir are the file name and directory of the
// manager's socket; Start joins them to build the path it listens on.
socketname string
socketdir string
// Endpoints holds one endpoint per registered device plugin.
Endpoints map[string]*endpoint // Key is ResourceName
// mutex is held by Devices, Allocate and addEndpoint around accesses to Endpoints.
mutex sync.Mutex
// callback is handed to every endpoint created by addEndpoint.
callback MonitorCallback
// server is the gRPC server that Stop shuts down.
server *grpc.Server
}
// NewManagerImpl creates a new manager on the socket `socketPath`.
// f is the callback that is called when a device becomes unhealthy.
// socketPath is present for testing purposes; in production this is pluginapi.KubeletSocket.
func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) {
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
if socketPath == "" || !filepath.IsAbs(socketPath) {
return nil, fmt.Errorf(ErrBadSocket+" %v", socketPath)
return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
}
dir, file := filepath.Split(socketPath)
@ -50,6 +63,26 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
}, nil
}
// removeContents deletes every entry found directly under dir (files and
// whole sub-trees alike) while leaving dir itself in place. It returns the
// first error hit while opening dir, listing it, or removing an entry.
func removeContents(dir string) error {
	handle, err := os.Open(dir)
	if err != nil {
		return err
	}
	defer handle.Close()

	entries, err := handle.Readdirnames(-1)
	if err != nil {
		return err
	}

	for i := range entries {
		// TODO: skip checkpoint file and check for file type.
		target := filepath.Join(dir, entries[i])
		if rmErr := os.RemoveAll(target); rmErr != nil {
			return rmErr
		}
	}
	return nil
}
// Start starts the Device Plugin Manager
func (m *ManagerImpl) Start() error {
glog.V(2).Infof("Starting Device Plugin manager")
@ -57,14 +90,20 @@ func (m *ManagerImpl) Start() error {
socketPath := filepath.Join(m.socketdir, m.socketname)
os.MkdirAll(m.socketdir, 0755)
// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := removeContents(m.socketdir); err != nil {
glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
}
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
glog.Errorf(ErrRemoveSocket+" %+v", err)
glog.Errorf(errRemoveSocket+" %+v", err)
return err
}
s, err := net.Listen("unix", socketPath)
if err != nil {
glog.Errorf(ErrListenSocket+" %+v", err)
glog.Errorf(errListenSocket+" %+v", err)
return err
}
@ -77,39 +116,33 @@ func (m *ManagerImpl) Start() error {
}
// Devices is the map of devices that are known by the Device
// Plugin manager with the Kind of the devices as key
// Plugin manager with the kind of the devices as key
func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device {
glog.V(2).Infof("Devices called")
m.mutex.Lock()
defer m.mutex.Unlock()
devs := make(map[string][]*pluginapi.Device)
for k, e := range m.Endpoints {
glog.V(2).Infof("Endpoint: %+v: %+v", k, e)
e.mutex.Lock()
devs[k] = copyDevices(e.devices)
e.mutex.Unlock()
glog.V(3).Infof("Endpoint: %+v: %+v", k, e)
devs[k] = e.getDevices()
}
return devs
}
// Allocate is the call that you can use to allocate a set of Devices
func (m *ManagerImpl) Allocate(resourceName string,
devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.AllocateResponse, error) {
if len(devs) == 0 {
return nil, nil
}
glog.Infof("Recieved request for devices %v for device plugin %s",
glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s",
devs, resourceName)
m.mutex.Lock()
e, ok := m.Endpoints[resourceName]
m.mutex.Unlock()
if !ok {
return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
}
@ -117,45 +150,46 @@ func (m *ManagerImpl) Allocate(resourceName string,
return e.allocate(devs)
}
// Register registers a device plugin
// Register registers a device plugin.
func (m *ManagerImpl) Register(ctx context.Context,
r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
glog.V(2).Infof("Got request for Device Plugin %s", r.ResourceName)
if r.Version != pluginapi.Version {
return &pluginapi.Empty{},
fmt.Errorf(pluginapi.ErrUnsuportedVersion)
return &pluginapi.Empty{}, fmt.Errorf(errUnsuportedVersion)
}
if err := IsResourceNameValid(r.ResourceName); err != nil {
return &pluginapi.Empty{}, err
}
if _, ok := m.Endpoints[r.ResourceName]; ok {
return &pluginapi.Empty{},
fmt.Errorf(pluginapi.ErrDevicePluginAlreadyExists)
}
// TODO: for now, always accepts newest device plugin. Later may consider to
// add some policies here, e.g., verify whether an old device plugin with the
// same resource name is still alive to determine whether we want to accept
// the new registration.
go m.addEndpoint(r)
return &pluginapi.Empty{}, nil
}
// Stop is the function that can stop the gRPC server
// Stop stops every registered endpoint and then the gRPC server.
// It always returns nil.
func (m *ManagerImpl) Stop() error {
	// Endpoints is modified under m.mutex by addEndpoint, which Register
	// runs in its own goroutine. Snapshot the map under the same lock so
	// the iteration cannot race with a concurrent insert or delete, and
	// release the lock before calling out to the endpoints and the server.
	m.mutex.Lock()
	endpoints := make([]*endpoint, 0, len(m.Endpoints))
	for _, e := range m.Endpoints {
		endpoints = append(endpoints, e)
	}
	m.mutex.Unlock()

	for _, e := range endpoints {
		e.stop()
	}
	m.server.Stop()
	return nil
}
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
socketPath := filepath.Join(m.socketdir, r.Endpoint)
// Stops existing endpoint if there is any.
m.mutex.Lock()
old, ok := m.Endpoints[r.ResourceName]
m.mutex.Unlock()
if ok && old != nil {
old.stop()
}
socketPath := filepath.Join(m.socketdir, r.Endpoint)
e, err := newEndpoint(socketPath, r.ResourceName, m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
@ -172,22 +206,15 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
e.listAndWatch(stream)
m.mutex.Lock()
e.mutex.Lock()
delete(m.Endpoints, r.ResourceName)
if old, ok := m.Endpoints[r.ResourceName]; ok && old == e {
delete(m.Endpoints, r.ResourceName)
}
glog.V(2).Infof("Unregistered endpoint %v", e)
e.mutex.Unlock()
m.mutex.Unlock()
}()
m.mutex.Lock()
e.mutex.Lock()
m.Endpoints[r.ResourceName] = e
glog.V(2).Infof("Registered endpoint %v", e)
e.mutex.Unlock()
m.mutex.Unlock()
}

View File

@ -49,18 +49,18 @@ func TestNewManagerImplStart(t *testing.T) {
require.NoError(t, err)
}
func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *MockDevicePlugin) {
func setup(t *testing.T, devs []*pluginapi.Device, pluginSocket, serverSocket string, callback MonitorCallback) (Manager, *Stub) {
m, err := NewManagerImpl(serverSocket, callback)
require.NoError(t, err)
p := NewMockDevicePlugin(devs, pluginSocket)
p := NewDevicePluginStub(devs, pluginSocket)
err = p.Start()
require.NoError(t, err)
return m, p
}
func cleanup(t *testing.T, m Manager, p *MockDevicePlugin) {
func cleanup(t *testing.T, m Manager, p *Stub) {
// Tear down in order: stop the stub plugin first, then the manager.
// Both Stop calls return an error that is discarded here; t is not used.
p.Stop()
m.Stop()
}

View File

@ -17,58 +17,54 @@ limitations under the License.
package deviceplugin
import (
"sync"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// MonitorCallback is the function called when a device becomes
// unhealthy (or healthy again)
// Updated contains the most recent state of the Device
// MonitorCallback is the function called when a device's health state changes,
// or new devices are reported, or old devices are deleted.
// Updated contains the most recent state of the Device.
type MonitorCallback func(resourceName string, added, updated, deleted []*pluginapi.Device)
// Manager manages the Device Plugins running on a machine
// Manager manages all the Device Plugins running on a node.
type Manager interface {
// Start starts the gRPC service
// Start starts the gRPC Registration service.
Start() error
// Devices is the map of devices that have registered themselves
// against the manager.
// The map key is the ResourceName of the device plugins
// The map key is the ResourceName of the device plugins.
Devices() map[string][]*pluginapi.Device
// Allocate is calls the gRPC Allocate on the device plugin
Allocate(string, []*pluginapi.Device) (*pluginapi.AllocateResponse, error)
// Allocate takes resourceName and list of device Ids, and calls the
// gRPC Allocate on the device plugin matching the resourceName.
Allocate(string, []string) (*pluginapi.AllocateResponse, error)
// Stop stops the manager
// Stop stops the manager.
Stop() error
}
// ManagerImpl is the structure in charge of managing Device Plugins
type ManagerImpl struct {
socketname string
socketdir string
Endpoints map[string]*endpoint // Key is ResourceName
mutex sync.Mutex
callback MonitorCallback
server *grpc.Server
}
// TODO: evaluate whether we need these error definitions.
const (
// ErrDevicePluginUnknown is the error raised when the device Plugin returned by Monitor is not known by the Device Plugin manager
ErrDevicePluginUnknown = "Manager does not have device plugin for device:"
// ErrDeviceUnknown is the error raised when the device returned by Monitor is not known by the Device Plugin manager
ErrDeviceUnknown = "Could not find device in it's Device Plugin's Device List:"
// ErrBadSocket is the error raised when the registry socket path is not absolute
ErrBadSocket = "Bad socketPath, must be an absolute path:"
// ErrRemoveSocket is the error raised when the registry could not remove the existing socket
ErrRemoveSocket = "Failed to remove socket while starting device plugin registry, with error"
// ErrListenSocket is the error raised when the registry could not listen on the socket
ErrListenSocket = "Failed to listen to socket while starting device plugin registry, with error"
// ErrListAndWatch is the error raised when ListAndWatch ended unsuccessfully
ErrListAndWatch = "ListAndWatch ended unexpectedly for device plugin %s with error %v"
// errFailedToDialDevicePlugin is the error raised when the device plugin could not be
// reached on the registered socket
errFailedToDialDevicePlugin = "failed to dial device plugin:"
// errUnsuportedVersion is the error raised when the device plugin uses an API version not
// supported by the Kubelet registry
errUnsuportedVersion = "unsupported API version by the Kubelet registry"
// errDevicePluginAlreadyExists is the error raised when a device plugin with the
// same Resource Name tries to register itself
errDevicePluginAlreadyExists = "another device plugin already registered this Resource Name"
// errInvalidResourceName is the error raised when a device plugin is registering
// itself with an invalid ResourceName
errInvalidResourceName = "the ResourceName is invalid"
// errEmptyResourceName is the error raised when the resource name field is empty
errEmptyResourceName = "invalid Empty ResourceName"
// errBadSocket is the error raised when the registry socket path is not absolute
errBadSocket = "bad socketPath, must be an absolute path:"
// errRemoveSocket is the error raised when the registry could not remove the existing socket
errRemoveSocket = "failed to remove socket while starting device plugin registry, with error"
// errListenSocket is the error raised when the registry could not listen on the socket
errListenSocket = "failed to listen to socket while starting device plugin registry, with error"
// errListAndWatch is the error raised when ListAndWatch ended unsuccessfully
errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v"
)

View File

@ -18,13 +18,13 @@ package deviceplugin
import (
"fmt"
"strings"
"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// CloneDevice clones a pluginapi.Device
func CloneDevice(d *pluginapi.Device) *pluginapi.Device {
func cloneDevice(d *pluginapi.Device) *pluginapi.Device {
return &pluginapi.Device{
ID: d.ID,
Health: d.Health,
@ -36,41 +36,26 @@ func copyDevices(devs map[string]*pluginapi.Device) []*pluginapi.Device {
var clones []*pluginapi.Device
for _, d := range devs {
clones = append(clones, CloneDevice(d))
clones = append(clones, cloneDevice(d))
}
return clones
}
// GetDevice returns the Device if a boolean signaling if the device was found or not
func GetDevice(d *pluginapi.Device, devs []*pluginapi.Device) (*pluginapi.Device, bool) {
name := DeviceKey(d)
for _, d := range devs {
if DeviceKey(d) != name {
continue
}
return d, true
}
return nil, false
}
// IsResourceNameValid returns an error if the resource is invalid,
// IsResourceNameValid returns an error if the resource is invalid or is not an
// extended resource name.
func IsResourceNameValid(resourceName string) error {
if resourceName == "" {
return fmt.Errorf(pluginapi.ErrEmptyResourceName)
return fmt.Errorf(errEmptyResourceName)
}
if strings.ContainsAny(resourceName, pluginapi.InvalidChars) {
return fmt.Errorf(pluginapi.ErrInvalidResourceName)
if !IsDeviceName(v1.ResourceName(resourceName)) {
return fmt.Errorf(errInvalidResourceName)
}
return nil
}
// DeviceKey returns the Key of a device
func DeviceKey(d *pluginapi.Device) string {
return d.ID
// IsDeviceName reports whether k is an extended resource name, i.e. the kind
// of resource name a device plugin exports. The decision is delegated to
// v1helper.IsExtendedResourceName.
func IsDeviceName(k v1.ResourceName) bool {
	isExtended := v1helper.IsExtendedResourceName(k)
	return isExtended
}

View File

@ -25,7 +25,7 @@ import (
)
func TestCloneDevice(t *testing.T) {
d := CloneDevice(&pluginapi.Device{ID: "ADeviceId", Health: pluginapi.Healthy})
d := cloneDevice(&pluginapi.Device{ID: "ADeviceId", Health: pluginapi.Healthy})
require.Equal(t, d.ID, "ADeviceId")
require.Equal(t, d.Health, pluginapi.Healthy)
@ -40,15 +40,14 @@ func TestCopyDevices(t *testing.T) {
require.Len(t, devs, 1)
}
func TestGetDevice(t *testing.T) {
devs := []*pluginapi.Device{
{ID: "ADeviceId", Health: pluginapi.Healthy},
}
_, ok := GetDevice(&pluginapi.Device{ID: "AnotherDeviceId"}, devs)
require.False(t, ok)
d, ok := GetDevice(&pluginapi.Device{ID: "ADeviceId"}, devs)
require.True(t, ok)
require.Equal(t, d, devs[0])
// TestIsResourceName checks that IsResourceNameValid rejects the empty name,
// native and unqualified names, and names under kubernetes.io domains, while
// accepting names qualified with other domains.
func TestIsResourceName(t *testing.T) {
	rejected := []string{
		"",
		"cpu",
		"name1",
		"alpha.kubernetes.io/name1",
		"beta.kubernetes.io/name1",
		"kubernetes.io/name1",
	}
	for _, name := range rejected {
		require.NotNil(t, IsResourceNameValid(name))
	}

	accepted := []string{
		"domain1.io/name1",
		"alpha.domain1.io/name1",
		"beta.domain1.io/name1",
	}
	for _, name := range accepted {
		require.Nil(t, IsResourceNameValid(name))
	}
}

View File

@ -754,15 +754,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err
}
if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) {
devicePluginHdlr, err := cm.NewDevicePluginHandler()
if err != nil {
return nil, err
}
klet.containerManager.SetDevicePluginHandler(devicePluginHdlr)
}
// If the experimentalMounterPathFlag is set, we do not want to
// check node capabilities since the mount path is not the default
if len(kubeCfg.ExperimentalMounterPath) != 0 {

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/util"
@ -596,6 +595,15 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
node.Status.Capacity[v1.ResourceEphemeralStorage] = initialCapacity[v1.ResourceEphemeralStorage]
}
}
initialCapacity := kl.containerManager.GetCapacity()
if initialCapacity != nil {
for k, v := range initialCapacity {
if v1helper.IsExtendedResourceName(k) {
node.Status.Capacity[k] = v
}
}
}
}
// Set Allocatable.
@ -622,27 +630,6 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
}
node.Status.Allocatable[k] = value
}
hdlr := kl.containerManager.GetDevicePluginHandler()
if hdlr == nil {
return
}
for k, v := range hdlr.Devices() {
key := v1.ResourceName(v1.ResourceOpaqueIntPrefix + k)
var n int64
n = 0
for _, d := range v {
if d.Health == pluginapi.Unhealthy {
continue
}
n++
}
node.Status.Capacity[key] = *resource.NewQuantity(n, resource.DecimalSI)
}
}
// Set versioninfo for the node.

View File

@ -352,8 +352,13 @@ func (kl *Kubelet) GetPodCgroupParent(pod *v1.Pod) string {
// the container runtime to set parameters for launching a container.
func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Container, podIP string) (*kubecontainer.RunContainerOptions, bool, error) {
useClusterFirstPolicy := false
opts, err := kl.containerManager.GetResources(pod, container, kl.GetActivePods())
if err != nil {
return nil, false, err
}
cgroupParent := kl.GetPodCgroupParent(pod)
opts := &kubecontainer.RunContainerOptions{CgroupParent: cgroupParent}
opts.CgroupParent = cgroupParent
hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
if err != nil {
return nil, false, err
@ -364,19 +369,23 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai
opts.PortMappings = kubecontainer.MakePortMappings(container)
// TODO(random-liu): Move following convert functions into pkg/kubelet/container
opts.Devices, err = kl.makeDevices(pod, container)
devices, err := kl.makeDevices(pod, container)
if err != nil {
return nil, false, err
}
opts.Devices = append(opts.Devices, devices...)
opts.Mounts, err = makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
mounts, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
if err != nil {
return nil, false, err
}
opts.Envs, err = kl.makeEnvironmentVariables(pod, container, podIP)
opts.Mounts = append(opts.Mounts, mounts...)
envs, err := kl.makeEnvironmentVariables(pod, container, podIP)
if err != nil {
return nil, false, err
}
opts.Envs = append(opts.Envs, envs...)
// Disabling adding TerminationMessagePath on Windows as these files would be mounted as docker volume and
// Docker for Windows has a bug where only directories can be mounted