From c9f1b57b5b71de30e63cedeac459d3b0009cd6c2 Mon Sep 17 00:00:00 2001 From: Richard Chen Date: Thu, 9 May 2019 14:07:22 -0700 Subject: [PATCH] Reset extended resources only when node is recreated. --- pkg/kubelet/cm/container_manager.go | 6 ++- pkg/kubelet/cm/container_manager_linux.go | 6 ++- pkg/kubelet/cm/container_manager_stub.go | 18 ++++++-- pkg/kubelet/cm/container_manager_windows.go | 6 ++- pkg/kubelet/cm/devicemanager/BUILD | 2 + pkg/kubelet/cm/devicemanager/manager.go | 18 +++++++- pkg/kubelet/cm/devicemanager/manager_stub.go | 7 +++- pkg/kubelet/cm/devicemanager/manager_test.go | 41 +++++++++++++++++- pkg/kubelet/cm/devicemanager/types.go | 7 +++- pkg/kubelet/kubelet_node_status.go | 17 ++++---- pkg/kubelet/kubelet_node_status_test.go | 44 ++++++++++++++++++-- 11 files changed, 151 insertions(+), 21 deletions(-) diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 5427183cb3c..315214bc634 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -21,7 +21,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" // TODO: Migrate kubelet to either use its own internal objects or client library. - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/config" @@ -104,6 +104,10 @@ type ContainerManager interface { // GetDevices returns information about the devices assigned to pods and containers GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + + // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, + // due to node recreation. 
+ ShouldResetExtendedResourceCapacity() bool } type NodeConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 79e647b8c97..1c4688ccf7f 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -34,7 +34,7 @@ import ( "github.com/opencontainers/runc/libcontainer/configs" "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" @@ -897,3 +897,7 @@ func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceLi func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices { return cm.deviceManager.GetDevices(podUID, containerName) } + +func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { + return cm.deviceManager.ShouldResetExtendedResourceCapacity() +} diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index db5de157f98..4ea918511c3 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -17,7 +17,7 @@ limitations under the License. 
package cm import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/klog" "k8s.io/apimachinery/pkg/api/resource" @@ -32,7 +32,9 @@ import ( schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo" ) -type containerManagerStub struct{} +type containerManagerStub struct { + shouldResetExtendedResourceCapacity bool +} var _ ContainerManager = &containerManagerStub{} @@ -110,6 +112,14 @@ func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.Conta return nil } -func NewStubContainerManager() ContainerManager { - return &containerManagerStub{} +func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool { + return cm.shouldResetExtendedResourceCapacity +} + +func NewStubContainerManager() ContainerManager { + return &containerManagerStub{shouldResetExtendedResourceCapacity: false} +} + +func NewStubContainerManagerWithExtendedResource(shouldResetExtendedResourceCapacity bool) ContainerManager { + return &containerManagerStub{shouldResetExtendedResourceCapacity: shouldResetExtendedResourceCapacity} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 092249af9bb..d2e0574b7f2 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -24,7 +24,7 @@ package cm import ( "fmt" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" @@ -171,3 +171,7 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string { func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices { return nil } + +func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { + return false +} diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD index 6301b6e2dba..796bfe6a059 100644 --- a/pkg/kubelet/cm/devicemanager/BUILD +++ 
b/pkg/kubelet/cm/devicemanager/BUILD @@ -14,6 +14,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/apis/core/v1/helper:go_default_library", + "//pkg/features:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/pluginregistration/v1:go_default_library", "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", @@ -30,6 +31,7 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/google.golang.org/grpc:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 8c3905eaf58..8fa58c4782d 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -28,10 +28,12 @@ import ( "google.golang.org/grpc" "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -838,3 +840,17 @@ func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesap defer m.mutex.Unlock() return m.podDevices.getContainerDevices(podUID, containerName) } + +// ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not, +// depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node +// has been recreated. 
+func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool { + if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) { + checkpoints, err := m.checkpointManager.ListCheckpoints() + if err != nil { + return false + } + return len(checkpoints) == 0 + } + return false +} diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index a4309c78a40..b24c116c10f 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -17,7 +17,7 @@ limitations under the License. package devicemanager import ( - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -67,3 +67,8 @@ func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler { func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices { return nil } + +// ShouldResetExtendedResourceCapacity returns false +func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool { + return false +} diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 6cd969412cf..a3c55dd15fb 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -946,6 +946,45 @@ func TestDevicePreStartContainer(t *testing.T) { as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs)) } +func TestResetExtendedResource(t *testing.T) { + as := assert.New(t) + tmpDir, err := ioutil.TempDir("", "checkpoint") + as.Nil(err) + ckm, err := checkpointmanager.NewCheckpointManager(tmpDir) + 
as.Nil(err) + testManager := &ManagerImpl{ + endpoints: make(map[string]endpointInfo), + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + checkpointManager: ckm, + } + + extendedResourceName := "domain.com/resource" + testManager.podDevices.insert("pod", "con", extendedResourceName, + constructDevices([]string{"dev1"}), + constructAllocResp(map[string]string{"/dev/dev1": "/dev/dev1"}, + map[string]string{"/home/lib1": "/usr/lib1"}, map[string]string{})) + + testManager.healthyDevices[extendedResourceName] = sets.NewString() + testManager.healthyDevices[extendedResourceName].Insert("dev1") + // checkpoint is present, indicating node hasn't been recreated + err = testManager.writeCheckpoint() + as.Nil(err) + + as.False(testManager.ShouldResetExtendedResourceCapacity()) + + // checkpoint is absent, representing node recreation + ckpts, err := ckm.ListCheckpoints() + as.Nil(err) + for _, ckpt := range ckpts { + err = ckm.RemoveCheckpoint(ckpt) + as.Nil(err) + } + as.True(testManager.ShouldResetExtendedResourceCapacity()) +} + func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) { return func(devs []string) (*pluginapi.AllocateResponse, error) { resp := new(pluginapi.ContainerAllocateResponse) diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index a420cf6541f..0904db5f1b3 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -19,7 +19,7 @@ package devicemanager import ( "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -58,6 +58,11 @@ type Manager interface { // GetDevices returns information about the devices assigned to pods and containers 
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + + // ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset or not, + // depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates + // the node has been recreated. + ShouldResetExtendedResourceCapacity() bool } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index c1d1d499e2b..410b8847bc1 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -26,7 +26,7 @@ import ( "k8s.io/klog" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -132,12 +132,15 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool { // Zeros out extended resource capacity during reconciliation. 
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool { requiresUpdate := false - for k := range node.Status.Capacity { - if v1helper.IsExtendedResourceName(k) { - klog.Infof("Zero out resource %s capacity in existing node.", k) - node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI) - node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI) - requiresUpdate = true + // Check with the container manager to see if the node has been recreated, in which case extended resources should be zeroed until they are available. + if kl.containerManager.ShouldResetExtendedResourceCapacity() { + for k := range node.Status.Capacity { + if v1helper.IsExtendedResourceName(k) { + klog.Infof("Zero out resource %s capacity in existing node.", k) + node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI) + node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI) + requiresUpdate = true + } } } return requiresUpdate diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index 81d51ef2f5e..665f4309d03 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -31,7 +31,7 @@ import ( "github.com/stretchr/testify/require" cadvisorapi "github.com/google/cadvisor/info/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -1737,17 +1737,21 @@ func TestUpdateDefaultLabels(t *testing.T) { func TestReconcileExtendedResource(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used + testKubelet.kubelet.containerManager = cm.NewStubContainerManagerWithExtendedResource(true /* shouldResetExtendedResourceCapacity */) + testKubeletNoReset := newTestKubelet(t, 
false /* controllerAttachDetachEnabled */) extendedResourceName1 := v1.ResourceName("test.com/resource1") extendedResourceName2 := v1.ResourceName("test.com/resource2") cases := []struct { name string + testKubelet *TestKubelet existingNode *v1.Node expectedNode *v1.Node needsUpdate bool }{ { - name: "no update needed without extended resource", + name: "no update needed without extended resource", + testKubelet: testKubelet, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{ @@ -1779,7 +1783,41 @@ func TestReconcileExtendedResource(t *testing.T) { needsUpdate: false, }, { - name: "extended resource capacity is zeroed", + name: "extended resource capacity is not zeroed due to presence of checkpoint file", + testKubelet: testKubelet, + existingNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + }, + }, + expectedNode: &v1.Node{ + Status: v1.NodeStatus{ + Capacity: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + Allocatable: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI), + v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI), + }, + }, + }, + needsUpdate: false, + }, + { + name: "extended resource capacity is zeroed", + 
testKubelet: testKubeletNoReset, existingNode: &v1.Node{ Status: v1.NodeStatus{ Capacity: v1.ResourceList{