Added Device Plugin Manager

This commit is contained in:
Renaud Gaubert 2017-08-10 17:14:34 -07:00 committed by Jiaying Zhang
parent 44c5182187
commit b563101efb
7 changed files with 596 additions and 0 deletions

View File

@ -253,6 +253,7 @@ filegroup(
"//pkg/kubelet/configmap:all-srcs",
"//pkg/kubelet/container:all-srcs",
"//pkg/kubelet/custommetrics:all-srcs",
"//pkg/kubelet/deviceplugin:all-srcs",
"//pkg/kubelet/dockershim:all-srcs",
"//pkg/kubelet/envvars:all-srcs",
"//pkg/kubelet/events:all-srcs",

View File

@ -30,4 +30,22 @@ const (
DevicePluginPath = "/var/lib/kubelet/device-plugins/"
// KubeletSocket is the path of the Kubelet registry socket
KubeletSocket = DevicePluginPath + "kubelet.sock"
// InvalidChars are the characters that may not appear in a Vendor or Kind field
InvalidChars = "/ "
// ErrFailedToDialDevicePlugin is the error raised when the device plugin could not be
// reached on the registered socket
ErrFailedToDialDevicePlugin = "Failed to dial device plugin:"
// ErrUnsuportedVersion is the error raised when the device plugin uses an API version not
// supported by the Kubelet registry
ErrUnsuportedVersion = "Unsupported version"
// ErrDevicePluginAlreadyExists is the error raised when a device plugin with the
// same Resource Name tries to register itself
ErrDevicePluginAlreadyExists = "Another device plugin already registered this Resource Name"
// ErrInvalidResourceName is the error raised when a device plugin is registering
// itself with an invalid ResourceName
ErrInvalidResourceName = "The Resource Name is invalid"
// ErrEmptyResourceName is the error raised when the resource name field is empty
ErrEmptyResourceName = "Invalid Empty ResourceName"
)

View File

@ -0,0 +1,38 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = [
"endpoint.go",
"manager.go",
"types.go",
"utils.go",
],
tags = ["automanaged"],
deps = [
"//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,196 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deviceplugin
import (
"fmt"
"net"
"sync"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
type endpoint struct {
client pluginapi.DevicePluginClient
socketPath string
resourceName string
devices map[string]*pluginapi.Device
mutex sync.Mutex
callback MonitorCallback
cancel context.CancelFunc
ctx context.Context
}
func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) {
client, err := dial(socketPath)
if err != nil {
return nil, err
}
ctx, stop := context.WithCancel(context.Background())
return &endpoint{
client: client,
socketPath: socketPath,
resourceName: resourceName,
devices: nil,
callback: callback,
cancel: stop,
ctx: ctx,
}, nil
}
func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
glog.V(2).Infof("Starting ListAndWatch")
stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{})
if err != nil {
glog.Errorf(ErrListAndWatch, e.resourceName, err)
return nil, err
}
devs, err := stream.Recv()
if err != nil {
glog.Errorf(ErrListAndWatch, e.resourceName, err)
return nil, err
}
devices := make(map[string]*pluginapi.Device)
for _, d := range devs.Devices {
devices[d.ID] = d
}
e.mutex.Lock()
e.devices = devices
e.mutex.Unlock()
return stream, nil
}
func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) {
glog.V(2).Infof("Starting ListAndWatch")
devices := make(map[string]*pluginapi.Device)
e.mutex.Lock()
for _, d := range e.devices {
devices[d.ID] = CloneDevice(d)
}
e.mutex.Unlock()
for {
response, err := stream.Recv()
if err != nil {
glog.Errorf(ErrListAndWatch, e.resourceName, err)
return
}
devs := response.Devices
glog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
newDevs := make(map[string]*pluginapi.Device)
var added, updated []*pluginapi.Device
for _, d := range devs {
dOld, ok := devices[d.ID]
newDevs[d.ID] = d
if !ok {
glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d)
devices[d.ID] = d
added = append(added, CloneDevice(d))
continue
}
if d.Health == dOld.Health {
continue
}
if d.Health == pluginapi.Unhealthy {
glog.Errorf("Device %s is now Unhealthy", d.ID)
} else if d.Health == pluginapi.Healthy {
glog.V(2).Infof("Device %s is now Healthy", d.ID)
}
devices[d.ID] = d
updated = append(updated, CloneDevice(d))
}
var deleted []*pluginapi.Device
for id, d := range devices {
if _, ok := newDevs[id]; ok {
continue
}
glog.Errorf("Device %s was deleted", d.ID)
deleted = append(deleted, CloneDevice(d))
delete(devices, id)
}
e.mutex.Lock()
e.devices = devices
e.mutex.Unlock()
e.callback(e.resourceName, added, updated, deleted)
}
}
func (e *endpoint) allocate(devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) {
var ids []string
for _, d := range devs {
ids = append(ids, d.ID)
}
return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
DevicesIDs: ids,
})
}
func (e *endpoint) stop() {
e.cancel()
}
func dial(unixSocketPath string) (pluginapi.DevicePluginClient, error) {
c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return nil, fmt.Errorf(pluginapi.ErrFailedToDialDevicePlugin+" %v", err)
}
return pluginapi.NewDevicePluginClient(c), nil
}

