diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD
index eb1dce4d8d8..8e9d8ea4e9e 100644
--- a/pkg/kubelet/BUILD
+++ b/pkg/kubelet/BUILD
@@ -209,6 +209,7 @@ go_test(
         "//pkg/volume/host_path:go_default_library",
         "//pkg/volume/testing:go_default_library",
         "//pkg/volume/util/volumehelper:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/google/cadvisor/info/v1:go_default_library",
         "//vendor/github.com/google/cadvisor/info/v2:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
diff --git a/pkg/kubelet/cm/deviceplugin/BUILD b/pkg/kubelet/cm/deviceplugin/BUILD
index 679945e1fe0..456325870db 100644
--- a/pkg/kubelet/cm/deviceplugin/BUILD
+++ b/pkg/kubelet/cm/deviceplugin/BUILD
@@ -57,6 +57,8 @@ go_test(
     library = ":go_default_library",
     deps = [
         "//pkg/kubelet/apis/deviceplugin/v1alpha:go_default_library",
+        "//pkg/kubelet/lifecycle:go_default_library",
+        "//plugin/pkg/scheduler/schedulercache:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
         "//vendor/github.com/stretchr/testify/require:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
diff --git a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go
index 17c01059593..252968c3806 100644
--- a/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go
+++ b/pkg/kubelet/cm/deviceplugin/device_plugin_handler_test.go
@@ -30,6 +30,8 @@ import (
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
+	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
 func TestUpdateCapacity(t *testing.T) {
@@ -224,13 +226,24 @@ func TestCheckpoint(t *testing.T) {
 	as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices))
 }
 
+type activePodsStub struct {
+	activePods []*v1.Pod
+}
+
+func (a *activePodsStub) getActivePods() []*v1.Pod {
+	return a.activePods
+}
+
+func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
+	a.activePods = newPods
+}
+
 func TestPodContainerDeviceAllocation(t *testing.T) {
 	flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
 	var logLevel string
 	flag.StringVar(&logLevel, "logLevel", "4", "test")
 	flag.Lookup("v").Value.Set(logLevel)
 
-	var activePods []*v1.Pod
 	resourceName1 := "domain1.com/resource1"
 	resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
 	devID1 := "dev1"
@@ -244,6 +257,16 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 	as := assert.New(t)
 	as.Nil(err)
 	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+	podsStub := activePodsStub{
+		activePods: []*v1.Pod{},
+	}
+	cachedNode := &v1.Node{
+		Status: v1.NodeStatus{
+			Allocatable: v1.ResourceList{},
+		},
+	}
+	nodeInfo := &schedulercache.NodeInfo{}
+	nodeInfo.SetNode(cachedNode)
 
 	testHandler := &HandlerImpl{
 		devicePluginManager:                m,
@@ -251,6 +274,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 		allDevices:                         make(map[string]sets.String),
 		allocatedDevices:                   make(map[string]sets.String),
 		podDevices:                         make(podDevices),
+		activePods:                         podsStub.getActivePods,
 	}
 	testHandler.allDevices[resourceName1] = sets.NewString()
 	testHandler.allDevices[resourceName1].Insert(devID1)
@@ -288,8 +312,8 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 		},
 	}
 
-	activePods = append(activePods, pod)
-	err = testHandler.Allocate(pod, &pod.Spec.Containers[0], activePods)
+	podsStub.updateActivePods([]*v1.Pod{pod})
+	err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
 	as.Nil(err)
 	runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
 	as.Equal(len(runContainerOpts.Devices), 3)
@@ -315,7 +339,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 			},
 		},
 	}
-	err = testHandler.Allocate(failPod, &failPod.Spec.Containers[0], activePods)
+	err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod})
 	as.NotNil(err)
 	runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0])
 	as.Nil(runContainerOpts2)
@@ -338,8 +362,53 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
 			},
 		},
 	}
-	err = testHandler.Allocate(newPod, &newPod.Spec.Containers[0], activePods)
+	err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod})
 	as.Nil(err)
 	runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0])
 	as.Equal(1, len(runContainerOpts3.Envs))
 }
+
+func TestSanitizeNodeAllocatable(t *testing.T) {
+	resourceName1 := "domain1.com/resource1"
+	devID1 := "dev1"
+
+	resourceName2 := "domain2.com/resource2"
+	devID2 := "dev2"
+
+	m, err := NewDevicePluginManagerTestStub()
+	as := assert.New(t)
+	as.Nil(err)
+	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+
+	testHandler := &HandlerImpl{
+		devicePluginManager:                m,
+		devicePluginManagerMonitorCallback: monitorCallback,
+		allDevices:                         make(map[string]sets.String),
+		allocatedDevices:                   make(map[string]sets.String),
+		podDevices:                         make(podDevices),
+	}
+	// require one of resource1 and one of resource2
+	testHandler.allocatedDevices[resourceName1] = sets.NewString()
+	testHandler.allocatedDevices[resourceName1].Insert(devID1)
+	testHandler.allocatedDevices[resourceName2] = sets.NewString()
+	testHandler.allocatedDevices[resourceName2].Insert(devID2)
+
+	cachedNode := &v1.Node{
+		Status: v1.NodeStatus{
+			Allocatable: v1.ResourceList{
+				// has no resource1 and two of resource2
+				v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
+			},
+		},
+	}
+	nodeInfo := &schedulercache.NodeInfo{}
+	nodeInfo.SetNode(cachedNode)
+
+	testHandler.sanitizeNodeAllocatable(nodeInfo)
+
+	allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
+	// allocatable in nodeInfo is less than needed, should update
+	as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
+	// allocatable in nodeInfo is more than needed, should skip updating
+	as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
+}
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 70e2dd6eb02..5019121d43a 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -72,6 +72,7 @@ import (
 	_ "k8s.io/kubernetes/pkg/volume/host_path"
 	volumetest "k8s.io/kubernetes/pkg/volume/testing"
 	"k8s.io/kubernetes/pkg/volume/util/volumehelper"
+	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
 )
 
 func init() {
@@ -284,7 +285,7 @@ func newTestKubeletWithImageList(
 	kubelet.evictionManager = evictionManager
 	kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
 	// Add this as cleanup predicate pod admitter
-	kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub()))
+	kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))
 
 	plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
 	var prober volume.DynamicPluginProber = nil // TODO (#51147) inject mock
@@ -573,6 +574,116 @@ func TestHandleMemExceeded(t *testing.T) {
 	checkPodStatus(t, kl, fittingPod, v1.PodPending)
 }
 
+// Tests that the result of the UpdatePluginResources interface is handled correctly
+// by setting the corresponding status in the status map.
+func TestHandlePluginResources(t *testing.T) {
+	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
+	defer testKubelet.Cleanup()
+	testKubelet.chainMock()
+	kl := testKubelet.kubelet
+
+	adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
+	unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce")
+	failedResource := v1.ResourceName("domain2.com/failedResource")
+	resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
+	resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
+	resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
+	allowedPodQuantity := *resource.NewQuantity(int64(10), resource.DecimalSI)
+	nodes := []*v1.Node{
+		{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
+			Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
+				adjustedResource:  resourceQuantity1,
+				unadjustedResouce: resourceQuantity1,
+				v1.ResourcePods:   allowedPodQuantity,
+			}}},
+	}
+	kl.nodeInfo = testNodeInfo{nodes: nodes}
+
+	updatePluginResourcesFunc := func(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
+		// Maps from resourceName to the value we use to set node.allocatableResource[resourceName].
+		// A resource with an invalid value (< 0) causes the function to return an error
+		// to emulate resource allocation failure.
+		// Resources not contained in this map will have their node.allocatableResource
+		// quantity unchanged.
+		updateResourceMap := map[v1.ResourceName]resource.Quantity{
+			adjustedResource: resourceQuantity2,
+			failedResource:   resourceQuantityInvalid,
+		}
+		pod := attrs.Pod
+		allocatableResource := node.AllocatableResource()
+		newAllocatableResource := allocatableResource.Clone()
+		for _, container := range pod.Spec.Containers {
+			for resource := range container.Resources.Requests {
+				newQuantity, exist := updateResourceMap[resource]
+				if !exist {
+					continue
+				}
+				if newQuantity.Value() < 0 {
+					return fmt.Errorf("Allocation failed")
+				}
+				newAllocatableResource.ScalarResources[resource] = newQuantity.Value()
+			}
+		}
+		node.SetAllocatableResource(newAllocatableResource)
+		return nil
+	}
+
+	// Add updatePluginResourcesFunc to the admission handler to test its behavior.
+	kl.admitHandlers = lifecycle.PodAdmitHandlers{}
+	kl.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kl.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), updatePluginResourcesFunc))
+
+	// pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc
+	// adjusts node.allocatableResource for this resource to a sufficient value.
+	fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName),
+		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+			Limits: v1.ResourceList{
+				adjustedResource: resourceQuantity2,
+			},
+			Requests: v1.ResourceList{
+				adjustedResource: resourceQuantity2,
+			},
+		}}},
+	}
+	// pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit.
+	exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+			Limits: v1.ResourceList{
+				unadjustedResouce: resourceQuantity2,
+			},
+			Requests: v1.ResourceList{
+				unadjustedResouce: resourceQuantity2,
+			},
+		}}},
+	}
+	// pod requiring failedResource will fail because the resource fails to be allocated.
+	failedPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+			Limits: v1.ResourceList{
+				failedResource: resourceQuantity1,
+			},
+			Requests: v1.ResourceList{
+				failedResource: resourceQuantity1,
+			},
+		}}},
+	}
+	pods := []*v1.Pod{
+		podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec),
+		podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec),
+		podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec),
+	}
+	// The latter two pods should be rejected.
+	fittingPod := pods[0]
+	exceededPod := pods[1]
+	failedPod := pods[2]
+
+	kl.HandlePodAdditions(pods)
+
+	// Check pod status stored in the status map.
+	checkPodStatus(t, kl, fittingPod, v1.PodPending)
+	checkPodStatus(t, kl, exceededPod, v1.PodFailed)
+	checkPodStatus(t, kl, failedPod, v1.PodFailed)
+}
+
 // TODO(filipg): This test should be removed once StatusSyncer can do garbage collection without external signal.
 func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
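Note: the sketch below is not part of the patch; it is a minimal illustration of the hook shape that lifecycle.NewPredicateAdmitHandler now accepts as its third argument, in the spirit of the updatePluginResourcesFunc stub in TestHandlePluginResources. The package name, function name, and the "example.com/resource" resource are hypothetical; only the NodeInfo and PodAdmitAttributes calls already exercised by the tests above (AllocatableResource, Clone, ScalarResources, SetAllocatableResource, attrs.Pod) are assumed.

package example

import (
	"k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// updateExampleResource has the signature expected of an UpdatePluginResources-style
// hook: it clones the scheduler cache's view of node allocatable, adjusts the scalar
// value for an extended resource requested by the pod, and writes the clone back so
// the subsequent predicate check sees the adjusted allocatable.
func updateExampleResource(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	// Hypothetical extended resource name, used only for illustration.
	exampleResource := v1.ResourceName("example.com/resource")

	// Work on a copy so the cached NodeInfo is only replaced on success.
	newAllocatable := node.AllocatableResource().Clone()
	if newAllocatable.ScalarResources == nil {
		newAllocatable.ScalarResources = map[v1.ResourceName]int64{}
	}
	for _, container := range attrs.Pod.Spec.Containers {
		if quantity, ok := container.Resources.Requests[exampleResource]; ok {
			// Grant the requested amount; the test stub above instead looks the
			// value up in a fixed map and returns an error for invalid entries.
			newAllocatable.ScalarResources[exampleResource] = quantity.Value()
		}
	}
	node.SetAllocatableResource(newAllocatable)
	return nil
}

Such a function would be wired up the same way the patch wires updatePluginResourcesFunc and kubelet.containerManager.UpdatePluginResources: passed as the third argument to lifecycle.NewPredicateAdmitHandler alongside the node getter and the admission failure handler.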