Fix downward API for resource limits

This commit is contained in:
derekwaynecarr 2016-06-13 17:07:56 -04:00
parent afdd9ea262
commit 712860d55f
5 changed files with 138 additions and 57 deletions

View File

@ -832,6 +832,7 @@ type Kubelet struct {
// and disable kubelet from executing any attach/detach operations // and disable kubelet from executing any attach/detach operations
enableControllerAttachDetach bool enableControllerAttachDetach bool
// lastUpdatedNodeObject is a cached version of the node as last reported back to the api server.
lastUpdatedNodeObject atomic.Value lastUpdatedNodeObject atomic.Value
} }
@ -1560,11 +1561,11 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Contain
return result, err return result, err
} }
case envVar.ValueFrom.ResourceFieldRef != nil: case envVar.ValueFrom.ResourceFieldRef != nil:
defaultedPod, err := kl.defaultPodLimitsForDownwardApi(pod) defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardApi(pod, container)
if err != nil { if err != nil {
return result, err return result, err
} }
runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, defaultedPod, container) runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, defaultedPod, defaultedContainer)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -1905,7 +1906,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
// Mount volumes and update the volume manager // Mount volumes and update the volume manager
// Default limits for containers here to have downward API expose user-friendly limits to pods. // Default limits for containers here to have downward API expose user-friendly limits to pods.
defaultedPod, err := kl.defaultPodLimitsForDownwardApi(pod) defaultedPod, _, err := kl.defaultPodLimitsForDownwardApi(pod, nil)
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,84 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
)
// defaultPodLimitsForDownwardApi copies the input pod, and optional container,
// and applies default resource limits. it returns a copy of the input pod,
// and a copy of the input container (if specified) with default limits
// applied. if a container has no limit specified, it will default the limit to
// the node capacity.
// TODO: if/when we have pod level resources, we need to update this function
// to use those limits instead of node capacity.
func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod, container *api.Container) (*api.Pod, *api.Container, error) {
if pod == nil {
return nil, nil, fmt.Errorf("invalid input, pod cannot be nil")
}
lastUpdatedNodeObject := kl.lastUpdatedNodeObject.Load()
if lastUpdatedNodeObject == nil {
return nil, nil, fmt.Errorf("failed to find node object in cache, expected a non-nil object in the cache.")
}
capacity := lastUpdatedNodeObject.(*api.Node).Status.Capacity
podCopy, err := api.Scheme.Copy(pod)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of pod object: %v", err)
}
outputPod, ok := podCopy.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("unexpected type")
}
for idx := range outputPod.Spec.Containers {
mergeContainerResourceLimitsWithCapacity(&outputPod.Spec.Containers[idx], capacity)
}
var outputContainer *api.Container
if container != nil {
containerCopy, err := api.Scheme.DeepCopy(container)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of container object: %v", err)
}
outputContainer, ok = containerCopy.(*api.Container)
if !ok {
return nil, nil, fmt.Errorf("unexpected type")
}
mergeContainerResourceLimitsWithCapacity(outputContainer, capacity)
}
return outputPod, outputContainer, nil
}
// mergeContainerResourceLimitsWithCapacity checks if a limit is applied for
// the container, and if not, it sets the limit based on the capacity.
func mergeContainerResourceLimitsWithCapacity(container *api.Container,
capacity api.ResourceList) {
if container.Resources.Limits == nil {
container.Resources.Limits = make(api.ResourceList)
}
for _, resource := range []api.ResourceName{api.ResourceCPU, api.ResourceMemory} {
if quantity, exists := container.Resources.Limits[resource]; !exists || quantity.IsZero() {
if cap, exists := capacity[resource]; exists {
container.Resources.Limits[resource] = *cap.Copy()
}
}
}
}

View File