View File

@ -0,0 +1,193 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deviceplugin
import (
"fmt"
"net"
"os"
"path/filepath"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// NewManagerImpl creates a new manager on the socket `socketPath` and can
// rebuild state from devices and available []Device.
// f is the callback that is called when a device becomes unhealthy
// socketPath is present for testing purposes in production this is pluginapi.KubeletSocket
func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) {
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
if socketPath == "" || !filepath.IsAbs(socketPath) {
return nil, fmt.Errorf(ErrBadSocket+" %v", socketPath)
}
dir, file := filepath.Split(socketPath)
return &ManagerImpl{
Endpoints: make(map[string]*endpoint),
socketname: file,
socketdir: dir,
callback: f,
}, nil
}
// Start starts the Device Plugin Manager
func (m *ManagerImpl) Start() error {
glog.V(2).Infof("Starting Device Plugin manager")
socketPath := filepath.Join(m.socketdir, m.socketname)
os.MkdirAll(m.socketdir, 0755)
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
glog.Errorf(ErrRemoveSocket+" %+v", err)
return err
}
s, err := net.Listen("unix", socketPath)
if err != nil {
glog.Errorf(ErrListenSocket+" %+v", err)
return err
}
m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterRegistrationServer(m.server, m)
go m.server.Serve(s)
return nil
}
// Devices is the map of devices that are known by the Device
// Plugin manager with the Kind of the devices as key
func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device {
glog.V(2).Infof("Devices called")
m.mutex.Lock()
defer m.mutex.Unlock()
devs := make(map[string][]*pluginapi.Device)
for k, e := range m.Endpoints {
glog.V(2).Infof("Endpoint: %+v: %+v", k, e)
e.mutex.Lock()
devs[k] = copyDevices(e.devices)
e.mutex.Unlock()
}
return devs
}
// Allocate is the call that you can use to allocate a set of Devices
func (m *ManagerImpl) Allocate(resourceName string,
devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if len(devs) == 0 {
return nil, nil
}
glog.Infof("Recieved request for devices %v for device plugin %s",
devs, resourceName)
e, ok := m.Endpoints[resourceName]
if !ok {
return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
}
return e.allocate(devs)
}
// Register registers a device plugin
func (m *ManagerImpl) Register(ctx context.Context,
r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
glog.V(2).Infof("Got request for Device Plugin %s", r.ResourceName)
if r.Version != pluginapi.Version {
return &pluginapi.Empty{},
fmt.Errorf(pluginapi.ErrUnsuportedVersion)
}
if err := IsResourceNameValid(r.ResourceName); err != nil {
return &pluginapi.Empty{}, err
}
if _, ok := m.Endpoints[r.ResourceName]; ok {
return &pluginapi.Empty{},
fmt.Errorf(pluginapi.ErrDevicePluginAlreadyExists)
}
go m.addEndpoint(r)
return &pluginapi.Empty{}, nil
}
// Stop is the function that can stop the gRPC server
func (m *ManagerImpl) Stop() error {
for _, e := range m.Endpoints {
e.stop()
}
m.server.Stop()
return nil
}
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
socketPath := filepath.Join(m.socketdir, r.Endpoint)
e, err := newEndpoint(socketPath, r.ResourceName, m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}
stream, err := e.list()
if err != nil {
glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
return
}
go func() {
e.listAndWatch(stream)
m.mutex.Lock()
e.mutex.Lock()
delete(m.Endpoints, r.ResourceName)
glog.V(2).Infof("Unregistered endpoint %v", e)
e.mutex.Unlock()
m.mutex.Unlock()
}()
m.mutex.Lock()
e.mutex.Lock()
m.Endpoints[r.ResourceName] = e
glog.V(2).Infof("Registered endpoint %v", e)
e.mutex.Unlock()
m.mutex.Unlock()
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deviceplugin
import (
"sync"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// MonitorCallback is the function called when a device becomes
// unhealthy (or healthy again)
// Updated contains the most recent state of the Device
type MonitorCallback func(resourceName string, added, updated, deleted []*pluginapi.Device)
// Manager manages the Device Plugins running on a machine
type Manager interface {
// Start starts the gRPC service
Start() error
// Devices is the map of devices that have registered themselves
// against the manager.
// The map key is the ResourceName of the device plugins
Devices() map[string][]*pluginapi.Device
// Allocate is calls the gRPC Allocate on the device plugin
Allocate(string, []*pluginapi.Device) (*pluginapi.AllocateResponse, error)
// Stop stops the manager
Stop() error
}
// ManagerImpl is the structure in charge of managing Device Plugins
type ManagerImpl struct {
socketname string
socketdir string
Endpoints map[string]*endpoint // Key is ResourceName
mutex sync.Mutex
callback MonitorCallback
server *grpc.Server
}
const (
// ErrDevicePluginUnknown is the error raised when the device Plugin returned by Monitor is not know by the Device Plugin manager
ErrDevicePluginUnknown = "Manager does not have device plugin for device:"
// ErrDeviceUnknown is the error raised when the device returned by Monitor is not know by the Device Plugin manager
ErrDeviceUnknown = "Could not find device in it's Device Plugin's Device List:"
// ErrBadSocket is the error raised when the registry socket path is not absolute
ErrBadSocket = "Bad socketPath, must be an absolute path:"
// ErrRemoveSocket is the error raised when the registry could not remove the existing socket
ErrRemoveSocket = "Failed to remove socket while starting device plugin registry, with error"
// ErrListenSocket is the error raised when the registry could not listen on the socket
ErrListenSocket = "Failed to listen to socket while starting device plugin registry, with error"
// ErrListAndWatch is the error raised when ListAndWatch ended unsuccessfully
ErrListAndWatch = "ListAndWatch ended unexpectedly for device plugin %s with error %v"
)

View File

@ -0,0 +1,76 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deviceplugin
import (
"fmt"
"strings"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
)
// CloneDevice clones a pluginapi.Device
func CloneDevice(d *pluginapi.Device) *pluginapi.Device {
return &pluginapi.Device{
ID: d.ID,
Health: d.Health,
}
}
func copyDevices(devs map[string]*pluginapi.Device) []*pluginapi.Device {
var clones []*pluginapi.Device
for _, d := range devs {
clones = append(clones, CloneDevice(d))
}
return clones
}
// GetDevice returns the Device if a boolean signaling if the device was found or not
func GetDevice(d *pluginapi.Device, devs []*pluginapi.Device) (*pluginapi.Device, bool) {
name := DeviceKey(d)
for _, d := range devs {
if DeviceKey(d) != name {
continue
}
return d, true
}
return nil, false
}
// IsResourceNameValid returns an error if the resource is invalid,
func IsResourceNameValid(resourceName string) error {
if resourceName == "" {
return fmt.Errorf(pluginapi.ErrEmptyResourceName)
}
if strings.ContainsAny(resourceName, pluginapi.InvalidChars) {
return fmt.Errorf(pluginapi.ErrInvalidResourceName)
}
return nil
}
// DeviceKey returns the Key of a device
func DeviceKey(d *pluginapi.Device) string {
return d.ID
}