diff --git a/pkg/kubelet/cm/device_plugin_handler.go b/pkg/kubelet/cm/device_plugin_handler.go index 8217e107d2a..afaa05c74bd 100644 --- a/pkg/kubelet/cm/device_plugin_handler.go +++ b/pkg/kubelet/cm/device_plugin_handler.go @@ -86,7 +86,7 @@ type DevicePluginHandler interface { // Start starts device plugin registration service. Start() error // Devices returns all of registered devices keyed by resourceName. - Devices() map[string][]*pluginapi.Device + Devices() map[string][]pluginapi.Device // Allocate attempts to allocate all of required extended resources for // the input container, issues an Allocate rpc request for each of such // resources, and returns their AllocateResponses on success. @@ -114,7 +114,7 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi allocatedDevices: make(map[string]podDevices), } - deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) { + deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) { var capacity = v1.ResourceList{} kept := append(updated, added...) if _, ok := handler.allDevices[resourceName]; !ok { @@ -155,7 +155,7 @@ func (h *DevicePluginHandlerImpl) Start() error { return h.devicePluginManager.Start() } -func (h *DevicePluginHandlerImpl) Devices() map[string][]*pluginapi.Device { +func (h *DevicePluginHandlerImpl) Devices() map[string][]pluginapi.Device { return h.devicePluginManager.Devices() } diff --git a/pkg/kubelet/cm/device_plugin_handler_stub.go b/pkg/kubelet/cm/device_plugin_handler_stub.go index 879997f5504..46cb40b1f3a 100644 --- a/pkg/kubelet/cm/device_plugin_handler_stub.go +++ b/pkg/kubelet/cm/device_plugin_handler_stub.go @@ -32,8 +32,8 @@ func (h *DevicePluginHandlerStub) Start() error { return nil } -func (h *DevicePluginHandlerStub) Devices() map[string][]*pluginapi.Device { - return make(map[string][]*pluginapi.Device) +func (h *DevicePluginHandlerStub) Devices() map[string][]pluginapi.Device { + return make(map[string][]pluginapi.Device) } func (h *DevicePluginHandlerStub) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) { diff --git a/pkg/kubelet/cm/device_plugin_handler_test.go b/pkg/kubelet/cm/device_plugin_handler_test.go index 99d375f65dc..f4d8768dd8a 100644 --- a/pkg/kubelet/cm/device_plugin_handler_test.go +++ b/pkg/kubelet/cm/device_plugin_handler_test.go @@ -41,7 +41,7 @@ func TestUpdateCapacity(t *testing.T) { as.NotNil(testDevicePluginHandler) as.Nil(err) - devs := []*pluginapi.Device{ + devs := []pluginapi.Device{ {ID: "Device1", Health: pluginapi.Healthy}, {ID: "Device2", Health: pluginapi.Healthy}, {ID: "Device3", Health: pluginapi.Unhealthy}, @@ -51,9 +51,9 @@ func TestUpdateCapacity(t *testing.T) { // Adds three devices for resource1, two healthy and one unhealthy. // Expects capacity for resource1 to be 2. expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, devs, []*pluginapi.Device{}, []*pluginapi.Device{}) + testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{}) // Deletes an unhealthy device should NOT change capacity. - testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, []*pluginapi.Device{devs[2]}) + testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]}) // Updates a healthy device to unhealthy should reduce capacity by 1. expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI) // Deletes a healthy device should reduce capacity by 1. @@ -62,7 +62,7 @@ func TestUpdateCapacity(t *testing.T) { delete(expected, v1.ResourceName(resourceName)) resourceName2 := "resource2" expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI) - testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []*pluginapi.Device{}, []*pluginapi.Device{}) + testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) } type stringPairType struct { @@ -90,8 +90,8 @@ func (m *DevicePluginManagerTestStub) Start() error { return nil } -func (m *DevicePluginManagerTestStub) Devices() map[string][]*pluginapi.Device { - return make(map[string][]*pluginapi.Device) +func (m *DevicePluginManagerTestStub) Devices() map[string][]pluginapi.Device { + return make(map[string][]pluginapi.Device) } func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) { @@ -181,7 +181,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) { m, err := NewDevicePluginManagerTestStub() as := assert.New(t) as.Nil(err) - monitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {} + monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {} testDevicePluginHandler := &DevicePluginHandlerImpl{ devicePluginManager: m, diff --git a/pkg/kubelet/deviceplugin/BUILD b/pkg/kubelet/deviceplugin/BUILD index c6cb50c6258..987c76001a7 100644 --- a/pkg/kubelet/deviceplugin/BUILD +++ b/pkg/kubelet/deviceplugin/BUILD @@ -13,7 +13,6 @@ go_library( "endpoint.go", "manager.go", "types.go", - "utils.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/deviceplugin", deps = [ @@ -44,7 +43,6 @@ go_test( srcs = [ "endpoint_test.go", "manager_test.go", - "utils_test.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/deviceplugin", library = ":go_default_library", diff --git a/pkg/kubelet/deviceplugin/endpoint.go b/pkg/kubelet/deviceplugin/endpoint.go index fc55006a60d..dd534bc4ff6 100644 --- a/pkg/kubelet/deviceplugin/endpoint.go +++ b/pkg/kubelet/deviceplugin/endpoint.go @@ -38,7 +38,7 @@ type endpoint struct { socketPath string resourceName string - devices map[string]*pluginapi.Device + devices map[string]pluginapi.Device mutex sync.Mutex callback MonitorCallback @@ -71,10 +71,16 @@ func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*en }, nil } -func (e *endpoint) getDevices() []*pluginapi.Device { +func (e *endpoint) getDevices() []pluginapi.Device { e.mutex.Lock() defer e.mutex.Unlock() - return copyDevices(e.devices) + var devs []pluginapi.Device + + for _, d := range e.devices { + devs = append(devs, d) + } + + return devs } // list initializes ListAndWatch gRPC call for the device plugin and gets the @@ -94,11 +100,11 @@ func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { return nil, err } - devices := make(map[string]*pluginapi.Device) - var added, updated, deleted []*pluginapi.Device + devices := make(map[string]pluginapi.Device) + var added, updated, deleted []pluginapi.Device for _, d := range devs.Devices { - devices[d.ID] = d - added = append(added, cloneDevice(d)) + devices[d.ID] = *d + added = append(added, *d) } e.mutex.Lock() @@ -118,11 +124,11 @@ func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) { func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) { glog.V(3).Infof("Starting ListAndWatch") - devices := make(map[string]*pluginapi.Device) + devices := make(map[string]pluginapi.Device) e.mutex.Lock() for _, d := range e.devices { - devices[d.ID] = cloneDevice(d) + devices[d.ID] = d } e.mutex.Unlock() @@ -137,7 +143,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient glog.V(2).Infof("State pushed for device plugin %s", e.resourceName) newDevs := make(map[string]*pluginapi.Device) - var added, updated []*pluginapi.Device + var added, updated []pluginapi.Device for _, d := range devs { dOld, ok := devices[d.ID] @@ -146,8 +152,8 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient if !ok { glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d) - devices[d.ID] = d - added = append(added, cloneDevice(d)) + devices[d.ID] = *d + added = append(added, *d) continue } @@ -162,11 +168,11 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient glog.V(2).Infof("Device %s is now Healthy", d.ID) } - devices[d.ID] = d - updated = append(updated, cloneDevice(d)) + devices[d.ID] = *d + updated = append(updated, *d) } - var deleted []*pluginapi.Device + var deleted []pluginapi.Device for id, d := range devices { if _, ok := newDevs[id]; ok { continue @@ -174,7 +180,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient glog.Errorf("Device %s was deleted", d.ID) - deleted = append(deleted, cloneDevice(d)) + deleted = append(deleted, d) delete(devices, id) } diff --git a/pkg/kubelet/deviceplugin/endpoint_test.go b/pkg/kubelet/deviceplugin/endpoint_test.go index 362856c69a5..a7e458258e5 100644 --- a/pkg/kubelet/deviceplugin/endpoint_test.go +++ b/pkg/kubelet/deviceplugin/endpoint_test.go @@ -37,7 +37,7 @@ func TestNewEndpoint(t *testing.T) { {ID: "ADeviceId", Health: pluginapi.Healthy}, } - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) {}) + p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {}) defer ecleanup(t, p, e) } @@ -48,7 +48,7 @@ func TestList(t *testing.T) { {ID: "ADeviceId", Health: pluginapi.Healthy}, } - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) {}) + p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {}) defer ecleanup(t, p, e) _, err := e.list() @@ -79,7 +79,7 @@ func TestListAndWatch(t *testing.T) { {ID: "AThirdDeviceId", Health: pluginapi.Healthy}, } - p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) { + p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) { require.Len(t, a, 1) require.Len(t, u, 1) require.Len(t, r, 1) @@ -114,13 +114,23 @@ func TestListAndWatch(t *testing.T) { } +func TestGetDevices(t *testing.T) { + e := endpoint{ + devices: map[string]pluginapi.Device{ + "ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy}, + }, + } + devs := e.getDevices() + require.Len(t, devs, 1) +} + func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*Stub, *endpoint) { p := NewDevicePluginStub(devs, socket) err := p.Start() require.NoError(t, err) - e, err := newEndpoint(socket, "mock", func(n string, a, u, r []*pluginapi.Device) {}) + e, err := newEndpoint(socket, "mock", func(n string, a, u, r []pluginapi.Device) {}) require.NoError(t, err) return p, e diff --git a/pkg/kubelet/deviceplugin/manager.go b/pkg/kubelet/deviceplugin/manager.go index 6701dc63228..df9976784cf 100644 --- a/pkg/kubelet/deviceplugin/manager.go +++ b/pkg/kubelet/deviceplugin/manager.go @@ -27,6 +27,8 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/api/v1/helper" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" ) @@ -94,9 +96,14 @@ func (m *ManagerImpl) removeContents(dir string) error { return nil } +const ( + // defaultCheckpoint is the file name of device plugin checkpoint + defaultCheckpoint = "kubelet_internal_checkpoint" +) + // CheckpointFile returns device plugin checkpoint file path. func (m *ManagerImpl) CheckpointFile() string { - return filepath.Join(m.socketdir, "kubelet_internal_checkpoint") + return filepath.Join(m.socketdir, defaultCheckpoint) } // Start starts the Device Plugin Manager @@ -112,11 +119,6 @@ func (m *ManagerImpl) Start() error { glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err) } - if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { - glog.Errorf(errRemoveSocket+" %+v", err) - return err - } - s, err := net.Listen("unix", socketPath) if err != nil { glog.Errorf(errListenSocket+" %+v", err) @@ -133,11 +135,11 @@ func (m *ManagerImpl) Start() error { // Devices is the map of devices that are known by the Device // Plugin manager with the kind of the devices as key -func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device { +func (m *ManagerImpl) Devices() map[string][]pluginapi.Device { m.mutex.Lock() defer m.mutex.Unlock() - devs := make(map[string][]*pluginapi.Device) + devs := make(map[string][]pluginapi.Device) for k, e := range m.endpoints { glog.V(3).Infof("Endpoint: %+v: %+v", k, e) devs[k] = e.getDevices() @@ -174,8 +176,8 @@ func (m *ManagerImpl) Register(ctx context.Context, return &pluginapi.Empty{}, fmt.Errorf(errUnsuportedVersion) } - if err := IsResourceNameValid(r.ResourceName); err != nil { - return &pluginapi.Empty{}, err + if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) { + return &pluginapi.Empty{}, fmt.Errorf(errInvalidResourceName, r.ResourceName) } // TODO: for now, always accepts newest device plugin. Later may consider to @@ -231,7 +233,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { glog.V(2).Infof("Delete resource for endpoint %v", e) delete(m.endpoints, r.ResourceName) // Issues callback to delete all of devices. - e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices()) + e.callback(e.resourceName, []pluginapi.Device{}, []pluginapi.Device{}, e.getDevices()) } glog.V(2).Infof("Unregistered endpoint %v", e) m.mutex.Unlock() diff --git a/pkg/kubelet/deviceplugin/manager_test.go b/pkg/kubelet/deviceplugin/manager_test.go index 835ce09a949..a9f05700b4b 100644 --- a/pkg/kubelet/deviceplugin/manager_test.go +++ b/pkg/kubelet/deviceplugin/manager_test.go @@ -33,15 +33,15 @@ const ( ) func TestNewManagerImpl(t *testing.T) { - _, err := NewManagerImpl("", func(n string, a, u, r []*pluginapi.Device) {}) + _, err := NewManagerImpl("", func(n string, a, u, r []pluginapi.Device) {}) require.Error(t, err) - _, err = NewManagerImpl(socketName, func(n string, a, u, r []*pluginapi.Device) {}) + _, err = NewManagerImpl(socketName, func(n string, a, u, r []pluginapi.Device) {}) require.NoError(t, err) } func TestNewManagerImplStart(t *testing.T) { - m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {}) + m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}) cleanup(t, m, p) } @@ -58,7 +58,7 @@ func TestDevicePluginReRegistration(t *testing.T) { callbackChan := make(chan int) var stopping int32 stopping = 0 - callback := func(n string, a, u, r []*pluginapi.Device) { + callback := func(n string, a, u, r []pluginapi.Device) { // Should be called twice, one for each plugin registration, till we are stopping. if callbackCount > 1 && atomic.LoadInt32(&stopping) <= 0 { t.FailNow() diff --git a/pkg/kubelet/deviceplugin/types.go b/pkg/kubelet/deviceplugin/types.go index 6ef8046687f..9d0baffc015 100644 --- a/pkg/kubelet/deviceplugin/types.go +++ b/pkg/kubelet/deviceplugin/types.go @@ -23,7 +23,7 @@ import ( // MonitorCallback is the function called when a device's health state changes, // or new devices are reported, or old devices are deleted. // Updated contains the most recent state of the Device. -type MonitorCallback func(resourceName string, added, updated, deleted []*pluginapi.Device) +type MonitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device) // Manager manages all the Device Plugins running on a node. type Manager interface { @@ -33,7 +33,7 @@ type Manager interface { // Devices is the map of devices that have registered themselves // against the manager. // The map key is the ResourceName of the device plugins. - Devices() map[string][]*pluginapi.Device + Devices() map[string][]pluginapi.Device // Allocate takes resourceName and list of device Ids, and calls the // gRPC Allocate on the device plugin matching the resourceName. @@ -59,7 +59,7 @@ const ( errDevicePluginAlreadyExists = "another device plugin already registered this Resource Name" // errInvalidResourceName is the error raised when a device plugin is registering // itself with an invalid ResourceName - errInvalidResourceName = "the ResourceName is invalid" + errInvalidResourceName = "the ResourceName %s is invalid" // errEmptyResourceName is the error raised when the resource name field is empty errEmptyResourceName = "invalid Empty ResourceName" diff --git a/pkg/kubelet/deviceplugin/utils.go b/pkg/kubelet/deviceplugin/utils.go deleted file mode 100644 index ce7f19665cb..00000000000 --- a/pkg/kubelet/deviceplugin/utils.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package deviceplugin - -import ( - "fmt" - - "k8s.io/api/core/v1" - v1helper "k8s.io/kubernetes/pkg/api/v1/helper" - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" -) - -func cloneDevice(d *pluginapi.Device) *pluginapi.Device { - return &pluginapi.Device{ - ID: d.ID, - Health: d.Health, - } - -} - -func copyDevices(devs map[string]*pluginapi.Device) []*pluginapi.Device { - var clones []*pluginapi.Device - - for _, d := range devs { - clones = append(clones, cloneDevice(d)) - } - - return clones -} - -// IsResourceNameValid returns an error if the resource is invalid or is not an -// extended resource name. -func IsResourceNameValid(resourceName string) error { - if resourceName == "" { - return fmt.Errorf(errEmptyResourceName) - } - if !IsDeviceName(v1.ResourceName(resourceName)) { - return fmt.Errorf(errInvalidResourceName) - } - return nil -} - -// IsDeviceName returns whether the ResourceName points to an extended resource -// name exported by a device plugin. -func IsDeviceName(k v1.ResourceName) bool { - return v1helper.IsExtendedResourceName(k) -} diff --git a/pkg/kubelet/deviceplugin/utils_test.go b/pkg/kubelet/deviceplugin/utils_test.go deleted file mode 100644 index 99156d6475d..00000000000 --- a/pkg/kubelet/deviceplugin/utils_test.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package deviceplugin - -import ( - "testing" - - "github.com/stretchr/testify/require" - - pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha" -) - -func TestCloneDevice(t *testing.T) { - d := cloneDevice(&pluginapi.Device{ID: "ADeviceId", Health: pluginapi.Healthy}) - - require.Equal(t, d.ID, "ADeviceId") - require.Equal(t, d.Health, pluginapi.Healthy) -} - -func TestCopyDevices(t *testing.T) { - d := map[string]*pluginapi.Device{ - "ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy}, - } - - devs := copyDevices(d) - require.Len(t, devs, 1) -} - -func TestIsResourceName(t *testing.T) { - require.NotNil(t, IsResourceNameValid("")) - require.NotNil(t, IsResourceNameValid("cpu")) - require.NotNil(t, IsResourceNameValid("name1")) - require.NotNil(t, IsResourceNameValid("alpha.kubernetes.io/name1")) - require.NotNil(t, IsResourceNameValid("beta.kubernetes.io/name1")) - require.NotNil(t, IsResourceNameValid("kubernetes.io/name1")) - require.Nil(t, IsResourceNameValid("domain1.io/name1")) - require.Nil(t, IsResourceNameValid("alpha.domain1.io/name1")) - require.Nil(t, IsResourceNameValid("beta.domain1.io/name1")) -}