@ -59,7 +59,7 @@ func TestPodResourceLimitsDefaulting(t *testing.T) {
} }
as := assert.New(t) as := assert.New(t)
for idx, tc := range cases { for idx, tc := range cases {
actual, err := tk.kubelet.defaultPodLimitsForDownwardApi(tc.pod) actual, _, err := tk.kubelet.defaultPodLimitsForDownwardApi(tc.pod, nil)
as.Nil(err, "failed to default pod limits: %v", err) as.Nil(err, "failed to default pod limits: %v", err)
as.Equal(tc.expected, actual, "test case [%d] failed. Expected: %+v, Got: %+v", idx, tc.expected, actual) as.Equal(tc.expected, actual, "test case [%d] failed. Expected: %+v, Got: %+v", idx, tc.expected, actual)
} }
@ -67,7 +67,7 @@ func TestPodResourceLimitsDefaulting(t *testing.T) {
func getPod(cpuLimit, memoryLimit string) *api.Pod { func getPod(cpuLimit, memoryLimit string) *api.Pod {
resources := api.ResourceRequirements{} resources := api.ResourceRequirements{}
if cpuLimit != "" && memoryLimit != "" { if cpuLimit != "" || memoryLimit != "" {
resources.Limits = make(api.ResourceList) resources.Limits = make(api.ResourceList)
} }
if cpuLimit != "" { if cpuLimit != "" {

View File

@ -1,52 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
)
func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod) (*api.Pod, error) {
capacity := make(api.ResourceList)
lastUpdatedNodeObject := kl.lastUpdatedNodeObject.Load()
if lastUpdatedNodeObject == nil {
return nil, fmt.Errorf("Failed to find node object in cache. Expected a non-nil object in the cache.")
} else {
capacity = lastUpdatedNodeObject.(*api.Node).Status.Capacity
}
podCopy, err := api.Scheme.Copy(pod)
if err != nil {
return nil, fmt.Errorf("failed to perform a deep copy of pod object. Error: %v", err)
}
pod = podCopy.(*api.Pod)
for idx, c := range pod.Spec.Containers {
for _, resource := range []api.ResourceName{api.ResourceCPU, api.ResourceMemory} {
if quantity, exists := c.Resources.Limits[resource]; !exists || quantity.IsZero() {
if cap, exists := capacity[resource]; exists {
if pod.Spec.Containers[idx].Resources.Limits == nil {
pod.Spec.Containers[idx].Resources.Limits = make(api.ResourceList)
}
pod.Spec.Containers[idx].Resources.Limits[resource] = cap
}
}
}
}
return pod, nil
}

View File

@ -128,6 +128,50 @@ var _ = framework.KubeDescribe("Downward API", func() {
testDownwardAPI(f, podName, env, expectations) testDownwardAPI(f, podName, env, expectations)
}) })
It("should provide default limits.cpu/memory from node capacity", func() {
podName := "downward-api-" + string(util.NewUUID())
env := []api.EnvVar{
{
Name: "CPU_LIMIT",
ValueFrom: &api.EnvVarSource{
ResourceFieldRef: &api.ResourceFieldSelector{
Resource: "limits.cpu",
},
},
},
{
Name: "MEMORY_LIMIT",
ValueFrom: &api.EnvVarSource{
ResourceFieldRef: &api.ResourceFieldSelector{
Resource: "limits.memory",
},
},
},
}
expectations := []string{
fmt.Sprintf("CPU_LIMIT=[1-9]"),
fmt.Sprintf("MEMORY_LIMIT=[1-9]"),
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: map[string]string{"name": podName},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "dapi-container",
Image: "gcr.io/google_containers/busybox:1.24",
Command: []string{"sh", "-c", "env"},
Env: env,
},
},
RestartPolicy: api.RestartPolicyNever,
},
}
testDownwardAPIUsingPod(f, pod, env, expectations)
})
}) })
func testDownwardAPI(f *framework.Framework, podName string, env []api.EnvVar, expectations []string) { func testDownwardAPI(f *framework.Framework, podName string, env []api.EnvVar, expectations []string) {
@ -159,5 +203,9 @@ func testDownwardAPI(f *framework.Framework, podName string, env []api.EnvVar, e
}, },
} }
testDownwardAPIUsingPod(f, pod, env, expectations)
}
func testDownwardAPIUsingPod(f *framework.Framework, pod *api.Pod, env []api.EnvVar, expectations []string) {
f.TestContainerOutputRegexp("downward api env vars", pod, 0, expectations) f.TestContainerOutputRegexp("downward api env vars", pod, 0, expectations)
} }