From b563101efbb09d9d63512e53672ce06eb1694d2e Mon Sep 17 00:00:00 2001
From: Renaud Gaubert
Date: Thu, 10 Aug 2017 17:14:34 -0700
Subject: [PATCH] Added Device Plugin Manager

---
 pkg/kubelet/BUILD                           |   1 +
 .../apis/deviceplugin/v1alpha1/constants.go |  18 ++
 pkg/kubelet/deviceplugin/BUILD              |  38 +++
 pkg/kubelet/deviceplugin/endpoint.go        | 228 +++++++++++++++++
 pkg/kubelet/deviceplugin/manager.go         | 234 ++++++++++++++++++
 pkg/kubelet/deviceplugin/types.go           |  74 ++++++
 pkg/kubelet/deviceplugin/utils.go           |  83 +++++++
 7 files changed, 676 insertions(+)
 create mode 100644 pkg/kubelet/deviceplugin/BUILD
 create mode 100644 pkg/kubelet/deviceplugin/endpoint.go
 create mode 100644 pkg/kubelet/deviceplugin/manager.go
 create mode 100644 pkg/kubelet/deviceplugin/types.go
 create mode 100644 pkg/kubelet/deviceplugin/utils.go

diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD
index 979ef5e2d22..257cea02d9a 100644
--- a/pkg/kubelet/BUILD
+++ b/pkg/kubelet/BUILD
@@ -253,6 +253,7 @@ filegroup(
         "//pkg/kubelet/configmap:all-srcs",
         "//pkg/kubelet/container:all-srcs",
         "//pkg/kubelet/custommetrics:all-srcs",
+        "//pkg/kubelet/deviceplugin:all-srcs",
         "//pkg/kubelet/dockershim:all-srcs",
         "//pkg/kubelet/envvars:all-srcs",
         "//pkg/kubelet/events:all-srcs",
diff --git a/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go b/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go
index fb0440cce59..b771ec8c960 100644
--- a/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go
+++ b/pkg/kubelet/apis/deviceplugin/v1alpha1/constants.go
@@ -30,4 +30,22 @@ const (
 	DevicePluginPath = "/var/lib/kubelet/device-plugins/"
 	// KubeletSocket is the path of the Kubelet registry socket
 	KubeletSocket = DevicePluginPath + "kubelet.sock"
+
+	// InvalidChars are the characters that may not appear in a Vendor or Kind field
+	InvalidChars = "/ "
+
+	// ErrFailedToDialDevicePlugin is the error raised when the device plugin could not be
+	// reached on the registered socket
+	ErrFailedToDialDevicePlugin = "Failed to dial device plugin:"
+	// ErrUnsuportedVersion is the error raised when the device plugin uses an API version not
+	// supported by the Kubelet registry
+	ErrUnsuportedVersion = "Unsupported version"
+	// ErrDevicePluginAlreadyExists is the error raised when a device plugin with the
+	// same Resource Name tries to register itself
+	ErrDevicePluginAlreadyExists = "Another device plugin already registered this Resource Name"
+	// ErrInvalidResourceName is the error raised when a device plugin is registering
+	// itself with an invalid ResourceName
+	ErrInvalidResourceName = "The Resource Name is invalid"
+	// ErrEmptyResourceName is the error raised when the resource name field is empty
+	ErrEmptyResourceName = "Invalid Empty ResourceName"
 )
diff --git a/pkg/kubelet/deviceplugin/BUILD b/pkg/kubelet/deviceplugin/BUILD
new file mode 100644
index 00000000000..30e5891b2b3
--- /dev/null
+++ b/pkg/kubelet/deviceplugin/BUILD
@@ -0,0 +1,38 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "endpoint.go",
+        "manager.go",
+        "types.go",
+        "utils.go",
+    ],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library",
+        "//vendor/github.com/golang/glog:go_default_library",
+        "//vendor/golang.org/x/net/context:go_default_library",
+        "//vendor/google.golang.org/grpc:go_default_library",
+    ],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+)
diff --git a/pkg/kubelet/deviceplugin/endpoint.go b/pkg/kubelet/deviceplugin/endpoint.go
new file mode 100644
index 00000000000..2e418c56843
--- /dev/null
+++ b/pkg/kubelet/deviceplugin/endpoint.go
@@ -0,0 +1,228 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deviceplugin
+
+import (
+	"fmt"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/golang/glog"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+
+	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
+)
+
+// endpoint is the manager's handle on one registered device plugin: the gRPC
+// client used to reach it and the device list it last reported.
+type endpoint struct {
+	client pluginapi.DevicePluginClient
+	conn   *grpc.ClientConn
+
+	socketPath   string
+	resourceName string
+
+	// devices is the last published device list, keyed by device ID. It is
+	// guarded by mutex and is replaced, never modified, once published.
+	devices map[string]*pluginapi.Device
+	mutex   sync.Mutex
+
+	callback MonitorCallback
+
+	cancel context.CancelFunc
+	ctx    context.Context
+}
+
+// newEndpoint dials the device plugin listening on socketPath and returns the
+// endpoint for resourceName. callback is invoked by listAndWatch.
+func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*endpoint, error) {
+	client, conn, err := dial(socketPath)
+	if err != nil {
+		return nil, err
+	}
+
+	ctx, stop := context.WithCancel(context.Background())
+
+	return &endpoint{
+		client: client,
+		conn:   conn,
+
+		socketPath:   socketPath,
+		resourceName: resourceName,
+
+		devices:  nil,
+		callback: callback,
+
+		cancel: stop,
+		ctx:    ctx,
+	}, nil
+}
+
+// list opens the ListAndWatch stream, stores the first device list sent by the
+// plugin and returns the stream so that listAndWatch can keep reading from it.
+func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
+	glog.V(2).Infof("Starting ListAndWatch")
+
+	stream, err := e.client.ListAndWatch(e.ctx, &pluginapi.Empty{})
+	if err != nil {
+		glog.Errorf(ErrListAndWatch, e.resourceName, err)
+
+		return nil, err
+	}
+
+	devs, err := stream.Recv()
+	if err != nil {
+		glog.Errorf(ErrListAndWatch, e.resourceName, err)
+		return nil, err
+	}
+
+	devices := make(map[string]*pluginapi.Device)
+	for _, d := range devs.Devices {
+		devices[d.ID] = d
+	}
+
+	e.mutex.Lock()
+	e.devices = devices
+	e.mutex.Unlock()
+
+	return stream, nil
+}
+
+// listAndWatch reads device lists from stream until Recv fails. Every list is
+// diffed against the previous one and the callback is given clones of the
+// devices that were added, changed health, or disappeared.
+func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) {
+	glog.V(2).Infof("Starting ListAndWatch")
+
+	// devices is the working copy owned by this goroutine; only snapshots of
+	// it are published to e.devices.
+	devices := make(map[string]*pluginapi.Device)
+
+	e.mutex.Lock()
+	for _, d := range e.devices {
+		devices[d.ID] = CloneDevice(d)
+	}
+	e.mutex.Unlock()
+
+	for {
+		response, err := stream.Recv()
+		if err != nil {
+			glog.Errorf(ErrListAndWatch, e.resourceName, err)
+			return
+		}
+
+		devs := response.Devices
+		glog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
+
+		newDevs := make(map[string]*pluginapi.Device)
+		var added, updated []*pluginapi.Device
+
+		for _, d := range devs {
+			dOld, ok := devices[d.ID]
+			newDevs[d.ID] = d
+
+			if !ok {
+				glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d)
+
+				devices[d.ID] = d
+				added = append(added, CloneDevice(d))
+
+				continue
+			}
+
+			if d.Health == dOld.Health {
+				continue
+			}
+
+			if d.Health == pluginapi.Unhealthy {
+				glog.Errorf("Device %s is now Unhealthy", d.ID)
+			} else if d.Health == pluginapi.Healthy {
+				glog.V(2).Infof("Device %s is now Healthy", d.ID)
+			}
+
+			devices[d.ID] = d
+			updated = append(updated, CloneDevice(d))
+		}
+
+		var deleted []*pluginapi.Device
+		for id, d := range devices {
+			if _, ok := newDevs[id]; ok {
+				continue
+			}
+
+			glog.Errorf("Device %s was deleted", d.ID)
+
+			deleted = append(deleted, CloneDevice(d))
+			delete(devices, id)
+		}
+
+		// Publish a snapshot, not the working map: the working map is modified
+		// without the lock on the next iteration, while readers of e.devices
+		// rely on e.mutex alone.
+		snapshot := make(map[string]*pluginapi.Device, len(devices))
+		for id, d := range devices {
+			snapshot[id] = d
+		}
+
+		e.mutex.Lock()
+		e.devices = snapshot
+		e.mutex.Unlock()
+
+		if e.callback != nil {
+			e.callback(e.resourceName, added, updated, deleted)
+		}
+	}
+}
+
+// allocate asks the device plugin to allocate the devices in devs.
+func (e *endpoint) allocate(devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) {
+	var ids []string
+	for _, d := range devs {
+		ids = append(ids, d.ID)
+	}
+
+	return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
+		DevicesIDs: ids,
+	})
+}
+
+// stop cancels the context used by the ListAndWatch stream and closes the
+// connection to the device plugin. It may be called more than once.
+func (e *endpoint) stop() {
+	e.cancel()
+
+	if err := e.conn.Close(); err != nil {
+		glog.V(2).Infof("Closing connection to device plugin %s: %v", e.resourceName, err)
+	}
+}
+
+// dial connects to the device plugin listening on the unix socket
+// unixSocketPath. The caller owns the returned connection and must close it.
+func dial(unixSocketPath string) (pluginapi.DevicePluginClient, *grpc.ClientConn, error) {
+	c, err := grpc.Dial(unixSocketPath, grpc.WithInsecure(),
+		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
+			return net.DialTimeout("unix", addr, timeout)
+		}),
+	)
+	if err != nil {
+		return nil, nil, fmt.Errorf(pluginapi.ErrFailedToDialDevicePlugin+" %v", err)
+	}
+
+	return pluginapi.NewDevicePluginClient(c), c, nil
+}
diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go
new file mode 100644
index 00000000000..f941b38c626
--- /dev/null
+++ b/pkg/kubelet/deviceplugin/manager.go
@@ -0,0 +1,234 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deviceplugin
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"path/filepath"
+
+	"github.com/golang/glog"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+
+	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
+)
+
+// NewManagerImpl creates a new manager that will listen on the socket `socketPath`.
+// f is the callback handed to every endpoint; it is called each time a device
+// plugin pushes a new device list.
+// socketPath is a parameter for testing purposes; in production this is
+// pluginapi.KubeletSocket. It must be a non-empty absolute path.
+func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) {
+	glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
+
+	if socketPath == "" || !filepath.IsAbs(socketPath) {
+		return nil, fmt.Errorf(ErrBadSocket+" %v", socketPath)
+	}
+
+	dir, file := filepath.Split(socketPath)
+	return &ManagerImpl{
+		Endpoints: make(map[string]*endpoint),
+
+		socketname: file,
+		socketdir:  dir,
+		callback:   f,
+	}, nil
+}
+
+// Start creates the socket directory, removes any stale socket file and serves
+// the Registration gRPC service on the socket from a background goroutine.
+func (m *ManagerImpl) Start() error {
+	glog.V(2).Infof("Starting Device Plugin manager")
+
+	socketPath := filepath.Join(m.socketdir, m.socketname)
+	if err := os.MkdirAll(m.socketdir, 0755); err != nil {
+		glog.Errorf("Failed to create device plugin directory %s: %+v", m.socketdir, err)
+		return err
+	}
+
+	if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
+		glog.Errorf(ErrRemoveSocket+" %+v", err)
+		return err
+	}
+
+	s, err := net.Listen("unix", socketPath)
+	if err != nil {
+		glog.Errorf(ErrListenSocket+" %+v", err)
+		return err
+	}
+
+	m.server = grpc.NewServer()
+
+	pluginapi.RegisterRegistrationServer(m.server, m)
+	go m.server.Serve(s)
+
+	return nil
+}
+
+// Devices returns a copy of the devices known by the Device Plugin manager,
+// keyed by the ResourceName of the device plugin that reported them.
+func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device {
+	glog.V(2).Infof("Devices called")
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	devs := make(map[string][]*pluginapi.Device)
+	for k, e := range m.Endpoints {
+		e.mutex.Lock()
+		// Formatting e reads e.devices, so log while holding e.mutex.
+		glog.V(2).Infof("Endpoint: %+v: %+v", k, e)
+		devs[k] = copyDevices(e.devices)
+		e.mutex.Unlock()
+	}
+
+	return devs
+}
+
+// Allocate calls Allocate on the device plugin registered for resourceName.
+// It returns (nil, nil) when devs is empty and an error when no device plugin
+// is registered under resourceName.
+func (m *ManagerImpl) Allocate(resourceName string,
+	devs []*pluginapi.Device) (*pluginapi.AllocateResponse, error) {
+
+	if len(devs) == 0 {
+		return nil, nil
+	}
+
+	glog.Infof("Received request for devices %v for device plugin %s",
+		devs, resourceName)
+
+	// Only the lookup needs the lock; holding it across the gRPC call would
+	// block Devices and Register for as long as the plugin takes to answer.
+	m.mutex.Lock()
+	e, ok := m.Endpoints[resourceName]
+	m.mutex.Unlock()
+
+	if !ok {
+		return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
+	}
+
+	return e.allocate(devs)
+}
+
+// Register is the gRPC handler through which a device plugin registers itself.
+// The request is rejected when the API version differs from pluginapi.Version,
+// when the ResourceName is invalid, or when it is already registered. The
+// plugin is then dialed asynchronously, so a nil error does not mean that the
+// endpoint is already usable.
+func (m *ManagerImpl) Register(ctx context.Context,
+	r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
+
+	glog.V(2).Infof("Got request for Device Plugin %s", r.ResourceName)
+
+	if r.Version != pluginapi.Version {
+		return &pluginapi.Empty{},
+			fmt.Errorf(pluginapi.ErrUnsuportedVersion)
+	}
+
+	if err := IsResourceNameValid(r.ResourceName); err != nil {
+		return &pluginapi.Empty{}, err
+	}
+
+	// m.Endpoints is written by the addEndpoint goroutines.
+	m.mutex.Lock()
+	_, ok := m.Endpoints[r.ResourceName]
+	m.mutex.Unlock()
+
+	if ok {
+		return &pluginapi.Empty{},
+			fmt.Errorf(pluginapi.ErrDevicePluginAlreadyExists)
+	}
+
+	go m.addEndpoint(r)
+
+	return &pluginapi.Empty{}, nil
+}
+
+// Stop stops every endpoint and then the gRPC server, if it was started.
+func (m *ManagerImpl) Stop() error {
+	m.mutex.Lock()
+	for _, e := range m.Endpoints {
+		e.stop()
+	}
+	m.mutex.Unlock()
+
+	// m.server is nil until Start succeeds.
+	if m.server != nil {
+		m.server.Stop()
+	}
+
+	return nil
+}
+
+// addEndpoint dials the device plugin described by r, fetches its first device
+// list, records the endpoint under r.ResourceName and watches it until its
+// ListAndWatch stream ends, at which point the endpoint is removed again.
+func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
+	socketPath := filepath.Join(m.socketdir, r.Endpoint)
+
+	e, err := newEndpoint(socketPath, r.ResourceName, m.callback)
+	if err != nil {
+		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
+		return
+	}
+
+	stream, err := e.list()
+	if err != nil {
+		glog.Errorf("Failed to List devices for plugin %v: %v", r.ResourceName, err)
+		e.stop()
+		return
+	}
+
+	// Record the endpoint before the watcher starts, otherwise a stream that
+	// ends immediately could run the cleanup below before the insertion and
+	// leave a dead endpoint registered forever.
+	m.mutex.Lock()
+	if _, ok := m.Endpoints[r.ResourceName]; ok {
+		m.mutex.Unlock()
+
+		glog.Errorf(pluginapi.ErrDevicePluginAlreadyExists+": %s", r.ResourceName)
+		e.stop()
+		return
+	}
+
+	e.mutex.Lock()
+	m.Endpoints[r.ResourceName] = e
+	glog.V(2).Infof("Registered endpoint %v", e)
+	e.mutex.Unlock()
+	m.mutex.Unlock()
+
+	go func() {
+		e.listAndWatch(stream)
+
+		m.mutex.Lock()
+		e.mutex.Lock()
+
+		// Defensive: only remove the entry if it is still this endpoint.
+		if m.Endpoints[r.ResourceName] == e {
+			delete(m.Endpoints, r.ResourceName)
+		}
+		glog.V(2).Infof("Unregistered endpoint %v", e)
+
+		e.mutex.Unlock()
+		m.mutex.Unlock()
+
+		e.stop()
+	}()
+}
diff --git a/pkg/kubelet/deviceplugin/types.go b/pkg/kubelet/deviceplugin/types.go
new file mode 100644
index 00000000000..99bc8c0771c
--- /dev/null
+++ b/pkg/kubelet/deviceplugin/types.go
@@ -0,0 +1,74 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deviceplugin
+
+import (
+	"sync"
+
+	"google.golang.org/grpc"
+
+	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
+)
+
+// MonitorCallback is the function called by an endpoint each time its device
+// plugin pushes a new device list. added, updated and deleted hold copies of
+// the devices that appeared, changed health or disappeared since the last list.
+type MonitorCallback func(resourceName string, added, updated, deleted []*pluginapi.Device)
+
+// Manager manages the Device Plugins running on a machine
+type Manager interface {
+	// Start starts the gRPC service
+	Start() error
+	// Devices is the map of devices that have registered themselves
+	// against the manager.
+	// The map key is the ResourceName of the device plugins
+	Devices() map[string][]*pluginapi.Device
+
+	// Allocate calls the gRPC Allocate on the device plugin
+	Allocate(string, []*pluginapi.Device) (*pluginapi.AllocateResponse, error)
+
+	// Stop stops the manager
+	Stop() error
+}
+
+// ManagerImpl is the structure in charge of managing Device Plugins
+type ManagerImpl struct {
+	socketname string
+	socketdir  string
+
+	Endpoints map[string]*endpoint // Key is ResourceName; guarded by mutex
+	mutex     sync.Mutex
+
+	callback MonitorCallback
+
+	server *grpc.Server
+}
+
+const (
+	// ErrDevicePluginUnknown is the error raised when the device Plugin returned by Monitor is not known by the Device Plugin manager
+	ErrDevicePluginUnknown = "Manager does not have device plugin for device:"
+	// ErrDeviceUnknown is the error raised when the device returned by Monitor is not known by the Device Plugin manager
+	ErrDeviceUnknown = "Could not find device in it's Device Plugin's Device List:"
+	// ErrBadSocket is the error raised when the registry socket path is not absolute
+	ErrBadSocket = "Bad socketPath, must be an absolute path:"
+	// ErrRemoveSocket is the error raised when the registry could not remove the existing socket
+	ErrRemoveSocket = "Failed to remove socket while starting device plugin registry, with error"
+	// ErrListenSocket is the error raised when the registry could not listen on the socket
+	ErrListenSocket = "Failed to listen to socket while starting device plugin registry, with error"
+	// ErrListAndWatch is the error raised when ListAndWatch ended unsuccessfully
+	ErrListAndWatch = "ListAndWatch ended unexpectedly for device plugin %s with error %v"
+)
diff --git a/pkg/kubelet/deviceplugin/utils.go b/pkg/kubelet/deviceplugin/utils.go
new file mode 100644
index 00000000000..57a6b0a71c6
--- /dev/null
+++ b/pkg/kubelet/deviceplugin/utils.go
@@ -0,0 +1,83 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deviceplugin
+
+import (
+	"fmt"
+	"strings"
+
+	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha1"
+)
+
+// CloneDevice returns a copy of d holding the same ID and Health.
+// It returns nil when d is nil.
+func CloneDevice(d *pluginapi.Device) *pluginapi.Device {
+	if d == nil {
+		return nil
+	}
+
+	return &pluginapi.Device{
+		ID:     d.ID,
+		Health: d.Health,
+	}
+}
+
+// copyDevices returns clones of all the devices in devs, in no particular order.
+func copyDevices(devs map[string]*pluginapi.Device) []*pluginapi.Device {
+	var clones []*pluginapi.Device
+
+	for _, d := range devs {
+		clones = append(clones, CloneDevice(d))
+	}
+
+	return clones
+}
+
+// GetDevice returns the device of devs that has the same key as d, and a
+// boolean signaling whether such a device was found.
+func GetDevice(d *pluginapi.Device, devs []*pluginapi.Device) (*pluginapi.Device, bool) {
+	name := DeviceKey(d)
+
+	for _, d := range devs {
+		if DeviceKey(d) != name {
+			continue
+		}
+
+		return d, true
+	}
+
+	return nil, false
+}
+
+// IsResourceNameValid returns an error if resourceName is empty or contains
+// one of the characters of pluginapi.InvalidChars, and nil otherwise.
+func IsResourceNameValid(resourceName string) error {
+	if resourceName == "" {
+		return fmt.Errorf(pluginapi.ErrEmptyResourceName)
+	}
+
+	if strings.ContainsAny(resourceName, pluginapi.InvalidChars) {
+		return fmt.Errorf(pluginapi.ErrInvalidResourceName)
+	}
+
+	return nil
+}
+
+// DeviceKey returns the key of a device, which is its ID.
+func DeviceKey(d *pluginapi.Device) string {
+	return d.ID
+}