diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 8d467bf2054..64728a7d6b1 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -95,7 +95,11 @@ type ContainerManager interface { // GetPodCgroupRoot returns the cgroup which contains all pods. GetPodCgroupRoot() string - GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn + + // GetPluginRegistrationHandler returns a plugin registration handler + // The pluginwatcher's Handlers allow to have a single module for handling + // registration. + GetPluginRegistrationHandler() pluginwatcher.PluginHandler } type NodeConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 2f460ba8cba..132e2de9abe 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -605,8 +605,8 @@ func (cm *containerManagerImpl) Start(node *v1.Node, return nil } -func (cm *containerManagerImpl) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn { - return cm.deviceManager.GetWatcherCallback() +func (cm *containerManagerImpl) GetPluginRegistrationHandler() pluginwatcher.PluginHandler { + return cm.deviceManager.GetWatcherHandler() } // TODO: move the GetResources logic to PodContainerManager. diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index ed219808492..8f948c64d2a 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -77,10 +77,8 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList { return c } -func (cm *containerManagerStub) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn { - return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) { - return nil, nil - } +func (cm *containerManagerStub) GetPluginRegistrationHandler() pluginwatcher.PluginHandler { + return nil } func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 9aacd08af15..8064b572b39 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -56,7 +56,7 @@ type ManagerImpl struct { socketname string socketdir string - endpoints map[string]endpoint // Key is ResourceName + endpoints map[string]endpointInfo // Key is ResourceName mutex sync.Mutex server *grpc.Server @@ -86,10 +86,14 @@ type ManagerImpl struct { // podDevices contains pod to allocated device mapping. podDevices podDevices - pluginOpts map[string]*pluginapi.DevicePluginOptions checkpointManager checkpointmanager.CheckpointManager } +type endpointInfo struct { + e endpoint + opts *pluginapi.DevicePluginOptions +} + type sourcesReadyStub struct{} func (s *sourcesReadyStub) AddSource(source string) {} @@ -109,13 +113,13 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) { dir, file := filepath.Split(socketPath) manager := &ManagerImpl{ - endpoints: make(map[string]endpoint), + endpoints: make(map[string]endpointInfo), + socketname: file, socketdir: dir, healthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), - pluginOpts: make(map[string]*pluginapi.DevicePluginOptions), podDevices: make(podDevices), } manager.callback = manager.genericDeviceUpdateCallback @@ -228,8 +232,8 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc return nil } -// GetWatcherCallback returns callback function to be registered with plugin watcher -func (m *ManagerImpl) GetWatcherCallback() watcher.RegisterCallbackFn { +// GetWatcherHandler returns the plugin handler +func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler { if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil { glog.Errorf("Failed to create deprecation file at %s", m.socketdir) } else { @@ -237,16 +241,57 @@ func (m *ManagerImpl) GetWatcherCallback() watcher.RegisterCallbackFn { glog.V(4).Infof("created deprecation file %s", f.Name()) } - return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) { - if !m.isVersionCompatibleWithPlugin(versions) { - return nil, fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions) - } + return watcher.PluginHandler(m) +} - if !v1helper.IsExtendedResourceName(v1.ResourceName(name)) { - return nil, fmt.Errorf("invalid name of device plugin socket: %s", fmt.Sprintf(errInvalidResourceName, name)) - } +// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource +func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error { + glog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions) - return m.addEndpointProbeMode(name, sockPath) + if !m.isVersionCompatibleWithPlugin(versions) { + return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions) + } + + if !v1helper.IsExtendedResourceName(v1.ResourceName(pluginName)) { + return fmt.Errorf("invalid name of device plugin socket: %s", fmt.Sprintf(errInvalidResourceName, pluginName)) + } + + return nil +} + +// RegisterPlugin starts the endpoint and registers it +// TODO: Start the endpoint and wait for the First ListAndWatch call +// before registering the plugin +func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string) error { + glog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint) + + e, err := newEndpointImpl(endpoint, pluginName, m.callback) + if err != nil { + return fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", endpoint, err) + } + + options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) + if err != nil { + return fmt.Errorf("Failed to get device plugin options: %v", err) + } + + m.registerEndpoint(pluginName, options, e) + go m.runEndpoint(pluginName, e) + + return nil +} + +// DeRegisterPlugin deregisters the plugin +// TODO work on the behavior for deregistering plugins +// e.g: Should we delete the resource +func (m *ManagerImpl) DeRegisterPlugin(pluginName string) { + m.mutex.Lock() + defer m.mutex.Unlock() + + // Note: This will mark the resource unhealthy as per the behavior + // in runEndpoint + if eI, ok := m.endpoints[pluginName]; ok { + eI.e.stop() } } @@ -333,8 +378,8 @@ func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest func (m *ManagerImpl) Stop() error { m.mutex.Lock() defer m.mutex.Unlock() - for _, e := range m.endpoints { - e.stop() + for _, eI := range m.endpoints { + eI.e.stop() } if m.server == nil { @@ -346,51 +391,26 @@ func (m *ManagerImpl) Stop() error { return nil } -func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) { - chanForAckOfNotification := make(chan bool) - - new, err := newEndpointImpl(socketPath, resourceName, m.callback) - if err != nil { - glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) - return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) - } - - options, err := new.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) - if err != nil { - glog.Errorf("Failed to get device plugin options: %v", err) - return nil, fmt.Errorf("Failed to get device plugin options: %v", err) - } - m.registerEndpoint(resourceName, options, new) - - go func() { - select { - case <-chanForAckOfNotification: - close(chanForAckOfNotification) - m.runEndpoint(resourceName, new) - case <-time.After(time.Second): - glog.Errorf("Timed out while waiting for notification ack from plugin") - } - }() - return chanForAckOfNotification, nil -} - -func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e *endpointImpl) { +func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) { m.mutex.Lock() defer m.mutex.Unlock() - m.pluginOpts[resourceName] = options - m.endpoints[resourceName] = e + + m.endpoints[resourceName] = endpointInfo{e: e, opts: options} glog.V(2).Infof("Registered endpoint %v", e) } -func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) { +func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) { e.run() e.stop() + m.mutex.Lock() defer m.mutex.Unlock() - if old, ok := m.endpoints[resourceName]; ok && old == e { + + if old, ok := m.endpoints[resourceName]; ok && old.e == e { m.markResourceUnhealthy(resourceName) } - glog.V(2).Infof("Unregistered endpoint %v", e) + + glog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e) } func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { @@ -437,8 +457,8 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) deletedResources := sets.NewString() m.mutex.Lock() for resourceName, devices := range m.healthyDevices { - e, ok := m.endpoints[resourceName] - if (ok && e.stopGracePeriodExpired()) || !ok { + eI, ok := m.endpoints[resourceName] + if (ok && eI.e.stopGracePeriodExpired()) || !ok { // The resources contained in endpoints and (un)healthyDevices // should always be consistent. Otherwise, we run with the risk // of failing to garbage collect non-existing resources or devices. @@ -455,8 +475,8 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) } } for resourceName, devices := range m.unhealthyDevices { - e, ok := m.endpoints[resourceName] - if (ok && e.stopGracePeriodExpired()) || !ok { + eI, ok := m.endpoints[resourceName] + if (ok && eI.e.stopGracePeriodExpired()) || !ok { if !ok { glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync") } @@ -519,7 +539,7 @@ func (m *ManagerImpl) readCheckpoint() error { // will stay zero till the corresponding device plugin re-registers. m.healthyDevices[resource] = sets.NewString() m.unhealthyDevices[resource] = sets.NewString() - m.endpoints[resource] = newStoppedEndpointImpl(resource) + m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil} } return nil } @@ -652,7 +672,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont // plugin Allocate grpc calls if it becomes common that a container may require // resources from multiple device plugins. m.mutex.Lock() - e, ok := m.endpoints[resource] + eI, ok := m.endpoints[resource] m.mutex.Unlock() if !ok { m.mutex.Lock() @@ -665,7 +685,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont // TODO: refactor this part of code to just append a ContainerAllocationRequest // in a passed in AllocateRequest pointer, and issues a single Allocate call per pod. glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource) - resp, err := e.allocate(devs) + resp, err := eI.e.allocate(devs) metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime)) if err != nil { // In case of allocation failure, we want to restore m.allocatedDevices @@ -715,11 +735,13 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co // with PreStartRequired option set. func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error { m.mutex.Lock() - opts, ok := m.pluginOpts[resource] + eI, ok := m.endpoints[resource] if !ok { m.mutex.Unlock() - return fmt.Errorf("Plugin options not found in cache for resource: %s", resource) - } else if opts == nil || !opts.PreStartRequired { + return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource) + } + + if eI.opts == nil || !eI.opts.PreStartRequired { m.mutex.Unlock() glog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource: %s", resource) return nil @@ -731,16 +753,10 @@ func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource s return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource) } - e, ok := m.endpoints[resource] - if !ok { - m.mutex.Unlock() - return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource) - } - m.mutex.Unlock() devs := devices.UnsortedList() glog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID) - _, err := e.preStartContainer(devs) + _, err := eI.e.preStartContainer(devs) if err != nil { return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err) } diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index 66f8d1004cd..1008daca3b7 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -57,9 +57,7 @@ func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) return nil, nil, []string{} } -// GetWatcherCallback returns plugin watcher callback -func (h *ManagerStub) GetWatcherCallback() pluginwatcher.RegisterCallbackFn { - return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) { - return nil, nil - } +// GetWatcherHandler returns plugin watcher interface +func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler { + return nil } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index b6ddb46506a..7168f2342c2 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -249,9 +249,10 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher { w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName)) - w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback()) + w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler()) w.Start() - return &w + + return w } func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) { @@ -295,7 +296,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Expects capacity for resource1 to be 2. resourceName1 := "domain1.com/resource1" e1 := &endpointImpl{} - testManager.endpoints[resourceName1] = e1 + testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil} callback(resourceName1, devs) capacity, allocatable, removedResources := testManager.GetCapacity() resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)] @@ -345,7 +346,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) { // Tests adding another resource. resourceName2 := "resource2" e2 := &endpointImpl{} - testManager.endpoints[resourceName2] = e2 + testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil} callback(resourceName2, devs) capacity, allocatable, removedResources = testManager.GetCapacity() as.Equal(2, len(capacity)) @@ -456,7 +457,7 @@ func TestCheckpoint(t *testing.T) { ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) as.Nil(err) testManager := &ManagerImpl{ - endpoints: make(map[string]endpoint), + endpoints: make(map[string]endpointInfo), healthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), @@ -577,7 +578,7 @@ func makePod(limits v1.ResourceList) *v1.Pod { } } -func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) { +func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*ManagerImpl, error) { monitorCallback := func(resourceName string, devices []pluginapi.Device) {} ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) if err != nil { @@ -589,41 +590,45 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso healthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), - endpoints: make(map[string]endpoint), - pluginOpts: opts, + endpoints: make(map[string]endpointInfo), podDevices: make(podDevices), activePods: activePods, sourcesReady: &sourcesReadyStub{}, checkpointManager: ckm, } + for _, res := range testRes { testManager.healthyDevices[res.resourceName] = sets.NewString() for _, dev := range res.devs { testManager.healthyDevices[res.resourceName].Insert(dev) } if res.resourceName == "domain1.com/resource1" { - testManager.endpoints[res.resourceName] = &MockEndpoint{ - allocateFunc: allocateStubFunc(), + testManager.endpoints[res.resourceName] = endpointInfo{ + e: &MockEndpoint{allocateFunc: allocateStubFunc()}, + opts: nil, } } if res.resourceName == "domain2.com/resource2" { - testManager.endpoints[res.resourceName] = &MockEndpoint{ - allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) { - resp := new(pluginapi.ContainerAllocateResponse) - resp.Envs = make(map[string]string) - for _, dev := range devs { - switch dev { - case "dev3": - resp.Envs["key2"] = "val2" + testManager.endpoints[res.resourceName] = endpointInfo{ + e: &MockEndpoint{ + allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) { + resp := new(pluginapi.ContainerAllocateResponse) + resp.Envs = make(map[string]string) + for _, dev := range devs { + switch dev { + case "dev3": + resp.Envs["key2"] = "val2" - case "dev4": - resp.Envs["key2"] = "val3" + case "dev4": + resp.Envs["key2"] = "val3" + } } - } - resps := new(pluginapi.AllocateResponse) - resps.ContainerResponses = append(resps.ContainerResponses, resp) - return resps, nil + resps := new(pluginapi.AllocateResponse) + resps.ContainerResponses = append(resps.ContainerResponses, resp) + return resps, nil + }, }, + opts: nil, } } } @@ -669,10 +674,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) { as.Nil(err) defer os.RemoveAll(tmpDir) nodeInfo := getTestNodeInfo(v1.ResourceList{}) - pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) - pluginOpts[res1.resourceName] = nil - pluginOpts[res2.resourceName] = nil - testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) as.Nil(err) testPods := []*v1.Pod{ @@ -767,10 +769,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) { tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) defer os.RemoveAll(tmpDir) - pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) - pluginOpts[res1.resourceName] = nil - pluginOpts[res2.resourceName] = nil - testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts) + + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) as.Nil(err) podWithPluginResourcesInInitContainers := &v1.Pod{ @@ -904,18 +904,18 @@ func TestDevicePreStartContainer(t *testing.T) { as.Nil(err) defer os.RemoveAll(tmpDir) nodeInfo := getTestNodeInfo(v1.ResourceList{}) - pluginOpts := make(map[string]*pluginapi.DevicePluginOptions) - pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true} - testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts) + testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}) as.Nil(err) ch := make(chan []string, 1) - testManager.endpoints[res1.resourceName] = &MockEndpoint{ - initChan: ch, - allocateFunc: allocateStubFunc(), + testManager.endpoints[res1.resourceName] = endpointInfo{ + e: &MockEndpoint{ + initChan: ch, + allocateFunc: allocateStubFunc(), + }, + opts: &pluginapi.DevicePluginOptions{PreStartRequired: true}, } - pod := makePod(v1.ResourceList{ v1.ResourceName(res1.resourceName): res1.resourceQuantity}) activePods := []*v1.Pod{} diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 52176dec71a..35923b00d12 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -53,7 +53,7 @@ type Manager interface { // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable // and inactive device plugin resources previously registered on the node. GetCapacity() (v1.ResourceList, v1.ResourceList, []string) - GetWatcherCallback() watcher.RegisterCallbackFn + GetWatcherHandler() watcher.PluginHandler } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 88f3de5c328..6d31ace48d4 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1367,7 +1367,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { // Adding Registration Callback function for CSI Driver kl.pluginWatcher.AddHandler("CSIPlugin", pluginwatcher.PluginHandler(csi.PluginHandler)) // Adding Registration Callback function for Device Manager - kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback()) + kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) // Start the plugin watcher glog.V(4).Infof("starting watcher") if err := kl.pluginWatcher.Start(); err != nil {